]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/volumes: Use operation type during subvolume open
authorShyamsundar Ranganathan <srangana@redhat.com>
Thu, 2 Jul 2020 01:08:34 +0000 (21:08 -0400)
committerShyamsundar Ranganathan <srangana@redhat.com>
Thu, 30 Jul 2020 01:14:39 +0000 (21:14 -0400)
Subvolume open currently takes in 2 optional parameters to
denote desired state and type. This enables the open to
allow the operation to suceed based on the (type, state)
tuple.

Instead, pass an operation type to be performed on a subvolume
during open, and decide internal to a subvolume version if the
operation is allowed based on its state and type.

Also modifies the state machine code, to be more amenable to
modifications and improves redability.

Signed-off-by: Shyamsundar Ranganathan <srangana@redhat.com>
src/pybind/mgr/volumes/fs/async_cloner.py
src/pybind/mgr/volumes/fs/operations/op_sm.py
src/pybind/mgr/volumes/fs/operations/subvolume.py
src/pybind/mgr/volumes/fs/operations/template.py
src/pybind/mgr/volumes/fs/operations/versions/__init__.py
src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py
src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
src/pybind/mgr/volumes/fs/volume.py

index 7cca7da8ec21d9b42c3cff4203cc156f488cbbe3..25ec793af6d770ddc4a9e85473fe2db229851ee0 100644 (file)
@@ -10,12 +10,17 @@ import cephfs
 from .async_job import AsyncJobs
 from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
 from .fs_util import copy_file
-from .operations.op_sm import OpSm
+from .operations.op_sm import SubvolumeOpSm
+from .operations.op_sm import SubvolumeTypes
+from .operations.op_sm import SubvolumeActions
+from .operations.op_sm import SubvolumeStates
 from .operations.resolver import resolve
 from .operations.volume import open_volume, open_volume_lockless
 from .operations.group import open_group
 from .operations.subvolume import open_subvol
 from .operations.clone_index import open_clone_index
+from .operations.template import SubvolumeOpType
+from .operations.versions import SubvolumeBase
 
 log = logging.getLogger(__name__)
 
@@ -38,38 +43,52 @@ def get_next_clone_entry(volume_client, volname, running_jobs):
         return ve.errno, None
 
 @contextmanager
-def open_at_volume(volume_client, volname, groupname, subvolname, need_complete=False, expected_types=[]):
+def open_at_volume(volume_client, volname, groupname, subvolname, op_type):
     with open_volume(volume_client, volname) as fs_handle:
         with open_group(fs_handle, volume_client.volspec, groupname) as group:
-            with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
-                             need_complete, expected_types) as subvolume:
+            with open_subvol(fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
                 yield subvolume
 
 @contextmanager
-def open_at_group(volume_client, fs_handle, groupname, subvolname, need_complete=False, expected_types=[]):
+def open_at_group(volume_client, fs_handle, groupname, subvolname, op_type):
     with open_group(fs_handle, volume_client.volspec, groupname) as group:
-        with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
-                         need_complete, expected_types) as subvolume:
+        with open_subvol(fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
             yield subvolume
 
 def get_clone_state(volume_client, volname, groupname, subvolname):
-    with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
+    with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
         return subvolume.state
 
 def set_clone_state(volume_client, volname, groupname, subvolname, state):
-    with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
+    with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
         subvolume.state = (state, True)
 
 def get_clone_source(clone_subvolume):
     source = clone_subvolume._get_clone_source()
     return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot'])
 
+def get_next_state_on_error(errnum):
+    if errnum == -errno.EINTR:
+        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+                                              SubvolumeStates.STATE_INPROGRESS,
+                                              SubvolumeActions.ACTION_CANCELLED)
+    else:
+        # jump to failed state, on all other errors
+        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+                                              SubvolumeStates.STATE_INPROGRESS,
+                                              SubvolumeActions.ACTION_FAILED)
+    return next_state
+
 def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
     try:
         if should_cancel():
-            next_state = OpSm.get_next_state("clone", "pending", -errno.EINTR)
+            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+                                                  SubvolumeStates.STATE_PENDING,
+                                                  SubvolumeActions.ACTION_CANCELLED)
         else:
-            next_state = OpSm.get_next_state("clone", "pending", 0)
+            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+                                                  SubvolumeStates.STATE_PENDING,
+                                                  SubvolumeActions.ACTION_SUCCESS)
     except OpSmException as oe:
         raise VolumeException(oe.errno, oe.error_str)
     return (next_state, False)
@@ -147,9 +166,9 @@ def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
 
 def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
     with open_volume_lockless(volume_client, volname) as fs_handle:
-        with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
+        with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
             s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
