import time
import logging
import threading
from collections import deque

log = logging.getLogger(__name__)

class PurgeQueueBase(object):
    """
    Base class for implementing purge queue strategies.

    Tracks a queue of volumes whose trashed subvolumes need purging and,
    per volume, the list of (purge_dir, worker-thread) jobs in progress.
    """
    class PurgeThread(threading.Thread):
        """Worker thread driving a purge function, with cooperative cancel."""
        def __init__(self, name, purge_fn):
            self.purge_fn = purge_fn
            # event object to cancel ongoing purge
            self.cancel_event = threading.Event()
            threading.Thread.__init__(self, name=name)

        def run(self):
            self.purge_fn()

        def cancel_job(self):
            self.cancel_event.set()

        def should_cancel(self):
            # is_set(): snake_case form of the deprecated isSet() alias
            return self.cancel_event.is_set()

        def reset_cancel(self):
            self.cancel_event.clear()

    def __init__(self, volume_client):
        self.vc = volume_client
        # volumes whose subvolumes need to be purged
        self.q = deque()
        # job tracking: volname -> list of (purge_dir, thread) in progress
        self.jobs = {}
        # lock, cv for kickstarting purge
        self.lock = threading.Lock()
        self.cv = threading.Condition(self.lock)
        # lock, cv for purge cancellation
        self.waiting = False
        self.c_lock = threading.Lock()
        self.c_cv = threading.Condition(self.c_lock)

    def queue_purge_job(self, volname):
        """Queue a volume for background purging and wake worker threads."""
        with self.lock:
            if not self.q.count(volname):
                self.q.append(volname)
                self.jobs[volname] = []
            self.cv.notify_all()

    def cancel_purge_job(self, volname):
        """
        Cancel queued and in-progress purge jobs for a volume, blocking
        until in-progress jobs acknowledge the cancellation.
        """
        log.info("cancelling purge jobs for volume '{0}'".format(volname))
        self.lock.acquire()
        unlock = True
        try:
            if not self.q.count(volname):
                return
            self.q.remove(volname)
            if not self.jobs.get(volname, []):
                return
            # cancel in-progress purge operation and wait until complete
            for j in self.jobs[volname]:
                j[1].cancel_job()
            # wait for cancellation to complete
            with self.c_lock:
                unlock = False
                self.waiting = True
                self.lock.release()
                while self.waiting:
                    log.debug("waiting for {0} in-progress purge jobs for volume '{1}' to "
                              "cancel".format(len(self.jobs[volname]), volname))
                    self.c_cv.wait()
        finally:
            if unlock:
                self.lock.release()

    def register_job(self, volname, purge_dir):
        """Record that the calling thread is purging purge_dir of volname."""
        log.debug("registering purge job: {0}.{1}".format(volname, purge_dir))

        thread_id = threading.current_thread()
        self.jobs[volname].append((purge_dir, thread_id))

    def unregister_job(self, volname, purge_dir):
        """
        Drop the calling thread's job entry; wake cancellation waiters once
        the last cancelled job for the volume has finished.
        """
        log.debug("unregistering purge job: {0}.{1}".format(volname, purge_dir))

        thread_id = threading.current_thread()
        self.jobs[volname].remove((purge_dir, thread_id))

        cancelled = thread_id.should_cancel()
        thread_id.reset_cancel()

        # wake up cancellation waiters if needed
        # (fix: use the module logger, not the root logger via logging.info)
        if not self.jobs[volname] and cancelled:
            log.info("waking up cancellation waiters")
            self.jobs.pop(volname)
            with self.c_lock:
                self.waiting = False
                self.c_cv.notify_all()

    def get_trash_entry_for_volume(self, volname):
        """
        Fetch one trash entry for volname, excluding entries already being
        purged. Returns (0, entry-or-None), or (errno, None) on error.
        """
        log.debug("fetching trash entry for volume '{0}'".format(volname))

        exclude_entries = [v[0] for v in self.jobs[volname]]
        ret = self.vc.get_subvolume_trash_entry(
            None, vol_name=volname, exclude_entries=exclude_entries)
        if not ret[0] == 0:
            # fix: both format arguments belong to str.format(); the original
            # passed ret[0] to log.error(), raising IndexError on this path
            log.error("error fetching trash entry for volume '{0}': {1}".format(volname, ret[0]))
            return ret[0], None
        return 0, ret[1]

    def purge_trash_entry_for_volume(self, volname, purge_dir):
        """
        Purge one trash entry, cancellable via the worker thread's cancel
        event. Returns the operation's error code (0 on success).
        """
        log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname))

        thread_id = threading.current_thread()
        ret = self.vc.purge_subvolume_trash_entry(
            None, vol_name=volname, purge_dir=purge_dir,
            should_cancel=lambda: thread_id.should_cancel())
        return ret[0]

