mgr/volumes: purge queue for async subvolume delete
author    Venky Shankar <vshankar@redhat.com>
          Mon, 17 Jun 2019 12:21:43 +0000 (08:21 -0400)
committer Venky Shankar <vshankar@redhat.com>
          Mon, 8 Jul 2019 03:58:16 +0000 (23:58 -0400)
Support asynchronous subvolume deletes by handing off the delete
operation to a dedicated set of threads. A subvolume delete operation
renames the subvolume (subdirectory) to a unique trash path entry
and signals the set of worker threads to pick up entries from the
trash directory for background removal.

This commit implements a `thread pool` strategy as a class mixin.
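A condensed view of the hand-off, pieced together from the diff below (the
pool size of 4 comes from the new VolumeClient.__init__):

    # condensed excerpt from the patch: delete becomes a cheap rename plus a queued job
    self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)   # VolumeClient.__init__
    ...
    sv.remove_subvolume(spec, force)             # fast: rename into _deleting/<uuid>
    self.purge_queue.queue_purge_job(volname)    # slow: worker threads rmtree() it later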

Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/pybind/mgr/volumes/fs/purge_queue.py [new file with mode: 0644]
src/pybind/mgr/volumes/fs/subvolspec.py
src/pybind/mgr/volumes/fs/subvolume.py
src/pybind/mgr/volumes/fs/volume.py

diff --git a/src/pybind/mgr/volumes/fs/purge_queue.py b/src/pybind/mgr/volumes/fs/purge_queue.py
new file mode 100644
index 0000000..919d87c
--- /dev/null
@@ -0,0 +1,193 @@
+import time
+import logging
+import threading
+from collections import deque
+
+log = logging.getLogger(__name__)
+
+class PurgeQueueBase(object):
+    """
+    Base class for implementing purge queue strategies.
+    """
+    class PurgeThread(threading.Thread):
+        def __init__(self, name, purge_fn):
+            self.purge_fn = purge_fn
+            # event object to cancel ongoing purge
+            self.cancel_event = threading.Event()
+            threading.Thread.__init__(self, name=name)
+
+        def run(self):
+            self.purge_fn()
+
+        def cancel_job(self):
+            self.cancel_event.set()
+
+        def should_cancel(self):
+            return self.cancel_event.isSet()
+
+        def reset_cancel(self):
+            self.cancel_event.clear()
+
+    def __init__(self, volume_client):
+        self.vc = volume_client
+        # volumes whose subvolumes need to be purged
+        self.q = deque()
+        # job tracking
+        self.jobs = {}
+        # lock, cv for kickstarting purge
+        self.lock = threading.Lock()
+        self.cv = threading.Condition(self.lock)
+        # lock, cv for purge cancellation
+        self.waiting = False
+        self.c_lock = threading.Lock()
+        self.c_cv = threading.Condition(self.c_lock)
+
+    def queue_purge_job(self, volname):
+        with self.lock:
+            if not self.q.count(volname):
+                self.q.append(volname)
+                self.jobs[volname] = []
+            self.cv.notifyAll()
+
+    def cancel_purge_job(self, volname):
+        log.info("cancelling purge jobs for volume '{0}'".format(volname))
+        self.lock.acquire()
+        unlock = True
+        try:
+            if not self.q.count(volname):
+                return
+            self.q.remove(volname)
+            if not self.jobs.get(volname, []):
+                return
+            # cancel in-progress purge operation and wait until complete
+            for j in self.jobs[volname]:
+                j[1].cancel_job()
+            # wait for cancellation to complete
+            with self.c_lock:
+                unlock = False
+                self.waiting = True
+                self.lock.release()
+                while self.waiting:
+                    log.debug("waiting for {0} in-progress purge jobs for volume '{1}' to " \
+                             "cancel".format(len(self.jobs[volname]), volname))
+                    self.c_cv.wait()
+        finally:
+            if unlock:
+                self.lock.release()
+
+    def register_job(self, volname, purge_dir):
+        log.debug("registering purge job: {0}.{1}".format(volname, purge_dir))
+
+        thread_id = threading.currentThread()
+        self.jobs[volname].append((purge_dir, thread_id))
+
+    def unregister_job(self, volname, purge_dir):
+        log.debug("unregistering purge job: {0}.{1}".format(volname, purge_dir))
+
+        thread_id = threading.currentThread()
+        self.jobs[volname].remove((purge_dir, thread_id))
+
+        cancelled = thread_id.should_cancel()
+        thread_id.reset_cancel()
+
+        # wake up cancellation waiters if needed
+        if not self.jobs[volname] and cancelled:
+            log.info("waking up cancellation waiters")
+            self.jobs.pop(volname)
+            with self.c_lock:
+                self.waiting = False
+                self.c_cv.notifyAll()
+
+    def get_trash_entry_for_volume(self, volname):
+        log.debug("fetching trash entry for volume '{0}'".format(volname))
+
+        exclude_entries = [v[0] for v in self.jobs[volname]]
+        ret = self.vc.get_subvolume_trash_entry(
+            None, vol_name=volname, exclude_entries=exclude_entries)
+        if ret[0] != 0:
+            log.error("error fetching trash entry for volume '{0}': {1}".format(volname, ret[0]))
+            return ret[0], None
+        return 0, ret[1]
+
+    def purge_trash_entry_for_volume(self, volname, purge_dir):
+        log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname))
+
+        thread_id = threading.currentThread()
+        ret = self.vc.purge_subvolume_trash_entry(
+            None, vol_name=volname, purge_dir=purge_dir, should_cancel=lambda: thread_id.should_cancel())
+        return ret[0]
+
+class ThreadPoolPurgeQueueMixin(PurgeQueueBase):
+    """
+    Purge queue mixin class maintaining a pool of threads for purging trash entries.
+    Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
+    entries (belonging to a set of volumes) have huge directory trees (e.g., lots
+    of small files in deep directory trees), this model may lead to _all_ threads
+    purging entries for one volume, starving other volumes.
+    """
+    def __init__(self, volume_client, tp_size):
+        super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client)
+        self.threads = []
+        for i in range(tp_size):
+            self.threads.append(
+                PurgeQueueBase.PurgeThread(name="purgejob.{}".format(i), purge_fn=self.run))
+            self.threads[-1].start()
+
+    def pick_purge_dir_from_volume(self):
+        log.debug("processing {0} purge job entries".format(len(self.q)))
+        nr_vols = len(self.q)
+        to_remove = []
+        to_purge = None, None
+        while nr_vols > 0:
+            volname = self.q[0]
+            # rotate now so that other threads pick up the trash entry
+            # for the next volume.
+            self.q.rotate(1)
+            ret, purge_dir = self.get_trash_entry_for_volume(volname)
+            if purge_dir:
+                to_purge = volname, purge_dir
+                break
+            # this is an optimization for the case when a given volume has no more
+            # entries in trash and no purge operations in progress. in such
+            # a case we remove the volume from the tracking list so as to:
+            #
+            # a. not query the filesystem for trash entries over and over again
+            # b. keep the filesystem connection idle so that it can be freed
+            #    from the connection pool
+            #
+            # if there are subsequent subvolume deletes, the volume gets added back
+            # to the tracking list and purge operations kick in again.
+            # note that we do not iterate the volume list fully if there is a
+            # purge entry to process (that will take place eventually).
+            if ret == 0 and not purge_dir and not self.jobs[volname]:
+                to_remove.append(volname)
+            nr_vols -= 1
+        for vol in to_remove:
+            log.debug("auto removing volume '{0}' from purge job".format(vol))
+            self.q.remove(vol)
+            self.jobs.pop(vol)
+        return to_purge
+
+    def get_next_trash_entry(self):
+        while True:
+            # wait till there's a purge job
+            while not self.q:
+                log.debug("purge job list empty, waiting...")
+                self.cv.wait()
+            volname, purge_dir = self.pick_purge_dir_from_volume()
+            if purge_dir:
+                return volname, purge_dir
+            log.debug("no purge jobs available, waiting...")
+            self.cv.wait()
+
+    def run(self):
+        while True:
+            with self.lock:
+                volname, purge_dir = self.get_next_trash_entry()
+                self.register_job(volname, purge_dir)
+            ret = self.purge_trash_entry_for_volume(volname, purge_dir)
+            if ret != 0:
+                log.warn("failed to purge {0}.{1}".format(volname, purge_dir))
+            with self.lock:
+                self.unregister_job(volname, purge_dir)
+            time.sleep(1)
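
If purge_queue.py is loaded on its own (it only depends on the standard
library), the queue can be exercised with a stubbed volume client. The stub
below is purely illustrative and not part of this commit; the import path
assumes the module is importable standalone:

    from purge_queue import ThreadPoolPurgeQueueMixin

    class StubVolumeClient(object):
        """Illustrative stand-in for VolumeClient; not part of the patch."""
        def get_subvolume_trash_entry(self, fs_handle, vol_name, exclude_entries):
            return 0, None, ""    # pretend the trash directory is empty
        def purge_subvolume_trash_entry(self, fs_handle, vol_name, purge_dir, should_cancel):
            return 0, "", ""

    pq = ThreadPoolPurgeQueueMixin(StubVolumeClient(), 2)   # two worker threads
    pq.queue_purge_job("vol_a")    # workers wake up, find no trash entries, drop the volume
    pq.cancel_purge_job("vol_a")   # safe no-op once the volume has been dropped
    # note: the purge threads loop forever, so this snippet never exits on its own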
diff --git a/src/pybind/mgr/volumes/fs/subvolspec.py b/src/pybind/mgr/volumes/fs/subvolspec.py
index 6efb24ca8679fb1159f6845fc6636dc074086132..02de8c7a92ebc2529a1c5ce628906968dba00fae 100644
@@ -1,4 +1,5 @@
 import os
+import uuid
 
 class SubvolumeSpec(object):
     """
@@ -64,6 +65,13 @@ class SubvolumeSpec(object):
         """
         return os.path.join(self.subvolume_prefix, "_deleting", self.subvolumeid)
 
+    @property
+    def unique_trash_path(self):
+        """
+        return a unique trash directory entry path
+        """
+        return os.path.join(self.subvolume_prefix, "_deleting", str(uuid.uuid4()))
+
     @property
     def fs_namespace(self):
         """
diff --git a/src/pybind/mgr/volumes/fs/subvolume.py b/src/pybind/mgr/volumes/fs/subvolume.py
index ce5142d3784fadd407fa1769f844a38f305bcf91..6abbc581c8a8309fd1adcd34d6ccf89dfd820b9f 100644
@@ -56,6 +56,28 @@ class SubVolume(object):
             except cephfs.Error as e:
                 raise VolumeException(-e.args[0], e.args[1])
 
+    def _get_single_dir_entry(self, dir_path, exclude=[]):
+        """
+        Return a directory entry in a given directory, excluding the passed
+        in entries.
+        """
+        try:
+            dir_handle = self.fs.opendir(dir_path)
+        except cephfs.Error as e:
+            raise VolumeException(-e.args[0], e.args[1])
+
+        exclude = exclude + [".", ".."]
+
+        d = self.fs.readdir(dir_handle)
+        d_name = None
+        while d:
+            if not d.d_name.decode('utf-8') in exclude and d.is_dir():
+                d_name = d.d_name.decode('utf-8')
+                break
+            d = self.fs.readdir(dir_handle)
+        self.fs.closedir(dir_handle)
+        return d_name
+
     ### basic subvolume operations
 
     def create_subvolume(self, spec, size=None, namespace_isolated=True, mode=0o755, pool=None):
@@ -99,8 +121,8 @@ class SubVolume(object):
     def remove_subvolume(self, spec, force):
         """
         Make a subvolume inaccessible to guests.  This function is idempotent.
-        This is the fast part of tearing down a subvolume: you must also later
-        call purge_subvolume, which is the slow part.
+        This is the fast part of tearing down a subvolume. The subvolume will
+        get purged in the background.
 
         :param spec: subvolume path specification
         :param force: flag to ignore non-existent path (never raise exception)
@@ -114,7 +136,10 @@ class SubVolume(object):
         trashdir = spec.trash_dir
         self._mkdir_p(trashdir)
 
-        trashpath = spec.trash_path
+        # mangle the trash directory entry to a random string so that subsequent
+        # subvolume create and delete with the same name move the subvolume directory
+        # to a unique trash dir (else, rename() could fail if the trash dir exists).
+        trashpath = spec.unique_trash_path
         try:
             self.fs.rename(subvolpath, trashpath)
         except cephfs.ObjectNotFound:
@@ -124,10 +149,9 @@ class SubVolume(object):
         except cephfs.Error as e:
             raise VolumeException(-e.args[0], e.args[1])
 
-    def purge_subvolume(self, spec):
+    def purge_subvolume(self, spec, should_cancel):
         """
-        Finish clearing up a subvolume that was previously passed to delete_subvolume.  This
-        function is idempotent.
+        Finish clearing up a subvolume from the trash directory.
         """
 
         def rmtree(root_path):
@@ -139,7 +163,7 @@ class SubVolume(object):
             except cephfs.Error as e:
                 raise VolumeException(-e.args[0], e.args[1])
             d = self.fs.readdir(dir_handle)
-            while d:
+            while d and not should_cancel():
                 d_name = d.d_name.decode('utf-8')
                 if d_name not in [".", ".."]:
                     # Do not use os.path.join because it is sensitive
@@ -153,7 +177,10 @@ class SubVolume(object):
 
                 d = self.fs.readdir(dir_handle)
             self.fs.closedir(dir_handle)
-            self.fs.rmdir(root_path)
+            # remove the directory only if we were not asked to cancel
+            # (else we would fail to remove this anyway)
+            if not should_cancel():
+                self.fs.rmdir(root_path)
 
         trashpath = spec.trash_path
         # catch any unlink errors
@@ -256,6 +283,16 @@ class SubVolume(object):
         snappath = spec.make_group_snap_path(self.rados.conf_get('client_snapdir'), snapname)
         return self._snapshot_delete(snappath, force)
 
+    def get_trash_entry(self, spec, exclude):
+        try:
+            trashdir = spec.trash_dir
+            return self._get_single_dir_entry(trashdir, exclude)
+        except VolumeException as ve:
+            if ve.errno == -errno.ENOENT:
+                # trash dir does not exist yet, signal success
+                return None
+            raise
+
     ### context manager routines
 
     def __enter__(self):
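
The should_cancel argument is just a zero-argument callable that the inner
rmtree() polls between directory entries; a minimal sketch of the contract
(sv and spec are assumed to be set up as elsewhere in this module, the Event
belongs to the caller):

    import threading

    cancel = threading.Event()
    # illustrative caller; blocks until the purge finishes or is cancelled
    sv.purge_subvolume(spec, should_cancel=cancel.is_set)
    # meanwhile, from another thread:
    #   cancel.set()   -> purge stops after the current entry and skips the
    #                     final rmdir, leaving the half-emptied directory in
    #                     trash for a later retry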
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index c1ebe7e325149a0a7ea0f15ed04b1fc8bb480ef4..37d82402acf21ab9e1cdc720342b97293ac9b788 100644
@@ -16,6 +16,7 @@ import orchestrator
 from .subvolspec import SubvolumeSpec
 from .subvolume import SubVolume
 from .exception import VolumeException
+from .purge_queue import ThreadPoolPurgeQueueMixin
 
 log = logging.getLogger(__name__)
 
@@ -160,6 +161,8 @@ class VolumeClient(object):
     def __init__(self, mgr):
         self.mgr = mgr
         self.connection_pool = ConnectionPool(self.mgr)
+        # TODO: make thread pool size configurable
+        self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
 
     def gen_pool_names(self, volname):
         """
@@ -273,6 +276,7 @@ class VolumeClient(object):
         """
         delete the given module (tear down mds, remove filesystem)
         """
+        self.purge_queue.cancel_purge_job(volname)
         self.connection_pool.del_fs_handle(volname)
         # Tear down MDS daemons
         try:
@@ -390,7 +394,7 @@ class VolumeClient(object):
                 spec = SubvolumeSpec(subvolname, groupname)
                 if self.group_exists(sv, spec):
                     sv.remove_subvolume(spec, force)
-                    sv.purge_subvolume(spec)
+                    self.purge_queue.queue_purge_job(volname)
                 elif not force:
                     raise VolumeException(
                         -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \
@@ -545,3 +549,33 @@ class VolumeClient(object):
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
+
+    @connection_pool_wrap
+    def get_subvolume_trash_entry(self, fs_handle, **kwargs):
+        ret = None
+        volname = kwargs['vol_name']
+        exclude = kwargs.get('exclude_entries', [])
+
+        try:
+            with SubVolume(self.mgr, fs_handle) as sv:
+                spec = SubvolumeSpec("", "")
+                path = sv.get_trash_entry(spec, exclude)
+                ret = 0, path, ""
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    @connection_pool_wrap
+    def purge_subvolume_trash_entry(self, fs_handle, **kwargs):
+        ret = 0, "", ""
+        volname = kwargs['vol_name']
+        purge_dir = kwargs['purge_dir']
+        should_cancel = kwargs.get('should_cancel', lambda: False)
+
+        try:
+            with SubVolume(self.mgr, fs_handle) as sv:
+                spec = SubvolumeSpec(purge_dir, "")
+                sv.purge_subvolume(spec, should_cancel)
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
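
Both new VolumeClient helpers keep the module's (retcode, out, err) tuple
convention, which is all the purge queue looks at; an illustrative sketch of a
caller (vc stands for a VolumeClient instance, volume name hypothetical):

    # mirrors what PurgeQueueBase does with the two new helpers
    ret = vc.get_subvolume_trash_entry(None, vol_name="vol_a", exclude_entries=[])
    if ret[0] == 0 and ret[1]:
        vc.purge_subvolume_trash_entry(None, vol_name="vol_a",
                                       purge_dir=ret[1],
                                       should_cancel=lambda: False)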