Source code for util.cleanse

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Reconcile and cleanse where necessary an OMERO data directory of orphaned data.
"""

#
#  Copyright (c) 2009-2016 University of Dundee. All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions
#  are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#
#  THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
#  ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
#  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
#  DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
#  OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
#  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
#  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
#  OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
#  SUCH DAMAGE.

import omero.clients
import omero
import sys
import os
import getpass
import Ice
import warnings

from Glacier2 import PermissionDeniedException
from getopt import getopt, GetoptError
from omero.util import get_user, long_to_path
from math import ceil
from stat import ST_SIZE


# Subdirectories of an OMERO data directory that are scanned for "dangling"
# files to reconcile with the database. Each key is a directory name under the
# data directory; each value is the OMERO model type whose IDs are encoded in
# the file names found there (it is the type queried during reconciliation).
# Iteration order (Pixels, Files, Thumbnails) is the order directories are
# processed in.
SEARCH_DIRECTORIES = dict(
    Pixels='Pixels',
    Files='OriginalFile',
    Thumbnails='Thumbnail',
)


def usage(error):
    """
    Print command-line usage, preceded by ``error``, and exit with status 2.

    :param error: message describing why usage is being shown; printed on
        the first line.
    :raises SystemExit: always, with exit code 2.
    """
    cmd = sys.argv[0]
    print("""%s
Usage: %s [--dry-run] [--verbose] [--subdirectory <dir>]
          [-u username | -k session_key] <omero.data.dir>
Cleanses files in the OMERO data directory that have no reference in the
OMERO database. NOTE: As this script is designed to be run via cron or in
a scheduled manner it produces minimal output unless a dry run is performed.

Options:
  -u              Administrator username to log in to OMERO with
  -k              Session key to log in to OMERO with
  --dry-run       Just prints out what would have been done
  --verbose       With --dry-run, also list files that are kept or ignored
  --subdirectory  Limit search to a single directory, e.g. Files

Examples:
  %s --dry-run -u root /OMERO

Report bugs at https://forum.image.sc/""" % (error, cmd, cmd))
    sys.exit(2)
