]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/vol: add pause/resume mechanism for async jobs
authorRishabh Dave <ridave@redhat.com>
Tue, 3 Sep 2024 10:01:07 +0000 (15:31 +0530)
committerRishabh Dave <ridave@redhat.com>
Wed, 12 Mar 2025 18:47:10 +0000 (00:17 +0530)
Add mechansim that allows pausing/resuming of the entire async job
machinery that queues, launches and picks next async job; both async
jobs, clones as well as purges.

And then add mgr/vol config option pause_purging and pause_cloning so
that both of these async jobs can be paused and resumed individually.

Fixes: https://tracker.ceph.com/issues/61903
Fixes: https://tracker.ceph.com/issues/68630
Signed-off-by: Rishabh Dave <ridave@redhat.com>
src/pybind/mgr/volumes/fs/async_job.py
src/pybind/mgr/volumes/module.py

index dc0a2ac8f977584acc8284b7b947c3033cdeecc6..4d78203b3470265d54d798637ec6c486fe2141da 100644 (file)
@@ -36,6 +36,13 @@ class JobThread(threading.Thread):
         while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
             vol_job = None
             try:
+                if not self.async_job.run_event.is_set():
+                    log.debug('will wait for run_event to bet set. postponing '
+                              'getting jobs until then')
+                    self.async_job.run_event.wait()
+                    log.debug('run_event has been set, waiting is complete. '
+                              'proceeding to get jobs now')
+
                 # fetch next job to execute
                 with lock_timeout_log(self.async_job.lock):
                     while True:
@@ -139,6 +146,11 @@ class AsyncJobs(threading.Thread):
         self.stopping = threading.Event()
 
         self.cancel_cv = threading.Condition(self.lock)
+
+        self.run_event = threading.Event()
+        # let async threads run by default
+        self.run_event.set()
+
         self.nr_concurrent_jobs = nr_concurrent_jobs
         self.name_pfx = name_pfx
         # each async job group uses its own libcephfs connection (pool)
@@ -196,6 +208,47 @@ class AsyncJobs(threading.Thread):
                     self.spawn_more_threads()
                 self.cv.wait(timeout=self.wakeup_timeout)
 
+    def pause(self):
+        self.run_event.clear()
+
+        log.debug('pause() cancelling ongoing jobs now and respective worker '
+                  'threads...')
+        self.cancel_all_jobs(update_queue=False)
+
+        # XXX: cancel_all_jobs() sets jobthread.cancel_event causing all ongoing
+        # jobs to cancel. But if there are no jobs (that is self.q is empty),
+        # cancel_all_jobs() will return without doing anything and
+        # jobthread.cancel_event won't be set. This results in future jobs to be
+        # executed even when config option to pause is already set. Similarly,
+        # when there's only 1 ongoing job, jobthread.cancel_event is set for it
+        # but not for other threads causing rest of threads to pick new jobs
+        # when they are queued.
+        # Therefore, set jobthread.cancel_event explicitly.
+        log.debug('pause() pausing rest of worker threads')
+        for t in self.threads:
+            # is_set(), although technically redundant, is called to emphasize
+            # that cancel_event might be set on some threads but might not be
+            # on others. this prevents removal of the call to set() below after
+            # the incomplete observation that cancel_event is already set on
+            # (some) threads.
+            if not t.cancel_event.is_set():
+                t.cancel_event.set()
+        log.debug('pause() all jobs cancelled and cancel_event have been set for '
+                  'all threads, queue and threads have been paused')
+
+    def resume(self):
+        if self.run_event.is_set():
+            log.debug('resume() no need to resume, run_event is already set.')
+            return
+
+        log.debug('resume() enabling worker threads')
+        for t in self.threads:
+            t.cancel_event.clear()
+
+        self.run_event.set()
+        log.debug('resume() run_event has been set, queue and threads have been'
+                  ' resumed')
+
     def shutdown(self):
         self.stopping.set()
         self.cancel_all_jobs()
@@ -240,8 +293,10 @@ class AsyncJobs(threading.Thread):
             nr_vols -= 1
         for vol in to_remove:
             log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
