]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/volumes: Make number of cloner threads configurable 37936/head
authorKotresh HR <khiremat@redhat.com>
Tue, 4 Aug 2020 07:57:53 +0000 (13:27 +0530)
committerKotresh HR <khiremat@redhat.com>
Thu, 7 Jan 2021 09:23:23 +0000 (14:53 +0530)
The number of cloner threads is set to 4 and it can't be
configured. This patch makes the number of cloner threads
configurable via the mgr config option "max_concurrent_clones".

On an increase in number of cloner threads, it will just
spawn the difference of threads between existing number of
cloner threads and the new configuration. It will not cancel
the running cloner threads.

On decrease in number of cloner threads, the cases are as follows.

1. If all cloner threads are waiting for the job:
      In this case, all threads are notified and required number
   threads are terminated.

2. If all the cloner threads are processing a job:
      In this case, the condition is validated for each thread after
   the current job is finished and the thread is termianted if the
   condition for required number of cloner threads is not satisified.

3. If few cloner threads are processing and others are waiting:
      The threads which are waiting are notified to validate the
   number of threads required. If terminating those doesn't satisfy the
   required number of threads, the remaining threads are terminated
   upon completion of existing job.

Fixes: https://tracker.ceph.com/issues/46892
Signed-off-by: Kotresh HR <khiremat@redhat.com>
(cherry picked from commit 83c4442c765bb64f9d4fbd3edcb4967dfa21cafe)

Conflicts:
    src/pybind/mgr/volumes/fs/volume.py
     - keep connection_pool because nautilus does not introduce
       the CephfsClient
    src/pybind/mgr/volumes/module.py
     - Remove nfs export related things as nautilus doesn't contains NFSExport, FSExport class
    qa/tasks/cephfs/test_volumes.py
     - nautilus expects particular 'mgr.id', hence used 'mgr.x' instead of 'mgr'

doc/cephfs/fs-volumes.rst
qa/tasks/cephfs/test_volumes.py
src/pybind/mgr/volumes/fs/async_cloner.py
src/pybind/mgr/volumes/fs/async_job.py
src/pybind/mgr/volumes/fs/volume.py
src/pybind/mgr/volumes/module.py

index 65f20ccd4978154005c6fcece9aff2c28aa1c5fb..efc2053caefa4b5f0ae6791a3ff288b3f7714a20 100644 (file)
@@ -272,6 +272,10 @@ Similar to specifying a pool layout when creating a subvolume, pool layout can b
 
   $ ceph fs subvolume snapshot clone <vol_name> <subvol_name> <snap_name> <target_subvol_name> --pool_layout <pool_layout>
 
+Configure maximum number of concurrent clones. The default is set to 4::
+
+  $ ceph config set mgr mgr/volumes/max_concurrent_clones <value>
+
 To check the status of a clone operation use::
 
   $ ceph fs clone status <vol_name> <clone_name> [--group_name <group_name>]
index ec97848f8a5e9fd4e90ee3917afea1d407bd19ef..cf2ba318b3e5a3c1d2a59a2b4f285e8d29d141f6 100644 (file)
@@ -2692,6 +2692,25 @@ class TestVolumes(CephFSTestCase):
         # verify trash dir is clean
         self._wait_for_trash_empty()
 
