mgr/volumes: Make number of cloner threads configurable 37671/head
author     Kotresh HR <khiremat@redhat.com>
           Tue, 4 Aug 2020 07:57:53 +0000 (13:27 +0530)
committer  Vicente Cheng <freeze.bilsted@gmail.com>
           Thu, 22 Oct 2020 03:08:17 +0000 (03:08 +0000)
The number of cloner threads is hard-coded to 4 and cannot be
configured. This patch makes the number of cloner threads
configurable via the mgr config option "max_concurrent_clones".

When the number of cloner threads is increased, only the difference
between the new configuration and the existing number of cloner
threads is spawned. Running cloner threads are not cancelled.

When the number of cloner threads is decreased, the following cases apply
(a short sketch follows the list).

1. If all cloner threads are waiting for a job:
      In this case, all threads are notified and the required number
   of threads terminate.

2. If all cloner threads are processing a job:
      In this case, each thread validates the condition after its
   current job is finished and terminates if the required number of
   cloner threads is still exceeded.

3. If some cloner threads are processing and others are waiting:
      The waiting threads are notified to re-validate the required
   number of threads. If terminating those does not satisfy the
   required number of threads, the remaining threads terminate upon
   completion of their current jobs.
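
A minimal, self-contained sketch of this wake-and-check pattern (purely
illustrative; the class and method names below are invented for the example,
while the real implementation lives in async_job.py further down):

  import threading
  import time

  class Pool:
      def __init__(self, size):
          self.lock = threading.Lock()
          self.cv = threading.Condition(self.lock)
          self.target = size
          self.threads = []
          with self.lock:
              self._spawn(size)

      def _spawn(self, target):
          # increase: spawn only the difference (called with the lock held)
          for i in range(len(self.threads), target):
              t = threading.Thread(target=self._worker, daemon=True)
              self.threads.append(t)
              t.start()

      def _worker(self):
          while True:
              with self.lock:
                  # re-checked on every wake-up: exit if the pool shrank
                  if len(self.threads) > self.target:
                      self.threads.remove(threading.current_thread())
                      return
                  # no real jobs in this demo; just wait to be notified
                  self.cv.wait(timeout=1)

      def resize(self, new_size):
          with self.lock:
              self.target = new_size
              if len(self.threads) > new_size:
                  # decrease: wake waiting threads so they re-check and exit
                  self.cv.notify_all()
              else:
                  self._spawn(new_size)

  pool = Pool(4)
  pool.resize(2)
  time.sleep(2)
  print(len(pool.threads))  # 2

In the actual patch the equivalent check is should_reconfigure_num_threads(),
evaluated under self.async_job.lock before a thread fetches its next job.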

Fixes: https://tracker.ceph.com/issues/46892
Signed-off-by: Kotresh HR <khiremat@redhat.com>
(cherry picked from commit 83c4442c765bb64f9d4fbd3edcb4967dfa21cafe)

Conflicts:
src/pybind/mgr/volumes/fs/volume.py
  - keep connection_pool because octopus does not introduce
    the CephfsClient

doc/cephfs/fs-volumes.rst
qa/tasks/cephfs/test_volumes.py
src/pybind/mgr/volumes/fs/async_cloner.py
src/pybind/mgr/volumes/fs/async_job.py
src/pybind/mgr/volumes/fs/volume.py
src/pybind/mgr/volumes/module.py

diff --git a/doc/cephfs/fs-volumes.rst b/doc/cephfs/fs-volumes.rst
index 4efe26d8b642222373a7c39ec61f44be9e5f7d45..15360231e3902a655fcf4995f6367f7d34ae5290 100644
@@ -251,6 +251,10 @@ Similar to specifying a pool layout when creating a subvolume, pool layout can b
 
   $ ceph fs subvolume snapshot clone <vol_name> <subvol_name> <snap_name> <target_subvol_name> --pool_layout <pool_layout>
 
+Configure the maximum number of concurrent clones. The default is 4::
+
+  $ ceph config set mgr mgr/volumes/max_concurrent_clones <value>
+
 To check the status of a clone operation use::
 
   $ ceph fs clone status <vol_name> <clone_name> [--group_name <group_name>]
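
The configured value can be read back with "ceph config get mgr
mgr/volumes/max_concurrent_clones"; the qa test below exercises both an
increase and a decrease through the equivalent config_get/config_set helpers.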
diff --git a/qa/tasks/cephfs/test_volumes.py b/qa/tasks/cephfs/test_volumes.py
index 88fbe04cd278fe69d71a31d41aa829d44e1a77db..7984cea9205c1c83cae12cf8ee57c63d72a10a49 100644
@@ -1728,6 +1728,25 @@ class TestVolumes(CephFSTestCase):
         # verify trash dir is clean
         self._wait_for_trash_empty()
 
+    def test_subvolume_snapshot_reconf_max_concurrent_clones(self):
+        """
+        Validate 'max_concurrent_clones' config option
+        """
+
+        # get the default number of cloner threads
+        default_max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(default_max_concurrent_clones, 4)
+
+        # Increase number of cloner threads
+        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 6)
+        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(max_concurrent_clones, 6)
+
+        # Decrease number of cloner threads
+        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 2)
+        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(max_concurrent_clones, 2)
+
     def test_subvolume_snapshot_clone_pool_layout(self):
         subvolume = self._generate_random_subvolume_name()
         snapshot = self._generate_random_snapshot_name()
diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py
index 7cca7da8ec21d9b42c3cff4203cc156f488cbbe3..e86abfe0284b5920913207d08e6bbb80aa74aad4 100644
@@ -241,6 +241,9 @@ class Cloner(AsyncJobs):
         }
         super(Cloner, self).__init__(volume_client, "cloner", tp_size)
 
+    def reconfigure_max_concurrent_clones(self, tp_size):
+        super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)
+
     def is_clone_cancelable(self, clone_state):
         return not (OpSm.is_final_state(clone_state) or OpSm.is_failed_state(clone_state))
 
diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py
index 3bdedb723b9ce4a0c7b60fd4663b31ae0ea048bc..fb7051f47c24282b07ec51f9149fde6f10243763 100644
@@ -27,6 +27,7 @@ class JobThread(threading.Thread):
         thread_id = threading.currentThread()
         assert isinstance(thread_id, JobThread)
         thread_name = thread_id.getName()
+        log.debug("thread [{0}] starting".format(thread_name))
 
         while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
             vol_job = None
@@ -34,6 +35,10 @@ class JobThread(threading.Thread):
                 # fetch next job to execute
                 with self.async_job.lock:
                     while True:
+                        if self.should_reconfigure_num_threads():
+                            log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
+                            self.async_job.threads.remove(self)
+                            return
                         vol_job = self.async_job.get_job()
                         if vol_job:
                             break
@@ -62,6 +67,12 @@ class JobThread(threading.Thread):
                 time.sleep(1)
         log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
         self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
+        with self.async_job.lock:
+            self.async_job.threads.remove(self)
+
+    def should_reconfigure_num_threads(self):
+        # True if the pool has more threads than the configured max_concurrent_clones
+        return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs
 
     def cancel_job(self):
         self.cancel_event.set()
@@ -103,12 +114,28 @@ class AsyncJobs(object):
         # cv for job cancelation
         self.waiting = False
         self.cancel_cv = threading.Condition(self.lock)
+        self.nr_concurrent_jobs = nr_concurrent_jobs
 
         self.threads = []
         for i in range(nr_concurrent_jobs):
             self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
             self.threads[-1].start()
 
+    def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
+        """
+        reconfigure number of cloner threads
+        """
+        with self.lock:
+            self.nr_concurrent_jobs = nr_concurrent_jobs
+            # Decrease in concurrency: notify threads that are waiting for a job so they can terminate.
+            if len(self.threads) > nr_concurrent_jobs:
+                self.cv.notifyAll()
+            # Increase in concurrency
+            if len(self.threads) < nr_concurrent_jobs:
+                for i in range(len(self.threads), nr_concurrent_jobs):
+                    self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i)))
+                    self.threads[-1].start()
+
     def get_job(self):
         log.debug("processing {0} volume entries".format(len(self.q)))
         nr_vols = len(self.q)
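
Note that the termination check runs inside the job-fetch loop with
self.async_job.lock held, so a thread that is busy with a job only
re-evaluates the thread count after that job completes, and the threads
list is only ever mutated under the lock (including the exception bail-out
path above).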
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index 8edaca6def12901b414b7db66c6efeb2efbba153..10197dd3111452a2e621d2b3b7b4807b71b1d933 100644
@@ -42,8 +42,7 @@ class VolumeClient(object):
         # volume specification
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
         self.connection_pool = ConnectionPool(self.mgr)
-        # TODO: make thread pool size configurable
-        self.cloner = Cloner(self, 4)
+        self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py
index 3d4ddc0f4d78f1f84848dbdc061da297a5804e34..9fc2e3043fa0823b0696dc9092ceda4f5be18d45 100644
@@ -2,6 +2,7 @@ import errno
 import json
 import logging
 import traceback
+import threading
 
 from mgr_module import MgrModule
 import orchestrator
@@ -371,11 +372,28 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         # volume in the lifetime of this module instance.
     ]
 
+    MODULE_OPTIONS = [
+        {
+            'name': 'max_concurrent_clones',
+            'type': 'int',
+            'default': 4,
+            'desc': 'Number of asynchronous cloner threads',
+        }
+    ]
+
     def __init__(self, *args, **kwargs):
+        self.inited = False
+        # for mypy
+        self.max_concurrent_clones = None
+        self.lock = threading.Lock()
         super(Module, self).__init__(*args, **kwargs)
-        self.vc = VolumeClient(self)
-        self.fs_export = FSExport(self)
-        self.nfs = NFSCluster(self)
+        # Initialize config option members
+        self.config_notify()
+        with self.lock:
+            self.vc = VolumeClient(self)
+            self.fs_export = FSExport(self)
+            self.nfs = NFSCluster(self)
+            self.inited = True
 
     def __del__(self):
         self.vc.shutdown()
@@ -383,6 +401,21 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
     def shutdown(self):
         self.vc.shutdown()
 
+    def config_notify(self):
+        """
+        This method is called whenever one of our config options is changed.
+        """
+        with self.lock:
+            for opt in self.MODULE_OPTIONS:
+                setattr(self,
+                        opt['name'],  # type: ignore
+                        self.get_module_option(opt['name']))  # type: ignore
+                self.log.debug(' mgr option %s = %s',
+                               opt['name'], getattr(self, opt['name']))  # type: ignore
+                if self.inited:
+                    if opt['name'] == "max_concurrent_clones":
+                        self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
+
     def handle_command(self, inbuf, cmd):
         handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
         try:
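
With the option wired through MODULE_OPTIONS and config_notify, a runtime
"ceph config set mgr mgr/volumes/max_concurrent_clones <value>" takes effect
on a live mgr: the handler updates self.max_concurrent_clones and, once the
module has finished initializing (self.inited), calls
reconfigure_max_concurrent_clones on the existing Cloner, so no mgr restart
is needed.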