class Cleanser(object):
    """
    Keeps file cleansing state and performs OMERO database reconciliation of
    files within an OMERO binary repository.

    Paths discovered by :meth:`cleanse` are queued and checked against the
    database in batches of ``QUERY_THRESHOLD``. A path whose file name encodes
    an object ID that the database does not know about is recorded in
    ``cleansed`` and, unless ``dry_run`` is set, unlinked (directories are
    recorded but never removed).
    """

    # Number of objects to defer before we actually make a query
    QUERY_THRESHOLD = 25

    # Strings identifying pyramid files
    PYRAMID_FILE = "_pyramid"
    PYRAMID_LOCK = ".pyr_lock"
    PYRAMID_TEMP = ".tmp"

    def __init__(self, query_service, object_type, data_dir):
        """
        :param query_service: OMERO query service used for the ID lookups.
        :param object_type: OMERO model type queried, e.g. "OriginalFile".
        :param data_dir: root of the OMERO data directory, e.g. "/OMERO".
        """
        self.query_service = query_service
        self.object_type = object_type
        # Paths found to be orphaned (removed, or reported in a dry run).
        self.cleansed = list()
        # Total size in bytes of all paths in ``cleansed``.
        self.bytes_cleansed = 0
        # Paths waiting for the next batched database check.
        self.deferred_paths = list()
        self.dry_run = False
        self.verbose = False
        self.data_dir = data_dir

    def is_object_id(self, path):
        """
        Return True if ``path`` is exactly the location under
        ``<data_dir>/Files`` where ``long_to_path`` places the object whose
        ID is the path's base name; False otherwise (including non-numeric
        names).
        """
        name = os.path.basename(path)
        try:
            ofid = int(name)
            expected_path = long_to_path(
                ofid, os.path.join(self.data_dir, 'Files'))
        except ValueError:
            return False
        return expected_path == path

    def cleanse(self, root):
        """
        Begins a cleansing operation from a given OMERO binary repository
        root directory, /OMERO/Files or /OMERO/Pixels for instance.

        Files are queued for reconciliation. Directories are recursed into,
        except directories under ``<data_dir>/Files`` whose path is itself an
        OriginalFile location (see :meth:`is_object_id`); those are queued.
        """
        files_root = os.path.join(self.data_dir, 'Files')
        for name in os.listdir(root):
            path = os.path.join(root, name)
            if os.path.isdir(path):
                # Check if it's an OriginalFile ID
                if path.startswith(files_root) and self.is_object_id(path):
                    self.query_or_defer(path)
                else:
                    # If it's not a candidate for deletion, recurse into it.
                    self.cleanse(path)
            else:
                self.query_or_defer(path)

    def query_or_defer(self, path):
        """
        Adds a given path to the list of deferred paths. If the number of
        deferred paths has reached the QUERY_THRESHOLD (to reduce database
        hits) a reconciliation check will happen against OMERO.
        """
        self.deferred_paths.append(path)
        if len(self.deferred_paths) >= self.QUERY_THRESHOLD:
            self.do_cleanse()

    def _parse_object_id(self, file_name):
        """
        Return the object ID encoded in a repository file name, or -1 when
        the name encodes none (such paths are kept and never removed).

        Recognised forms: "<id>", "<id>_pyramid", and pyramid lock/temp
        files that contain "_pyramid", start with "." and end with
        PYRAMID_LOCK or PYRAMID_TEMP, e.g. ".<id>_pyramid.pyr_lock".
        """
        try:
            return int(file_name)
        except ValueError:
            pass
        if self.PYRAMID_FILE not in file_name:
            return -1
        id_part = file_name.split("_")[0]
        try:
            if file_name.endswith(self.PYRAMID_FILE):
                return int(id_part)
            if file_name.endswith((self.PYRAMID_LOCK, self.PYRAMID_TEMP)):
                # Lock/temp files are hidden: strip the leading "."
                return int(id_part.lstrip('.'))
        except ValueError:
            pass
        return -1

    def _cleanse_path(self, path):
        """
        Record an orphaned path and its size, then remove it unless this is
        a dry run. Directories are only reported, never removed; errors from
        unlinking are printed rather than raised.
        """
        size = os.stat(path)[ST_SIZE]
        self.cleansed.append(path)
        # Accumulate: __str__ reports the total for all cleansed paths.
        self.bytes_cleansed += size
        if os.path.isdir(path):
            if self.dry_run:
                print(r" \_ %s (removedir)" % path)
            else:
                print("No action taken for directory %s" % path)
        elif self.dry_run:
            print(r" \_ %s (remove)" % path)
        else:
            try:
                os.unlink(path)
            except OSError as e:
                print(e)

    def do_cleanse(self):
        """
        Actually performs the reconciliation check against OMERO for every
        deferred path, removes relevant files, and clears the deferred list.
        """
        if not self.deferred_paths:
            return
        paths = self.deferred_paths
        object_ids = [
            omero.rtypes.rlong(self._parse_object_id(os.path.basename(p)))
            for p in paths]
        parameters = omero.sys.Parameters()
        parameters.map = {'ids': omero.rtypes.rlist(object_ids)}
        rows = self.query_service.projection(
            "select o.id from %s as o where o.id in (:ids)" % self.object_type,
            parameters, {"omero.group": "-1"})
        # Set for constant-time membership tests below.
        existing_ids = {cols[0].val for cols in rows}
        for path, object_id in zip(paths, object_ids):
            oid = object_id.val
            if oid in existing_ids:
                if self.dry_run and self.verbose:
                    print(r" \_ %s (keep)" % path)
            elif oid == -1:
                if self.dry_run and self.verbose:
                    print(r" \_ %s (ignored/keep)" % path)
            else:
                self._cleanse_path(path)
        self.deferred_paths = list()

    def finalize(self):
        """
        Takes the final set of deferred paths and performs a reconciliation
        check against OMERO for them. This method's purpose is basically to
        catch the final set of paths in the deferred path list and/or perform
        any cleanup.
        """
        self.do_cleanse()

    def __str__(self):
        return "Cleansing context: %d files (%d bytes)" % \
            (len(self.cleansed), self.bytes_cleansed)
def _parse_version(version):
    """
    Return the leading dotted numeric part of ``version`` as a tuple of ints.

    Any suffix after the numeric part is ignored, so "5.6.3-ice36" parses as
    (5, 6, 3) and "4.2.1" as (4, 2, 1).

    :raises ValueError: if ``version`` does not start with a number.
    """
    import re
    match = re.match(r"\d+(?:\.\d+)*", version.strip())
    if match is None:
        raise ValueError("Cannot parse server version: %r" % version)
    return tuple(int(part) for part in match.group(0).split("."))


