]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/volumes: Use snapshot root directory attrs when creating clone root
authorShyamsundar Ranganathan <srangana@redhat.com>
Tue, 23 Jun 2020 23:57:52 +0000 (19:57 -0400)
committerShyamsundar Ranganathan <srangana@redhat.com>
Thu, 27 Aug 2020 19:42:58 +0000 (15:42 -0400)
If a subvolumes mode or uid/gid values are changed post a snapshot,
and a clone of a snapshot prior to the change is initiated, the clone
inherits the current source subvolumes attributes, rather than the
snapshots attributes.

Fixing this by using the snapshots subvolume root attributes to create
the clone subvolumes root.

Following attributes are picked from the source subvolume snapshot:
- uid, gid, mode, data pool, pool namespace, quota

Fixes: https://tracker.ceph.com/issues/46163
Signed-off-by: Shyamsundar Ranganathan <srangana@redhat.com>
(cherry picked from commit 0eaf9b0bd2786762160b83edc52d979f5aad233c)

qa/tasks/cephfs/test_volumes.py
src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py
src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py
src/pybind/mgr/volumes/fs/volume.py

index 409fa80bd49c61720f3df4d9a94aa1bfba6626bc..1a794eb05c0d5f813facb6a7b6c00d6305a4c0bf 100644 (file)
@@ -58,16 +58,22 @@ class TestVolumes(CephFSTestCase):
     def _check_clone_canceled(self, clone, clone_group=None):
         self.__check_clone_state("canceled", clone, clone_group, timo=1)
 
-    def _get_subvolume_snapshot_path(self, snapshot, subvol_path):
-        (base_path, uuid_str) = os.path.split(subvol_path)
-        return os.path.join(base_path, ".snap", snapshot, uuid_str)
+    def _get_subvolume_snapshot_path(self, subvolume, snapshot, source_group, subvol_path, source_version):
+        if source_version == 2:
+            # v2
+            if subvol_path is not None:
+                (base_path, uuid_str) = os.path.split(subvol_path)
+            else:
+                (base_path, uuid_str) = os.path.split(self._get_subvolume_path(self.volname, subvolume, group_name=source_group))
+            return os.path.join(base_path, ".snap", snapshot, uuid_str)
 
-    def _verify_clone_attrs(self, subvolume, clone, source_group=None, clone_group=None, snapshot=None, subvol_path=None):
-        if snapshot and subvol_path:
-            path1 = self._get_subvolume_snapshot_path(snapshot, subvol_path)
-        else:
-            path1 = self._get_subvolume_path(self.volname, subvolume, group_name=source_group)
-        path2 = self._get_subvolume_path(self.volname, clone, group_name=clone_group)
+        # v1
+        base_path = self._get_subvolume_path(self.volname, subvolume, group_name=source_group)
+        return os.path.join(base_path, ".snap", snapshot)
+
+    def _verify_clone_attrs(self, source_path, clone_path):
+        path1 = source_path
+        path2 = clone_path
 
         p = self.mount_a.run_shell(["find", path1])
         paths = p.stdout.getvalue().strip().split()
@@ -101,17 +107,37 @@ class TestVolumes(CephFSTestCase):
             cval = int(self.mount_a.run_shell(['stat', '-c' '%Y', sink_path]).stdout.getvalue().strip())
             self.assertEqual(sval, cval)
 
-    def _verify_clone(self, subvolume, clone, source_group=None, clone_group=None, snapshot=None, subvol_path=None, timo=120):
-        # pass in snapshot and subvol_path (subvolume path when snapshot was taken) when subvolume is removed
-        # but snapshots are retained for clone verification
-        if snapshot and subvol_path:
-            path1 = self._get_subvolume_snapshot_path(snapshot, subvol_path)
+    def _verify_clone_root(self, source_path, clone_path, clone, clone_group, clone_pool):
+        # verifies following clone root attrs quota, data_pool and pool_namespace
+        # remaining attributes of clone root are validated in _verify_clone_attrs
+
+        clone_info = json.loads(self._get_subvolume_info(self.volname, clone, clone_group))
+
+        # verify quota is inherited from source snapshot
+        src_quota = self.mount_a.getfattr(source_path, "ceph.quota.max_bytes")
+        self.assertEqual(clone_info["bytes_quota"], "infinite" if src_quota is None else int(src_quota))
+
+        if clone_pool:
+            # verify pool is set as per request
+            self.assertEqual(clone_info["data_pool"], clone_pool)
         else:
