From eb6200f30921ca08c9ae2c372f715c97e9d65aa1 Mon Sep 17 00:00:00 2001 From: Venky Shankar Date: Wed, 9 Aug 2023 09:26:43 +0530 Subject: [PATCH] mgr/volumes: periodically check for async work Useful when "manually" assigning work - with this fix there is no need to bounce ceph-mgr to kick async threads. Fixes: http://tracker.ceph.com/issues/61867 Signed-off-by: Venky Shankar --- src/pybind/mgr/volumes/fs/async_job.py | 26 ++++++++++++++++++++++++-- src/pybind/mgr/volumes/module.py | 15 ++++++++++++++- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py index a91764e76fe..cd046407777 100644 --- a/src/pybind/mgr/volumes/fs/async_job.py +++ b/src/pybind/mgr/volumes/fs/async_job.py @@ -6,6 +6,7 @@ import traceback from collections import deque from mgr_util import lock_timeout_log, CephfsClient +from .operations.volume import list_volumes from .exception import NotImplementedException log = logging.getLogger(__name__) @@ -41,10 +42,17 @@ class JobThread(threading.Thread): log.info("thread [{0}] terminating due to reconfigure".format(thread_name)) self.async_job.threads.remove(self) return + timo = self.async_job.wakeup_timeout + if timo is not None: + vols = [e['name'] for e in list_volumes(self.vc.mgr)] + missing = set(vols) - set(self.async_job.q) + for m in missing: + self.async_job.jobs[m] = [] + self.async_job.q.append(m) vol_job = self.async_job.get_job() if vol_job: break - self.async_job.cv.wait() + self.async_job.cv.wait(timeout=timo) self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id) # execute the job (outside lock) @@ -85,7 +93,6 @@ class JobThread(threading.Thread): def reset_cancel(self): self.cancel_event.clear() - class AsyncJobs(threading.Thread): """ Class providing asynchronous execution of jobs via worker threads. @@ -105,6 +112,9 @@ class AsyncJobs(threading.Thread): via `should_cancel()` lambda passed to `execute_job()`. 
""" + # not made configurable on purpose + WAKEUP_TIMEOUT = 5.0 + def __init__(self, volume_client, name_pfx, nr_concurrent_jobs): threading.Thread.__init__(self, name="{0}.tick".format(name_pfx)) self.vc = volume_client @@ -123,6 +133,7 @@ class AsyncJobs(threading.Thread): self.name_pfx = name_pfx # each async job group uses its own libcephfs connection (pool) self.fs_client = CephfsClient(self.vc.mgr) + self.wakeup_timeout = None self.threads = [] for i in range(self.nr_concurrent_jobs): @@ -130,6 +141,17 @@ class AsyncJobs(threading.Thread): self.threads[-1].start() self.start() + def set_wakeup_timeout(self): + with self.lock: + # not made configurable on purpose + self.wakeup_timeout = AsyncJobs.WAKEUP_TIMEOUT + self.cv.notifyAll() + + def unset_wakeup_timeout(self): + with self.lock: + self.wakeup_timeout = None + self.cv.notifyAll() + def run(self): log.debug("tick thread {} starting".format(self.name)) with lock_timeout_log(self.lock): diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py index b9c8e789343..ff7256eebfd 100644 --- a/src/pybind/mgr/volumes/module.py +++ b/src/pybind/mgr/volumes/module.py @@ -484,7 +484,12 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): 'snapshot_clone_delay', type='int', default=0, - desc='Delay clone begin operation by snapshot_clone_delay seconds') + desc='Delay clone begin operation by snapshot_clone_delay seconds'), + Option( + 'periodic_async_work', + type='bool', + default=False, + desc='Periodically check for async work') ] def __init__(self, *args, **kwargs): @@ -492,6 +497,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): # for mypy self.max_concurrent_clones = None self.snapshot_clone_delay = None + self.periodic_async_work = False self.lock = threading.Lock() super(Module, self).__init__(*args, **kwargs) # Initialize config option members @@ -522,6 +528,13 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): 
self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones) elif opt['name'] == "snapshot_clone_delay": self.vc.cloner.reconfigure_snapshot_clone_delay(self.snapshot_clone_delay) + elif opt['name'] == "periodic_async_work": + if self.periodic_async_work: + self.vc.cloner.set_wakeup_timeout() + self.vc.purge_queue.set_wakeup_timeout() + else: + self.vc.cloner.unset_wakeup_timeout() + self.vc.purge_queue.unset_wakeup_timeout() def handle_command(self, inbuf, cmd): handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_") -- 2.39.5