Source code for install.perf_test

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
# Add: screen/plate
# Add: plotting
#

import re
import os
import sys
import time
import omero
import logging

import omero.cli
import omero.util
import omero.util.temp_files
import omero_ext.path as path
import uuid

# Grammar of a single configuration line:
#   group 1: command name (word characters)
#   group 3: optional comma-separated arguments inside parentheses
#   group 5: optional path after the first colon that follows the command
command_pattern = r"^\s*(\w+)(\((.*)\))?(:(.*))?$"
command_pattern_compiled = re.compile(command_pattern)
log = logging.getLogger("omero.perf")

# Help text describing the configuration file. Arguments are written as
# "key=value" because Item splits each argument on "="; a bare key (no
# "=") is stored with an empty value.
FILE_FORMAT = """
File format:
    <blank>                                     Ignored
    # comment                                   Ignored
    ServerTime(repeat=100)                      Retrieve the server time 100 \
times
    Import:<file>                               Import given file
    Import(Screen=<id>):<file>                  Import given file into screen
    Import(Dataset=<id>):<file>                 Import given file into \
project/dataset
    Import(Project=<id>,Dataset=<id>):<file>    Import given file into \
project/dataset
    Import(Dataset=some name):<file>            Import given file into a new \
dataset
    Import(Dataset):<file>                      Import given file into last \
created dataset (or create a new one)

    #
    # "Import" is the name of a command available in the current context
    # Use the "--list" command to print them all. All lines must be of the
    # form: %s

""" % command_pattern


#
# Main classes
#

class ItemException(Exception):
    """
    Base class for errors raised while parsing or executing a line item.
    """


class BadCommand(ItemException):
    """
    The command named on a line has no matching "_op_<command>" handler.
    """


class BadLine(ItemException):
    """
    A line does not match the expected command pattern.
    """


class BadPath(ItemException):
    """
    The file referenced by a line does not exist.
    """


class BadImport(ItemException):
    """
    The import invocation finished with a non-zero return value.
    """
