]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/volumes: Introduce v2 subvolumes
authorShyamsundar Ranganathan <srangana@redhat.com>
Sun, 5 Jul 2020 23:17:02 +0000 (19:17 -0400)
committerShyamsundar Ranganathan <srangana@redhat.com>
Thu, 30 Jul 2020 01:14:40 +0000 (21:14 -0400)
Implements subvolume v2 version.

Following support is added,
- Ability to retain snapshots on subvolume deletion
- Snapshots are now created under the <subvolume-name> directory rather than the <uuid> data directory
- "features" added to subvolume info output, specifically the ability
  for a subvolume to retain snapshots
- Current state of subvolume in info output
- Auto upgrade to v2 from eligible v1 subvolumes
- Adjust other functions as needed to support the changes

Signed-off-by: Shyamsundar Ranganathan <srangana@redhat.com>
src/pybind/mgr/volumes/fs/async_cloner.py
src/pybind/mgr/volumes/fs/operations/group.py
src/pybind/mgr/volumes/fs/operations/op_sm.py
src/pybind/mgr/volumes/fs/operations/subvolume.py
src/pybind/mgr/volumes/fs/operations/versions/__init__.py
src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py
src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py [new file with mode: 0644]
src/pybind/mgr/volumes/fs/volume.py
src/pybind/mgr/volumes/module.py

index 25ec793af6d770ddc4a9e85473fe2db229851ee0..d227dc594df4794030d2292e0a95022e26d504b5 100644 (file)
@@ -55,6 +55,28 @@ def open_at_group(volume_client, fs_handle, groupname, subvolname, op_type):
         with open_subvol(fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
             yield subvolume
 
+@contextmanager
+def open_at_group_unique(volume_client, fs_handle, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
+    # if a snapshot of a retained subvolume is being cloned to recreate the same subvolume, return
+    # the clone subvolume as the source subvolume
+    if s_groupname == c_groupname and s_subvolname == c_subvolname:
+        yield c_subvolume
+    else:
+        with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, op_type) as s_subvolume:
+            yield s_subvolume
+
+
+@contextmanager
+def open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname):
+    with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
+        s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
+        if groupname == s_groupname and subvolname == s_subvolname:
+            # use the same subvolume to avoid metadata overwrites
+            yield (clone_subvolume, clone_subvolume, s_snapname)
+        else:
+            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
+                yield (clone_subvolume, source_subvolume, s_snapname)
+
 def get_clone_state(volume_client, volname, groupname, subvolname):
     with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
         return subvolume.state
@@ -166,12 +188,10 @@ def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
 
 def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
     with open_volume_lockless(volume_client, volname) as fs_handle:
-        with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
-            s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
-            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
-                src_path = source_subvolume.snapshot_path(s_snapname)
-                dst_path = clone_subvolume.path
-                bulk_copy(fs_handle, src_path, dst_path, should_cancel)
+        with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+            src_path = clone_volumes[1].snapshot_data_path(clone_volumes[2])
+            dst_path = clone_volumes[0].path
+            bulk_copy(fs_handle, src_path, dst_path, should_cancel)
 
 def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
     try:
@@ -187,12 +207,10 @@ def handle_clone_in_progress(volume_client, volname, index, groupname, subvolnam
 
 def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
     try:
-        # detach source but leave the clone section intact for later inspection
         with open_volume(volume_client, volname) as fs_handle:
-            with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
-                s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
-                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
-                    source_subvolume.detach_snapshot(s_snapname, index)
+            # detach source but leave the clone section intact for later inspection
+            with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+                clone_volumes[1].detach_snapshot(clone_volumes[2], index)
     except (MetadataMgrException, VolumeException) as e:
         log.error("failed to detach clone from snapshot: {0}".format(e))
     return (None, True)
@@ -200,11 +218,9 @@ def handle_clone_failed(volume_client, volname, index, groupname, subvolname, sh
 def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
     try:
         with open_volume(volume_client, volname) as fs_handle:
-            with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
-                s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
-                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
-                    source_subvolume.detach_snapshot(s_snapname, index)
-                    clone_subvolume.remove_clone_source(flush=True)
+            with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+                clone_volumes[1].detach_snapshot(clone_volumes[2], index)
+                clone_volumes[0].remove_clone_source(flush=True)
     except (MetadataMgrException, VolumeException) as e:
         log.error("failed to detach clone from snapshot: {0}".format(e))
     return (None, True)
@@ -268,7 +284,7 @@ class Cloner(AsyncJobs):
         with open_clone_index(fs_handle, self.vc.volspec) as index:
             return index.find_clone_entry_index(clone_subvolume.base_path)
 
-    def _cancel_pending_clone(self, fs_handle, clone_subvolume, status, track_idx):
+    def _cancel_pending_clone(self, fs_handle, clone_subvolume, clone_subvolname, clone_groupname, status, track_idx):
         clone_state = SubvolumeStates.from_value(status['state'])
         assert self.is_clone_cancelable(clone_state)
 
@@ -276,13 +292,13 @@ class Cloner(AsyncJobs):
         s_subvolname = status['source']['subvolume']
         s_snapname = status['source']['snapshot']
 
-        with open_group(fs_handle, self.vc.volspec, s_groupname) as s_group:
-            with open_subvol(fs_handle, self.vc.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
-                next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
-                                                      clone_state,
-                                                      SubvolumeActions.ACTION_CANCELLED)
-                clone_subvolume.state = (next_state, True)
-                s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))
+        with open_at_group_unique(self.vc, fs_handle, s_groupname, s_subvolname, clone_subvolume, clone_groupname,
+                                  clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
+            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+                                                  clone_state,
+                                                  SubvolumeActions.ACTION_CANCELLED)
+            clone_subvolume.state = (next_state, True)
+            s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))
 
     def cancel_job(self, volname, job):
         """
@@ -306,7 +322,7 @@ class Cloner(AsyncJobs):
                             raise VolumeException(-errno.EINVAL, "error canceling clone")
                         if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state):
                             # clone has not started yet -- cancel right away.
-                            self._cancel_pending_clone(fs_handle, clone_subvolume, status, track_idx)
+                            self._cancel_pending_clone(fs_handle, clone_subvolume, clonename, groupname, status, track_idx)
                             return
             # cancelling an on-going clone would persist "canceled" state in subvolume metadata.
             # to persist the new state, async cloner accesses the volume in exclusive mode.
index 8e6eaf936bb1c20134e326c03ea7851409e70486..aac81f299346afcb706577adca2a4d8ab5375908 100644 (file)
@@ -180,3 +180,11 @@ def open_group(fs, vol_spec, groupname):
         else:
             raise VolumeException(-e.args[0], e.args[1])
     yield group
+
+@contextmanager
+def open_group_unique(fs, vol_spec, groupname, c_group, c_groupname):
+    if groupname == c_groupname:
+        yield c_group
+    else:
+        with open_group(fs, vol_spec, groupname) as group:
+            yield group
index 74ce838ee8be0e44f661682e4705376b9d06d054..17d5ea36b02414440b35464d765677b86bd22fc3 100644 (file)
@@ -14,6 +14,7 @@ class SubvolumeStates(Enum):
     STATE_FAILED        = 'failed'
     STATE_COMPLETE      = 'complete'
     STATE_CANCELED      = 'canceled'
+    STATE_RETAINED      = 'snapshot-retained'
 
     @staticmethod
     def from_value(value):
@@ -29,6 +30,8 @@ class SubvolumeStates(Enum):
             return SubvolumeStates.STATE_COMPLETE
         if value == "canceled":
             return SubvolumeStates.STATE_CANCELED
+        if value == "snapshot-retained":
+            return SubvolumeStates.STATE_RETAINED
 
         raise OpSmException(-errno.EINVAL, "invalid state '{0}'".format(value))
 
@@ -38,6 +41,7 @@ class SubvolumeActions(Enum):
     ACTION_SUCCESS      = 1
     ACTION_FAILED       = 2
     ACTION_CANCELLED    = 3
+    ACTION_RETAINED     = 4
 
 class TransitionKey(object):
     def __init__(self, subvol_type, state, action_type):
@@ -105,6 +109,10 @@ SubvolumeOpSm.transition_table = {
                   SubvolumeStates.STATE_INIT,
                   SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_COMPLETE,
 
+    TransitionKey(SubvolumeTypes.TYPE_NORMAL,
+                  SubvolumeStates.STATE_COMPLETE,
+                  SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
+
     # state transitions for state machine type TYPE_CLONE
     TransitionKey(SubvolumeTypes.TYPE_CLONE,
                   SubvolumeStates.STATE_INIT,
@@ -129,4 +137,16 @@ SubvolumeOpSm.transition_table = {
     TransitionKey(SubvolumeTypes.TYPE_CLONE,
                   SubvolumeStates.STATE_INPROGRESS,
                   SubvolumeActions.ACTION_FAILED) : SubvolumeStates.STATE_FAILED,
+
+    TransitionKey(SubvolumeTypes.TYPE_CLONE,
+                  SubvolumeStates.STATE_COMPLETE,
+                  SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
+
+    TransitionKey(SubvolumeTypes.TYPE_CLONE,
+                  SubvolumeStates.STATE_CANCELED,
+                  SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
+
+    TransitionKey(SubvolumeTypes.TYPE_CLONE,
+                  SubvolumeStates.STATE_FAILED,
+                  SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
 }
index 46ec0f28f74c1d0cfd1f430070a26d8cfac4ef62..b7ef88830b5145826e546696bad57f81e5c432ed 100644 (file)
@@ -46,7 +46,7 @@ def create_clone(fs, vol_spec, group, subvolname, pool, source_volume, source_su
     subvolume = loaded_subvolumes.get_subvolume_object_max(fs, vol_spec, group, subvolname)
     subvolume.create_clone(pool, source_volume, source_subvolume, snapname)
 
-def remove_subvol(fs, vol_spec, group, subvolname, force=False):
+def remove_subvol(fs, vol_spec, group, subvolname, force=False, retainsnaps=False):
     """
     remove a subvolume.
 
