Source code for plugins.chgrp
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2011-2015 Glencoe Software, Inc. All Rights Reserved.
# Use is subject to license terms supplied in LICENSE.txt
#
"""
chgrp plugin
Plugin read by omero.cli.Cli during initialization. The method(s)
defined here will be added to the Cli class for later use.
"""
from omero.cli import CLI, GraphControl, ExperimenterGroupArg
import sys
HELP = """Move data between groups
Move entire graphs of data based on the ID of the top-node.
Examples:
# In each case move an image to group 101
omero chgrp 101 Image:1
omero chgrp Group:101 Image:2
omero chgrp ExperimenterGroup:101 Image:3
# In both cases move five images to the group named "My Lab"
omero chgrp "My Lab" Image:51,52,53,54,56
omero chgrp "My Lab" Image:51-54,56
# Move a plate but leave all images in the original group
omero chgrp 201 Plate:1 --exclude Image
# Move all images contained under a project
omero chgrp 101 Project/Dataset/Image:53
# Move all images contained under two projects
omero chgrp 101 Project/Image:201,202
# Do a dry run of a move reporting the outcome if the move had been run
omero chgrp 101 Dataset:53 --dry-run
# Do a dry run of a move, reporting all the objects
# that would have been moved
omero chgrp 101 Dataset:53 --dry-run --report
"""
[docs]
class ChgrpControl(GraphControl):
[docs]
def cmd_type(self):
import omero
import omero.all
return omero.cmd.Chgrp2
def _pre_objects(self, parser):
parser.add_argument(
"grp", nargs="?", type=ExperimenterGroupArg,
help="""Group to move objects to""")
[docs]
def is_admin(self, client):
# check if the user currently logged is an admin
from omero.model.enums import AdminPrivilegeChgrp
ec = self.ctx.get_event_context()
return AdminPrivilegeChgrp in ec.adminPrivileges
def _process_request(self, req, args, client):
# Retrieve group id
gid = args.grp.lookup(client)
if gid is None:
self.ctx.die(196, "Failed to find group: %s" % args.grp.orig)
# Retrieve group
import omero
try:
group = client.sf.getAdminService().getGroup(gid)
except omero.ApiUsageException:
self.ctx.die(196, "Failed to find group: %s" % args.grp.orig)
# Check session owner is member of the target group
uid = self.ctx.get_event_context().userId
admin = self.is_admin(client)
ids = [x.child.id.val for x in group.copyGroupExperimenterMap()]
# check if the user is an admin
if uid not in ids and not admin:
self.ctx.die(197, "Current user is not member of group: %s" %
group.id.val)
# Set requests group
if isinstance(req, omero.cmd.DoAll):
for request in req.requests:
if isinstance(request, omero.cmd.SkipHead):
request.request.groupId = gid
else:
request.groupId = gid
else:
if isinstance(req, omero.cmd.SkipHead):
req.request.groupId = gid
else:
req.groupId = gid
super(ChgrpControl, self)._process_request(req, args, client)
[docs]
def print_detailed_report(self, req, rsp, status):
import omero
if isinstance(rsp, omero.cmd.DoAllRsp):
for response in rsp.responses:
if isinstance(response, omero.cmd.Chgrp2Response):
self.print_chgrp_response(response)
elif isinstance(rsp, omero.cmd.Chgrp2Response):
self.print_chgrp_response(rsp)
[docs]
def print_chgrp_response(self, rsp):
if rsp.includedObjects:
self.ctx.out("Included objects")
obj_ids = self._get_object_ids(rsp.includedObjects)
for k in obj_ids:
self.ctx.out(" %s:%s" % (k, obj_ids[k]))
if rsp.deletedObjects:
self.ctx.out("Deleted objects")
obj_ids = self._get_object_ids(rsp.deletedObjects)
for k in obj_ids:
self.ctx.out(" %s:%s" % (k, obj_ids[k]))
try:
register("chgrp", ChgrpControl, HELP)
except NameError:
if __name__ == "__main__":
cli = CLI()
cli.register("chgrp", ChgrpControl, HELP)
cli.invoke(sys.argv[1:])