mgr/volumes: purge thread uses new async interface
author Venky Shankar <vshankar@redhat.com>
Wed, 4 Dec 2019 04:49:12 +0000 (23:49 -0500)
committer Ramana Raja <rraja@redhat.com>
Wed, 12 Feb 2020 10:12:00 +0000 (05:12 -0500)
This also makes `_cancel_jobs()` thread safe, which was not the
case earlier (with `_cancel_purge_job()`). It also simplifies the
code by sharing one lock between two condition variables.

Signed-off-by: Venky Shankar <vshankar@redhat.com>
(cherry picked from commit f16cc1e8eebb0868a1e07c25f8d8e4656b11b7bf)
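
As a side note on the locking point above: in Python, two `threading.Condition`
objects can share a single `threading.Lock`, giving two independent wait queues
serialized by one mutex. A minimal standalone sketch of the pattern (illustrative
only, not code from this commit):

    import threading

    lock = threading.Lock()
    cv = threading.Condition(lock)         # wakes worker threads
    cancel_cv = threading.Condition(lock)  # wakes cancellation waiters

    with lock:
        cv.notify_all()         # both conditions guard state under `lock`,
        cancel_cv.notify_all()  # but each keeps its own wait queue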

src/pybind/mgr/volumes/fs/async_job.py [new file with mode: 0644]
src/pybind/mgr/volumes/fs/exception.py
src/pybind/mgr/volumes/fs/purge_queue.py
src/pybind/mgr/volumes/fs/volume.py

diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py
new file mode 100644 (file)
index 0000000..b47b45c
--- /dev/null
@@ -0,0 +1,223 @@
+import sys
+import time
+import logging
+import threading
+import traceback
+from collections import deque
+
+from .exception import NotImplementedException
+
+log = logging.getLogger(__name__)
+
+class JobThread(threading.Thread):
+    # this is "not" configurable and there is no need for it to be
+    # configurable. if a thread encounters an exception, we retry
+    # until it hits this many consecutive exceptions.
+    MAX_RETRIES_ON_EXCEPTION = 10
+
+    def __init__(self, async_job, volume_client, name):
+        self.vc = volume_client
+        self.async_job = async_job
+        # event object to cancel jobs
+        self.cancel_event = threading.Event()
+        threading.Thread.__init__(self, name=name)
+
+    def run(self):
+        retries = 0
+        thread_id = threading.currentThread()
+        thread_name = thread_id.getName()
+
+        while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
+            try:
+                # fetch next job to execute
+                with self.async_job.lock:
+                    while True:
+                        vol_job = self.async_job.get_job()
+                        if vol_job:
+                            break
+                        self.async_job.cv.wait()
+                    self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)
+
+                # execute the job (outside lock)
+                self.async_job.execute_job(vol_job[0], vol_job[1], should_cancel=lambda: thread_id.should_cancel())
+
+                # when done, unregister the job
+                with self.async_job.lock:
+                    self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id)
+                retries = 0
+            except NotImplementedException:
+                raise
+            except Exception:
+                # for any other exception, retry till we hit the cap limit
+                # (unimplemented fetch/execute routines are re-raised above).
+                retries += 1
+                log.warning("thread [{0}] encountered fatal error: (attempt#" \
+                            " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION))
+                exc_type, exc_value, exc_traceback = sys.exc_info()
+                log.warning("traceback: {0}".format("".join(
+                    traceback.format_exception(exc_type, exc_value, exc_traceback))))
+            time.sleep(1)
+        log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
+        self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
+
+    def cancel_job(self):
+        self.cancel_event.set()
+
+    def should_cancel(self):
+        return self.cancel_event.isSet()
+
+    def reset_cancel(self):
+        self.cancel_event.clear()
+
+class AsyncJobs(object):
+    """
+    Class providing asynchronous execution of jobs via worker threads.
+    `jobs` are grouped by `volume`, so a `volume` can have multiple
+    `jobs` executing concurrently (capped by the number of worker threads).
+
+    Usage is simple: subclass this and implement the following:
+      - get_next_job(volname, running_jobs)
+      - execute_job(volname, job, should_cancel)
+
+    ... and do not forget to invoke the base class constructor.
+
+    Job cancellation is for a volume as a whole, i.e., all executing jobs
+    for a volume are cancelled. Cancellation is poll based -- jobs need to
+    periodically check if cancellation is requested, after which the job
+    should return as soon as possible. The cancellation check is provided
+    via the `should_cancel()` lambda passed to `execute_job()`.
+    """
+
+    def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
+        self.vc = volume_client
+        # queue of volumes for starting async jobs
+        self.q = deque()
+        # volume => job tracking
+        self.jobs = {}
+        # lock, cv for kickstarting jobs
+        self.lock = threading.Lock()
+        self.cv = threading.Condition(self.lock)
+        # cv for job cancellation
+        self.cancel_cv = threading.Condition(self.lock)
+
+        self.threads = []
+        for i in range(nr_concurrent_jobs):
+            self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
+            self.threads[-1].start()
+
+    def get_job(self):
+        log.debug("processing {0} volume entries".format(len(self.q)))
+        nr_vols = len(self.q)
+        to_remove = []
+        next_job = None
+        while nr_vols > 0:
+            volname = self.q[0]
+            # rotate now so that other threads pick up jobs for other volumes
+            self.q.rotate(1)
+            running_jobs = [j[0] for j in self.jobs[volname]]
+            (ret, job) = self.get_next_job(volname, running_jobs)
+            if job:
+                next_job = (volname, job)
+                break
+            # this is an optimization: when a given volume has no more
+            # jobs and no jobs are in progress, we remove the volume
+            # from the tracking list so as to:
+            #
+            # a. not query the filesystem for jobs over and over again
+            # b. keep the filesystem connection idle so that it can be freed
+            #    from the connection pool
+            #
+            # if jobs show up for a volume later, the volume gets added back
+            # to the tracking list and the jobs get kickstarted.
+            # note that we do not iterate the volume list fully if there is
+            # a job to process (that will take place eventually).
+            if ret == 0 and not job and not running_jobs:
+                to_remove.append(volname)
+            nr_vols -= 1
+        for vol in to_remove:
+            log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
+            self.q.remove(vol)
+            self.jobs.pop(vol)
+        return next_job
+
+    def register_async_job(self, volname, job, thread_id):
+        log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id))
+        self.jobs[volname].append((job, thread_id))
+
+    def unregister_async_job(self, volname, job, thread_id):
+        log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id))
+        self.jobs[volname].remove((job, thread_id))
+
+        cancelled = thread_id.should_cancel()
+        thread_id.reset_cancel()
+
+        # wake up cancellation waiters if needed
+        if not self.jobs[volname] and cancelled:
+            log.info("waking up cancellation waiters")
+            self.cancel_cv.notifyAll()
+
+    def queue_job(self, volname):
+        """
+        queue a volume for asynchronous job execution.
+        """
+        log.info("queuing job for volume '{0}'".format(volname))
+        with self.lock:
+            if volname not in self.q:
+                self.q.append(volname)
+                self.jobs[volname] = []
+            self.cv.notifyAll()
+
+    def _cancel_jobs(self, volname):
+        """
+        cancel all jobs for the volume. do nothing if no jobs are
+        executing for the given volume. this waits until all jobs
+        get interrupted and finish execution.
+        """
+        log.info("cancelling jobs for volume '{0}'".format(volname))
+        try:
+            if volname not in self.q and volname not in self.jobs:
+                return
+            self.q.remove(volname)
+            # cancel in-progress operation and wait until complete
+            for j in self.jobs[volname]:
+                j[1].cancel_job()
+            # wait for cancellation to complete
+            while self.jobs[volname]:
+                log.debug("waiting for {0} in-progress jobs for volume '{1}' to " \
+                          "cancel".format(len(self.jobs[volname]), volname))
+                self.cancel_cv.wait()
+            self.jobs.pop(volname)
+        except (KeyError, ValueError):
+            pass
+
+    def cancel_jobs(self, volname):
+        """
+        cancel all executing jobs for a given volume.
+        """
+        with self.lock:
+            self._cancel_jobs(volname)
+
+    def cancel_all_jobs(self):
+        """
+        cancel all executing jobs for all volumes.
+        """
+        with self.lock:
+            for volname in list(self.q):
+                self._cancel_jobs(volname)
+
+    def get_next_job(self, volname, running_jobs):
+        """
+        get the next job for asynchronous execution as a (retcode, job) tuple. if no
+        jobs are available return (0, None), else return (0, job). on error return
+        (-errno, None). called under `self.lock`.
+        """
+        raise NotImplementedException()
+
+    def execute_job(self, volname, job, should_cancel):
+        """
+        execute a job for a volume. the job can block on I/O operations, sleep for long
+        hours and do all kinds of synchronous work. called outside `self.lock`.
+        """
+        raise NotImplementedException()
+
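
The `AsyncJobs` docstring above spells out the subclassing contract. A toy
subclass might look like the following -- a hypothetical sketch, not part of
this commit: `pending` and the job strings are made-up stand-ins, and the
volume_client argument only needs a `cluster_log()` method here:

    from .async_job import AsyncJobs

    class EchoJobs(AsyncJobs):
        def __init__(self, volume_client, nr_threads):
            # job state must exist before the base constructor starts threads
            self.pending = {"vol1": ["job-a", "job-b"]}
            super(EchoJobs, self).__init__(volume_client, "echojob", nr_threads)

        def get_next_job(self, volname, running_jobs):
            # called under self.lock -- skip jobs that are already running
            for job in self.pending.get(volname, []):
                if job not in running_jobs:
                    return 0, job
            return 0, None

        def execute_job(self, volname, job, should_cancel):
            # called outside self.lock -- long jobs should poll should_cancel();
            # a real subclass must make `pending` access thread safe
            if not should_cancel():
                self.pending[volname].remove(job)

Jobs start flowing once a volume is queued, e.g. `EchoJobs(vc, 2).queue_job("vol1")`.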
index 841b35202eecd24161f1672849917760e12a0059..de0b19109f3546c43e752aff9b58f82b9101f909 100644 (file)
@@ -32,3 +32,6 @@ class OpSmException(Exception):
 
     def __str__(self):
         return "{0} ({1})".format(self.errno, self.error_str)
+
+class NotImplementedException(Exception):
+    pass
index 7c079250d6e21c46caa637d59b265ea894d91f8d..922c8f0027688a323f60a3323baaa72e12c6999f 100644 (file)
-import sys
-import time
 import errno
 import logging
-import threading
-import traceback
-from collections import deque
 
+from .async_job import AsyncJobs
 from .exception import VolumeException
 from .operations.volume import open_volume, open_volume_lockless
 from .operations.trash import open_trashcan
 
 log = logging.getLogger(__name__)
 
-class PurgeQueueBase(object):
-    """
-    Base class for implementing purge queue strategies.
-    """
-
-    # this is "not" configurable and there is no need for it to be
-    # configurable. if a purge thread encounters an exception, we
-    # retry, till it hits this many consecutive exceptions after
-    # which a warning is sent to `ceph status`.
-    MAX_RETRIES_ON_EXCEPTION = 10
-
-    class PurgeThread(threading.Thread):
-        def __init__(self, volume_client, name, purge_fn):
-            self.vc = volume_client
-            self.purge_fn = purge_fn
-            # event object to cancel ongoing purge
-            self.cancel_event = threading.Event()
-            threading.Thread.__init__(self, name=name)
-
-        def run(self):
-            retries = 0
-            thread_name = threading.currentThread().getName()
-            while retries < PurgeQueueBase.MAX_RETRIES_ON_EXCEPTION:
-                try:
-                    self.purge_fn()
-                    retries = 0
-                except Exception:
-                    retries += 1
-                    log.warning("purge thread [{0}] encountered fatal error: (attempt#" \
-                                " {1}/{2})".format(thread_name, retries,
-                                                   PurgeQueueBase.MAX_RETRIES_ON_EXCEPTION))
-                    exc_type, exc_value, exc_traceback = sys.exc_info()
-                    log.warning("traceback: {0}".format("".join(
-                        traceback.format_exception(exc_type, exc_value, exc_traceback))))
-                    time.sleep(1)
-            log.error("purge thread [{0}] reached exception limit, bailing out...".format(thread_name))
-            self.vc.cluster_log("purge thread {0} bailing out due to exception".format(thread_name))
-
-        def cancel_job(self):
-            self.cancel_event.set()
-
-        def should_cancel(self):
-            return self.cancel_event.isSet()
-
-        def reset_cancel(self):
-            self.cancel_event.clear()
-
-    def __init__(self, volume_client):
-        self.vc = volume_client
-        # volumes whose subvolumes need to be purged
-        self.q = deque()
-        # job tracking
-        self.jobs = {}
-        # lock, cv for kickstarting purge
-        self.lock = threading.Lock()
-        self.cv = threading.Condition(self.lock)
-        # lock, cv for purge cancellation
-        self.waiting = False
-        self.c_lock = threading.Lock()
-        self.c_cv = threading.Condition(self.c_lock)
-
-    def queue_purge_job(self, volname):
-        with self.lock:
-            if not self.q.count(volname):
-                self.q.append(volname)
-                self.jobs[volname] = []
-            self.cv.notifyAll()
-
-    def _cancel_purge_job(self, volname):
-        log.info("cancelling purge jobs for volume '{0}'".format(volname))
-        locked = True
-        try:
-            if not self.q.count(volname):
-                return
-            self.q.remove(volname)
-            if not self.jobs.get(volname, []):
-                return
-            # cancel in-progress purge operation and wait until complete
-            for j in self.jobs[volname]:
-                j[1].cancel_job()
-            # wait for cancellation to complete
-            with self.c_lock:
-                locked = False
-                self.waiting = True
-                self.lock.release()
-                while self.waiting:
-                    log.debug("waiting for {0} in-progress purge jobs for volume '{1}' to " \
-                             "cancel".format(len(self.jobs[volname]), volname))
-                    self.c_cv.wait()
-        finally:
-            if not locked:
-                self.lock.acquire()
-
-    def cancel_purge_job(self, volname):
-        self.lock.acquire()
-        self._cancel_purge_job(volname)
-        self.lock.release()
-
-    def cancel_all_jobs(self):
-        self.lock.acquire()
-        for volname in list(self.q):
-            self._cancel_purge_job(volname)
-        self.lock.release()
-
-    def register_job(self, volname, purge_dir):
-        log.debug("registering purge job: {0}.{1}".format(volname, purge_dir))
-
-        thread_id = threading.currentThread()
-        self.jobs[volname].append((purge_dir, thread_id))
-
-    def unregister_job(self, volname, purge_dir):
-        log.debug("unregistering purge job: {0}.{1}".format(volname, purge_dir))
-
-        thread_id = threading.currentThread()
-        self.jobs[volname].remove((purge_dir, thread_id))
-
-        cancelled = thread_id.should_cancel()
-        thread_id.reset_cancel()
-
-        # wake up cancellation waiters if needed
-        if not self.jobs[volname] and cancelled:
-            logging.info("waking up cancellation waiters")
-            self.jobs.pop(volname)
-            with self.c_lock:
-                self.waiting = False
-                self.c_cv.notifyAll()
-
-    def get_trash_entry_for_volume(self, volname):
-        log.debug("fetching trash entry for volume '{0}'".format(volname))
-
-        exclude_entries = [v[0] for v in self.jobs[volname]]
-        fs_handle = None
-        try:
-            with open_volume(self.vc, volname) as fs_handle:
-                with open_trashcan(fs_handle, self.vc.volspec) as trashcan:
-                    path = trashcan.get_trash_entry(exclude_entries)
-                    ret = 0, path
-        except VolumeException as ve:
-            if ve.errno == -errno.ENOENT and fs_handle:
-                ret = 0, None
-            else:
-                log.error("error fetching trash entry for volume '{0}' ({1})".format(volname), ve)
-                ret = ve.errno, None
-        return ret
-
-    def purge_trash_entry_for_volume(self, volname, purge_dir):
-        log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname))
-
-        ret = 0
-        thread_id = threading.currentThread()
-        try:
-            with open_volume_lockless(self.vc, volname) as fs_handle:
-                with open_trashcan(fs_handle, self.vc.volspec) as trashcan:
-                    trashcan.purge(purge_dir, should_cancel=lambda: thread_id.should_cancel())
-        except VolumeException as ve:
-            ret = ve.errno
-        return ret
-
-class ThreadPoolPurgeQueueMixin(PurgeQueueBase):
+# helper for fetching a trash entry for a given volume
+def get_trash_entry_for_volume(volume_client, volname, running_jobs):
+    log.debug("fetching trash entry for volume '{0}'".format(volname))
+
+    try:
+        with open_volume(volume_client, volname) as fs_handle:
+            try:
+                with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
+                    path = trashcan.get_trash_entry(running_jobs)
+                    return 0, path
+            except VolumeException as ve:
+                if ve.errno == -errno.ENOENT:
+                    return 0, None
+                raise ve
+    except VolumeException as ve:
+        log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve))
+        return ve.errno, None
+
+# helper for starting a purge operation on a trash entry
+def purge_trash_entry_for_volume(volume_client, volname, purge_dir, should_cancel):
+    log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname))
+
+    ret = 0
+    try:
+        with open_volume_lockless(volume_client, volname) as fs_handle:
+            with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
+                trashcan.purge(purge_dir, should_cancel)
+    except VolumeException as ve:
+        ret = ve.errno
+    return ret
+
+class ThreadPoolPurgeQueueMixin(AsyncJobs):
     """
     Purge queue mixin class maintaining a pool of threads for purging trash entries.
     Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
@@ -179,68 +49,11 @@ class ThreadPoolPurgeQueueMixin(PurgeQueueBase):
     _all_ threads purging entries for one volume (starving other volumes).
     """
     def __init__(self, volume_client, tp_size):
-        super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client)
-        self.threads = []
-        for i in range(tp_size):
-            self.threads.append(
-                PurgeQueueBase.PurgeThread(volume_client, name="purgejob.{}".format(i), purge_fn=self.run))
-            self.threads[-1].start()
-
-    def pick_purge_dir_from_volume(self):
-        log.debug("processing {0} purge job entries".format(len(self.q)))
-        nr_vols = len(self.q)
-        to_remove = []
-        to_purge = None, None
-        while nr_vols > 0:
-            volname = self.q[0]
-            # do this now so that the other thread picks up trash entry
-            # for next volume.
-            self.q.rotate(1)
-            ret, purge_dir = self.get_trash_entry_for_volume(volname)
-            if purge_dir:
-                to_purge = volname, purge_dir
-                break
-            # this is an optimization when for a given volume there are no more
-            # entries in trash and no purge operations are in progress. in such
-            # a case we remove the volume from the tracking list so as to:
-            #
-            # a. not query the filesystem for trash entries over and over again
-            # b. keep the filesystem connection idle so that it can be freed
-            #    from the connection pool
-            #
-            # if at all there are subvolume deletes, the volume gets added again
-            # to the tracking list and the purge operations kickstarts.
-            # note that, we do not iterate the volume list fully if there is a
-            # purge entry to process (that will take place eventually).
-            if ret == 0 and not purge_dir and not self.jobs[volname]:
-                to_remove.append(volname)
-            nr_vols -= 1
-        for vol in to_remove:
-            log.debug("auto removing volume '{0}' from purge job".format(vol))
-            self.q.remove(vol)
-            self.jobs.pop(vol)
-        return to_purge
+        self.vc = volume_client
+        super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client, "purgejob", tp_size)
 
-    def get_next_trash_entry(self):
-        while True:
-            # wait till there's a purge job
-            while not self.q:
-                log.debug("purge job list empty, waiting...")
-                self.cv.wait()
-            volname, purge_dir = self.pick_purge_dir_from_volume()
-            if purge_dir:
-                return volname, purge_dir
-            log.debug("no purge jobs available, waiting...")
-            self.cv.wait()
+    def get_next_job(self, volname, running_jobs):
+        return get_trash_entry_for_volume(self.vc, volname, running_jobs)
 
-    def run(self):
-        while True:
-            with self.lock:
-                volname, purge_dir = self.get_next_trash_entry()
-                self.register_job(volname, purge_dir)
-            ret = self.purge_trash_entry_for_volume(volname, purge_dir)
-            if ret != 0:
-                log.warn("failed to purge {0}.{1} ({2})".format(volname, purge_dir, ret))
-            with self.lock:
-                self.unregister_job(volname, purge_dir)
-            time.sleep(1)
+    def execute_job(self, volname, job, should_cancel):
+        purge_trash_entry_for_volume(self.vc, volname, job, should_cancel)
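
With the mixin reduced to the two hooks above, queuing and cancellation go
through the generic `AsyncJobs` interface. A usage sketch (assuming `vc` is an
initialized volume client):

    pq = ThreadPoolPurgeQueueMixin(vc, 4)  # pool of 4 purge worker threads
    pq.queue_job("myvolume")               # kickstart purging for a volume
    pq.cancel_jobs("myvolume")             # waits for in-flight purges to stop
    pq.cancel_all_jobs()                   # cancel across all tracked volumes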
index f5c0102d979b6743a8f7cc2e6f85adcf0f58294e..e384db4745a38d8224135abd689ecc8c8e7d8ac8 100644 (file)
@@ -50,7 +50,7 @@ class VolumeClient(object):
         # job list.
         fs_map = self.mgr.get('fs_map')
         for fs in fs_map['filesystems']:
-            self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name'])
+            self.purge_queue.queue_job(fs['mdsmap']['fs_name'])
 
     def is_stopping(self):
         return self.stopping.isSet()
@@ -95,7 +95,7 @@ class VolumeClient(object):
                 "that is what you want, re-issue the command followed by " \
                 "--yes-i-really-mean-it.".format(volname)
 
-        self.purge_queue.cancel_purge_job(volname)
+        self.purge_queue.cancel_jobs(volname)
         self.connection_pool.del_fs_handle(volname, wait=True)
         return delete_volume(self.mgr, volname)
 
@@ -121,7 +121,7 @@ class VolumeClient(object):
         except VolumeException as ve:
             # kick the purge threads for async removal -- note that this
             # assumes that the subvolume is moved to trashcan for cleanup on error.
-            self.purge_queue.queue_purge_job(volname)
+            self.purge_queue.queue_job(volname)
             raise ve
 
     def create_subvolume(self, **kwargs):
@@ -162,7 +162,7 @@ class VolumeClient(object):
                     # assumes that the subvolume is moved to trash can.
                     # TODO: make purge queue as singleton so that trash can kicks
                     # the purge threads on dump.
-                    self.purge_queue.queue_purge_job(volname)
+                    self.purge_queue.queue_job(volname)
         except VolumeException as ve:
             if not (ve.errno == -errno.ENOENT and force):
                 ret = self.volume_exception_to_retval(ve)
@@ -311,7 +311,7 @@ class VolumeClient(object):
             except VolumeException as ve:
                 try:
                     target_subvolume.remove()
-                    self.purge_queue.queue_purge_job(volname)
+                    self.purge_queue.queue_job(volname)
                 except Exception as e:
                     log.warn("failed to cleanup clone subvolume '{0}' ({1})".format(target_subvolname, e))
                 raise ve