-            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
+            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
                 src_path = source_subvolume.snapshot_path(s_snapname)
                 dst_path = clone_subvolume.path
                 bulk_copy(fs_handle, src_path, dst_path, should_cancel)
@@ -157,10 +176,11 @@ def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
 def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
     try:
         do_clone(volume_client, volname, groupname, subvolname, should_cancel)
-        next_state = OpSm.get_next_state("clone", "in-progress", 0)
+        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+                                              SubvolumeStates.STATE_INPROGRESS,
+                                              SubvolumeActions.ACTION_SUCCESS)
     except VolumeException as ve:
-        # jump to failed state
-        next_state = OpSm.get_next_state("clone", "in-progress", ve.errno)
+        next_state = get_next_state_on_error(ve.errno)
     except OpSmException as oe:
         raise VolumeException(oe.errno, oe.error_str)
     return (next_state, False)
@@ -169,9 +189,9 @@ def handle_clone_failed(volume_client, volname, index, groupname, subvolname, sh
     try:
         # detach source but leave the clone section intact for later inspection
         with open_volume(volume_client, volname) as fs_handle:
-            with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
+            with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
                 s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
-                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
+                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
                     source_subvolume.detach_snapshot(s_snapname, index)
     except (MetadataMgrException, VolumeException) as e:
         log.error("failed to detach clone from snapshot: {0}".format(e))
@@ -180,9 +200,9 @@ def handle_clone_failed(volume_client, volname, index, groupname, subvolname, sh
 def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
     try:
         with open_volume(volume_client, volname) as fs_handle:
-            with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
+            with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
                 s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
-                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
+                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
                     source_subvolume.detach_snapshot(s_snapname, index)
                     clone_subvolume.remove_clone_source(flush=True)
     except (MetadataMgrException, VolumeException) as e:
@@ -227,29 +247,29 @@ def clone(volume_client, volname, index, clone_path, state_table, should_cancel)
 class Cloner(AsyncJobs):
     """
     Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume.
-    this relies on a simple state machine (which mimics states from OpSm class) as
+    this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
     the driver. file types supported are directories, symbolic links and regular files.
     """
     def __init__(self, volume_client, tp_size):
         self.vc = volume_client
         self.state_table = {
-            'pending'     : handle_clone_pending,
-            'in-progress' : handle_clone_in_progress,
-            'complete'    : handle_clone_complete,
-            'failed'      : handle_clone_failed,
-            'canceled'    : handle_clone_failed,
+            SubvolumeStates.STATE_PENDING      : handle_clone_pending,
+            SubvolumeStates.STATE_INPROGRESS   : handle_clone_in_progress,
+            SubvolumeStates.STATE_COMPLETE     : handle_clone_complete,
+            SubvolumeStates.STATE_FAILED       : handle_clone_failed,
+            SubvolumeStates.STATE_CANCELED     : handle_clone_failed,
         }
         super(Cloner, self).__init__(volume_client, "cloner", tp_size)
 
     def is_clone_cancelable(self, clone_state):
-        return not (OpSm.is_final_state(clone_state) or OpSm.is_failed_state(clone_state))
+        return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
 
     def get_clone_tracking_index(self, fs_handle, clone_subvolume):
         with open_clone_index(fs_handle, self.vc.volspec) as index:
             return index.find_clone_entry_index(clone_subvolume.base_path)
 
     def _cancel_pending_clone(self, fs_handle, clone_subvolume, status, track_idx):
-        clone_state = status['state']
+        clone_state = SubvolumeStates.from_value(status['state'])
         assert self.is_clone_cancelable(clone_state)
 
         s_groupname = status['source'].get('group', None)
@@ -257,8 +277,10 @@ class Cloner(AsyncJobs):
         s_snapname = status['source']['snapshot']
 
         with open_group(fs_handle, self.vc.volspec, s_groupname) as s_group:
-            with open_subvol(fs_handle, self.vc.volspec, s_group, s_subvolname) as s_subvolume:
-                next_state = OpSm.get_next_state("clone", clone_state, -errno.EINTR)
+            with open_subvol(fs_handle, self.vc.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
+                next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+                                                      clone_state,
+                                                      SubvolumeActions.ACTION_CANCELLED)
                 clone_subvolume.state = (next_state, True)
                 s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))
 
@@ -273,17 +295,16 @@ class Cloner(AsyncJobs):
         try:
             with open_volume(self.vc, volname) as fs_handle:
                 with open_group(fs_handle, self.vc.volspec, groupname) as group:
