From: Venky Shankar Date: Wed, 9 Aug 2023 03:56:43 +0000 (+0530) Subject: mgr/volumes: periodically check for async work X-Git-Tag: v18.2.8~10^2~20^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f70cfd053992b7b36eac4d141a38180c5e9cf7cf;p=ceph.git mgr/volumes: periodically check for async work Useful when "manually" assigning work - with this fix there is no need to bounce ceph-mgr to kick async threads. Fixes: http://tracker.ceph.com/issues/61867 Signed-off-by: Venky Shankar (cherry picked from commit eb6200f30921ca08c9ae2c372f715c97e9d65aa1) Conflicts: src/pybind/mgr/volumes/module.py Adjust for an added config `snapshot_clone_no_wait`. --- diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py index f1d998c85b8f..8f2afd056f5f 100644 --- a/src/pybind/mgr/volumes/fs/async_job.py +++ b/src/pybind/mgr/volumes/fs/async_job.py @@ -6,6 +6,7 @@ import traceback from collections import deque from mgr_util import lock_timeout_log, CephfsClient +from .operations.volume import list_volumes from .exception import NotImplementedException log = logging.getLogger(__name__) @@ -41,10 +42,17 @@ class JobThread(threading.Thread): log.info("thread [{0}] terminating due to reconfigure".format(thread_name)) self.async_job.threads.remove(self) return + timo = self.async_job.wakeup_timeout + if timo is not None: + vols = [e['name'] for e in list_volumes(self.vc.mgr)] + missing = set(vols) - set(self.async_job.q) + for m in missing: + self.async_job.jobs[m] = [] + self.async_job.q.append(m) vol_job = self.async_job.get_job() if vol_job: break - self.async_job.cv.wait() + self.async_job.cv.wait(timeout=timo) self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id) # execute the job (outside lock) @@ -85,7 +93,6 @@ class JobThread(threading.Thread): def reset_cancel(self): self.cancel_event.clear() - class AsyncJobs(threading.Thread): """ Class providing asynchronous execution of jobs via worker 
threads. @@ -105,6 +112,9 @@ class AsyncJobs(threading.Thread): via `should_cancel()` lambda passed to `execute_job()`. """ + # not made configurable on purpose + WAKEUP_TIMEOUT = 5.0 + def __init__(self, volume_client, name_pfx, nr_concurrent_jobs): threading.Thread.__init__(self, name="{0}.tick".format(name_pfx)) self.vc = volume_client @@ -123,6 +133,7 @@ class AsyncJobs(threading.Thread): self.name_pfx = name_pfx # each async job group uses its own libcephfs connection (pool) self.fs_client = CephfsClient(self.vc.mgr) + self.wakeup_timeout = None self.threads = [] for i in range(self.nr_concurrent_jobs): @@ -130,6 +141,17 @@ class AsyncJobs(threading.Thread): self.threads[-1].start() self.start() + def set_wakeup_timeout(self): + with self.lock: + # not made configurable on purpose + self.wakeup_timeout = AsyncJobs.WAKEUP_TIMEOUT + self.cv.notifyAll() + + def unset_wakeup_timeout(self): + with self.lock: + self.wakeup_timeout = None + self.cv.notifyAll() + def run(self): log.debug("tick thread {} starting".format(self.name)) with lock_timeout_log(self.lock): diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py index 8a50baaad0c0..1800e1747ca2 100644 --- a/src/pybind/mgr/volumes/module.py +++ b/src/pybind/mgr/volumes/module.py @@ -489,7 +489,12 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): 'snapshot_clone_no_wait', type='bool', default=True, - desc='Reject subvolume clone request when cloner threads are busy') + desc='Reject subvolume clone request when cloner threads are busy'), + Option( + 'periodic_async_work', + type='bool', + default=False, + desc='Periodically check for async work') ] def __init__(self, *args, **kwargs): @@ -498,6 +503,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): self.max_concurrent_clones = None self.snapshot_clone_delay = None self.snapshot_clone_no_wait = None + self.periodic_async_work = False self.lock = threading.Lock() super(Module, self).__init__(*args, 
**kwargs) # Initialize config option members @@ -530,6 +536,13 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): self.vc.cloner.reconfigure_snapshot_clone_delay(self.snapshot_clone_delay) elif opt['name'] == "snapshot_clone_no_wait": self.vc.cloner.reconfigure_reject_clones(self.snapshot_clone_no_wait) + elif opt['name'] == "periodic_async_work": + if self.periodic_async_work: + self.vc.cloner.set_wakeup_timeout() + self.vc.purge_queue.set_wakeup_timeout() + else: + self.vc.cloner.unset_wakeup_timeout() + self.vc.purge_queue.unset_wakeup_timeout() def handle_command(self, inbuf, cmd): handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")