# verify trash dir is clean
self._wait_for_trash_empty()
+ def test_subvolume_snapshot_info_without_snapshot_clone(self):
+ """
+ Verify subvolume snapshot info output without clonnnig snapshot.
+ If no clone is performed then path /volumes/_index/clone/{track_id}
+ will not exist.
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create subvolume.
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # list snapshot info
+ result = json.loads(self._fs_cmd("subvolume", "snapshot", "info", self.volname, subvolume, snapshot))
+
+ # verify snapshot info
+ self.assertEqual(result['has_pending_clones'], "no")
+ self.assertFalse('orphan_clones_count' in result)
+ self.assertFalse('pending_clones' in result)
+
+ # remove snapshot, subvolume, clone
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_info_if_no_clone_pending(self):
+ """
+ Verify subvolume snapshot info output if no clone is in pending state.
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+ clone_list = [f'clone_{i}' for i in range(3)]
+
+ # create subvolume.
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # schedule a clones
+ for clone in clone_list:
+ self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+ # check clones status
+ for clone in clone_list:
+ self._wait_for_clone_to_complete(clone)
+
+ # list snapshot info
+ result = json.loads(self._fs_cmd("subvolume", "snapshot", "info", self.volname, subvolume, snapshot))
+
+ # verify snapshot info
+ self.assertEqual(result['has_pending_clones'], "no")
+ self.assertFalse('orphan_clones_count' in result)
+ self.assertFalse('pending_clones' in result)
+
+ # remove snapshot, subvolume, clone
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ for clone in clone_list:
+ self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_info_if_clone_pending_for_no_group(self):
+ """
+ Verify subvolume snapshot info output if clones are in pending state.
+ Clones are not specified for particular target_group. Hence target_group
+ should not be in the output as we don't show _nogroup (default group)
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+ clone_list = [f'clone_{i}' for i in range(3)]
+
+ # create subvolume.
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # insert delay at the beginning of snapshot clone
+ self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 5)
+
+ # schedule a clones
+ for clone in clone_list:
+ self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+ # list snapshot info
+ result = json.loads(self._fs_cmd("subvolume", "snapshot", "info", self.volname, subvolume, snapshot))
+
+ # verify snapshot info
+ expected_clone_list = []
+ for clone in clone_list:
+ expected_clone_list.append({"name": clone})
+ self.assertEqual(result['has_pending_clones'], "yes")
+ self.assertFalse('orphan_clones_count' in result)
+ self.assertListEqual(result['pending_clones'], expected_clone_list)
+ self.assertEqual(len(result['pending_clones']), 3)
+
+ # check clones status
+ for clone in clone_list:
+ self._wait_for_clone_to_complete(clone)
+
+ # remove snapshot, subvolume, clone
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ for clone in clone_list:
+ self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_info_if_clone_pending_for_target_group(self):
+ """
+ Verify subvolume snapshot info output if clones are in pending state.
+ Clones are not specified for target_group.
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+ clone = self._generate_random_clone_name()
+ group = self._generate_random_group_name()
+ target_group = self._generate_random_group_name()
+
+ # create groups
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+ self._fs_cmd("subvolumegroup", "create", self.volname, target_group)
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, group, "--mode=777")
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot, group)
+
+ # insert delay at the beginning of snapshot clone
+ self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 5)
+
+ # schedule a clone
+ self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone,
+ "--group_name", group, "--target_group_name", target_group)
+
+ # list snapshot info
+ result = json.loads(self._fs_cmd("subvolume", "snapshot", "info", self.volname, subvolume, snapshot, "--group_name", group))
+
+ # verify snapshot info
+ expected_clone_list = [{"name": clone, "target_group": target_group}]
+ self.assertEqual(result['has_pending_clones'], "yes")
+ self.assertFalse('orphan_clones_count' in result)
+ self.assertListEqual(result['pending_clones'], expected_clone_list)
+ self.assertEqual(len(result['pending_clones']), 1)
+
+ # check clone status
+ self._wait_for_clone_to_complete(clone, clone_group=target_group)
+
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, group)
+
+ # remove subvolumes
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+ self._fs_cmd("subvolume", "rm", self.volname, clone, target_group)
+
+ # remove groups
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, target_group)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_info_if_orphan_clone(self):
+ """
+ Verify subvolume snapshot info output if orphan clones exists.
+ Orphan clones should not list under pending clones.
+ orphan_clones_count should display correct count of orphan clones'
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+ clone_list = [f'clone_{i}' for i in range(3)]
+
+ # create subvolume.
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # insert delay at the beginning of snapshot clone
+ self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 5)
+
+ # schedule a clones
+ for clone in clone_list:
+ self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+ # remove track file for third clone to make it orphan
+ meta_path = os.path.join(".", "volumes", "_nogroup", subvolume, ".meta")
+ pending_clones_result = self.mount_a.run_shell(f"sudo grep \"clone snaps\" -A3 {meta_path}", omit_sudo=False, stdout=StringIO(), stderr=StringIO())
+ third_clone_track_id = pending_clones_result.stdout.getvalue().splitlines()[3].split(" = ")[0]
+ third_clone_track_path = os.path.join(".", "volumes", "_index", "clone", third_clone_track_id)
+ self.mount_a.run_shell(f"sudo rm -f {third_clone_track_path}", omit_sudo=False)
+
+ # list snapshot info
+ result = json.loads(self._fs_cmd("subvolume", "snapshot", "info", self.volname, subvolume, snapshot))
+
+ # verify snapshot info
+ expected_clone_list = []
+ for i in range(len(clone_list)-1):
+ expected_clone_list.append({"name": clone_list[i]})
+ self.assertEqual(result['has_pending_clones'], "yes")
+ self.assertEqual(result['orphan_clones_count'], 1)
+ self.assertListEqual(result['pending_clones'], expected_clone_list)
+ self.assertEqual(len(result['pending_clones']), 2)
+
+ # check clones status
+ for i in range(len(clone_list)-1):
+ self._wait_for_clone_to_complete(clone_list[i])
+
+ # list snapshot info after cloning completion
+ res = json.loads(self._fs_cmd("subvolume", "snapshot", "info", self.volname, subvolume, snapshot))
+
+ # verify snapshot info (has_pending_clones should be no)
+ self.assertEqual(res['has_pending_clones'], "no")
+
def test_non_clone_status(self):
subvolume = self._generate_random_subvolume_name()