From 44484295c76ba6cc14965388c9c05ba5993f6e9c Mon Sep 17 00:00:00 2001
From: neeraj pratap singh
Date: Wed, 2 Aug 2023 08:08:39 +0530
Subject: [PATCH] mgr/volumes: support to reject CephFS clones if cloner
 threads are not available

CephFS clone creation has a limit of 4 parallel clones by default;
any further clone-create requests are queued. This makes CephFS
cloning very slow when a large number of clones is being created.
After this patch, clone requests won't be accepted once pending
clones exceed the `max_concurrent_clones` config value.

Fixes: https://tracker.ceph.com/issues/59714
Signed-off-by: Neeraj Pratap Singh
(cherry picked from commit 079f722c37ef6cc8bd3cc26c49ae119dd83431f9)

Conflicts:
	src/pybind/mgr/volumes/module.py: This conflict was due to the
	`period_async_work` config, which is present in main but not in
	reef.
---
 src/pybind/mgr/volumes/fs/async_cloner.py |  6 ++++-
 .../mgr/volumes/fs/operations/volume.py   | 27 ++++++++++++++++++-
 src/pybind/mgr/volumes/fs/volume.py       | 13 ++++++---
 src/pybind/mgr/volumes/module.py          | 12 +++++++--
 4 files changed, 51 insertions(+), 7 deletions(-)

diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py
index 95f7d64e1b364..deb208477bcf5 100644
--- a/src/pybind/mgr/volumes/fs/async_cloner.py
+++ b/src/pybind/mgr/volumes/fs/async_cloner.py
@@ -322,9 +322,10 @@ class Cloner(AsyncJobs):
     this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
     the driver. file types supported are directories, symbolic links and regular files.
     """
-    def __init__(self, volume_client, tp_size, snapshot_clone_delay):
+    def __init__(self, volume_client, tp_size, snapshot_clone_delay, clone_no_wait):
         self.vc = volume_client
         self.snapshot_clone_delay = snapshot_clone_delay
+        self.snapshot_clone_no_wait = clone_no_wait
         self.state_table = {
             SubvolumeStates.STATE_PENDING : handle_clone_pending,
             SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
@@ -340,6 +341,9 @@ class Cloner(AsyncJobs):
     def reconfigure_snapshot_clone_delay(self, timeout):
         self.snapshot_clone_delay = timeout
 
+    def reconfigure_reject_clones(self, clone_no_wait):
+        self.snapshot_clone_no_wait = clone_no_wait
+
     def is_clone_cancelable(self, clone_state):
         return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
 
diff --git a/src/pybind/mgr/volumes/fs/operations/volume.py b/src/pybind/mgr/volumes/fs/operations/volume.py
index 395a3fb4ea072..f93ecee19a067 100644
--- a/src/pybind/mgr/volumes/fs/operations/volume.py
+++ b/src/pybind/mgr/volumes/fs/operations/volume.py
@@ -9,11 +9,12 @@
 import orchestrator
 
 from .lock import GlobalLock
-from ..exception import VolumeException
+from ..exception import VolumeException, IndexException
 from ..fs_util import create_pool, remove_pool, rename_pool, create_filesystem, \
     remove_filesystem, rename_filesystem, create_mds, volume_exists, listdir
 from .trash import Trash
 from mgr_util import open_filesystem, CephfsConnectionException
+from .clone_index import open_clone_index
 
 log = logging.getLogger(__name__)
 
@@ -260,6 +261,30 @@
     return {'pending_subvolume_deletions': num_pending_subvol_del}
 
 
+def get_all_pending_clones_count(self, mgr, vol_spec):
+    pending_clones_cnt = 0
+    index_path = ""
+    fs_map = mgr.get('fs_map')
+    for fs in fs_map['filesystems']:
+        volname = fs['mdsmap']['fs_name']
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_clone_index(fs_handle, vol_spec) as index:
+                    index_path = index.path.decode('utf-8')
+                    pending_clones_cnt = pending_clones_cnt \
+                                         + len(listdir(fs_handle, index_path,
+                                                       filter_entries=None, filter_files=False))
+        except IndexException as e:
+            if e.errno == -errno.ENOENT:
+                continue
+            raise VolumeException(-e.args[0], e.args[1])
+        except VolumeException as ve:
+            log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
+            raise ve
+
+    return pending_clones_cnt
+
+
 @contextmanager
 def open_volume(vc, volname):
     """
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index 6772d64c3970b..1b7c5aeb1a55e 100644
--- a/src/pybind/mgr/volumes/fs/volume.py
+++ b/src/pybind/mgr/volumes/fs/volume.py
@@ -14,13 +14,15 @@
 from .fs_util import listdir, has_subdir
 from .operations.group import open_group, create_group, remove_group, \
     open_group_unique, set_group_attrs
 from .operations.volume import create_volume, delete_volume, rename_volume, \
-    list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count
+    list_volumes, open_volume, get_pool_names, get_pool_ids, \
+    get_pending_subvol_deletions_count, get_all_pending_clones_count
 from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
     create_clone
 from .operations.trash import Trash
 from .vol_spec import VolSpec
-from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
+from .exception import VolumeException, ClusterError, ClusterTimeout, \
+    EvictionError, IndexException
 from .async_cloner import Cloner
 from .purge_queue import ThreadPoolPurgeQueueMixin
 from .operations.template import SubvolumeOpType
@@ -55,7 +57,8 @@ class VolumeClient(CephfsClient["Module"]):
         super().__init__(mgr)
         # volume specification
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
-        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
+        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay,
+                             self.mgr.snapshot_clone_no_wait)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
@@ -766,6 +769,10 @@ class VolumeClient(CephfsClient["Module"]):
         s_groupname = kwargs['group_name']
 
         try:
+            if self.mgr.snapshot_clone_no_wait and \
+               get_all_pending_clones_count(self, self.mgr, self.volspec) >= self.mgr.max_concurrent_clones:
+                raise(VolumeException(-errno.EAGAIN, "all cloner threads are busy, please try again later"))
+
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, s_groupname) as s_group:
                     with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py
index 1d62f447b0efa..8a50baaad0c0f 100644
--- a/src/pybind/mgr/volumes/module.py
+++ b/src/pybind/mgr/volumes/module.py
@@ -483,8 +483,13 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         Option(
             'snapshot_clone_delay',
             type='int',
-            default=0,
-            desc='Delay clone begin operation by snapshot_clone_delay seconds')
+            default=0,
+            desc='Delay clone begin operation by snapshot_clone_delay seconds'),
+        Option(
+            'snapshot_clone_no_wait',
+            type='bool',
+            default=True,
+            desc='Reject subvolume clone request when cloner threads are busy')
     ]
 
     def __init__(self, *args, **kwargs):
@@ -492,6 +497,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         # for mypy
         self.max_concurrent_clones = None
         self.snapshot_clone_delay = None
+        self.snapshot_clone_no_wait = None
         self.lock = threading.Lock()
         super(Module, self).__init__(*args, **kwargs)
         # Initialize config option members
@@ -522,6 +528,8 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
                 self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
             elif opt['name'] == "snapshot_clone_delay":
                 self.vc.cloner.reconfigure_snapshot_clone_delay(self.snapshot_clone_delay)
+            elif opt['name'] == "snapshot_clone_no_wait":
+                self.vc.cloner.reconfigure_reject_clones(self.snapshot_clone_no_wait)
 
     def handle_command(self, inbuf, cmd):
         handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
-- 
2.39.5
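
For reviewers, the snippet below is a minimal, self-contained sketch (not
part of the patch) of the admission check this change introduces.
`gate_clone_request` and the trimmed-down `VolumeException` here are
hypothetical stand-ins for the real mgr/volumes code paths:

    import errno

    class VolumeException(Exception):
        """Stand-in for mgr/volumes' VolumeException: negative errno plus message."""
        def __init__(self, error_code, error_str):
            self.errno = error_code
            self.error_str = error_str
            super().__init__(error_str)

    def gate_clone_request(clone_no_wait, pending_clones, max_concurrent_clones):
        # When rejection is enabled and every cloner thread already has a
        # pending clone, fail fast with EAGAIN instead of queueing silently.
        if clone_no_wait and pending_clones >= max_concurrent_clones:
            raise VolumeException(-errno.EAGAIN,
                                  "all cloner threads are busy, please try again later")

    # With the default max_concurrent_clones of 4:
    gate_clone_request(True, 3, 4)        # accepted: a cloner thread is free
    try:
        gate_clone_request(True, 4, 4)    # rejected: all threads are busy
    except VolumeException as ve:
        print(ve.errno, ve.error_str)     # negated EAGAIN plus the message

The check runs before the clone is queued, so the caller sees EAGAIN
immediately. Operators who prefer the old queueing behaviour can disable
it with `ceph config set mgr mgr/volumes/snapshot_clone_no_wait false`,
which reaches the running cloner via Module.config_notify ->
Cloner.reconfigure_reject_clones.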