Source code for plugins.script

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
# Copyright 2008 Glencoe Software, Inc. All rights reserved.
# Use is subject to license terms supplied in LICENSE.txt

"""
script plugin

Plugin read by omero.cli.Cli during initialization. The method(s)
defined here will be added to the Cli class for later use.

The script plugin is used to run arbitrary blitz scripts which
take as their sole input Ice configuration arguments, including
--Ice.Config=file1,file2.

The first parameter, the script itself, should be natively executable
on a given platform i.e. invokable by subprocess.call([file,...])
"""

import re
import os
import sys
import signal
import atexit

from omero.cli import CLI
from omero.cli import BaseControl

from omero.util.sessions import SessionsStore

from omero_ext.path import path

MIMETYPE = "text/x-python"

HELP = """Support for launching, uploading and otherwise managing \
OMERO.scripts"""

DEMO_SCRIPT = """#!/usr/bin/env python
import omero
import omero.rtypes as rtypes
import omero.scripts as scripts

o = scripts.Long("opt", min=0, max=5)
a = scripts.String("a", values=("foo", "bar"), optional=False)
b = scripts.Long("b").out()

client = scripts.client("length of input string",
\"\"\"
    Trivial example script which calculates the length
    of the string passed in as the "a" input, and returns
    the value as the long "b"
\"\"\", a, b, o,

authors = ["OME Team"],
institutions = ["openmicroscopy.org"])

print("Starting script")

try:
    a = client.getInput("a").getValue()
    b = len(a)
    client.setOutput("b", rtypes.rlong(b))
    client.setOutput("unregistered-output-param", rtypes.wrap([1,2,3]))
finally:
    client.closeSession()

print("Finished script")
"""

RE0 = re.compile(r"\s*script\s+upload\s*")
RE1 = re.compile(r"\s*script\s+upload\s+--official\s*")


[docs] class ScriptControl(BaseControl): def _complete(self, text, line, begidx, endidx): """ Returns a file after "upload" and otherwise delegates to the BaseControl """ for RE in (RE1, RE0): m = RE.match(line) if m: replaced = RE.sub('', line) suggestions = self._complete_file(replaced, os.getcwd()) if False: # line.find("--official") < 0: add = "--official" parts = line.split(" ") if "--official".startswith(parts[-1]): new = add[len(parts[-1]):] if new: add = new suggestions.insert(0, add) return suggestions return BaseControl._complete(self, text, line, begidx, endidx) def _configure(self, parser): def _who(parser): return parser.add_argument( "who", nargs="*", help="Who to execute for: user, group, user=1, group=5" " (default=official)") parser.add_login_arguments() sub = parser.sub() # Disabling for 4.2 release. help = parser.add(sub, self.help, # "Extended help") demo = parser.add( sub, self.demo, "Runs a short demo of the scripting system") list = parser.add( sub, self.list, help="List files for user or group") _who(list) cat = parser.add(sub, self.cat, "Prints a script to standard out") edit = parser.add( sub, self.edit, "Opens a script in $EDITOR and saves it back to the server") params = parser.add( sub, self.params, help="Print the parameters for a given script") launch = parser.add( sub, self.launch, help="Launch a script with parameters") disable = parser.add( sub, self.disable, help="Makes script non-executable by setting the mimetype") disable.add_argument( "--mimetype", default="text/plain", help="Use a mimetype other than the default (%(default)s)") enable = parser.add( sub, self.enable, help="Makes a script executable (e.g. sets" " mimetype to %s)" % MIMETYPE) enable.add_argument( "--mimetype", default=MIMETYPE, help="Use a mimetype other than the default (%(default)s)") replace = parser.add( sub, self.replace, help="Replace an existing script with a new value") for x in (launch, params, cat, disable, enable, edit, replace): x.add_argument( "original_file", help="Id or path of a script file stored in OMERO") launch.add_argument( "input", nargs="*", help="Inputs for the script of the form 'param=value'") jobs = parser.add( sub, self.jobs, help="List current jobs for user or group") jobs.add_argument( "--all", action="store_true", help="Show all jobs, not just running ones") _who(jobs) serve = parser.add( sub, self.serve, help="Start a usermode processor for scripts") serve.add_argument( "--verbose", action="store_true", help="Enable debug logging on processor") serve.add_argument( "-b", "--background", action="store_true", help="Run processor in background. Used in demo") serve.add_argument( "-t", "--timeout", default=0, type=int, help="Seconds that the processor should run. 0 means no timeout") _who(serve) upload = parser.add(sub, self.upload, help="Upload a script") upload.add_argument( "--official", action="store_true", help="If set, creates a system script. Must be an admin") upload.add_argument( "file", help="Local script file to upload to OMERO") replace.add_argument( "file", help="Local script which should overwrite the existing one") delete = parser.add( sub, self.delete, help="delete an existing script") delete.add_argument( "id", type=int, help="Id of the original file which is to be deleted") run = parser.add( sub, self.run, help="Run a script with the OMERO libraries loaded and current" " login") run.add_argument("file", help="Local script file to run") run.add_argument( "input", nargs="*", help="Inputs for the script of the form 'param=value'") # log = parser.add(sub, self.log, help = "TBD", tbd="TRUE") for x in (demo, cat, edit, params, launch, disable, enable, jobs, serve, upload, replace, delete, run): x.add_login_arguments()
[docs] def help(self, args): self.ctx.out(""" Available or planned(*) commands: ================================ demo set set file=[id|name] set user=[id|name] set group=[id|name] set user= Note: Some of the other actions call set internally. cat cat file=[id|name] mv rm cp --replace / --overwrite register publish processors chain edit jobs jobs user jobs group launch launch file=[id|name] list list user list group list user=[id|name] list group=[id|name] list publication=[regex] list ofifical list namespace=[] log log file log user log group log file=[id|name] log user=[id|name] log group=[id|name] params params file=[id|name] serve serve --background serve --timeout={min} serve --verbose serve user serve group serve user=[id|name] serve group=[id|name] serve group=[id|name] user=[id|name] serve count=1 serve log serve log=some/file/somewhere upload file=/tmp/my_script.py replace delete # # Other # run """)
[docs] def demo(self, args): from omero.util.temp_files import create_path t = create_path("Demo_Script", ".py") from hashlib import sha1 digest = sha1() digest.update(DEMO_SCRIPT.encode('utf-8')) sha1 = digest.hexdigest() self.ctx.out("\nExample script writing session") self.ctx.out("="*80) def msg(title, method=None, *arguments): self.ctx.out("\n") self.ctx.out("\t+" + ("-"*68) + "+") title = "\t| %-66.66s | " % title self.ctx.out(title) if method: cmd = "%s %s" % (method.__name__, " ".join(arguments)) cmd = "\t| COMMAND: omero script %-40.40s | " % cmd self.ctx.out(cmd) self.ctx.out("\t+" + ("-"*68) + "+") self.ctx.out(" ") if method: try: self.ctx.invoke(['script', method.__name__] + list(arguments)) except Exception as e: import traceback self.ctx.out("\nEXECUTION FAILED: %s" % e) self.ctx.dbg(traceback.format_exc()) client = self.ctx.conn(args) current_user = self.ctx.get_event_context().userId query = "select o from OriginalFile o where o.hash = '%s' and" \ " o.details.owner.id = %s" % (sha1, current_user) files = client.sf.getQueryService().findAllByQuery(query, None) if len(files) == 0: msg("Saving demo script to %s" % t) t.write_text(DEMO_SCRIPT) msg("Uploading script", self.upload, str(t)) id = self.ctx.get("script.file.id") else: id = files[0].id.val msg("Reusing demo script %s" % id) msg("Listing available scripts for user", self.list, "user") msg("Printing script content for file %s" % id, self.cat, str(id)) msg("Serving file %s in background" % id, self.serve, "user", "--background") msg("Printing script params for file %s" % id, self.params, "file=%s" % id) msg("Launching script with parameters: a=bad-string (fails)", self.launch, "file=%s" % id, "a=bad-string") msg("Launching script with parameters: a=bad-string opt=6 (fails)", self.launch, "file=%s" % id, "a=bad-string", "opt=6") msg("Launching script with parameters: a=foo opt=1 (passes)", self.launch, "file=%s" % id, "a=foo", "opt=1") try: for p in list(getattr(self, "_processors", [])): p.cleanup() self._processors.remove(p) except Exception as e: self.ctx.err("Failed to clean processors: %s" % e) self.ctx.out("\nDeleting script from server...") args.id = int(id) self.delete(args)
[docs] def cat(self, args): client = self.ctx.conn(args) script_id, ofile = self._file(args, client) try: self.ctx.out(client.sf.getScriptService().getScriptText(script_id)) except Exception as e: self.ctx.err("Failed to find script: %s (%s)" % (script_id, e))
[docs] def edit(self, args): client = self.ctx.conn(args) scriptSvc = client.sf.getScriptService() script_id, ofile = self._file(args, client) txt = None try: txt = client.sf.getScriptService().getScriptText(script_id) if not txt: self.ctx.err("No text for script: %s" % script_id) self.ctx.err("Does this file appear in the script list?") self.ctx.err("If not, try 'replace'") return from omero.util.temp_files import create_path from omero.util import edit_path p = create_path() edit_path(p, txt) scriptSvc.editScript(ofile, p.text()) except Exception as e: self.ctx.err("Failed to find script: %s (%s)" % (script_id, e))
[docs] def jobs(self, args): self.ctx.conn(args) cols = ("username", "groupname", "started", "finished") query = "select j, %s, s.value from Job j join j.status s" \ % (",".join(["j.%s" % j for j in cols])) if not args.all: query += " where j.finished is null" self.ctx.out("Running query via 'hql' subcommand: %s" % query) self.ctx.invoke("""hql "%s" """ % query)
[docs] def launch(self, args): """ """ client = self.ctx.conn(args) script_id, ofile = self._file(args, client) import omero import omero.scripts import omero.rtypes svc = client.sf.getScriptService() try: params = svc.getParams(script_id) except omero.ValidationException as ve: self.ctx.die(502, "ValidationException: %s" % ve.message) m = self._parse_inputs(args, params) try: proc = svc.runScript(script_id, m, None) job = proc.getJob() except omero.ValidationException as ve: self.ctx.err("Bad parameters:\n%s" % ve) return # EARLY EXIT # Adding notification to wait on result cb = omero.scripts.ProcessCallbackI(client, proc) try: self.ctx.out("Job %s ready" % job.id.val) self.ctx.out("Waiting....") while proc.poll() is None: cb.block(1000) self.ctx.out("Callback received: %s" % cb.block(0)) rv = proc.getResults(3) finally: cb.close() def p(m): class handle(object): def write(this, val): val = "\t* %s" % val.decode() val = val.replace("\n", "\n\t* ") self.ctx.out(val, newline=False) def close(this): pass f = rv.get(m, None) if f and f.val: self.ctx.out("\n\t*** start %s (id=%s)***" % (m, f.val.id.val)) try: client.download(ofile=f.val, filehandle=handle()) except Exception: self.ctx.err("Failed to display %s" % m) self.ctx.out("\n\t*** end %s ***\n" % m) p("stdout") p("stderr") self.ctx.out("\n\t*** out parameters ***") for k, v in list(rv.items()): if k not in ("stdout", "stderr", "omero.scripts.parse"): self.ctx.out("\t* %s=%s" % (k, omero.rtypes.unwrap(v))) self.ctx.out("\t*** done ***")
[docs] def list(self, args): client = self.ctx.conn(args) sf = client.sf svc = sf.getScriptService() if args.who: who = [self._parse_who(w) for w in args.who] scripts = svc.getUserScripts(who) banner = "Scripts for %s" % ", ".join(args.who) else: scripts = svc.getScripts() banner = "Official scripts" self._parse_scripts(scripts, banner)
[docs] def log(self, args): print(args) pass
[docs] def params(self, args): client = self.ctx.conn(args) script_id, ofile = self._file(args, client) import omero svc = client.sf.getScriptService() try: job_params = svc.getParams(script_id) except omero.ValidationException as ve: self.ctx.die(454, "ValidationException: %s" % ve.message) except omero.ResourceError as re: self.ctx.die(455, "ResourceError: %s" % re.message) if job_params: self.ctx.out("") self.ctx.out("id: %s" % script_id) self.ctx.out("name: %s" % job_params.name) self.ctx.out("version: %s" % job_params.version) self.ctx.out("authors: %s" % ", ".join(job_params.authors)) self.ctx.out("institutions: %s" % ", ".join(job_params.institutions)) self.ctx.out("description: %s" % job_params.description) self.ctx.out("namespaces: %s" % ", ".join(job_params.namespaces)) self.ctx.out("stdout: %s" % job_params.stdoutFormat) self.ctx.out("stderr: %s" % job_params.stderrFormat) def print_params(which, params): import omero self.ctx.out(which) for k in sorted(params, key=lambda name: params.get(name).grouping): v = params.get(k) self.ctx.out(" %s - %s" % (k, (v.description and v.description or "(no description)"))) self.ctx.out(" Optional: %s" % v.optional) self.ctx.out(" Type: %s" % v.prototype.ice_staticId()) if isinstance(v.prototype, omero.RCollection): coll = v.prototype.val if len(coll) == 0: self.ctx.out(" Subtype: (empty)") else: self.ctx.out(" Subtype: %s" % coll[0].ice_staticId()) elif isinstance(v.prototype, omero.RMap): try: proto_value = \ v.prototype.val.values[0].ice_staticId() except Exception: proto_value = None self.ctx.out(" Subtype: %s" % proto_value) # ticket:11472 - string min/max need quoting def min_max(x): if x: if x.val is None: return "" elif isinstance(x, omero.RString): return "'%s'" % x.val else: return x.val return "" self.ctx.out(" Min: %s" % min_max(v.min)) self.ctx.out(" Max: %s" % min_max(v.max)) values = omero.rtypes.unwrap(v.values) self.ctx.out(" Values: %s" % (values and ", ".join(values) or "")) print_params("inputs:", job_params.inputs) print_params("outputs:", job_params.outputs)
[docs] def serve(self, args): # List of processors which have been started if not hasattr(self, "_processors"): self._processors = [] debug = args.verbose background = args.background timeout = args.timeout client = self.ctx.conn(args) who = [self._parse_who(w) for w in args.who] if not who: who = [] # Official scripts only # Similar to omero.util.Server starting here import logging original = list(logging._handlerList) roots = list(logging.getLogger().handlers) logging._handlerList = [] logging.getLogger().handlers = [] from omero.util import configure_logging from omero.processor import usermode_processor lvl = debug and 10 or 20 configure_logging(loglevel=lvl) try: try: impl = usermode_processor( client, serverid="omero.scripts.serve", accepts_list=who, omero_home=self.ctx.dir) self._processors.append(impl) except Exception as e: self.ctx.die(100, "Failed initialization: %s" % e) if background: def cleanup(): impl.cleanup() logging._handlerList = original logging.getLogger().handlers = roots atexit.register(cleanup) else: if self._isWindows(): self.foreground_win(impl, timeout) else: self.foreground_nix(impl, timeout) finally: if not background: logging._handlerList = original logging.getLogger().handlers = roots return impl
[docs] def foreground_nix(self, impl, timeout): """ Use signal.SIGALRM to wait for the timeout to signal """ def handler(signum, frame): raise SystemExit() old = signal.signal(signal.SIGALRM, handler) try: signal.alarm(timeout) self.ctx.input("Press any key to exit...\n") signal.alarm(0) finally: self.ctx.dbg("DONE") signal.signal(signal.SIGTERM, old) impl.cleanup()
[docs] def foreground_win(self, impl, timeout): """ Note: currently simply fails. An implementation might be possible using msvcrt. See: \ http://stackoverflow.com/questions/3471461/raw-input-and-timeout/3911560 """ try: if timeout != 0: self.ctx.die(144, "Timeout not supported on Windows") else: self.ctx.input("Press any key to exit...\n") self.ctx.dbg("DONE") finally: impl.cleanup()
[docs] def upload(self, args): p = path(args.file) if not p.exists(): self.ctx.die(502, "File does not exist: %s" % p.abspath()) import omero c = self.ctx.conn(args) scriptSvc = c.sf.getScriptService() if args.official: try: id = scriptSvc.uploadOfficialScript(args.file, p.text()) except omero.ApiUsageException as aue: if "editScript" in aue.message: self.ctx.die(502, "%s already exists; use 'replace'" " instead" % args.file) else: self.ctx.die(504, "ApiUsageException: %s" % aue.message) except omero.SecurityViolation as sv: self.ctx.die(503, "SecurityViolation: %s" % sv.message) else: id = scriptSvc.uploadScript(args.file, p.text()) self.ctx.err("Uploaded %sscript" % (args.official and "official " or "")) self.ctx.out("OriginalFile:%s" % id) self.ctx.set("script.file.id", id)
[docs] def replace(self, args): import omero client = self.ctx.conn(args) script_id, ofile = self._file(args, client) fpath = args.file file = open(fpath) scriptText = file.read() file.close() scriptSvc = client.sf.getScriptService() try: scriptSvc.editScript(ofile, scriptText) except omero.SecurityViolation as sv: self.ctx.die(200, sv.message)
[docs] def delete(self, args): ofile = args.id client = self.ctx.conn(args) try: client.sf.getScriptService().deleteScript(ofile) except Exception as e: self.ctx.err("Failed to delete script: %s (%s)" % (ofile, e))
[docs] def disable(self, args): ofile = self.setmimetype(args) self.ctx.out("Disabled %s by setting mimetype to %s" % (ofile.id.val, args.mimetype))
[docs] def enable(self, args): ofile = self.setmimetype(args) self.ctx.out("Enabled %s by setting mimetype to %s" % (ofile.id.val, args.mimetype))
[docs] def setmimetype(self, args): from omero.rtypes import rstring client = self.ctx.conn(args) script_id, ofile = self._file(args, client) if args.mimetype == MIMETYPE: # is default if ofile.name.val.endswith(".jy"): args.mimetype = "text/x-jython" elif ofile.name.val.endswith(".m"): args.mimetype = "text/x-matlab" ofile.setMimetype(rstring(args.mimetype)) return client.sf.getUpdateService().saveAndReturnObject(ofile)
# # Other #
[docs] def run(self, args): if not os.path.exists(args.file): self.ctx.die(670, "No such file: %s" % args.file) else: client = self.ctx.conn(args) store = SessionsStore() srv, usr, uuid, port = store.get_current() props = store.get(srv, usr, uuid) from omero.scripts import parse_file from omero.util.temp_files import create_path path = create_path() text = """ omero.host=%(omero.host)s omero.user=%(omero.sess)s omero.pass=%(omero.sess)s """ path.write_text(text % props) params = parse_file(args.file) m = self._parse_inputs(args, params) for k, v in list(m.items()): if v is not None: client.setInput(k, v) p = self.ctx.popen([sys.executable, args.file], stdout=sys.stdout, stderr=sys.stderr, ICE_CONFIG=str(path)) p.wait() if p.poll() != 0: self.ctx.die(p.poll(), "Execution failed.")
# # Helpers # def _parse_inputs(self, args, params): from omero.scripts import parse_inputs, parse_input, MissingInputs try: rv = parse_inputs(args.input, params) except MissingInputs as mi: rv = mi.inputs for key in mi.keys: value = self.ctx.input("""Enter value for "%s": """ % key, required=True) rv.update(parse_input("%s=%s" % (key, value), params)) return rv def _parse_scripts(self, scripts, msg): """ Parses a list of scripts to self.ctx.out """ from omero.util.text import TableBuilder tb = TableBuilder("id", msg) for x in scripts: tb.row(x.id.val, x.path.val + x.name.val) self.ctx.out(str(tb.build())) def _file(self, args, client): f = args.original_file q = client.sf.getQueryService() svc = client.sf.getScriptService() if f is None: self.ctx.die(100, "No script provided") elif f.startswith("file="): f = f[5:] try: script_id = int(f) except Exception: script_path = str(f) script_id = svc.getScriptID(script_path) ofile = q.get("OriginalFile", script_id) return script_id, ofile def _parse_who(self, who): """ Parses who items of the form: "user", "group", "user=1", "group=6" """ import omero WHO_FACTORY = {"user": omero.model.ExperimenterI, "group": omero.model.ExperimenterGroupI} WHO_CURRENT = {"user": lambda ec: ec.userId, "group": lambda ec: ec.groupId} for key, factory in list(WHO_FACTORY.items()): if who.startswith(key): if who == key: id = WHO_CURRENT[key](self.ctx.get_event_context()) return factory(id, False) else: parts = who.split("=") if len(parts) != 2: continue else: id = int(parts[1]) return factory(id, False)
try: register("script", ScriptControl, HELP) except NameError: if __name__ == "__main__": cli = CLI() cli.register("script", ScriptControl, HELP) cli.invoke(sys.argv[1:])