mgr/volumes: support rejecting CephFS clones if cloner threads are not available
author    neeraj pratap singh <neerajpratapsingh@li-ff7f0d4c-3462-11b2-a85c-d4004c0fa1a0.ibm.com>
Wed, 2 Aug 2023 02:38:39 +0000 (08:08 +0530)
committer neeraj pratap singh <neerajpratapsingh@li-ff7f0d4c-3462-11b2-a85c-d4004c0fa1a0.ibm.com>
Wed, 21 Feb 2024 14:36:53 +0000 (20:06 +0530)
CephFS clone creation has a default limit of 4 parallel clones at a time; the
rest of the clone-create requests are queued. This makes CephFS cloning very
slow when a large number of clones is being created. After this patch, clone
requests are no longer accepted once the number of pending clones reaches the
`max_concurrent_clones` config value; they are rejected with EAGAIN while the
new `snapshot_clone_no_wait` option (enabled by default) is set.
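
Callers must therefore be prepared for rejection. A minimal caller-side
sketch, assuming the `ceph` CLI surfaces the mgr's EAGAIN as its exit status;
the retry policy and all volume/subvolume names are illustrative, not part of
this patch:

    # Retry a clone request that the mgr rejects with EAGAIN when all
    # cloner threads are busy (snapshot_clone_no_wait enabled).
    import errno
    import subprocess
    import time

    def clone_with_retry(vol, subvol, snap, target, attempts=5, delay=10):
        cmd = ["ceph", "fs", "subvolume", "snapshot", "clone",
               vol, subvol, snap, target]
        for _ in range(attempts):
            proc = subprocess.run(cmd, capture_output=True, text=True)
            if proc.returncode == 0:
                return
            if proc.returncode != errno.EAGAIN:
                raise RuntimeError(proc.stderr.strip())
            time.sleep(delay)  # all cloner threads busy; back off and retry
        raise TimeoutError("cloner threads still busy after %d attempts" % attempts)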

Fixes: https://tracker.ceph.com/issues/59714
Signed-off-by: Neeraj Pratap Singh <neesingh@redhat.com>
(cherry picked from commit 079f722c37ef6cc8bd3cc26c49ae119dd83431f9)

Conflicts: src/pybind/mgr/volumes/module.py: caused by the `period_async_work`
config option, which is present in main but not in reef.

src/pybind/mgr/volumes/fs/async_cloner.py
src/pybind/mgr/volumes/fs/operations/volume.py
src/pybind/mgr/volumes/fs/volume.py
src/pybind/mgr/volumes/module.py

diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py
index 95f7d64e1b36422995b5941032c9e4bf39656b18..deb208477bcf5b17321423ad07a52692a0dbf2cc 100644
--- a/src/pybind/mgr/volumes/fs/async_cloner.py
+++ b/src/pybind/mgr/volumes/fs/async_cloner.py
@@ -322,9 +322,10 @@ class Cloner(AsyncJobs):
     this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
     the driver. file types supported are directories, symbolic links and regular files.
     """
-    def __init__(self, volume_client, tp_size, snapshot_clone_delay):
+    def __init__(self, volume_client, tp_size, snapshot_clone_delay, clone_no_wait):
         self.vc = volume_client
         self.snapshot_clone_delay = snapshot_clone_delay
+        self.snapshot_clone_no_wait = clone_no_wait
         self.state_table = {
             SubvolumeStates.STATE_PENDING      : handle_clone_pending,
             SubvolumeStates.STATE_INPROGRESS   : handle_clone_in_progress,
@@ -340,6 +341,9 @@ class Cloner(AsyncJobs):
     def reconfigure_snapshot_clone_delay(self, timeout):
         self.snapshot_clone_delay = timeout
 
+    def reconfigure_reject_clones(self, clone_no_wait):
+        self.snapshot_clone_no_wait = clone_no_wait
+
     def is_clone_cancelable(self, clone_state):
         return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
 
diff --git a/src/pybind/mgr/volumes/fs/operations/volume.py b/src/pybind/mgr/volumes/fs/operations/volume.py
index 395a3fb4ea07265e10f1fc409b304bcae9b31ecd..f93ecee19a0679de96864ec413c7c6b98e124b4c 100644
--- a/src/pybind/mgr/volumes/fs/operations/volume.py
+++ b/src/pybind/mgr/volumes/fs/operations/volume.py
@@ -9,11 +9,12 @@ from contextlib import contextmanager
 import orchestrator
 
 from .lock import GlobalLock
-from ..exception import VolumeException
+from ..exception import VolumeException, IndexException
 from ..fs_util import create_pool, remove_pool, rename_pool, create_filesystem, \
     remove_filesystem, rename_filesystem, create_mds, volume_exists, listdir
 from .trash import Trash
 from mgr_util import open_filesystem, CephfsConnectionException
+from .clone_index import open_clone_index
 
 log = logging.getLogger(__name__)
 
@@ -260,6 +261,30 @@ def get_pending_subvol_deletions_count(fs, path):
     return {'pending_subvolume_deletions': num_pending_subvol_del}
 
 
+def get_all_pending_clones_count(self, mgr, vol_spec):
+    pending_clones_cnt = 0
+    index_path = ""
+    fs_map = mgr.get('fs_map')
+    for fs in fs_map['filesystems']:
+        volname = fs['mdsmap']['fs_name']
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_clone_index(fs_handle, vol_spec) as index:
+                    index_path = index.path.decode('utf-8')
+                    pending_clones_cnt = pending_clones_cnt \
+                                            + len(listdir(fs_handle, index_path,
+                                                          filter_entries=None, filter_files=False))
+        except IndexException as e:
+            if e.errno == -errno.ENOENT:
+                continue
+            raise VolumeException(-e.args[0], e.args[1])
+        except VolumeException as ve:
+            log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
+            raise ve
+
+    return pending_clones_cnt
+
+
 @contextmanager
 def open_volume(vc, volname):
     """
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index 6772d64c3970bf8c17bb6209029c451abb25244f..1b7c5aeb1a55e7cdac6b0dba2b3dc5b6f9bb8f1f 100644
--- a/src/pybind/mgr/volumes/fs/volume.py
+++ b/src/pybind/mgr/volumes/fs/volume.py
@@ -14,13 +14,15 @@ from .fs_util import listdir, has_subdir
 from .operations.group import open_group, create_group, remove_group, \
     open_group_unique, set_group_attrs
 from .operations.volume import create_volume, delete_volume, rename_volume, \
-    list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count
+    list_volumes, open_volume, get_pool_names, get_pool_ids, \
+    get_pending_subvol_deletions_count, get_all_pending_clones_count
 from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
     create_clone
 from .operations.trash import Trash
 
 from .vol_spec import VolSpec
-from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
+from .exception import VolumeException, ClusterError, ClusterTimeout, \
+    EvictionError, IndexException
 from .async_cloner import Cloner
 from .purge_queue import ThreadPoolPurgeQueueMixin
 from .operations.template import SubvolumeOpType
@@ -55,7 +57,8 @@ class VolumeClient(CephfsClient["Module"]):
         super().__init__(mgr)
         # volume specification
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
-        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
+        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay,
+                             self.mgr.snapshot_clone_no_wait)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
@@ -766,6 +769,10 @@ class VolumeClient(CephfsClient["Module"]):
         s_groupname  = kwargs['group_name']
 
         try:
+            if self.mgr.snapshot_clone_no_wait and \
+               get_all_pending_clones_count(self, self.mgr, self.volspec) >= self.mgr.max_concurrent_clones:
+                raise(VolumeException(-errno.EAGAIN, "all cloner threads are busy, please try again later"))
+
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, s_groupname) as s_group:
                     with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
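
The gate runs at the top of the clone request path, before any volume handle
is opened, so a busy cloner pool is rejected cheaply. Reduced to a pure
predicate (example values only), note that the threshold is >=, not >:

    def should_reject(no_wait, pending, max_clones):
        # Mirrors the condition above: reject only when the feature is on
        # and every cloner thread is already accounted for.
        return no_wait and pending >= max_clones

    assert should_reject(True, 4, 4)        # pool of 4 fully busy -> EAGAIN
    assert not should_reject(True, 3, 4)    # a thread is free -> accept
    assert not should_reject(False, 10, 4)  # feature off -> queue as before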
diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py
index 1d62f447b0efaf549559649399f75904f810f160..8a50baaad0c0f08dc484e0cf4f2492e3ef7fe542 100644
--- a/src/pybind/mgr/volumes/module.py
+++ b/src/pybind/mgr/volumes/module.py
@@ -483,8 +483,13 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         Option(
             'snapshot_clone_delay',
             type='int',
             default=0,
-            desc='Delay clone begin operation by snapshot_clone_delay seconds')
+            desc='Delay clone begin operation by snapshot_clone_delay seconds'),
+        Option(
+            'snapshot_clone_no_wait',
+            type='bool',
+            default=True,
+            desc='Reject subvolume clone request when cloner threads are busy')
     ]
 
     def __init__(self, *args, **kwargs):
@@ -492,6 +497,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         # for mypy
         self.max_concurrent_clones = None
         self.snapshot_clone_delay = None
+        self.snapshot_clone_no_wait = None
         self.lock = threading.Lock()
         super(Module, self).__init__(*args, **kwargs)
         # Initialize config option members
@@ -522,6 +528,8 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
                         self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
                     elif opt['name'] == "snapshot_clone_delay":
                         self.vc.cloner.reconfigure_snapshot_clone_delay(self.snapshot_clone_delay)
+                    elif opt['name'] == "snapshot_clone_no_wait":
+                        self.vc.cloner.reconfigure_reject_clones(self.snapshot_clone_no_wait)
 
     def handle_command(self, inbuf, cmd):
         handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
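
Because `snapshot_clone_no_wait` is an ordinary module Option, it can be
flipped at runtime; config_notify() above then pushes the new value to the
live Cloner via reconfigure_reject_clones(). A hedged example of toggling it
from a script, assuming the usual mgr/<module>/<option> config key convention:

    import subprocess

    def set_clone_no_wait(enabled: bool) -> None:
        # Equivalent CLI: ceph config set mgr mgr/volumes/snapshot_clone_no_wait <bool>
        subprocess.run(
            ["ceph", "config", "set", "mgr",
             "mgr/volumes/snapshot_clone_no_wait",
             "true" if enabled else "false"],
            check=True)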