From f6f4f1e03355dcfba681ea08ad29be9918a6ee91 Mon Sep 17 00:00:00 2001
From: Kotresh HR <khiremat@redhat.com>
Date: Tue, 4 Aug 2020 13:27:53 +0530
Subject: [PATCH] mgr/volumes: Make number of cloner threads configurable

The number of cloner threads is set to 4 and it can't be
configured. This patch makes the number of cloner threads configurable
via the mgr config option "max_concurrent_clones".

On an increase in the number of cloner threads, it will just spawn the
difference of threads between the existing number of cloner threads and
the new configuration. It will not cancel the running cloner threads.

On a decrease in the number of cloner threads, the cases are as follows.

1. If all cloner threads are waiting for a job:
   In this case, all threads are notified and the required number of
   threads are terminated.
2. If all the cloner threads are processing a job:
   In this case, the condition is validated for each thread after
   the current job is finished and the thread is terminated if the
   condition for the required number of cloner threads is not satisfied.
3. If a few cloner threads are processing and others are waiting:
   The threads which are waiting are notified to validate the
   number of threads required. If terminating those doesn't satisfy the
   required number of threads, the remaining threads are terminated
   upon completion of the existing job.

Fixes: https://tracker.ceph.com/issues/46892
Signed-off-by: Kotresh HR <khiremat@redhat.com>
(cherry picked from commit 83c4442c765bb64f9d4fbd3edcb4967dfa21cafe)

 Conflicts:
        src/pybind/mgr/volumes/fs/volume.py
        - keep connection_pool because octopus does not introduce the CephfsClient
---
 doc/cephfs/fs-volumes.rst                 |  4 +++
 qa/tasks/cephfs/test_volumes.py           | 19 +++++++++++
 src/pybind/mgr/volumes/fs/async_cloner.py |  3 ++
 src/pybind/mgr/volumes/fs/async_job.py    | 27 ++++++++++++++++
 src/pybind/mgr/volumes/fs/volume.py       |  3 +-
 src/pybind/mgr/volumes/module.py          | 39 +++++++++++++++++++++--
 6 files changed, 90 insertions(+), 5 deletions(-)

diff --git a/doc/cephfs/fs-volumes.rst b/doc/cephfs/fs-volumes.rst
index 4efe26d8b6422..15360231e3902 100644
--- a/doc/cephfs/fs-volumes.rst
+++ b/doc/cephfs/fs-volumes.rst
@@ -251,6 +251,10 @@ Similar to specifying a pool layout when creating a subvolume, pool layout can b
 
   $ ceph fs subvolume snapshot clone <vol_name> <subvol_name> <snap_name> <target_subvol_name> --pool_layout <pool_layout>
 
+Configure maximum number of concurrent clones. The default is set to 4::
+
+  $ ceph config set mgr mgr/volumes/max_concurrent_clones <value>
+
 To check the status of a clone operation use::
 
   $ ceph fs clone status <vol_name> <clone_name> [--group_name <group_name>]
diff --git a/qa/tasks/cephfs/test_volumes.py b/qa/tasks/cephfs/test_volumes.py
index 88fbe04cd278f..7984cea9205c1 100644
--- a/qa/tasks/cephfs/test_volumes.py
+++ b/qa/tasks/cephfs/test_volumes.py
@@ -1728,6 +1728,25 @@ class TestVolumes(CephFSTestCase):
         # verify trash dir is clean
         self._wait_for_trash_empty()
 
+    def test_subvolume_snapshot_reconf_max_concurrent_clones(self):
+        """
+        Validate 'max_concurrent_clones' config option
+        """
+
+        # get the default number of cloner threads
+        default_max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(default_max_concurrent_clones, 4)
+
+        # Increase number of cloner threads
+        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 6)
+        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(max_concurrent_clones, 6)
+
+        # Decrease number of cloner threads
+        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 2)
+        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(max_concurrent_clones, 2)
+
     def test_subvolume_snapshot_clone_pool_layout(self):
         subvolume = self._generate_random_subvolume_name()
         snapshot = self._generate_random_snapshot_name()
diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py
index 7cca7da8ec21d..e86abfe0284b5 100644
--- a/src/pybind/mgr/volumes/fs/async_cloner.py
+++ b/src/pybind/mgr/volumes/fs/async_cloner.py
@@ -241,6 +241,9 @@ class Cloner(AsyncJobs):
         }
         super(Cloner, self).__init__(volume_client, "cloner", tp_size)
 
+    def reconfigure_max_concurrent_clones(self, tp_size):
+        super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)
+
     def is_clone_cancelable(self, clone_state):
         return not (OpSm.is_final_state(clone_state) or OpSm.is_failed_state(clone_state))
 
diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py
index 3bdedb723b9ce..fb7051f47c242 100644
--- a/src/pybind/mgr/volumes/fs/async_job.py
+++ b/src/pybind/mgr/volumes/fs/async_job.py
@@ -27,6 +27,7 @@ class JobThread(threading.Thread):
         thread_id = threading.currentThread()
         assert isinstance(thread_id, JobThread)
         thread_name = thread_id.getName()
+        log.debug("thread [{0}] starting".format(thread_name))
 
         while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
             vol_job = None
@@ -34,6 +35,10 @@
                 # fetch next job to execute
                 with self.async_job.lock:
                     while True:
+                        if self.should_reconfigure_num_threads():
+                            log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
+                            self.async_job.threads.remove(self)
+                            return
                         vol_job = self.async_job.get_job()
                         if vol_job:
                             break
@@ -62,6 +67,12 @@
                 time.sleep(1)
         log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
         self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
+        with self.async_job.lock:
+            self.async_job.threads.remove(self)
+
+    def should_reconfigure_num_threads(self):
+        # reconfigure of max_concurrent_clones
+        return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs
 
     def cancel_job(self):
         self.cancel_event.set()
@@ -103,12 +114,28 @@ class AsyncJobs(object):
         # cv for job cancelation
         self.waiting = False
         self.cancel_cv = threading.Condition(self.lock)
+        self.nr_concurrent_jobs = nr_concurrent_jobs
 
         self.threads = []
         for i in range(nr_concurrent_jobs):
             self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
             self.threads[-1].start()
 
+    def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
+        """
+        reconfigure number of cloner threads
+        """
+        with self.lock:
+            self.nr_concurrent_jobs = nr_concurrent_jobs
+            # Decrease in concurrency. Notify threads which are waiting for a job to terminate.
+            if len(self.threads) > nr_concurrent_jobs:
+                self.cv.notifyAll()
+            # Increase in concurrency
+            if len(self.threads) < nr_concurrent_jobs:
+                for i in range(len(self.threads), nr_concurrent_jobs):
+                    self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i)))
+                    self.threads[-1].start()
+
     def get_job(self):
         log.debug("processing {0} volume entries".format(len(self.q)))
         nr_vols = len(self.q)
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index 8edaca6def129..10197dd311145 100644
--- a/src/pybind/mgr/volumes/fs/volume.py
+++ b/src/pybind/mgr/volumes/fs/volume.py
@@ -42,8 +42,7 @@ class VolumeClient(object):
         # volume specification
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
         self.connection_pool = ConnectionPool(self.mgr)
-        # TODO: make thread pool size configurable
-        self.cloner = Cloner(self, 4)
+        self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py
index 3d4ddc0f4d78f..9fc2e3043fa08 100644
--- a/src/pybind/mgr/volumes/module.py
+++ b/src/pybind/mgr/volumes/module.py
@@ -2,6 +2,7 @@ import errno
 import json
 import logging
 import traceback
+import threading
 
 from mgr_module import MgrModule
 import orchestrator
@@ -371,11 +372,28 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         # volume in the lifetime of this module instance.
     ]
 
+    MODULE_OPTIONS = [
+        {
+            'name': 'max_concurrent_clones',
+            'type': 'int',
+            'default': 4,
+            'desc': 'Number of asynchronous cloner threads',
+        }
+    ]
+
     def __init__(self, *args, **kwargs):
+        self.inited = False
+        # for mypy
+        self.max_concurrent_clones = None
+        self.lock = threading.Lock()
         super(Module, self).__init__(*args, **kwargs)
-        self.vc = VolumeClient(self)
-        self.fs_export = FSExport(self)
-        self.nfs = NFSCluster(self)
+        # Initialize config option members
+        self.config_notify()
+        with self.lock:
+            self.vc = VolumeClient(self)
+            self.fs_export = FSExport(self)
+            self.nfs = NFSCluster(self)
+        self.inited = True
 
     def __del__(self):
         self.vc.shutdown()
@@ -383,6 +401,21 @@
     def shutdown(self):
         self.vc.shutdown()
 
+    def config_notify(self):
+        """
+        This method is called whenever one of our config options is changed.
+        """
+        with self.lock:
+            for opt in self.MODULE_OPTIONS:
+                setattr(self,
+                        opt['name'],  # type: ignore
+                        self.get_module_option(opt['name']))  # type: ignore
+                self.log.debug(' mgr option %s = %s',
+                               opt['name'], getattr(self, opt['name']))  # type: ignore
+                if self.inited:
+                    if opt['name'] == "max_concurrent_clones":
+                        self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
+
     def handle_command(self, inbuf, cmd):
         handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
         try:
-- 
2.39.5
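
Note for reviewers: the standalone sketch below illustrates the resize behaviour described in the commit message. Waiting workers are woken and exit until the pool matches the new target (cases 1 and 3), busy workers re-check the target after finishing their current job (case 2), and an increase simply spawns the missing threads. It is not part of the patch; the names WorkerPool, resize and submit are hypothetical, and the real logic lives in AsyncJobs.reconfigure_max_concurrent_clones and JobThread.should_reconfigure_num_threads above.

# Standalone sketch only; not the mgr/volumes implementation.
import threading
import time
from collections import deque


class WorkerPool(object):
    """A job queue served by a resizable set of worker threads."""

    def __init__(self, size):
        self.lock = threading.Lock()
        self.cv = threading.Condition(self.lock)
        self.jobs = deque()
        self.target = size
        self.threads = []
        self._spawn(size)

    def _spawn(self, count):
        # start `count` additional workers (increase case)
        for _ in range(count):
            t = threading.Thread(target=self._run,
                                 name="worker.{0}".format(len(self.threads)))
            t.daemon = True  # daemon threads so this demo process can exit
            self.threads.append(t)
            t.start()

    def _run(self):
        while True:
            with self.lock:
                while not self.jobs:
                    if len(self.threads) > self.target:
                        # pool is over target while idle: terminate (cases 1 and 3)
                        self.threads.remove(threading.current_thread())
                        return
                    self.cv.wait()
                job = self.jobs.popleft()
            job()  # run the job outside the lock
            with self.lock:
                if len(self.threads) > self.target:
                    # re-check the target after the current job completes (case 2)
                    self.threads.remove(threading.current_thread())
                    return

    def resize(self, new_size):
        with self.lock:
            self.target = new_size
            if len(self.threads) > new_size:
                self.cv.notify_all()  # wake idle workers so they can exit
            elif len(self.threads) < new_size:
                self._spawn(new_size - len(self.threads))

    def submit(self, job):
        with self.lock:
            self.jobs.append(job)
            self.cv.notify()


if __name__ == "__main__":
    pool = WorkerPool(4)
    pool.submit(lambda: time.sleep(0.1))
    pool.resize(2)   # surplus idle workers wake up and exit
    pool.resize(6)   # missing workers are spawned
    time.sleep(0.5)
    with pool.lock:
        print("workers now:", len(pool.threads))

Running the sketch with python3 prints the final worker count; as in the patch, a shrink never interrupts a job in flight, it only prevents surplus threads from picking up the next one.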