]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/volumes: Make number of cloner threads configurable 36554/head
authorKotresh HR <khiremat@redhat.com>
Tue, 4 Aug 2020 07:57:53 +0000 (13:27 +0530)
committerKotresh HR <khiremat@redhat.com>
Fri, 11 Sep 2020 09:06:37 +0000 (14:36 +0530)
The number of cloner threads is set to 4 and it can't be
configured. This patch makes the number of cloner threads
configurable via the mgr config option "max_concurrent_clones".

On an increase in number of cloner threads, it will just
spawn the difference of threads between existing number of
cloner threads and the new configuration. It will not cancel
the running cloner threads.

On decrease in number of cloner threads, the cases are as follows.

1. If all cloner threads are waiting for the job:
      In this case, all threads are notified and the required number of
   threads are terminated.

2. If all the cloner threads are processing a job:
      In this case, the condition is validated for each thread after
   the current job is finished, and the thread is terminated if the
   condition for the required number of cloner threads is not satisfied.

3. If few cloner threads are processing and others are waiting:
      The threads which are waiting are notified to validate the
   number of threads required. If terminating those doesn't satisfy the
   required number of threads, the remaining threads are terminated
   upon completion of existing job.

Fixes: https://tracker.ceph.com/issues/46892
Signed-off-by: Kotresh HR <khiremat@redhat.com>
doc/cephfs/fs-volumes.rst
qa/tasks/cephfs/test_volumes.py
src/pybind/mgr/volumes/fs/async_cloner.py
src/pybind/mgr/volumes/fs/async_job.py
src/pybind/mgr/volumes/fs/volume.py
src/pybind/mgr/volumes/module.py

index e5811041ae734ecf6d587fc7fdbe1079c4517cba..48346cd5ce0a6bd6fcbd8dbe9ff53a21e046bb30 100644 (file)
@@ -275,6 +275,10 @@ Similar to specifying a pool layout when creating a subvolume, pool layout can b
 
   $ ceph fs subvolume snapshot clone <vol_name> <subvol_name> <snap_name> <target_subvol_name> --pool_layout <pool_layout>
 
+Configure maximum number of concurrent clones. The default is set to 4::
+
+  $ ceph config set mgr mgr/volumes/max_concurrent_clones <value>
+
 To check the status of a clone operation use::
 
   $ ceph fs clone status <vol_name> <clone_name> [--group_name <group_name>]
index e8098262597b9ffa49b46372034a1af3667f4946..f1046901efeec57acdfcfbc5cc4a4d928c81aa34 100644 (file)
@@ -2395,6 +2395,25 @@ class TestVolumes(CephFSTestCase):
         # verify trash dir is clean
         self._wait_for_trash_empty()
 
+    def test_subvolume_snapshot_reconf_max_concurrent_clones(self):
+        """
+        Validate 'max_concurrent_clones' config option
+        """
+
+        # get the default number of cloner threads
+        default_max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(default_max_concurrent_clones, 4)
+
+        # Increase number of cloner threads
+        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 6)
+        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(max_concurrent_clones, 6)
+
+        # Decrease number of cloner threads
+        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 2)
+        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(max_concurrent_clones, 2)
+
     def test_subvolume_snapshot_clone_pool_layout(self):
         subvolume = self._generate_random_subvolume_name()
         snapshot = self._generate_random_snapshot_name()
index ac3b10d6a9c3be4544d16f2d003fcf6d9d2a875e..61928ec2d0f4230774e341422b84ae8bd09877b6 100644 (file)
@@ -274,6 +274,9 @@ class Cloner(AsyncJobs):
         }
         super(Cloner, self).__init__(volume_client, "cloner", tp_size)
 
+    def reconfigure_max_concurrent_clones(self, tp_size):
+        super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)
+
     def is_clone_cancelable(self, clone_state):
         return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
 
index 3bdedb723b9ce4a0c7b60fd4663b31ae0ea048bc..fb7051f47c24282b07ec51f9149fde6f10243763 100644 (file)
@@ -27,6 +27,7 @@ class JobThread(threading.Thread):
         thread_id = threading.currentThread()
         assert isinstance(thread_id, JobThread)
         thread_name = thread_id.getName()
