git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/pybind/volumes: avoid acquiring lock for thread count updates 41394/head
author: Patrick Donnelly <pdonnell@redhat.com>
Mon, 22 Mar 2021 16:17:43 +0000 (09:17 -0700)
committer: Venky Shankar <vshankar@redhat.com>
Wed, 26 May 2021 13:43:41 +0000 (09:43 -0400)
Perform thread count updates in a dedicated tick thread. This prevents the
mgr Finisher thread from potentially hanging via a mutex deadlock in the
cloner thread management.

Fixes: https://tracker.ceph.com/issues/49605
Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
(cherry picked from commit b27ddfaed4a3c66bac2343c8315a1fe542edb63e)

src/pybind/mgr/volumes/fs/async_cloner.py
src/pybind/mgr/volumes/fs/async_job.py
src/pybind/mgr/volumes/fs/volume.py

index c42808582eec2fa10d8619d5387a11147b3491ec..b3c5780dedab682fec542231cba3eb34cd5a2cc6 100644 (file)
@@ -276,7 +276,7 @@ class Cloner(AsyncJobs):
         super(Cloner, self).__init__(volume_client, "cloner", tp_size)
 
     def reconfigure_max_concurrent_clones(self, tp_size):
-        super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)
+        return super(Cloner, self).reconfigure_max_async_threads(tp_size)
 
     def is_clone_cancelable(self, clone_state):
         return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
index 954e89c4f54ca6f089bf1243e27ccc92dc6f4504..f8b7439a0bfb0946a2092fe43823e91f607f60b5 100644 (file)
@@ -83,7 +83,7 @@ class JobThread(threading.Thread):
     def reset_cancel(self):
         self.cancel_event.clear()
 
-class AsyncJobs(object):
+class AsyncJobs(threading.Thread):
     """
     Class providing asynchronous execution of jobs via worker threads.
     `jobs` are grouped by `volume`, so a `volume` can have N number of
@@ -103,6 +103,7 @@ class AsyncJobs(object):
     """
 
     def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
+        threading.Thread.__init__(self, name="{0}.tick".format(name_pfx))
         self.vc = volume_client
         # queue of volumes for starting async jobs
         self.q = deque()
@@ -113,28 +114,46 @@ class AsyncJobs(object):
         self.cv = threading.Condition(self.lock)
         # cv for job cancelation
         self.waiting = False
+        self.stopping = threading.Event()
         self.cancel_cv = threading.Condition(self.lock)
         self.nr_concurrent_jobs = nr_concurrent_jobs
+        self.name_pfx = name_pfx
 
         self.threads = []
-        for i in range(nr_concurrent_jobs):
-            self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
+        for i in range(self.nr_concurrent_jobs):
+            self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(self.name_pfx, i)))
             self.threads[-1].start()
+        self.start()
 
-    def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
+    def run(self):
+        log.debug("tick thread {} starting".format(self.name))
+        with self.lock:
+            while not self.stopping.is_set():
+                c = len(self.threads)
+                if c > self.nr_concurrent_jobs:
+                    # Decrease concurrency: notify threads which are waiting for a job to terminate.
+                    log.debug("waking threads to terminate due to job reduction")
+                    self.cv.notifyAll()
+                elif c < self.nr_concurrent_jobs:
+                    # Increase concurrency: create more threads.
+                    log.debug("creating new threads to job increase")
+                    for i in range(c, self.nr_concurrent_jobs):
+                        self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(self.name_pfx, time.time(), i)))
+                        self.threads[-1].start()
+                self.cv.wait(timeout=5)
+
+    def shutdown(self):
+        self.stopping.set()
+        self.cancel_all_jobs()
+        with self.lock:
+            self.cv.notifyAll()
+        self.join()
+
+    def reconfigure_max_async_threads(self, nr_concurrent_jobs):
         """
         reconfigure number of cloner threads
         """
-        with self.lock:
-            self.nr_concurrent_jobs = nr_concurrent_jobs
-            # Decrease in concurrency. Notify threads which are waiting for a job to terminate.
-            if len(self.threads) > nr_concurrent_jobs:
-                self.cv.notifyAll()
-            # Increase in concurrency
-            if len(self.threads) < nr_concurrent_jobs:
-                for i in range(len(self.threads), nr_concurrent_jobs):
-                    self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i)))
-                    self.threads[-1].start()
+        self.nr_concurrent_jobs = nr_concurrent_jobs
 
     def get_job(self):
         log.debug("processing {0} volume entries".format(len(self.q)))
index ab2875396a7958a3d82dcc740910d36b44bfe22d..6a1d0d22155e49cc8844fb513d583c2ccf33e0cb 100644 (file)
@@ -65,9 +65,11 @@ class VolumeClient(object):
         log.info("shutting down")
         # first, note that we're shutting down
         self.stopping.set()
-        # second, ask purge threads to quit
-        self.purge_queue.cancel_all_jobs()
-        # third, delete all libcephfs handles from connection pool
+        # stop clones
+        self.cloner.shutdown()
+        # stop purge threads
+        self.purge_queue.shutdown()
+        # last, delete all libcephfs handles from connection pool
         self.connection_pool.del_all_handles()
 
     def cluster_log(self, msg, lvl=None):