class ThreadPoolPurgeQueueMixin(PurgeQueueBase):
    """
    Purge queue mixin class maintaining a pool of threads for purging trash entries.
    Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
    entries (belonging to a set of volumes) have huge directory trees (such as, lots
    of small files in a directory w/ deep directory trees), this model may lead to
    _all_ threads purging entries for one volume (starving other volumes).
    """
    def __init__(self, volume_client, tp_size):
        super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client)
        self.threads = []
        for i in range(tp_size):
            self.threads.append(
                PurgeQueueBase.PurgeThread(name="purgejob.{}".format(i), purge_fn=self.run))
            self.threads[-1].start()

    def pick_purge_dir_from_volume(self):
        """
        Round-robin over queued volumes for the next trash entry to purge.
        Returns (volname, purge_dir) or (None, None). Caller holds self.lock.
        """
        log.debug("processing {0} purge job entries".format(len(self.q)))
        nr_vols = len(self.q)
        to_remove = []
        to_purge = None, None
        while nr_vols > 0:
            volname = self.q[0]
            # do this now so that the other thread picks up trash entry
            # for next volume.
            self.q.rotate(1)
            ret, purge_dir = self.get_trash_entry_for_volume(volname)
            if purge_dir:
                to_purge = volname, purge_dir
                break
            # this is an optimization when for a given volume there are no more
            # entries in trash and no purge operations are in progress. in such
            # a case we remove the volume from the tracking list so as to:
            #
            #   a. not query the filesystem for trash entries over and over again
            #   b. keep the filesystem connection idle so that it can be freed
            #      from the connection pool
            #
            # if at all there are subvolume deletes, the volume gets added again
            # to the tracking list and the purge operations kickstarts.
            # note that, we do not iterate the volume list fully if there is a
            # purge entry to process (that will take place eventually).
            if ret == 0 and not purge_dir and not self.jobs[volname]:
                to_remove.append(volname)
            nr_vols -= 1
        for vol in to_remove:
            log.debug("auto removing volume '{0}' from purge job".format(vol))
            self.q.remove(vol)
            self.jobs.pop(vol)
        return to_purge

    def get_next_trash_entry(self):
        """Block until a trash entry is available. Caller holds self.lock."""
        while True:
            # wait till there's a purge job
            while not self.q:
                log.debug("purge job list empty, waiting...")
                self.cv.wait()
            volname, purge_dir = self.pick_purge_dir_from_volume()
            if purge_dir:
                return volname, purge_dir
            log.debug("no purge jobs available, waiting...")
            self.cv.wait()

    def run(self):
        """Worker main loop: pick an entry, register, purge, unregister."""
        while True:
            with self.lock:
                volname, purge_dir = self.get_next_trash_entry()
                self.register_job(volname, purge_dir)
            ret = self.purge_trash_entry_for_volume(volname, purge_dir)
            if ret != 0:
                # log.warning: log.warn is a deprecated alias
                log.warning("failed to purge {0}.{1}".format(volname, purge_dir))
            with self.lock:
                self.unregister_job(volname, purge_dir)
            time.sleep(1)