-            path1 = self._get_subvolume_path(self.volname, subvolume, group_name=source_group)
+            # verify pool and pool namespace are inherited from snapshot
+            self.assertEqual(clone_info["data_pool"],
+                             self.mount_a.getfattr(source_path, "ceph.dir.layout.pool"))
+            self.assertEqual(clone_info["pool_namespace"],
+                             self.mount_a.getfattr(source_path, "ceph.dir.layout.pool_namespace"))
+
+    def _verify_clone(self, subvolume, snapshot, clone,
+                      source_group=None, clone_group=None, clone_pool=None,
+                      subvol_path=None, source_version=2, timo=120):
+        # pass in subvol_path (subvolume path when snapshot was taken) when subvolume is removed
+        # but snapshots are retained for clone verification
+        path1 = self._get_subvolume_snapshot_path(subvolume, snapshot, source_group, subvol_path, source_version)
         path2 = self._get_subvolume_path(self.volname, clone, group_name=clone_group)
 
         check = 0
-        # TODO: currently rentries are not being returned for snapshots, if source entries are removed
+        # TODO: currently snapshot rentries are not stable if snapshot source entries
+        #       are removed, https://tracker.ceph.com/issues/46747
         while check < timo and subvol_path is None:
             val1 = int(self.mount_a.getfattr(path1, "ceph.dir.rentries"))
             val2 = int(self.mount_a.getfattr(path2, "ceph.dir.rentries"))
@@ -121,8 +147,8 @@ class TestVolumes(CephFSTestCase):
             time.sleep(1)
         self.assertTrue(check < timo)
 
-        self._verify_clone_attrs(subvolume, clone, source_group=source_group, clone_group=clone_group,
-                                 snapshot=snapshot, subvol_path=subvol_path)
+        self._verify_clone_root(path1, path2, clone, clone_group, clone_pool)
+        self._verify_clone_attrs(path1, path2)
 
     def _generate_random_volume_name(self, count=1):
         n = self.volume_start
@@ -200,6 +226,25 @@ class TestVolumes(CephFSTestCase):
     def _delete_test_volume(self):
         self._fs_cmd("volume", "rm", self.volname, "--yes-i-really-mean-it")
 
+    def _do_subvolume_pool_and_namespace_update(self, subvolume, pool=None, pool_namespace=None, subvolume_group=None):
+        subvolpath = self._get_subvolume_path(self.volname, subvolume, group_name=subvolume_group)
+
+        if pool is not None:
+            self.mount_a.setfattr(subvolpath, 'ceph.dir.layout.pool', pool)
+
+        if pool_namespace is not None:
+            self.mount_a.setfattr(subvolpath, 'ceph.dir.layout.pool_namespace', pool_namespace)
+
+    def _do_subvolume_attr_update(self, subvolume, uid, gid, mode, subvolume_group=None):
+        subvolpath = self._get_subvolume_path(self.volname, subvolume, group_name=subvolume_group)
+
+        # mode
+        self.mount_a.run_shell(['chmod', mode, subvolpath])
+
+        # ownership
+        self.mount_a.run_shell(['chown', uid, subvolpath])
+        self.mount_a.run_shell(['chgrp', gid, subvolpath])
+
     def _do_subvolume_io(self, subvolume, subvolume_group=None, create_dir=None,
                          number_of_files=DEFAULT_NUMBER_OF_FILES, file_size=DEFAULT_FILE_SIZE):
         # get subvolume path for IO
@@ -267,7 +312,7 @@ class TestVolumes(CephFSTestCase):
 
     def _create_v1_subvolume(self, subvol_name, subvol_group=None, has_snapshot=True, subvol_type='subvolume', state='complete'):
         group = subvol_group if subvol_group is not None else '_nogroup'
-        basepath = os.path.join(".", "volumes", group, subvol_name)
+        basepath = os.path.join("volumes", group, subvol_name)
         uuid_str = str(uuid.uuid4())
         createpath = os.path.join(basepath, uuid_str)
         self.mount_a.run_shell(['mkdir', '-p', createpath])
@@ -282,7 +327,7 @@ class TestVolumes(CephFSTestCase):
         self.mount_a.setfattr(createpath, 'ceph.dir.layout.pool', default_pool)
 
         # create a v1 .meta file
