# Source code for tables

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2009 Glencoe Software, Inc.  All Rights Reserved.
# Use is subject to license terms supplied in LICENSE.txt
#

"""
OMERO Grid Processor
"""
import Ice
import time
import traceback

import omero  # Do we need both??
import omero.clients
import omero.callbacks
from omero_ext.path import path

# For ease of use
from omero import LockTimeout
from omero.rtypes import rstring
from omero.rtypes import unwrap
from omero.util.decorators import remoted, perf


sys = __import__("sys")  # Python sys
tables = __import__("tables")  # Pytables

VERSION = '2'
RETRIES = 20


[docs] def slen(rv): """ Returns the length of the argument or None if the argument is None """ if rv is None: return None return len(rv)
class TableI(omero.grid.Table, omero.util.SimpleServant):
    """
    Spreadsheet implementation based on pytables.

    Each instance serves a single OriginalFile backed by an HDF file on
    disk (via the shared storage object obtained from
    ``storage_factory``) and exposes the omero.grid.Table remote API
    over Ice.
    """

    def __init__(self, ctx, file_obj, file_path, factory, storage_factory,
                 read_only=False, uuid="unknown", call_context=None,
                 adapter=None):
        # Ice identity under which this servant is registered; used by
        # close() to remove the servant from the adapter.
        self.id = Ice.Identity()
        self.id.name = uuid
        self.uuid = uuid
        self.file_obj = file_obj
        self.factory = factory
        # Shared, reference-counted storage; released in cleanup().
        self.storage = storage_factory.getOrCreate(file_path, self, read_only)
        self.call_context = call_context
        self.adapter = adapter
        # DB-level write permission, checked by assert_write() before
        # any FS-level modification. ticket:2910
        self.can_write = factory.getAdminService().canUpdate(
            file_obj, call_context)
        omero.util.SimpleServant.__init__(self, ctx)

        # Timestamp handed to every storage call — presumably used by the
        # storage layer to detect stale readers; TODO confirm semantics.
        self.stamp = time.time()
        self._closed = False

        # If the file object was passed unloaded or without details,
        # reload it across all groups ("omero.group": "-1").
        if (not self.file_obj.isLoaded() or
                self.file_obj.getDetails() is None or
                self.file_obj.details.group is None):
            self.file_obj = self.ctx.getSession().getQueryService().get(
                'omero.model.OriginalFileI', unwrap(file_obj.id),
                {"omero.group": "-1"})

    def assert_write(self):
        """
        Checks that the current user can write to the given object
        at the database level. If not, no FS level writes are permitted
        either.

        ticket:2910

        Raises:
            omero.SecurityViolation: if the user cannot update the file.
        """
        if not self.can_write:
            raise omero.SecurityViolation(
                "Current user cannot write to file %s" % self.file_obj.id.val)

    def check(self):
        """
        Called periodically to check the resource is alive. Returns
        False if this resource can be cleaned up. (Resources API)
        """
        self.logger.debug("Checking %s" % self)
        if self._closed:
            return False
        idname = 'UNKNOWN'
        try:
            # Quietly loading the session will mean that the last access
            # time will not be incremented so that dangling files can be
            # cleaned up. Note: this is different that the strategy of a
            # script which *wants* to keep its session alive.
            idname = self.factory.ice_getIdentity().name
            clientSession = self.ctx.getSession().getSessionService() \
                .getSession(idname, {"quietly": "true"})
            if clientSession.getClosed():
                self.logger.debug("Client session closed: %s" % idname)
                return False
            return True
        except Exception:
            self.logger.debug("Client session not found: %s" % idname)
            return False

    def cleanup(self):
        """
        Decrements the counter on the held storage to allow it to
        be cleaned up. Returns the current file-size.
        """
        if self.storage:
            try:
                self.storage.decr(self)
                return self.storage.size()
            finally:
                # Drop the reference even if decr()/size() raised.
                self.storage = None

    def __str__(self):
        return "Table-%s" % self.uuid

    @remoted
    @perf
    def close(self, current=None):
        """Close the table and unregister this servant from its adapter."""
        try:
            self.__close_file()
        finally:
            if self.adapter is not None:
                self.adapter.remove(self.id)
            self.adapter = None

    def __close_file(self):
        """
        Release the underlying storage and, when this writable file was
        modified, push the new size/hash back to the server via a
        RawFileStore. Idempotent: repeated calls only log a warning.
        """
        if self._closed:
            self.logger.warn(
                "File object %d already closed",
                unwrap(self.file_obj.id) if self.file_obj else None)
            return

        modified = self.storage.modified()

        size = None  # fix: stays None if cleanup() below fails
        try:
            size = self.cleanup()
            self.logger.info("Closed %s", self)
        except Exception:  # fix: was bare except; don't trap SystemExit etc.
            self.logger.warn("Closed %s with errors", self)

        self._closed = True

        # fix: guard against file_obj being None, mirroring the checks
        # above and below (previously an unconditional attribute access).
        fid = unwrap(self.file_obj.id) if self.file_obj is not None else None

        if self.file_obj is not None and self.can_write and modified:
            gid = unwrap(self.file_obj.details.group.id)
            # The factory identity category is "session-<client uuid>";
            # strip the prefix — TODO confirm the 8-char prefix assumption.
            client_uuid = self.factory.ice_getIdentity().category[8:]
            ctx = {
                "omero.group": str(gid),
                omero.constants.CLIENTUUID: client_uuid}
            try:
                # Size to reset the server object to (must be checked after
                # the underlying HDF file has been closed)
                rfs = self.factory.createRawFileStore(ctx)
                try:
                    rfs.setFileId(fid, ctx)
                    if size:
                        rfs.truncate(size, ctx)  # May do nothing
                        rfs.write([], size, 0, ctx)  # Force an update
                    else:
                        rfs.write([], 0, 0, ctx)  # No-op
                    file_obj = rfs.save(ctx)
                finally:
                    rfs.close(ctx)
                self.logger.info(
                    "Updated file object %s to hash=%s (%s bytes)",
                    fid, unwrap(file_obj.hash), unwrap(file_obj.size))
            except Exception:  # fix: was bare except (best-effort update)
                self.logger.warn("Failed to update file object %s",
                                 fid, exc_info=1)
        else:
            self.logger.info("File object %s not updated", fid)

    # TABLES READ API ============================

    @remoted
    @perf
    def getOriginalFile(self, current=None):
        """Return the (possibly reloaded) OriginalFile backing this table."""
        msg = "unknown"
        if self.file_obj:
            if self.file_obj.id:
                msg = self.file_obj.id.val
        self.logger.info("%s.getOriginalFile() => id=%s", self, msg)
        return self.file_obj

    @remoted
    @perf
    def getHeaders(self, current=None):
        """Return the column definitions (without data) for this table."""
        rv = self.storage.cols(None, current)
        self.logger.info("%s.getHeaders() => size=%s", self, slen(rv))
        return rv

    @remoted
    @perf
    def getNumberOfRows(self, current=None):
        """Return the current row count as a plain int."""
        rv = self.storage.rows()
        self.logger.info("%s.getNumberOfRows() => %s", self, rv)
        return int(rv)

    @remoted
    @perf
    def getWhereList(self, condition, variables, start, stop, step,
                     current=None):
        """
        Return the row numbers matching *condition* over the half-open
        range [start, stop) with the given step. A stop/step of 0 is
        treated as "unset" (None) for the storage layer.
        """
        variables = unwrap(variables)
        if stop == 0:
            stop = None
        if step == 0:
            step = None
        rv = self.storage.getWhereList(
            self.stamp, condition, variables, None, start, stop, step)
        self.logger.info("%s.getWhereList(%s, %s, %s, %s, %s) => size=%s",
                         self, condition, variables, start, stop, step,
                         slen(rv))
        return rv

    @remoted
    @perf
    def readCoordinates(self, rowNumbers, current=None):
        """
        Read the given rows (all columns).

        Raises:
            omero.ApiUsageException: if pytables reports an HDF5 error,
                most likely due to out-of-range row numbers.
        """
        self.logger.info("%s.readCoordinates(size=%s)", self,
                         slen(rowNumbers))
        try:
            return self.storage.readCoordinates(self.stamp, rowNumbers,
                                                current)
        except tables.HDF5ExtError as err:
            aue = omero.ApiUsageException()
            aue.message = "Error reading coordinates. Most likely out of range"
            aue.serverStackTrace = "".join(traceback.format_exc())
            aue.serverExceptionClass = str(err.__class__.__name__)
            raise aue

    @remoted
    @perf
    def read(self, colNumbers, start, stop, current=None):
        """
        Read the given columns over [start, stop). start == stop == 0
        means "read to the end" (stop becomes None).

        Raises:
            omero.ApiUsageException: on an underlying HDF5 error.
        """
        self.logger.info("%s.read(%s, %s, %s)", self, colNumbers,
                         start, stop)
        if start == 0 and stop == 0:
            stop = None
        try:
            return self.storage.read(self.stamp, colNumbers,
                                     start, stop, current)
        except tables.HDF5ExtError as err:
            aue = omero.ApiUsageException()
            aue.message = "Error reading coordinates. Most likely out of range"
            aue.serverStackTrace = "".join(traceback.format_exc())
            aue.serverExceptionClass = str(err.__class__.__name__)
            raise aue

    @remoted
    @perf
    def slice(self, colNumbers, rowNumbers, current=None):
        """Read the intersection of the given columns and rows."""
        self.logger.info(
            "%s.slice(size=%s, size=%s)", self,
            slen(colNumbers), slen(rowNumbers))
        return self.storage.slice(self.stamp, colNumbers, rowNumbers, current)

    # TABLES WRITE API ===========================

    @remoted
    @perf
    def initialize(self, cols, current=None):
        """Create the table structure from the given column definitions."""
        self.assert_write()
        self.storage.initialize(cols)
        if cols:
            self.logger.info("Initialized %s with %s col(s)",
                             self, slen(cols))

    @remoted
    @perf
    def addColumn(self, col, current=None):
        """Not yet implemented; always raises ApiUsageException."""
        self.assert_write()
        raise omero.ApiUsageException(None, None, "NYI")

    @remoted
    @perf
    def addData(self, cols, current=None):
        """Append the rows carried by *cols* to the table."""
        self.assert_write()
        self.storage.append(cols)
        if cols and cols[0] and cols[0].getsize():
            self.logger.info(
                "Added %s row(s) of data to %s", cols[0].getsize(), self)

    @remoted
    @perf
    def update(self, data, current=None):
        """Overwrite the rows listed in ``data.rowNumbers`` in place."""
        self.assert_write()
        if data:
            self.storage.update(self.stamp, data)
            self.logger.info(
                "Updated %s row(s) of data to %s",
                slen(data.rowNumbers), self)

    @remoted
    @perf
    def delete(self, current=None):
        """
        Close this table and delete its OriginalFile on the server,
        blocking on the server-side Delete2 command.

        Raises:
            omero.InternalException: on timeout or a server-side error.
        """
        self.assert_write()
        self.close()
        dc = omero.cmd.Delete2(
            targetObjects={"OriginalFile": [self.file_obj.id.val]}
        )
        handle = self.factory.submit(dc)
        # Copied from clients.py since none is available
        try:
            callback = omero.callbacks.CmdCallbackI(
                current.adapter, handle, "Fake")
        except:
            # Since the callback won't escape this method,
            # close the handle if requested.
            # (Bare except is deliberate here: the exception is always
            # re-raised after cleanup.)
            handle.close()
            raise
        try:
            try:
                callback.loop(20, 500)
            except LockTimeout:
                raise omero.InternalException(None, None, "delete timed-out")
            rsp = callback.getResponse()
            if isinstance(rsp, omero.cmd.ERR):
                raise omero.InternalException(None, None, str(rsp))
        finally:
            callback.close(True)

    # TABLES METADATA API ===========================

    @remoted
    @perf
    def getMetadata(self, key, current=None):
        """Return the metadata value stored under *key*, or None."""
        rv = self.storage.get_meta_map()
        rv = rv.get(key)
        self.logger.info("%s.getMetadata() => %s", self, unwrap(rv))
        return rv

    @remoted
    @perf
    def getAllMetadata(self, current=None):
        """Return the full metadata map for this table."""
        rv = self.storage.get_meta_map()
        # fix: log message previously said "getMetadata"
        self.logger.info("%s.getAllMetadata() => size=%s", self, slen(rv))
        return rv

    @remoted
    @perf
    def setMetadata(self, key, value, current=None):
        """Set a single metadata key/value pair."""
        self.assert_write()
        self.storage.add_meta_map({key: value})
        self.logger.info("%s.setMetadata() => %s=%s", self, key,
                         unwrap(value))

    @remoted
    @perf
    def setAllMetadata(self, value, current=None):
        """Replace the entire metadata map with *value*."""
        self.assert_write()
        self.storage.add_meta_map(value, replace=True)
        # fix: log message previously said "setMetadata"
        self.logger.info("%s.setAllMetadata() => number=%s", self,
                         slen(value))