-                    with open_subvol(fs_handle, self.vc.volspec, group, clonename,
-                                     need_complete=False, expected_types=["clone"]) as clone_subvolume:
+                    with open_subvol(fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                         status = clone_subvolume.status
-                        clone_state = status['state']
+                        clone_state = SubvolumeStates.from_value(status['state'])
                         if not self.is_clone_cancelable(clone_state):
                             raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
                         track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
                         if not track_idx:
                             log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
                             raise VolumeException(-errno.EINVAL, "error canceling clone")
-                        if OpSm.is_init_state("clone", clone_state):
+                        if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state):
                             # clone has not started yet -- cancel right away.
                             self._cancel_pending_clone(fs_handle, clone_subvolume, status, track_idx)
                             return
@@ -294,8 +315,7 @@ class Cloner(AsyncJobs):
             with self.lock:
                 with open_volume_lockless(self.vc, volname) as fs_handle:
                     with open_group(fs_handle, self.vc.volspec, groupname) as group:
-                        with open_subvol(fs_handle, self.vc.volspec, group, clonename,
-                                         need_complete=False, expected_types=["clone"]) as clone_subvolume:
+                        with open_subvol(fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                             if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
                                 raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
         except (IndexException, MetadataMgrException) as e:
index a5f44f4d00df6d173cb9a327f31178410441639a..74ce838ee8be0e44f661682e4705376b9d06d054 100644 (file)
 import errno
 
+from enum import Enum, unique
 from typing import Dict
 
+from .versions.subvolume_base import SubvolumeTypes
 from ..exception import OpSmException
 
-class OpSm(object):
-    INIT_STATE_KEY = 'init'
+@unique
+class SubvolumeStates(Enum):
+    STATE_INIT          = 'init'
+    STATE_PENDING       = 'pending'
+    STATE_INPROGRESS    = 'in-progress'
+    STATE_FAILED        = 'failed'
+    STATE_COMPLETE      = 'complete'
+    STATE_CANCELED      = 'canceled'
 
-    FAILED_STATE = 'failed'
-    FINAL_STATE  = 'complete'
-    CANCEL_STATE = 'canceled'
+    @staticmethod
+    def from_value(value):
+        if value == "init":
+            return SubvolumeStates.STATE_INIT
+        if value == "pending":
+            return SubvolumeStates.STATE_PENDING
+        if value == "in-progress":
+            return SubvolumeStates.STATE_INPROGRESS
+        if value == "failed":
+            return SubvolumeStates.STATE_FAILED
+        if value == "complete":
+            return SubvolumeStates.STATE_COMPLETE
+        if value == "canceled":
+            return SubvolumeStates.STATE_CANCELED
+
+        raise OpSmException(-errno.EINVAL, "invalid state '{0}'".format(value))
+
+@unique
+class SubvolumeActions(Enum):
+    ACTION_NONE         = 0
+    ACTION_SUCCESS      = 1
+    ACTION_FAILED       = 2
+    ACTION_CANCELLED    = 3
+
+class TransitionKey(object):
+    def __init__(self, subvol_type, state, action_type):
+        self.transition_key = [subvol_type, state, action_type]
 
-    OP_SM_SUBVOLUME = {
-        INIT_STATE_KEY : FINAL_STATE,
-    }
+    def __hash__(self):
+        return hash(tuple(self.transition_key))
 
-    OP_SM_CLONE = {
-        INIT_STATE_KEY : 'pending',
-        'pending'           : ('in-progress', (FAILED_STATE, CANCEL_STATE)),
-        'in-progress'       : (FINAL_STATE, (FAILED_STATE, CANCEL_STATE)),
-    } # type: Dict
+    def __eq__(self, other):
+        return self.transition_key == other.transition_key
 
-    STATE_MACHINES_TYPES = {
-        "subvolume" : OP_SM_SUBVOLUME,
-        "clone"     : OP_SM_CLONE,
-    } # type: Dict
+    def __neq__(self, other):
+        return not(self == other)
+
+class SubvolumeOpSm(object):
+    transition_table = {}
 
     @staticmethod
-    def is_final_state(state):
-        return state == OpSm.FINAL_STATE
+    def is_complete_state(state):
+        if not isinstance(state, SubvolumeStates):
+            raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state))
+        return state == SubvolumeStates.STATE_COMPLETE
 
     @staticmethod
     def is_failed_state(state):
-        return state == OpSm.FAILED_STATE or state == OpSm.CANCEL_STATE
+        if not isinstance(state, SubvolumeStates):
+            raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state))
+        return state == SubvolumeStates.STATE_FAILED or state == SubvolumeStates.STATE_CANCELED
 
     @staticmethod
     def is_init_state(stm_type, state):
