From: Patrick Donnelly
Date: Mon, 22 Mar 2021 16:17:43 +0000 (-0700)
Subject: mgr/pybind/volumes: avoid acquiring lock for thread count updates
X-Git-Tag: v16.2.2~26^2~2
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=fd4637f47528a7bf59ecb9107acc619eb6fa41cc;p=ceph.git

mgr/pybind/volumes: avoid acquiring lock for thread count updates

Perform thread count updates in a dedicated tick thread. This prevents
the mgr Finisher thread from potentially hanging on a mutex deadlock in
the cloner thread management.

Fixes: https://tracker.ceph.com/issues/49605
Signed-off-by: Patrick Donnelly
(cherry picked from commit b27ddfaed4a3c66bac2343c8315a1fe542edb63e)
---

diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py
index d08b784651293..c042e624c58ca 100644
--- a/src/pybind/mgr/volumes/fs/async_cloner.py
+++ b/src/pybind/mgr/volumes/fs/async_cloner.py
@@ -276,7 +276,7 @@ class Cloner(AsyncJobs):
         super(Cloner, self).__init__(volume_client, "cloner", tp_size)
 
     def reconfigure_max_concurrent_clones(self, tp_size):
-        super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)
+        return super(Cloner, self).reconfigure_max_async_threads(tp_size)
 
     def is_clone_cancelable(self, clone_state):
         return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py
index fb7051f47c242..095ecdd36f8c1 100644
--- a/src/pybind/mgr/volumes/fs/async_job.py
+++ b/src/pybind/mgr/volumes/fs/async_job.py
@@ -83,7 +83,7 @@ class JobThread(threading.Thread):
     def reset_cancel(self):
         self.cancel_event.clear()
 
-class AsyncJobs(object):
+class AsyncJobs(threading.Thread):
     """
     Class providing asynchronous execution of jobs via worker threads.
     `jobs` are grouped by `volume`, so a `volume` can have N number of
@@ -103,6 +103,7 @@ class AsyncJobs(object):
     """
 
     def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
+        threading.Thread.__init__(self, name="{0}.tick".format(name_pfx))
         self.vc = volume_client
         # queue of volumes for starting async jobs
         self.q = deque()  # type: deque
@@ -113,28 +114,46 @@ class AsyncJobs(object):
         self.cv = threading.Condition(self.lock)
         # cv for job cancelation
         self.waiting = False
+        self.stopping = threading.Event()
         self.cancel_cv = threading.Condition(self.lock)
         self.nr_concurrent_jobs = nr_concurrent_jobs
+        self.name_pfx = name_pfx
 
         self.threads = []
-        for i in range(nr_concurrent_jobs):
-            self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
+        for i in range(self.nr_concurrent_jobs):
+            self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(self.name_pfx, i)))
             self.threads[-1].start()
+        self.start()
 
-    def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
+    def run(self):
+        log.debug("tick thread {} starting".format(self.name))
+        with self.lock:
+            while not self.stopping.is_set():
+                c = len(self.threads)
+                if c > self.nr_concurrent_jobs:
+                    # Decrease concurrency: notify threads which are waiting for a job to terminate.
+                    log.debug("waking threads to terminate due to job reduction")
+                    self.cv.notifyAll()
+                elif c < self.nr_concurrent_jobs:
+                    # Increase concurrency: create more threads.
+ log.debug("creating new threads to job increase") + for i in range(c, self.nr_concurrent_jobs): + self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(self.name_pfx, time.time(), i))) + self.threads[-1].start() + self.cv.wait(timeout=5) + + def shutdown(self): + self.stopping.set() + self.cancel_all_jobs() + with self.lock: + self.cv.notifyAll() + self.join() + + def reconfigure_max_async_threads(self, nr_concurrent_jobs): """ reconfigure number of cloner threads """ - with self.lock: - self.nr_concurrent_jobs = nr_concurrent_jobs - # Decrease in concurrency. Notify threads which are waiting for a job to terminate. - if len(self.threads) > nr_concurrent_jobs: - self.cv.notifyAll() - # Increase in concurrency - if len(self.threads) < nr_concurrent_jobs: - for i in range(len(self.threads), nr_concurrent_jobs): - self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i))) - self.threads[-1].start() + self.nr_concurrent_jobs = nr_concurrent_jobs def get_job(self): log.debug("processing {0} volume entries".format(len(self.q))) diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py index dfed54294bf22..ececd16e39ecb 100644 --- a/src/pybind/mgr/volumes/fs/volume.py +++ b/src/pybind/mgr/volumes/fs/volume.py @@ -68,9 +68,11 @@ class VolumeClient(CephfsClient["Module"]): log.info("shutting down") # first, note that we're shutting down self.stopping.set() - # second, ask purge threads to quit - self.purge_queue.cancel_all_jobs() - # third, delete all libcephfs handles from connection pool + # stop clones + self.cloner.shutdown() + # stop purge threads + self.purge_queue.shutdown() + # last, delete all libcephfs handles from connection pool self.connection_pool.del_all_handles() def cluster_log(self, msg, lvl=None):