Source code for plugins.hql

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# 
# Copyright 2008 Glencoe Software, Inc. All rights reserved.
# Use is subject to license terms supplied in LICENSE.txt

"""
   HQL plugin

   Plugin read by omero.cli.Cli during initialization. The method(s)
   defined here will be added to the Cli class for later use.
"""

from omero.cli import BaseControl, CLI
import time
import sys

HELP = """Executes an HQL statement with the given parameters

If no query is given, then a shell is opened which will run any entered query
with the current parameters.
"""

BLACKLISTED_KEYS = ["_id", "_loaded", "_bytes"]
WHITELISTED_VALUES = [0, False]


[docs] class HqlControl(BaseControl): def _configure(self, parser): parser.set_defaults(func=self.__call__) parser.add_argument("query", nargs="?", help="Single query to run") parser.add_argument( "--admin", help="Run an admin query (deprecated; use 'all')", default=False, action="store_true") parser.add_argument( "--all", help="Perform query on all groups", default=False, action="store_true", dest="admin") parser.add_argument( "--ids-only", action="store_true", help="Show only the ids of returned objects") parser.add_limit_arguments() parser.add_style_argument() parser.add_login_arguments() self.add_error("NO_QUIET", 67, "Can't ask for query with --quiet option") self.add_error("NOT_ADMIN", 53, ("SecurityViolation: Current user is not an" " admin and cannot use '--admin'")) self.add_error("BAD_QUERY", 52, "Bad query: %s") def __call__(self, args): if args.query: self.hql(args) else: if self.ctx.isquiet: self.raise_error("NO_QUIET") while True: args.query = self.ctx.input("Enter query:") if not args.query: break if not self.hql(args, loop=True): break
[docs] def hql(self, args, loop=False): from omero_sys_ParametersI import ParametersI ice_map = dict() if args.admin: ice_map["omero.group"] = "-1" c = self.ctx.conn(args) q = c.sf.getQueryService() p = ParametersI() p.page(args.offset, args.limit) rv = self.project(q, args.query, p, ice_map) has_details = self.display(rv, style=args.style, idsonly=args.ids_only) if self.ctx.isquiet or not sys.stdout.isatty(): return input = """ To see details for object, enter line number. To move ahead one page, enter 'p' To re-display list, enter 'r'. To quit, enter 'q' or just enter. """ if loop: input = input + """To run another query, press enter\n""" while True: id = self.ctx.input(input) id = id.lower() # Exit loop if not id: return True if id.startswith("q"): return False # Stay in loop if id.startswith("p"): p.page(p.getOffset().val + p.getLimit().val, p.getLimit()) self.ctx.dbg("\nCurrent page: offset=%s, limit=%s\n" % (p.theFilter.offset.val, p.theFilter.limit.val)) rv = self.project(q, args.query, p, ice_map) self.display(rv, style=args.style, idsonly=args.ids_only) elif id.startswith("r"): self.display(rv, style=args.style, idsonly=args.ids_only) else: try: id = int(id) obj = rv[id] if id not in has_details: self.ctx.out("No details available: %s" % id) continue else: # Unwrap the object_list from IQuery.projection obj = obj[0].val except: self.ctx.out("Invalid choice: %s" % id) continue keys = sorted(obj.__dict__) keys.remove("_id") keys.remove("_details") self.ctx.out("id = %s" % obj.id.val) for key in keys: value = self.unwrap(obj.__dict__[key]) if isinstance(value, str): value = "'%s'" % value if key.startswith("_"): key = key[1:] self.ctx.out("%s = %s" % (key, value)) continue
[docs] def display(self, rv, cols=None, style=None, idsonly=False): import omero.all import omero.rtypes from omero.util.text import TableBuilder has_details = [] tb = TableBuilder("#") if style: tb.set_style(style) for idx, object_list in enumerate(rv): klass = "Null" id = "" values = {} # Handling for simple lookup if not idsonly and len(object_list) == 1 and \ isinstance(object_list[0], omero.rtypes.RObjectI): has_details.append(idx) o = object_list[0].val if o: tb.cols(["Class", "Id"]) klass = o.__class__.__name__ id = o.id.val for k, v in list(o.__dict__.items()): values[k] = self.unwrap(v) values = self.filter(values) tb.cols(list(values.keys())) tb.row(idx, klass, id, **values) # Handling for true projections else: indices = list(range(1, len(object_list) + 1)) if cols is not None: tb.cols(cols) else: tb.cols(["Col%s" % x for x in indices]) values = tuple([self.unwrap(x) for x in object_list]) tb.row(idx, *values) self.ctx.out(str(tb.build())) return has_details
[docs] def unwrap(self, object, cache=None): if cache is None: cache = {} elif object in cache: return cache[id(object)] from omero.rtypes import unwrap from omero.model import IObject from omero.model import Details from omero.rtypes import RTimeI # if isinstance(object, list): # return [self.unwrap(x, cache) for x in object] # elif isinstance(object, RObject): # return self.unwrap(object.val, cache) unwrapped = unwrap(object, cache) if isinstance(unwrapped, IObject): rv = "%s:%s" % (unwrapped.__class__.__name__, unwrapped.id.val) elif isinstance(object, RTimeI): rv = time.ctime(unwrapped / 1000.0) elif isinstance(object, Details): owner = None group = None if unwrapped.owner is not None: owner = unwrapped.owner.id.val if unwrapped.group is not None: group = unwrapped.group.id.val rv = "owner=%s;group=%s" % (owner, group) else: rv = unwrapped cache[id(object)] = rv return rv
[docs] def filter(self, values): values = dict(values) # Filter out blacklisted items blacklisted_keys = [x for x in values if x in BLACKLISTED_KEYS] for key in blacklisted_keys: values.pop(key) # Filter out _Loaded keys loaded_keys = [x for x in values if x.startswith("_") and x.endswith("Loaded")] for key in loaded_keys: values.pop(key) # Filter out details if "owner=None;group=None" == values.get("_details"): values.pop("_details") # Filter out multi-valued and empty-valued items multi_valued = sorted([k for k in values if isinstance(values[k], list)]) empty_valued = sorted([ k for k in values if not values[k] and values[k] not in WHITELISTED_VALUES]) for x in multi_valued + empty_valued: if x in values: values.pop(x) rv = dict() for k, v in list(values.items()): if k.startswith("_"): rv[k[1:]] = v else: rv[k] = v return rv
[docs] def project(self, querySvc, queryStr, params, ice_map): import omero try: rv = querySvc.projection(queryStr, params, ice_map) self.ctx.set("last.hql.rv", rv) return rv except omero.SecurityViolation as sv: if "omero.group" in ice_map: self.raise_error("NOT_ADMIN") else: self.ctx.die(54, "SecurityViolation: %s" % sv) except omero.QueryException as qe: self.ctx.set("last.hql.rv", []) self.raise_error("BAD_QUERY", qe.message)
try: register("hql", HqlControl, HELP) except NameError: if __name__ == "__main__": cli = CLI() cli.register("hql", HqlControl, HELP) cli.invoke(sys.argv[1:])