#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 - 2016 Glencoe Software, Inc. All rights reserved.
# Use is subject to license terms supplied in LICENSE.txt
"""
User administration plugin (LDAP extension)
"""
import sys
from omero.cli import CLI, ExceptionHandler, admin_only, UserGroupControl
from omero.model.enums import (
AdminPrivilegeModifyGroup, AdminPrivilegeModifyGroupMembership,
AdminPrivilegeModifyUser)
from omero.rtypes import rbool
HELP = """Administrative support for managing users' LDAP settings
Most of these commands should be run as an OMERO administrator such as root.
Examples:
omero login root
omero ldap active
omero ldap list
omero ldap getdn --user-name jack # Get DN of user.
omero ldap setdn --user-name jack true # Enable LDAP login.
omero ldap setdn --user-name jack false # Disable LDAP login.
omero ldap getdn --group-name mylab # Get DN of user group.
omero ldap setdn --group-name mylab true # Mark group as LDAP group.
omero ldap setdn --group-name mylab false # Mark group as local.
omero ldap discover
omero ldap discover --groups
omero ldap create bob # User bob must exist in LDAP
"""
[docs]
class LdapControl(UserGroupControl):
def _configure(self, parser):
self.exc = ExceptionHandler()
sub = parser.sub()
active = parser.add(
sub, self.active,
help="Return code shows if LDAP is configured (admins-only)")
list = parser.add(
sub, self.list,
help="List all OMERO users with DNs")
list.add_style_argument()
getdn = parser.add(sub, self.getdn, help="Get DN for user on stdout")
setdn = parser.add(
sub, self.setdn,
help="""Enable or disable LDAP login for user (admins only)
Once LDAP login is enabled for a user, the password set via OMERO is
ignored, and any attempt to change it will result in an error. When
you disable LDAP login, the previous password will be in effect, but if the
user never had a password, one will need to be set!""")
for x in (getdn, setdn):
self.add_user_and_group_arguments(x)
setdn.add_argument("choice", action="store",
help="Enable/disable LDAP login (true/false)")
discover = parser.add(
sub, self.discover,
help="""Discover DNs for existing OMERO users or groups
This command works in the context of users or groups. Specifying
--groups will only discover groups, that is check which group exists in
the LDAP server and OMERO and has the "ldap" flag disabled - such groups
will be presented to the user. Omitting --groups will apply the same logic
to users.""")
discover.add_argument(
"--commands", action="store_true", default=False,
help="Print setdn commands on standard out")
discover.add_argument("--groups", action="store_true", default=False,
help="Discover LDAP groups, not users.")
create = parser.add(
sub, self.create,
help="Create a local user based on LDAP username (admins only)"
)
create.add_argument(
"username", help="LDAP username of user to be created")
for x in (active, list, getdn, setdn, discover, create):
x.add_login_arguments()
@admin_only(full_admin=False) # IConfig privilege
def active(self, args):
c = self.ctx.conn(args)
ildap = c.sf.getLdapService()
if ildap.getSetting():
self.ctx.out("Yes")
else:
self.ctx.die(1, "No")
[docs]
def list(self, args):
c = self.ctx.conn(args)
iadmin = c.sf.getAdminService()
from omero.rtypes import unwrap
from omero.util.text import TableBuilder
list_of_dn_user_maps = unwrap(iadmin.lookupLdapAuthExperimenters())
if list_of_dn_user_maps is None:
return
count = 0
tb = TableBuilder("#")
if args.style:
tb.set_style(args.style)
tb.cols(["Id", "OmeName", "DN"])
for map in list_of_dn_user_maps:
for dn, id in list(map.items()):
try:
exp = iadmin.getExperimenter(id)
except:
self.ctx.err("Bad experimenter: %s" % id)
tb.row(count, *(id, exp.omeName.val, dn))
count += 1
self.ctx.out(str(tb.build()))
@admin_only(full_admin=False) # ILdap privilege
def getdn(self, args):
c = self.ctx.conn(args)
iadmin = c.sf.getAdminService()
ildap = c.sf.getLdapService()
dn = None
if args.user_name:
[uid, u] = self.find_user_by_name(iadmin, args.user_name,
fatal=True)
if u and u.getLdap().val:
name = u.getOmeName().val
dn = name + ": " + ildap.findDN(name)
elif args.user_id:
[uid, u] = self.find_user_by_id(iadmin, args.user_id, fatal=True)
if u and u.getLdap().val:
name = u.getOmeName().val
dn = name + ": " + ildap.findDN(name)
elif args.group_name:
[gid, g] = self.find_group_by_name(iadmin, args.group_name,
fatal=True)
if g and g.getLdap().val:
name = g.getName().val
dn = name + ": " + ildap.findGroupDN(name)
elif args.group_id:
[gid, g] = self.find_group_by_id(iadmin, args.group_id, fatal=True)
if g and g.getLdap().val:
name = g.getName().val
dn = name + ": " + ildap.findGroupDN(name)
if dn is not None and dn.strip():
self.ctx.out(dn)
@admin_only(AdminPrivilegeModifyGroup)
def update_group(self, iupdate, group):
iupdate.saveObject(group)
@admin_only(AdminPrivilegeModifyUser)
def update_user(self, iupdate, user):
iupdate.saveObject(user)
@admin_only(full_admin=False) # See update_user/update_group
def setdn(self, args):
c = self.ctx.conn(args)
iupdate = c.sf.getUpdateService()
iadmin = c.sf.getAdminService()
obj = None
if args.user_name:
[uid, obj] = self.find_user_by_name(iadmin, args.user_name,
fatal=True)
elif args.user_id:
[uid, obj] = self.find_user_by_id(iadmin, args.user_id, fatal=True)
elif args.group_name:
[gid, obj] = self.find_group_by_name(iadmin, args.group_name,
fatal=True)
elif args.group_id:
[gid, obj] = self.find_group_by_id(iadmin, args.group_id,
fatal=True)
if obj is not None:
from omero.model import Experimenter, ExperimenterGroup
obj.setLdap(rbool(args.choice.lower()
in ("yes", "true", "t", "1")))
if isinstance(obj, ExperimenterGroup):
self.update_group(iupdate, obj)
elif isinstance(obj, Experimenter):
self.update_user(iupdate, obj)
@admin_only(full_admin=False) # ILdap privilege
def discover(self, args):
c = self.ctx.conn(args)
ildap = c.sf.getLdapService()
elements = {}
element_name = "users"
if args.groups:
element_name = "groups"
elements = ildap.discoverGroups()
else:
elements = ildap.discover()
if len(elements) > 0:
self.ctx.out("Following LDAP %s are disabled in OMERO:"
% element_name)
for e in elements:
if args.groups:
if args.commands:
self.ctx.out("%s ldap setdn --group-name %s true"
% (sys.argv[0], e.getName().getValue()))
else:
self.ctx.out("Group=%s\tname=%s"
% (e.getId().getValue(),
e.getName().getValue()))
else:
if args.commands:
self.ctx.out("%s ldap setdn --user-name %s true"
% (sys.argv[0],
e.getOmeName().getValue()))
else:
self.ctx.out("Experimenter=%s\tomeName=%s"
% (e.getId().getValue(),
e.getOmeName().getValue()))
@admin_only(AdminPrivilegeModifyUser, AdminPrivilegeModifyGroupMembership)
def create(self, args):
c = self.ctx.conn(args)
ildap = c.sf.getLdapService()
iadmin = c.sf.getAdminService()
import omero
import Ice
try:
exp = ildap.createUser(args.username)
dn = iadmin.lookupLdapAuthExperimenter(exp.id.val)
self.ctx.out("Added user %s (id=%s) with DN=%s" %
(exp.omeName.val, exp.id.val, dn))
except omero.ValidationException as ve:
self.ctx.die(132, ve.message)
except Ice.RequestFailedException as rfe:
self.ctx.die(133, self.exc.handle_failed_request(rfe))
try:
register("ldap", LdapControl, HELP)
except NameError:
if __name__ == "__main__":
cli = CLI()
cli.register("ldap", LdapControl, HELP)
cli.invoke(sys.argv[1:])