test: add subvolume clone tests
author     Venky Shankar <vshankar@redhat.com>
           Mon, 2 Dec 2019 08:50:23 +0000 (03:50 -0500)
committer  Ramana Raja <rraja@redhat.com>
           Wed, 12 Feb 2020 10:12:00 +0000 (05:12 -0500)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
(cherry picked from commit b5970ff80d9d203e6289770cbb8f51cc6a2c5938)
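
The tests below exercise the subvolume snapshot clone workflow end to end:
snapshot a subvolume, protect the snapshot, schedule a clone, poll its status,
then unprotect and clean up. As a rough sketch (the volume, subvolume, snapshot
and clone names are placeholders, and an existing volume and cluster access are
assumed), the CLI sequence the tests drive looks like this:

    # minimal sketch of the clone workflow under test, via the `ceph fs` CLI
    import json
    import subprocess
    import time

    def ceph_fs(*args):
        # thin wrapper around `ceph fs ...`, mirroring _fs_cmd() in the tests
        return subprocess.check_output(("ceph", "fs") + args).decode()

    vol, subvol, snap, clone = "a", "subvol0", "snap0", "clone0"

    ceph_fs("subvolume", "create", vol, subvol)
    ceph_fs("subvolume", "snapshot", "create", vol, subvol, snap)
    ceph_fs("subvolume", "snapshot", "protect", vol, subvol, snap)   # cloning requires a protected snapshot
    ceph_fs("subvolume", "snapshot", "clone", vol, subvol, snap, clone)

    # poll "clone status" until the copy finishes, as _wait_for_clone_to_complete() does
    while json.loads(ceph_fs("clone", "status", vol, clone))["status"]["state"] != "complete":
        time.sleep(1)

    ceph_fs("subvolume", "snapshot", "unprotect", vol, subvol, snap)
    ceph_fs("subvolume", "snapshot", "rm", vol, subvol, snap)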

qa/suites/fs/basic_functional/tasks/volumes.yaml
qa/tasks/cephfs/test_volumes.py

qa/suites/fs/basic_functional/tasks/volumes.yaml
index d4b65bae73bf6588e916810571f862db22bea854..7364fd211d48d3f6af84bdc8d14d50f5e69f6b9d 100644 (file)
@@ -1,3 +1,15 @@
+overrides:
+  ceph:
+    log-whitelist:
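+      # expected: the clone failure test below deliberately fills up a data pool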
+      - OSD full dropping all updates
+      - OSD near full
+      - pausewr flag
+      - failsafe engaged, dropping updates
+      - failsafe disengaged, no longer dropping
+      - is full \(reached quota
+      - POOL_FULL
+      - POOL_BACKFILLFULL
+
 tasks:
   - cephfs_test_runner:
       modules:
qa/tasks/cephfs/test_volumes.py
index 54f8619a451403747f019b29e2cee3905bc54ff5..14b266e4a1770578a5562706e3627b452a20b21a 100644 (file)
@@ -1,5 +1,6 @@
 import os
 import json
+import time
 import errno
 import random
 import logging
@@ -15,6 +16,7 @@ class TestVolumes(CephFSTestCase):
     TEST_SUBVOLUME_PREFIX="subvolume"
     TEST_GROUP_PREFIX="group"
     TEST_SNAPSHOT_PREFIX="snapshot"
+    TEST_CLONE_PREFIX="clone"
     TEST_FILE_NAME_PREFIX="subvolume_file"
 
     # for filling subvolume with data
@@ -27,6 +29,52 @@ class TestVolumes(CephFSTestCase):
     def _fs_cmd(self, *args):
         return self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", *args)
 
+    def __check_clone_state(self, state, clone, clone_group=None, timo=120):
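+        # poll "ceph fs clone status" once a second until the clone reaches the
+        # expected state; the assertion below fails the test after timo seconds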
+        check = 0
+        args = ["clone", "status", self.volname, clone]
+        if clone_group:
+            args.append(clone_group)
+        args = tuple(args)
+        while check < timo:
+            result = json.loads(self._fs_cmd(*args))
+            if result["status"]["state"] == state:
+                break
+            check += 1
+            time.sleep(1)
+        self.assertTrue(check < timo)
+
+    def _wait_for_clone_to_complete(self, clone, clone_group=None, timo=120):
+        self.__check_clone_state("complete", clone, clone_group, timo)
+
+    def _wait_for_clone_to_fail(self, clone, clone_group=None, timo=120):
+        self.__check_clone_state("failed", clone, clone_group, timo)
+
+    def _verify_clone(self, subvolume, clone, source_group=None, clone_group=None, timo=120):
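+        # a clone is considered verified when its ownership (uid/gid), mode and
+        # recursive entry count (ceph.dir.rentries) match the source subvolume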
+        path1 = self._get_subvolume_path(self.volname, subvolume, group_name=source_group)
+        path2 = self._get_subvolume_path(self.volname, clone, group_name=clone_group)
+
+        s_uid = int(self.mount_a.run_shell(['stat', '-c', '%u', path1]).stdout.getvalue().strip())
+        c_uid = int(self.mount_a.run_shell(['stat', '-c', '%u', path2]).stdout.getvalue().strip())
+        self.assertEqual(s_uid, c_uid)
+
+        s_gid = int(self.mount_a.run_shell(['stat', '-c', '%g', path1]).stdout.getvalue().strip())
+        c_gid = int(self.mount_a.run_shell(['stat', '-c', '%g', path2]).stdout.getvalue().strip())
+        self.assertEqual(s_gid, c_gid)
+
+        s_mode = int(self.mount_a.run_shell(['stat', '-c', '%a', path1]).stdout.getvalue().strip())
+        c_mode = int(self.mount_a.run_shell(['stat', '-c', '%a', path2]).stdout.getvalue().strip())
+        self.assertEqual(s_mode, c_mode)
+
+        check = 0
+        while check < timo:
+            val1 = int(self.mount_a.getfattr(path1, "ceph.dir.rentries"))
+            val2 = int(self.mount_a.getfattr(path2, "ceph.dir.rentries"))
+            if val1 == val2:
+                break
+            check += 1
+            time.sleep(1)
+        self.assertTrue(check < timo)
+
     def _generate_random_volume_name(self, count=1):
         r = random.sample(range(10000), count)
         volumes = ["{0}_{1}".format(TestVolumes.TEST_VOLUME_PREFIX, c) for c in r]
@@ -47,6 +95,11 @@ class TestVolumes(CephFSTestCase):
         snaps = ["{0}_{1}".format(TestVolumes.TEST_SNAPSHOT_PREFIX, c) for c in r]
         return snaps[0] if count == 1 else snaps
 
+    def _generate_random_clone_name(self, count=1):
+        r = random.sample(range(1000), count)
+        clones = ["{0}_{1}".format(TestVolumes.TEST_CLONE_PREFIX, c) for c in r]
+        return clones[0] if count == 1 else clones
+
     def _enable_multi_fs(self):
         self._fs_cmd("flag", "set", "enable_multiple", "true", "--yes-i-really-mean-it")
 
@@ -77,17 +130,26 @@ class TestVolumes(CephFSTestCase):
     def _delete_test_volume(self):
         self._fs_cmd("volume", "rm", self.volname, "--yes-i-really-mean-it")
 
-    def _do_subvolume_io(self, subvolume, number_of_files=DEFAULT_NUMBER_OF_FILES,
-                         file_size=DEFAULT_FILE_SIZE):
+    def _do_subvolume_io(self, subvolume, subvolume_group=None, create_dir=None,
+                         number_of_files=DEFAULT_NUMBER_OF_FILES, file_size=DEFAULT_FILE_SIZE):
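+        # subvolume_group targets a subvolume within a group; create_dir writes
+        # the files under a subdirectory (used to differentiate a clone's data)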
         # get subvolume path for IO
-        subvolpath = self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
+        args = ["subvolume", "getpath", self.volname, subvolume]
+        if subvolume_group:
+            args.append(subvolume_group)
+        args = tuple(args)
+        subvolpath = self._fs_cmd(*args)
         self.assertNotEqual(subvolpath, None)
         subvolpath = subvolpath[1:].rstrip() # remove "/" prefix and any trailing newline
 
-        log.debug("filling subvolume {0} with {1} files each {2}MB size".format(subvolume, number_of_files, file_size))
+        io_path = subvolpath
+        if create_dir:
+            io_path = os.path.join(subvolpath, create_dir)
+            self.mount_a.run_shell(["mkdir", "-p", io_path])
+
+        log.debug("filling subvolume {0} with {1} files each {2}MB size under directory {3}".format(subvolume, number_of_files, file_size, io_path))
         for i in range(number_of_files):
             filename = "{0}.{1}".format(TestVolumes.TEST_FILE_NAME_PREFIX, i)
-            self.mount_a.write_n_mb(os.path.join(subvolpath, filename), file_size)
+            self.mount_a.write_n_mb(os.path.join(io_path, filename), file_size)
 
     def _wait_for_trash_empty(self, timeout=30):
         # XXX: construct the trash dir path (note that there is no mgr
@@ -1245,3 +1307,723 @@ class TestVolumes(CephFSTestCase):
 
         # verify trash dir is clean
         self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_protect_unprotect(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # protect a nonexistent snapshot
+        try:
+            self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.ENOENT:
+                raise RuntimeError("invalid error code when protecting a non-existing snapshot")
+        else:
+            raise RuntimeError("expected protection of non existent snapshot to fail")
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+
+        # protecting snapshot again, should return EEXIST
+        try:
+            self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.EEXIST:
+                raise RuntimeError("invalid error code when protecting a protected snapshot")
+        else:
+            raise RuntimeError("expected protection of already protected snapshot to fail")
+
+        # remove snapshot should fail since the snapshot is protected
+        try:
+            self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.EINVAL:
+                raise RuntimeError("invalid error code when removing a protected snapshot")
+        else:
+            raise RuntimeError("expected removal of protected snapshot to fail")
+
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_clone_unprotected_snapshot(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # clone a non-protected snapshot
+        try:
+            self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.EINVAL:
+                raise RuntimeError("invalid error code when cloning a non protected snapshot")
+        else:
+            raise RuntimeError("expected cloning of unprotected snapshot to fail")
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_clone(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=64)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+        # unprotecting when a clone is in progress should fail
+        try:
+            self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.EEXIST:
+                raise RuntimeError("invalid error code when unprotecting snapshot during clone")
+        else:
+            raise RuntimeError("expected unprotecting a snapshot to fail since it has pending clones")
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # verify clone
+        self._verify_clone(subvolume, clone)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_clone_pool_layout(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+
+        # add data pool
+        new_pool = "new_pool"
+        self.fs.add_data_pool(new_pool)
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=32)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone, "--pool_layout", new_pool)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # verify clone
+        self._verify_clone(subvolume, clone)
+
+        subvol_path = self._get_subvolume_path(self.volname, clone)
+        desired_pool = self.mount_a.getfattr(subvol_path, "ceph.dir.layout.pool")
+        self.assertEqual(desired_pool, new_pool)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_clone_with_attrs(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+
+        mode = "777"
+        uid  = "1000"
+        gid  = "1000"
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode", mode, "--uid", uid, "--gid", gid)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=32)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # verify clone
+        self._verify_clone(subvolume, clone)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_clone_and_reclone(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone1, clone2 = self._generate_random_clone_name(2)
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=32)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone1)
+
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # verify clone
+        self._verify_clone(subvolume, clone1)
+
+        # now the clone is just like a normal subvolume -- snapshot the clone and fork
+        # another clone. before that, do some IO so it can be differentiated.
+        self._do_subvolume_io(clone1, create_dir="data", number_of_files=32)
+
+        # snapshot clone -- use same snap name
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, clone1, snapshot)
+
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, clone1, snapshot)
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, clone1, snapshot, clone2)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone2)
+
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, clone1, snapshot)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, clone1, snapshot)
+
+        # verify clone
+        self._verify_clone(clone1, clone2)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone1)
+        self._fs_cmd("subvolume", "rm", self.volname, clone2)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_clone_under_group(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+        group = self._generate_random_group_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=32)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+
+        # create group
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone, '--target_group_name', group)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone, clone_group=group)
+
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # verify clone
+        self._verify_clone(subvolume, clone, clone_group=group)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone, group)
+
+        # remove group
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_under_group_snapshot_clone(self):
+        subvolume = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+
+        # create group
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, group)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, subvolume_group=group, number_of_files=32)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot, group)
+
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot, group)
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone, '--group_name', group)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot, group)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, group)
+
+        # verify clone
+        self._verify_clone(subvolume, clone, source_group=group)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # remove group
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_clone_different_groups(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+        s_group, c_group = self._generate_random_group_name(2)
+
+        # create groups
+        self._fs_cmd("subvolumegroup", "create", self.volname, s_group)
+        self._fs_cmd("subvolumegroup", "create", self.volname, c_group)
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, s_group)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, subvolume_group=s_group, number_of_files=32)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot, s_group)
+
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot, s_group)
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone,
+                     '--group_name', s_group, '--target_group_name', c_group)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone, clone_group=c_group)
+
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot, s_group)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, s_group)
+
+        # verify clone
+        self._verify_clone(subvolume, clone, source_group=s_group, clone_group=c_group)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, s_group)
+        self._fs_cmd("subvolume", "rm", self.volname, clone, c_group)
+
+        # remove groups
+        self._fs_cmd("subvolumegroup", "rm", self.volname, s_group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, c_group)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_clone_with_upgrade(self):
+        """
+        yet another poor man's upgrade test -- rather than going through a full
+        upgrade cycle, emulate an old-style subvolume by creating its directory
+        by hand (going through the wormhole) and verify the clone operation.
+        """
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+
+        # emulate an old-fashioned subvolume
+        createpath = os.path.join(".", "volumes", "_nogroup", subvolume)
+        self.mount_a.run_shell(['mkdir', '-p', createpath])
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=32)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # verify clone
+        self._verify_clone(subvolume, clone)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_clone_in_progress_getpath(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=64)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+        # clone should not be accessible right now
+        try:
+            self._get_subvolume_path(self.volname, clone)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.EAGAIN:
+                raise RuntimeError("invalid error code when cloning a non protected snapshot")
+        else:
+            raise RuntimeError("expected fetching path of an pending clone to fail")
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
+        # clone should be accessible now
+        subvolpath = self._get_subvolume_path(self.volname, clone)
+        self.assertNotEqual(subvolpath, None)
+
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # verify clone
+        self._verify_clone(subvolume, clone)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_clone_in_progress_source(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=64)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+        # verify clone source
+        result = json.loads(self._fs_cmd("clone", "status", self.volname, clone))
+        source = result['status']['source']
+        self.assertEqual(source['volume'], self.volname)
+        self.assertEqual(source['subvolume'], subvolume)
+        self.assertEqual(source.get('group', None), None)
+        self.assertEqual(source['snapshot'], snapshot)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
+        # clone should be accessible now
+        subvolpath = self._get_subvolume_path(self.volname, clone)
+        self.assertNotEqual(subvolpath, None)
+
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # verify clone
+        self._verify_clone(subvolume, clone)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_non_clone_status(self):
+        subvolume = self._generate_random_subvolume_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        try:
+            self._fs_cmd("clone", "status", self.volname, subvolume)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.ENOTSUP:
+                raise RuntimeError("invalid error code when fetching status of a non cloned subvolume")
+        else:
+            raise RuntimeError("expected fetching of clone status of a subvolume to fail")
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_clone_on_existing_subvolumes(self):
+        subvolume1, subvolume2 = self._generate_random_subvolume_name(2)
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+
+        # create subvolumes
+        self._fs_cmd("subvolume", "create", self.volname, subvolume1)
+        self._fs_cmd("subvolume", "create", self.volname, subvolume2)
+
+        # do some IO
+        self._do_subvolume_io(subvolume1, number_of_files=32)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume1, snapshot)
+
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume1, snapshot)
+
+        # schedule a clone with target as subvolume2
+        try:
+            self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume1, snapshot, subvolume2)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.EEXIST:
+                raise RuntimeError("invalid error code when cloning to existing subvolume")
+        else:
+            raise RuntimeError("expected cloning to fail if the target is an existing subvolume")
+
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume1, snapshot, clone)
+
+        # schedule a clone with target as clone
+        try:
+            self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume1, snapshot, clone)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.EEXIST:
+                raise RuntimeError("invalid error code when cloning to existing clone")
+        else:
+            raise RuntimeError("expected cloning to fail if the target is an existing clone")
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume1, snapshot)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume1, snapshot)
+
+        # verify clone
+        self._verify_clone(subvolume1, clone)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume1)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume2)
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_clone_fail_with_remove(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone1, clone2 = self._generate_random_clone_name(2)
+
+        pool_capacity = 32 * 1024 * 1024
+        # number of files required to fill up 99% of the pool
+        nr_files = int((pool_capacity * 0.99) / (TestVolumes.DEFAULT_FILE_SIZE * 1024 * 1024))
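+        # (assuming the default 1MB file size, that is roughly 31 files)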
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=nr_files)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+
+        # add data pool
+        new_pool = "new_pool"
+        self.fs.add_data_pool(new_pool)
+
+        self.fs.mon_manager.raw_cluster_cmd("osd", "pool", "set-quota", new_pool,
+                                            "max_bytes", "{0}".format(pool_capacity / 4))
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1, "--pool_layout", new_pool)
+
+        # check clone status -- this should dramatically overshoot the pool quota
+        self._wait_for_clone_to_complete(clone1)
+
+        # verify clone
+        self._verify_clone(subvolume, clone1)
+
+        # wait a bit so that subsequent I/O hits a pool-full error
+        time.sleep(120)
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone2, "--pool_layout", new_pool)
+
+        # check clone status
+        self._wait_for_clone_to_fail(clone2)
+
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone1)
+        try:
+            self._fs_cmd("subvolume", "rm", self.volname, clone2)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.EAGAIN:
+                raise RuntimeError("invalid error code when trying to remove failed clone")
+        else:
+            raise RuntimeError("expected error when removing a failed clone")
+
+        # ... and with --force, a failed clone can be removed
+        self._fs_cmd("subvolume", "rm", self.volname, clone2, "--force")
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()