mgr/volumes: Add config to insert delay at the beginning of the clone
author    Kotresh HR <khiremat@redhat.com>
          Mon, 28 Feb 2022 10:53:39 +0000 (16:23 +0530)
committer Kotresh HR <khiremat@redhat.com>
          Mon, 28 Feb 2022 11:55:50 +0000 (17:25 +0530)
Added the config 'snapshot_clone_delay' to insert a delay at the beginning
of the clone to avoid races in tests. The default value is set to 0.
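
For example, the option can be flipped at runtime; this mirrors the
CephFSTestCase helpers exercised in the tests below (the same key works
from the CLI via 'ceph config set mgr ...'):

    # 2-second delay inserted before each snapshot clone starts copying
    self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
    assert int(self.config_get('mgr', 'mgr/volumes/snapshot_clone_delay')) == 2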

Fixes: https://tracker.ceph.com/issues/48231
Signed-off-by: Kotresh HR <khiremat@redhat.com>
(cherry picked from commit 7588f985054282d2cff7f3582e995584b1fd20f8)

Conflicts:
 qa/tasks/cephfs/test_volumes.py: Conflicts due to test ordering
 src/pybind/mgr/volumes/fs/volume.py: The commit e308bf898955 is not backported
 src/pybind/mgr/volumes/module.py: The commit f002c6ce4033 is not backported

qa/tasks/cephfs/test_volumes.py
src/pybind/mgr/volumes/fs/async_cloner.py
src/pybind/mgr/volumes/fs/volume.py
src/pybind/mgr/volumes/module.py

diff --git a/qa/tasks/cephfs/test_volumes.py b/qa/tasks/cephfs/test_volumes.py
index 13eb2e641f21ef6818b6f2f7ac9ef2a4490c26ee..b2258c2a355d64b2310174b32e0a1e3f9105e5ca 100644
@@ -3788,6 +3788,25 @@ class TestVolumes(CephFSTestCase):
         max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
         self.assertEqual(max_concurrent_clones, 2)
 
+    def test_subvolume_snapshot_config_snapshot_clone_delay(self):
+        """
+        Validate 'snapshot_clone_delay' config option
+        """
+
+        # get the default delay before starting the clone
+        default_timeout = int(self.config_get('mgr', 'mgr/volumes/snapshot_clone_delay'))
+        self.assertEqual(default_timeout, 0)
+
+        # Insert delay of 2 seconds at the beginning of the snapshot clone
+        self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
+        default_timeout = int(self.config_get('mgr', 'mgr/volumes/snapshot_clone_delay'))
+        self.assertEqual(default_timeout, 2)
+
+        # Decrease number of cloner threads
+        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 2)
+        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(max_concurrent_clones, 2)
+
     def test_subvolume_snapshot_clone_pool_layout(self):
         subvolume = self._generate_random_subvolume_name()
         snapshot = self._generate_random_snapshot_name()
@@ -4120,6 +4139,9 @@ class TestVolumes(CephFSTestCase):
         # ensure metadata file is in legacy location, with required version v1
         self._assert_meta_location_and_version(self.volname, subvolume, version=1, legacy=True)
 
+        # Insert delay at the beginning of snapshot clone
+        self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
+
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
@@ -4164,6 +4186,9 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
+        # Insert delay at the beginning of snapshot clone
+        self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
+
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
@@ -4210,6 +4235,9 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
+        # Insert delay at the beginning of snapshot clone
+        self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
+
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
@@ -4255,6 +4283,9 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
+        # Insert delay at the beginning of snapshot clone
+        self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
+
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
@@ -4468,6 +4499,9 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
+        # Insert delay at the beginning of snapshot clone
+        self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
+
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
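The recurring config_set pattern above closes the race this change
targets: a small clone can finish before the test observes it in flight.
A sketch of the idea, using the same helpers as the surrounding tests
(json is assumed imported at module top):

    # hold the clone in 'pending' for ~2 seconds before it starts copying
    self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
    self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
    # the status check can no longer race with a fast clone completing
    status = json.loads(self._fs_cmd("clone", "status", self.volname, clone))
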
diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py
index d08b7846512938d7443c20e7ef60dfdf32a597da..30f0af6183647fa999631e1f6fec4f5d8a3f548f 100644
@@ -223,12 +223,15 @@ def handle_clone_complete(volume_client, volname, index, groupname, subvolname,
         log.error("failed to detach clone from snapshot: {0}".format(e))
     return (None, True)
 
-def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
+def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay):
     finished = False
     current_state = None
     try:
         current_state = get_clone_state(volume_client, volname, groupname, subvolname)
         log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
+        if current_state == SubvolumeStates.STATE_PENDING:
+            time.sleep(snapshot_clone_delay)
+            log.info("Delayed cloning ({0}, {1}, {2}) -- by {3} seconds".format(volname, groupname, subvolname, snapshot_clone_delay))
         while not finished:
             handler = state_table.get(current_state, None)
             if not handler:
@@ -243,7 +246,7 @@ def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_t
         log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,\
                                                                                              subvolname, current_state, ve))
 
-def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
+def clone(volume_client, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay):
     log.info("cloning to subvolume path: {0}".format(clone_path))
     resolved = resolve(volume_client.volspec, clone_path)
 
@@ -253,7 +256,7 @@ def clone(volume_client, volname, index, clone_path, state_table, should_cancel)
 
     try:
         log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
-        start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
+        start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay)
         log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
     except VolumeException as ve:
         log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))
@@ -264,8 +267,9 @@ class Cloner(AsyncJobs):
     this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
     the driver. file types supported are directories, symbolic links and regular files.
     """
-    def __init__(self, volume_client, tp_size):
+    def __init__(self, volume_client, tp_size, snapshot_clone_delay):
         self.vc = volume_client
+        self.snapshot_clone_delay = snapshot_clone_delay
         self.state_table = {
             SubvolumeStates.STATE_PENDING      : handle_clone_pending,
             SubvolumeStates.STATE_INPROGRESS   : handle_clone_in_progress,
@@ -278,6 +282,9 @@ class Cloner(AsyncJobs):
     def reconfigure_max_concurrent_clones(self, tp_size):
         super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)
 
+    def reconfigure_snapshot_clone_delay(self, timeout):
+        self.snapshot_clone_delay = timeout
+
     def is_clone_cancelable(self, clone_state):
         return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
 
@@ -343,4 +350,4 @@ class Cloner(AsyncJobs):
         return get_next_clone_entry(self.vc, volname, running_jobs)
 
     def execute_job(self, volname, job, should_cancel):
-        clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)
+        clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay)
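
Taken together, these changes thread the configured delay from the mgr
module down to each cloner job. A minimal standalone sketch of that flow
(hypothetical MiniCloner, not the real class):

    import time

    class MiniCloner:
        """Sketch of how Cloner carries the configured delay."""
        def __init__(self, snapshot_clone_delay):
            self.snapshot_clone_delay = snapshot_clone_delay

        def reconfigure_snapshot_clone_delay(self, timeout):
            # called by the mgr module when the option changes at runtime
            self.snapshot_clone_delay = timeout

        def execute_job(self, clone_state):
            # only clones still pending are delayed; a clone resumed while
            # already in progress starts immediately
            if clone_state == 'pending':
                time.sleep(self.snapshot_clone_delay)
            # ... drive the clone state machine from here ...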
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index 039d2ee1844fb51269ddadb63594c95e669dc5ee..95663569fcb37b579f4bb020edf1c3167165e0cf 100644
@@ -46,7 +46,7 @@ class VolumeClient(CephfsClient):
         # volume specification
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
         # TODO: make thread pool size configurable
-        self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
+        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py
index d6e118747f4ef0f8ea79efb53c5fac70aae615d2..67450665f041a570b93626c4b992455b7536b057 100644
@@ -418,13 +418,20 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
             'type': 'int',
             'default': 4,
             'desc': 'Number of asynchronous cloner threads',
-        }
+        },
+        {
+            'name': 'snapshot_clone_delay',
+            'type': 'int',
+            'default': 0,
+            'desc': 'Delay clone begin operation by snapshot_clone_delay seconds',
+        },
     ]
 
     def __init__(self, *args, **kwargs):
         self.inited = False
         # for mypy
         self.max_concurrent_clones = None
+        self.snapshot_clone_delay = None
         self.lock = threading.Lock()
         super(Module, self).__init__(*args, **kwargs)
         # Initialize config option members
@@ -455,6 +462,8 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
                 if self.inited:
                     if opt['name'] == "max_concurrent_clones":
                         self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
+                    elif opt['name'] == "snapshot_clone_delay":
+                        self.vc.cloner.reconfigure_snapshot_clone_delay(self.snapshot_clone_delay)
 
     def handle_command(self, inbuf, cmd):
         handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")