+    def test_subvolume_snapshot_reconf_max_concurrent_clones(self):
+        """
+        Validate 'max_concurrent_clones' config option
+        """
+
+        # get the default number of cloner threads
+        default_max_concurrent_clones = int(self.config_get('mgr.x', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(default_max_concurrent_clones, 4)
+
+        # Increase number of cloner threads
+        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 6)
+        max_concurrent_clones = int(self.config_get('mgr.x', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(max_concurrent_clones, 6)
+
+        # Decrease number of cloner threads
+        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 2)
+        max_concurrent_clones = int(self.config_get('mgr.x', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(max_concurrent_clones, 2)
+
     def test_subvolume_snapshot_clone_pool_layout(self):
         subvolume = self._generate_random_subvolume_name()
         snapshot = self._generate_random_snapshot_name()
index 7b6bdcb8c28fff4a1b6cd47099e0326ae49c0b3c..c66eb712a427c5dd45b86c90dd9beb4632ff6326 100644 (file)
@@ -274,6 +274,9 @@ class Cloner(AsyncJobs):
         }
         super(Cloner, self).__init__(volume_client, "cloner", tp_size)
 
+    def reconfigure_max_concurrent_clones(self, tp_size):
+        super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)
+
     def is_clone_cancelable(self, clone_state):
         return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
 
index b64590e5878719a5ebb9fa8468d77afb04eb63a3..954e89c4f54ca6f089bf1243e27ccc92dc6f4504 100644 (file)
@@ -27,6 +27,7 @@ class JobThread(threading.Thread):
         thread_id = threading.currentThread()
         assert isinstance(thread_id, JobThread)
         thread_name = thread_id.getName()
+        log.debug("thread [{0}] starting".format(thread_name))
 
         while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
             vol_job = None
@@ -34,6 +35,10 @@ class JobThread(threading.Thread):
                 # fetch next job to execute
                 with self.async_job.lock:
                     while True:
+                        if self.should_reconfigure_num_threads():
+                            log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
+                            self.async_job.threads.remove(self)
+                            return
                         vol_job = self.async_job.get_job()
                         if vol_job:
                             break
@@ -62,6 +67,12 @@ class JobThread(threading.Thread):
                 time.sleep(1)
         log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
         self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
+        with self.async_job.lock:
+            self.async_job.threads.remove(self)
+
+    def should_reconfigure_num_threads(self):
+        # reconfigure of max_concurrent_clones
+        return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs
 
     def cancel_job(self):
         self.cancel_event.set()
@@ -103,12 +114,28 @@ class AsyncJobs(object):
         # cv for job cancelation
         self.waiting = False
         self.cancel_cv = threading.Condition(self.lock)
+        self.nr_concurrent_jobs = nr_concurrent_jobs
 
         self.threads = []
         for i in range(nr_concurrent_jobs):
             self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
             self.threads[-1].start()
 
+    def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
+        """
+        reconfigure number of cloner threads
+        """
+        with self.lock:
+            self.nr_concurrent_jobs = nr_concurrent_jobs
+            # Decrease in concurrency. Notify threads which are waiting for a job to terminate.
+            if len(self.threads) > nr_concurrent_jobs:
+                self.cv.notifyAll()
+            # Increase in concurrency
+            if len(self.threads) < nr_concurrent_jobs:
+                for i in range(len(self.threads), nr_concurrent_jobs):
+                    self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i)))
+                    self.threads[-1].start()
+
     def get_job(self):
         log.debug("processing {0} volume entries".format(len(self.q)))
         nr_vols = len(self.q)
index 2477c83232529bf02ae192272fcdb0c967fd8ece..28f7b7c561b60a3ecfa2937ac4ea1579c16f6edf 100644 (file)
@@ -43,8 +43,7 @@ class VolumeClient(object):
         # volume specification
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
         self.connection_pool = ConnectionPool(self.mgr)
-        # TODO: make thread pool size configurable
-        self.cloner = Cloner(self, 4)
+        self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
index ae188d531ed570c3bc14f3282c6956b96100c503..18feb3267354ce079ae4b3e5a51f07eaa7251d50 100644 (file)
@@ -2,6 +2,7 @@ import errno
 import json
 import logging
 import traceback
+import threading
 
 from mgr_module import MgrModule
 import orchestrator
@@ -278,9 +279,26 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         # volume in the lifetime of this module instance.
     ]
 
+    MODULE_OPTIONS = [
+        {
+            'name': 'max_concurrent_clones',
+            'type': 'int',
+            'default': 4,
+            'desc': 'Number of asynchronous cloner threads',
+        }
+    ]
+
     def __init__(self, *args, **kwargs):
+        self.inited = False
+        # for mypy
+        self.max_concurrent_clones = None
+        self.lock = threading.Lock()
         super(Module, self).__init__(*args, **kwargs)
-        self.vc = VolumeClient(self)
+        # Initialize config option members
+        self.config_notify()
+        with self.lock:
+            self.vc = VolumeClient(self)
+            self.inited = True
 
     def __del__(self):
         self.vc.shutdown()
@@ -288,6 +306,21 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
     def shutdown(self):
         self.vc.shutdown()
 
+    def config_notify(self):
+        """
+        This method is called whenever one of our config options is changed.
+        """
+        with self.lock:
+            for opt in self.MODULE_OPTIONS:
+                setattr(self,
+                        opt['name'],  # type: ignore
+                        self.get_module_option(opt['name']))  # type: ignore
+                self.log.debug(' mgr option %s = %s',
+                               opt['name'], getattr(self, opt['name']))  # type: ignore
+                if self.inited:
+                    if opt['name'] == "max_concurrent_clones":
+                        self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
+
     def handle_command(self, inbuf, cmd):
         handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
         try: