Source code for plugins.tag

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
# Copyright (C) 2013-2015 Glencoe Software, Inc. All rights reserved.
# Use is subject to license terms supplied in LICENSE.txt
# Sam Hart <sam@glencoesoftware.com>

"""
Tag plugin for command-line tag manipulation
"""

import platform
import subprocess
import sys
import json

import omero
from omero.cli import BaseControl, CLI, ExceptionHandler
from omero.rtypes import rlong, rstring, unwrap
from omero.model import TagAnnotationI, AnnotationAnnotationLinkI

HELP = """Manage OMERO user tags.

Plugin for managing and viewing OMERO user tags.

Examples:

    omero tag list       # List all the tags, grouped by tagset
    omero tag create     # Creates a tag

    # Create a tag set named 'data_10.28' and associate the tag number 18 wth
    # it.
    omero tag createset --tag 18 --name data_10.28
"""


[docs] class Tag(object): def __init__(self, tag_id=None, name=None, description=None, owner=None, children=None): self.tag_id = tag_id self.name = name self.description = description self.owner = owner self.children = children
[docs] class TagCollection(object): def __init__(self): self.tags = dict() self.owners = dict() self.mapping = dict() self.orphans = [] self.empties = []
[docs] def exec_command(cmd): """ given a command, will execute it in the parent environment Returns a list containing the output """ p = subprocess.Popen(cmd, stdout=subprocess.PIPE) output = p.stdout.readlines() p.stdout.close() return output
[docs] def clip(s, width): """ Given a string, s, and a width, will clip the string to that width or fill it with spaces up to that width. Returns modified string """ mod_s = s if s and len(s) > width: mod_s = s[:width] elif s and len(s) < width: mod_s = s + " " * (width - len(s)) return mod_s
[docs] class TagControl(BaseControl): def _configure(self, parser): self.exc = ExceptionHandler() parser.add_login_arguments() sub = parser.sub() listtags = parser.add( sub, self.list, help="List all the tags, grouped by tagset") self.add_standard_params(listtags) listtags.add_argument( "--tagset", nargs="+", type=int, help="One or more tagset IDs") self.add_tag_common_params(listtags) listtags.add_login_arguments() listsets = parser.add(sub, self.listsets, help="List tag sets") self.add_standard_params(listsets) listsets.add_argument( "--tag", nargs="+", type=int, help="List only tagsets containing the following tag ID(s)") self.add_tag_common_params(listsets) listsets.add_login_arguments() create = parser.add(sub, self.create, help="Create a new tag") self.add_newtag_params(create) create.add_login_arguments() createset = parser.add( sub, self.createset, help="Create a new tag set") createset.add_argument( "--tag", nargs="+", required=True, type=int, help="ID(s) of the tag(s) to include in this set") self.add_newtag_params(createset) createset.add_login_arguments() loadj = sub.add_parser( self.load.__func__.__name__, help="Import new tag(s) and tagset(s) from JSON file", description="Import new tag(s) and tagset(s) from JSON file", epilog=""" JSON File Format: The format of the JSON file should be as follows: [{ "name" : "Name of the tagset", "desc" : "Description of the tagset", "set" : [{ "name" : "Name of tag", "desc" : "Description of tag" },{ "name" : "Name of tag", "desc" : "Description of tag" },{ .... },{ .... }] """) loadj.set_defaults(func=self.load) loadj.add_argument( "filename", nargs="?", help="The filename containing tag JSON") loadj.add_login_arguments() links = parser.add( sub, self.link, help="Link annotation to an object") links.add_argument( "object", help="The object to link to. Should be of form" " <object_type>:<object_id>") links.add_argument( 'tag_id', type=int, help="The tag annotation ID") self.add_standard_params(links) links.add_login_arguments() # Recurring parameter methods
[docs] def add_newtag_params(self, parser): parser.add_argument( "--name", help="The name of the new tag or tagset") parser.add_argument( "--desc", "--description", help="The description of the new tag or tagset")
[docs] def add_tag_common_params(self, parser): parser.add_argument( "--uid", help="List only tags/tagsets belonging to the following user ID") parser.add_argument( "--desc", "--description", "--descriptions", action="store_true", default=False, help="Display descriptions of tags")
[docs] def add_standard_params(self, parser): parser.add_argument( "--admin", action="store_true", default=False, help="Perform action as an administrator") parser.add_argument( "--nopage", action="store_true", default=False, help="Disable pagination")
# Output methods
[docs] def print_line(self, line, index): if self.console_length is None: self.ctx.out(line) elif index % self.console_length == 0 and index: input_val = input("[Enter], [f]orward forever, or [q]uit: ") if input_val.lower() == 'q': sys.exit(0) elif input_val.lower() == 'f': self.console_length = None else: self.ctx.out(line)
[docs] def pagetext(self, lines): for index, line in enumerate(lines): self.print_line(line, index)
[docs] def pagetext_format(self, format, elements): for index, line in enumerate(elements): self.print_line(format.format(*line), index)
[docs] def determine_console_size(self): """ Will attempt to determine console size based upon the current platform. Returns tuple of width and length. """ # The defaults if we can't figure it out lines = 25 width = 80 this_system = platform.system().lower() try: if this_system in ['linux', 'darwin', 'macosx', 'cygwin']: output = exec_command(['tput', 'lines']) if len(output) > 0: lines = int(output[0].rstrip()) output = exec_command(['tput', 'cols']) if len(output) > 0: width = int(output[0].rstrip()) elif this_system in ['windows', 'win32']: # http://stackoverflow.com/questions/566746/\ # how-to-get-console-window-width-in-python from ctypes import windll, create_string_buffer from struct import unpack # stdin handle is -10 # stdout handle is -11 # stderr handle is -12 h = windll.kernel32.GetStdHandle(-12) csbi = create_string_buffer(22) res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) if res: (bufx, bufy, curx, cury, wattr, left, top, right, bottom, maxx, maxy) = unpack("hhhhHhhhhhh", csbi.raw) lines = bottom - top + 1 width = bottom - top + 1 except: # Possible evil to ignore what the error was, but, truthfully, # the reason we do so is because it means it's a platform we # don't have support for, or a platform we should have support # for but which has some non-standard witch-craftery going on self.ctx.out("Could not determine the console length.") return width, lines
# Data gathering methods
[docs] def list_tags(self, args, tagset=None): """ Returns a TagCollection object """ # Get all tags except empty tagsets recursively tc = self.list_tags_recursive(args, tagset=tagset) # Now add the empty tagsets to the collection params = omero.sys.ParametersI() params.addString('ns', omero.constants.metadata.NSINSIGHTTAGSET) ice_map = dict() if args.admin: ice_map["omero.group"] = "-1" client = self.ctx.conn(args) session = client.getSession() q = session.getQueryService() sql = """ select ann.id, ann.description, ann.textValue from TagAnnotation ann where ann.id not in (select distinct l.parent.id from AnnotationAnnotationLink l) and ann.ns=:ns """ if args.uid: params.map["eid"] = rlong(int(args.uid)) sql += " and ann.details.owner.id = :eid" if tagset: sql += " and ann.id = :tid" params.map['tid'] = rlong(int(tagset)) for element in q.projection(sql, params, ice_map): tag_id, description, text = list(map(unwrap, element)) tc.empties.append(Tag( tag_id=tag_id, name=text, description=description)) return tc
[docs] def list_tags_recursive(self, args, tagset=None): """ Returns a TagCollection object """ params = omero.sys.ParametersI() params.addString('ns', omero.constants.metadata.NSINSIGHTTAGSET) ice_map = dict() if args.admin: ice_map["omero.group"] = "-1" client = self.ctx.conn(args) session = client.getSession() q = session.getQueryService() parent_tags = [] sql = """ select aal.parent.id, aal.child.id, aal.parent.description, aal.parent.textValue from AnnotationAnnotationLink aal inner join aal.parent ann where ann.ns=:ns """ if args.uid: params.map["eid"] = rlong(int(args.uid)) sql += " and ann.details.owner.id = :eid" if tagset: sql += " and ann.id = :tid" params.map['tid'] = rlong(int(tagset)) tc = TagCollection() for element in q.projection(sql, params, ice_map): parent = unwrap(element[0]) child = unwrap(element[1]) tc.mapping.setdefault(parent, []).append(child) parent_tags.append(Tag( tag_id=parent, name=unwrap(element[3]), description=unwrap(element[2]) )) if tagset: sql = """ select distinct child.id, child.description, child.textValue, owner.id, owner.firstName, owner.lastName from AnnotationAnnotationLink as ann join ann.child as child join child.details.owner as owner join ann.parent as parent where parent.id = :tid or child.id = :tid """ else: # Get orphans first sql = """ select ann.id, ann.textValue, ann.description from TagAnnotation ann where ann.id not in (select distinct l.child.id from AnnotationAnnotationLink l join l.parent as ts where ts.ns = :ns) and ann.ns is null """ if args.uid: sql += " and ann.details.owner.id = :eid" for element in q.projection(sql, params, ice_map): tag_id, text, description = list(map(unwrap, element)) tc.orphans.append(Tag( tag_id=tag_id, name=text, description=description)) # Now set up search for rest of tags sql = """ select ann.id, ann.description, ann.textValue, ann.details.owner.id, ann.details.owner.firstName, ann.details.owner.lastName from TagAnnotation ann """ if args.uid: if tagset: sql += " and ann.details.owner.id = :eid" else: sql += " where ann.details.owner.id = :eid" for element in q.projection(sql, params, ice_map): tag_id, description, text, owner, first, last = list(map(unwrap, element)) tc.tags[tag_id] = Tag( tag_id=tag_id, name=text, description=description, owner=owner, children=tc.mapping.get(tag_id) or 0 ) tc.owners[owner] = "%s %s" % (first, last) for parent in parent_tags: if parent.tag_id not in tc.tags: tc.tags[parent.tag_id] = parent return tc
[docs] def list_tagsets(self, args, tag): """ Returns a TagCollection of just the tagsets. If tag is provided, will return the tagsets with those tags. """ params = omero.sys.ParametersI() params.addString('ns', omero.constants.metadata.NSINSIGHTTAGSET) ice_map = dict() if args.admin: ice_map["omero.group"] = "-1" client = self.ctx.conn(args) session = client.getSession() q = session.getQueryService() tc = TagCollection() if tag: sql = """ select a.id, a.description, a.textValue, a.details.owner.id, a.details.owner.firstName, a.details.owner.lastName from AnnotationAnnotationLink b inner join b.parent a where a.ns=:ns """ sql += " and b.child.id = :tid" params.map['tid'] = rlong(int(tag)) else: sql = """ select a.id, a.description, a.textValue, a.details.owner.id, a.details.owner.firstName, a.details.owner.lastName from TagAnnotation a where a.ns=:ns """ if args.uid: params.map["eid"] = rlong(int(args.uid)) sql += " and a.details.owner.id = :eid" for element in q.projection(sql, params, ice_map): tag_id, description, text, owner, first, last = list(map(unwrap, element)) tc.tags[tag_id] = Tag( tag_id=tag_id, name=text, description=description, owner=owner ) tc.owners[owner] = "%s %s" % (first, last) return tc
[docs] def generate_tagset(self, tags, mapping, args): """ Given a dict of tags and mappings for parent/child relationships return a list of lines representing the tagset output. """ lines = [] for key in list(mapping.keys()): lines.append("+- %s:'%s'" % (str(key), tags[key].name)) if args.desc: lines.append("| '%s'" % tags[key].description) lines.append("|") lines.append('|\\') for tag_key in mapping[key]: lines.append("| +- %s:'%s'" % (str(tag_key), tags[tag_key].name)) if args.desc: lines.append("| '%s'" % tags[tag_key].description) lines.append("|") lines.append('') return lines
[docs] def generate_orphans(self, orphans, args): """ Given a list of orphaned tags, return a list of lines representing the orphan output. """ lines = [] lines.append('Orphaned tags:') for orphan in orphans: lines.append("> %s:'%s'" % (str(orphan.tag_id), orphan.name)) if args.desc: lines.append(" '%s'" % orphan.description) lines.append('') return lines
[docs] def generate_empties(self, empties, args): """ Given a list of empty tagsets, return a list of lines representing the empty tagset output. """ lines = [] lines.append('Empty tagsets:') for empty in empties: lines.append("> %s:'%s'" % (str(empty.tag_id), empty.name)) if args.desc: lines.append(" '%s'" % empty.description) lines.append('') return lines
[docs] def create_tag(self, name, description, text="tag"): """ Creates a new tag object. Returns the new tag object. If name parameter is None, the user will be prompted to input it. The "text" parameter should be the text description to use upon user input. For example, if we were creating a tag, this would be "tag" (the default). If we were creating a tagset, this could be "tag set". """ if name is None: name = input("Please enter a name for this %s: " % text) if name is not None and name != '': tag = TagAnnotationI() tag.textValue = rstring(name) if description is not None and len(description) > 0: tag.description = rstring(description) return tag else: self.ctx.err("Tag/tagset name cannot be 'None' or empty.") sys.exit(1)
# Actual command methods
[docs] def create(self, args): """ create a tag command. """ tag = self.create_tag(args.name, args.desc) client = self.ctx.conn(args) session = client.getSession() update_service = session.getUpdateService() tag = update_service.saveAndReturnObject(tag) self.ctx.out("TagAnnotation:%s" % tag.id.val)
[docs] def createset(self, args): """ Create a tag set command. """ tags = [] if args.tag: if type(args.tag) is list: tags = args.tag else: tags = [args.tag] else: # Should not happen self.ctx.err("Missing tag parameter") sys.exit(1) tag = self.create_tag(args.name, args.desc, text="tag set") tag.ns = rstring(omero.constants.metadata.NSINSIGHTTAGSET) links = [] for t in tags: link = AnnotationAnnotationLinkI() link.parent = tag link.child = TagAnnotationI(rlong(int(t)), False) links.append(link) client = self.ctx.conn(args) session = client.getSession() update_service = session.getUpdateService() try: links = update_service.saveAndReturnArray(links) self.ctx.out("TagAnnotation:%s" % links[0].parent.id.val) except omero.ValidationException as e: self.ctx.err(e.message) self.ctx.err("Check that tag '%s' exists." % t) sys.exit(1)
[docs] def load(self, args): """ Import new tag(s) from json. """ if args.filename: fobj = open(args.filename) else: fobj = sys.stdin p = json.load(fobj) if fobj is not sys.stdin: fobj.close() to_add = [] for element in p: if 'set' in element: tag = self.create_tag(str(element['name']), str(element['desc'])) tag.ns = rstring(omero.constants.metadata.NSINSIGHTTAGSET) links = [] for e in element['set']: t = self.create_tag(str(e['name']), str(e['desc'])) link = AnnotationAnnotationLinkI() link.parent = tag link.child = t links.append(link) to_add.extend(links) else: to_add.append(self.create_tag(str(element['name']), str(element['desc']))) client = self.ctx.conn(args) session = client.getSession() update_service = session.getUpdateService() to_add = update_service.saveAndReturnArray(to_add) ids = [] for element in to_add: if isinstance(element, TagAnnotationI): self.ctx.out("TagAnnotation:%s" % element.id.val) ids.append(element.id.val) else: tag_id = element.parent.id.val if tag_id not in ids: self.ctx.out("TagAnnotation:%s" % tag_id) ids.append(tag_id)
[docs] def list(self, args): """ List tags command. """ if args.nopage: self.console_length = None self.width = 80 else: self.width, self.console_length = self.determine_console_size() tagsets = [None] lines = [] if args.tagset: if type(args.tagset) is list: tagsets = args.tagset else: tagsets = [args.tagset] for tagset in tagsets: tc = self.list_tags(args, tagset) lines.extend(self.generate_tagset(tc.tags, tc.mapping, args)) if len(tc.orphans) > 0: lines.extend(self.generate_orphans(tc.orphans, args)) if len(tc.empties) > 0: lines.append('') if len(tc.empties) > 0: lines.extend(self.generate_empties(tc.empties, args)) self.pagetext(lines)
[docs] def listsets(self, args): """ List tag sets command. """ # The max width of the ID field. We need something here unless # we want to pre-search to determine the max size. Our assumption # here is that we wont have more than 10,000,000 tags. # FIXME - We can figure this out easily enough- Sam max_id_width = 8 if args.nopage: self.console_length = None self.width = 80 else: self.width, self.console_length = self.determine_console_size() if args.desc: max_field_width = int(((self.width - max_id_width) / 2.0) - 2) else: max_field_width = self.width - max_id_width - 2 tags = [None] lines = [] if args.tag: if type(args.tag) is list: tags = args.tag else: tags = [args.tag] separator = ( "-" * max_id_width, "-" * max_field_width, "-" * max_field_width ) lines.append(separator) lines.append(( clip("ID", max_id_width), clip("Name", max_field_width), clip("Description", max_field_width) )) lines.append(separator) for tag_id in tags: tc = self.list_tagsets(args, tag_id) for key, tag in list(tc.tags.items()): if args.nopage: lines.append(( tag.tag_id, tag.name, tag.description )) else: lines.append(( clip(str(tag.tag_id), max_id_width), clip(tag.name, max_field_width), clip(tag.description, max_field_width) )) lines.append(separator) if args.desc: self.pagetext_format("{0}|{1}|{2}", lines) else: self.pagetext_format("{0}|{1}", lines)
try: register("tag", TagControl, HELP) except NameError: if __name__ == "__main__": cli = CLI() cli.register("user", TagControl, HELP) cli.invoke(sys.argv[1:])