From: Kotresh HR Date: Tue, 4 Aug 2020 07:57:53 +0000 (+0530) Subject: mgr/volumes: Make number of cloner threads configurable X-Git-Tag: v14.2.17~72^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=89f4f4255c1cef018eb613f22bf84f9f160ee732;p=ceph.git mgr/volumes: Make number of cloner threads configurable The number of cloner threads is set to 4 and it can't be configured. This patch makes the number of cloner threads configurable via the mgr config option "max_concurrent_clones". On an increase in number of cloner threads, it will just spawn the difference of threads between existing number of cloner threads and the new configuration. It will not cancel the running cloner threads. On decrease in number of cloner threads, the cases are as follows. 1. If all cloner threads are waiting for the job: In this case, all threads are notified and the required number of threads are terminated. 2. If all the cloner threads are processing a job: In this case, the condition is validated for each thread after the current job is finished and the thread is terminated if the condition for the required number of cloner threads is not satisfied. 3. If few cloner threads are processing and others are waiting: The threads which are waiting are notified to validate the number of threads required. If terminating those doesn't satisfy the required number of threads, the remaining threads are terminated upon completion of existing job. 
Fixes: https://tracker.ceph.com/issues/46892 Signed-off-by: Kotresh HR (cherry picked from commit 83c4442c765bb64f9d4fbd3edcb4967dfa21cafe) Conflicts: src/pybind/mgr/volumes/fs/volume.py - keep connection_pool because nautilus does not introduce the CephfsClient src/pybind/mgr/volumes/module.py - Remove nfs export related things as nautilus doesn't contains NFSExport, FSExport class qa/tasks/cephfs/test_volumes.py - nautilus expects particular 'mgr.id', hence used 'mgr.x' instead of 'mgr' --- diff --git a/doc/cephfs/fs-volumes.rst b/doc/cephfs/fs-volumes.rst index 65f20ccd49781..efc2053caefa4 100644 --- a/doc/cephfs/fs-volumes.rst +++ b/doc/cephfs/fs-volumes.rst @@ -272,6 +272,10 @@ Similar to specifying a pool layout when creating a subvolume, pool layout can b $ ceph fs subvolume snapshot clone --pool_layout +Configure maximum number of concurrent clones. The default is set to 4:: + + $ ceph config set mgr mgr/volumes/max_concurrent_clones + To check the status of a clone operation use:: $ ceph fs clone status [--group_name ] diff --git a/qa/tasks/cephfs/test_volumes.py b/qa/tasks/cephfs/test_volumes.py index ec97848f8a5e9..cf2ba318b3e5a 100644 --- a/qa/tasks/cephfs/test_volumes.py +++ b/qa/tasks/cephfs/test_volumes.py @@ -2692,6 +2692,25 @@ class TestVolumes(CephFSTestCase): # verify trash dir is clean self._wait_for_trash_empty() + def test_subvolume_snapshot_reconf_max_concurrent_clones(self): + """ + Validate 'max_concurrent_clones' config option + """ + + # get the default number of cloner threads + default_max_concurrent_clones = int(self.config_get('mgr.x', 'mgr/volumes/max_concurrent_clones')) + self.assertEqual(default_max_concurrent_clones, 4) + + # Increase number of cloner threads + self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 6) + max_concurrent_clones = int(self.config_get('mgr.x', 'mgr/volumes/max_concurrent_clones')) + self.assertEqual(max_concurrent_clones, 6) + + # Decrease number of cloner threads + self.config_set('mgr', 
'mgr/volumes/max_concurrent_clones', 2) + max_concurrent_clones = int(self.config_get('mgr.x', 'mgr/volumes/max_concurrent_clones')) + self.assertEqual(max_concurrent_clones, 2) + def test_subvolume_snapshot_clone_pool_layout(self): subvolume = self._generate_random_subvolume_name() snapshot = self._generate_random_snapshot_name() diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py index 7b6bdcb8c28ff..c66eb712a427c 100644 --- a/src/pybind/mgr/volumes/fs/async_cloner.py +++ b/src/pybind/mgr/volumes/fs/async_cloner.py @@ -274,6 +274,9 @@ class Cloner(AsyncJobs): } super(Cloner, self).__init__(volume_client, "cloner", tp_size) + def reconfigure_max_concurrent_clones(self, tp_size): + super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size) + def is_clone_cancelable(self, clone_state): return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state)) diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py index b64590e587871..954e89c4f54ca 100644 --- a/src/pybind/mgr/volumes/fs/async_job.py +++ b/src/pybind/mgr/volumes/fs/async_job.py @@ -27,6 +27,7 @@ class JobThread(threading.Thread): thread_id = threading.currentThread() assert isinstance(thread_id, JobThread) thread_name = thread_id.getName() + log.debug("thread [{0}] starting".format(thread_name)) while retries < JobThread.MAX_RETRIES_ON_EXCEPTION: vol_job = None @@ -34,6 +35,10 @@ class JobThread(threading.Thread): # fetch next job to execute with self.async_job.lock: while True: + if self.should_reconfigure_num_threads(): + log.info("thread [{0}] terminating due to reconfigure".format(thread_name)) + self.async_job.threads.remove(self) + return vol_job = self.async_job.get_job() if vol_job: break @@ -62,6 +67,12 @@ class JobThread(threading.Thread): time.sleep(1) log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name)) self.vc.cluster_log("thread 
{0} bailing out due to exception".format(thread_name)) + with self.async_job.lock: + self.async_job.threads.remove(self) + + def should_reconfigure_num_threads(self): + # reconfigure of max_concurrent_clones + return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs def cancel_job(self): self.cancel_event.set() @@ -103,12 +114,28 @@ class AsyncJobs(object): # cv for job cancelation self.waiting = False self.cancel_cv = threading.Condition(self.lock) + self.nr_concurrent_jobs = nr_concurrent_jobs self.threads = [] for i in range(nr_concurrent_jobs): self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i))) self.threads[-1].start() + def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs): + """ + reconfigure number of cloner threads + """ + with self.lock: + self.nr_concurrent_jobs = nr_concurrent_jobs + # Decrease in concurrency. Notify threads which are waiting for a job to terminate. + if len(self.threads) > nr_concurrent_jobs: + self.cv.notifyAll() + # Increase in concurrency + if len(self.threads) < nr_concurrent_jobs: + for i in range(len(self.threads), nr_concurrent_jobs): + self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i))) + self.threads[-1].start() + def get_job(self): log.debug("processing {0} volume entries".format(len(self.q))) nr_vols = len(self.q) diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py index 2477c83232529..28f7b7c561b60 100644 --- a/src/pybind/mgr/volumes/fs/volume.py +++ b/src/pybind/mgr/volumes/fs/volume.py @@ -43,8 +43,7 @@ class VolumeClient(object): # volume specification self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir')) self.connection_pool = ConnectionPool(self.mgr) - # TODO: make thread pool size configurable - self.cloner = Cloner(self, 4) + self.cloner = Cloner(self, self.mgr.max_concurrent_clones) self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4) # on startup, queue 
purge job for available volumes to kickstart # purge for leftover subvolume entries in trash. note that, if the diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py index ae188d531ed57..18feb3267354c 100644 --- a/src/pybind/mgr/volumes/module.py +++ b/src/pybind/mgr/volumes/module.py @@ -2,6 +2,7 @@ import errno import json import logging import traceback +import threading from mgr_module import MgrModule import orchestrator @@ -278,9 +279,26 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): # volume in the lifetime of this module instance. ] + MODULE_OPTIONS = [ + { + 'name': 'max_concurrent_clones', + 'type': 'int', + 'default': 4, + 'desc': 'Number of asynchronous cloner threads', + } + ] + def __init__(self, *args, **kwargs): + self.inited = False + # for mypy + self.max_concurrent_clones = None + self.lock = threading.Lock() super(Module, self).__init__(*args, **kwargs) - self.vc = VolumeClient(self) + # Initialize config option members + self.config_notify() + with self.lock: + self.vc = VolumeClient(self) + self.inited = True def __del__(self): self.vc.shutdown() @@ -288,6 +306,21 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): def shutdown(self): self.vc.shutdown() + def config_notify(self): + """ + This method is called whenever one of our config options is changed. + """ + with self.lock: + for opt in self.MODULE_OPTIONS: + setattr(self, + opt['name'], # type: ignore + self.get_module_option(opt['name'])) # type: ignore + self.log.debug(' mgr option %s = %s', + opt['name'], getattr(self, opt['name'])) # type: ignore + if self.inited: + if opt['name'] == "max_concurrent_clones": + self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones) + def handle_command(self, inbuf, cmd): handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_") try: