Source code for plugins.user

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# 
# Copyright 2009-2014 Glencoe Software, Inc. All rights reserved.
# Use is subject to license terms supplied in LICENSE.txt
"""
   User administration plugin
"""

import sys

from omero.cli import UserGroupControl, CLI, ExceptionHandler, admin_only
from omero.model.enums import (AdminPrivilegeModifyGroupMembership,
                               AdminPrivilegeModifyUser)
from omero.rtypes import unwrap as _

HELP = "Support for adding and managing users"


[docs] class UserControl(UserGroupControl): def _configure(self, parser): self.exc = ExceptionHandler() parser.add_login_arguments() sub = parser.sub() add = parser.add(sub, self.add, help="Add user") add.add_argument( "--ignore-existing", action="store_true", default=False, help="Do not fail if user already exists") add.add_argument( "-m", "--middlename", help="Middle name, if available") add.add_argument("-e", "--email") add.add_argument("-i", "--institution") # Capitalized since conflict with main values add.add_argument( "-a", "--admin", action="store_true", help="Whether the user should be an admin") add.add_argument("username", help="User's login name") add.add_argument("firstname", help="User's first name") add.add_argument("lastname", help="User's last name") self.add_group_arguments(add, " to join") password_group = add.add_mutually_exclusive_group() password_group.add_argument( "-P", "--userpassword", help="Password for user") password_group.add_argument( "--no-password", action="store_true", default=False, help="Create user with empty password") list = parser.add( sub, self.list, help="List information about all users") info = parser.add( sub, self.info, "List information about the user(s). Default to the context user") self.add_user_arguments(info) for x in (list, info): x.add_style_argument() x.add_group_print_arguments() x.add_user_sorting_arguments() listgroups = parser.add( sub, self.listgroups, "List the groups of the user. Default to the context user") self.add_user_arguments(listgroups) listgroups.add_style_argument() listgroups.add_user_print_arguments() listgroups.add_group_sorting_arguments() password = parser.add( sub, self.password, help="Set user's password") password.add_argument( "username", nargs="?", help="Username if not the current user") email = parser.add( sub, self.email, help="List users' email addresses") email.add_argument( "-n", "--names", action="store_true", default=False, help="Print user names along with email addresses") email.add_argument( "-1", "--one", action="store_true", default=False, help="Print one user per line") email.add_argument( "-i", "--ignore", action="store_true", default=False, help="Ignore users without email addresses") email.add_argument( "--all", action="store_true", default=False, help="Include all users, including deactivated accounts") joingroup = parser.add(sub, self.joingroup, "Join one or more groups") self.add_id_name_arguments( joingroup, "user. Default to the current user") group = self.add_group_arguments(joingroup, " to join") group.add_argument( "--as-owner", action="store_true", default=False, help="Join the group(s) as an owner") leavegroup = parser.add( sub, self.leavegroup, "Leave one or more groups") self.add_id_name_arguments( leavegroup, "user. Default to the current user") group = self.add_group_arguments(leavegroup, " to leave") group.add_argument( "--as-owner", action="store_true", default=False, help="Leave the owner list of the group(s)") for x in (email, password, list, add, joingroup, leavegroup): x.add_login_arguments()
[docs] def format_name(self, exp): record = "" fn = _(exp.firstName) mn = " " if _(exp.middleName): mn = " %s " % _(exp.middleName) ln = _(exp.lastName) record += "%s%s%s" % (fn, mn, ln) return record
[docs] def email(self, args): c = self.ctx.conn(args) a = c.sf.getAdminService() r = a.getSecurityRoles() skipped = [] records = [] for exp in a.lookupExperimenters(): # Handle users without email if not _(exp.email): if not args.ignore: skipped.append(exp) continue # Handle deactivated users if not args.all: groups = exp.linkedExperimenterGroupList() group_ids = [x.id.val for x in groups] if r.userGroupId not in group_ids: continue record = "" if args.names: record += '"%s"' % self.format_name(exp) record += " <%s>" % _(exp.email) else: record += _(exp.email) records.append(record) if args.one: for record in records: self.ctx.out(record) else: self.ctx.out(", ".join(records)) if skipped: self.ctx.err("Missing email addresses:") for s in skipped: self.ctx.err(self.format_name(s))
[docs] def password(self, args): import omero from omero.rtypes import rstring client = self.ctx.conn(args) own_name = self.ctx.get_event_context().userName admin = client.sf.getAdminService() # tickets 3202, 5841 own_pw = self._ask_for_password(" for your user (%s)" % own_name, strict=False) try: client.sf.setSecurityPassword(own_pw) self.ctx.out("Verified password.\n") except omero.SecurityViolation as sv: import traceback self.ctx.die(456, "SecurityViolation: Bad credentials") self.ctx.dbg(traceback.format_exception(None, e, sys.exc_info()[2])) if args.username: try: e = admin.lookupExperimenter(args.username) except omero.ApiUsageException: self.ctx.die(457, "Unknown user: %s" % args.username) return # Never reached self.ctx.out("Changing password for %s (id:%s)" % ( args.username, e.id.val)) else: self.ctx.out("Changing password for %s" % own_name) pw = self._ask_for_password(" to be set") pw = rstring(pw) if args.username: admin.changeUserPassword(args.username, pw) else: admin.changePassword(pw) self.ctx.out("Password changed")
[docs] def list(self, args): c = self.ctx.conn(args) a = c.sf.getAdminService() users = a.lookupExperimenters() self.output_users_list(a, users, args)
[docs] def info(self, args): c = self.ctx.conn(args) a = c.sf.getAdminService() [uids, users] = self.list_users(a, args, use_context=True) self.output_users_list(a, users, args)
[docs] def listgroups(self, args): c = self.ctx.conn(args) admin = c.sf.getAdminService() [uids, groups] = self.list_users(admin, args, use_context=True) if len(uids) != 1: self.ctx.die(516, 'Too many user arguments') groups = admin.containedGroups(uids[0]) self.output_groups_list(groups, args)
@admin_only(AdminPrivilegeModifyUser, AdminPrivilegeModifyGroupMembership) def add(self, args): email = args.email login = args.username first = args.firstname middle = args.middlename last = args.lastname inst = args.institution pasw = args.userpassword import omero from omero.rtypes import rbool, rstring from omero_model_ExperimenterI import ExperimenterI as Exp from omero_model_ExperimenterGroupI import ExperimenterGroupI as Grp c = self.ctx.conn(args) e = Exp() e.omeName = rstring(login) e.firstName = rstring(first) e.lastName = rstring(last) e.middleName = rstring(middle) e.email = rstring(email) e.institution = rstring(inst) e.ldap = rbool(False) # Fail-fast if no-password is passed and the server does not accept # empty passwords configService = c.getSession().getConfigService() password_required = configService.getConfigValue( "omero.security.password_required").lower() if args.no_password and password_required != 'false': self.ctx.die(502, "Server does not allow user creation with empty" " passwords") # Check user existence admin = c.getSession().getAdminService() try: usr = admin.lookupExperimenter(login) if usr: if args.ignore_existing: self.ctx.out("User exists: %s (id=%s)" % (login, usr.id.val)) return else: self.ctx.die(3, "User exists: %s (id=%s)" % (login, usr.id.val)) except omero.ApiUsageException: pass # Apparently no such user exists [gid, groups] = self.list_groups(admin, args, use_context=False) roles = admin.getSecurityRoles() groups.append(Grp(roles.userGroupId, False)) if args.admin: groups.append(Grp(roles.systemGroupId, False)) group = groups.pop(0) try: if args.no_password: id = admin.createExperimenter(e, group, groups) self.ctx.out("Added user %s (id=%s) without password" % (login, id)) else: if pasw is None: pasw = self._ask_for_password(" for your new user (%s)" % login, strict=True) id = admin.createExperimenterWithPassword(e, rstring(pasw), group, groups) self.ctx.out("Added user %s (id=%s) with password" % (login, id)) except omero.ValidationException as ve: # Possible, though unlikely after previous check if self.exc.is_constraint_violation(ve): self.ctx.die(66, "User already exists: %s" % login) else: self.ctx.die(67, "Unknown ValidationException: %s" % ve.message) except omero.SecurityViolation as se: self.ctx.die(68, "Security violation: %s" % se.message)
[docs] def parse_userid(self, a, args): if args.id: user = getattr(args, "id", None) return self.find_user_by_id(a, user, fatal=True) elif args.name: user = getattr(args, "name", None) return self.find_user_by_name(a, user, fatal=True) else: user = self.ctx.get_event_context().userName return self.find_user_by_name(a, user, fatal=True)
[docs] def filter_groups(self, groups, uid, owner=False, join=True): for group in list(groups): if owner: uid_list = self.getownerids(group) relation = "owner of" else: uid_list = self.getuserids(group) relation = "in" if join: if uid in uid_list: self.ctx.out("%s is already %s group %s" % (uid, relation, group.id.val)) groups.remove(group) else: if uid not in uid_list: self.ctx.out("%s is not %s group %s" % (uid, relation, group.id.val)) groups.remove(group) return groups
[docs] def joingroup(self, args): c = self.ctx.conn(args) a = c.sf.getAdminService() uid, username = self.parse_userid(a, args) [gid, groups] = self.list_groups(a, args, use_context=False) groups = self.filter_groups(groups, uid, args.as_owner, True) for group in groups: if args.as_owner: self.addownersbyid(a, group, [uid]) else: self.addusersbyid(a, group, [uid])
[docs] def leavegroup(self, args): c = self.ctx.conn(args) a = c.sf.getAdminService() uid, username = self.parse_userid(a, args) [gid, groups] = self.list_groups(a, args, use_context=False) groups = self.filter_groups(groups, uid, args.as_owner, False) for group in list(groups): if args.as_owner: self.removeownersbyid(a, group, [uid]) else: self.removeusersbyid(a, group, [uid])
try: register("user", UserControl, HELP) except NameError: if __name__ == "__main__": cli = CLI() cli.register("user", UserControl, HELP) cli.invoke(sys.argv[1:])