mgr/volumes: support to reject CephFS clones if cloner threads are not available
author     neeraj pratap singh <neerajpratapsingh@li-ff7f0d4c-3462-11b2-a85c-d4004c0fa1a0.ibm.com>
Wed, 2 Aug 2023 02:38:39 +0000 (08:08 +0530)
committer  neeraj pratap singh <neerajpratapsingh@li-ff7f0d4c-3462-11b2-a85c-d4004c0fa1a0.ibm.com>
Thu, 8 Feb 2024 05:40:38 +0000 (11:10 +0530)
CephFS clone creation has a default limit of 4 parallel clones at a time; the remaining
clone requests are queued. This makes CephFS cloning very slow when a large number of
clones are being created. After this patch, new clone requests are rejected once the
number of pending clones reaches the `max_concurrent_clones` config value.

Fixes: https://tracker.ceph.com/issues/59714
Signed-off-by: Neeraj Pratap Singh <neesingh@redhat.com>
src/pybind/mgr/volumes/fs/async_cloner.py
src/pybind/mgr/volumes/fs/operations/volume.py
src/pybind/mgr/volumes/fs/volume.py
src/pybind/mgr/volumes/module.py
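
For context, a minimal self-contained sketch of the gating check this patch adds to
src/pybind/mgr/volumes/fs/volume.py. The exception class and function name below are
hypothetical stand-ins so the sketch runs outside ceph-mgr; the real code raises
VolumeException and counts pending clones via get_all_pending_clones_count:

    import errno


    class CloneRejectedError(Exception):
        # Hypothetical stand-in for mgr/volumes' VolumeException, used here only
        # so the sketch runs outside the ceph-mgr environment.
        def __init__(self, err, msg):
            super().__init__(msg)
            self.errno = err


    def maybe_reject_clone(pending_clones, max_concurrent_clones, snapshot_clone_no_wait):
        # Mirrors the new check: when snapshot_clone_no_wait is enabled (default
        # True) and every cloner thread is already busy, reject the request with
        # EAGAIN instead of queueing it.
        if snapshot_clone_no_wait and pending_clones >= max_concurrent_clones:
            raise CloneRejectedError(-errno.EAGAIN,
                                     "all cloner threads are busy, please try again later")


    if __name__ == '__main__':
        try:
            # 4 pending clones against the default limit of 4 -> rejected.
            maybe_reject_clone(pending_clones=4, max_concurrent_clones=4,
                               snapshot_clone_no_wait=True)
        except CloneRejectedError as e:
            print(e)

Disabling the new snapshot_clone_no_wait option restores the previous behaviour of
queueing requests beyond the limit; the config_notify handler in module.py (see the
last hunk below) propagates the change to the Cloner at runtime.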

index 146d2e75590276943791ccce72a0c315fd9083a5..685b2f03c78c06a40a71b5173cd2f476b13ca152 100644 (file)
@@ -337,9 +337,10 @@ class Cloner(AsyncJobs):
     this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
     the driver. file types supported are directories, symbolic links and regular files.
     """
-    def __init__(self, volume_client, tp_size, snapshot_clone_delay):
+    def __init__(self, volume_client, tp_size, snapshot_clone_delay, clone_no_wait):
         self.vc = volume_client
         self.snapshot_clone_delay = snapshot_clone_delay
+        self.snapshot_clone_no_wait = clone_no_wait
         self.state_table = {
             SubvolumeStates.STATE_PENDING      : handle_clone_pending,
             SubvolumeStates.STATE_INPROGRESS   : handle_clone_in_progress,
@@ -355,6 +356,9 @@ class Cloner(AsyncJobs):
     def reconfigure_snapshot_clone_delay(self, timeout):
         self.snapshot_clone_delay = timeout
 
+    def reconfigure_reject_clones(self, clone_no_wait):
+        self.snapshot_clone_no_wait = clone_no_wait
+
     def is_clone_cancelable(self, clone_state):
         return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
 
index 67fbb891cefb2b2c663ff0d4d012a1f136b3a366..0bf42827161eb513f5f4c917e747c691670decbe 100644 (file)
@@ -9,11 +9,12 @@ from contextlib import contextmanager
 import orchestrator
 
 from .lock import GlobalLock
-from ..exception import VolumeException
+from ..exception import VolumeException, IndexException
 from ..fs_util import create_pool, remove_pool, rename_pool, create_filesystem, \
     remove_filesystem, rename_filesystem, create_mds, volume_exists, listdir
 from .trash import Trash
 from mgr_util import open_filesystem, CephfsConnectionException
+from .clone_index import open_clone_index
 
 log = logging.getLogger(__name__)
 
@@ -260,6 +261,30 @@ def get_pending_subvol_deletions_count(fs, path):
     return {'pending_subvolume_deletions': num_pending_subvol_del}
 
 
+def get_all_pending_clones_count(self, mgr, vol_spec):
+    pending_clones_cnt = 0
+    index_path = ""
+    fs_map = mgr.get('fs_map')
+    for fs in fs_map['filesystems']:
+        volname = fs['mdsmap']['fs_name']
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_clone_index(fs_handle, vol_spec) as index:
+                    index_path = index.path.decode('utf-8')
+                    pending_clones_cnt = pending_clones_cnt \
+                                            + len(listdir(fs_handle, index_path,
+                                                          filter_entries=None, filter_files=False))
+        except IndexException as e:
+            if e.errno == -errno.ENOENT:
+                continue
+            raise VolumeException(-e.args[0], e.args[1])
+        except VolumeException as ve:
+            log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
+            raise ve
+
+    return pending_clones_cnt
+
+
 @contextmanager
 def open_volume(vc, volname):
     """
index c896fd73d0b05cc71cce8c1d1d8a94439da63d05..0c4a075980540956b2c5376468a3cdf67f29ac50 100644 (file)
@@ -13,12 +13,14 @@ from .fs_util import listdir, has_subdir
 from .operations.group import open_group, create_group, remove_group, \
     open_group_unique, set_group_attrs
 from .operations.volume import create_volume, delete_volume, rename_volume, \
-    list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count
+    list_volumes, open_volume, get_pool_names, get_pool_ids, \
+    get_pending_subvol_deletions_count, get_all_pending_clones_count
 from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
     create_clone
 
 from .vol_spec import VolSpec
-from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
+from .exception import VolumeException, ClusterError, ClusterTimeout, \
+    EvictionError, IndexException
 from .async_cloner import Cloner
 from .purge_queue import ThreadPoolPurgeQueueMixin
 from .operations.template import SubvolumeOpType
@@ -53,7 +55,8 @@ class VolumeClient(CephfsClient["Module"]):
         super().__init__(mgr)
         # volume specification
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
-        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
+        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay,
+                             self.mgr.snapshot_clone_no_wait)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
@@ -764,6 +767,10 @@ class VolumeClient(CephfsClient["Module"]):
         s_groupname  = kwargs['group_name']
 
         try:
+            if self.mgr.snapshot_clone_no_wait and \
+               get_all_pending_clones_count(self, self.mgr, self.volspec) >= self.mgr.max_concurrent_clones:
+                raise(VolumeException(-errno.EAGAIN, "all cloner threads are busy, please try again later"))
+            
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, s_groupname) as s_group:
                     with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
index 68031ed55a3b890681141cea848f0ebf88a09b7b..4a28fdc869ead109203644151fa6b958e2a73db3 100644 (file)
@@ -489,7 +489,12 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
             'periodic_async_work',
             type='bool',
             default=False,
-            desc='Periodically check for async work')
+            desc='Periodically check for async work'),
+        Option(
+            'snapshot_clone_no_wait',
+            type='bool',
+            default=True,
+            desc='Reject subvolume clone request when cloner threads are busy')
     ]
 
     def __init__(self, *args, **kwargs):
@@ -498,6 +503,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         self.max_concurrent_clones = None
         self.snapshot_clone_delay = None
         self.periodic_async_work = False
+        self.snapshot_clone_no_wait = None
         self.lock = threading.Lock()
         super(Module, self).__init__(*args, **kwargs)
         # Initialize config option members
@@ -532,6 +538,8 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
                         else:
                             self.vc.cloner.unset_wakeup_timeout()
                             self.vc.purge_queue.unset_wakeup_timeout()
+                    elif opt['name'] == "snapshot_clone_no_wait":
+                        self.vc.cloner.reconfigure_reject_clones(self.snapshot_clone_no_wait)
 
     def handle_command(self, inbuf, cmd):
         handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")