From 079f722c37ef6cc8bd3cc26c49ae119dd83431f9 Mon Sep 17 00:00:00 2001
From: neeraj pratap singh
Date: Wed, 2 Aug 2023 08:08:39 +0530
Subject: [PATCH] mgr/volumes: support to reject CephFS clones if cloner
 threads are not available

CephFS clone creation has a limit of 4 parallel clones at a time by
default; the rest of the clone create requests are queued. This makes
CephFS cloning very slow when a large number of clones is being
created. After this patch, clone requests are no longer accepted once
the number of pending clones reaches the `max_concurrent_clones`
config value; a minimal sketch of the new check follows the patch.

Fixes: https://tracker.ceph.com/issues/59714
Signed-off-by: Neeraj Pratap Singh
---
 src/pybind/mgr/volumes/fs/async_cloner.py     |  6 ++++-
 .../mgr/volumes/fs/operations/volume.py       | 27 ++++++++++++++++++-
 src/pybind/mgr/volumes/fs/volume.py           | 13 ++++++---
 src/pybind/mgr/volumes/module.py              | 10 ++++++-
 4 files changed, 50 insertions(+), 6 deletions(-)

diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py
index 146d2e7559027..685b2f03c78c0 100644
--- a/src/pybind/mgr/volumes/fs/async_cloner.py
+++ b/src/pybind/mgr/volumes/fs/async_cloner.py
@@ -337,9 +337,10 @@ class Cloner(AsyncJobs):
     this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
     the driver. file types supported are directories, symbolic links and regular files.
     """
-    def __init__(self, volume_client, tp_size, snapshot_clone_delay):
+    def __init__(self, volume_client, tp_size, snapshot_clone_delay, clone_no_wait):
         self.vc = volume_client
         self.snapshot_clone_delay = snapshot_clone_delay
+        self.snapshot_clone_no_wait = clone_no_wait
         self.state_table = {
             SubvolumeStates.STATE_PENDING : handle_clone_pending,
             SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
@@ -355,6 +356,9 @@ class Cloner(AsyncJobs):
     def reconfigure_snapshot_clone_delay(self, timeout):
         self.snapshot_clone_delay = timeout
 
+    def reconfigure_reject_clones(self, clone_no_wait):
+        self.snapshot_clone_no_wait = clone_no_wait
+
     def is_clone_cancelable(self, clone_state):
         return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
 
diff --git a/src/pybind/mgr/volumes/fs/operations/volume.py b/src/pybind/mgr/volumes/fs/operations/volume.py
index 67fbb891cefb2..0bf42827161eb 100644
--- a/src/pybind/mgr/volumes/fs/operations/volume.py
+++ b/src/pybind/mgr/volumes/fs/operations/volume.py
@@ -9,11 +9,12 @@ from contextlib import contextmanager
 import orchestrator
 
 from .lock import GlobalLock
-from ..exception import VolumeException
+from ..exception import VolumeException, IndexException
 from ..fs_util import create_pool, remove_pool, rename_pool, create_filesystem, \
     remove_filesystem, rename_filesystem, create_mds, volume_exists, listdir
 from .trash import Trash
 from mgr_util import open_filesystem, CephfsConnectionException
+from .clone_index import open_clone_index
 
 log = logging.getLogger(__name__)
 
@@ -260,6 +261,30 @@ def get_pending_subvol_deletions_count(fs, path):
     return {'pending_subvolume_deletions': num_pending_subvol_del}
 
 
+def get_all_pending_clones_count(self, mgr, vol_spec):
+    pending_clones_cnt = 0
+    index_path = ""
+    fs_map = mgr.get('fs_map')
+    for fs in fs_map['filesystems']:
+        volname = fs['mdsmap']['fs_name']
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_clone_index(fs_handle, vol_spec) as index:
+                    index_path = index.path.decode('utf-8')
+                    pending_clones_cnt = pending_clones_cnt \
+                                         + len(listdir(fs_handle, index_path,
+                                                       filter_entries=None, filter_files=False))
+        except IndexException as e:
+            if e.errno == -errno.ENOENT:
+                continue
+            raise VolumeException(-e.args[0], e.args[1])
+        except VolumeException as ve:
+            log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
+            raise ve
+
+    return pending_clones_cnt
+
+
 @contextmanager
 def open_volume(vc, volname):
     """
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index c896fd73d0b05..0c4a075980540 100644
--- a/src/pybind/mgr/volumes/fs/volume.py
+++ b/src/pybind/mgr/volumes/fs/volume.py
@@ -13,12 +13,14 @@
 from .fs_util import listdir, has_subdir
 from .operations.group import open_group, create_group, remove_group, \
     open_group_unique, set_group_attrs
 from .operations.volume import create_volume, delete_volume, rename_volume, \
-    list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count
+    list_volumes, open_volume, get_pool_names, get_pool_ids, \
+    get_pending_subvol_deletions_count, get_all_pending_clones_count
 from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
     create_clone
 from .vol_spec import VolSpec
-from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
+from .exception import VolumeException, ClusterError, ClusterTimeout, \
+    EvictionError, IndexException
 from .async_cloner import Cloner
 from .purge_queue import ThreadPoolPurgeQueueMixin
 from .operations.template import SubvolumeOpType
@@ -53,7 +55,8 @@ class VolumeClient(CephfsClient["Module"]):
         super().__init__(mgr)
         # volume specification
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
-        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
+        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay,
+                             self.mgr.snapshot_clone_no_wait)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
@@ -764,6 +767,10 @@ class VolumeClient(CephfsClient["Module"]):
         s_groupname = kwargs['group_name']
 
         try:
+            if self.mgr.snapshot_clone_no_wait and \
+               get_all_pending_clones_count(self, self.mgr, self.volspec) >= self.mgr.max_concurrent_clones:
+                raise VolumeException(-errno.EAGAIN, "all cloner threads are busy, please try again later")
+
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, s_groupname) as s_group:
                     with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py
index 68031ed55a3b8..4a28fdc869ead 100644
--- a/src/pybind/mgr/volumes/module.py
+++ b/src/pybind/mgr/volumes/module.py
@@ -489,7 +489,12 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
             'periodic_async_work',
             type='bool',
             default=False,
-            desc='Periodically check for async work')
+            desc='Periodically check for async work'),
+        Option(
+            'snapshot_clone_no_wait',
+            type='bool',
+            default=True,
+            desc='Reject subvolume clone request when cloner threads are busy')
     ]
 
     def __init__(self, *args, **kwargs):
@@ -498,6 +503,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         self.max_concurrent_clones = None
         self.snapshot_clone_delay = None
         self.periodic_async_work = False
+        self.snapshot_clone_no_wait = None
         self.lock = threading.Lock()
         super(Module, self).__init__(*args, **kwargs)
         # Initialize config option members
@@ -532,6 +538,8 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
             else:
                 self.vc.cloner.unset_wakeup_timeout()
                 self.vc.purge_queue.unset_wakeup_timeout()
+            elif opt['name'] == "snapshot_clone_no_wait":
+                self.vc.cloner.reconfigure_reject_clones(self.snapshot_clone_no_wait)
 
     def handle_command(self, inbuf, cmd):
         handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
-- 
2.39.5
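Note for reviewers: below is a minimal standalone sketch of the rejection
behaviour this patch introduces. It is illustrative only, assuming nothing
beyond what the patch itself does: `maybe_reject_clone` and the simplified
`VolumeException` class are hypothetical stand-ins for the real gate in
VolumeClient and the real mgr/volumes exception type, not part of the
module's API.

# Illustrative sketch only; mirrors the check added before
# open_volume() in the clone path. Names here are stand-ins.
import errno


class VolumeException(Exception):
    """Simplified stand-in for mgr/volumes' VolumeException."""
    def __init__(self, error_code, error_message):
        self.errno = error_code
        self.error_str = error_message
        super().__init__(error_message)


def maybe_reject_clone(pending_clones, max_concurrent_clones, snapshot_clone_no_wait):
    # With snapshot_clone_no_wait enabled (the new default), a clone
    # request is rejected with EAGAIN instead of being queued once all
    # cloner threads are occupied by pending/in-progress clones.
    if snapshot_clone_no_wait and pending_clones >= max_concurrent_clones:
        raise VolumeException(-errno.EAGAIN,
                              "all cloner threads are busy, please try again later")


maybe_reject_clone(3, 4, True)      # accepted: a cloner thread is still free
try:
    maybe_reject_clone(4, 4, True)  # rejected: all 4 default threads are busy
except VolumeException as ve:
    print(ve.errno, ve.error_str)

Operators who prefer the old queueing behaviour can turn the check off;
assuming the standard mgr module option path, that would be
`ceph config set mgr mgr/volumes/snapshot_clone_no_wait false`, and
`max_concurrent_clones` can still be raised to add cloner threads.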