]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/volumes: maintain per subvolume trash directory
authorVenky Shankar <vshankar@redhat.com>
Fri, 21 Aug 2020 14:07:37 +0000 (10:07 -0400)
committerShyamsundar Ranganathan <srangana@redhat.com>
Thu, 27 Aug 2020 19:42:58 +0000 (15:42 -0400)
PR https://github.com/ceph/ceph/pull/36472 introduces changes
that disallow nested nested snapshots in a subtree (subvolume)
and renames across subvolumes. This effect asynchronous purge
in mgr/volumes as subvolume are moved to a trash directory for
asynchronous deletion by purge threads.

To workaround this, start maintaining a subvolume specific
trash directory. Use the trash directory as an index to the
subvolume specific trash directory entry.

This changes subvolume deletion logic which currently relies
on `--retain-snapshots` flag to decide if the subvolume user
directory should get purged or the subvolume base directory
itself. Deleting a subvolume moves the user facing directory
to its specific trash directory. Purge threads take care of
deleting user facing directories (in trash) and the subvolume
base directory if required (when certain conditions are met).

Fixes: https://tracker.ceph.com/issues/47154
Signed-off-by: Venky Shankar <vshankar@redhat.com>
(cherry picked from commit aae7a70ed2cf9c32684cfdaf701778a05f229e09)

src/pybind/mgr/volumes/fs/operations/resolver.py
src/pybind/mgr/volumes/fs/operations/trash.py
src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py
src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py
src/pybind/mgr/volumes/fs/purge_queue.py

index bf982af953ef5f5215e0b2780c5574aae8f42e15..a9543654e66d8a80245e4c686cf47c86339c5229 100644 (file)
@@ -15,3 +15,12 @@ def resolve(vol_spec, path):
     groupname = None if parts[2] == Group.NO_GROUP_NAME else parts[2]
     subvolname = parts[3]
     return (groupname, subvolname)
+
+def resolve_trash(vol_spec, path):
+    parts = splitall(path)
+    if len(parts) != 6 or os.path.join(parts[0], parts[1]) != vol_spec.subvolume_prefix or \
+       parts[4] != '.trash':
+        return None
+    groupname = None if parts[2] == Group.NO_GROUP_NAME else parts[2]
+    subvolname = parts[3]
+    return (groupname, subvolname)
index 66e8b6f263ed520d6885b45a496dc816f0b23ad7..66f1d71cf89a73ee0ce791deb758a4fd96158dff 100644 (file)
@@ -36,7 +36,7 @@ class Trash(GroupTemplate):
             with self.fs.opendir(self.path) as d:
                 entry = self.fs.readdir(d)
                 while entry:
-                    if entry.d_name not in exclude_list and entry.is_dir():
+                    if entry.d_name not in exclude_list:
                         return entry.d_name
                     entry = self.fs.readdir(d)
             return None
@@ -52,7 +52,7 @@ class Trash(GroupTemplate):
         """
         return self._get_single_dir_entry(exclude_list)
 
-    def purge(self, trash_entry, should_cancel):
+    def purge(self, trashpath, should_cancel):
         """
         purge a trash entry.
 
@@ -82,7 +82,6 @@ class Trash(GroupTemplate):
             if not should_cancel():
                 self.fs.rmdir(root_path)
 
-        trashpath = os.path.join(self.path, trash_entry)
         # catch any unlink errors
         try:
             rmtree(trashpath)
@@ -101,6 +100,20 @@ class Trash(GroupTemplate):
         except cephfs.Error as e:
             raise VolumeException(-e.args[0], e.args[1])
 
+    def link(self, path, bname):
+        pth = os.path.join(self.path, bname)
+        try:
+            self.fs.symlink(path, pth)
+        except cephfs.Error as e:
+            raise VolumeException(-e.args[0], e.args[1])
+
+    def delink(self, bname):
+        pth = os.path.join(self.path, bname)
+        try:
+            self.fs.unlink(pth)
+        except cephfs.Error as e:
+            raise VolumeException(-e.args[0], e.args[1])
+
 def create_trashcan(fs, vol_spec):
     """
     create a trash can.
index f6f413601ac8d1a8e0cdc1b9b25e076554f340c4..e0e494f9b24f78eaff09868c8d855f57a95a4b7c 100644 (file)
@@ -277,6 +277,12 @@ class SubvolumeBase(object):
             trashcan.dump(path)
             log.info("subvolume path '{0}' moved to trashcan".format(path))
 
+    def _link_dir(self, path, bname):
+        create_trashcan(self.fs, self.vol_spec)
+        with open_trashcan(self.fs, self.vol_spec) as trashcan:
+            trashcan.link(path, bname)
+            log.info("subvolume path '{0}' linked in trashcan bname {1}".format(path, bname))
+
     def trash_base_dir(self):
         if self.legacy_mode:
             self.fs.unlink(self.legacy_config_path)
index 82e90a1a84c8b4b6c66564a4c5654b519ce4020e..49c6020ff616d8c382668d10251427b1318c36f9 100644 (file)
@@ -12,6 +12,7 @@ from .op_sm import SubvolumeOpSm
 from .subvolume_v1 import SubvolumeV1
 from ..template import SubvolumeTemplate
 from ...exception import OpSmException, VolumeException, MetadataMgrException
+from ...fs_util import listdir
 from ..template import SubvolumeOpType
 
 log = logging.getLogger(__name__)
@@ -61,6 +62,37 @@ class SubvolumeV2(SubvolumeV1):
                 raise VolumeException(me.errno, "internal error while processing subvolume '{0}'".format(self.subvolname))
         return False
 
+    @property
+    def is_in_use(self):
+        return not self.path == b''
+
+    @property
+    def has_pending_purges(self):
+        try:
+            return not listdir(self.fs, self.trash_dir) == []
+        except VolumeException as ve:
+            if ve.errno == -errno.ENOENT:
+                return False
+            raise
+
+    @property
+    def trash_dir(self):
+        return os.path.join(self.base_path, b".trash")
+
+    def create_trashcan(self):
+        """per subvolume trash directory"""
+        try:
+            self.fs.stat(self.trash_dir)
+        except cephfs.Error as e:
+            if e.args[0] == errno.ENOENT:
+                try:
+                    log.debug("creating trash can: {0}".format(self.trash_dir))
+                    self.fs.mkdir(self.trash_dir, 0o700)
+                except cephfs.Error as ce:
+                    raise VolumeException(-ce.args[0], ce.args[1])
+            else:
+                raise VolumeException(-e.args[0], e.args[1])
+
     @staticmethod
     def is_valid_uuid(uuid_str):
         try:
@@ -258,6 +290,7 @@ class SubvolumeV2(SubvolumeV1):
                 self.uid = int(st.st_uid)
                 self.gid = int(st.st_gid)
                 self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode))
+            self.create_trashcan()
         except MetadataMgrException as me:
             if me.errno == -errno.ENOENT:
                 raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
@@ -269,19 +302,25 @@ class SubvolumeV2(SubvolumeV1):
             raise VolumeException(-e.args[0], e.args[1])
 
     def trash_incarnation_dir(self):
-        self._trash_dir(self.path)
+        """rename subvolume (uuid component) to trash"""
+        try:
+            bname = os.path.basename(self.path)
+            tpath = os.path.join(self.trash_dir, bname)
+            log.debug("trash: {0} -> {1}".format(self.path, tpath))
+            self.fs.rename(self.path, tpath)
+            self._link_dir(tpath, bname)
+        except cephfs.Error as e:
+            raise VolumeException(-e.args[0], e.args[1])
 
     def remove(self, retainsnaps=False):
         if self.list_snapshots():
             if not retainsnaps:
                 raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
-            if self.state != SubvolumeStates.STATE_RETAINED:
-                self.trash_incarnation_dir()
-                self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, "")
-                self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, SubvolumeStates.STATE_RETAINED.value)
-                self.metadata_mgr.flush()
-        else:
-            self.trash_base_dir()
+        if self.state != SubvolumeStates.STATE_RETAINED:
+            self.trash_incarnation_dir()
+            self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, "")
+            self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, SubvolumeStates.STATE_RETAINED.value)
+            self.metadata_mgr.flush()
 
     def info(self):
         if self.state != SubvolumeStates.STATE_RETAINED:
@@ -292,6 +331,8 @@ class SubvolumeV2(SubvolumeV1):
     def remove_snapshot(self, snapname):
         super(SubvolumeV2, self).remove_snapshot(snapname)
         if self.state == SubvolumeStates.STATE_RETAINED and not self.list_snapshots():
-            self.trash_base_dir()
+            # fake a trash entry for purge threads to find a job
+            bname = str(uuid.uuid4()).encode('utf-8')
+            self._link_dir(os.path.join(self.trash_dir, bname), bname)
             # tickle the volume purge job to purge this entry, using ESTALE
             raise VolumeException(-errno.ESTALE, "subvolume '{0}' has been removed as the last retained snapshot is removed".format(self.subvolname))
index 6eb715fa3a616460e65a8c81f80d3706c173c838..58e4f067f888abf9be23ba1fe7636d88db8f658f 100644 (file)
@@ -1,8 +1,16 @@
 import errno
 import logging
+import os
+import stat
+
+import cephfs
 
 from .async_job import AsyncJobs
 from .exception import VolumeException
+from .operations.resolver import resolve_trash
+from .operations.template import SubvolumeOpType
+from .operations.group import open_group
+from .operations.subvolume import open_subvol
 from .operations.volume import open_volume, open_volume_lockless
 from .operations.trash import open_trashcan
 
@@ -26,15 +34,63 @@ def get_trash_entry_for_volume(volume_client, volname, running_jobs):
         log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve))
         return ve.errno, None
 
+def subvolume_purge(volume_client, volname, trashcan, subvolume_trash_entry, should_cancel):
+    groupname, subvolname = resolve_trash(volume_client.volspec, subvolume_trash_entry.decode('utf-8'))
+    log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname))
+
+    try:
+        with open_volume(volume_client, volname) as fs_handle:
+            with open_group(fs_handle, volume_client.volspec, groupname) as group:
+                with open_subvol(fs_handle, volume_client.volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
+                    log.debug("subvolume.path={0}".format(subvolume.path))
+                    log.debug("subvolume.is_in_use={0}".format(subvolume.is_in_use))
+                    log.debug("subvolume.has_pending_purges={0}".format(subvolume.has_pending_purges))
+                    log.debug("subvolume.list_snapshots={0}".format(subvolume.list_snapshots()))
+                    if subvolume.is_in_use or subvolume.has_pending_purges or subvolume.list_snapshots():
+                        log.debug("not purging subvolume -- bailing out.")
+                        return
+                    # this is fine under the global lock -- there are just a handful
+                    # of entries in the subvolume to purge. moreover, the purge needs
+                    # to be guarded since a create request might sneak in.
+                    trashcan.purge(subvolume.base_path, should_cancel)
+    except VolumeException as ve:
+        if not ve.errno == -errno.ENOENT:
+            raise
+
 # helper for starting a purge operation on a trash entry
-def purge_trash_entry_for_volume(volume_client, volname, purge_dir, should_cancel):
-    log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname))
+def purge_trash_entry_for_volume(volume_client, volname, purge_entry, should_cancel):
+    log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname))
 
     ret = 0
     try:
         with open_volume_lockless(volume_client, volname) as fs_handle:
             with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
-                trashcan.purge(purge_dir, should_cancel)
+                try:
+                    pth = os.path.join(trashcan.path, purge_entry)
+                    stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE,
+                                          cephfs.AT_SYMLINK_NOFOLLOW)
+                    if stat.S_ISLNK(stx['mode']):
+                        tgt = fs_handle.readlink(pth, 4096)
+                        tgt = tgt[:stx['size']]
+                        log.debug("entry points to subvolume trash: {0}".format(tgt))
+                        delink = True
+                        try:
+                            log.debug("purging subvolume trash: {0}".format(tgt))
+                            trashcan.purge(tgt, should_cancel)
+                        except VolumeException as ve:
+                            if not ve.errno == -errno.ENOENT:
+                                delink = False
+                                return ve.errno
+                        finally:
+                            if delink:
+                                subvolume_purge(volume_client, volname, trashcan, tgt, should_cancel)
+                                log.debug("purging trash link: {0}".format(purge_entry))
+                                trashcan.delink(purge_entry)
+                    else:
+                        log.debug("entry points to trash: {0}".format(pth))
+                        trashcan.purge(pth)
+                except cephfs.Error as e:
+                    log.warn("failed to remove trash entry: {0}".format(e))
     except VolumeException as ve:
         ret = ve.errno
     return ret