-        meta_contents = "[GLOBAL]\nversion = 1\ntype = {0}\npath = {1}\nstate = {2}\n".format(subvol_type, createpath, state)
+        meta_contents = "[GLOBAL]\nversion = 1\ntype = {0}\npath = {1}\nstate = {2}\n".format(subvol_type, "/" + createpath, state)
         if state == 'pending':
             # add a fake clone source
             meta_contents = meta_contents + '[source]\nvolume = fake\nsubvolume = fake\nsnapshot = fake\n'
@@ -1657,13 +1702,16 @@ class TestVolumes(CephFSTestCase):
         subvolume = self._generate_random_subvolume_name()
         snapshot = self._generate_random_snapshot_name()
         clone1, clone2 = self._generate_random_clone_name(2)
+        mode = "777"
+        uid  = "1000"
+        gid  = "1000"
 
         # emulate a v1 subvolume -- in the default group
         subvolume_path = self._create_v1_subvolume(subvolume)
 
         # getpath
-        subvolpath = self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
-        self.assertEqual(subvolpath.rstrip(), subvolume_path)
+        subvolpath = self._get_subvolume_path(self.volname, subvolume)
+        self.assertEqual(subvolpath, subvolume_path)
 
         # ls
         subvolumes = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
@@ -1684,18 +1732,18 @@ class TestVolumes(CephFSTestCase):
             self.assertIn(feature, subvol_info["features"], msg="expected feature '{0}' in subvolume".format(feature))
 
         # resize
-        nsize = self.DEFAULT_FILE_SIZE*1024*1024
+        nsize = self.DEFAULT_FILE_SIZE*1024*1024*10
         self._fs_cmd("subvolume", "resize", self.volname, subvolume, str(nsize))
         subvol_info = json.loads(self._get_subvolume_info(self.volname, subvolume))
         for md in subvol_md:
             self.assertIn(md, subvol_info, "'{0}' key not present in metadata of subvolume".format(md))
         self.assertEqual(subvol_info["bytes_quota"], nsize, "bytes_quota should be set to '{0}'".format(nsize))
 
-        # create (idempotent)
-        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        # create (idempotent) (change some attrs, to ensure attrs are preserved from the snapshot on clone)
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode", mode, "--uid", uid, "--gid", gid)
 
-        # TODO: do some IO (fails possibly due to permissions)
-        #self._do_subvolume_io(subvolume, number_of_files=64)
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=8)
 
         # snap-create
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
@@ -1709,6 +1757,9 @@ class TestVolumes(CephFSTestCase):
         # ensure clone is v2
         self._assert_meta_location_and_version(self.volname, clone1, version=2)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone1, source_version=1)
+
         # clone (older snapshot)
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, 'fake', clone2)
 
@@ -1718,6 +1769,10 @@ class TestVolumes(CephFSTestCase):
         # ensure clone is v2
         self._assert_meta_location_and_version(self.volname, clone2, version=2)
 
+        # verify clone
+        # TODO: rentries will mismatch till this is fixed https://tracker.ceph.com/issues/46747
+        #self._verify_clone(subvolume, 'fake', clone2, source_version=1)
+
         # snap-info
         snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot))
         for md in snap_md:
@@ -1764,11 +1819,11 @@ class TestVolumes(CephFSTestCase):
         self._create_v1_subvolume(subvolume3, subvol_type='clone', has_snapshot=False, state='pending')
 
         # this would attempt auto-upgrade on access, but fail to do so as snapshots exist
-        subvolpath1 = self._fs_cmd("subvolume", "getpath", self.volname, subvolume1)
-        self.assertEqual(subvolpath1.rstrip(), subvol1_path)
+        subvolpath1 = self._get_subvolume_path(self.volname, subvolume1)
+        self.assertEqual(subvolpath1, subvol1_path)
 
-        subvolpath2 = self._fs_cmd("subvolume", "getpath", self.volname, subvolume2, group)
-        self.assertEqual(subvolpath2.rstrip(), subvol2_path)
+        subvolpath2 = self._get_subvolume_path(self.volname, subvolume2, group_name=group)
+        self.assertEqual(subvolpath2, subvol2_path)
 
         # this would attempt auto-upgrade on access, but fail to do so as volume is not complete
         # use clone status, as only certain operations are allowed in pending state
@@ -1819,11 +1874,11 @@ class TestVolumes(CephFSTestCase):
         subvol2_path = self._create_v1_subvolume(subvolume2, subvol_group=group, has_snapshot=False)
 
         # this would attempt auto-upgrade on access
