#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Josh Moore, josh at glencoesoftware.com
# Copyright (c) 2007-2016, Glencoe Software, Inc.
# See LICENSE for details.
#
"""
Python driver for OMERO
Provides access to various OMERO.blitz server- and client-side
utilities, including starting and stopping servers, running
analyses, configuration, and more.
Usable via the ./omero script provided with the distribution
as well as from python via ``import omero.cli; omero.cli.argv()``.
Arguments are taken from (in order of priority): the ``run`` method
arguments, ``sys.argv``, and finally from standard-in using the
``cmd.Cmd.cmdloop`` method.
"""
sys = __import__("sys")
cmd = __import__("cmd")
import re
import os
import subprocess
import socket
import traceback
import glob
import platform
import time
import shlex
import errno
from threading import Lock
from contextlib import contextmanager
from functools import wraps
from omero_ext.argparse import ArgumentError
from omero_ext.argparse import ArgumentTypeError
from omero_ext.argparse import ArgumentParser
from omero_ext.argparse import FileType
from omero_ext.argparse import Namespace
from omero_ext.argparse import _SubParsersAction
# Help text
from omero_ext.argparse import RawTextHelpFormatter
from omero_ext.argparse import SUPPRESS
from omero_ext.path import path
from omero.util.concurrency import get_event
import omero
import warnings
#
# Static setup
#
try:
from omero_version import omero_version
VERSION = omero_version
except ImportError:
VERSION = "Unknown" # Usually during testing
DEBUG = 0
if "DEBUG" in os.environ:
try:
DEBUG = int(os.environ["DEBUG"])
except ValueError:
DEBUG = 1
print("Deprecated warning: use 'omero --debug=x [args]' to debug")
print("Running omero with debugging == 1")
OMERODOC = """
Command-line tool for local and remote interactions with OMERO.
"""
OMEROSHELL = """OMERO Python Shell. Version %s""" % str(VERSION)
OMEROHELP = """Type "help" for more information, "quit" or Ctrl-D to exit"""
OMEROSUBS = """Use %(prog)s <subcommand> -h for more information."""
OMEROSUBM = """<subcommand>"""
OMEROCLI = path(__file__).expand().dirname()
OMERODIR = os.getenv('OMERODIR', None)
if OMERODIR is not None:
OMERODIR = path(OMERODIR)
else:
OMERODIR = OMEROCLI.dirname().dirname().dirname()
OMERO_COMPONENTS = ['common', 'model', 'romio', 'renderer', 'server', 'blitz']
COMMENT = re.compile(r"^\s*#")
RELFILE = re.compile(r"^\w")
LINEWSP = re.compile(r"^\s*\w+\s+")
#
# Possibilities:
# - Always return and print any output
# - Have a callback on the fired event
# - how should state machine work?
# -- is the last control stored somwhere? in a stack history[3]
# -- or do they all share a central memory? self.ctx["MY_VARIABLE"]
# - In almost all cases, mark a flag in the CLI "lastError" and continue,
# allowing users to do something of the form: on_success or on_fail
#####################################################
#
# Exceptions
#
[docs]
class NonZeroReturnCode(Exception):
def __init__(self, rv, *args):
self.rv = rv
Exception.__init__(self, *args)
#####################################################
#
[docs]
class WriteOnceNamespace(Namespace):
"""
Namespace subclass which prevents overwriting any values by accident.
"""
def __setattr__(self, name, value):
if hasattr(self, name):
raise Exception("%s already has field %s"
% (self.__class__.__name__, name))
else:
return Namespace.__setattr__(self, name, value)
[docs]
class Parser(ArgumentParser):
"""
Extension of ArgumentParser for simplifying the
_configure() code in most Controls
"""
def __init__(self, *args, **kwargs):
kwargs["formatter_class"] = HelpFormatter
ArgumentParser.__init__(self, *args, **kwargs)
self._positionals.title = "Positional Arguments"
self._optionals.title = "Optional Arguments"
self._optionals.description = "In addition to any higher level options"
self._sort_args = True
[docs]
def sub(self):
return self.add_subparsers(
title="Subcommands", description=OMEROSUBS, metavar=OMEROSUBM)
[docs]
def add(self, sub, func, help=None, **kwargs):
if help is None:
help = func.__doc__
parser = sub.add_parser(
func.__func__.__name__, help=help, description=help)
parser.set_defaults(func=func, **kwargs)
return parser
[docs]
def add_limit_arguments(self):
self.add_argument(
"--limit", help="maximum number of return values (default=25)",
type=int, default=25)
self.add_argument(
"--offset", help="number of entries to skip (default=0)",
type=int, default=0)
[docs]
def add_style_argument(self):
from omero.util.text import list_styles
self.add_argument(
"--style", help="use alternative output style (default=sql)",
choices=list_styles())
[docs]
def add_login_arguments(self):
group = self.add_argument_group(
'Login arguments', ENV_HELP + """
Optional session arguments:
""")
group.add_argument(
"-C", "--create", action="store_true",
help="Create a new session regardless of existing ones")
group.add_argument("-s", "--server", help="OMERO server hostname")
group.add_argument("-p", "--port", help="OMERO server port")
group.add_argument("-g", "--group", help="OMERO server default group")
group.add_argument("-u", "--user", help="OMERO username")
group.add_argument("-w", "--password", help="OMERO password")
group.add_argument(
"-k", "--key",
help="OMERO session key (UUID of an active session)")
group.add_argument(
"--sudo", metavar="ADMINUSER",
help="Create session as this admin. Changes meaning of password!")
group.add_argument(
"-q", "--quiet", action="store_true",
help="Quiet mode. Causes most warning and diagnostic messages to "
"be suppressed.")
[docs]
def add_group_print_arguments(self):
printgroup = self.add_mutually_exclusive_group()
printgroup.add_argument(
"--long", action="store_true", default=True,
help="Print comma-separated list of all groups (default)")
printgroup.add_argument(
"--count", action="store_true", default=False,
help="Print count of all groups")
[docs]
def add_user_print_arguments(self):
printgroup = self.add_mutually_exclusive_group()
printgroup.add_argument(
"--count", action="store_true", default=True,
help="Print count of all users and owners (default)")
printgroup.add_argument(
"--long", action="store_true", default=False,
help="Print comma-separated list of all users and owners")
[docs]
def add_user_sorting_arguments(self):
sortgroup = self.add_mutually_exclusive_group()
sortgroup.add_argument(
"--sort-by-id", action="store_true", default=True,
help="Sort users by ID (default)")
sortgroup.add_argument(
"--sort-by-login", action="store_true", default=False,
help="Sort users by login")
sortgroup.add_argument(
"--sort-by-first-name", action="store_true", default=False,
help="Sort users by first name")
sortgroup.add_argument(
"--sort-by-last-name", action="store_true", default=False,
help="Sort users by last name")
sortgroup.add_argument(
"--sort-by-email", action="store_true", default=False,
help="Sort users by email")
[docs]
def add_group_sorting_arguments(self):
sortgroup = self.add_mutually_exclusive_group()
sortgroup.add_argument(
"--sort-by-id", action="store_true", default=True,
help="Sort groups by ID (default)")
sortgroup.add_argument(
"--sort-by-name", action="store_true", default=False,
help="Sort groups by name")
[docs]
def set_args_unsorted(self):
self._sort_args = False
def _check_value(self, action, value):
# converted value must be one of the choices (if specified)
if action.choices is not None and value not in action.choices:
msg = 'invalid choice: %r\n\nchoose from:\n' % value
choices = list(action.choices)
if self._sort_args:
choices = sorted(choices)
msg += self._format_list(choices)
raise ArgumentError(action, msg)
def _format_list(self, choices):
lines = ["\t"]
if choices:
while len(choices) > 1:
choice = choices.pop(0)
lines[-1] += ("%s, " % choice)
if len(lines[-1]) > 62:
lines.append("\t")
lines[-1] += choices.pop(0)
return "\n".join(lines)
[docs]
class ProxyStringType(object):
"""
To make use of the omero.proxy_to_instance method,
an instance can be passed to add_argument with a default
value: add_argument(..., type=ProxyStringType("Image"))
which will take either a proxy string of the form:
"Image:1" or simply the ID itself: "1"
"""
def __init__(self, default=None):
self.default = default
def __call__(self, s):
try:
return omero.proxy_to_instance(s, default=self.default)
except omero.ClientError as ce:
raise ValueError(str(ce))
def __repr__(self):
return "proxy"
[docs]
class NewFileType(FileType):
"""
Extension of the argparse.FileType to prevent
overwrite existing files.
"""
def __call__(self, s):
if s != "-" and os.path.exists(s):
raise ArgumentTypeError("File exists: %s" % s)
return FileType.__call__(self, s)
[docs]
class ExistingFile(FileType):
"""
Extension of the argparse.FileType that requires
an existing file.
"""
def __call__(self, s):
if s == "-":
return s
p = path(s)
if not p.exists():
raise ArgumentTypeError("File does not exist: %s" % s)
elif not p.isfile():
raise ArgumentTypeError("Path is not a file: %s" % s)
return FileType.__call__(self, s)
[docs]
class DirectoryType(FileType):
"""
Extension of the argparse.FileType to only allow
existing directories.
"""
def __call__(self, s):
p = path(s)
if not p.exists():
raise ArgumentTypeError("Directory does not exist: %s" % s)
elif not p.isdir():
raise ArgumentTypeError("Path is not a directory: %s" % s)
return str(p.abspath())
[docs]
class ExceptionHandler(object):
"""
Location for all logic which maps from server exceptions
to specific states. This could likely be moved elsewhere
for general client-side usage.
"""
[docs]
def is_constraint_violation(self, ve):
if isinstance(ve, omero.ValidationException):
if "org.hibernate.exception.ConstraintViolationException: " \
"could not insert" in str(ve):
return True
[docs]
def handle_failed_request(self, rfe):
import Ice
if isinstance(rfe, Ice.OperationNotExistException):
return "Operation not supported by the server: %s" % rfe.operation
else:
return "Unknown Ice.RequestFailedException"
DEBUG_HELP = """
Set debug options for developers
The value to the debug argument is a comma-separated list of commands.
Available debugging choices:
'0' Disable debugging
'debug' Enable debugging at the first debug level
'1'-'9' Enable debugging at the specified debug level
'trace' Run the command with tracing enabled
'profile' Run the command with profiling enabled
Note "trace" and "profile" cannot be used simultaneously
Examples:
# Enabled debugging at level 1 and prints tracing
omero --debug=debug,trace admin start
# Enabled debugging at level 1
omero -d1 admin start
# Enabled debugging at level 3
omero -d3 admin start
# Enable profiling
omero -dp admin start
# Fails - cannot print tracing and profiling together
omero -dt,p admin start
# Disable debugging
omero -d0 admin start
"""
ENV_HELP = """Environment variables:
OMERO_USERDIR Set the base directory containing the user's files.
Default: $HOME/omero
OMERO_SESSIONDIR Set the base directory containing local sessions.
Default: $OMERO_USERDIR/sessions
OMERO_TMPDIR Set the base directory containing temporary files.
Default: $OMERO_USERDIR/tmp
OMERO_PASSWORD Set the user's password for creating new sessions.
Ignored if -w or --password is used.
"""
SUDO_HELP = """
The --sudo option is available to all
commands accepting connection arguments.
Below are a few examples showing how to use the sudo option
Examples (admin or group owner only):
# Import data for user *username*
omero import --sudo root -s servername -u username image.tiff
# Create a connection as another user
omero login --sudo root -s servername -u username -g groupname
Password for root:
omero login --sudo owner -s servername -u username -g groupname
Password for owner:
"""
[docs]
class Context(object):
"""
Simple context used for default logic. The CLI registry which registers
the plugins installs itself as a fully functional Context.
The Context class is designed to increase pluggability. Rather than
making calls directly on other plugins directly, the pub() method
routes messages to other commands. Similarly, out() and err() should
be used for printing statements to the user, and die() should be
used for exiting fatally.
"""
def __init__(self, controls=None, params=None, prog=sys.argv[0]):
self.controls = controls
if self.controls is None:
self.controls = {}
self.params = params
if self.params is None:
self.params = {}
self.event = get_event(name="CLI")
self.dir = OMERODIR
self.isquiet = False
# This usage will go away and default will be False
self.isdebug = DEBUG
self.topics = {"debug": DEBUG_HELP, "env": ENV_HELP, "sudo": SUDO_HELP}
self.parser = Parser(prog=prog, description=OMERODOC)
self.subparsers = self.parser_init(self.parser)
[docs]
def post_process(self):
"""
Runs further processing once all the controls have been added.
"""
sessions = self.controls["sessions"]
login = self.subparsers.add_parser(
"login", help="Shortcut for 'sessions login'",
description=sessions.login.__doc__)
login.set_defaults(func=lambda args: sessions.login(args))
sessions._configure_login(login)
logout = self.subparsers.add_parser(
"logout", help="Shortcut for 'sessions logout'")
logout.set_defaults(func=lambda args: sessions.logout(args))
sessions._configure_dir(logout)
[docs]
def parser_init(self, parser):
parser.add_argument(
"-v", "--version", action="version",
version="%%(prog)s %s" % VERSION)
parser.add_argument(
"-d", "--debug",
help="Use 'help debug' for more information", default=SUPPRESS)
parser.add_argument(
"--path", action="append",
help="Add file or directory to plugin list. Supports globs.")
parser.add_login_arguments()
subparsers = parser.add_subparsers(
title="Subcommands", description=OMEROSUBS, metavar=OMEROSUBM)
return subparsers
[docs]
def get(self, key, defvalue=None):
return self.params.get(key, defvalue)
[docs]
def set(self, key, value=True):
self.params[key] = value
[docs]
def safePrint(self, text, stream, newline=True):
"""
Prints text to a given string, capturing any exceptions.
"""
try:
stream.write(text)
if newline:
stream.write("\n")
else:
stream.flush()
except IOError as e:
if e.errno != errno.EPIPE:
raise
except:
print("Error printing text", file=sys.stderr)
try:
print(text, file=sys.stdout)
except UnicodeEncodeError:
print(text.encode('utf-8', 'surrogateescape'), file=sys.stdout)
if self.isdebug:
traceback.print_exc()
[docs]
def pythonpath(self):
"""
Converts the current sys.path to a PYTHONPATH string
to be used by plugins which must start a new process.
Note: this was initially created for running during
testing when PYTHONPATH is not properly set.
"""
path = list(sys.path)
for i in range(0, len(path) - 1):
if path[i] == '':
path[i] = os.getcwd()
pythonpath = ":".join(path)
return pythonpath
[docs]
def userdir(self):
"""
Returns a user directory (as path.path) which can be used
for storing configuration. The directory is guaranteed to
exist and be private (700) after execution.
"""
dir = path(os.path.expanduser("~")) / "omero" / "cli"
if not dir.exists():
dir.mkdir()
elif not dir.isdir():
raise Exception("%s is not a directory" % dir)
dir.chmod(0o700)
return dir
[docs]
def pub(self, args, strict=False):
self.safePrint(str(args), sys.stdout)
[docs]
def input(self, prompt, hidden=False, required=False):
"""
Reads from standard in. If hidden == True, then
uses getpass
"""
try:
while True:
if hidden:
import getpass
rv = getpass.getpass(prompt)
else:
rv = input(prompt)
if required and not rv:
self.out("Input required")
continue
return rv
except KeyboardInterrupt:
self.die(1, "Cancelled")
[docs]
def out(self, text, newline=True):
"""
Expects a single string as argument.
"""
self.safePrint(text, sys.stdout, newline)
[docs]
def err(self, text, newline=True):
"""
Expects a single string as argument.
"""
self.safePrint(text, sys.stderr, newline)
[docs]
def dbg(self, text, newline=True, level=1):
"""
Passes text to err() if self.isdebug is set
"""
if self.isdebug >= level:
self.err(text, newline)
[docs]
def die(self, rc, args):
raise Exception((rc, args))
[docs]
def exit(self, args):
self.out(args)
self.interrupt_loop = True
[docs]
def call(self, args):
self.out(str(args))
[docs]
def popen(self, args):
self.out(str(args))
[docs]
def sleep(self, time):
self.event.wait(time)
#####################################################
#
[docs]
def admin_only(*fargs, **fkwargs):
"""
Checks that the current user is an admin and has sufficient privileges,
or throws an exception. If no arguments are present or if full_admin is
passed and is True, then this method assumes that the user must be a
full admin and have all privileges. To disable this behavior, set
`full_admin` to False.
"""
def _admin_only(func):
@wraps(func)
def _check_admin(*args, **kwargs):
self = args[0]
plugin_args = args[1]
client = self.ctx.conn(plugin_args)
ec = client.sf.getAdminService().getEventContext()
have_privs = set(ec.adminPrivileges)
if fargs:
need_privs = set(fargs)
else:
full_admin = fkwargs.get("full_admin", True)
if not full_admin:
need_privs = set()
else:
try:
types = client.sf.getTypesService()
privs = types.allEnumerations("AdminPrivilege")
need_privs = set(omero.rtypes.unwrap(
[x.getValue() for x in privs]))
except Exception as e:
self.ctx.err("Error: denying access: %s" % e)
# If the user can't load enums assume the worst
self.error_admin_only(fatal=True)
if not ec.isAdmin:
self.error_admin_only(fatal=True)
elif not need_privs <= have_privs:
self.error_admin_only_privs(need_privs - have_privs,
fatal=True)
return func(*args, **kwargs)
return _check_admin
return _admin_only
[docs]
class Error(object):
"""
Wrapper for error messages which can be registered by an BaseControl
subclass in its _configure method. Example:
class MyControl(BaseControl):
def _configure(self, parser):
self.add_error("NAME", 100, "some message: %s")
...
def __call__(self, *args):
self.raise_error("NAME", "my text")
"""
def __init__(self, ctx, rcode, msg):
self.ctx = ctx
self.rcode = rcode
self.msg = msg
[docs]
def die(self, *args):
"""
Call ctx.die passing the return code and the message for this instance
"""
self.ctx.die(self.rcode, self.msg % tuple(args))
def __str__(self):
return "Error(%d, '%s')" % (self.rcode, self.msg)
[docs]
class BaseControl(object):
"""Controls get registered with a CLI instance on loadplugins().
To create a new control, subclass BaseControl, implement _configure,
and end your module with::
try:
register("name", MyControl, HELP)
except:
if __name__ == "__main__":
cli = CLI()
cli.register("name", MyControl, HELP)
cli.invoke(sys.argv[1:])
This module should be put in the omero.plugins package.
All methods which do NOT begin with "_" are assumed to be accessible
to CLI users.
"""
###############################################
#
# Mostly reusable code
#
def __init__(self, ctx=None, dir=OMERODIR):
self.dir = path(dir) # Guaranteed to be a path
self.ctx = ctx
if self.ctx is None:
self.ctx = Context() # Prevents unncessary stop_event creation
self.__errors = {}
[docs]
def add_error(self, name, rcode, msg):
"""
Register an Error by name both for discovery via the ErrorsControl
as well as for raising an exception via raise_error.
"""
err = self.__errors.get(name)
if err is not None:
self.ctx.die(2, "Error already exists: %s (%s)" % (name, err))
self.__errors[name] = Error(self.ctx, rcode, msg)
[docs]
def get_errors(self):
"""
Returns a mapping from name to Error object
"""
return dict(self.__errors)
[docs]
def raise_error(self, name, *args):
"""
Call die on the named Error using the arguments to format the message
"""
err = self.__errors.get(name)
if err is None:
self.ctx.die(2, "Error doesn't exist: %s" % name)
err.die(*args)
[docs]
def reset_errors(self, replacement=None):
old = self.__errors
if replacement is None:
replacement = {}
self.__errors = replacement
return old
def _isWindows(self):
p_s = platform.system()
if p_s == 'Windows':
return True
else:
return False
def _host(self):
"""
Return hostname of current machine. Termed to be the
value return from socket.gethostname() up to the first
decimal.
"""
if not hasattr(self, "hostname") or not self.hostname:
self.hostname = socket.gethostname()
if self.hostname.find(".") > 0:
self.hostname = self.hostname.split(".")[0]
return self.hostname
def _node(self, omero_node=None):
"""
Return the name of this node, using either the environment
vairable OMERO_NODE or _host(). Some subclasses may
override this functionality, most notably "admin" commands
which assume a node name of "master".
If the optional argument is not None, then the OMERO_NODE
environment variable will be set.
"""
if omero_node is not None:
os.environ["OMERO_NODE"] = omero_node
if "OMERO_NODE" in os.environ:
return os.environ["OMERO_NODE"]
else:
return self._host()
def _icedata(self, property):
"""
General data method for creating a path from an Ice property.
"""
try:
nodepath = self._properties()[property]
if RELFILE.match(nodepath):
nodedata = self.dir / path(nodepath)
else:
nodedata = path(nodepath)
created = False
if not nodedata.exists():
self.ctx.out("Creating "+nodedata)
nodedata.makedirs()
created = True
return (nodedata, created)
except KeyError as ke:
self.ctx.err(property + " is not configured")
self.ctx.die(4, str(ke))
def _initDir(self):
"""
Initialize the directory into which the current node will log.
"""
props = self._properties()
self._nodedata()
logdata = self.dir / path(props["Ice.StdOut"]).dirname()
if not logdata.exists():
self.ctx.out("Initializing %s" % logdata)
logdata.makedirs()
def _nodedata(self):
"""
Returns the data directory path for this node. This is determined
from the "IceGrid.Node.Data" property in the _properties()
map.
The directory will be created if it does not exist.
"""
data, created = self._icedata("IceGrid.Node.Data")
return data
def _regdata(self):
"""
Returns the data directory for the IceGrid registry.
This is determined from the "IceGrid.Registry.Data" property
in the _properties() map.
The directory will be created if it does not exist, and
a warning issued.
"""
data, created = self._icedata("IceGrid.Registry.Data")
def _pid(self):
"""
Returns a path of the form _nodedata() / (_node() + ".pid"),
i.e. a file named NODENAME.pid in the node's data directory.
"""
pidfile = self._nodedata() / (self._node() + ".pid")
return pidfile
def _cfglist(self):
"""
Returns a list of configuration files for this node. This
defaults to the internal configuration for all nodes,
followed by a file named NODENAME.cfg under the etc/
directory, following by PLATFORM.cfg if it exists.
"""
cfgs = self.dir / "etc"
internal = cfgs / "internal.cfg"
owncfg = cfgs / self._node() + ".cfg"
results = [internal, owncfg]
# Look for <platform>.cfg
p_s = platform.system()
p_c = cfgs / p_s + ".cfg"
if p_c.exists():
results.append(p_c)
return results
def _icecfg(self):
"""
Uses _cfglist() to return a string argument of the form
"--Ice.Config=..." suitable for passing to omero.client
as an argument.
"""
icecfg = "--Ice.Config=%s" % ",".join(self._cfglist())
return str(icecfg)
def _intcfg(self):
"""
Returns an Ice.Config string with only the internal configuration
file for connecting to the IceGrid Locator.
"""
intcfg = self.dir / "etc" / "internal.cfg"
intcfg.abspath()
return "--Ice.Config=%s" % intcfg
def _properties(self, prefix=""):
"""
Loads all files returned by _cfglist() into a new
Ice.Properties instance and return the map from
getPropertiesForPrefix(prefix) where the default is
to return all properties.
"""
import Ice
if getattr(self, "_props", None) is None:
self._props = Ice.createProperties()
for cfg in self._cfglist():
try:
self._props.load(str(cfg))
except Exception:
self.ctx.dbg("Complete error: %s" % traceback.format_exc())
self.ctx.die(3, "Could not find file: " + cfg +
"\nDid you specify the proper node?")
return self._props.getPropertiesForPrefix(prefix)
def _ask_for_password(self, reason="", root_pass=None, strict=True):
while not root_pass or len(root_pass) < 1:
root_pass = self.ctx.input("Please enter password%s: "
% reason, hidden=True)
if not strict:
return root_pass
if root_pass is None or root_pass == "":
self.ctx.err("Password cannot be empty")
continue
confirm = self.ctx.input("Please re-enter password%s: "
% reason, hidden=True)
if root_pass != confirm:
root_pass = None
self.ctx.err("Passwords don't match")
continue
break
return root_pass
def _add_wait(self, parser, default=-1):
parser.add_argument(
"--wait", type=int,
help="Number of seconds to wait for the processing to complete "
"(Indefinite < 0; No wait=0).", default=default)
[docs]
def get_subcommands(self):
"""Return a list of subcommands"""
parser = Parser()
old = self.reset_errors()
try:
self._configure(parser)
finally:
self.reset_errors(old)
subparsers_actions = [action for action in parser._actions
if isinstance(action, _SubParsersAction)]
subcommands = []
for subparsers_action in subparsers_actions:
for choice, subparser in list(subparsers_action.choices.items()):
subcommands.append(format(choice))
return subcommands
###############################################
#
# Methods likely to be implemented by subclasses
#
def _complete_file(self, f, dir=None):
"""
f: path part
"""
if dir is None:
dir = self.dir
else:
dir = path(dir)
p = path(f)
if p.exists() and p.isdir():
if not f.endswith(os.sep):
return [p.basename()+os.sep]
return [str(x)[len(f):] for x in p.listdir(
unreadable_as_empty=True)]
else:
results = [str(x.basename()) for x in dir.glob(f+"*")]
if len(results) == 1:
# Relative to cwd
maybe_dir = path(results[0])
if maybe_dir.exists() and maybe_dir.isdir():
return [results[0] + os.sep]
return results
def _complete(self, text, line, begidx, endidx):
try:
return self._complete2(text, line, begidx, endidx)
except:
self.ctx.dbg("Complete error: %s" % traceback.format_exc())
def _complete2(self, text, line, begidx, endidx):
items = shlex.split(line)
parser = getattr(self, "parser", None)
if parser:
result = []
actions = getattr(parser, "_actions")
if actions:
if len(items) > 1:
subparsers = [
x for x in actions
if x.__class__.__name__ == "_SubParsersAction"]
if subparsers:
subparsers = subparsers[0] # Guaranteed one
choice = subparsers.choices.get(items[-1])
if choice and choice._actions:
actions = choice._actions
if len(items) > 2:
actions = [] # TBD
for action in actions:
if action.__class__.__name__ == "_HelpAction":
result.append("-h")
elif action.__class__.__name__ == "_SubParsersAction":
result.extend(action.choices)
return ["%s " % x for x in result
if (not text or x.startswith(text)) and
line.find(" %s " % x) < 0]
# Fallback
completions = [method for method in dir(self)
if callable(getattr(self, method))]
return [str(method + " ") for method in completions
if method.startswith(text) and not method.startswith("_")]
[docs]
def error_admin_only(self, msg="SecurityViolation: Admins only!",
code=111, fatal=True):
if fatal:
self.ctx.die(code, msg)
else:
self.ctx.err(msg)
[docs]
def error_admin_only_privs(self, restrictions,
msg="SecurityViolation: Admin restrictions: ",
code=111, fatal=True):
msg += ", ".join(sorted(restrictions))
self.error_admin_only(msg=msg, code=code, fatal=fatal)
def _order_and_range_ids(self, ids):
from itertools import groupby
from operator import itemgetter
out = ""
ids = sorted(ids)
for k, g in groupby(enumerate(ids), lambda i_x: i_x[0]-i_x[1]):
g = list(map(str, list(map(itemgetter(1), g))))
out += g[0]
if len(g) > 2:
out += "-" + g[-1]
elif len(g) == 2:
out += "," + g[1]
out += ","
return out.rstrip(",")
[docs]
class DiagnosticsControl(BaseControl):
"""
Superclass (and SPI-interface) for any control commands that would
like to provide a "diagnostics" method, like `omero admin diagnostics`
and `omero web diagnostics`. The top-level diagnostics command then
can find each such plugin and iterate over it.
"""
def _add_diagnostics(self, parser, sub):
diagnostics = parser.add(
sub, self.diagnostics,
"Run a set of checks on the current, "
"preferably active server")
diagnostics.add_argument(
"--no-logs", action="store_true",
help="Skip log parsing")
return diagnostics
def _diagnostics_banner(self, control_name):
self.ctx.out("""
%s
OMERO Diagnostics (%s)
%s
""" % ("="*80, control_name, "="*80))
def _sz_str(self, sz):
if sz < 1000:
return "{0} B".format(sz)
for x in ["KB", "MB", "GB"]:
sz /= 1000
if sz < 1000:
break
sz = "%.1f %s" % (sz, x)
return sz
def _item(self, cat, msg):
cat = cat + ":"
cat = "%-12s" % cat
self.ctx.out(cat, False)
msg = "%-30s " % msg
self.ctx.out(msg, False)
def _exists(self, p):
if p.isdir():
if not p.exists():
self.ctx.out("doesn't exist")
else:
self.ctx.out("exists")
else:
if not p.exists():
self.ctx.out("n/a")
elif not p.size:
self.ctx.out("empty")
else:
warn_regex = (r'(-! )?[\d\-/]+\s+[\d:,.]+\s+([\w.]+:\s+)?'
r'warn(i(ng:)?)?\s')
err_regex = (r'(!! )?[\d\-/]+\s+[\d:,.]+\s+([\w.]+:\s+)?'
r'error:?\s')
warn = 0
err = 0
for l in p.lines(errors="surrogateescape"):
# ensure errors/warnings search is case-insensitive
lcl = l.lower()
if re.match(warn_regex, lcl):
warn += 1
elif re.match(err_regex, lcl):
err += 1
msg = ""
if warn or err:
msg = " errors=%-4s warnings=%-4s" % (err, warn)
self.ctx.out("%-12s %s" % (self._sz_str(p.size), msg))
[docs]
class ServiceManagerMixin:
"""
A mixin that adds a requires_service_manager method.
This method can be called to check for the presence of an environment
variable that indicates the plugin is being controlled by an external
service manager.
The class must define a property SERVICE_MANAGER_KEY that is used to
construct the name of the OMERO property
`omero.{SERVICE_MANAGER_KEY}.servicemanager.checkenv` defining the name of
the environment variable.
"""
[docs]
def requires_service_manager(self, config):
"""
Checks whether OMERO is being managed by a service manager by
checking that a specified environment variable is non-empty.
config: An OMERO ConfigXml object
"""
service_env = config.as_map().get(
"omero.{}.servicemanager.checkenv".format(
self.SERVICE_MANAGER_KEY))
if service_env:
service_envvalue = os.getenv(service_env)
if not service_envvalue:
self.ctx.die(112, (
"ERROR: OMERO is configured to run under a service "
"manager which should also set {}".format(service_env)))
[docs]
class CLI(cmd.Cmd, Context):
"""
Command line interface class. Supports various styles of executing the
registered plugins. Each plugin is given the chance to update this class
by adding methods of the form "do_<plugin name>".
"""
[docs]
class PluginsLoaded(object):
"""
Thread-safe class for storing whether or not all the plugins
have been loaded
"""
def __init__(self):
self.lock = Lock()
self.done = False
[docs]
def get(self):
self.lock.acquire()
try:
return self.done
finally:
self.lock.release()
[docs]
def set(self):
self.lock.acquire()
try:
self.done = True
finally:
self.lock.release()
def __init__(self, prog=sys.argv[0]):
"""
Also sets the "_client" field for this instance to None. Each cli
maintains a single active client. The "session" plugin is responsible
for the loading of the client object.
"""
cmd.Cmd.__init__(self)
Context.__init__(self, prog=prog)
self.prompt = 'omero> '
self.interrupt_loop = False
self.rv = 0 #: Return value to be returned
self._stack = [] #: List of commands being processed
self._client = None #: Single client for all activities
#: Paths to be loaded; initially official plugins
self._plugin_paths = [OMEROCLI / "plugins"]
self._pluginsLoaded = CLI.PluginsLoaded()
[docs]
def assertRC(self):
if self.rv != 0:
raise NonZeroReturnCode(self.rv, "assert failed")
[docs]
def invoke(self, line, strict=False, previous_args=None):
"""
Copied from cmd.py
"""
try:
line = self.precmd(line)
stop = self.onecmd(line, previous_args)
stop = self.postcmd(stop, line)
if strict:
self.assertRC()
finally:
if len(self._stack) == 0:
self.close()
else:
self.dbg("Delaying close for stack: %s"
% len(self._stack), level=2)
[docs]
def invokeloop(self):
# First we add a few special commands to the loop
class PWD(BaseControl):
def __call__(self, args):
self.ctx.out(os.getcwd())
class LS(BaseControl):
def __call__(self, args):
for p in sorted(path(os.getcwd()).listdir(
unreadable_as_empty=True)):
self.ctx.out(str(p.basename()))
class CD(BaseControl):
def _complete(self, text, line, begidx, endidx):
RE = re.compile(r"\s*cd\s*")
m = RE.match(line)
if m:
replaced = RE.sub('', line)
return self._complete_file(replaced, path(os.getcwd()))
return []
def _configure(self, parser):
parser.set_defaults(func=self.__call__)
parser.add_argument("dir", help="Target directory")
def __call__(self, args):
os.chdir(args.dir)
self.register("pwd", PWD, "Print the current directory")
self.register("ls", LS, "Print files in the current directory")
self.register("dir", LS, "Alias for 'ls'")
self.register("cd", CD, "Change the current directory")
try:
self.selfintro = "\n".join([OMEROSHELL, OMEROHELP])
if not self.stdin.isatty():
self.selfintro = ""
self.prompt = ""
while not self.interrupt_loop:
try:
# Calls the same thing as invoke
self.cmdloop(self.selfintro)
except KeyboardInterrupt:
self.selfintro = ""
self.out("Use quit to exit")
finally:
self.close()
[docs]
def postloop(self):
# We've done the intro once now. Don't repeat yourself.
self.selfintro = ""
[docs]
def onecmd(self, line, previous_args=None):
"""
Single command logic. Overrides the cmd.Cmd logic
by calling execute. Also handles various exception
conditions.
"""
try:
# Starting a new command. Reset the return value to 0
# If err or die are called, set rv non-0 value
self.rv = 0
try:
self._stack.insert(0, line)
self.dbg("Stack+: %s" % len(self._stack), level=2)
self.execute(line, previous_args)
return True
finally:
self._stack.pop(0)
self.dbg("Stack-: %s" % len(self._stack), level=2)
except SystemExit as exc: # Thrown by argparse
self.dbg("SystemExit raised\n%s" % traceback.format_exc())
self.rv = exc.code
return False
#
# This was perhaps only needed previously
# Omitting for the moment with the new
# argparse refactoring
#
# except AttributeError, ae:
# self.err("Possible error in plugin:")
# self.err(str(ae))
# if self.isdebug:
# traceback.print_exc()
except NonZeroReturnCode as nzrc:
self.dbg(traceback.format_exc())
self.rv = nzrc.rv
return False # Continue
[docs]
def postcmd(self, stop, line):
"""
Checks interrupt_loop for True and return as much
which will end the call to cmdloop. Otherwise use
the default postcmd logic (which simply returns stop)
"""
if self.interrupt_loop:
return True
return cmd.Cmd.postcmd(self, stop, line)
[docs]
def execute(self, line, previous_args):
"""
String/list handling as well as EOF and comment handling.
Otherwise, parses the arguments as shlexed and runs the
function returned by argparse.
"""
if isinstance(line, str):
if COMMENT.match(line):
return # EARLY EXIT!
args = shlex.split(line)
elif isinstance(line, (tuple, list)):
args = list(line)
else:
self.die(1, "Bad argument type: %s ('%s')" % (type(line), line))
if not args:
return
elif args == ["EOF"]:
self.exit("")
return
args = self.parser.parse_args(args, previous_args)
args.prog = self.parser.prog
self.waitForPlugins()
self.isquiet = getattr(args, "quiet", False)
debug_str = getattr(args, "debug", "")
debug_opts = set([x.lower() for x in debug_str.split(",")])
if "" in debug_opts:
debug_opts.remove("")
old_debug = self.isdebug
if "debug" in debug_opts:
self.isdebug = 1
debug_opts.remove("debug")
elif "0" in debug_opts:
self.isdebug = 0
debug_opts.remove("0")
for x in range(1, 9):
if str(x) in debug_opts:
self.isdebug = x
debug_opts.remove(str(x))
try:
if len(debug_opts) == 0:
args.func(args)
elif len(debug_opts) > 1:
self.die(9, "Conflicting debug options: %s"
% ", ".join(debug_opts))
elif "t" in debug_opts or "trace" in debug_opts:
import trace
tracer = trace.Trace()
tracer.runfunc(args.func, args)
elif "p" in debug_opts or "profile" in debug_opts:
from hotshot import stats, Profile
from omero.util import get_omero_userdir
profile_file = get_omero_userdir() / "hotshot_edi_stats"
prof = Profile(profile_file)
prof.runcall(lambda: args.func(args))
prof.close()
s = stats.load(profile_file)
s.sort_stats("time").print_stats()
else:
self.die(10, "Unknown debug action: %s" % debug_opts)
finally:
self.isdebug = old_debug
[docs]
def completedefault(self, *args):
return []
[docs]
def completenames(self, text, line, begidx, endidx):
names = list(self.controls.keys())
return [str(n + " ") for n in names if n.startswith(line)]
##########################################
##
# Context interface
##
[docs]
def exit(self, args, newline=True):
self.out(args, newline)
self.interrupt_loop = True
[docs]
def die(self, rc, text, newline=True):
self.err(text, newline)
self.rv = rc
# self.interrupt_loop = True
raise NonZeroReturnCode(rc, "die called: %s" % text)
def _env(self):
"""
Configure environment with PYTHONPATH and PATH as
setup by `omero`
This list needs to be kept in line with omero-py/bin/omero
"""
vlb = str(self.dir / "var" / "lib")
paths = os.path.pathsep.join([vlb])
env = dict(os.environ)
pypath = env.get("PYTHONPATH", None)
if pypath is None:
pypath = paths
else:
if pypath.endswith(os.path.pathsep):
pypath = "%s%s" % (pypath, paths)
else:
pypath = "%s%s%s" % (pypath, os.path.pathsep, paths)
env["PYTHONPATH"] = pypath
ospath = env.get("PATH", None)
binpypath = os.path.dirname(sys.executable)
if ospath:
if binpypath not in os.path.split(os.path.pathsep):
ospath = os.path.pathsep.join([ospath, binpypath])
else:
ospath = binpypath
env["PATH"] = ospath
return env
def _cwd(self, cwd):
if cwd is None:
cwd = str(self.dir)
else:
cwd = str(cwd)
return cwd
[docs]
def call(self, args, strict=True, cwd=None):
"""
Calls the string in a subprocess and dies if the return value is not 0
"""
self.dbg("Executing: %s" % args)
rv = subprocess.call(args, env=self._env(), cwd=self._cwd(cwd))
if strict and not rv == 0:
raise NonZeroReturnCode(rv, "%s => %d" % (" ".join(args), rv))
return rv
[docs]
def popen(self, args, cwd=None, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, **kwargs):
self.dbg("Returning popen: %s" % args)
env = self._env()
env.update(kwargs)
return subprocess.Popen(args, env=env, cwd=self._cwd(cwd),
stdout=stdout, stderr=stderr)
[docs]
def get_config_property_lines(self, root_path):
"""
Construct a generator providing each line of the configuration
property files from OMERO components then from the top level.
Trailing whitespace is stripped from each line.
"""
jar_root = root_path / 'lib' / 'server'
for component in OMERO_COMPONENTS:
from zipfile import ZipFile, is_zipfile, BadZipfile
jar_name = jar_root / 'omero-{}.jar'.format(component)
if is_zipfile(jar_name):
config_name = 'omero-{}.properties'.format(component)
try:
with ZipFile(jar_name, 'r') as jar_file:
with jar_file.open(config_name, 'r') as config:
for line in iter(config.readline, ''):
if not line:
break
yield line.rstrip()
except (BadZipfile, IOError, KeyError):
pass
# etc/omero.properties comes last because it may contain "### END"
try:
file_path = root_path / 'etc' / 'omero.properties'
with open(file_path, 'r') as config:
for line in iter(config.readline, ''):
if not line:
break
yield line.rstrip()
except IOError:
pass
[docs]
def readDefaults(self):
lines = self.get_config_property_lines(path(self._cwd(None)))
defaults = ""
for line in lines:
if isinstance(line, bytes):
defaults += line.decode("utf-8")
else:
defaults += line
defaults += "\n"
if not defaults:
print("No properties files found for OMERO default configuration.")
return defaults
[docs]
def parsePropertyFile(self, data, output):
for line in output.splitlines():
if isinstance(line, bytes):
line = line.decode("utf-8")
if line.startswith(
"Listening for transport dt_socket at address"):
self.dbg(
"Ignoring stdout 'Listening for transport' from DEBUG=1")
continue
parts = line.split("=", 1)
if len(parts) == 2:
data.properties.setProperty(parts[0], parts[1])
self.dbg("Set property: %s=%s" % (parts[0], parts[1]))
else:
self.dbg("Bad property:"+str(parts))
return data
[docs]
def initData(self, properties=None):
"""
Uses "omero prefs" to create an Ice.InitializationData().
"""
if properties is None:
properties = {}
from omero.plugins.prefs import getprefs
try:
output = getprefs(["get"], str(path(self._cwd(None)) / "lib"))
except OSError as err:
self.err("Error getting preferences")
self.dbg(err)
output = ""
import Ice
data = Ice.InitializationData()
data.properties = Ice.createProperties()
for k, v in list(properties.items()):
data.properties.setProperty(k, v)
self.parsePropertyFile(data, output)
return data
[docs]
def conn(self, args=None):
"""
Returns any active _client object. If one is present but
not alive, it will be removed.
If no client is found and arguments are available,
will use the current settings to connect.
If required attributes are missing, will delegate to the login
command.
FIXME: Currently differing setting sessions on the same CLI instance
will misuse a client.
"""
if self.get_client():
self.dbg("Found client")
try:
self.get_client().getSession().keepAlive(None)
self.dbg("Using client")
return self.get_client()
except KeyboardInterrupt:
raise
except Exception as e:
self.dbg("Removing client: %s" % e)
self.get_client().closeSession()
self.set_client(None)
if args is not None:
if "sessions" not in self.controls:
# Most likely to happen during development
self.die(111, "No sessions control! Cannot login")
self.controls["sessions"].login(args)
return self.get_client() # Possibly added by "login"
[docs]
def close(self):
client = self.get_client()
if client:
self.dbg("Closing client: %s" % client)
client.__del__()
##
# Plugin registry
##
[docs]
def register(self, name, Control, help, epilog=None):
self.register_only(name, Control, help, epilog=epilog)
self.configure_plugins()
[docs]
def register_only(self, name, Control, help, epilog=None):
""" This method is added to the globals when exec() is
called on each plugin. A Control class should be
passed to the register method which will be added to the CLI.
"""
self.controls[name] = (Control, help, epilog)
[docs]
def waitForPlugins(self):
if True:
return # Disabling. See comment in argv
self.dbg("Starting waitForPlugins")
while not self._pluginsLoaded.get():
self.dbg("Waiting for plugins...")
time.sleep(0.1)
[docs]
def loadplugins(self):
"""
Finds all plugins and gives them a chance to register
themselves with the CLI instance. Here register_only()
is used to guarantee the orderedness of the plugins
in the parser
"""
paths = set(self._plugin_paths)
for x in sys.path:
x = path(x)
if x.isdir():
x = x / "omero" / "plugins"
if x.exists():
paths.add(x)
else:
if self.isdebug:
print("Can't load %s" % x)
for plugin_path in paths:
self.loadpath(path(plugin_path))
self.configure_plugins()
self._pluginsLoaded.set()
self.post_process()
[docs]
def loadpath(self, pathobj):
if pathobj.isdir():
for plugin in sorted(pathobj.walkfiles("*.py")):
if -1 == plugin.find("#"): # Omit emacs files
self.loadpath(path(plugin))
else:
if self.isdebug:
print("Loading %s" % pathobj)
try:
loc = {"register": self.register_only}
with open(str(pathobj), "r") as f:
exec(f.read(), loc)
except KeyboardInterrupt:
raise
except:
self.err("Error loading: %s" % pathobj)
traceback.print_exc()
[docs]
def get_event_context(self):
return getattr(self, '_event_context', None)
[docs]
def set_event_context(self, ec):
setattr(self, '_event_context', ec)
[docs]
def get_client(self):
return getattr(self, '_client', None)
[docs]
def set_client(self, client):
setattr(self, '_client', client)
# End Cli
###########################################################
[docs]
@contextmanager
def cli_login(*args, **kwargs):
"""
args will be appended to ["-q", "login"] and then
passed to onecmd
kwargs:
- keep_alive
"""
keep_alive = kwargs.get("keep_alive", 300)
try:
cli = omero.cli.CLI()
cli.loadplugins()
login = ["-q", "login"]
login.extend(list(args))
cli.onecmd(login)
if keep_alive is not None:
client = cli.get_client()
if client is not None:
keep_alive = int(keep_alive)
client.enableKeepAlive(keep_alive)
else:
raise Exception("Failed to login")
yield cli
finally:
cli.close()
[docs]
def argv(args=sys.argv):
"""
Main entry point for the OMERO command-line interface. First
loads all plugins by passing them the classes defined here
so they can register their methods.
Then the case where arguments are passed on the command line are
handled.
Finally, the cli enters a command loop reading from standard in.
"""
# Modifying the args list if the name of the file
# has arguments encoded in it
original_executable = path(args[0])
base_executable = str(original_executable.basename())
if base_executable.find("-") >= 0:
parts = base_executable.split("-")
for arg in args[1:]:
parts.append(arg)
args = parts
# Now load other plugins. After debugging is turned on, but before
# tracing.
cli = CLI(prog=original_executable.split("-")[0])
parser = Parser(add_help=False)
# parser.add_argument("-d", "--debug", help="Use 'help debug' for more
# information", default = SUPPRESS)
parser.add_argument(
"--path", action="append",
help="Add file or directory to plugin list. Supports globs.")
ns, args = parser.parse_known_args(args)
if getattr(ns, "path"):
for p in ns.path:
for g in glob.glob(p):
cli._plugin_paths.append(g)
# For argparse dispatch, this cannot be done lazily
cli.loadplugins()
if len(args) > 1:
cli.invoke(args[1:])
return cli.rv
else:
cli.invokeloop()
return cli.rv
#####################################################
#
# Specific argument types
[docs]
class ExperimenterArg(object):
def __init__(self, arg):
self.orig = arg
self.usr = None
try:
self.usr = int(arg)
except ValueError:
if ":" in arg:
parts = arg.split(":", 1)
if parts[0] == "User" or "Experimenter":
try:
self.usr = int(parts[1])
except ValueError:
pass
[docs]
def lookup(self, client):
if self.usr is None:
import omero
a = client.sf.getAdminService()
try:
self.usr = a.lookupExperimenter(self.orig).id.val
except omero.ApiUsageException:
pass
return self.usr
[docs]
class ExperimenterGroupArg(object):
def __init__(self, arg):
self.orig = arg
self.grp = None
try:
self.grp = int(arg)
except ValueError:
if ":" in arg:
parts = arg.split(":", 1)
if parts[0] == "Group" or "ExperimenterGroup":
try:
self.grp = int(parts[1])
except ValueError:
pass
[docs]
def lookup(self, client):
if self.grp is None:
import omero
a = client.sf.getAdminService()
try:
self.grp = a.lookupGroup(self.orig).id.val
except omero.ApiUsageException:
pass
return self.grp
[docs]
class GraphArg(object):
def __init__(self, cmd_type):
self.cmd_type = cmd_type
def __call__(self, arg):
cmd = self.cmd_type()
targetObjects = dict()
try:
parts = arg.split(":", 1)
assert len(parts) == 2
assert '+' not in parts[0]
parts[0] = parts[0].lstrip("/")
graph = parts[0].split("/")
ids = []
needsForce = False
for id in parts[1].split(","):
if "-" in id:
needsForce = True
low, high = list(map(int, id.split("-")))
if high < low:
raise ValueError("Bad range: %s", arg)
ids.extend(list(range(low, high+1)))
else:
ids.append(int(id))
targetObjects[graph[0]] = ids
cmd.targetObjects = targetObjects
if len(graph) > 1:
skiphead = omero.cmd.SkipHead()
skiphead.request = cmd
skiphead.targetObjects = targetObjects
skiphead.startFrom = [graph[-1]]
cmd = skiphead
return cmd, needsForce
except:
raise ValueError("Bad object: %s", arg)
def __repr__(self):
return "argument"
#####################################################
#
# Specific superclasses for various controls
[docs]
class CmdControl(BaseControl):
[docs]
def cmd_type(self):
raise Exception("Must be overridden by subclasses")
def _configure(self, parser):
parser.set_defaults(func=self.main_method)
self._add_wait(parser, default=-1)
[docs]
def main_method(self, args):
client = self.ctx.conn(args)
req = self.cmd_type()
self._process_request(req, args, client)
def _process_request(self, req, args, client):
"""
Allow specific filling of parameters in the request.
"""
cb = None
try:
rsp, status, cb = self.response(client, req, wait=args.wait)
self.print_report(req, rsp, status, args.report)
finally:
if cb is not None:
cb.close(True) # Close handle
[docs]
def get_error(self, rsp):
if not isinstance(rsp, omero.cmd.ERR):
return None
else:
sb = "failed: '%s'\n" % rsp.name
sb += self.create_error_report(rsp)
return sb
[docs]
def create_error_report(self, rsp):
if isinstance(rsp, omero.cmd.GraphException):
return 'failed: %s' % rsp.message
"""
Generate default error report aggregating the response parameters
"""
sb = ""
if rsp.parameters:
for k in sorted(rsp.parameters):
v = rsp.parameters.get(k, "")
sb += "\t%s=%s\n" % (k, v)
return sb
[docs]
def print_report(self, req, rsp, status, detailed):
self.ctx.out(self.print_request_description(req), newline=False)
err = self.get_error(rsp)
if err:
self.ctx.err(err)
else:
if hasattr(req, 'dryRun') and req.dryRun:
self.ctx.out("Dry run performed")
self.ctx.out("ok")
if detailed:
self.ctx.out("Steps: %s" % status.steps)
if status.stopTime > 0 and status.startTime > 0:
elapse = status.stopTime - status.startTime
self.ctx.out("Elapsed time: %s secs." % (elapse / 1000.0))
else:
self.ctx.out("Unfinished.")
self.ctx.out("Flags: %s" % status.flags)
self.print_detailed_report(req, rsp, status)
[docs]
def print_detailed_report(self, req, rsp, status):
"""
Extension point for subclasses.
"""
pass
[docs]
def line_to_opts(self, line, opts):
if not line or line.startswith("#"):
return
parts = line.split("=", 1)
if len(parts) == 1:
parts.append("")
opts[parts[0].strip()] = parts[1].strip()
[docs]
def response(self, client, req, loops=8, ms=500, wait=None):
import omero.callbacks
handle = client.sf.submit(req)
cb = omero.callbacks.CmdCallbackI(client, handle)
if wait is None:
cb.loop(loops, ms)
elif wait == 0:
self.ctx.out("Exiting immediately")
elif wait > 0:
ms = wait * 1000
ms = ms // loops
self.ctx.out("Waiting %s loops of %s ms" % (ms, loops))
cb.loop(loops, ms)
else:
try:
# Wait for finish
while True:
found = cb.block(ms)
if found:
break
# If user uses Ctrl-C, then cancel
except KeyboardInterrupt:
self.ctx.out("Attempting cancel...")
if handle.cancel():
self.ctx.out("Cancelled")
else:
self.ctx.out("Failed to cancel")
return cb.getResponse(), cb.getStatus(), cb
[docs]
class GraphControl(CmdControl):
[docs]
def cmd_type(self):
raise Exception("Must be overridden by subclasses")
def _configure(self, parser):
parser.set_defaults(func=self.main_method)
self._add_wait(parser, default=-1)
parser.add_argument(
"--include",
help="Specify kinds of object to include",
metavar="CLASS",
nargs="+", type=lambda s: s.split(","), action="append")
parser.add_argument(
"--exclude",
help="Specify kinds of object to exclude",
metavar="CLASS",
nargs="+", type=lambda s: s.split(","), action="append")
parser.add_argument(
"--ordered", action="store_true",
help=("Pass objects to commands in the given order"))
parser.add_argument(
"--list", action="store_true",
help="Print a list of all available graph specs")
parser.add_argument(
"--list-details", action="store_true", help=SUPPRESS)
parser.add_argument(
"--report", action="store_true",
help="Print more detailed report of each action")
parser.add_argument(
"--dry-run", action="store_true",
help=("Do a dry run of the command, providing a "
"report of what would have been done"))
parser.add_argument(
"--force", action="store_true",
help=("Force an action that otherwise defaults to a dry run"))
self._pre_objects(parser)
self._objects(parser)
def _pre_objects(self, parser):
"""
Allows configuring before the "obj" n-argument is added.
"""
pass
def _objects(self, parser):
"""
Allows configuring the "obj" n-argument by overriding this method.
"""
parser.add_argument(
"obj", nargs="*", type=GraphArg(self.cmd_type()),
help="Objects to be processed in the form <Class>:<Id>")
[docs]
def as_doall(self, req_or_doall):
if not isinstance(req_or_doall, omero.cmd.DoAll):
req_or_doall = omero.cmd.DoAll([req_or_doall])
return req_or_doall
[docs]
def default_exclude(self):
"""
Return a list of types to exclude by default.
"""
return []
[docs]
def main_method(self, args):
from itertools import chain
client = self.ctx.conn(args)
if args.list_details or args.list:
cb = None
req = omero.cmd.LegalGraphTargets(self.cmd_type()())
try:
try:
speclist, status, cb = self.response(client, req)
except omero.LockTimeout as lt:
self.ctx.die(446, "LockTimeout: %s" % lt.message)
finally:
if cb is not None:
cb.close(True) # Close handle
# Could be put in positive_response helper
err = self.get_error(speclist)
if err:
self.ctx.die(367, err)
specs = sorted([t.split(".")[-1] for t in speclist.targets])
self.ctx.out("\n".join(specs))
return # Early exit.
inc = []
if args.include:
inc.extend(chain(*chain(*args.include)))
exc = self.default_exclude()
if args.exclude:
exc.extend(chain(*chain(*args.exclude)))
if inc or exc:
opt = omero.cmd.graphs.ChildOption()
if inc:
opt.includeType = inc
if exc:
opt.excludeType = exc
if args.obj is None or not args.obj:
self.ctx.die(440, "no object targets supplied for graph operation")
commands, forces = list(zip(*args.obj))
show = not (args.force or args.dry_run)
needsForce = any(forces)
if needsForce and show:
warnings.warn("\nUsing '--dry-run'.\
Future versions will switch to '--force'.\
Explicitly set the parameter for portability",
DeprecationWarning)
for req in commands:
req.dryRun = args.dry_run or needsForce
if args.force:
req.dryRun = False
if inc or exc:
req.childOptions = [opt]
if isinstance(req, omero.cmd.SkipHead):
req.request.childOptions = req.childOptions
req.request.dryRun = req.dryRun
if not args.ordered and len(commands) > 1:
commands = self.combine_commands(commands)
for command_check in commands:
self._check_command(command_check)
if len(commands) == 1:
cmd = commands[0]
else:
cmd = omero.cmd.DoAll(commands)
self._process_request(cmd, args, client)
def _check_command(self, command_check):
query = self.ctx.get_client().sf.getQueryService()
ec = self.ctx.get_event_context()
own_id = ec.userId
if not command_check or not command_check.targetObjects:
return
for k, v in list(command_check.targetObjects.items()):
query_str = (
"select "
"x.details.owner.id, "
"x.details.group.details.permissions "
"from %s x "
"where x.id = :id") % k
if not v:
return
for w in v:
try:
uid, perms = omero.rtypes.unwrap(
query.projection(
query_str,
omero.sys.ParametersI().addId(w),
{"omero.group": "-1"})[0])
perms = perms["perm"]
perms = omero.model.PermissionsI(perms)
if perms.isGroupWrite() and uid != own_id:
self.ctx.err(
"WARNING: %s:%s belongs to user %s" % (
k, w, uid))
except:
self.ctx.dbg(traceback.format_exc())
# Doing nothing since this is a best effort
[docs]
def combine_commands(self, commands):
"""
Combine several commands into as few as possible.
For simple commands a single combined command is possible,
for a skiphead it is more complicated. Here skipheads are
combined using their startFrom object type.
"""
from omero.cmd import SkipHead
skipheads = [req for req in commands if isinstance(req, SkipHead)]
others = [req for req in commands if not isinstance(req, SkipHead)]
rv = []
# Combine all simple commands
if len(others) == 1:
rv.extend(others)
elif len(others) > 1:
for req in others[1:]:
type, ids = list(req.targetObjects.items())[0]
others[0].targetObjects.setdefault(type, []).extend(ids)
rv.append(others[0])
# Group skipheads by their startFrom attribute.
if len(skipheads) == 1:
rv.extend(skipheads)
elif len(skipheads) > 1:
shmap = {skipheads[0].startFrom[0]: skipheads[0]}
for req in skipheads[1:]:
if req.startFrom[0] in shmap:
type, ids = list(req.targetObjects.items())[0]
if type in shmap[req.startFrom[0]].targetObjects:
shmap[req.startFrom[0]].targetObjects[type].extend(ids)
else:
shmap[req.startFrom[0]].targetObjects[type] = ids
else:
shmap[req.startFrom[0]] = req
for req in list(shmap.values()):
rv.append(req)
return rv
[docs]
def print_request_description(self, request):
doall = self.as_doall(request)
cmd_type = self.cmd_type().ice_staticId()[2:].replace("::", ".")
objects = []
for req in doall.requests:
for type in list(req.targetObjects.keys()):
ids = self._order_and_range_ids(req.targetObjects[type])
if isinstance(req, omero.cmd.SkipHead):
type += ("/" + req.startFrom[0])
objects.append('%s:%s' % (type, ids))
return "%s %s " % (cmd_type, ' '.join(objects))
def _get_object_ids(self, objDict):
import collections
objIds = {}
for k in list(objDict.keys()):
if objDict[k]:
objIds[k] = self._order_and_range_ids(objDict[k])
newIds = collections.OrderedDict(sorted(objIds.items()))
objIds = collections.OrderedDict()
for k in newIds:
key = k[k.rfind('.')+1:]
objIds[key] = newIds[k]
return objIds
[docs]
class UserGroupControl(BaseControl):
[docs]
def error_invalid_groupid(self, group_id, msg="Not a valid group ID: %s",
code=502, fatal=True):
if fatal:
self.ctx.die(code, msg % group_id)
else:
self.ctx.err(msg % group_id)
[docs]
def error_invalid_group(self, group, msg="Unknown group: %s", code=503,
fatal=True):
if fatal:
self.ctx.die(code, msg % group)
else:
self.ctx.err(msg % group)
[docs]
def error_no_group_found(self, msg="No group found", code=504,
fatal=True):
if fatal:
self.ctx.die(code, msg)
else:
self.ctx.err(msg)
[docs]
def error_ambiguous_group(self, id_or_name,
msg="Ambiguous group identifier: %s", code=505,
fatal=True):
if fatal:
self.ctx.die(code, msg % id_or_name)
else:
self.ctx.err(msg % id_or_name)
[docs]
def error_invalid_userid(self, user_id, msg="Not a valid user ID: %s",
code=512, fatal=True):
if fatal:
self.ctx.die(code, msg % user_id)
else:
self.ctx.err(msg % user_id)
[docs]
def error_invalid_user(self, user, msg="Unknown user: %s", code=513,
fatal=True):
if fatal:
self.ctx.die(code, msg % user)
else:
self.ctx.err(msg % user)
[docs]
def error_no_user_found(self, msg="No user found", code=514, fatal=True):
if fatal:
self.ctx.die(code, msg)
else:
self.ctx.err(msg)
[docs]
def error_ambiguous_user(self, id_or_name,
msg="Ambiguous user identifier: %s", code=515,
fatal=True):
if fatal:
self.ctx.die(code, msg % id_or_name)
else:
self.ctx.err(msg % id_or_name)
[docs]
def find_group_by_id(self, admin, group_id, fatal=False):
import omero
try:
gid = int(group_id)
g = admin.getGroup(gid)
except ValueError:
self.error_invalid_groupid(group_id, fatal=fatal)
return None, None
except omero.ApiUsageException:
self.error_invalid_group(gid, fatal=fatal)
return None, None
return gid, g
[docs]
def find_group_by_name(self, admin, group_name, fatal=False):
import omero
try:
g = admin.lookupGroup(group_name)
gid = g.id.val
except omero.ApiUsageException:
self.error_invalid_group(group_name, fatal=fatal)
return None, None
return gid, g
[docs]
def find_group(self, admin, id_or_name, fatal=False):
import omero
# Find by group by name
try:
g1 = admin.lookupGroup(id_or_name)
except omero.ApiUsageException:
g1 = None
# Find by group by id
try:
g2 = admin.getGroup(int(id_or_name))
except (ValueError, omero.ApiUsageException):
g2 = None
# Test found groups
if g1 and g2:
if g1.id.val != g2.id.val:
self.error_ambiguous_group(id_or_name, fatal=fatal)
return None, None
else:
g = g1
elif g1:
g = g1
elif g2:
g = g2
else:
self.error_invalid_group(id_or_name, fatal=fatal)
return None, None
return g.id.val, g
[docs]
def find_user_by_id(self, admin, user_id, fatal=False):
import omero
try:
uid = int(user_id)
u = admin.getExperimenter(uid)
except ValueError:
self.error_invalid_userid(user_id, fatal=fatal)
return None, None
except omero.ApiUsageException:
self.error_invalid_user(uid, fatal=fatal)
return None, None
return uid, u
[docs]
def find_user_by_name(self, admin, user_name, fatal=False):
import omero
try:
u = admin.lookupExperimenter(user_name)
uid = u.id.val
except omero.ApiUsageException:
self.error_invalid_user(user_name, fatal=fatal)
return None, None
return uid, u
[docs]
def find_user(self, admin, id_or_name, fatal=False):
import omero
# Find user by name
try:
u1 = admin.lookupExperimenter(id_or_name)
except omero.ApiUsageException:
u1 = None
# Find user by id
try:
u2 = admin.getExperimenter(int(id_or_name))
except (ValueError, omero.ApiUsageException):
u2 = None
# Test found users
if u1 and u2:
if u1.id.val != u2.id.val:
self.error_ambiguous_user(id_or_name, fatal=fatal)
return None, None
else:
u = u1
elif u1:
u = u1
elif u2:
u = u2
else:
self.error_invalid_user(id_or_name, fatal=fatal)
return None, None
return u.id.val, u
[docs]
def addusersbyid(self, admin, group, users):
import omero
for user in list(users):
admin.addGroups(omero.model.ExperimenterI(user, False), [group])
self.ctx.out("Added %s to group %s" % (user, group.id.val))
[docs]
def removeusersbyid(self, admin, group, users):
import omero
for user in list(users):
admin.removeGroups(omero.model.ExperimenterI(user, False), [group])
self.ctx.out("Removed %s from group %s" % (user, group.id.val))
[docs]
def addownersbyid(self, admin, group, users):
import omero
for user in list(users):
admin.addGroupOwners(group,
[omero.model.ExperimenterI(user, False)])
self.ctx.out("Added %s to the owner list of group %s"
% (user, group.id.val))
[docs]
def removeownersbyid(self, admin, group, users):
import omero
for user in list(users):
admin.removeGroupOwners(group,
[omero.model.ExperimenterI(user, False)])
self.ctx.out("Removed %s from the owner list of group %s"
% (user, group.id.val))
[docs]
def getuserids(self, group):
ids = [x.child.id.val for x in group.copyGroupExperimenterMap()]
return ids
[docs]
def getmemberids(self, group):
ids = [x.child.id.val for x in group.copyGroupExperimenterMap()
if not x.owner.val]
return ids
[docs]
def getownerids(self, group):
ids = [x.child.id.val for x in group.copyGroupExperimenterMap()
if x.owner.val]
return ids
[docs]
def output_users_list(self, admin, users, args):
roles = admin.getSecurityRoles()
user_group = roles.userGroupId
sys_group = roles.systemGroupId
from omero.util.text import TableBuilder
if args.count:
tb = TableBuilder("id", "login", "first name", "last name",
"email", "active", "ldap", "admin",
"# group memberships", "# group ownerships")
else:
tb = TableBuilder("id", "login", "first name", "last name",
"email", "active", "ldap", "admin", "member of",
"owner of")
if args.style:
tb.set_style(args.style)
# Sort users
if isinstance(users, list):
if args.sort_by_login:
users.sort(key=lambda x: x.omeName.val)
elif args.sort_by_first_name:
users.sort(key=lambda x: x.firstName.val)
elif args.sort_by_last_name:
users.sort(key=lambda x: x.lastName.val)
elif args.sort_by_email:
users.sort(key=lambda x: (x.email and x.email.val or ""))
elif args.sort_by_id:
users.sort(key=lambda x: x.id.val)
else:
users = [users]
for user in users:
row = [user.id.val, user.omeName.val, user.firstName.val,
user.lastName.val]
row.append(user.email and user.email.val or "")
active = ""
admin = ""
ldap = user.ldap.val
member_of = []
leader_of = []
for x in user.copyGroupExperimenterMap():
if not x:
continue
gid = x.parent.id.val
if user_group == gid:
active = "Yes"
elif sys_group == gid:
admin = "Yes"
elif x.owner.val:
leader_of.append(str(gid))
else:
member_of.append(str(gid))
row.append(active)
row.append(ldap)
row.append(admin)
if member_of:
if args.count:
row.append(len(member_of))
else:
row.append(",".join(member_of))
else:
row.append("")
if leader_of:
if args.count:
row.append(len(leader_of))
else:
row.append(",".join(leader_of))
else:
row.append("")
tb.row(*tuple(row))
self.ctx.out(str(tb.build()))
[docs]
def output_groups_list(self, groups, args):
from omero.util.text import TableBuilder
# Sort groups
if args.sort_by_name:
groups.sort(key=lambda x: x.name.val)
elif args.sort_by_id:
groups.sort(key=lambda x: x.id.val)
if args.long:
tb = TableBuilder("id", "name", "perms", "ldap", "owner ids",
"member ids")
else:
tb = TableBuilder("id", "name", "perms", "ldap", "# of owners",
"# of members")
if args.style:
tb.set_style(args.style)
for group in groups:
row = [group.id.val, group.name.val,
str(group.details.permissions), group.ldap.val]
ownerids = self.getownerids(group)
memberids = self.getmemberids(group)
if args.long:
row.append(",".join(sorted([str(x) for x in ownerids])))
row.append(",".join(sorted([str(x) for x in memberids])))
else:
row.append(len(ownerids))
row.append(len(memberids))
tb.row(*tuple(row))
self.ctx.out(str(tb.build()))
[docs]
def add_id_name_arguments(self, parser, objtype=""):
group = parser.add_mutually_exclusive_group()
group.add_argument(
"--id", help="ID of the %s" % objtype)
group.add_argument(
"--name", help="Name of the %s" % objtype)
return group
[docs]
def add_user_and_group_arguments(self, parser, *args,
**kwargs):
group = parser
try:
if kwargs.pop("exclusive"):
group = parser.add_mutually_exclusive_group()
except:
pass
group.add_argument("--user-id",
help="ID of the user.",
*args, **kwargs)
group.add_argument("--user-name",
help="Name of the user.",
*args, **kwargs)
group.add_argument("--group-id",
help="ID of the group.",
*args, **kwargs)
group.add_argument("--group-name",
help="Name of the group.",
*args, **kwargs)
[docs]
def add_user_arguments(self, parser, action=""):
group = parser.add_argument_group('User arguments')
group.add_argument("user_id_or_name", metavar="user", nargs="*",
help="ID or name of the user(s)%s" % action)
group.add_argument("--user-id", metavar="user", nargs="+",
help="ID of the user(s)%s" % action)
group.add_argument("--user-name", metavar="user", nargs="+",
help="Name of the user(s)%s" % action)
return group
[docs]
def add_single_user_argument(self, parser, action="", required=True):
group = parser.add_mutually_exclusive_group(required=required)
group.add_argument("--user-id", metavar="user",
help="ID of the user%s" % action)
group.add_argument("--user-name", metavar="user",
help="Name of the user%s" % action)
return group
[docs]
def list_users(self, a, args, use_context=False):
"""
Retrieve users from the arguments defined in
:meth:`add_user_arguments`
"""
# Check input arguments
has_user_arguments = (args.user_id_or_name or args.user_id
or args.user_name)
if (not use_context and not has_user_arguments):
self.error_no_input_user(fatal=True)
# Retrieve groups by id or name
uid_list = []
u_list = []
if args.user_id_or_name:
for user in args.user_id_or_name:
[uid, u] = self.find_user(a, user, fatal=False)
if uid is not None:
uid_list.append(uid)
u_list.append(u)
if args.user_id:
for user_id in args.user_id:
[uid, u] = self.find_user_by_id(a, user_id, fatal=False)
if uid is not None:
uid_list.append(uid)
u_list.append(u)
if args.user_name:
for user_name in args.user_name:
[uid, u] = self.find_user_by_name(a, user_name, fatal=False)
if uid is not None:
uid_list.append(uid)
u_list.append(u)
if not uid_list:
if not use_context or has_user_arguments:
self.error_no_user_found(fatal=True)
else:
ec = self.ctx.get_event_context()
[uid, u] = self.find_user_by_id(a, ec.userId, fatal=False)
uid_list.append(uid)
u_list.append(u)
return uid_list, u_list
[docs]
def add_group_arguments(self, parser, action=""):
group = parser.add_argument_group('Group arguments')
group.add_argument(
"group_id_or_name", metavar="group", nargs="*",
help="ID or name of the group(s)%s" % action)
group.add_argument(
"--group-id", metavar="group", nargs="+",
help="ID of the group(s)%s" % action)
group.add_argument(
"--group-name", metavar="group", nargs="+",
help="Name of the group(s)%s" % action)
return group
[docs]
def add_single_group_argument(self, parser, action="", required=True):
group = parser.add_mutually_exclusive_group(required=required)
group.add_argument("--group-id", metavar="group",
help="ID of the group%s" % action)
group.add_argument("--group-name", metavar="group",
help="Name of the group%s" % action)
return group
[docs]
def list_groups(self, a, args, use_context=False):
"""
Retrieve users from the arguments defined in
:meth:`add_user_arguments`
"""
# Check input arguments
has_group_arguments = (args.group_id_or_name or args.group_id
or args.group_name)
if (not use_context and not has_group_arguments):
self.error_no_input_group(fatal=True)
# Retrieve groups by id or name
gid_list = []
g_list = []
if args.group_id_or_name:
for group in args.group_id_or_name:
[gid, g] = self.find_group(a, group, fatal=False)
if g:
gid_list.append(gid)
g_list.append(g)
if args.group_id:
for group_id in args.group_id:
[gid, g] = self.find_group_by_id(a, group_id, fatal=False)
if g:
gid_list.append(gid)
g_list.append(g)
if args.group_name:
for group_name in args.group_name:
[gid, g] = self.find_group_by_name(a, group_name, fatal=False)
if g:
gid_list.append(gid)
g_list.append(g)
if not gid_list:
if not use_context or has_group_arguments:
self.error_no_group_found(fatal=True)
else:
ec = self.ctx.get_event_context()
[gid, g] = self.find_group_by_id(a, ec.groupId, fatal=False)
gid_list.append(gid)
g_list.append(g)
return gid_list, g_list
[docs]
def get_users_groups(self, args, iadmin):
users = []
groups = []
if args.user_name:
for user_name in args.user_name:
uid, u = self.find_user_by_name(
iadmin, user_name, fatal=False)
if uid is not None:
users.append(uid)
if args.user_id:
for user_id in args.user_id:
uid, u = self.find_user_by_id(
iadmin, user_id, fatal=False)
if uid is not None:
users.append(uid)
if args.group_name:
for group_name in args.group_name:
gid, g = self.find_group_by_name(
iadmin, group_name, fatal=False)
if gid is not None:
groups.append(gid)
if args.group_id:
for group_id in args.group_id:
gid, g = self.find_group_by_id(
iadmin, group_id, fatal=False)
if gid is not None:
groups.append(gid)
return users, groups
[docs]
def get_single_user_group(self, args, iadmin):
u = None
g = None
if args.user_name:
uid, u = self.find_user_by_name(
iadmin, args.user_name, fatal=False)
if args.user_id:
uid, u = self.find_user_by_id(
iadmin, args.user_id, fatal=False)
if args.group_name:
gid, g = self.find_group_by_name(
iadmin, args.group_name, fatal=False)
if args.group_id:
for group_id in args.group_id:
gid, g = self.find_group_by_id(
iadmin, args.group_id, fatal=False)
return u, g