CephFS clone creation is limited to 4 parallel clones at a time by default, and the rest
of the clone create requests are queued. This makes CephFS cloning very slow when
a large number of clones is being created. After this patch, clone requests are no longer
accepted once the number of pending clones reaches the `max_concurrent_clones` config value;
the new behaviour is controlled by the `snapshot_clone_no_wait` option, which is enabled by default.
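With `snapshot_clone_no_wait` enabled, such requests fail immediately with EAGAIN instead of
queueing. A sketch of how an operator could restore the old queueing behaviour, assuming the
usual mgr module config path for the `volumes` plugin:

    ceph config set mgr mgr/volumes/snapshot_clone_no_wait false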
Fixes: https://tracker.ceph.com/issues/59714
Signed-off-by: Neeraj Pratap Singh <neesingh@redhat.com>
this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
the driver. file types supported are directories, symbolic links and regular files.
"""
- def __init__(self, volume_client, tp_size, snapshot_clone_delay):
+ def __init__(self, volume_client, tp_size, snapshot_clone_delay, clone_no_wait):
self.vc = volume_client
self.snapshot_clone_delay = snapshot_clone_delay
+ self.snapshot_clone_no_wait = clone_no_wait
self.state_table = {
SubvolumeStates.STATE_PENDING : handle_clone_pending,
SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
def reconfigure_snapshot_clone_delay(self, timeout):
self.snapshot_clone_delay = timeout
+ def reconfigure_reject_clones(self, clone_no_wait):
+ self.snapshot_clone_no_wait = clone_no_wait
+
def is_clone_cancelable(self, clone_state):
return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
import orchestrator
from .lock import GlobalLock
-from ..exception import VolumeException
+from ..exception import VolumeException, IndexException
from ..fs_util import create_pool, remove_pool, rename_pool, create_filesystem, \
remove_filesystem, rename_filesystem, create_mds, volume_exists, listdir
from .trash import Trash
from mgr_util import open_filesystem, CephfsConnectionException
+from .clone_index import open_clone_index
log = logging.getLogger(__name__)
return {'pending_subvolume_deletions': num_pending_subvol_del}
+def get_all_pending_clones_count(self, mgr, vol_spec):
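+ """
+ Return the total number of entries in the clone index (i.e. clones that
+ have not yet completed) summed across all volumes.
+ """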
+ pending_clones_cnt = 0
+ index_path = ""
+ fs_map = mgr.get('fs_map')
+ for fs in fs_map['filesystems']:
+ volname = fs['mdsmap']['fs_name']
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_clone_index(fs_handle, vol_spec) as index:
+ index_path = index.path.decode('utf-8')
+ pending_clones_cnt = pending_clones_cnt \
+ + len(listdir(fs_handle, index_path,
+ filter_entries=None, filter_files=False))
+ except IndexException as e:
+ if e.errno == -errno.ENOENT:
+ continue
+ raise VolumeException(-e.args[0], e.args[1])
+ except VolumeException as ve:
+ log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
+ raise ve
+
+ return pending_clones_cnt
+
+
@contextmanager
def open_volume(vc, volname):
"""
from .operations.group import open_group, create_group, remove_group, \
open_group_unique, set_group_attrs
from .operations.volume import create_volume, delete_volume, rename_volume, \
- list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count
+ list_volumes, open_volume, get_pool_names, get_pool_ids, \
+ get_pending_subvol_deletions_count, get_all_pending_clones_count
from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
create_clone
from .vol_spec import VolSpec
-from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
+from .exception import VolumeException, ClusterError, ClusterTimeout, \
+ EvictionError, IndexException
from .async_cloner import Cloner
from .purge_queue import ThreadPoolPurgeQueueMixin
from .operations.template import SubvolumeOpType
super().__init__(mgr)
# volume specification
self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
- self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
+ self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay,
+ self.mgr.snapshot_clone_no_wait)
self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
# on startup, queue purge job for available volumes to kickstart
# purge for leftover subvolume entries in trash. note that, if the
s_groupname = kwargs['group_name']
try:
+ if self.mgr.snapshot_clone_no_wait and \
+ get_all_pending_clones_count(self, self.mgr, self.volspec) >= self.mgr.max_concurrent_clones:
+ raise VolumeException(-errno.EAGAIN, "all cloner threads are busy, please try again later")
+
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, s_groupname) as s_group:
with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
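For illustration, a rough sketch of the client-visible outcome of the check above (the command
shape and output formatting are assumptions; only the error message comes from this patch):

    $ ceph fs subvolume snapshot clone <vol> <subvol> <snap> <target_subvol>
    Error EAGAIN: all cloner threads are busy, please try again later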
'periodic_async_work',
type='bool',
default=False,
- desc='Periodically check for async work')
+ desc='Periodically check for async work'),
+ Option(
+ 'snapshot_clone_no_wait',
+ type='bool',
+ default=True,
+ desc='Reject subvolume clone request when cloner threads are busy')
]
def __init__(self, *args, **kwargs):
self.max_concurrent_clones = None
self.snapshot_clone_delay = None
self.periodic_async_work = False
+ self.snapshot_clone_no_wait = None
self.lock = threading.Lock()
super(Module, self).__init__(*args, **kwargs)
# Initialize config option members
else:
self.vc.cloner.unset_wakeup_timeout()
self.vc.purge_queue.unset_wakeup_timeout()
+ elif opt['name'] == "snapshot_clone_no_wait":
+ self.vc.cloner.reconfigure_reject_clones(self.snapshot_clone_no_wait)
def handle_command(self, inbuf, cmd):
handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")