-        subvolpath1 = self._fs_cmd("subvolume", "getpath", self.volname, subvolume1)
-        self.assertEqual(subvolpath1.rstrip(), subvol1_path)
+        subvolpath1 = self._get_subvolume_path(self.volname, subvolume1)
+        self.assertEqual(subvolpath1, subvol1_path)
 
-        subvolpath2 = self._fs_cmd("subvolume", "getpath", self.volname, subvolume2, group)
-        self.assertEqual(subvolpath2.rstrip(), subvol2_path)
+        subvolpath2 = self._get_subvolume_path(self.volname, subvolume2, group_name=group)
+        self.assertEqual(subvolpath2, subvol2_path)
 
         # ensure metadata file is in v2 location, with version retained as v2
         self._assert_meta_location_and_version(self.volname, subvolume1, version=2)
@@ -2072,7 +2127,7 @@ class TestVolumes(CephFSTestCase):
         self._wait_for_clone_to_complete(clone)
 
         # verify clone
-        self._verify_clone(subvolume, clone, snapshot=snapshot, subvol_path=subvol_path)
+        self._verify_clone(subvolume, snapshot, clone, subvol_path=subvol_path)
 
         # remove snapshots (removes retained volume)
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
@@ -2116,7 +2171,7 @@ class TestVolumes(CephFSTestCase):
         self._wait_for_clone_to_complete(subvolume)
 
         # verify clone
-        self._verify_clone(subvolume, subvolume, snapshot=snapshot, subvol_path=subvol_path)
+        self._verify_clone(subvolume, snapshot, subvolume, subvol_path=subvol_path)
 
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
@@ -2161,7 +2216,7 @@ class TestVolumes(CephFSTestCase):
         self._wait_for_clone_to_complete(clone)
 
         # verify clone
-        self._verify_clone(subvolume, clone, snapshot=snapshot1, subvol_path=subvol1_path)
+        self._verify_clone(subvolume, snapshot1, clone, subvol_path=subvol1_path)
 
         # create a snapshot on the clone
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, clone, snapshot2)
@@ -2247,7 +2302,7 @@ class TestVolumes(CephFSTestCase):
         self._wait_for_clone_to_complete(clone)
 
         # verify clone
-        self._verify_clone(subvolume, clone, snapshot=snapshot2, subvol_path=subvol2_path)
+        self._verify_clone(subvolume, snapshot2, clone, subvol_path=subvol2_path)
 
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot1)
@@ -2293,12 +2348,12 @@ class TestVolumes(CephFSTestCase):
         # now, unprotect snapshot
         self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
         self._fs_cmd("subvolume", "rm", self.volname, clone)
@@ -2326,12 +2381,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
         self._fs_cmd("subvolume", "rm", self.volname, clone)
@@ -2363,12 +2418,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone, clone_pool=new_pool)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone)
-
         subvol_path = self._get_subvolume_path(self.volname, clone)
         desired_pool = self.mount_a.getfattr(subvol_path, "ceph.dir.layout.pool")
         self.assertEqual(desired_pool, new_pool)
@@ -2388,6 +2443,9 @@ class TestVolumes(CephFSTestCase):
         mode = "777"
         uid  = "1000"
         gid  = "1000"
+        new_uid  = "1001"
+        new_gid  = "1001"
+        new_mode = "700"
 
         # create subvolume
         self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode", mode, "--uid", uid, "--gid", gid)
@@ -2398,17 +2456,64 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
+        # change subvolume attrs (to ensure clone picks up snapshot attrs)
+        self._do_subvolume_attr_update(subvolume, new_uid, new_gid, new_mode)
+
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_clone_inherit_snapshot_namespace_and_size(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+        osize = self.DEFAULT_FILE_SIZE*1024*1024*12
+
+        # create subvolume, in an isolated namespace with a specified size
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--namespace-isolated", "--size", str(osize))
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=8)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # create a pool different from current subvolume pool
+        subvol_path = self._get_subvolume_path(self.volname, subvolume)
+        default_pool = self.mount_a.getfattr(subvol_path, "ceph.dir.layout.pool")
+        new_pool = "new_pool"
+        self.assertNotEqual(default_pool, new_pool)
+        self.fs.add_data_pool(new_pool)
+
+        # update source subvolume pool
+        self._do_subvolume_pool_and_namespace_update(subvolume, pool=new_pool, pool_namespace="")
+
+        # schedule a clone, with NO --pool specification
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
         # verify clone
-        self._verify_clone(subvolume, clone)
+        self._verify_clone(subvolume, snapshot, clone)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
@@ -2437,12 +2542,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone1)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone1)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone1)
-
         # now the clone is just like a normal subvolume -- snapshot the clone and fork
         # another clone. before that do some IO so it's can be differentiated.
         self._do_subvolume_io(clone1, create_dir="data", number_of_files=32)
@@ -2456,12 +2561,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone2)
 
+        # verify clone
+        self._verify_clone(clone1, snapshot, clone2)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, clone1, snapshot)
 
-        # verify clone
-        self._verify_clone(clone1, clone2)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
         self._fs_cmd("subvolume", "rm", self.volname, clone1)
@@ -2494,12 +2599,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone, clone_group=group)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone, clone_group=group)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone, clone_group=group)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
         self._fs_cmd("subvolume", "rm", self.volname, clone, group)
@@ -2534,12 +2639,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone, source_group=group)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, group)
 
-        # verify clone
-        self._verify_clone(subvolume, clone, source_group=group)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
         self._fs_cmd("subvolume", "rm", self.volname, clone)
@@ -2576,12 +2681,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone, clone_group=c_group)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone, source_group=s_group, clone_group=c_group)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, s_group)
 
-        # verify clone
-        self._verify_clone(subvolume, clone, source_group=s_group, clone_group=c_group)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume, s_group)
         self._fs_cmd("subvolume", "rm", self.volname, clone, c_group)
@@ -2608,6 +2713,10 @@ class TestVolumes(CephFSTestCase):
         createpath = os.path.join(".", "volumes", "_nogroup", subvolume)
         self.mount_a.run_shell(['mkdir', '-p', createpath])
 
+        # add required xattrs to subvolume
+        default_pool = self.mount_a.getfattr(".", "ceph.dir.layout.pool")
+        self.mount_a.setfattr(createpath, 'ceph.dir.layout.pool', default_pool)
+
         # do some IO
         self._do_subvolume_io(subvolume, number_of_files=64)
 
@@ -2631,12 +2740,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone, source_version=1)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone)
-
         # ensure metadata file is in v2 location, with required version v2
         self._assert_meta_location_and_version(self.volname, clone)
 
@@ -2680,12 +2789,12 @@ class TestVolumes(CephFSTestCase):
         subvolpath = self._get_subvolume_path(self.volname, clone)
         self.assertNotEqual(subvolpath, None)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
         self._fs_cmd("subvolume", "rm", self.volname, clone)
@@ -2725,12 +2834,12 @@ class TestVolumes(CephFSTestCase):
         subvolpath = self._get_subvolume_path(self.volname, clone)
         self.assertNotEqual(subvolpath, None)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
         self._fs_cmd("subvolume", "rm", self.volname, clone)
@@ -2770,12 +2879,12 @@ class TestVolumes(CephFSTestCase):
         subvolpath = self._get_subvolume_path(self.volname, clone)
         self.assertNotEqual(subvolpath, None)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
         self._fs_cmd("subvolume", "rm", self.volname, clone)
@@ -2841,12 +2950,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
+        # verify clone
+        self._verify_clone(subvolume1, snapshot, clone)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume1, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume1, clone)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume1)
         self._fs_cmd("subvolume", "rm", self.volname, subvolume2)
@@ -2887,7 +2996,7 @@ class TestVolumes(CephFSTestCase):
         self._wait_for_clone_to_complete(clone1)
 
         # verify clone
-        self._verify_clone(subvolume, clone1)
+        self._verify_clone(subvolume, snapshot, clone1, clone_pool=new_pool)
 
         # wait a bit so that subsequent I/O will give pool full error
         time.sleep(120)
@@ -2938,12 +3047,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
         self._fs_cmd("subvolume", "rm", self.volname, clone)
index ad3db096a3248115331cc2b0ad28d91d76ce3c99..f6f413601ac8d1a8e0cdc1b9b25e076554f340c4 100644 (file)
@@ -1,7 +1,10 @@
 import os
+import stat
+import uuid
 import errno
 import logging
 from hashlib import md5
+from typing import Dict, Union
 
 import cephfs
 
@@ -116,33 +119,65 @@ class SubvolumeBase(object):
         else:
             self.metadata_mgr = MetadataManager(self.fs, self.config_path, 0o640)
 