# NOTE: column-manipulation methods are not implemented (addColumn raises NYI)
class TablesI(omero.grid.Tables, omero.util.Servant):
    """
    Implementation of the omero.grid.Tables API. Provides
    spreadsheet like functionality across the OMERO.grid.
    This servant serves as a session-less, user-less
    resource for obtaining omero.grid.Table proxies.

    The first major step in initialization is getting
    a session. This will block until the Blitz server
    is reachable.
    """

    def __init__(
            self, ctx,
            table_cast=omero.grid.TablePrx.uncheckedCast,
            internal_repo_cast=omero.grid.InternalRepositoryPrx.checkedCast,
            storage_factory=None,
            retries=None):

        omero.util.Servant.__init__(self, ctx, needs_session=True)

        # Storing these methods, mainly to allow overriding via
        # test methods. Static methods are evil.
        self._table_cast = table_cast
        self._internal_repo_cast = internal_repo_cast

        self.__stores = []

        if storage_factory is None:
            from omero.hdfstorageV2 import HDFLIST
            self._storage_factory = HDFLIST
        else:
            self._storage_factory = storage_factory
        self.logger.info("Using storage factory: %s.%s",
                         str(self._storage_factory.__module__),
                         self._storage_factory.__class__.__name__)

        self.repo_cfg = None
        self.repo_mgr = None
        self.repo_obj = None
        self.repo_svc = None
        self.repo_uuid = None

        # Run read-only if either the DB or the repo is flagged
        # read-only; any failure to read the config means read-write.
        try:
            config_service = ctx.getSession().getConfigService()
            prefix = "omero.cluster.read_only.runtime"
            self.read_only = "true" in [config_service.getConfigValue(
                "{}.{}".format(prefix, suffix))
                for suffix in ["db", "repo"]]
        except Exception:  # fix: was bare except; best-effort kept
            self.read_only = False
        if self.read_only:
            self.logger.info("Starting in read-only mode.")

        if retries is None:
            retries = RETRIES
        # Total wait time in seconds, spread evenly over the retries.
        wait = float(self.communicator.getProperties().getPropertyWithDefault(
            "omero.repo.wait", "1"))
        per_loop = wait / retries

        exc = None
        for x in range(retries):
            try:
                self._get_dir()
                self._get_uuid()
                self._get_repo()
            except Exception as e:
                self.logger.warn("Failed to find repo_svc: %s" % e)
                exc = e
            if self.repo_svc:
                # fix: a later attempt succeeded, so an exception captured
                # on an earlier attempt must not be re-raised below.
                exc = None
                break
            else:
                msg = "waiting %ss (%s of %s)" % (per_loop, x+1, retries)
                self.logger.debug(msg)
                self.stop_event.wait(per_loop)

        if exc:
            raise exc

    def _get_dir(self):
        """
        Second step in initialization is to find the .omero/repository
        directory. If this is not created, then a required server has
        not started, and so this instance will not start.
        """
        self.repo_dir = self.communicator.getProperties().getProperty(
            "omero.repo.dir")

        if not self.repo_dir:
            # Implies this is the legacy directory. Obtain from server
            self.repo_dir = self.ctx.getSession(
                ).getConfigService().getConfigValue("omero.data.dir")

        self.repo_cfg = path(self.repo_dir) / ".omero" / "repository"
        if not self.repo_cfg.exists():
            msg = "No repository found: %s" % self.repo_cfg
            raise omero.ResourceError(None, None, msg)

    def _get_uuid(self):
        """
        Third step in initialization is to find the database uuid
        for this grid instance. Multiple OMERO.grids could be watching
        the same directory.
        """
        cfg = self.ctx.getSession().getConfigService()
        self.db_uuid = cfg.getDatabaseUuid()
        self.instance = self.repo_cfg / self.db_uuid

    def _get_repo(self):
        """
        Fourth step in initialization is to find the repository object
        for the UUID found in .omero/repository/<db_uuid>, and then
        create a proxy for the InternalRepository attached to that.
        """
        uuidfile = self.instance / "repo_uuid"
        if not uuidfile.exists():
            msg = "%s doesn't exist" % uuidfile
            raise IOError(msg)

        # Get and parse the uuid from the RandomAccessFile format from
        # FileMaker
        self.repo_uuid = uuidfile.lines()[0].strip()
        if len(self.repo_uuid) != 38:
            raise omero.ResourceError(
                "Poorly formed UUID: %s" % self.repo_uuid)
        # Strip the 2-character RandomAccessFile prefix.
        self.repo_uuid = self.repo_uuid[2:]

        # Using the repo_uuid, find our OriginalFile object
        self.repo_obj = self.ctx.getSession().getQueryService().findByQuery(
            "select f from OriginalFile f where hash = :uuid",
            omero.sys.ParametersI().add("uuid", rstring(self.repo_uuid)))
        self.repo_mgr = self.communicator.stringToProxy(
            "InternalRepository-%s" % self.repo_uuid)
        self.repo_mgr = self._internal_repo_cast(self.repo_mgr)
        self.repo_svc = self.repo_mgr.getProxy()

    @remoted
    def getRepository(self, current=None):
        """
        Returns the Repository object for this Tables server.
        """
        return self.repo_svc

    @remoted
    @perf
    def getTable(self, file_obj, factory, current=None):
        """
        Create and/or register a table servant.
        """
        # Will throw an exception if not allowed.
        file_id = None
        if file_obj is not None and file_obj.id is not None:
            file_id = file_obj.id.val
        self.logger.info("getTable: %s %s", file_id, current.ctx)
        file_path = self.repo_mgr.getFilePath(file_obj)
        p = path(file_path).dirname()
        if not p.exists():
            p.makedirs()
        table = TableI(self.ctx, file_obj, file_path, factory,
                       self._storage_factory, read_only=self.read_only,
                       uuid=Ice.generateUUID(),
                       call_context=current.ctx,
                       adapter=current.adapter)
        self.resources.add(table)
        prx = current.adapter.add(table, table.id)
        return self._table_cast(prx)