+        log.debug("thread [{0}] starting".format(thread_name))
 
         while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
             vol_job = None
@@ -34,6 +35,10 @@ class JobThread(threading.Thread):
                 # fetch next job to execute
                 with self.async_job.lock:
                     while True:
+                        if self.should_reconfigure_num_threads():
+                            log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
+                            self.async_job.threads.remove(self)
+                            return
                         vol_job = self.async_job.get_job()
                         if vol_job:
                             break
@@ -62,6 +67,12 @@ class JobThread(threading.Thread):
                 time.sleep(1)
         log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
         self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
+        with self.async_job.lock:
+            self.async_job.threads.remove(self)
+
+    def should_reconfigure_num_threads(self):
+        # reconfigure of max_concurrent_clones
+        return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs
 
     def cancel_job(self):
         self.cancel_event.set()
@@ -103,12 +114,28 @@ class AsyncJobs(object):
         # cv for job cancelation
         self.waiting = False
         self.cancel_cv = threading.Condition(self.lock)
+        self.nr_concurrent_jobs = nr_concurrent_jobs
 
         self.threads = []
         for i in range(nr_concurrent_jobs):
             self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
             self.threads[-1].start()
 
+    def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
+        """
+        reconfigure number of cloner threads
+        """
+        with self.lock:
+            self.nr_concurrent_jobs = nr_concurrent_jobs
+            # Decrease in concurrency. Notify threads which are waiting for a job to terminate.
+            if len(self.threads) > nr_concurrent_jobs:
+                self.cv.notifyAll()
+            # Increase in concurrency
+            if len(self.threads) < nr_concurrent_jobs:
+                for i in range(len(self.threads), nr_concurrent_jobs):
+                    self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i)))
+                    self.threads[-1].start()
+
     def get_job(self):
         log.debug("processing {0} volume entries".format(len(self.q)))
         nr_vols = len(self.q)
index 2d9c24725a882a88cd6052897a3d9aca880e7f63..023334fff9c0c6bae1230431829e9554a5b7067a 100644 (file)
@@ -45,8 +45,7 @@ class VolumeClient(CephfsClient):
         super().__init__(mgr)
         # volume specification
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
-        # TODO: make thread pool size configurable
-        self.cloner = Cloner(self, 4)
+        self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
index 60bd597a9f1d7a8e540c3d8c15c16b347c1d47f2..d30ccf6bb468d8361b897cfcba09197946b45680 100644 (file)
@@ -2,6 +2,7 @@ import errno
 import json
 import logging
 import traceback
+import threading
 
 from mgr_module import MgrModule
 import orchestrator
@@ -361,11 +362,28 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         # volume in the lifetime of this module instance.
     ]
 
+    MODULE_OPTIONS = [
+        {
+            'name': 'max_concurrent_clones',
+            'type': 'int',
+            'default': 4,
+            'desc': 'Number of asynchronous cloner threads',
+        }
+    ]
+
     def __init__(self, *args, **kwargs):
+        self.inited = False
+        # for mypy
+        self.max_concurrent_clones = None
+        self.lock = threading.Lock()
         super(Module, self).__init__(*args, **kwargs)
-        self.vc = VolumeClient(self)
-        self.fs_export = FSExport(self)
-        self.nfs = NFSCluster(self)
+        # Initialize config option members
+        self.config_notify()
+        with self.lock:
+            self.vc = VolumeClient(self)
+            self.fs_export = FSExport(self)
+            self.nfs = NFSCluster(self)
+            self.inited = True
 
     def __del__(self):
         self.vc.shutdown()
@@ -373,6 +391,21 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
     def shutdown(self):
         self.vc.shutdown()
 
+    def config_notify(self):
+        """
+        This method is called whenever one of our config options is changed.
+        """
+        with self.lock:
+            for opt in self.MODULE_OPTIONS:
+                setattr(self,
+                        opt['name'],  # type: ignore
+                        self.get_module_option(opt['name']))  # type: ignore
+                self.log.debug(' mgr option %s = %s',
+                               opt['name'], getattr(self, opt['name']))  # type: ignore
+                if self.inited:
+                    if opt['name'] == "max_concurrent_clones":
+                        self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
+
     def handle_command(self, inbuf, cmd):
         handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
         try: