Source code for plugins.search

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
# Copyright (C) 2012 Glencoe Software, Inc. All Rights Reserved.
# Use is subject to license terms supplied in LICENSE.txt
#

"""
Simple command-line searching. Similar to the hql plugin.
"""


import sys
import time

from Ice import OperationNotExistException
from omero.cli import CLI
from omero.plugins.hql import HqlControl
from omero.rtypes import robject


# Usage/help text for the "search" command. It is passed as the third
# argument to register() / CLI.register() at the bottom of this file.
HELP = """Search for object ids by string.

Examples:

  omero search Image "my-text"
  omero search Image "with wildcard*"
  omero search Project "with wildcard*"

Examples (admin-only):

  omero search --index Image:1
  omero search --index Well:5

See also:

  https://lucene.apache.org/java/2_9_1/queryparsersyntax.html

"""


class SearchControl(HqlControl):
    """Command-line control for full-text searching and (re)indexing.

    Builds on ``HqlControl``: the parent's ``_configure`` is invoked to add
    its own arguments, and results are rendered through ``self.display``.
    """

    def _configure(self, parser):
        """Register the search-specific arguments and error codes.

        :param parser: argument parser for this command; the arguments of
            ``HqlControl`` are added to it as well.
        """
        parser.add_argument(
            "--index", action="store_true", default=False,
            help="Index an object as a administrator")
        parser.add_argument(
            "--no-parse", action="store_true",
            help="Pass the search string directly to Lucene with no parsing")
        parser.add_argument(
            "--field", nargs="*", default=(),
            help=("Fields which should be searched "
                  "(e.g. name, description, annotation)"))
        parser.add_argument(
            "--from", dest="_from", metavar="YYYY-MM-DD", type=self.date,
            help="Start date for limiting searches (YYYY-MM-DD)")
        parser.add_argument(
            "--to", dest="_to", metavar="YYYY-MM-DD", type=self.date,
            help="End date for limiting searches (YYYY-MM-DD)")
        # The help text previously advertised "acquisitionDate" as the
        # default although the actual default is "import".
        parser.add_argument(
            "--date-type", default="import",
            choices=("acquisitionDate", "import"),
            help=("Which field to use for --from/--to "
                  "(default: import)"))
        parser.add_argument(
            "type",
            help="Object type to search for, e.g. 'Image' or 'Well'")
        HqlControl._configure(self, parser)
        parser.set_defaults(func=self.search)

        self.add_error("BAD_OBJECT", 431, "Bad object: %s")
        self.add_error("ADMIN_ONLY", 432, "Only admin can index object")
        self.add_error("NO_RESULTS", 433, "No results found.")
        self.add_error("USAGE", 434, "usage: %s")

    def date(self, user_string):
        """Convert a ``YYYY-MM-DD`` string into ``YYYYMMDD`` form.

        Used as the ``type=`` callable for ``--from`` and ``--to``.

        :param user_string: date text as typed by the user.
        :return: the same date formatted with ``%Y%m%d``.
        :raises Exception: whatever parsing/formatting raised, re-raised
            unchanged after being logged through ``self.ctx.dbg``.
        """
        try:
            parsed = time.strptime(user_string, "%Y-%m-%d")
            return time.strftime("%Y%m%d", parsed)
        except Exception as e:
            self.ctx.dbg(str(e))
            raise

    def search(self, args):
        """Index a single object (``--index``) or run a full-text search.

        With ``args.index`` set, ``args.type`` is read as ``Class:id`` and
        the matching object is handed to the update service for indexing;
        this path is refused with ``ADMIN_ONLY`` for non-admin users.

        Otherwise a search service is created, restricted to ``args.type``
        and queried with ``args.query``. Each batch of results is appended
        to the ``"search.results"`` context entry and displayed. The search
        service is always closed afterwards.

        :param args: parsed command-line namespace. ``admin``, ``query``,
            ``ids_only`` and ``style`` are not defined in this class;
            presumably they come from ``HqlControl._configure`` (confirm).
        """
        c = self.ctx.conn(args)

        # Imported locally, as in the original implementation.
        import omero
        import omero.all

        if args.index:
            if not self.ctx.get_event_context().isAdmin:
                self.raise_error("ADMIN_ONLY")

            try:
                parts = args.type.split(":")
                kls = getattr(omero.model, parts[0].strip())
                kls = kls.ice_staticId()
                of = c.getCommunicator().findObjectFactory(kls)
                obj = of.create(kls)
                obj_id = int(parts[1].strip())
                obj.setId(omero.rtypes.rlong(obj_id))
            except Exception as e:
                # Any failure while resolving "Class:id" is reported as a
                # bad object; the details go to the debug log only.
                self.ctx.dbg(e)
                self.raise_error("BAD_OBJECT", args.type)

            c.sf.getUpdateService().indexObject(obj)
            return

        group = "-1" if args.admin else None
        ctx = c.getContext(group)
        search = c.sf.createSearchService()
        try:
            # Matching OMEROGateway.search()
            search.setAllowLeadingWildcard(True)
            search.setCaseSensitive(False)
            search.onlyType(args.type)

            if args.no_parse:
                if args._from or args._to or args.field:
                    self.ctx.err("Ignoring from/to/fields")
                search.byFullText(args.query)
            else:
                # Map the user-facing "import" choice onto the field name
                # sent to the server, without mutating ``args``.
                date_type = args.date_type
                if date_type == "import":
                    date_type = "details.creationEvent.time"
                try:
                    search.byLuceneQueryBuilder(
                        ",".join(args.field),
                        args._from, args._to, date_type,
                        args.query, ctx)
                except OperationNotExistException:
                    # Fall back to a plain full-text query when the server
                    # lacks the query-builder operation.
                    self.ctx.err(
                        "Server does not support byLuceneQueryBuilder")
                    search.byFullText(args.query)

            if not search.hasNext(ctx):
                self.raise_error("NO_RESULTS")

            self.ctx.set("search.results", [])
            while search.hasNext(ctx):
                batch = search.results(ctx)
                self.ctx.get("search.results").extend(batch)
                if args.ids_only:
                    rows = [[x] for x in batch]
                else:
                    rows = [[robject(x)] for x in batch]
                self.display(rows,
                             style=args.style,
                             idsonly=args.ids_only)
        except omero.ApiUsageException as aue:
            self.raise_error("USAGE", aue.message)
        finally:
            search.close()
# Register the "search" command. ``register`` is not defined or imported
# in this file -- presumably it is injected by the CLI plugin loader
# (confirm). If the name is missing, the NameError is caught instead.
try:
    register("search", SearchControl, HELP)
except NameError:
    # Only when executed directly as a script: build a CLI, register the
    # command on it and invoke it with the command-line arguments.
    if __name__ == "__main__":
        cli = CLI()
        cli.register("search", SearchControl, HELP)
        cli.invoke(sys.argv[1:])