@@ -59,9 +59,7 @@ def remove_subvol(fs, vol_spec, group, subvolname, force=False):
     """
     op_type = SubvolumeOpType.REMOVE if not force else SubvolumeOpType.REMOVE_FORCE
     with open_subvol(fs, vol_spec, group, subvolname, op_type) as subvolume:
-        if subvolume.list_snapshots():
-            raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(subvolname))
-        subvolume.remove()
+        subvolume.remove(retainsnaps)
 
 @contextmanager
 def open_subvol(fs, vol_spec, group, subvolname, op_type):
index 0569bd8b5d7e3488a768de004a2cc49daa08d6d6..5ef9aad21a00aa17c2d5477ddf310278f72a2b84 100644 (file)
@@ -5,8 +5,12 @@ import importlib
 import cephfs
 
 from .subvolume_base import SubvolumeBase
+from .subvolume_base import SubvolumeTypes
+from .subvolume_v1 import SubvolumeV1
+from .subvolume_v2 import SubvolumeV2
+from .metadata_manager import MetadataManager
 from ..op_sm import SubvolumeOpSm
-from ..op_sm import SubvolumeTypes
+from ..template import SubvolumeOpType
 from ...exception import MetadataMgrException, OpSmException, VolumeException
 
 log = logging.getLogger(__name__)
@@ -14,7 +18,7 @@ log = logging.getLogger(__name__)
 class SubvolumeLoader(object):
     INVALID_VERSION = -1
 
-    SUPPORTED_MODULES = ['subvolume_v1.SubvolumeV1']
+    SUPPORTED_MODULES = ['subvolume_v1.SubvolumeV1', 'subvolume_v2.SubvolumeV2']
 
     def __init__(self):
         self.max_version = SubvolumeLoader.INVALID_VERSION
@@ -45,6 +49,31 @@ class SubvolumeLoader(object):
     def get_subvolume_object_max(self, fs, vol_spec, group, subvolname):
         return self._get_subvolume_version(self.max_version)(fs, vol_spec, group, subvolname)
 
+    def upgrade_to_v2_subvolume(self, subvolume):
+        # legacy mode subvolumes cannot be upgraded to v2
+        if subvolume.legacy_mode:
+            return
+
+        version = int(subvolume.metadata_mgr.get_global_option('version'))
+        if version >= SubvolumeV2.version():
+            return
+
+        v1_subvolume = self._get_subvolume_version(version)(subvolume.fs, subvolume.vol_spec, subvolume.group, subvolume.subvolname)
+        try:
+            v1_subvolume.open(SubvolumeOpType.SNAP_LIST)
+        except VolumeException as ve:
+            # if volume is not ready for snapshot listing, do not upgrade at present
+            if ve.errno == -errno.EAGAIN:
+                return
+            raise
+
+        # v1 subvolumes with snapshots cannot be upgraded to v2
+        if v1_subvolume.list_snapshots():
+            return
+
+        subvolume.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_VERSION, SubvolumeV2.version())
+        subvolume.metadata_mgr.flush()
+
     def upgrade_legacy_subvolume(self, fs, subvolume):
         assert subvolume.legacy_mode
         try:
@@ -57,12 +86,14 @@ class SubvolumeLoader(object):
         except OpSmException as oe:
             raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
         qpath = subvolume.base_path.decode('utf-8')
-        subvolume.init_config(self.max_version, subvolume_type, qpath, initial_state)
+        # legacy is only upgradable to v1
+        subvolume.init_config(SubvolumeV1.version(), subvolume_type, qpath, initial_state)
 
     def get_subvolume_object(self, fs, vol_spec, group, subvolname, upgrade=True):
         subvolume = SubvolumeBase(fs, vol_spec, group, subvolname)
         try:
             subvolume.discover()
+            self.upgrade_to_v2_subvolume(subvolume)
             version = int(subvolume.metadata_mgr.get_global_option('version'))
             return self._get_subvolume_version(version)(fs, vol_spec, group, subvolname, legacy=subvolume.legacy_mode)
         except MetadataMgrException as me:
index 5108cfd83294840629556b87f50b9ec24cd27d67..6de9916ef01f9166a1d8e61b34a873b819af4aea 100644 (file)
@@ -19,6 +19,7 @@ log = logging.getLogger(__name__)
 class SubvolumeFeatures(Enum):
     FEATURE_SNAPSHOT_CLONE       = "snapshot-clone"
     FEATURE_SNAPSHOT_AUTOPROTECT = "snapshot-autoprotect"
+    FEATURE_SNAPSHOT_RETENTION   = "snapshot-retention"
 
 @unique
 class SubvolumeTypes(Enum):
@@ -111,10 +112,22 @@ class SubvolumeBase(object):
     def legacy_mode(self, mode):
         self.legacy = mode
 
+    @property
+    def path(self):
+        raise NotImplementedError
+
     @property
     def features(self):
         raise NotImplementedError
 
+    @property
+    def state(self):
+        raise NotImplementedError
+
+    @property
+    def subvol_type(self):
+        return SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
+
     def load_config(self):
         if self.legacy_mode:
             self.metadata_mgr = MetadataManager(self.fs, self.legacy_config_path, 0o640)
@@ -242,14 +255,16 @@ class SubvolumeBase(object):
                 raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
             raise VolumeException(-e.args[0], "error accessing subvolume '{0}'".format(self.subvolname))
 
+    def _trash_dir(self, path):
+        create_trashcan(self.fs, self.vol_spec)
+        with open_trashcan(self.fs, self.vol_spec) as trashcan:
+            trashcan.dump(path)
+            log.info("subvolume path '{0}' moved to trashcan".format(path))
+
     def trash_base_dir(self):
         if self.legacy_mode:
             self.fs.unlink(self.legacy_config_path)
-        subvol_path = self.base_path
-        create_trashcan(self.fs, self.vol_spec)
-        with open_trashcan(self.fs, self.vol_spec) as trashcan:
-            trashcan.dump(subvol_path)
-            log.info("subvolume with path '{0}' moved to trashcan".format(subvol_path))
+        self._trash_dir(self.base_path)
 
     def create_base_dir(self, mode):
         try:
@@ -258,8 +273,8 @@ class SubvolumeBase(object):
             raise VolumeException(-e.args[0], e.args[1])
 
     def info (self):
-        subvolpath = self.metadata_mgr.get_global_option('path')
-        etype = SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
+        subvolpath = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH)
+        etype = self.subvol_type
         st = self.fs.statx(subvolpath, cephfs.CEPH_STATX_BTIME | cephfs.CEPH_STATX_SIZE |
                                        cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID |
                                        cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_ATIME |
@@ -282,4 +297,4 @@ class SubvolumeBase(object):
             'mode': int(st["mode"]), 'data_pool': data_pool, 'created_at': str(st["btime"]),
             'bytes_quota': "infinite" if nsize == 0 else nsize, 'bytes_used': int(usedbytes),
             'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0),
-            'pool_namespace': pool_namespace, 'features': self.features}
+            'pool_namespace': pool_namespace, 'features': self.features, 'state': self.state.value}
index 3c0625b8a5d08c89e82a40682878e2fa6e187f80..e587fd33f3a47dedb441c42095ef789df4f4a4d1 100644 (file)
@@ -8,9 +8,10 @@ from datetime import datetime
 import cephfs
 
 from .metadata_manager import MetadataManager
-from .subvolume_base import SubvolumeBase, SubvolumeFeatures
+from .subvolume_base import SubvolumeBase
+from .subvolume_base import SubvolumeTypes
+from .subvolume_base import SubvolumeFeatures
 from ..op_sm import SubvolumeOpSm
-from ..op_sm import SubvolumeTypes
 from ..op_sm import SubvolumeStates
 from ..template import SubvolumeTemplate
 from ..snapshot_util import mksnap, rmsnap
@@ -23,6 +24,20 @@ from ..clone_index import open_clone_index, create_clone_index
 log = logging.getLogger(__name__)
 
 class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
+    """
+    Version 1 subvolumes creates a subvolume with path as follows,
+        volumes/<group-name>/<subvolume-name>/<uuid>/
+
+    - The directory under which user data resides is <uuid>
+    - Snapshots of the subvolume are taken within the <uuid> directory
+    - A meta file is maintained under the <subvolume-name> directory as a metadata store, typically storing,
+        - global information about the subvolume (version, path, type, state)
+        - snapshots attached to an ongoing clone operation
+        - clone snapshot source if subvolume is a clone of a snapshot
+    - It retains backward compatability with legacy subvolumes by creating the meta file for legacy subvolumes under
+    /volumes/_legacy/ (see legacy_config_path), thus allowing cloning of older legacy volumes that lack the <uuid>
+    component in the path.
+    """
     VERSION = 1
 
     @staticmethod
@@ -33,7 +48,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
     def path(self):
         try:
             # no need to stat the path -- open() does that
-            return self.metadata_mgr.get_global_option('path').encode('utf-8')
+            return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8')
         except MetadataMgrException as me:
             raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
 
@@ -146,7 +161,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
         try:
             self.metadata_mgr.refresh()
 
-            etype = SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
+            etype = self.subvol_type
             if op_type not in self.allowed_ops_by_type(etype):
                 raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
                                       op_type.value, self.subvolname, etype.value))
@@ -195,7 +210,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
     @property
     def status(self):
         state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
-        subvolume_type = SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
+        subvolume_type = self.subvol_type
         subvolume_status = {
             'state' : state.value
         }
@@ -215,7 +230,11 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
         if flush:
             self.metadata_mgr.flush()
 
-    def remove(self):
+    def remove(self, retainsnaps=False):
+        if retainsnaps:
+            raise VolumeException(-errno.EINVAL, "subvolume '{0}' does not support snapshot retention on delete".format(self.subvolname))
+        if self.list_snapshots():
+            raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
         self.trash_base_dir()
 
     def resize(self, newsize, noshrink):
@@ -227,6 +246,9 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
                             self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
                             snapname.encode('utf-8'))
 
+    def snapshot_data_path(self, snapname):
+        return self.snapshot_path(snapname)
+
     def create_snapshot(self, snapname):
         snappath = self.snapshot_path(snapname)
         mksnap(self.fs, snappath)
@@ -260,7 +282,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
         except cephfs.Error as e:
             if e.errno == errno.ENOENT:
                 raise VolumeException(-errno.ENOENT,
-                                      "snapshot '{0}' doesnot exist".format(snapname))
+                                      "snapshot '{0}' does not exist".format(snapname))
             raise VolumeException(-e.args[0], e.args[1])
 
     def list_snapshots(self):
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py
new file mode 100644 (file)
index 0000000..23127d3
--- /dev/null
@@ -0,0 +1,439 @@
+import os
+import stat
+import uuid
+import errno
+import logging
+from datetime import datetime
+
+import cephfs
+
+from .metadata_manager import MetadataManager
+from .subvolume_base import SubvolumeBase
+from .subvolume_base import SubvolumeTypes
+from .subvolume_base import SubvolumeFeatures
+from ..op_sm import SubvolumeOpSm
+from ..op_sm import SubvolumeStates
+from ..template import SubvolumeTemplate
+from ..snapshot_util import mksnap, rmsnap
+from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException
+from ...fs_util import listdir
+from ..template import SubvolumeOpType
+
+from ..clone_index import open_clone_index, create_clone_index
+
+log = logging.getLogger(__name__)
+
+class SubvolumeV2(SubvolumeBase, SubvolumeTemplate):
+    """
+    Version 2 subvolumes creates a subvolume with path as follows,
+        volumes/<group-name>/<subvolume-name>/<uuid>/
+
+    The distinguishing feature of V2 subvolume as compared to V1 subvolumes is its ability to retain snapshots
+    of a subvolume on removal. This is done by creating snapshots under the <subvolume-name> directory,
+    rather than under the <uuid> directory, as is the case of V1 subvolumes.
+
+    - The directory under which user data resides is <uuid>
+    - Snapshots of the subvolume are taken within the <subvolume-name> directory
+    - A meta file is maintained under the <subvolume-name> directory as a metadata store, storing information similar
+    to V1 subvolumes
+    - On a request to remove subvolume but retain its snapshots, only the <uuid> directory is moved to trash, retaining
+    the rest of the subvolume and its meta file.
+        - The <uuid> directory, when present, is the current incarnation of the subvolume, which may have snapshots of
+        older incarnations of the same subvolume.
+    - V1 subvolumes that currently do not have any snapshots are upgraded to V2 subvolumes automatically, to support the
+    snapshot retention feature
+    """
+    VERSION = 2
+
+    @staticmethod
+    def version():
+        return SubvolumeV2.VERSION
+
+    @property
+    def path(self):
+        try:
+            # no need to stat the path -- open() does that
+            return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8')
+        except MetadataMgrException as me:
+            raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
+
+    @property
+    def features(self):
+        return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value,
+                SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value,
+                SubvolumeFeatures.FEATURE_SNAPSHOT_RETENTION.value]
+
+    def _set_incarnation_metadata(self, subvolume_type, qpath, initial_state):
+        self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_TYPE, subvolume_type.value)
+        self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, qpath)
+        self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, initial_state.value)
+
+    def create(self, size, isolate_nspace, pool, mode, uid, gid):
+        subvolume_type = SubvolumeTypes.TYPE_NORMAL
+        try:
+            initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
+        except OpSmException as oe:
+            raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
+
+        subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
+        try:
+            self.fs.mkdirs(subvol_path, mode)
+            self.set_attrs(subvol_path, size, isolate_nspace, pool, uid, gid)
+
+            # persist subvolume metadata
+            qpath = subvol_path.decode('utf-8')
+            try:
+                self.metadata_mgr.refresh()
+                if self.state == SubvolumeStates.STATE_RETAINED:
+                    self._set_incarnation_metadata(subvolume_type, qpath, initial_state)
+                    self.metadata_mgr.flush()
+                else:
+                    raise VolumeException(-errno.EINVAL, "invalid state for subvolume '{0}' during create".format(self.subvolname))
+            except MetadataMgrException as me:
+                if me.errno != -errno.ENOENT:
+                    raise
+                self.init_config(SubvolumeV2.VERSION, subvolume_type, qpath, initial_state)
+        except (VolumeException, MetadataMgrException, cephfs.Error) as e:
+            try:
+                log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
+                self.remove()
+            except VolumeException as ve:
+                log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
+
+            if isinstance(e, MetadataMgrException):
+                log.error("metadata manager exception: {0}".format(e))
+                e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
+            elif isinstance(e, cephfs.Error):
+                e = VolumeException(-e.args[0], e.args[1])
+            raise e
+
+    def add_clone_source(self, volname, subvolume, snapname, flush=False):
+        self.metadata_mgr.add_section("source")
+        self.metadata_mgr.update_section("source", "volume", volname)
+        if not subvolume.group.is_default_group():
+            self.metadata_mgr.update_section("source", "group", subvolume.group_name)
+        self.metadata_mgr.update_section("source", "subvolume", subvolume.subvol_name)
+        self.metadata_mgr.update_section("source", "snapshot", snapname)
+        if flush:
+            self.metadata_mgr.flush()
+
+    def remove_clone_source(self, flush=False):
+        self.metadata_mgr.remove_section("source")
+        if flush:
+            self.metadata_mgr.flush()
+
+    def create_clone(self, pool, source_volname, source_subvolume, snapname):
+        subvolume_type = SubvolumeTypes.TYPE_CLONE
+        try:
+            initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
+        except OpSmException as oe:
+            raise VolumeException(-errno.EINVAL, "clone failed: internal error")
+
+        subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
+        try:
+            stx = self.fs.statx(source_subvolume.snapshot_data_path(snapname),
+                                cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID,
+                                cephfs.AT_SYMLINK_NOFOLLOW)
+            uid= stx.get('uid')
+            gid = stx.get('gid')
+            stx_mode = stx.get('mode')
+            if stx_mode is not None:
+                mode = stx_mode & ~stat.S_IFMT(stx_mode)
+            else:
+                mode = None
+
+            # create directory and set attributes
+            self.fs.mkdirs(subvol_path, mode)
+            self.set_attrs(subvol_path, None, None, pool, uid, gid)
+
+            # persist subvolume metadata and clone source
+            qpath = subvol_path.decode('utf-8')
+            try:
+                self.metadata_mgr.refresh()
+                if self.state == SubvolumeStates.STATE_RETAINED:
+                    self._set_incarnation_metadata(subvolume_type, qpath, initial_state)
+                else:
+                    raise VolumeException(-errno.EINVAL, "invalid state for subvolume '{0}' during clone".format(self.subvolname))
+            except MetadataMgrException as me:
+                if me.errno != -errno.ENOENT:
+                    raise
+                self.metadata_mgr.init(SubvolumeV2.VERSION, subvolume_type.value, qpath, initial_state.value)
+            self.add_clone_source(source_volname, source_subvolume, snapname)
+            self.metadata_mgr.flush()
+        except (VolumeException, MetadataMgrException, cephfs.Error) as e:
+            try:
+                log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
+                self.remove()
+            except VolumeException as ve:
+                log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
+
+            if isinstance(e, MetadataMgrException):
+                log.error("metadata manager exception: {0}".format(e))
+                e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
+            elif isinstance(e, cephfs.Error):
+                e = VolumeException(-e.args[0], e.args[1])
+            raise e
+
+    def allowed_ops_by_type(self, vol_type):
+        if vol_type == SubvolumeTypes.TYPE_CLONE:
+            return {op_type for op_type in SubvolumeOpType}
+
+        if vol_type == SubvolumeTypes.TYPE_NORMAL:
+            return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS,
+                                                               SubvolumeOpType.CLONE_CANCEL,
+                                                               SubvolumeOpType.CLONE_INTERNAL}
+
+        return {}
+
+    def allowed_ops_by_state(self, vol_state):
+        # Return the set of operations permitted while in the given state.
+        if vol_state == SubvolumeStates.STATE_COMPLETE:
+            # fully created/cloned subvolumes allow every operation
+            return {op_type for op_type in SubvolumeOpType}
+
+        if vol_state == SubvolumeStates.STATE_RETAINED:
+            # removed with snapshots retained: only snapshot management,
+            # listing/info, removal, and acting as a clone source
+            return {
+                SubvolumeOpType.REMOVE,
+                SubvolumeOpType.REMOVE_FORCE,
+                SubvolumeOpType.LIST,
+                SubvolumeOpType.INFO,
+                SubvolumeOpType.SNAP_REMOVE,
+                SubvolumeOpType.SNAP_LIST,
+                SubvolumeOpType.SNAP_INFO,
+                SubvolumeOpType.SNAP_PROTECT,
+                SubvolumeOpType.SNAP_UNPROTECT,
+                SubvolumeOpType.CLONE_SOURCE
+            }
+
+        # any other (in-flight clone) state: clone management and forced
+        # removal only
+        return {SubvolumeOpType.REMOVE_FORCE,
+                SubvolumeOpType.CLONE_CREATE,
+                SubvolumeOpType.CLONE_STATUS,
+                SubvolumeOpType.CLONE_CANCEL,
+                SubvolumeOpType.CLONE_INTERNAL,
+                SubvolumeOpType.CLONE_SOURCE}
+
+    def open(self, op_type):
+        """Open the subvolume for the given operation.
+
+        Refreshes metadata, validates op_type against the subvolume's type
+        and current state, then caches uid/gid/mode from the incarnation
+        directory (skipped for retained subvolumes, which have no data path).
+        Raises VolumeException on any validation or access failure.
+        """
+        if not isinstance(op_type, SubvolumeOpType):
+            raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format(
+                                  op_type.value, self.subvolname))
+        try:
+            self.metadata_mgr.refresh()
+
+            etype = self.subvol_type
+            if op_type not in self.allowed_ops_by_type(etype):
+                raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
+                                      op_type.value, self.subvolname, etype.value))
+
+            estate = self.state
+            # a retained subvolume appears "removed" for disallowed ops (ENOENT)...
+            if op_type not in self.allowed_ops_by_state(estate) and estate == SubvolumeStates.STATE_RETAINED:
+                raise VolumeException(-errno.ENOENT, "subvolume '{0}' is removed and has only snapshots retained".format(
+                                      self.subvolname))
+
+            # ...while other not-ready states ask the caller to retry (EAGAIN)
+            if op_type not in self.allowed_ops_by_state(estate) and estate != SubvolumeStates.STATE_RETAINED:
+                raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format(
+                                      self.subvolname, op_type.value))
+
+            if estate != SubvolumeStates.STATE_RETAINED:
+                subvol_path = self.path
+                log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path))
+                st = self.fs.stat(subvol_path)
+
+                self.uid = int(st.st_uid)
+                self.gid = int(st.st_gid)
+                self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode))
+        except MetadataMgrException as me:
+            if me.errno == -errno.ENOENT:
+                raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
+            raise VolumeException(me.args[0], me.args[1])
+        except cephfs.ObjectNotFound:
+            # only reachable after subvol_path was bound above (fs.stat)
+            log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname))
+            raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname))
+        except cephfs.Error as e:
+            raise VolumeException(-e.args[0], e.args[1])
+
+    def _get_clone_source(self):
+        # Read the clone origin (volume/subvolume/snapshot, optional group)
+        # back from the "source" metadata section.
+        try:
+            clone_source = {
+                'volume'   : self.metadata_mgr.get_option("source", "volume"),
+                'subvolume': self.metadata_mgr.get_option("source", "subvolume"),
+                'snapshot' : self.metadata_mgr.get_option("source", "snapshot"),
+            }
+
+            try:
+                clone_source["group"] = self.metadata_mgr.get_option("source", "group")
+            except MetadataMgrException as me:
+                if me.errno == -errno.ENOENT:
+                    # no group recorded -- the source is in the default group
+                    pass
+                else:
+                    raise
+        except MetadataMgrException as me:
+            raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
+        return clone_source
+
+    @property
+    def status(self):
+        state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
+        subvolume_type = self.subvol_type
+        subvolume_status = {
+            'state' : state.value
+        }
+        if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE:
+            subvolume_status["source"] = self._get_clone_source()
+        return subvolume_status
+
+    @property
+    def state(self):
+        # Current subvolume state, parsed from the global metadata section.
+        return SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
+
+    @state.setter
+    def state(self, val):
+        # val is a (SubvolumeStates, flush) pair; persist immediately only
+        # when flush is True.
+        state = val[0].value
+        flush = val[1]
+        self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state)
+        if flush:
+            self.metadata_mgr.flush()
+
+    @property
+    def type(self):
+        # Subvolume type (e.g. normal/clone) from the global metadata section.
+        return SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
+
+    def trash_incarnation_dir(self):
+        # Trash only the current incarnation (data) directory, leaving the
+        # base directory -- and hence retained snapshots -- in place.
+        self._trash_dir(self.path)
+
+    def remove(self, retainsnaps=False):
+        """Remove the subvolume.
+
+        With snapshots present, removal fails (ENOTEMPTY) unless retainsnaps
+        is set, in which case only the incarnation directory is trashed and
+        the subvolume moves to the retained state with an empty path.
+        Without snapshots the whole base directory is trashed.
+        """
+        if self.list_snapshots():
+            if not retainsnaps:
+                raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
+            self.trash_incarnation_dir()
+            self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, "")
+            self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, SubvolumeStates.STATE_RETAINED.value)
+            self.metadata_mgr.flush()
+        else:
+            self.trash_base_dir()
+
+    def resize(self, newsize, noshrink):
+        # Delegate to the base implementation against the current
+        # incarnation path.
+        subvol_path = self.path
+        return self._resize(subvol_path, newsize, noshrink)
+
+    def info(self):
+        # A retained subvolume has no incarnation directory, so report only
+        # type/features/state instead of the full path-based info.
+        if self.state != SubvolumeStates.STATE_RETAINED:
+            return super(SubvolumeV2, self).info()
+
+        return {'type': self.subvol_type.value, 'features': self.features, 'state': SubvolumeStates.STATE_RETAINED.value}
+
+    def snapshot_path(self, snapname):
+        # v2 layout: snapshots live in the snapshot dir directly under the
+        # subvolume base path (not inside the uuid incarnation directory).
+        return os.path.join(self.base_path,
+                            self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
+                            snapname.encode('utf-8'))
+
+    @staticmethod
+    def is_valid_uuid(uuid_str):
+        # True if uuid_str parses as a UUID; used to recognize incarnation
+        # directories captured inside a snapshot.
+        try:
+            uuid.UUID(uuid_str)
+            return True
+        except ValueError:
+            return False
+
+    def snapshot_data_path(self, snapname):
+        # Resolve the data directory inside a snapshot by scanning the
+        # snapshot root for a UUID-named subdirectory (the captured
+        # incarnation dir). Raises ENOENT if the snapshot or the data
+        # directory is missing.
+        snap_base_path = self.snapshot_path(snapname)
+        uuid_str = None
+        try:
+            with self.fs.opendir(snap_base_path) as dir_handle:
+                d = self.fs.readdir(dir_handle)
+                while d:
+                    if d.d_name not in (b".", b".."):
+                        d_full_path = os.path.join(snap_base_path, d.d_name)
+                        stx = self.fs.statx(d_full_path, cephfs.CEPH_STATX_MODE, cephfs.AT_SYMLINK_NOFOLLOW)
+                        if stat.S_ISDIR(stx.get('mode')):
+                            if self.is_valid_uuid(d.d_name.decode('utf-8')):
+                                uuid_str = d.d_name
+                    d = self.fs.readdir(dir_handle)
+        except cephfs.Error as e:
+            # cephfs errors carry positive errno values
+            if e.errno == errno.ENOENT:
+                raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+            raise VolumeException(-e.args[0], e.args[1])
+
+        if not uuid_str:
+            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+
+        return os.path.join(snap_base_path, uuid_str)
+
+    def create_snapshot(self, snapname):
+        # Create the snapshot under the subvolume's snapshot directory.
+        snappath = self.snapshot_path(snapname)
+        mksnap(self.fs, snappath)
+
+    def has_pending_clones(self, snapname):
+        # True if any tracked clone in the "clone snaps" section references
+        # this snapshot; a missing section means no clones were ever tracked.
+        try:
+            return self.metadata_mgr.section_has_item('clone snaps', snapname)
+        except MetadataMgrException as me:
+            if me.errno == -errno.ENOENT:
+                return False
+            raise
+
+    def remove_snapshot(self, snapname):
+        # Refuse removal while clones of this snapshot are still in flight.
+        if self.has_pending_clones(snapname):
+            raise VolumeException(-errno.EAGAIN, "snapshot '{0}' has pending clones".format(snapname))
+        snappath = self.snapshot_path(snapname)
+        rmsnap(self.fs, snappath)
+        # a retained subvolume whose last snapshot is gone can be purged now
+        if self.state == SubvolumeStates.STATE_RETAINED and not self.list_snapshots():
+            self.trash_base_dir()
+            # tickle the volume purge job to purge this entry, using ESTALE
+            raise VolumeException(-errno.ESTALE, "subvolume '{0}' has been removed as the last retained snapshot is removed".format(self.subvolname))
+
+    def snapshot_info(self, snapname):
+        # Gather snapshot attributes (birth time, recursive size, data pool)
+        # via xattrs on the snapshot's data directory.
+        snappath = self.snapshot_data_path(snapname)
+        snap_info = {}
+        try:
+            snap_attrs = {'created_at':'ceph.snap.btime', 'size':'ceph.dir.rbytes',
+                          'data_pool':'ceph.dir.layout.pool'}
+            for key, val in snap_attrs.items():
+                snap_info[key] = self.fs.getxattr(snappath, val)
+            return {'size': int(snap_info['size']),
+                    'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))),
+                    'data_pool': snap_info['data_pool'].decode('utf-8'),
+                    'has_pending_clones': "yes" if self.has_pending_clones(snapname) else "no"}
+        except cephfs.Error as e:
+            if e.errno == errno.ENOENT:
+                raise VolumeException(-errno.ENOENT,
+                                      "snapshot '{0}' does not exist".format(snapname))
+            raise VolumeException(-e.args[0], e.args[1])
+
+    def list_snapshots(self):
+        # List entries of the subvolume's snapshot directory; a missing
+        # directory simply means no snapshots exist.
+        try:
+            dirpath = os.path.join(self.base_path,
+                                   self.vol_spec.snapshot_dir_prefix.encode('utf-8'))
+            return listdir(self.fs, dirpath)
+        except VolumeException as ve:
+            if ve.errno == -errno.ENOENT:
+                return []
+            raise
+
+    def _add_snap_clone(self, track_id, snapname):
+        # Record a pending clone (clone-index track id -> source snapshot).
+        self.metadata_mgr.add_section("clone snaps")
+        self.metadata_mgr.update_section("clone snaps", track_id, snapname)
+        self.metadata_mgr.flush()
+
+    def _remove_snap_clone(self, track_id):
+        # Drop a tracked clone entry once the clone is detached.
+        self.metadata_mgr.remove_option("clone snaps", track_id)
+        self.metadata_mgr.flush()
+
+    def attach_snapshot(self, snapname, tgt_subvolume):
+        if not snapname.encode('utf-8') in self.list_snapshots():
+            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+        try:
+            create_clone_index(self.fs, self.vol_spec)
+            with open_clone_index(self.fs, self.vol_spec) as index:
+                track_idx = index.track(tgt_subvolume.base_path)
+                self._add_snap_clone(track_idx, snapname)
+        except (IndexException, MetadataMgrException) as e:
+            log.warning("error creating clone index: {0}".format(e))
+            raise VolumeException(-errno.EINVAL, "error cloning subvolume")
+
+    def detach_snapshot(self, snapname, track_id):
+        if not snapname.encode('utf-8') in self.list_snapshots():
+            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+        try:
+            with open_clone_index(self.fs, self.vol_spec) as index:
+                index.untrack(track_id)
+                self._remove_snap_clone(track_id)
+        except (IndexException, MetadataMgrException) as e:
+            log.warning("error delining snapshot from clone: {0}".format(e))
+            raise VolumeException(-errno.EINVAL, "error delinking snapshot from clone")
index 612cd882cc3c3b8593bef2b5a9e28494c70db691..73e474315b2553009e010f0f4cd28e3729cbd7cc 100644 (file)
@@ -10,7 +10,7 @@ from .fs_util import listdir
 
 from .operations.volume import create_volume, \
     delete_volume, list_volumes, open_volume, get_pool_names
-from .operations.group import open_group, create_group, remove_group
+from .operations.group import open_group, create_group, remove_group, open_group_unique
 from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
     create_clone
 
@@ -175,16 +175,17 @@ class VolumeClient(CephfsClient):
         return ret
 
     def remove_subvolume(self, **kwargs):
-        ret        = 0, "", ""
-        volname    = kwargs['vol_name']
-        subvolname = kwargs['sub_name']
-        groupname  = kwargs['group_name']
-        force      = kwargs['force']
+        ret         = 0, "", ""
+        volname     = kwargs['vol_name']
+        subvolname  = kwargs['sub_name']
+        groupname   = kwargs['group_name']
+        force       = kwargs['force']
+        retainsnaps = kwargs['retain_snapshots']
 
         try:
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, groupname) as group:
-                    remove_subvol(fs_handle, self.volspec, group, subvolname, force)
+                    remove_subvol(fs_handle, self.volspec, group, subvolname, force, retainsnaps)
                     # kick the purge threads for async removal -- note that this
                     # assumes that the subvolume is moved to trash can.
                     # TODO: make purge queue as singleton so that trash can kicks
@@ -322,7 +323,11 @@ class VolumeClient(CephfsClient):
                     with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_REMOVE) as subvolume:
                         subvolume.remove_snapshot(snapname)
         except VolumeException as ve:
-            if not (ve.errno == -errno.ENOENT and force):
+            # ESTALE serves as an error to state that subvolume is currently stale due to internal removal and,
+            # we should tickle the purge jobs to purge the same
+            if ve.errno == -errno.ESTALE:
+                self.purge_queue.queue_job(volname)
+            elif not (ve.errno == -errno.ENOENT and force):
                 ret = self.volume_exception_to_retval(ve)
         return ret
 
@@ -389,52 +394,59 @@ class VolumeClient(CephfsClient):
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    def _prepare_clone_subvolume(self, fs_handle, volname, subvolume, snapname, target_group, target_subvolname, target_pool):
-        create_clone(fs_handle, self.volspec, target_group, target_subvolname, target_pool, volname, subvolume, snapname)
-        with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_INTERNAL) as target_subvolume:
+    def _prepare_clone_subvolume(self, fs_handle, volname, s_subvolume, s_snapname, t_group, t_subvolname, **kwargs):
+        t_pool              = kwargs['pool_layout']
+        s_subvolname        = kwargs['sub_name']
+        s_groupname         = kwargs['group_name']
+        t_groupname         = kwargs['target_group_name']
+
+        create_clone(fs_handle, self.volspec, t_group, t_subvolname, t_pool, volname, s_subvolume, s_snapname)
+        with open_subvol(fs_handle, self.volspec, t_group, t_subvolname, SubvolumeOpType.CLONE_INTERNAL) as t_subvolume:
             try:
-                subvolume.attach_snapshot(snapname, target_subvolume)
+                if t_groupname == s_groupname and t_subvolname == s_subvolname:
+                    t_subvolume.attach_snapshot(s_snapname, t_subvolume)
+                else:
+                    s_subvolume.attach_snapshot(s_snapname, t_subvolume)
                 self.cloner.queue_job(volname)
             except VolumeException as ve:
                 try:
-                    target_subvolume.remove()
+                    t_subvolume.remove()
                     self.purge_queue.queue_job(volname)
                 except Exception as e:
-                    log.warning("failed to cleanup clone subvolume '{0}' ({1})".format(target_subvolname, e))
+                    log.warning("failed to cleanup clone subvolume '{0}' ({1})".format(t_subvolname, e))
                 raise ve
 
-    def _clone_subvolume_snapshot(self, fs_handle, volname, subvolume, **kwargs):
-        snapname          = kwargs['snap_name']
-        target_pool       = kwargs['pool_layout']
-        target_subvolname = kwargs['target_sub_name']
-        target_groupname  = kwargs['target_group_name']
+    def _clone_subvolume_snapshot(self, fs_handle, volname, s_group, s_subvolume, **kwargs):
+        s_snapname          = kwargs['snap_name']
+        target_subvolname   = kwargs['target_sub_name']
+        target_groupname    = kwargs['target_group_name']
+        s_groupname         = kwargs['group_name']
 
-        if not snapname.encode('utf-8') in subvolume.list_snapshots():
-            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+        if not s_snapname.encode('utf-8') in s_subvolume.list_snapshots():
+            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(s_snapname))
 
-        # TODO: when the target group is same as source, reuse group object.
-        with open_group(fs_handle, self.volspec, target_groupname) as target_group:
+        with open_group_unique(fs_handle, self.volspec, target_groupname, s_group, s_groupname) as target_group:
             try:
                 with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_CREATE):
                     raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname))
             except VolumeException as ve:
                 if ve.errno == -errno.ENOENT:
-                    self._prepare_clone_subvolume(fs_handle, volname, subvolume, snapname,
-                                                  target_group, target_subvolname, target_pool)
+                    self._prepare_clone_subvolume(fs_handle, volname, s_subvolume, s_snapname,
+                                                  target_group, target_subvolname, **kwargs)
                 else:
                     raise
 
     def clone_subvolume_snapshot(self, **kwargs):
         ret        = 0, "", ""
         volname    = kwargs['vol_name']
-        subvolname = kwargs['sub_name']
-        groupname  = kwargs['group_name']
+        s_subvolname = kwargs['sub_name']
+        s_groupname  = kwargs['group_name']
 
         try:
             with open_volume(self, volname) as fs_handle:
-                with open_group(fs_handle, self.volspec, groupname) as group:
-                    with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CLONE_SOURCE) as subvolume:
-                        self._clone_subvolume_snapshot(fs_handle, volname, subvolume, **kwargs)
+                with open_group(fs_handle, self.volspec, s_groupname) as s_group:
+                    with open_subvol(fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
+                        self._clone_subvolume_snapshot(fs_handle, volname, s_group, s_subvolume, **kwargs)
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
index 76d660ae652f81c476605e97fb744dccf9cb4129..b902e37e90c21b8068feeeaa01d2d91277de0ada 100644 (file)
@@ -83,9 +83,11 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
                    'name=vol_name,type=CephString '
                    'name=sub_name,type=CephString '
                    'name=group_name,type=CephString,req=false '
-                   'name=force,type=CephBool,req=false ',
+                   'name=force,type=CephBool,req=false '
+                   'name=retain_snapshots,type=CephBool,req=false ',
             'desc': "Delete a CephFS subvolume in a volume, and optionally, "
-                    "in a specific subvolume group",
+                    "in a specific subvolume group, force deleting a cancelled or failed "
+                    "clone, and retaining existing subvolume snapshots",
             'perm': 'rw'
         },
         {
@@ -408,7 +410,8 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         return self.vc.remove_subvolume(vol_name=cmd['vol_name'],
                                         sub_name=cmd['sub_name'],
                                         group_name=cmd.get('group_name', None),
-                                        force=cmd.get('force', False))
+                                        force=cmd.get('force', False),
+                                        retain_snapshots=cmd.get('retain_snapshots', False))
 
     def _cmd_fs_subvolume_ls(self, inbuf, cmd):
         return self.vc.list_subvolumes(vol_name=cmd['vol_name'],