-        stm = OpSm.STATE_MACHINES_TYPES.get(stm_type, None)
-        if not stm:
-            raise OpSmException(-errno.ENOENT, "state machine type '{0}' not found".format(stm_type))
-        init_state = stm.get(OpSm.INIT_STATE_KEY, None)
-        return init_state == state
+        if not isinstance(state, SubvolumeStates):
+            raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state))
+        return state == SubvolumeOpSm.get_init_state(stm_type)
 
     @staticmethod
     def get_init_state(stm_type):
-        stm = OpSm.STATE_MACHINES_TYPES.get(stm_type, None)
-        if not stm:
-            raise OpSmException(-errno.ENOENT, "state machine type '{0}' not found".format(stm_type))
-        init_state = stm.get(OpSm.INIT_STATE_KEY, None)
+        if not isinstance(stm_type, SubvolumeTypes):
+            raise OpSmException(-errno.EINVAL, "unknown state machine '{0}'".format(stm_type))
+        init_state =  SubvolumeOpSm.transition_table[TransitionKey(stm_type,
+                                                     SubvolumeStates.STATE_INIT,
+                                                     SubvolumeActions.ACTION_NONE)]
         if not init_state:
-            raise OpSmException(-errno.ENOENT, "initial state unavailable for state machine '{0}'".format(stm_type))
+            raise OpSmException(-errno.ENOENT, "initial state for state machine '{0}' not found".format(stm_type))
         return init_state
 
     @staticmethod
-    def get_next_state(stm_type, current_state, ret):
-        stm = OpSm.STATE_MACHINES_TYPES.get(stm_type, None)
-        if not stm:
-            raise OpSmException(-errno.ENOENT, "state machine type '{0}' not found".format(stm_type))
-        next_state = stm.get(current_state, None)
-        if not next_state:
-            raise OpSmException(-errno.EINVAL, "invalid current state '{0}'".format(current_state))
-        if ret == 0:
-            return next_state[0]
-        elif ret == -errno.EINTR:
-            return next_state[1][1]
-        else:
-            return next_state[1][0]
+    def transition(stm_type, current_state, action):
+        if not isinstance(stm_type, SubvolumeTypes):
+            raise OpSmException(-errno.EINVAL, "unknown state machine '{0}'".format(stm_type))
+        if not isinstance(current_state, SubvolumeStates):
+            raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(current_state))
+        if not isinstance(action, SubvolumeActions):
+            raise OpSmException(-errno.EINVAL, "unknown action '{0}'".format(action))
+
+        transition = SubvolumeOpSm.transition_table[TransitionKey(stm_type, current_state, action)]
+        if not transition:
+            raise OpSmException(-errno.EINVAL, "invalid action '{0}' on current state {1} for state machine '{2}'".format(action, current_state, stm_type))
+
+        return transition
+
+SubvolumeOpSm.transition_table = {
+    # state transitions for state machine type TYPE_NORMAL
+    TransitionKey(SubvolumeTypes.TYPE_NORMAL,
+                  SubvolumeStates.STATE_INIT,
+                  SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_COMPLETE,
+
+    # state transitions for state machine type TYPE_CLONE
+    TransitionKey(SubvolumeTypes.TYPE_CLONE,
+                  SubvolumeStates.STATE_INIT,
+                  SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_PENDING,
+
+    TransitionKey(SubvolumeTypes.TYPE_CLONE,
+                  SubvolumeStates.STATE_PENDING,
+                  SubvolumeActions.ACTION_SUCCESS) : SubvolumeStates.STATE_INPROGRESS,
+
+    TransitionKey(SubvolumeTypes.TYPE_CLONE,
+                  SubvolumeStates.STATE_PENDING,
+                  SubvolumeActions.ACTION_CANCELLED) : SubvolumeStates.STATE_CANCELED,
+
+    TransitionKey(SubvolumeTypes.TYPE_CLONE,
+                  SubvolumeStates.STATE_INPROGRESS,
+                  SubvolumeActions.ACTION_SUCCESS) : SubvolumeStates.STATE_COMPLETE,
+
+    TransitionKey(SubvolumeTypes.TYPE_CLONE,
+                  SubvolumeStates.STATE_INPROGRESS,
+                  SubvolumeActions.ACTION_CANCELLED) : SubvolumeStates.STATE_CANCELED,
+
+    TransitionKey(SubvolumeTypes.TYPE_CLONE,
+                  SubvolumeStates.STATE_INPROGRESS,
+                  SubvolumeActions.ACTION_FAILED) : SubvolumeStates.STATE_FAILED,
+}
index 70e5dbb5f1d0b11084a2bf0e35c2bb36e14f6053..46ec0f28f74c1d0cfd1f430070a26d8cfac4ef62 100644 (file)
@@ -7,6 +7,7 @@ import cephfs
 from .snapshot_util import mksnap, rmsnap
 from ..fs_util import listdir, get_ancestor_xattr
 from ..exception import VolumeException
+from .template import SubvolumeOpType
 
 from .versions import loaded_subvolumes
 
@@ -56,14 +57,14 @@ def remove_subvol(fs, vol_spec, group, subvolname, force=False):
     :param force: force remove subvolumes
     :return: None
     """
-    nc_flag = True if not force else False
-    with open_subvol(fs, vol_spec, group, subvolname, need_complete=nc_flag) as subvolume:
+    op_type = SubvolumeOpType.REMOVE if not force else SubvolumeOpType.REMOVE_FORCE
+    with open_subvol(fs, vol_spec, group, subvolname, op_type) as subvolume:
         if subvolume.list_snapshots():
             raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(subvolname))
         subvolume.remove()
 
 @contextmanager
-def open_subvol(fs, vol_spec, group, subvolname, need_complete=True, expected_types=[]):
+def open_subvol(fs, vol_spec, group, subvolname, op_type):
     """
     open a subvolume. This API is to be used as a context manager.
 
@@ -71,12 +72,9 @@ def open_subvol(fs, vol_spec, group, subvolname, need_complete=True, expected_ty
     :param vol_spec: volume specification
     :param group: group object for the subvolume
     :param subvolname: subvolume name
-    :param need_complete: check if the subvolume is usable (since cloned subvolumes can
-                          be in transient state). defaults to True.
-    :param expected_types: check if the subvolume is one the provided types. defaults to
-                           all.
+    :param op_type: operation type for which subvolume is being opened
     :return: yields a subvolume object (subclass of SubvolumeTemplate)
     """
     subvolume = loaded_subvolumes.get_subvolume_object(fs, vol_spec, group, subvolname)
-    subvolume.open(need_complete, expected_types)
+    subvolume.open(op_type)
     yield subvolume
index 40d9efb5931f3ccbcdedecb69e832e8365c5c42e..dc82766fcb9ec6f52dd3c0f3442b58907b362735 100644 (file)
@@ -1,5 +1,7 @@
 import errno
 
+from enum import Enum, unique
+
 from ..exception import VolumeException
 
 class GroupTemplate(object):
@@ -33,6 +35,28 @@ class GroupTemplate(object):
         """
         raise VolumeException(-errno.ENOTSUP, "operation not supported.")
 
+@unique
+class SubvolumeOpType(Enum):
+    CREATE          = 'create'
+    REMOVE          = 'rm'
+    REMOVE_FORCE    = 'rm-force'
+    PIN             = 'pin'
+    LIST            = 'ls'
+    GETPATH         = 'getpath'
+    INFO            = 'info'
+    RESIZE          = 'resize'
+    SNAP_CREATE     = 'snap-create'
+    SNAP_REMOVE     = 'snap-rm'
+    SNAP_LIST       = 'snap-ls'
+    SNAP_INFO       = 'snap-info'
+    SNAP_PROTECT    = 'snap-protect'
+    SNAP_UNPROTECT  = 'snap-unprotect'
+    CLONE_SOURCE    = 'clone-source'
+    CLONE_CREATE    = 'clone-create'
+    CLONE_STATUS    = 'clone-status'
+    CLONE_CANCEL    = 'clone-cancel'
+    CLONE_INTERNAL  = 'clone_internal'
+
 class SubvolumeTemplate(object):
     VERSION = None
 
@@ -40,7 +64,7 @@ class SubvolumeTemplate(object):
     def version():
         return SubvolumeTemplate.VERSION
 
-    def open(self, need_complete=True, expected_types=[]):
+    def open(self, op_type):
         raise VolumeException(-errno.ENOTSUP, "operation not supported.")
 
     def status(self):
index f79631b04990b51c858bb493dedc4dfcfd161a85..0569bd8b5d7e3488a768de004a2cc49daa08d6d6 100644 (file)
@@ -5,7 +5,8 @@ import importlib
 import cephfs
 
 from .subvolume_base import SubvolumeBase
-from ..op_sm import OpSm
+from ..op_sm import SubvolumeOpSm
+from ..op_sm import SubvolumeTypes
 from ...exception import MetadataMgrException, OpSmException, VolumeException
 
 log = logging.getLogger(__name__)
@@ -50,9 +51,9 @@ class SubvolumeLoader(object):
             fs.mkdirs(subvolume.legacy_dir, 0o700)
         except cephfs.Error as e:
             raise VolumeException(-e.args[0], "error accessing subvolume")
-        subvolume_type = SubvolumeBase.SUBVOLUME_TYPE_NORMAL
+        subvolume_type = SubvolumeTypes.TYPE_NORMAL
         try:
-            initial_state = OpSm.get_init_state(subvolume_type)
+            initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
         except OpSmException as oe:
             raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
         qpath = subvolume.base_path.decode('utf-8')
index 523f8e3d70edf96625f546fb86b6c9e93489a99e..5108cfd83294840629556b87f50b9ec24cd27d67 100644 (file)
@@ -20,12 +20,23 @@ class SubvolumeFeatures(Enum):
     FEATURE_SNAPSHOT_CLONE       = "snapshot-clone"
     FEATURE_SNAPSHOT_AUTOPROTECT = "snapshot-autoprotect"
 
+@unique
+class SubvolumeTypes(Enum):
+    TYPE_NORMAL  = "subvolume"
+    TYPE_CLONE   = "clone"
+
+    @staticmethod
+    def from_value(value):
+        if value == "subvolume":
+            return SubvolumeTypes.TYPE_NORMAL
+        if value == "clone":
+            return SubvolumeTypes.TYPE_CLONE
+
+        raise VolumeException(-errno.EINVAL, "invalid subvolume type '{0}'".format(value))
+
 class SubvolumeBase(object):
     LEGACY_CONF_DIR = "_legacy"
 
-    SUBVOLUME_TYPE_NORMAL = "subvolume"
-    SUBVOLUME_TYPE_CLONE  = "clone"
-
     def __init__(self, fs, vol_spec, group, subvolname, legacy=False):
         self.fs = fs
         self.cmode = None
@@ -210,7 +221,7 @@ class SubvolumeBase(object):
         return pin(self.fs, self.base_path, pin_type, pin_setting)
 
     def init_config(self, version, subvolume_type, subvolume_path, subvolume_state):
-        self.metadata_mgr.init(version, subvolume_type, subvolume_path, subvolume_state)
+        self.metadata_mgr.init(version, subvolume_type.value, subvolume_path, subvolume_state.value)
         self.metadata_mgr.flush()
 
     def discover(self):
@@ -248,7 +259,7 @@ class SubvolumeBase(object):
 
     def info (self):
         subvolpath = self.metadata_mgr.get_global_option('path')
-        etype = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE)
+        etype = SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
         st = self.fs.statx(subvolpath, cephfs.CEPH_STATX_BTIME | cephfs.CEPH_STATX_SIZE |
                                        cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID |
                                        cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_ATIME |
@@ -266,7 +277,7 @@ class SubvolumeBase(object):
         except cephfs.Error as e:
             raise VolumeException(-e.args[0], e.args[1])
 
-        return {'path': subvolpath, 'type': etype, 'uid': int(st["uid"]), 'gid': int(st["gid"]),
+        return {'path': subvolpath, 'type': etype.value, 'uid': int(st["uid"]), 'gid': int(st["gid"]),
             'atime': str(st["atime"]), 'mtime': str(st["mtime"]), 'ctime': str(st["ctime"]),
             'mode': int(st["mode"]), 'data_pool': data_pool, 'created_at': str(st["btime"]),
             'bytes_quota': "infinite" if nsize == 0 else nsize, 'bytes_used': int(usedbytes),
index 73363d3dc5643b23c433f9b014975024a90cf5f3..3c0625b8a5d08c89e82a40682878e2fa6e187f80 100644 (file)
@@ -9,11 +9,14 @@ import cephfs
 
 from .metadata_manager import MetadataManager
 from .subvolume_base import SubvolumeBase, SubvolumeFeatures
-from ..op_sm import OpSm
+from ..op_sm import SubvolumeOpSm
+from ..op_sm import SubvolumeTypes
+from ..op_sm import SubvolumeStates
 from ..template import SubvolumeTemplate
 from ..snapshot_util import mksnap, rmsnap
 from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException
 from ...fs_util import listdir
+from ..template import SubvolumeOpType
 
 from ..clone_index import open_clone_index, create_clone_index
 
@@ -39,9 +42,9 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
         return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value, SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value]
 
     def create(self, size, isolate_nspace, pool, mode, uid, gid):
-        subvolume_type = SubvolumeBase.SUBVOLUME_TYPE_NORMAL
+        subvolume_type = SubvolumeTypes.TYPE_NORMAL
         try:
-            initial_state = OpSm.get_init_state(subvolume_type)
+            initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
         except OpSmException as oe:
             raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
 
@@ -84,9 +87,9 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
             self.metadata_mgr.flush()
 
     def create_clone(self, pool, source_volname, source_subvolume, snapname):
-        subvolume_type = SubvolumeBase.SUBVOLUME_TYPE_CLONE
+        subvolume_type = SubvolumeTypes.TYPE_CLONE
         try:
-            initial_state = OpSm.get_init_state(subvolume_type)
+            initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
         except OpSmException as oe:
             raise VolumeException(-errno.EINVAL, "clone failed: internal error")
 
@@ -98,7 +101,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
 
             # persist subvolume metadata and clone source
             qpath = subvol_path.decode('utf-8')
-            self.metadata_mgr.init(SubvolumeV1.VERSION, subvolume_type, qpath, initial_state)
+            self.metadata_mgr.init(SubvolumeV1.VERSION, subvolume_type.value, qpath, initial_state.value)
             self.add_clone_source(source_volname, source_subvolume, snapname)
             self.metadata_mgr.flush()
         except (VolumeException, MetadataMgrException, cephfs.Error) as e:
@@ -115,21 +118,48 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
                 e = VolumeException(-e.args[0], e.args[1])
             raise e
 
-    def open(self, need_complete=True, expected_types=[]):
+    def allowed_ops_by_type(self, vol_type):
+        if vol_type == SubvolumeTypes.TYPE_CLONE:
+            return {op_type for op_type in SubvolumeOpType}
+
+        if vol_type == SubvolumeTypes.TYPE_NORMAL:
+            return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS,
+                                                               SubvolumeOpType.CLONE_CANCEL,
+                                                               SubvolumeOpType.CLONE_INTERNAL}
+
+        return {}
+
+    def allowed_ops_by_state(self, vol_state):
+        if vol_state == SubvolumeStates.STATE_COMPLETE:
+            return {op_type for op_type in SubvolumeOpType}
+
+        return {SubvolumeOpType.REMOVE_FORCE,
+                SubvolumeOpType.CLONE_CREATE,
+                SubvolumeOpType.CLONE_STATUS,
+                SubvolumeOpType.CLONE_CANCEL,
+                SubvolumeOpType.CLONE_INTERNAL}
+
+    def open(self, op_type):
+        if not isinstance(op_type, SubvolumeOpType):
+            raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format(
+                                  op_type.value, self.subvolname))
         try:
             self.metadata_mgr.refresh()
+
+            etype = SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
+            if op_type not in self.allowed_ops_by_type(etype):
+                raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
+                                      op_type.value, self.subvolname, etype.value))
+
+            estate = self.state
+            if op_type not in self.allowed_ops_by_state(estate):
+                raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format(
+                                      self.subvolname, op_type.value))
+
             subvol_path = self.path
             log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path))
             st = self.fs.stat(subvol_path)
