This relies on a simple state machine (which mimics the states of the SubvolumeOpSm class) as
the driver. The supported file types are directories, symbolic links, and regular files.
"""
- def __init__(self, volume_client, tp_size, snapshot_clone_delay):
+ def __init__(self, volume_client, tp_size, snapshot_clone_delay, clone_no_wait):
self.vc = volume_client
self.snapshot_clone_delay = snapshot_clone_delay
+ self.snapshot_clone_no_wait = clone_no_wait
self.state_table = {
SubvolumeStates.STATE_PENDING : handle_clone_pending,
SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
def reconfigure_snapshot_clone_delay(self, timeout):
self.snapshot_clone_delay = timeout
+ def reconfigure_reject_clones(self, clone_no_wait):
+ self.snapshot_clone_no_wait = clone_no_wait
+
def is_clone_cancelable(self, clone_state):
return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
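For orientation, the state table above is what drives each clone: the driver resolves the handler registered for the clone's current state and lets the handler produce the next state, looping until a terminal state is reached. A minimal sketch of that dispatch loop, with hypothetical helper and argument names (the real driver lives in async_cloner.py and differs in detail):

import errno

# Sketch only: hypothetical dispatch loop. 'state_table' is the mapping shown
# above; handlers are assumed here to return (next_state, finished) purely for
# illustration. VolumeException comes from the module's exception package.
def drive_clone(state_table, current_state, *handler_args):
    finished = False
    while not finished:
        handler = state_table.get(current_state, None)
        if handler is None:
            raise VolumeException(-errno.EINVAL,
                                  "invalid clone state: '{0}'".format(current_state))
        current_state, finished = handler(*handler_args)
    return current_state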
import orchestrator
from .lock import GlobalLock
-from ..exception import VolumeException
+from ..exception import VolumeException, IndexException
from ..fs_util import create_pool, remove_pool, rename_pool, create_filesystem, \
remove_filesystem, rename_filesystem, create_mds, volume_exists, listdir
from .trash import Trash
from mgr_util import open_filesystem, CephfsConnectionException
+from .clone_index import open_clone_index
log = logging.getLogger(__name__)
return {'pending_subvolume_deletions': num_pending_subvol_del}
+def get_all_pending_clones_count(self, mgr, vol_spec):
+ pending_clones_cnt = 0
+ index_path = ""
+ fs_map = mgr.get('fs_map')
+ for fs in fs_map['filesystems']:
+ volname = fs['mdsmap']['fs_name']
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_clone_index(fs_handle, vol_spec) as index:
+ index_path = index.path.decode('utf-8')
+ pending_clones_cnt += len(listdir(fs_handle, index_path,
+ filter_entries=None, filter_files=False))
+ except IndexException as e:
+ if e.errno == -errno.ENOENT:
+ continue
+ raise VolumeException(-e.args[0], e.args[1])
+ except VolumeException as ve:
+ log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
+ raise ve
+
+ return pending_clones_cnt
+
+
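Design note: a missing clone index raises IndexException with -ENOENT, which the helper treats as "this volume has never had a clone queued" and skips; any other index error is surfaced as a VolumeException. Because a clone's index entry is created when the clone is queued and removed only once it completes, fails, or is canceled, the returned count covers both pending and in-progress clones across every volume.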
@contextmanager
def open_volume(vc, volname):
"""
from .operations.group import open_group, create_group, remove_group, \
open_group_unique, set_group_attrs
from .operations.volume import create_volume, delete_volume, rename_volume, \
- list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count
+ list_volumes, open_volume, get_pool_names, get_pool_ids, \
+ get_pending_subvol_deletions_count, get_all_pending_clones_count
from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
create_clone
from .operations.trash import Trash
from .vol_spec import VolSpec
-from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
+from .exception import VolumeException, ClusterError, ClusterTimeout, \
+ EvictionError, IndexException
from .async_cloner import Cloner
from .purge_queue import ThreadPoolPurgeQueueMixin
from .operations.template import SubvolumeOpType
super().__init__(mgr)
# volume specification
self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
- self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
+ self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay,
+ self.mgr.snapshot_clone_no_wait)
self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
# on startup, queue purge job for available volumes to kickstart
# purge for leftover subvolume entries in trash. note that, if the
s_groupname = kwargs['group_name']
try:
+ if self.mgr.snapshot_clone_no_wait and \
+ get_all_pending_clones_count(self, self.mgr, self.volspec) >= self.mgr.max_concurrent_clones:
+ raise VolumeException(-errno.EAGAIN, "all cloner threads are busy, please try again later")
+
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, s_groupname) as s_group:
with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
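Note that the busy check runs before any volume, group, or subvolume handle is opened, so a rejected request fails fast and cheaply. From the CLI the rejection surfaces as something like: Error EAGAIN: all cloner threads are busy, please try again later.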
Option(
'snapshot_clone_delay',
type='int',
- default=0,
- desc='Delay clone begin operation by snapshot_clone_delay seconds')
+ default=0,
+ desc='Delay clone begin operation by snapshot_clone_delay seconds'),
+ Option(
+ 'snapshot_clone_no_wait',
+ type='bool',
+ default=True,
+ desc='Reject subvolume clone request when cloner threads are busy')
]
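With snapshot_clone_no_wait defaulting to true, a clone request is rejected up front once the number of pending and in-progress clones reaches max_concurrent_clones; setting the option to false (for example via "ceph config set mgr mgr/volumes/snapshot_clone_no_wait false") restores the previous behaviour of queueing the clone until a cloner thread frees up.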
def __init__(self, *args, **kwargs):
# for mypy
self.max_concurrent_clones = None
self.snapshot_clone_delay = None
+ self.snapshot_clone_no_wait = None
self.lock = threading.Lock()
super(Module, self).__init__(*args, **kwargs)
# Initialize config option members
self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
elif opt['name'] == "snapshot_clone_delay":
self.vc.cloner.reconfigure_snapshot_clone_delay(self.snapshot_clone_delay)
+ elif opt['name'] == "snapshot_clone_no_wait":
+ self.vc.cloner.reconfigure_reject_clones(self.snapshot_clone_no_wait)
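As with the other cloner options, this notify hook means a runtime change to snapshot_clone_no_wait reaches the running cloner immediately; no ceph-mgr restart or failover is needed.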
def handle_command(self, inbuf, cmd):
handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")