def initial_check(config_service, admin_service=None):
    """
    Verify that the session is an administrator's and that the server is
    recent enough to be cleansed.

    :param config_service: OMERO config service used to read the server
        version. If None, the version check is skipped after a 10 second
        pause that gives the operator a chance to cancel.
    :param admin_service: OMERO admin service; required.
    :raises Exception: if ``admin_service`` is missing or the current user
        is not an administrator.
    :raises SystemExit: with code 3 if the server is older than 4.2.1.
    """
    if admin_service is None:
        raise Exception("No admin service provided!")

    ctx = admin_service.getEventContext()
    if not ctx.isAdmin:
        raise Exception('SecurityViolation: Admins only!')

    #
    # Compare server versions. See ticket #3123
    #
    if config_service is None:
        print("No config service provided! "
              "Waiting 10 seconds to allow cancellation")
        from threading import Event
        Event().wait(10)
        # No way to read the version without a config service: proceed
        # instead of dereferencing None below.
        return

    server_version = config_service.getVersion()
    if _parse_version(server_version) < (4, 2, 1):
        print("Server version is too old! (%s) Aborting..." % server_version)
        sys.exit(3)
def cleanse_dir(data_dir, directory, dry_run, verbose, query_service):
    """
    Reconcile one subdirectory of the OMERO data directory with the database.

    :param data_dir: root of the OMERO data directory.
    :param directory: subdirectory name; must be a key of SEARCH_DIRECTORIES
        when the directory exists.
    :param dry_run: only report what would be removed.
    :param verbose: with ``dry_run``, also report kept/ignored paths.
    :param query_service: OMERO query service handed to the Cleanser.
    :return: the finished Cleanser, or None if the directory does not exist.
    """
    target = os.path.join(data_dir, directory)
    if not os.path.exists(target):
        print("%s does not exist. Skipping..." % target)
        return None

    if dry_run:
        print("Reconciling OMERO data directory...\n %s" % target)

    cleanser = Cleanser(query_service, SEARCH_DIRECTORIES[directory], data_dir)
    cleanser.dry_run, cleanser.verbose = dry_run, verbose
    cleanser.cleanse(target)
    cleanser.finalize()
    return cleanser
def cleanse(data_dir, client, dry_run=False, subdirectory=None,
            verbose=False):
    """
    Cleanse an OMERO data directory of files with no database reference,
    and remove empty directories from the managed repository.

    :param data_dir: root of the OMERO data directory, e.g. "/OMERO".
    :param client: ``omero.client`` with an active session; initial_check
        requires it to belong to an administrator.
    :param dry_run: only report what would be done.
    :param subdirectory: restrict the run to one directory: a key of
        SEARCH_DIRECTORIES, or "ManagedRepository" to only remove empty
        managed-repository directories. None processes everything.
    :param verbose: with ``dry_run``, also report kept/ignored files.
    """
    client.getImplicitContext().put(omero.constants.GROUP, '-1')
    admin_service = client.sf.getAdminService()
    query_service = client.sf.getQueryService()
    config_service = client.sf.getConfigService()

    initial_check(config_service, admin_service)

    # None != "ManagedRepository", so this also covers the "do all" case.
    if subdirectory != "ManagedRepository":
        if subdirectory is not None:
            directories = [subdirectory]
        else:
            directories = list(SEARCH_DIRECTORIES)
        for directory in directories:
            cleanser = cleanse_dir(
                data_dir, directory, dry_run, verbose, query_service)
            # Report every directory's summary, not just the last one, and
            # skip directories that did not exist (cleanse_dir -> None).
            if dry_run and cleanser is not None:
                print(cleanser)

    if subdirectory is None or subdirectory == "ManagedRepository":
        # delete empty directories from the managed repositories
        proxy, description = client.getManagedRepository(description=True)
        if proxy:
            root = description.path.val + description.name.val
            print("Removing empty directories from...\n %s" % root)
            delete_empty_dirs(proxy, root, client, dry_run)
def delete_empty_dirs(repo, root, client, dry_run):
    """
    Delete (or, in a dry run, list) the empty directories of a managed
    repository.

    :param repo: managed repository proxy.
    :param root: repository root prefix prepended to paths when printing.
    :param client: ``omero.client`` used to wait on the delete commands.
    :param dry_run: only print the directories that would be removed.
    :raises Exception: if a batched delete command fails.
    """
    # empty subdirectories are appended to this list by is_empty_dir
    to_delete = []
    is_empty_dir(repo, '/', False, to_delete)

    if dry_run:
        for directory in to_delete:
            print(r" \_ %s%s (remove)" % (root, directory))
        return
    if not to_delete:
        return

    chunk = 20  # probably less than a screenful
    for start in range(0, len(to_delete), chunk):
        batch = to_delete[start:start + chunk]
        for directory in batch:
            print("Removing %s%s" % (root, directory))
        handle = repo.deletePaths(batch, True, False)
        try:
            client.waitOnCmd(handle, closehandle=True)
        except omero.CmdError as ce:
            if isinstance(ce.err, omero.cmd.GraphException):
                raise Exception("failed delete: " + ce.err.message)
            raise Exception("failed: " + ce.err.name)
def is_empty_dir(repo, directory, may_delete_dir, to_delete):
    """
    Recursively determine whether a managed-repository directory is empty,
    collecting empty subdirectories that should be deleted individually.

    :param repo: repository proxy; only ``listFiles(path)`` is used here.
    :param directory: repository path to inspect, ending in '/' (e.g. '/').
    :param may_delete_dir: whether ``directory`` itself may be deleted. When
        True and the directory turns out to be empty, its empty
        subdirectories are NOT added to ``to_delete``, because the caller
        records ``directory`` itself for deletion instead.
    :param to_delete: list that receives repository paths (each ending in
        '/') of empty subdirectories to delete.
    :return: True if every entry of ``directory`` is a (recursively) empty
        subdirectory; False if it holds any other entry, any non-empty
        subdirectory, or if listing it fails.
    """
    empty_subdirs = []
    is_empty = True
    try:
        entries = repo.listFiles(directory)
    except omero.ServerError as e:
        # Treat unlistable directories as non-empty so they are never deleted.
        print(f" ERROR: {e.message}")
        return False
    for entry in entries:
        subdirectory = directory + entry.name.val + '/'
        may_delete_subdir = entry.details.permissions.canDelete()
        # Only directory entries that are themselves empty keep this
        # directory empty; the recursive call threads ``empty_subdirs``
        # through so deeper deletable empty dirs are collected here.
        if entry.mimetype is not None and \
           entry.mimetype.val == 'Directory' and \
           is_empty_dir(repo, subdirectory, may_delete_subdir, empty_subdirs):
            if may_delete_subdir:
                # note empty subdirectories that can be deleted
                empty_subdirs.append(subdirectory)
            # NOTE(review): an empty subdirectory that cannot be deleted
            # still leaves ``is_empty`` True, so a deletable parent may be
            # queued for recursive deletion while containing it; confirm
            # how the server's deletePaths handles that case.
        else:
            is_empty = False
    if not (may_delete_dir and is_empty):
        # cannot delete this directory, so note empty subdirectories
        to_delete.extend(empty_subdirs)
    return is_empty
def fixpyramids(data_dir, query_service, dry_run=False,
                config_service=None, admin_service=None):
    """
    Remove zero-length pyramid files under ``<data_dir>/Pixels`` that are not
    currently being generated.

    A file whose name ends in "_pyramid" and whose length is 0 is removed
    unless the same directory holds a matching hidden lock or temp file,
    i.e. one named ".<pyramid name>..." ending in ".tmp" or ".pyr_lock".

    :param data_dir: root of the OMERO data directory.
    :param query_service: unused; kept for interface compatibility.
    :param dry_run: only print what would be removed.
    :param config_service: passed to initial_check.
    :param admin_service: passed to initial_check.
    """
    initial_check(config_service, admin_service)

    pixels_dir = os.path.join(data_dir, "Pixels")
    for root, _dirs, files in os.walk(pixels_dir):
        for f in files:
            # Cheap name test first, so only pyramid files are stat-ed.
            if not f.endswith("_pyramid"):
                continue
            pixels_file = os.path.join(root, f)
            if os.path.getsize(pixels_file) != 0:
                continue
            # Lock/temp files live next to the pyramid they guard, so search
            # this directory (``files``) rather than the top-level Pixels
            # dir; otherwise pyramids in subdirectories would be removed
            # while still being generated.
            prefix = "." + f
            locked = any(
                name.startswith(prefix) and
                name.endswith((".tmp", ".pyr_lock"))
                for name in files)
            if locked:
                continue
            if dry_run:
                print("Would remove %s" % f)
            else:
                print("Removing %s" % f)
                os.remove(pixels_file)
