Source code for util.concurrency

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
# Copyright 2009 Glencoe Software, Inc.  All Rights Reserved.
# Use is subject to license terms supplied in LICENSE.txt
#

"""
OMERO Concurrency Utilities
"""
import atexit
import logging
import threading
import omero.util
import logging.handlers

from threading import Event as _Event
from threading import Timer as _Timer


[docs] def get_event(name="Unknown"): """ Returns a threading.Event instance which is registered to be "set" (Event.set()) on system exit. """ event = AtExitEvent(name=name) atexit.register(event.setAtExit) return event
[docs] class AtExitEvent(_Event): """ threading.Event extension which provides an additional method setAtExit() which sets "atexit" to true. This class was introduced in 4.2.1 to work around issue #3260 in which logging from background threads produced error messages. """ def __init__(self, verbose=None, name="Unknown"): try: super(AtExitEvent, self).__init__(verbose) except TypeError: # in Python 3 there is no verbose argument super(AtExitEvent, self).__init__() self.__name = name self.__atexit = False name = property(lambda self: self.__name) atexit = property(lambda self: self.__atexit)
[docs] def setAtExit(self): self.__atexit = True super(AtExitEvent, self).set()
def __repr__(self): return "%s (%s)" % (super(AtExitEvent, self).__repr__(), self.__name)
[docs] class Timer(_Timer): """Based on threading._Thread but allows for resetting the Timer. t = Timer(30.0, f, args=[], kwargs={}) t.start() t.cancel() # stop the timer's action if it's still waiting # or t.reset() After excecution, the status of the run can be checked via the "completed" and the "exception" Event instances. """ def __init__(self, interval, function, args=None, kwargs=None): if args is None: args = [] if kwargs is None: kwargs = {} _Timer.__init__(self, interval, function, args, kwargs) self.log = logging.getLogger(omero.util.make_logname(self)) self.completed = threading.Event() self.exception = threading.Event() self._reset = threading.Event()
[docs] def reset(self): self.log.debug("Reset called") self._reset.set() # Set first, so that the loop will continue self.finished.set() # Forces waiting thread to fall through
[docs] def run(self): while True: self.finished.wait(self.interval) if self._reset.isSet(): self.finished.clear() self._reset.clear() self.log.debug("Resetting") continue if not self.finished.isSet(): try: self.log.debug("Executing") self.function(*self.args, **self.kwargs) self.completed.set() self.finished.set() except: self.exception.set() self.finished.set() raise break