-            self.q.remove(vol)
-            self.jobs.pop(vol)
+            if vol in self.q:
+                self.q.remove(vol)
+            if vol in self.jobs:
+                self.jobs.pop(vol)
         return next_job
 
     def register_async_job(self, volname, job, thread_id):
@@ -253,7 +308,10 @@ class AsyncJobs(threading.Thread):
         self.jobs[volname].remove((job, thread_id))
 
         cancelled = thread_id.should_cancel()
-        thread_id.reset_cancel()
+        # don't clear cancel_event flag if queuing and threads have been paused
+        # (that is, run_event is not set).
+        if self.run_event.is_set():
+            thread_id.reset_cancel()
 
         # wake up cancellation waiters if needed
         if cancelled:
@@ -271,7 +329,7 @@ class AsyncJobs(threading.Thread):
                 self.jobs[volname] = []
             self.cv.notifyAll()
 
-    def _cancel_jobs(self, volname):
+    def _cancel_jobs(self, volname, update_queue=True):
         """
         cancel all jobs for the volume. do nothing is the no jobs are
         executing for the given volume. this would wait until all jobs
@@ -281,7 +339,10 @@ class AsyncJobs(threading.Thread):
         try:
             if volname not in self.q and volname not in self.jobs:
                 return
-            self.q.remove(volname)
+
+            if update_queue:
+                self.q.remove(volname)
+
             # cancel in-progress operation and wait until complete
             for j in self.jobs[volname]:
                 j[1].cancel_job()
@@ -290,7 +351,9 @@ class AsyncJobs(threading.Thread):
                 log.debug("waiting for {0} in-progress jobs for volume '{1}' to "
                           "cancel".format(len(self.jobs[volname]), volname))
                 self.cancel_cv.wait()
-            self.jobs.pop(volname)
+
+            if update_queue:
+                self.jobs.pop(volname)
         except (KeyError, ValueError):
             pass
 
@@ -328,13 +391,13 @@ class AsyncJobs(threading.Thread):
         with lock_timeout_log(self.lock):
             self._cancel_jobs(volname)
 
-    def cancel_all_jobs(self):
+    def cancel_all_jobs(self, update_queue=True):
         """
         call all executing jobs for all volumes.
         """
         with lock_timeout_log(self.lock):
             for volname in list(self.q):
-                self._cancel_jobs(volname)
+                self._cancel_jobs(volname, update_queue=update_queue)
 
     def get_next_job(self, volname, running_jobs):
         """
index e8694709c530668463865052ddf66ce1241d074c..5de0e28fd42cd094971012942878b58a8631228b 100644 (file)
@@ -596,7 +596,17 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
             'snapshot_clone_no_wait',
             type='bool',
             default=True,
-            desc='Reject subvolume clone request when cloner threads are busy')
+            desc='Reject subvolume clone request when cloner threads are busy'),
+        Option(
+            'pause_purging',
+            type='bool',
+            default=False,
+            desc='Pause asynchronous subvolume purge threads'),
+        Option(
+            'pause_cloning',
+            type='bool',
+            default=False,
+            desc='Pause asynchronous cloner threads')
     ]
 
     def __init__(self, *args, **kwargs):
@@ -606,6 +616,8 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         self.snapshot_clone_delay = None
         self.periodic_async_work = False
         self.snapshot_clone_no_wait = None
+        self.pause_purging = False
+        self.pause_cloning = False
         self.lock = threading.Lock()
         super(Module, self).__init__(*args, **kwargs)
         # Initialize config option members
@@ -642,6 +654,17 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
                             self.vc.purge_queue.unset_wakeup_timeout()
                     elif opt['name'] == "snapshot_clone_no_wait":
                         self.vc.cloner.reconfigure_reject_clones(self.snapshot_clone_no_wait)
+                    elif opt['name'] == "pause_purging":
+                        if self.pause_purging:
+                            self.vc.purge_queue.pause()
+                        else:
+                            self.vc.purge_queue.resume()
+                    elif opt['name'] == "pause_cloning":
+                        if self.pause_cloning:
+                            self.vc.cloner.pause()
+                        else:
+                            self.vc.cloner.resume()
+
 
     def handle_command(self, inbuf, cmd):
         handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")