-    def set_attrs(self, path, size, isolate_namespace, pool, uid, gid):
+    def get_attrs(self, pathname):
+        # get subvolume attributes
+        attrs = {} # type: Dict[str, Union[int, str, None]]
+        stx = self.fs.statx(pathname,
+                            cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID | cephfs.CEPH_STATX_MODE,
+                            cephfs.AT_SYMLINK_NOFOLLOW)
+
+        attrs["uid"] = int(stx["uid"])
+        attrs["gid"] = int(stx["gid"])
+        attrs["mode"] = int(int(stx["mode"]) & ~stat.S_IFMT(stx["mode"]))
+
+        try:
+            attrs["data_pool"] = self.fs.getxattr(pathname, 'ceph.dir.layout.pool').decode('utf-8')
+        except cephfs.NoData:
+            attrs["data_pool"] = None
+
+        try:
+            attrs["pool_namespace"] = self.fs.getxattr(pathname, 'ceph.dir.layout.pool_namespace').decode('utf-8')
+        except cephfs.NoData:
+            attrs["pool_namespace"] = None
+
+        try:
+            attrs["quota"] = int(self.fs.getxattr(pathname, 'ceph.quota.max_bytes').decode('utf-8'))
+        except cephfs.NoData:
+            attrs["quota"] = None
+
+        return attrs
+
+    def set_attrs(self, path, attrs):
+        # set subvolume attributes
         # set size
-        if size is not None:
+        quota = attrs.get("quota")
+        if quota is not None:
             try:
-                self.fs.setxattr(path, 'ceph.quota.max_bytes', str(size).encode('utf-8'), 0)
+                self.fs.setxattr(path, 'ceph.quota.max_bytes', str(quota).encode('utf-8'), 0)
             except cephfs.InvalidValue as e:
-                raise VolumeException(-errno.EINVAL, "invalid size specified: '{0}'".format(size))
+                raise VolumeException(-errno.EINVAL, "invalid size specified: '{0}'".format(quota))
             except cephfs.Error as e:
                 raise VolumeException(-e.args[0], e.args[1])
 
         # set pool layout
-        if pool:
+        data_pool = attrs.get("data_pool")
+        if data_pool is not None:
             try:
-                self.fs.setxattr(path, 'ceph.dir.layout.pool', pool.encode('utf-8'), 0)
+                self.fs.setxattr(path, 'ceph.dir.layout.pool', data_pool.encode('utf-8'), 0)
             except cephfs.InvalidValue:
                 raise VolumeException(-errno.EINVAL,
-                                      "invalid pool layout '{0}' -- need a valid data pool".format(pool))
+                                      "invalid pool layout '{0}' -- need a valid data pool".format(data_pool))
             except cephfs.Error as e:
                 raise VolumeException(-e.args[0], e.args[1])
 
         # isolate namespace
         xattr_key = xattr_val = None
-        if isolate_namespace:
+        pool_namespace = attrs.get("pool_namespace")
+        if pool_namespace is not None:
             # enforce security isolation, use separate namespace for this subvolume
             xattr_key = 'ceph.dir.layout.pool_namespace'
-            xattr_val = self.namespace
-        elif not pool:
+            xattr_val = pool_namespace
+        elif not data_pool:
             # If subvolume's namespace layout is not set, then the subvolume's pool
             # layout remains unset and will undesirably change with ancestor's
             # pool layout changes.
@@ -159,24 +194,26 @@ class SubvolumeBase(object):
                 raise VolumeException(-e.args[0], e.args[1])
 
         # set uid/gid
+        uid = attrs.get("uid")
         if uid is None:
             uid = self.group.uid
         else:
             try:
-                uid = int(uid)
                 if uid < 0:
                     raise ValueError
             except ValueError:
                 raise VolumeException(-errno.EINVAL, "invalid UID")
+
+        gid = attrs.get("gid")
         if gid is None:
             gid = self.group.gid
         else:
             try:
-                gid = int(gid)
                 if gid < 0:
                     raise ValueError
             except ValueError:
                 raise VolumeException(-errno.EINVAL, "invalid GID")
+
         if uid is not None and gid is not None:
             self.fs.chown(path, uid, gid)
 
index e2876dfda6b0656c8025581d07e9ff7bf8dd8002..a794d35d9acbdc9c4f5f6deaed2a4d7bf89322e5 100644 (file)
@@ -77,7 +77,14 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
         try:
             # create directory and set attributes
             self.fs.mkdirs(subvol_path, mode)