-            etype = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE)
-            if len(expected_types) and not etype in expected_types:
-                raise VolumeException(-errno.ENOTSUP, "subvolume '{0}' is not {1}".format(
-                    self.subvolname, "a {0}".format(expected_types[0]) if len(expected_types) == 1 else \
-                    "one of types ({0})".format(",".join(expected_types))))
-            if need_complete:
-                estate = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE)
-                if not OpSm.is_final_state(estate):
-                    raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for use".format(self.subvolname))
+
             self.uid = int(st.st_uid)
             self.gid = int(st.st_gid)
             self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode))
@@ -164,22 +194,22 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
 
     @property
     def status(self):
-        state = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE)
-        subvolume_type = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE)
+        state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
+        subvolume_type = SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
         subvolume_status = {
-            'state' : state
+            'state' : state.value
         }
-        if not OpSm.is_final_state(state) and subvolume_type == SubvolumeBase.SUBVOLUME_TYPE_CLONE:
+        if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE:
             subvolume_status["source"] = self._get_clone_source()
         return subvolume_status
 
     @property
     def state(self):
-        return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE)
+        return SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
 
     @state.setter
     def state(self, val):
-        state = val[0]
+        state = val[0].value
         flush = val[1]
         self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state)
         if flush:
index 065b1fe6463159e09b025bc79290ad8b07fe85ea..612cd882cc3c3b8593bef2b5a9e28494c70db691 100644 (file)
@@ -18,6 +18,7 @@ from .vol_spec import VolSpec
 from .exception import VolumeException
 from .async_cloner import Cloner
 from .purge_queue import ThreadPoolPurgeQueueMixin