def removepyramids(client, little_endian=None, dry_run=False,
                   imported_after=None, wait=25, limit=500):
    """
    Remove pyramid files (and thumbnails) for images reported by the
    server's FindPyramids command.

    :param client: ``omero.client`` with an administrator session.
    :param little_endian: passed as ``FindPyramids.littleEndian``.
    :param dry_run: only print which images would be processed.
    :param imported_after: passed as ``FindPyramids.importedAfter``.
    :param wait: seconds to wait for each server command.
    :param limit: maximum number of pyramids to remove; values outside
        1..500 are replaced by 500.
    """
    client.getImplicitContext().put(omero.constants.GROUP, '-1')
    admin_service = client.sf.getAdminService()
    config_service = client.sf.getConfigService()
    initial_check(config_service, admin_service)

    value = int(limit)
    # If the default limit is changed, please update the help
    # in admin.py
    if value > 500 or value <= 0:
        value = 500
    print("No more than %s pyramids will be removed" % value)

    # look for any pyramid files with the specified endianness
    # the pyramid file will be removed
    request = omero.cmd.FindPyramids()
    request.littleEndian = little_endian
    request.importedAfter = imported_after
    request.limit = value

    ms = 500
    loops = ceil(wait * 1000.0 / ms)
    cb = None
    try:
        cb = client.submit(request, loops=loops, ms=ms, failonerror=True,
                           failontimeout=True)
        rsp = cb.getResponse()
    except omero.CmdError as ce:
        print("Failed to load pyramids: %s" % ce.err.name)
        return
    finally:
        if cb:
            cb.close(True)

    if not rsp.pyramidFiles:
        print("No pyramids to remove")
        return

    for image_id in rsp.pyramidFiles:
        if dry_run:
            print("Would remove pyramid for image %s" % image_id)
            continue
        req = omero.cmd.ManageImageBinaries()
        req.imageId = image_id
        req.deletePyramid = True
        req.deleteThumbnails = True
        # Reset so a failed submit does not re-close the callback of the
        # previous image (already closed) in the finally clause below.
        cb = None
        try:
            cb = client.submit(req, loops=loops, ms=ms, failonerror=True,
                               failontimeout=True)
            print("Pyramid removed for image %s" % image_id)
        except omero.CmdError as ce:
            print("Failed to remove for image %s: %s" % (
                image_id, ce.err.name))
        finally:
            if cb:
                try:
                    cb.close(True)
                except Ice.NotRegisteredException:
                    print("Error closing callback for %s" % image_id)
def main():
    """
    Default main() that performs OMERO data directory cleansing.

    Parses ``sys.argv`` (see usage()), logs in to the OMERO server on
    localhost with a username/password prompt or a session key, and runs
    cleanse(). A password-based session is closed afterwards; a session
    passed with -k is left open.
    """
    warnings.warn(
        "Calling omero.util.cleanse.main directly is deprecated. "
        "Use omero admin cleanse instead", DeprecationWarning)
    try:
        options, args = getopt(sys.argv[1:], "u:k:",
                               ["dry-run", "subdirectory=", "verbose"])
    except GetoptError as err:
        usage(err.msg)

    if len(args) != 1:
        usage('Expecting single OMERO data directory!')
    data_dir = args[0]

    username = get_user("root")
    session_key = None
    dry_run = False
    subdirectory = None
    verbose = False
    for option, argument in options:
        if option == "-u":
            username = argument
        elif option == "-k":
            session_key = argument
        elif option == "--dry-run":
            dry_run = True
        elif option == "--subdirectory":
            subdirectory = argument
        elif option == "--verbose":
            verbose = True

    password = None
    if session_key is None:
        print("Username: %s" % username)
        try:
            password = getpass.getpass()
        except KeyboardInterrupt:
            sys.exit(2)

    try:
        client = omero.client('localhost')
        client.setAgent("OMERO.cleanse")
        if session_key is None:
            client.createSession(username, password)
        else:
            client.createSession(session_key)
    except PermissionDeniedException:
        print("%s: Permission denied" % sys.argv[0])
        print("Sorry.")
        sys.exit(1)

    try:
        cleanse(data_dir, client, dry_run, subdirectory=subdirectory,
                verbose=verbose)
    finally:
        # Only close sessions we created; a -k session belongs to the caller.
        if session_key is None:
            client.closeSession()
# Run the (deprecated) command-line entry point when executed as a script.
if __name__ == "__main__":
    main()