class Item(object):
    """
    Single line-item in the configuration file

    A line has the form "Command(key=value,...):path". The parsed pieces
    are exposed as ``command``, ``arguments`` (raw text between the
    parentheses), ``path`` and ``props`` (dict of argument key -> value,
    with an empty string for keys given without "=").
    """

    def __init__(self, line):
        """
        Parse a single line.

        Blank lines and lines starting with "#" are kept as comment items:
        ``command``, ``arguments`` and ``path`` stay None and ``props``
        stays empty.

        Raises BadLine if a non-comment line does not match the pattern.
        """
        self.line = line.strip()
        # Defaults, so that comment items still expose every attribute
        self.command = None
        self.arguments = None
        self.path = None
        self.props = dict()
        if self.comment():
            return

        match = command_pattern_compiled.match(self.line)
        if not match:
            raise BadLine("Unexpected line: %s" % line)
        self.command = match.group(1)
        self.arguments = match.group(3)
        self.path = match.group(5)

        if self.arguments:
            for arg in self.arguments.split(","):
                # Only the first "=" separates key from value
                key, _, value = arg.partition("=")
                # Strip so that "A=1, B=2" yields the key "B", not " B"
                self.props[key.strip()] = value.strip()
        # NOTE(review): arguments written as "Dataset:5" are NOT split on
        # ":"; they end up as the single key "Dataset:5".

        log.debug("Found line: %s, %s, %s, %s", self.command,
                  self.arguments, self.path, self.props)

    def repeat(self):
        """
        Return the "repeat" argument as an int (1 if not given).
        """
        return int(self.props.get("repeat", "1"))

    def comment(self):
        """
        Return True if the line is blank or starts with "#".
        """
        return not self.line or self.line.startswith("#")

    def execute(self, ctx):
        """
        Run the "_op_<command>" method of this item with the given context.

        Comment items are skipped. Raises BadCommand if no such method
        exists.
        """
        if self.comment():
            return
        m = getattr(self, "_op_%s" % self.command, None)
        if m is None:
            raise BadCommand("Unknown command: %s" % self.command)
        m(ctx)

    def create_obj(self, ctx, name):
        """
        Resolve the argument called ``name`` (e.g. "Dataset") to an id.

        * argument absent: return None
        * argument is an integer: use it as the id
        * argument is empty: re-use the id stored in "<name>.id" under
          ctx.dir, or create an object named with a fresh UUID if that
          file cannot be read
        * anything else: create a new object with that name

        Newly created objects are saved via ctx.update_service() and their
        id is written to "<name>.id".
        """
        prop = self.props.get(name)
        # Do nothing if not in props
        if prop is None:
            return None

        id_path = ctx.dir / ("%s.id" % name)

        # If an integer, use that as an id
        try:
            obj_id = int(prop)
        except ValueError:
            obj_id = None
        else:
            log.debug("Using specified %s:%s", name, obj_id)
            return obj_id

        # Otherwise, create/re-use
        if prop == "":
            try:
                obj_id = int(id_path.lines()[0])
            except Exception as e:
                # Best effort: a missing or unreadable id file just means
                # there is nothing to re-use.
                log.debug("No %s.id: %s", name, e)
                prop = str(uuid.uuid4())

        # Now, if there's still no id, create one
        if obj_id is not None:
            log.debug("Using reload %s:%s", name, obj_id)
        else:
            kls = getattr(omero.model, "%sI" % name)
            obj = kls()
            obj.name = omero.rtypes.rstring(prop)
            obj = ctx.update_service().saveAndReturnObject(obj)
            obj_id = obj.id.val
            log.debug("Created obj %s:%s", name, obj_id)
            id_path.write_text(str(obj_id))
        return obj_id

    def create_link(self, ctx, name, parent, child):
        """
        Save a link object of model type "<name>I" between parent and child.

        NOTE(review): saving a link that already exists presumably fails
        on the server side; such a failure propagates to the caller --
        confirm whether it should be tolerated.
        """
        kls = getattr(omero.model, "%sI" % name)
        link = kls()
        link.setParent(parent)
        link.setChild(child)
        ctx.update_service().saveObject(link)
        log.debug("Created link %s", name)

    def _op_Import(self, ctx):
        """
        Import self.path through the CLI "import" command.

        Standard output and error of the import are directed to
        "import_<count>.out" / "import_<count>.err" under ctx.dir.

        Raises BadPath if the file does not exist and BadImport if the CLI
        return value is non-zero.
        """
        p = path.path(self.path)
        if not p.exists():
            raise BadPath("File does not exist: %s" % self.path)

        f = str(p.abspath())
        out = ctx.dir / ("import_%s.out" % ctx.count)
        err = ctx.dir / ("import_%s.err" % ctx.count)

        # NOTE(review): the triple-dash options are passed through as-is;
        # presumably they are the importer's own spelling -- confirm.
        args = ["import", "---file=%s" % str(out), "---errs=%s" % str(err),
                "-s", ctx.host(), "-k", ctx.key(), f]

        s_id = self.create_obj(ctx, "Screen")
        if s_id:
            args.extend(["-r", str(s_id)])

        p_id = self.create_obj(ctx, "Project")
        d_id = self.create_obj(ctx, "Dataset")
        if p_id and d_id:
            self.create_link(
                ctx, "ProjectDatasetLink",
                omero.model.ProjectI(p_id, False),
                omero.model.DatasetI(d_id, False))
        if d_id:
            args.extend(["-d", str(d_id)])

        ctx.cli.invoke(args)
        if ctx.cli.rv != 0:
            raise BadImport("Failed import: rv=%s" % ctx.cli.rv)
        num_pix = len(out.lines())
        log.debug("Import count: %s", num_pix)

    def _op_ServerTime(self, ctx):
        """
        Call getServerTime() on the config service.
        """
        ctx.config_service().getServerTime()

    def _op_LoadFormats(self, ctx):
        """
        Call findAll("Format", None) on the query service.
        """
        ctx.query_service().findAll("Format", None)
