From: Rishabh Dave
Date: Tue, 3 Sep 2024 10:01:07 +0000 (+0530)
Subject: mgr/vol: add pause/resume mechanism for async jobs
X-Git-Tag: v20.3.0~370^2~6
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=01d37d5e1ba0e250e9d3a5f28ec7f3fa3597c63f;p=ceph.git

mgr/vol: add pause/resume mechanism for async jobs

Add a mechanism that allows pausing and resuming the entire async job
machinery that queues, launches and picks the next async job; this
covers both kinds of async jobs, clones as well as purges. Then add the
mgr/vol config options pause_purging and pause_cloning so that each of
these async jobs can be paused and resumed individually.

Fixes: https://tracker.ceph.com/issues/61903
Fixes: https://tracker.ceph.com/issues/68630
Signed-off-by: Rishabh Dave
---
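The heart of the change is a threading.Event used as a run/pause gate for
the worker threads. For illustration, here is a minimal, self-contained
sketch of that pattern; the Workers class and every name in it are
hypothetical stand-ins for this note only, not code from this patch:

    import queue
    import threading
    import time

    class Workers:
        def __init__(self, nr_threads=2):
            self.q = queue.Queue()
            self.run_event = threading.Event()
            # let worker threads run by default, mirroring AsyncJobs.__init__()
            self.run_event.set()
            self.threads = [threading.Thread(target=self._loop, daemon=True)
                            for _ in range(nr_threads)]
            for t in self.threads:
                t.start()

        def _loop(self):
            while True:
                # park here while paused; analogous to the wait added to
                # JobThread.run() in this patch
                self.run_event.wait()
                try:
                    job = self.q.get(timeout=1)
                except queue.Empty:
                    continue
                job()

        def pause(self):
            # stop workers from fetching any further jobs
            self.run_event.clear()

        def resume(self):
            # wake all parked workers
            self.run_event.set()

    w = Workers()
    w.q.put(lambda: print('job 1 ran'))
    time.sleep(0.2)           # job 1 executes while the gate is open
    w.pause()
    time.sleep(1.5)           # let workers time out of q.get() and park
    w.q.put(lambda: print('job 2 ran only after resume'))
    time.sleep(0.5)           # job 2 stays queued: the gate is closed
    w.resume()
    time.sleep(0.5)           # a woken worker now picks job 2 up

The gate by itself only stops threads between jobs: a worker that has
already fetched a job keeps running it, and one already blocked fetching
work can still pick up a job queued after pause(). The patch closes that
window by additionally setting each worker's cancel_event in pause(), as
the XXX comment in async_job.py below explains.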
diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py
index dc0a2ac8f9775..4d78203b34702 100644
--- a/src/pybind/mgr/volumes/fs/async_job.py
+++ b/src/pybind/mgr/volumes/fs/async_job.py
@@ -36,6 +36,13 @@ class JobThread(threading.Thread):
         while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
             vol_job = None
             try:
+                if not self.async_job.run_event.is_set():
+                    log.debug('will wait for run_event to be set. postponing '
+                              'getting jobs until then')
+                    self.async_job.run_event.wait()
+                    log.debug('run_event has been set, waiting is complete. '
+                              'proceeding to get jobs now')
+
                 # fetch next job to execute
                 with lock_timeout_log(self.async_job.lock):
                     while True:
@@ -139,6 +146,11 @@ class AsyncJobs(threading.Thread):
         self.stopping = threading.Event()
 
         self.cancel_cv = threading.Condition(self.lock)
+
+        self.run_event = threading.Event()
+        # let async threads run by default
+        self.run_event.set()
+
         self.nr_concurrent_jobs = nr_concurrent_jobs
         self.name_pfx = name_pfx
         # each async job group uses its own libcephfs connection (pool)
@@ -196,6 +208,47 @@ class AsyncJobs(threading.Thread):
                     self.spawn_more_threads()
                 self.cv.wait(timeout=self.wakeup_timeout)
 
+    def pause(self):
+        self.run_event.clear()
+
+        log.debug('pause() cancelling ongoing jobs now and respective worker '
+                  'threads...')
+        self.cancel_all_jobs(update_queue=False)
+
+        # XXX: cancel_all_jobs() sets jobthread.cancel_event, causing all ongoing
+        # jobs to cancel. But if there are no jobs (that is, self.q is empty),
+        # cancel_all_jobs() will return without doing anything and
+        # jobthread.cancel_event won't be set. This results in future jobs being
+        # executed even when the config option to pause is already set. Similarly,
+        # when there's only 1 ongoing job, jobthread.cancel_event is set for it
+        # but not for other threads, causing the rest of the threads to pick up
+        # new jobs when they are queued.
+        # Therefore, set jobthread.cancel_event explicitly.
+        log.debug('pause() pausing rest of worker threads')
+        for t in self.threads:
+            # is_set(), although technically redundant, is called to emphasize
+            # that cancel_event might be set on some threads but might not be
+            # on others. this prevents removal of the call to set() below after
+            # the incomplete observation that cancel_event is already set on
+            # (some) threads.
+            if not t.cancel_event.is_set():
+                t.cancel_event.set()
+        log.debug('pause() all jobs cancelled and cancel_event has been set for '
+                  'all threads, queue and threads have been paused')
+
+    def resume(self):
+        if self.run_event.is_set():
+            log.debug('resume() no need to resume, run_event is already set.')
+            return
+
+        log.debug('resume() enabling worker threads')
+        for t in self.threads:
+            t.cancel_event.clear()
+
+        self.run_event.set()
+        log.debug('resume() run_event has been set, queue and threads have been'
+                  ' resumed')
+
     def shutdown(self):
         self.stopping.set()
         self.cancel_all_jobs()
@@ -240,8 +293,10 @@ class AsyncJobs(threading.Thread):
                 nr_vols -= 1
         for vol in to_remove:
             log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
-            self.q.remove(vol)
-            self.jobs.pop(vol)
+            if vol in self.q:
+                self.q.remove(vol)
+            if vol in self.jobs:
+                self.jobs.pop(vol)
         return next_job
 
     def register_async_job(self, volname, job, thread_id):
@@ -253,7 +308,10 @@ class AsyncJobs(threading.Thread):
             self.jobs[volname].remove((job, thread_id))
 
         cancelled = thread_id.should_cancel()
-        thread_id.reset_cancel()
+        # don't clear the cancel_event flag if queuing and threads have been
+        # paused (that is, run_event is not set).
+        if self.run_event.is_set():
+            thread_id.reset_cancel()
 
         # wake up cancellation waiters if needed
         if cancelled:
@@ -271,7 +329,7 @@ class AsyncJobs(threading.Thread):
             self.jobs[volname] = []
         self.cv.notifyAll()
 
-    def _cancel_jobs(self, volname):
+    def _cancel_jobs(self, volname, update_queue=True):
         """
         cancel all jobs for the volume. do nothing is the no jobs are
         executing for the given volume. this would wait until all jobs
@@ -281,7 +339,10 @@ class AsyncJobs(threading.Thread):
         try:
             if volname not in self.q and volname not in self.jobs:
                 return
-            self.q.remove(volname)
+
+            if update_queue:
+                self.q.remove(volname)
+
             # cancel in-progress operation and wait until complete
             for j in self.jobs[volname]:
                 j[1].cancel_job()
@@ -290,7 +351,9 @@ class AsyncJobs(threading.Thread):
                 log.debug("waiting for {0} in-progress jobs for volume '{1}' to "
                           "cancel".format(len(self.jobs[volname]), volname))
                 self.cancel_cv.wait()
-            self.jobs.pop(volname)
+
+            if update_queue:
+                self.jobs.pop(volname)
         except (KeyError, ValueError):
             pass
@@ -328,13 +391,13 @@ class AsyncJobs(threading.Thread):
         with lock_timeout_log(self.lock):
             self._cancel_jobs(volname)
 
-    def cancel_all_jobs(self):
+    def cancel_all_jobs(self, update_queue=True):
         """
         call all executing jobs for all volumes.
""" with lock_timeout_log(self.lock): for volname in list(self.q): - self._cancel_jobs(volname) + self._cancel_jobs(volname, update_queue=update_queue) def get_next_job(self, volname, running_jobs): """ diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py index e8694709c5306..5de0e28fd42cd 100644 --- a/src/pybind/mgr/volumes/module.py +++ b/src/pybind/mgr/volumes/module.py @@ -596,7 +596,17 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): 'snapshot_clone_no_wait', type='bool', default=True, - desc='Reject subvolume clone request when cloner threads are busy') + desc='Reject subvolume clone request when cloner threads are busy'), + Option( + 'pause_purging', + type='bool', + default=False, + desc='Pause asynchronous subvolume purge threads'), + Option( + 'pause_cloning', + type='bool', + default=False, + desc='Pause asynchronous cloner threads') ] def __init__(self, *args, **kwargs): @@ -606,6 +616,8 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): self.snapshot_clone_delay = None self.periodic_async_work = False self.snapshot_clone_no_wait = None + self.pause_purging = False + self.pause_cloning = False self.lock = threading.Lock() super(Module, self).__init__(*args, **kwargs) # Initialize config option members @@ -642,6 +654,17 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): self.vc.purge_queue.unset_wakeup_timeout() elif opt['name'] == "snapshot_clone_no_wait": self.vc.cloner.reconfigure_reject_clones(self.snapshot_clone_no_wait) + elif opt['name'] == "pause_purging": + if self.pause_purging: + self.vc.purge_queue.pause() + else: + self.vc.purge_queue.resume() + elif opt['name'] == "pause_cloning": + if self.pause_cloning: + self.vc.cloner.pause() + else: + self.vc.cloner.resume() + def handle_command(self, inbuf, cmd): handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")