max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
self.assertEqual(max_concurrent_clones, 2)
+ def test_subvolume_snapshot_config_snapshot_clone_delay(self):
+ """
+ Validate 'snapshot_clone_delay' config option
+ """
+
+ # get the default delay before starting the clone
+ default_timeout = int(self.config_get('mgr', 'mgr/volumes/snapshot_clone_delay'))
+ self.assertEqual(default_timeout, 0)
+
+ # Insert delay of 2 seconds at the beginning of the snapshot clone
+ self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
+ default_timeout = int(self.config_get('mgr', 'mgr/volumes/snapshot_clone_delay'))
+ self.assertEqual(default_timeout, 2)
+
+ # Decrease number of cloner threads
+ self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 2)
+ max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+ self.assertEqual(max_concurrent_clones, 2)
+
def test_subvolume_snapshot_clone_pool_layout(self):
subvolume = self._generate_random_subvolume_name()
snapshot = self._generate_random_snapshot_name()
# ensure metadata file is in legacy location, with required version v1
self._assert_meta_location_and_version(self.volname, subvolume, version=1, legacy=True)
+ # Insert delay at the beginning of snapshot clone
+ self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
+
# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+ # Insert delay at the beginning of snapshot clone
+ self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
+
# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+ # Insert delay at the beginning of snapshot clone
+ self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
+
# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+ # Insert delay at the beginning of snapshot clone
+ self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
+
# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+ # Insert delay at the beginning of snapshot clone
+ self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
+
# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
log.error("failed to detach clone from snapshot: {0}".format(e))
return (None, True)
-def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
+def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay):
finished = False
current_state = None
try:
current_state = get_clone_state(volume_client, volname, groupname, subvolname)
log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
+ if current_state == SubvolumeStates.STATE_PENDING:
+ time.sleep(snapshot_clone_delay)
+ log.info("Delayed cloning ({0}, {1}, {2}) -- by {3} seconds".format(volname, groupname, subvolname, snapshot_clone_delay))
while not finished:
handler = state_table.get(current_state, None)
if not handler:
log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,\
subvolname, current_state, ve))
-def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
+def clone(volume_client, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay):
log.info("cloning to subvolume path: {0}".format(clone_path))
resolved = resolve(volume_client.volspec, clone_path)
try:
log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
- start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
+ start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay)
log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
except VolumeException as ve:
log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))
this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
the driver. file types supported are directories, symbolic links and regular files.
"""
- def __init__(self, volume_client, tp_size):
+ def __init__(self, volume_client, tp_size, snapshot_clone_delay):
self.vc = volume_client
+ self.snapshot_clone_delay = snapshot_clone_delay
self.state_table = {
SubvolumeStates.STATE_PENDING : handle_clone_pending,
SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
def reconfigure_max_concurrent_clones(self, tp_size):
super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)
+ def reconfigure_snapshot_clone_delay(self, timeout):
+ self.snapshot_clone_delay = timeout
+
def is_clone_cancelable(self, clone_state):
return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
return get_next_clone_entry(self.vc, volname, running_jobs)
def execute_job(self, volname, job, should_cancel):
- clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)
+ clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay)
# volume specification
self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
# TODO: make thread pool size configurable
- self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
+ self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
# on startup, queue purge job for available volumes to kickstart
# purge for leftover subvolume entries in trash. note that, if the
'type': 'int',
'default': 4,
'desc': 'Number of asynchronous cloner threads',
- }
+ },
+ {
+ 'name': 'snapshot_clone_delay',
+ 'type': 'int',
+ 'default':0,
+ 'desc':'Delay clone begin operation by snapshot_clone_delay seconds',
+ },
]
def __init__(self, *args, **kwargs):
self.inited = False
# for mypy
self.max_concurrent_clones = None
+ self.snapshot_clone_delay = None
self.lock = threading.Lock()
super(Module, self).__init__(*args, **kwargs)
# Initialize config option members
if self.inited:
if opt['name'] == "max_concurrent_clones":
self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
+ elif opt['name'] == "snapshot_clone_delay":
+ self.vc.cloner.reconfigure_snapshot_clone_delay(self.snapshot_clone_delay)
def handle_command(self, inbuf, cmd):
handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")