-            self.set_attrs(subvol_path, size, isolate_nspace, pool, uid, gid)
+            attrs = {
+                'uid': uid,
+                'gid': gid,
+                'data_pool': pool,
+                'pool_namespace': self.namespace if isolate_nspace else None,
+                'quota': size
+            }
+            self.set_attrs(subvol_path, attrs)
 
             # persist subvolume metadata
             qpath = subvol_path.decode('utf-8')
@@ -120,9 +127,18 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
 
         subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
         try:
+            # source snapshot attrs are used to create clone subvolume.
+            # attributes of subvolume's content though, are synced during the cloning process.
+            attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname))
+
+            # override snapshot pool setting, if one is provided for the clone
+            if pool is not None:
+                attrs["data_pool"] = pool
+                attrs["pool_namespace"] = None
+
             # create directory and set attributes
-            self.fs.mkdirs(subvol_path, source_subvolume.mode)
-            self.set_attrs(subvol_path, None, None, pool, source_subvolume.uid, source_subvolume.gid)
+            self.fs.mkdirs(subvol_path, attrs.get("mode"))
+            self.set_attrs(subvol_path, attrs)
 
             # persist subvolume metadata and clone source
             qpath = subvol_path.decode('utf-8')
index 9848acf86616441d4b5c5d767e56ebe589c8f7bc..1c1ccb25fd122cee19001dc8d0b9f1c5c4dc4ea8 100644 (file)
@@ -119,7 +119,14 @@ class SubvolumeV2(SubvolumeV1):
         subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
         try:
             self.fs.mkdirs(subvol_path, mode)
-            self.set_attrs(subvol_path, size, isolate_nspace, pool, uid, gid)
+            attrs = {
+                'uid': uid,
+                'gid': gid,
+                'data_pool': pool,
+                'pool_namespace': self.namespace if isolate_nspace else None,
+                'quota': size
+            }
+            self.set_attrs(subvol_path, attrs)
 
             # persist subvolume metadata
             qpath = subvol_path.decode('utf-8')
@@ -151,20 +158,18 @@ class SubvolumeV2(SubvolumeV1):
         retained = self._is_retained()
         subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
         try:
-            stx = self.fs.statx(source_subvolume.snapshot_data_path(snapname),
-                                cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID,
-                                cephfs.AT_SYMLINK_NOFOLLOW)
-            uid = stx.get('uid')
-            gid = stx.get('gid')
-            stx_mode = stx.get('mode')
-            if stx_mode is not None:
-                mode = stx_mode & ~stat.S_IFMT(stx_mode)
-            else:
-                mode = None
+            # source snapshot attrs are used to create clone subvolume
+            # attributes of subvolume's content though, are synced during the cloning process.
+            attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname))
+
+            # override snapshot pool setting, if one is provided for the clone
+            if pool is not None:
+                attrs["data_pool"] = pool
+                attrs["pool_namespace"] = None
 
             # create directory and set attributes
-            self.fs.mkdirs(subvol_path, mode)
-            self.set_attrs(subvol_path, None, None, pool, uid, gid)
+            self.fs.mkdirs(subvol_path, attrs.get("mode"))
+            self.set_attrs(subvol_path, attrs)
 
             # persist subvolume metadata and clone source
             qpath = subvol_path.decode('utf-8')
index d67de58bfc33d1601944ef0228a43bf546f990e4..4c314119e0dc7a92c658cfcb2ae6bba0d3a84226 100644 (file)
@@ -163,9 +163,14 @@ class VolumeClient(object):
                     try:
                         with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CREATE) as subvolume:
                             # idempotent creation -- valid. Attributes set is supported.
-                            uid = uid if uid else subvolume.uid
-                            gid = gid if gid else subvolume.gid
-                            subvolume.set_attrs(subvolume.path, size, isolate_nspace, pool, uid, gid)
+                            attrs = {
+                                'uid': uid if uid else subvolume.uid,
+                                'gid': gid if gid else subvolume.gid,
+                                'data_pool': pool,
+                                'pool_namespace': subvolume.namespace if isolate_nspace else None,
+                                'quota': size
+                            }
+                            subvolume.set_attrs(subvolume.path, attrs)
                     except VolumeException as ve:
                         if ve.errno == -errno.ENOENT:
                             self._create_subvolume(fs_handle, volname, group, subvolname, **kwargs)