]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/volumes: periodically check for async work
authorVenky Shankar <vshankar@redhat.com>
Wed, 9 Aug 2023 03:56:43 +0000 (09:26 +0530)
committerVenky Shankar <vshankar@redhat.com>
Mon, 28 Aug 2023 06:17:35 +0000 (02:17 -0400)
Useful when "manually" assigning work - with this fix there
is not need to bounce ceph-mgr to kick async threads.

Fixes: http://tracker.ceph.com/issues/61867
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/pybind/mgr/volumes/fs/async_job.py
src/pybind/mgr/volumes/module.py

index a91764e76fe247607b02c03bf05892afe296fec2..cd046407777130701bb5e16920779134b020076a 100644 (file)
@@ -6,6 +6,7 @@ import traceback
 from collections import deque
 from mgr_util import lock_timeout_log, CephfsClient
 
+from .operations.volume import list_volumes
 from .exception import NotImplementedException
 
 log = logging.getLogger(__name__)
@@ -41,10 +42,17 @@ class JobThread(threading.Thread):
                             log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
                             self.async_job.threads.remove(self)
                             return
+                        timo = self.async_job.wakeup_timeout
+                        if timo is not None:
+                            vols = [e['name'] for e in list_volumes(self.vc.mgr)]
+                            missing = set(vols) - set(self.async_job.q)
+                            for m in missing:
+                                self.async_job.jobs[m] = []
+                                self.async_job.q.append(m)
                         vol_job = self.async_job.get_job()
                         if vol_job:
                             break
-                        self.async_job.cv.wait()
+                        self.async_job.cv.wait(timeout=timo)
                     self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)
 
                 # execute the job (outside lock)
@@ -85,7 +93,6 @@ class JobThread(threading.Thread):
     def reset_cancel(self):
         self.cancel_event.clear()
 
-
 class AsyncJobs(threading.Thread):
     """
     Class providing asynchronous execution of jobs via worker threads.
@@ -105,6 +112,9 @@ class AsyncJobs(threading.Thread):
     via `should_cancel()` lambda passed to `execute_job()`.
     """
 
+    # not made configurable on purpose
+    WAKEUP_TIMEOUT = 5.0
+
     def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
         threading.Thread.__init__(self, name="{0}.tick".format(name_pfx))
         self.vc = volume_client
@@ -123,6 +133,7 @@ class AsyncJobs(threading.Thread):
         self.name_pfx = name_pfx
         # each async job group uses its own libcephfs connection (pool)
         self.fs_client = CephfsClient(self.vc.mgr)
+        self.wakeup_timeout = None
 
         self.threads = []
         for i in range(self.nr_concurrent_jobs):
@@ -130,6 +141,17 @@ class AsyncJobs(threading.Thread):
             self.threads[-1].start()
         self.start()
 
+    def set_wakeup_timeout(self):
+        with self.lock:
+            # not made configurable on purpose
+            self.wakeup_timeout = AsyncJobs.WAKEUP_TIMEOUT
+            self.cv.notifyAll()
+
+    def unset_wakeup_timeout(self):
+        with self.lock:
+            self.wakeup_timeout = None
+            self.cv.notifyAll()
+
     def run(self):
         log.debug("tick thread {} starting".format(self.name))
         with lock_timeout_log(self.lock):
index b9c8e7893435934318aee3f8180f424bce813947..ff7256eebfd34ec16df7ea646f2da69d6584cf11 100644 (file)
@@ -484,7 +484,12 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
             'snapshot_clone_delay',
             type='int',
             default=0,
-            desc='Delay clone begin operation by snapshot_clone_delay seconds')
+            desc='Delay clone begin operation by snapshot_clone_delay seconds'),
+        Option(
+            'periodic_async_work',
+            type='bool',
+            default=False,
+            desc='Periodically check for async work')
     ]
 
     def __init__(self, *args, **kwargs):
@@ -492,6 +497,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         # for mypy
         self.max_concurrent_clones = None
         self.snapshot_clone_delay = None
+        self.periodic_async_work = False
         self.lock = threading.Lock()
         super(Module, self).__init__(*args, **kwargs)
         # Initialize config option members
@@ -522,6 +528,13 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
                         self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
                     elif opt['name'] == "snapshot_clone_delay":
                         self.vc.cloner.reconfigure_snapshot_clone_delay(self.snapshot_clone_delay)
+                    elif opt['name'] == "periodic_async_work":
+                        if self.periodic_async_work:
+                            self.vc.cloner.set_wakeup_timeout()
+                            self.vc.purge_queue.set_wakeup_timeout()
+                        else:
+                            self.vc.cloner.unset_wakeup_timeout()
+                            self.vc.purge_queue.unset_wakeup_timeout()
 
     def handle_command(self, inbuf, cmd):
         handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")