Source code for callbacks

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
#
#   Copyright 2010 Glencoe Software, Inc. All rights reserved.
#   Use is subject to license terms supplied in LICENSE.txt
#
#

"""
  Callbacks to be used with asynchronous services. The
  ProcessCallbackI is also included in the omero.scripts
  module for backwards compatibility.
"""

import Ice
import logging
import threading
import uuid

import omero
import omero.all
import omero.util.concurrency


PROC_LOG = logging.getLogger("omero.scripts.ProcessCallback")
DEL_LOG = logging.getLogger("omero.api.DeleteCallback")
CMD_LOG = logging.getLogger("omero.cmd.CmdCallback")


[docs] def adapter_and_category(adapter_or_client, category): if isinstance(adapter_or_client, Ice.ObjectAdapter): # This should be the case either when an # instance is created server-side or when # the user has passed in a category # explicitly. If it's missing, then we'll # have to throw if not category: raise omero.ClientError("No category available") return adapter_or_client, category else: # This is the case client-side, where an # omero.client instance is available. # If a category is passed we use that # (though it's unlikely that that will be useful) if not category: category = adapter_or_client.getCategory() return adapter_or_client.getAdapter(), category
[docs] class ProcessCallbackI(omero.grid.ProcessCallback): """ Simple callback which registers itself with the given process. """ FINISHED = "FINISHED" CANCELLED = "CANCELLED" KILLED = "KILLED" def __init__(self, adapter_or_client, process, poll=True, category=None): self.event = omero.util.concurrency.get_event(name="ProcessCallbackI") self.result = None self.poll = poll self.process = process self.adapter, self.category = \ adapter_and_category(adapter_or_client, category) self.id = Ice.Identity(str(uuid.uuid4()), self.category) self.prx = self.adapter.add(self, self.id) # OK ADAPTER USAGE self.prx = omero.grid.ProcessCallbackPrx.uncheckedCast(self.prx) process.registerCallback(self.prx)
[docs] def block(self, ms): """ Should only be used if the default logic of the process methods is kept in place. If "event.set" does not get called, this method will always block for the given milliseconds. """ if self.poll: try: rc = self.process.poll() if rc is not None: self.processFinished(rc.getValue()) except Exception as e: PROC_LOG.warn("Error calling poll: %s" % e) self.event.wait(ms / 1000) if self.event.isSet(): return self.result return None
[docs] def processCancelled(self, success, current=None): self.result = ProcessCallbackI.CANCELLED self.event.set()
[docs] def processFinished(self, returncode, current=None): self.result = ProcessCallbackI.FINISHED self.event.set()
[docs] def processKilled(self, success, current=None): self.result = ProcessCallbackI.KILLED self.event.set()
[docs] def close(self): self.adapter.remove(self.id) # OK ADAPTER USAGE
[docs] class CmdCallbackI(omero.cmd.CmdCallback): """ Callback servant used to wait until a HandlePrx would return non-null on getReponse. The server will notify of completion to prevent constantly polling on getResponse. Subclasses can override methods for handling based on the completion status. Example usage:: cb = CmdCallbackI(client, handle) response = None while (response is None): response = cb.block(500) # or response = cb.loop(5, 500) """ def __init__(self, adapter_or_client, handle, category=None, foreground_poll=True): if adapter_or_client is None: raise omero.ClientError("Null client") if handle is None: raise omero.ClientError("Null handle") self.event = omero.util.concurrency.get_event(name="CmdCallbackI") self.state = (None, None) # (Response, Status) self.handle = handle self.adapter, self.category = \ adapter_and_category(adapter_or_client, category) self.id = Ice.Identity(str(uuid.uuid4()), self.category) self.prx = self.adapter.add(self, self.id) # OK ADAPTER USAGE self.prx = omero.cmd.CmdCallbackPrx.uncheckedCast(self.prx) handle.addCallback(self.prx) self.initialPoll(foreground_poll)
[docs] def initialPoll(self, foreground_poll=False): """ Called at the end of construction to check a race condition. If HandlePrx finishes its execution before the CmdCallbackPrx has been sent set via addCallback, then there's a chance that this implementation will never receive a call to finished, leading to perceived hangs. By default, this method starts a background thread and calls poll(). An Ice.ObjectNotExistException implies that another caller has already closed the HandlePrx. By passing, foreground_poll=True, the poll() invocation can be performed in the calling thread as in 5.1.0 and before. """ if foreground_poll: return self.poll() class T(threading.Thread): def run(this): try: self.poll() except: # don't throw any exceptions, e.g. if the # handle has already been closed. self.onFinished(None, None, None) T().start()
# # Local invocations #
[docs] def getResponse(self): """ Returns possibly null Response value. If null, then neither has the remote server nor the local poll method called finish with non-null values. """ return self.state[0]
[docs] def getStatus(self): """ Returns possibly null Status value. If null, then neither has the remote server nor the local poll method called finish with non-null values. """ return self.state[1]
[docs] def getStatusOrThrow(self): s = self.getStatus() if not s: raise omero.ClientError("Status not present!") return s
[docs] def isCancelled(self): """ Returns whether Status::CANCELLED is contained in the flags variable of the Status instance. If no Status is available, a ClientError will be thrown. """ s = self.getStatusOrThrow() try: s.flags.index(omero.cmd.State.CANCELLED) return True except: return False
[docs] def isFailure(self): """ Returns whether Status::FAILURE is contained in the flags variable of the Status instance. If no Status is available, a ClientError will be thrown. """ s = self.getStatusOrThrow() try: s.flags.index(omero.cmd.State.FAILURE) return True except: return False
[docs] def loop(self, loops, ms): """ Calls block(long) "loops" number of times with the "ms" argument. This means the total wait time for the delete to occur is: loops X ms. Sensible values might be 10 loops for 500 ms, or 5 seconds. @param loops Number of times to call block(long) @param ms Number of milliseconds to pass to block(long @throws omero.LockTimeout if block(long) does not return a non-null value after loops calls. """ count = 0 found = False while count < loops: count += 1 found = self.block(ms) if found: break if found: return self.getResponse() else: waited = (ms / 1000.0) * loops raise omero.LockTimeout( None, None, "Command unfinished after %s seconds" % waited, 5000, int(waited))
[docs] def block(self, ms): """ Blocks for the given number of milliseconds unless finished(Response, Status, Current) has been called in which case it returns immediately with true. If false is returned, then the timeout was reached. """ self.event.wait(ms / 1000) return self.event.isSet()
# # Remote invocations #
[docs] def poll(self): """ Calls HandlePrx#getResponse in order to check for a non-null value. If so, {@link Handle#getStatus} is also called, and the two non-null values are passed to finished(Response, Status, Current). This should typically not be used. Instead, favor the use of block and loop. """ rsp = self.handle.getResponse() if rsp is not None: s = self.handle.getStatus() # Only time that current should be null self.finished(rsp, s, None)
[docs] def step(self, complete, total, current=None): """ Called periodically by the server to signal that processing is moving forward. Default implementation does nothing. """ pass
[docs] def finished(self, rsp, status, current=None): """ Called when the command has completed whether with a cancellation or a completion. """ self.state = (rsp, status) self.event.set() self.onFinished(rsp, status, current)
[docs] def onFinished(self, rsp, status, current): """ Method intended to be overridden by subclasses. Default logic does nothing. """ pass
[docs] def close(self, closeHandle): """ First removes self from the adapter so as to no longer receive notifications, and the calls close on the remote handle if requested. """ self.adapter.remove(self.id) # OK ADAPTER USAGE if closeHandle: self.handle.close()