""" + Return a directory entry in a given directory exclusing passed + in entries. + """ + try: + dir_handle = self.fs.opendir(dir_path) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + exclude.extend([".", ".."]) + + d = self.fs.readdir(dir_handle) + d_name = None + while d: + if not d.d_name.decode('utf-8') in exclude and d.is_dir(): + d_name = d.d_name.decode('utf-8') + break + d = self.fs.readdir(dir_handle) + self.fs.closedir(dir_handle) + return d_name + ### basic subvolume operations def create_subvolume(self, spec, size=None, namespace_isolated=True, mode=0o755, pool=None): @@ -99,8 +121,8 @@ class SubVolume(object): def remove_subvolume(self, spec, force): """ Make a subvolume inaccessible to guests. This function is idempotent. - This is the fast part of tearing down a subvolume: you must also later - call purge_subvolume, which is the slow part. + This is the fast part of tearing down a subvolume. The subvolume will + get purged in the background. :param spec: subvolume path specification :param force: flag to ignore non-existent path (never raise exception) @@ -114,7 +136,10 @@ class SubVolume(object): trashdir = spec.trash_dir self._mkdir_p(trashdir) - trashpath = spec.trash_path + # mangle the trash directroy entry to a random string so that subsequent + # subvolume create and delete with same name moves the subvolume directory + # to a unique trash dir (else, rename() could fail if the trash dir exist). + trashpath = spec.unique_trash_path try: self.fs.rename(subvolpath, trashpath) except cephfs.ObjectNotFound: @@ -124,10 +149,9 @@ class SubVolume(object): except cephfs.Error as e: raise VolumeException(-e.args[0], e.args[1]) - def purge_subvolume(self, spec): + def purge_subvolume(self, spec, should_cancel): """ - Finish clearing up a subvolume that was previously passed to delete_subvolume. This - function is idempotent. + Finish clearing up a subvolume from the trash directory. 
""" def rmtree(root_path): @@ -139,7 +163,7 @@ class SubVolume(object): except cephfs.Error as e: raise VolumeException(-e.args[0], e.args[1]) d = self.fs.readdir(dir_handle) - while d: + while d and not should_cancel(): d_name = d.d_name.decode('utf-8') if d_name not in [".", ".."]: # Do not use os.path.join because it is sensitive @@ -153,7 +177,10 @@ class SubVolume(object): d = self.fs.readdir(dir_handle) self.fs.closedir(dir_handle) - self.fs.rmdir(root_path) + # remove the directory only if we were not asked to cancel + # (else we would fail to remove this anyway) + if not should_cancel(): + self.fs.rmdir(root_path) trashpath = spec.trash_path # catch any unlink errors @@ -256,6 +283,16 @@ class SubVolume(object): snappath = spec.make_group_snap_path(self.rados.conf_get('client_snapdir'), snapname) return self._snapshot_delete(snappath, force) + def get_trash_entry(self, spec, exclude): + try: + trashdir = spec.trash_dir + return self._get_single_dir_entry(trashdir, exclude) + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + # trash dir does not exist yet, signal success + return None + raise + ### context manager routines def __enter__(self): diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py index c1ebe7e325149..37d82402acf21 100644 --- a/src/pybind/mgr/volumes/fs/volume.py +++ b/src/pybind/mgr/volumes/fs/volume.py @@ -16,6 +16,7 @@ import orchestrator from .subvolspec import SubvolumeSpec from .subvolume import SubVolume from .exception import VolumeException +from .purge_queue import ThreadPoolPurgeQueueMixin log = logging.getLogger(__name__) @@ -160,6 +161,8 @@ class VolumeClient(object): def __init__(self, mgr): self.mgr = mgr self.connection_pool = ConnectionPool(self.mgr) + # TODO: make thread pool size configurable + self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4) def gen_pool_names(self, volname): """ @@ -273,6 +276,7 @@ class VolumeClient(object): """ delete the given module (tear down mds, 
remove filesystem) """ + self.purge_queue.cancel_purge_job(volname) self.connection_pool.del_fs_handle(volname) # Tear down MDS daemons try: @@ -390,7 +394,7 @@ class VolumeClient(object): spec = SubvolumeSpec(subvolname, groupname) if self.group_exists(sv, spec): sv.remove_subvolume(spec, force) - sv.purge_subvolume(spec) + self.purge_queue.queue_purge_job(volname) elif not force: raise VolumeException( -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \ @@ -545,3 +549,33 @@ class VolumeClient(object): except VolumeException as ve: ret = self.volume_exception_to_retval(ve) return ret + + @connection_pool_wrap + def get_subvolume_trash_entry(self, fs_handle, **kwargs): + ret = None + volname = kwargs['vol_name'] + exclude = kwargs.get('exclude_entries', []) + + try: + with SubVolume(self.mgr, fs_handle) as sv: + spec = SubvolumeSpec("", "") + path = sv.get_trash_entry(spec, exclude) + ret = 0, path, "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + @connection_pool_wrap + def purge_subvolume_trash_entry(self, fs_handle, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + purge_dir = kwargs['purge_dir'] + should_cancel = kwargs.get('should_cancel', lambda: False) + + try: + with SubVolume(self.mgr, fs_handle) as sv: + spec = SubvolumeSpec(purge_dir, "") + sv.purge_subvolume(spec, should_cancel) + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret -- 2.39.5