class Context(object):
    """
    Login context which can be used by any handler
    for connecting to a single session.
    """

    def __init__(self, id, reporter=None, client=None):
        """
        Set up the session, the CLI and the working directory.

        id -- passed to omero.client() when no client is given
        reporter -- optional reporter registered immediately
        client -- existing client to re-use; if None a new client is
                  created and a session is opened on it
        """
        self.reporters = []
        self.count = 0
        self.id = id
        if reporter is not None:
            # Previously this argument was accepted but silently dropped
            self.add_reporter(reporter)
        if client is None:
            self.client = omero.client(id)
            self.client.setAgent("OMERO.perf_test")
            self.client.createSession()
        else:
            self.client = client
        self.services = {}
        self.cli = omero.cli.CLI()
        self.cli.loadplugins()
        self.setup_dir()
        log.debug("Running performance tests in %s", self.dir)

    def add_reporter(self, reporter):
        """
        Register a reporter which will receive every report() call.
        """
        self.reporters.append(reporter)

    def setup_dir(self):
        """
        Create the "perfdir-<pid>" directory under the current directory
        and attach a rotating "perf.log" file handler to the root logger.

        Raises Exception if the directory already exists.
        """
        # logging.handlers is a submodule which "import logging" alone
        # does not load, so import it explicitly here.
        from logging.handlers import RotatingFileHandler

        self.dir = path.path(".") / ("perfdir-%s" % os.getpid())
        if self.dir.exists():
            raise Exception("%s exists!" % self.dir)
        self.dir.makedirs()

        # Adding a file logger
        handler = RotatingFileHandler(
            str(self.dir / "perf.log"), maxBytes=10000000, backupCount=5)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter(omero.util.LOGFORMAT)
        handler.setFormatter(formatter)
        logging.getLogger().addHandler(handler)
        # log.addHandler(handler)
        log.debug("Started: %s", time.ctime())

    def incr(self):
        """
        Increment the counter of handled items.
        """
        self.count += 1

    def host(self):
        """
        Return the "omero.host" property of the client.
        """
        return self.client.getProperty("omero.host")

    def key(self):
        """
        Return the name of the session factory's identity.
        """
        return self.client.sf.ice_getIdentity().name

    def report(self, command, start, stop, loops, rv):
        """
        Forward one measurement to every registered reporter.
        """
        for reporter in self.reporters:
            reporter.report(command, start, stop, loops, rv)

    def _stateless(self, name, prx):
        """
        Return the service registered under ``name``, looking it up via the
        session factory and casting it with ``prx`` on first use. Results
        are cached in self.services.
        """
        svc = self.services.get(name)
        if svc:
            return svc
        svc = prx.uncheckedCast(self.client.sf.getByName(name))
        self.services[name] = svc
        return svc

    def query_service(self):
        """
        Return the (cached) query service.
        """
        return self._stateless(omero.constants.QUERYSERVICE,
                               omero.api.IQueryPrx)

    def config_service(self):
        """
        Return the (cached) config service.
        """
        return self._stateless(omero.constants.CONFIGSERVICE,
                               omero.api.IConfigPrx)

    def update_service(self):
        """
        Return the (cached) update service.
        """
        return self._stateless(omero.constants.UPDATESERVICE,
                               omero.api.IUpdatePrx)
class PerfHandler(object):
    """
    Callable which parses one configuration line, executes it the
    requested number of times and reports the timing to the context.
    """

    def __init__(self, ctx=None):
        self.ctx = ctx

    def __call__(self, line):
        """
        Handle a single line.

        The raw line is appended to "line.log" in the context directory.
        Comment lines are otherwise ignored. An ItemException during
        execution is logged and terminates the process with exit status 1;
        any other exception is counted under values["errs"] and the loop
        continues.
        """
        (self.ctx.dir / "line.log").write_text(line, append=True)

        item = Item(line)
        if item.comment():
            return

        values = {}
        self.ctx.incr()
        loops = item.repeat()
        start = time.time()
        for _ in range(loops):
            try:
                item.execute(self.ctx)
            except ItemException:
                log.exception("Error")
                sys.exit(1)
            except Exception:
                log.debug("Error during execution: %s" % item.line.strip(),
                          exc_info=True)
                values["errs"] = values.get("errs", 0) + 1
        stop = time.time()

        # The average must be taken after the clock has been stopped and
        # with true division: floor division of float seconds rounds
        # sub-second averages down to 0.0.
        if loops > 1:
            values["avg"] = (stop - start) / loops

        self.ctx.report(item.command, start, stop, loops, values)
#
# Reporter hierarchy
#
class Reporter(object):
    """
    Abstract base class of all reporters
    """

    def report(self, command, start, stop, loops, rv):
        """
        Record one measurement; concrete reporters must override this.
        """
        message = "Not implemented"
        raise Exception(message)
class CsvReporter(Reporter):
    """
    Reporter which prints one comma-separated line per measurement, either
    to standard output or to "report.csv" in the given directory.
    """

    def __init__(self, dir=None):
        """
        dir -- directory (path object supporting "/") for "report.csv";
               if None, lines are printed to sys.stdout
        """
        if dir is None:
            self.file = None
            self.stream = sys.stdout
        else:
            self.file = str(dir / "report.csv")
            self.stream = open(self.file, "w")
        print("Command,Start,Stop,Elapsed,Average,Values", file=self.stream)

    def report(self, command, start, stop, loops, values):
        """
        Print command, start, stop, elapsed time, per-loop average and the
        values mapping as a single line, then flush the stream.
        """
        elapsed = stop - start
        # True division: "//" would floor sub-second averages to 0.0.
        # With zero loops there is nothing to average over.
        average = elapsed / loops if loops else 0.0
        print("%s,%s,%s,%s,%s,%s" % (
            command, start, stop, elapsed, average, values),
            file=self.stream)
        self.stream.flush()

    def close(self):
        """
        Close the report file. Standard output is left open.
        """
        if self.stream is not sys.stdout:
            self.stream.close()
class HdfReporter(Reporter):
    """
    Reporter which appends one row per measurement to the "report" table
    of "report.hdf" in the given directory (requires PyTables).
    """

    def __init__(self, dir):
        """
        dir -- directory (path object supporting "/") for "report.hdf"
        """
        import tables

        self.file = str(dir / "report.hdf")

        # Temporarily support old and new PyTables methods.
        # The file has to be opened before the table factory can be looked
        # up on it; reading self.hdf first always raised AttributeError.
        open_file = getattr(tables, "open_file", None)
        if open_file is None:
            open_file = tables.openFile
        self.hdf = open_file(self.file, "w")

        create_table = getattr(self.hdf, "create_table", None)
        if create_table is None:
            create_table = self.hdf.createTable

        self.tbl = create_table("/", "report", {
            "Command": tables.StringCol(pos=0, itemsize=64),
            "Start": tables.Float64Col(pos=1),
            "Stop": tables.Float64Col(pos=2),
            "Elapsed": tables.Float64Col(pos=3),
            "Average": tables.Float64Col(pos=4),
            "Values": tables.StringCol(pos=5, itemsize=1000)
        })
        self.row = self.tbl.row

    def report(self, command, start, stop, loops, values):
        """
        Append one row with the measurement and flush the file.
        """
        elapsed = stop - start
        self.row["Command"] = command
        self.row["Start"] = start
        self.row["Stop"] = stop
        self.row["Elapsed"] = elapsed
        # True division: "//" would floor sub-second averages to 0.0.
        self.row["Average"] = elapsed / loops if loops else 0.0
        # "Values" is a string column, so store the textual form of the
        # mapping (same representation as in the CSV report).
        self.row["Values"] = str(values)
        self.row.append()
        self.hdf.flush()
class PlotReporter(Reporter):
    """
    Placeholder reporter for plotting; it currently records nothing.
    """

    def __init__(self):
        # Plotting is not wired up yet:
        # import matplotlib.pyplot as plt
        # self.fig = plt.figure()
        # self.ax = fig.add_subplot(111)
        pass

    def report(self, command, start, stop, loops, values):
        """
        Accept a measurement and ignore it.
        """
        return None
        # ax.set_ylim(-2,25)
        # ax.set_xlim(0, (last-first))
        # plt.show()

########################################################
#
# Functions for the execution of this module
#
def handle(handler, files):
    """
    Primary method used by the command-line execution of this module.

    Every line of every iterable in ``files`` is passed to ``handler``;
    afterwards the handler context's item count is logged.
    """
    log.debug("Running perf on files: %s", files)
    for stream in files:
        for entry in stream:
            handler(entry)
    handled = handler.ctx.count
    log.debug("Handled %s lines", handled)