#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# OMERO Utilities package
#
# Copyright 2009 Glencoe Software, Inc. All Rights Reserved.
# Use is subject to license terms supplied in LICENSE.txt
#
from appdirs import user_data_dir, user_cache_dir
import os
import sys
import Ice
import shlex
import omero
import IcePy
import IceGrid
import logging
import platform
import Glacier2
import threading
import logging.handlers
import omero.util.concurrency
import uuid
import omero.ObjectFactoryRegistrar as ofr
from omero.util.decorators import locked
from omero_version import omero_version
import omero_ext.path as path
LOGDIR = os.path.join("var", "log")
LOGFORMAT = "%(asctime)s %(levelname)-5.5s [%(name)40s] " \
"(%(threadName)-10s) %(message)s"
LOGLEVEL = logging.INFO
LOGSIZE = 500000000
LOGNUM = 9
LOGMODE = "a"
orig_stdout = sys.stdout
orig_stderr = sys.stderr
# Application name, Author (Windows only), Major version
APPDIR_DEFAULTS = ('OMERO.py', 'OME', omero_version.split('.')[0])
[docs]
def make_logname(self):
"""
Generates a logname from the given instance using the module
and name from its class
"""
log_name = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
return log_name
[docs]
class StreamRedirect(object):
"""
Since all server components should exclusively using the logging module
any output to stdout or stderr is caught and logged at "WARN". This is
useful, especially in the case of Windows, where stdout/stderr is eaten.
"""
def __init__(self, logger):
self.logger = logger
self.internal = logging.getLogger("StreamRedirect")
self.softspace = False
[docs]
def write(self, msg):
msg = msg.strip()
if msg:
self.logger.warn(msg)
def __getattr__(self, name):
self.internal.warn("No attribute: %s" % name)
[docs]
class Dependency(object):
"""
Centralized logic for declaring and logging a service
dependency on a non-shipped library. This is called
lazily from the run method of the application to give
logging time to be initialized.
See #4566
"""
def __init__(self, key):
self.key = key
[docs]
def get_version(self, target):
"""
Get version method which returns a string
representing. Should be overwritten by
subclasses for packages/modules with no
__version__ field.
"""
return target.__version__
[docs]
def check(self, logger):
try:
target = __import__(self.key)
version = self.get_version(target)
logger.info("Loaded dependency %s (%s)" % (self.key, version))
return True
except ImportError:
logger.error("Failed to load: '%s'" % self.key)
return False
[docs]
def internal_service_factory(communicator, user="root", group=None, retries=6,
interval=10, client_uuid=None, stop_event=None):
"""
Try to return a ServiceFactory from the grid.
Try a number of times then give up and raise the
last exception returned. This method will only
work internally to the grid, i.e. behind the Glacier2
firewall. It is intended for internal servers to
be able to create sessions for accessing the database. ::
communicator := Ice.Communicator used to find the registry
user := Username which should have a session created
group := Group into which the session should be logged
retries := Number of session creation retries before throwing
interval := Seconds between retries
client_uuid := Uuid of the client which should be used
"""
log = logging.getLogger("omero.utils")
if stop_event is None:
stop_event = omero.util.concurrency.get_event(
name="internal_service_factory")
tryCount = 0
excpt = None
query = communicator.stringToProxy("IceGrid/Query")
query = IceGrid.QueryPrx.checkedCast(query)
implicit_ctx = communicator.getImplicitContext()
implicit_ctx.put(omero.constants.AGENT, "Python service")
if client_uuid is not None:
implicit_ctx.put(omero.constants.CLIENTUUID, client_uuid)
else:
if not implicit_ctx.containsKey(omero.constants.CLIENTUUID):
client_uuid = str(uuid.uuid4())
implicit_ctx.put(omero.constants.CLIENTUUID, client_uuid)
while tryCount < retries:
if stop_event.isSet(): # Something is shutting down, exit.
return None
try:
blitz = query.findAllObjectsByType("::Glacier2::SessionManager")[0]
blitz = Glacier2.SessionManagerPrx.checkedCast(blitz)
sf = blitz.create(user, None)
# Group currently unused.
return omero.api.ServiceFactoryPrx.checkedCast(sf)
except Exception as e:
tryCount += 1
log.info("Failed to get session on attempt %s", str(tryCount))
excpt = e
stop_event.wait(interval)
log.warn("Reason: %s", str(excpt))
if excpt:
raise excpt
[docs]
def create_admin_session(communicator):
"""
"""
reg = communicator.stringToProxy("IceGrid/Registry")
reg = IceGrid.RegistryPrx.checkedCast(reg)
adm = reg.createAdminSession('null', '')
return adm
[docs]
def add_grid_object(communicator, obj):
"""
"""
communicator.identityToString(obj.ice_getIdentity())
adm = create_admin_session(communicator)
prx = adm.getAdmin()
try:
try:
prx.addObject(obj)
except IceGrid.ObjectExistsException:
prx.updateObject(obj)
finally:
adm.destroy()
[docs]
def long_to_path(id, root=""):
"""
Converts a long to a path such that for all directiories only
a 1000 files and a 1000 subdirectories will be returned.
This method duplicates the logic in
ome.io.nio.AbstractFileSystemService.java:getPath()
"""
suffix = ""
remaining = id
dirno = 0
if id is None or id == "":
raise Exception("Expecting a not-null id.")
id = int(id)
if id < 0:
raise Exception("Expecting a non-negative id.")
while (remaining > 999):
remaining /= 1000
if remaining > 0:
dirno = remaining % 1000
suffix = os.path.join("Dir-%03d" % dirno, suffix)
return os.path.join(root, "%s%s" % (suffix, id))
[docs]
def load_dotted_class(dotted_class):
"""
Load a Python class of the form "pkg.mod.Class"
via __import__ and return it. No ctor or similar
is called.
"""
try:
parts = dotted_class.split(".")
pkg = ".".join(parts[0:-2])
mod = str(parts[-2])
kls = parts[-1]
got = __import__(pkg, fromlist=[mod])
got = getattr(got, mod)
return getattr(got, kls)
except Exception as e:
raise Exception("""Failed to load: %s
previous excetion: %s""" % (dotted_class, e))
[docs]
class ServerContext(object):
"""
Context passed to all servants.
server_id, communicator, and stop_event will be
constructed by the top-level Server instance.
A context instance may also be configured to hold
on to an internal session (ServiceFactoryPrx) and
keep it alive.
This instance obeys the Resources API and calls
sf.keepAlive(None) on every check call, but does
nothing on cleanup. The sf instance must be manually
cleaned as the final operation of a servant.
(Note: cleanup of the server context indicates
server shutdown, so should be infrequent)
"""
def __init__(self, server_id, communicator, stop_event,
on_newsession=None):
self._lock = threading.RLock()
self.logger = logging.getLogger("omero.util.ServerContext")
self.server_id = server_id
self.communicator = communicator
self.stop_event = stop_event
self.servant_map = dict()
self.on_newsession = None
@locked
def add_servant(self, adapter_or_current, servant, ice_identity=None):
oa = adapter_or_current
if isinstance(adapter_or_current, (Ice.Current, IcePy.Current)):
oa = oa.adapter
if ice_identity is None:
prx = oa.addWithUUID(servant)
else:
prx = oa.add(servant, ice_identity)
servant.setProxy(prx)
self.servant_map[prx] = servant
return prx
[docs]
def newSession(self):
self.session = internal_service_factory(
self.communicator, stop_event=self.stop_event)
if callable(self.on_newsession):
self.on_newsession(self.session)
[docs]
def hasSession(self):
return hasattr(self, "session")
@locked
def getSession(self, recreate=True):
"""
Returns the ServiceFactoryPrx configured for the context if
available. If the context was not configured for sessions,
an ApiUsageException will be thrown: servants should know
whether or not they were configured for sessions.
See Servant(..., needs_session = True)
Otherwise, if there is no ServiceFactoryPrx, an attempt will
be made to create one if recreate == True. If the value is None
or non can be recreated, an InternalException will be thrown.
TODO : currently no arguments are provided for re-creating these,
but also not in Servant.__init__
"""
if not self.hasSession():
raise omero.ApiUsageException(
"Not configured for server connection")
if self.session:
try:
self.session.keepAlive(None)
except Ice.CommunicatorDestroyedException:
self.session = None # Ignore
except Exception as e:
self.logger.warn("Connection failure: %s" % e)
self.session = None
if self.session is None and recreate:
try:
self.newSession()
self.logger.info("Established connection: %s" % self.session)
except Exception as e:
self.logger.warn("Failed to establish connection: %s" % e)
if self.session is None:
raise omero.InternalException("No connection to server")
return self.session
[docs]
def check(self):
"""
Calls getSession() but always returns True. This keeps the context
available in the resources for later uses, and tries to re-establish
a connection in case Blitz goes down.
"""
try:
self.getSession()
except:
pass
return True
[docs]
def cleanup(self):
"""
Does nothing. Context clean up must happen manually
since later activities may want to reuse it. Servants using
a server connection should cleanup the instance *after* Resources
is cleaned up
"""
pass
[docs]
class Server(Ice.Application):
"""
Basic server implementation which can be used for
implementing a standalone python server which can
be started from icegridnode.
The servant implementation MUST have a constructor
which takes a single ServerContext argument AND
have a cleanup() method
Logging is configured relative to the current directory
to be in var/log by default.
Usage::
if __name__ == "__main__":
app=Server(
ServicesI, "ServicesAdapter", Ice.Identity("Services",""))
sys.exit(app.main(sys.argv))
app.impl now points to an instance of ServicesI
"""
def __init__(self, impl_class, adapter_name, identity, logdir=LOGDIR,
dependencies=()):
self.impl_class = impl_class
self.adapter_name = adapter_name
self.identity = identity
self.logdir = logdir
self.stop_event = omero.util.concurrency.get_event(name="Server")
self.dependencies = dependencies
[docs]
def waitOnStartup(self):
ms = 10000 # 10 seconds by default
try:
i = os.environ.get("OMERO_STARTUP_WAIT", "10000")
ms = int(i)
except:
self.logger.debug(exc_info=1)
try:
self.logger.info("Waiting %s ms on startup" % ms)
self.stop_event.wait(ms // 1000)
except:
self.logger.debug(exc_info=1)
[docs]
def run(self, args):
from omero.rtypes import ObjectFactories as rFactories
from omero.columns import ObjectFactories as cFactories
props = self.communicator().getProperties()
configure_server_logging(props)
self.logger = logging.getLogger("omero.util.Server")
self.logger.info("*" * 80)
self.waitOnStartup()
self.logger.info("Starting")
failures = 0
for x in self.dependencies:
if not x.check(self.logger):
failures += 1
if failures:
self.logger.error("Missing dependencies: %s" % failures)
sys.exit(50)
self.shutdownOnInterrupt()
try:
ofr.registerObjectFactory(self.communicator(), None) # No client
for of in list(rFactories.values()) + list(cFactories.values()):
of.register(self.communicator())
try:
serverid = self.communicator().getProperties().getProperty(
"Ice.ServerId")
ctx = ServerContext(
serverid, self.communicator(), self.stop_event)
self.impl = self.impl_class(ctx)
getattr(self.impl, "cleanup") # Required per docs
except:
self.logger.error("Failed initialization", exc_info=1)
sys.exit(100)
try:
self.adapter = self.communicator().createObjectAdapter(
self.adapter_name)
self.adapter.activate()
# calls setProxy
ctx.add_servant(self.adapter, self.impl, self.identity)
# ticket:1978 for non-collocated registries
prx = self.adapter.createDirectProxy(self.identity)
# This must happen _after_ activation
add_grid_object(self.communicator(), prx)
except:
self.logger.error("Failed activation", exc_info=1)
sys.exit(200)
self.logger.info("Entering main loop")
self.communicator().waitForShutdown()
finally:
self.stop_event.set() # Let's all waits shutdown
self.logger.info("Cleanup")
self.cleanup()
self.logger.info("Stopped")
self.logger.info("*" * 80)
[docs]
def cleanup(self):
"""
Cleans up all resources that were created by this server.
Primarily the one servant instance.
"""
if hasattr(self, "impl"):
try:
self.impl.cleanup()
finally:
del self.impl
[docs]
class SimpleServant(object):
"""
Base servant initialization. Doesn't create or try to cleanup
a top-level Resources thread. This is useful for large numbers
of servants. For servers and other singleton-like servants,
see "Servant"
"""
def __init__(self, ctx):
self._lock = threading.RLock()
self.prx = None # Proxy which points to self
self.ctx = ctx
self.stop_event = ctx.stop_event
self.communicator = ctx.communicator
self.logger = logging.getLogger(make_logname(self))
self.logger.debug("Created")
[docs]
def setProxy(self, prx):
"""
Should be overwritten for post-initialization activities.
The reason this method exists is that the implementation
must be complete before registering it with the adapter.
"""
self.prx = prx
[docs]
class Servant(SimpleServant):
"""
Abstract servant which can be used along with a slice2py
generated dispatch class as the base type of high-level servants.
These provide resource cleanup as per the omero.util.Server
class.
By passing "needs_session = True" to this constructor, an internal
session will be created and stored in ServerContext as well as
registered with self.resources
"""
def __init__(self, ctx, needs_session=False):
SimpleServant.__init__(self, ctx)
self.resources = omero.util.Resources(
sleeptime=60, stop_event=self.stop_event)
if needs_session:
self.ctx.newSession()
self.resources.add(self.ctx)
[docs]
def cleanup(self):
"""
Cleanups all resoures created by this servant. Calling
cleanup multiple times should be safe.
"""
resources = self.resources
self.resources = None
if resources is not None:
self.logger.info("Cleaning up")
resources.cleanup()
self.logger.info("Done")
if self.ctx.hasSession():
try:
sf = self.ctx.getSession(recreate=False)
self.logger.debug("Destroying %s" % sf)
sf.destroy()
except:
pass
def __del__(self):
self.cleanup()
[docs]
class Resources(object):
"""
Container class for storing resources which should be
cleaned up on close and periodically checked. Use
stop_event.set() to stop the internal thread.
"""
def __init__(self, sleeptime=60, stop_event=None):
"""
Add resources via add(object). They should have a no-arg cleanup()
and a check() method.
The check method will be called periodically (default: 60 seconds)
on each resource. The cleanup method will be called on
Resources.cleanup()
"""
self.stuff = []
self._lock = threading.RLock()
self.logger = logging.getLogger("omero.util.Resources")
self.stop_event = stop_event
if not self.stop_event:
self.stop_event = omero.util.concurrency.get_event(
name="Resources")
if sleeptime < 5:
raise Exception(
"Sleep time should be greater than 5: %s" % sleeptime)
self.sleeptime = sleeptime
class Task(threading.Thread):
"""
Internal thread used for checking "stuff"
"""
def run(self):
ctx = self.ctx # Outer class
ctx.logger.info("Starting")
while not ctx.stop_event.isSet():
try:
ctx.logger.debug("Executing")
copy = ctx.copyStuff()
remove = ctx.checkAll(copy)
ctx.removeAll(remove)
except:
ctx.logger.error(
"Exception during execution", exc_info=True)
ctx.logger.debug("Sleeping %s" % ctx.sleeptime)
# ticket:1531 - Attempting to catch threading issues
try:
ctx.stop_event.wait(ctx.sleeptime)
except ValueError:
pass
if isinstance(ctx.stop_event,
omero.util.concurrency.AtExitEvent):
if ctx.stop_event.atexit:
return # Skipping log. See #3260
ctx.logger.info("Halted")
self.thread = Task()
self.thread.ctx = self
self.thread.start()
@locked
def copyStuff(self):
"""
Within a lock, copy the "stuff" list and reverse it.
The list is reversed so that entries added
later, which may depend on earlier added entries
get a chance to be cleaned up first.
"""
copy = list(self.stuff)
copy.reverse()
return copy
# Not locked
[docs]
def checkAll(self, copy):
"""
While stop_event is unset, go through the copy
of stuff and call the check method on each
entry. Any that throws an exception or returns
a False value will be returned in the remove list.
"""
remove = []
for m in copy:
if self.stop_event.isSet():
return # Let cleanup handle this
self.logger.debug("Checking %s" % m[0])
method = getattr(m[0], m[2])
rv = None
try:
rv = method()
except:
self.logger.warn("Error from %s" % method, exc_info=True)
if not rv:
remove.append(m)
return remove
@locked
def removeAll(self, remove):
"""
Finally, within another lock, call the "cleanup"
method on all the entries in remove, and remove
them from the official stuff list. (If stop_event
is set during execution, we return with the assumption
that Resources.cleanup() will take care of them)
"""
for r in remove:
if self.stop_event.isSet():
return # Let cleanup handle this
self.logger.debug("Removing %s" % r[0])
self.safeClean(r)
self.stuff.remove(r)
@locked
def add(self, object, cleanupMethod="cleanup", checkMethod="check"):
entry = (object, cleanupMethod, checkMethod)
self.logger.debug("Adding object %s" % object)
self.stuff.append(entry)
@locked
def cleanup(self):
stuff = self.stuff
self.stuff = None
self.stop_event.set()
if stuff:
for m in stuff:
self.safeClean(m)
self.logger.debug("Cleanup done")
[docs]
def safeClean(self, m):
try:
self.logger.debug("Cleaning %s" % m[0])
method = getattr(m[0], m[1])
method()
except:
self.logger.error("Error cleaning resource: %s" % m[0], exc_info=1)
def __del__(self):
self.cleanup()
[docs]
class Environment(object):
"""
Simple class for creating an executable environment
"""
def __init__(self, *args):
"""
Takes an number of environment variable names which
should be copied to the target environment if present
in the current execution environment.
"""
if sys.platform == "win32":
# Prevents SocketException. See ticket:1518
self.env = os.environ.copy()
else:
self.env = {}
for arg in args:
if arg in os.environ:
self.env[arg] = os.environ[arg]
def __call__(self):
"""
Returns the environment map when called.
"""
return self.env
[docs]
def set(self, key, value):
"""
Manually sets a value in the target environment.
"""
self.env[key] = value
[docs]
def append(self, key, addition):
"""
Manually adds a value to the environment string
"""
if key in list(self.env.keys()):
self.env[key] = os.pathsep.join([self.env[key], addition])
else:
self.set(key, addition)
#
# Miscellaneious utilities
#
[docs]
def get_user(default=None):
"""
Returns the username. For most purposes, this value
will be the same as getpass.getuser on Unix and
win32api.GetUserName on Windows, but in some situations
(when running without a terminal, etc) getuser may throw
a KeyError. In which case, or if the username resolves to
False, the default value be returned.
Any unexpected exceptions will be thrown.
See ticket:6307
"""
rv = None
try:
import getpass
rv = getpass.getuser() # Uses environment variable or pwd
except KeyError: # 6307, probably system
pass
except ImportError: # No pwd on Windows
import win32api
rv = win32api.GetUserName()
if not rv:
return default
else:
return rv
[docs]
def get_omero_userdir():
"""
Returns the OMERO user directory
In 6.0.0 the default will change to use appdirs.user_data_dir.
You can enable this behaviour now by setting OMERO_USERDIR="" (empty
string) instead of unset.
Note that files in the old user directory will not be migrated.
"""
omero_userdir = os.environ.get('OMERO_USERDIR', None)
if omero_userdir:
return path.path(omero_userdir)
elif omero_userdir == "":
return path.path(user_data_dir(*APPDIR_DEFAULTS))
else:
return path.path(get_user_dir()) / "omero"
[docs]
def get_omero_user_cache_dir():
"""Returns the OMERO user cache directory"""
omero_userdir = os.environ.get('OMERO_USERDIR', None)
if omero_userdir:
return path.path(omero_userdir) / "cache"
else:
return path.path(user_cache_dir(*APPDIR_DEFAULTS))
# Other application directories may be added in future, see
# https://pypi.org/project/appdirs/
[docs]
def get_user_dir():
exceptions_to_handle = (ImportError)
try:
from pywintypes import com_error
from win32com.shell import shellcon, shell
exceptions_to_handle = (ImportError, com_error)
homeprop = shell.SHGetFolderPath(0, shellcon.CSIDL_APPDATA, 0, 0)
except exceptions_to_handle:
homeprop = os.path.expanduser("~")
if "~" == homeprop:
# ticket:5583
raise Exception("Unexpanded '~' from expanduser: see ticket:5583")
return homeprop
[docs]
def edit_path(path_or_obj, start_text):
f = path.path(path_or_obj)
editor = os.getenv("VISUAL") or os.getenv("EDITOR")
if not editor:
if platform.system() == "Windows":
editor = "Notepad.exe"
else:
editor = "vi"
if isinstance(start_text, str):
start_text = start_text.encode("utf-8")
f.write_text(start_text)
# If absolute, then use the path
# as is (ticket:4246). Otherwise,
# use which.py to find it.
editor_parts = shlex.split(editor)
editor = editor_parts[0]
editor_obj = path.path(editor)
if editor_obj.isabs():
editor_parts[0] = editor
else:
from omero_ext.which import which
editor_parts[0] = which(editor)
editor_parts.append(f)
pid = os.spawnl(os.P_WAIT, editor_parts[0], *tuple(editor_parts))
if pid:
re = RuntimeError(
"Couldn't spawn editor: %s" % editor_parts[0])
re.pid = pid
raise re
# From: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/157035
[docs]
def tail_lines(filename, linesback=10, returnlist=0):
"""Does what "tail -10 filename" would have done
Parameters:
filename file to read
linesback Number of lines to read from end of file
returnlist Return a list containing the lines instead of a string
"""
avgcharsperline = 75
file = open(filename, 'r')
while 1:
try:
file.seek(-1 * avgcharsperline * linesback, 2)
except IOError:
file.seek(0)
if file.tell() == 0:
atstart = 1
else:
atstart = 0
lines = file.read().split("\n")
if (len(lines) > (linesback + 1)) or atstart:
break
# The lines are bigger than we thought
avgcharsperline = avgcharsperline * 1.3 # Inc avg for retry
file.close()
if len(lines) > linesback:
start = len(lines) - linesback - 1
else:
start = 0
if returnlist:
return lines[start:len(lines) - 1]
out = ""
for l in lines[start:len(lines) - 1]:
out = out + l + "\n"
return out