+from .operations.template import SubvolumeOpType
 
 log = logging.getLogger(__name__)
 
@@ -158,7 +159,7 @@ class VolumeClient(CephfsClient):
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, groupname) as group:
                     try:
-                        with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                        with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CREATE) as subvolume:
                             # idempotent creation -- valid. Attributes set is supported.
                             uid = uid if uid else subvolume.uid
                             gid = gid if gid else subvolume.gid
@@ -208,7 +209,7 @@ class VolumeClient(CephfsClient):
         try:
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, groupname) as group:
-                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.RESIZE) as subvolume:
                         nsize, usedbytes = subvolume.resize(newsize, noshrink)
                         ret = 0, json.dumps(
                             [{'bytes_used': usedbytes},{'bytes_quota': nsize},
@@ -229,7 +230,7 @@ class VolumeClient(CephfsClient):
         try:
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, groupname) as group:
-                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.PIN) as subvolume:
                         subvolume.pin(pin_type, pin_setting)
                         ret = 0, json.dumps({}), ""
         except VolumeException as ve:
@@ -245,7 +246,7 @@ class VolumeClient(CephfsClient):
         try:
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, groupname) as group:
-                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.GETPATH) as subvolume:
                         subvolpath = subvolume.path
                         ret = 0, subvolpath.decode("utf-8"), ""
         except VolumeException as ve:
@@ -261,7 +262,7 @@ class VolumeClient(CephfsClient):
         try:
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, groupname) as group:
-                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.INFO) as subvolume:
                         mon_addr_lst = []
                         mon_map_mons = self.mgr.get('mon_map')['mons']
                         for mon in mon_map_mons:
@@ -301,7 +302,7 @@ class VolumeClient(CephfsClient):
         try:
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, groupname) as group:
-                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_CREATE) as subvolume:
                         subvolume.create_snapshot(snapname)
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
@@ -318,7 +319,7 @@ class VolumeClient(CephfsClient):
         try:
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, groupname) as group:
-                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_REMOVE) as subvolume:
                         subvolume.remove_snapshot(snapname)
         except VolumeException as ve:
             if not (ve.errno == -errno.ENOENT and force):
@@ -335,7 +336,7 @@ class VolumeClient(CephfsClient):
         try:
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, groupname) as group:
-                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_INFO) as subvolume:
                         snap_info_dict = subvolume.snapshot_info(snapname)
                         ret = 0, json.dumps(snap_info_dict, indent=4, sort_keys=True), ""
         except VolumeException as ve:
@@ -351,7 +352,7 @@ class VolumeClient(CephfsClient):
         try:
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, groupname) as group:
-                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_LIST) as subvolume:
                         snapshots = subvolume.list_snapshots()
                         ret = 0, name_to_json(snapshots), ""
         except VolumeException as ve:
@@ -367,7 +368,7 @@ class VolumeClient(CephfsClient):
         try:
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, groupname) as group:
-                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT) as subvolume:
                         log.warning("snapshot protect call is deprecated and will be removed in a future release")
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
@@ -382,7 +383,7 @@ class VolumeClient(CephfsClient):
         try:
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, groupname) as group:
-                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT) as subvolume:
                         log.warning("snapshot unprotect call is deprecated and will be removed in a future release")
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
@@ -390,7 +391,7 @@ class VolumeClient(CephfsClient):
 
     def _prepare_clone_subvolume(self, fs_handle, volname, subvolume, snapname, target_group, target_subvolname, target_pool):
         create_clone(fs_handle, self.volspec, target_group, target_subvolname, target_pool, volname, subvolume, snapname)
-        with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, need_complete=False) as target_subvolume:
+        with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_INTERNAL) as target_subvolume:
             try:
                 subvolume.attach_snapshot(snapname, target_subvolume)
                 self.cloner.queue_job(volname)
@@ -414,7 +415,7 @@ class VolumeClient(CephfsClient):
         # TODO: when the target group is same as source, reuse group object.
         with open_group(fs_handle, self.volspec, target_groupname) as target_group:
             try:
-                with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, need_complete=False):
+                with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_CREATE):
                     raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname))
             except VolumeException as ve:
                 if ve.errno == -errno.ENOENT:
@@ -432,7 +433,7 @@ class VolumeClient(CephfsClient):
         try:
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, groupname) as group:
-                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CLONE_SOURCE) as subvolume:
                         self._clone_subvolume_snapshot(fs_handle, volname, subvolume, **kwargs)
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
@@ -447,8 +448,7 @@ class VolumeClient(CephfsClient):
         try:
             with open_volume(self, volname) as fs_handle:
                 with open_group(fs_handle, self.volspec, groupname) as group:
-                    with open_subvol(fs_handle, self.volspec, group, clonename,
-                                     need_complete=False, expected_types=["clone"]) as subvolume:
+                    with open_subvol(fs_handle, self.volspec, group, clonename, SubvolumeOpType.CLONE_STATUS) as subvolume:
                         ret = 0, json.dumps({'status' : subvolume.status}, indent=2), ""
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)