From: Shyamsundar Ranganathan Date: Thu, 2 Jul 2020 01:08:34 +0000 (-0400) Subject: mgr/volumes: Use operation type during subvolume open X-Git-Tag: wip-pdonnell-testing-20200918.022351~489^2~5 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=f380bf79435ccae21df12d19e530ef0d89a5cc8d;p=ceph-ci.git mgr/volumes: Use operation type during subvolume open Subvolume open currently takes in 2 optional parameters to denote desired state and type. This enables the open to allow the operation to suceed based on the (type, state) tuple. Instead, pass an operation type to be performed on a subvolume during open, and decide internal to a subvolume version if the operation is allowed based on its state and type. Also modifies the state machine code, to be more amenable to modifications and improves redability. Signed-off-by: Shyamsundar Ranganathan --- diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py index 7cca7da8ec2..25ec793af6d 100644 --- a/src/pybind/mgr/volumes/fs/async_cloner.py +++ b/src/pybind/mgr/volumes/fs/async_cloner.py @@ -10,12 +10,17 @@ import cephfs from .async_job import AsyncJobs from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException from .fs_util import copy_file -from .operations.op_sm import OpSm +from .operations.op_sm import SubvolumeOpSm +from .operations.op_sm import SubvolumeTypes +from .operations.op_sm import SubvolumeActions +from .operations.op_sm import SubvolumeStates from .operations.resolver import resolve from .operations.volume import open_volume, open_volume_lockless from .operations.group import open_group from .operations.subvolume import open_subvol from .operations.clone_index import open_clone_index +from .operations.template import SubvolumeOpType +from .operations.versions import SubvolumeBase log = logging.getLogger(__name__) @@ -38,38 +43,52 @@ def get_next_clone_entry(volume_client, volname, running_jobs): return ve.errno, None @contextmanager -def open_at_volume(volume_client, volname, groupname, subvolname, need_complete=False, expected_types=[]): +def open_at_volume(volume_client, volname, groupname, subvolname, op_type): with open_volume(volume_client, volname) as fs_handle: with open_group(fs_handle, volume_client.volspec, groupname) as group: - with open_subvol(fs_handle, volume_client.volspec, group, subvolname, - need_complete, expected_types) as subvolume: + with open_subvol(fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume: yield subvolume @contextmanager -def open_at_group(volume_client, fs_handle, groupname, subvolname, need_complete=False, expected_types=[]): +def open_at_group(volume_client, fs_handle, groupname, subvolname, op_type): with open_group(fs_handle, volume_client.volspec, groupname) as group: - with open_subvol(fs_handle, volume_client.volspec, group, subvolname, - need_complete, expected_types) as subvolume: + with open_subvol(fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume: yield subvolume def get_clone_state(volume_client, volname, groupname, subvolname): - with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume: + with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume: return subvolume.state def set_clone_state(volume_client, volname, groupname, subvolname, state): - with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume: + with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume: subvolume.state = (state, True) def get_clone_source(clone_subvolume): source = clone_subvolume._get_clone_source() return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot']) +def get_next_state_on_error(errnum): + if errnum == -errno.EINTR: + next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INPROGRESS, + SubvolumeActions.ACTION_CANCELLED) + else: + # jump to failed state, on all other errors + next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INPROGRESS, + SubvolumeActions.ACTION_FAILED) + return next_state + def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel): try: if should_cancel(): - next_state = OpSm.get_next_state("clone", "pending", -errno.EINTR) + next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_PENDING, + SubvolumeActions.ACTION_CANCELLED) else: - next_state = OpSm.get_next_state("clone", "pending", 0) + next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_PENDING, + SubvolumeActions.ACTION_SUCCESS) except OpSmException as oe: raise VolumeException(oe.errno, oe.error_str) return (next_state, False) @@ -147,9 +166,9 @@ def bulk_copy(fs_handle, source_path, dst_path, should_cancel): def do_clone(volume_client, volname, groupname, subvolname, should_cancel): with open_volume_lockless(volume_client, volname) as fs_handle: - with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume: + with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume: s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume) - with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume: + with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume: src_path = source_subvolume.snapshot_path(s_snapname) dst_path = clone_subvolume.path bulk_copy(fs_handle, src_path, dst_path, should_cancel) @@ -157,10 +176,11 @@ def do_clone(volume_client, volname, groupname, subvolname, should_cancel): def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel): try: do_clone(volume_client, volname, groupname, subvolname, should_cancel) - next_state = OpSm.get_next_state("clone", "in-progress", 0) + next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INPROGRESS, + SubvolumeActions.ACTION_SUCCESS) except VolumeException as ve: - # jump to failed state - next_state = OpSm.get_next_state("clone", "in-progress", ve.errno) + next_state = get_next_state_on_error(ve.errno) except OpSmException as oe: raise VolumeException(oe.errno, oe.error_str) return (next_state, False) @@ -169,9 +189,9 @@ def handle_clone_failed(volume_client, volname, index, groupname, subvolname, sh try: # detach source but leave the clone section intact for later inspection with open_volume(volume_client, volname) as fs_handle: - with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume: + with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume: s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume) - with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume: + with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume: source_subvolume.detach_snapshot(s_snapname, index) except (MetadataMgrException, VolumeException) as e: log.error("failed to detach clone from snapshot: {0}".format(e)) @@ -180,9 +200,9 @@ def handle_clone_failed(volume_client, volname, index, groupname, subvolname, sh def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel): try: with open_volume(volume_client, volname) as fs_handle: - with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume: + with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume: s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume) - with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume: + with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume: source_subvolume.detach_snapshot(s_snapname, index) clone_subvolume.remove_clone_source(flush=True) except (MetadataMgrException, VolumeException) as e: @@ -227,29 +247,29 @@ def clone(volume_client, volname, index, clone_path, state_table, should_cancel) class Cloner(AsyncJobs): """ Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume. - this relies on a simple state machine (which mimics states from OpSm class) as + this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as the driver. file types supported are directories, symbolic links and regular files. """ def __init__(self, volume_client, tp_size): self.vc = volume_client self.state_table = { - 'pending' : handle_clone_pending, - 'in-progress' : handle_clone_in_progress, - 'complete' : handle_clone_complete, - 'failed' : handle_clone_failed, - 'canceled' : handle_clone_failed, + SubvolumeStates.STATE_PENDING : handle_clone_pending, + SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress, + SubvolumeStates.STATE_COMPLETE : handle_clone_complete, + SubvolumeStates.STATE_FAILED : handle_clone_failed, + SubvolumeStates.STATE_CANCELED : handle_clone_failed, } super(Cloner, self).__init__(volume_client, "cloner", tp_size) def is_clone_cancelable(self, clone_state): - return not (OpSm.is_final_state(clone_state) or OpSm.is_failed_state(clone_state)) + return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state)) def get_clone_tracking_index(self, fs_handle, clone_subvolume): with open_clone_index(fs_handle, self.vc.volspec) as index: return index.find_clone_entry_index(clone_subvolume.base_path) def _cancel_pending_clone(self, fs_handle, clone_subvolume, status, track_idx): - clone_state = status['state'] + clone_state = SubvolumeStates.from_value(status['state']) assert self.is_clone_cancelable(clone_state) s_groupname = status['source'].get('group', None) @@ -257,8 +277,10 @@ class Cloner(AsyncJobs): s_snapname = status['source']['snapshot'] with open_group(fs_handle, self.vc.volspec, s_groupname) as s_group: - with open_subvol(fs_handle, self.vc.volspec, s_group, s_subvolname) as s_subvolume: - next_state = OpSm.get_next_state("clone", clone_state, -errno.EINTR) + with open_subvol(fs_handle, self.vc.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume: + next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, + clone_state, + SubvolumeActions.ACTION_CANCELLED) clone_subvolume.state = (next_state, True) s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8')) @@ -273,17 +295,16 @@ class Cloner(AsyncJobs): try: with open_volume(self.vc, volname) as fs_handle: with open_group(fs_handle, self.vc.volspec, groupname) as group: - with open_subvol(fs_handle, self.vc.volspec, group, clonename, - need_complete=False, expected_types=["clone"]) as clone_subvolume: + with open_subvol(fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume: status = clone_subvolume.status - clone_state = status['state'] + clone_state = SubvolumeStates.from_value(status['state']) if not self.is_clone_cancelable(clone_state): raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)") track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume) if not track_idx: log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path)) raise VolumeException(-errno.EINVAL, "error canceling clone") - if OpSm.is_init_state("clone", clone_state): + if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state): # clone has not started yet -- cancel right away. self._cancel_pending_clone(fs_handle, clone_subvolume, status, track_idx) return @@ -294,8 +315,7 @@ class Cloner(AsyncJobs): with self.lock: with open_volume_lockless(self.vc, volname) as fs_handle: with open_group(fs_handle, self.vc.volspec, groupname) as group: - with open_subvol(fs_handle, self.vc.volspec, group, clonename, - need_complete=False, expected_types=["clone"]) as clone_subvolume: + with open_subvol(fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume: if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)): raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)") except (IndexException, MetadataMgrException) as e: diff --git a/src/pybind/mgr/volumes/fs/operations/op_sm.py b/src/pybind/mgr/volumes/fs/operations/op_sm.py index a5f44f4d00d..74ce838ee8b 100644 --- a/src/pybind/mgr/volumes/fs/operations/op_sm.py +++ b/src/pybind/mgr/volumes/fs/operations/op_sm.py @@ -1,68 +1,132 @@ import errno +from enum import Enum, unique from typing import Dict +from .versions.subvolume_base import SubvolumeTypes from ..exception import OpSmException -class OpSm(object): - INIT_STATE_KEY = 'init' +@unique +class SubvolumeStates(Enum): + STATE_INIT = 'init' + STATE_PENDING = 'pending' + STATE_INPROGRESS = 'in-progress' + STATE_FAILED = 'failed' + STATE_COMPLETE = 'complete' + STATE_CANCELED = 'canceled' - FAILED_STATE = 'failed' - FINAL_STATE = 'complete' - CANCEL_STATE = 'canceled' + @staticmethod + def from_value(value): + if value == "init": + return SubvolumeStates.STATE_INIT + if value == "pending": + return SubvolumeStates.STATE_PENDING + if value == "in-progress": + return SubvolumeStates.STATE_INPROGRESS + if value == "failed": + return SubvolumeStates.STATE_FAILED + if value == "complete": + return SubvolumeStates.STATE_COMPLETE + if value == "canceled": + return SubvolumeStates.STATE_CANCELED + + raise OpSmException(-errno.EINVAL, "invalid state '{0}'".format(value)) + +@unique +class SubvolumeActions(Enum): + ACTION_NONE = 0 + ACTION_SUCCESS = 1 + ACTION_FAILED = 2 + ACTION_CANCELLED = 3 + +class TransitionKey(object): + def __init__(self, subvol_type, state, action_type): + self.transition_key = [subvol_type, state, action_type] - OP_SM_SUBVOLUME = { - INIT_STATE_KEY : FINAL_STATE, - } + def __hash__(self): + return hash(tuple(self.transition_key)) - OP_SM_CLONE = { - INIT_STATE_KEY : 'pending', - 'pending' : ('in-progress', (FAILED_STATE, CANCEL_STATE)), - 'in-progress' : (FINAL_STATE, (FAILED_STATE, CANCEL_STATE)), - } # type: Dict + def __eq__(self, other): + return self.transition_key == other.transition_key - STATE_MACHINES_TYPES = { - "subvolume" : OP_SM_SUBVOLUME, - "clone" : OP_SM_CLONE, - } # type: Dict + def __neq__(self, other): + return not(self == other) + +class SubvolumeOpSm(object): + transition_table = {} @staticmethod - def is_final_state(state): - return state == OpSm.FINAL_STATE + def is_complete_state(state): + if not isinstance(state, SubvolumeStates): + raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state)) + return state == SubvolumeStates.STATE_COMPLETE @staticmethod def is_failed_state(state): - return state == OpSm.FAILED_STATE or state == OpSm.CANCEL_STATE + if not isinstance(state, SubvolumeStates): + raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state)) + return state == SubvolumeStates.STATE_FAILED or state == SubvolumeStates.STATE_CANCELED @staticmethod def is_init_state(stm_type, state): - stm = OpSm.STATE_MACHINES_TYPES.get(stm_type, None) - if not stm: - raise OpSmException(-errno.ENOENT, "state machine type '{0}' not found".format(stm_type)) - init_state = stm.get(OpSm.INIT_STATE_KEY, None) - return init_state == state + if not isinstance(state, SubvolumeStates): + raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state)) + return state == SubvolumeOpSm.get_init_state(stm_type) @staticmethod def get_init_state(stm_type): - stm = OpSm.STATE_MACHINES_TYPES.get(stm_type, None) - if not stm: - raise OpSmException(-errno.ENOENT, "state machine type '{0}' not found".format(stm_type)) - init_state = stm.get(OpSm.INIT_STATE_KEY, None) + if not isinstance(stm_type, SubvolumeTypes): + raise OpSmException(-errno.EINVAL, "unknown state machine '{0}'".format(stm_type)) + init_state = SubvolumeOpSm.transition_table[TransitionKey(stm_type, + SubvolumeStates.STATE_INIT, + SubvolumeActions.ACTION_NONE)] if not init_state: - raise OpSmException(-errno.ENOENT, "initial state unavailable for state machine '{0}'".format(stm_type)) + raise OpSmException(-errno.ENOENT, "initial state for state machine '{0}' not found".format(stm_type)) return init_state @staticmethod - def get_next_state(stm_type, current_state, ret): - stm = OpSm.STATE_MACHINES_TYPES.get(stm_type, None) - if not stm: - raise OpSmException(-errno.ENOENT, "state machine type '{0}' not found".format(stm_type)) - next_state = stm.get(current_state, None) - if not next_state: - raise OpSmException(-errno.EINVAL, "invalid current state '{0}'".format(current_state)) - if ret == 0: - return next_state[0] - elif ret == -errno.EINTR: - return next_state[1][1] - else: - return next_state[1][0] + def transition(stm_type, current_state, action): + if not isinstance(stm_type, SubvolumeTypes): + raise OpSmException(-errno.EINVAL, "unknown state machine '{0}'".format(stm_type)) + if not isinstance(current_state, SubvolumeStates): + raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(current_state)) + if not isinstance(action, SubvolumeActions): + raise OpSmException(-errno.EINVAL, "unknown action '{0}'".format(action)) + + transition = SubvolumeOpSm.transition_table[TransitionKey(stm_type, current_state, action)] + if not transition: + raise OpSmException(-errno.EINVAL, "invalid action '{0}' on current state {1} for state machine '{2}'".format(action, current_state, stm_type)) + + return transition + +SubvolumeOpSm.transition_table = { + # state transitions for state machine type TYPE_NORMAL + TransitionKey(SubvolumeTypes.TYPE_NORMAL, + SubvolumeStates.STATE_INIT, + SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_COMPLETE, + + # state transitions for state machine type TYPE_CLONE + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INIT, + SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_PENDING, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_PENDING, + SubvolumeActions.ACTION_SUCCESS) : SubvolumeStates.STATE_INPROGRESS, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_PENDING, + SubvolumeActions.ACTION_CANCELLED) : SubvolumeStates.STATE_CANCELED, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INPROGRESS, + SubvolumeActions.ACTION_SUCCESS) : SubvolumeStates.STATE_COMPLETE, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INPROGRESS, + SubvolumeActions.ACTION_CANCELLED) : SubvolumeStates.STATE_CANCELED, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INPROGRESS, + SubvolumeActions.ACTION_FAILED) : SubvolumeStates.STATE_FAILED, +} diff --git a/src/pybind/mgr/volumes/fs/operations/subvolume.py b/src/pybind/mgr/volumes/fs/operations/subvolume.py index 70e5dbb5f1d..46ec0f28f74 100644 --- a/src/pybind/mgr/volumes/fs/operations/subvolume.py +++ b/src/pybind/mgr/volumes/fs/operations/subvolume.py @@ -7,6 +7,7 @@ import cephfs from .snapshot_util import mksnap, rmsnap from ..fs_util import listdir, get_ancestor_xattr from ..exception import VolumeException +from .template import SubvolumeOpType from .versions import loaded_subvolumes @@ -56,14 +57,14 @@ def remove_subvol(fs, vol_spec, group, subvolname, force=False): :param force: force remove subvolumes :return: None """ - nc_flag = True if not force else False - with open_subvol(fs, vol_spec, group, subvolname, need_complete=nc_flag) as subvolume: + op_type = SubvolumeOpType.REMOVE if not force else SubvolumeOpType.REMOVE_FORCE + with open_subvol(fs, vol_spec, group, subvolname, op_type) as subvolume: if subvolume.list_snapshots(): raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(subvolname)) subvolume.remove() @contextmanager -def open_subvol(fs, vol_spec, group, subvolname, need_complete=True, expected_types=[]): +def open_subvol(fs, vol_spec, group, subvolname, op_type): """ open a subvolume. This API is to be used as a context manager. @@ -71,12 +72,9 @@ def open_subvol(fs, vol_spec, group, subvolname, need_complete=True, expected_ty :param vol_spec: volume specification :param group: group object for the subvolume :param subvolname: subvolume name - :param need_complete: check if the subvolume is usable (since cloned subvolumes can - be in transient state). defaults to True. - :param expected_types: check if the subvolume is one the provided types. defaults to - all. + :param op_type: operation type for which subvolume is being opened :return: yields a subvolume object (subclass of SubvolumeTemplate) """ subvolume = loaded_subvolumes.get_subvolume_object(fs, vol_spec, group, subvolname) - subvolume.open(need_complete, expected_types) + subvolume.open(op_type) yield subvolume diff --git a/src/pybind/mgr/volumes/fs/operations/template.py b/src/pybind/mgr/volumes/fs/operations/template.py index 40d9efb5931..dc82766fcb9 100644 --- a/src/pybind/mgr/volumes/fs/operations/template.py +++ b/src/pybind/mgr/volumes/fs/operations/template.py @@ -1,5 +1,7 @@ import errno +from enum import Enum, unique + from ..exception import VolumeException class GroupTemplate(object): @@ -33,6 +35,28 @@ class GroupTemplate(object): """ raise VolumeException(-errno.ENOTSUP, "operation not supported.") +@unique +class SubvolumeOpType(Enum): + CREATE = 'create' + REMOVE = 'rm' + REMOVE_FORCE = 'rm-force' + PIN = 'pin' + LIST = 'ls' + GETPATH = 'getpath' + INFO = 'info' + RESIZE = 'resize' + SNAP_CREATE = 'snap-create' + SNAP_REMOVE = 'snap-rm' + SNAP_LIST = 'snap-ls' + SNAP_INFO = 'snap-info' + SNAP_PROTECT = 'snap-protect' + SNAP_UNPROTECT = 'snap-unprotect' + CLONE_SOURCE = 'clone-source' + CLONE_CREATE = 'clone-create' + CLONE_STATUS = 'clone-status' + CLONE_CANCEL = 'clone-cancel' + CLONE_INTERNAL = 'clone_internal' + class SubvolumeTemplate(object): VERSION = None @@ -40,7 +64,7 @@ class SubvolumeTemplate(object): def version(): return SubvolumeTemplate.VERSION - def open(self, need_complete=True, expected_types=[]): + def open(self, op_type): raise VolumeException(-errno.ENOTSUP, "operation not supported.") def status(self): diff --git a/src/pybind/mgr/volumes/fs/operations/versions/__init__.py b/src/pybind/mgr/volumes/fs/operations/versions/__init__.py index f79631b0499..0569bd8b5d7 100644 --- a/src/pybind/mgr/volumes/fs/operations/versions/__init__.py +++ b/src/pybind/mgr/volumes/fs/operations/versions/__init__.py @@ -5,7 +5,8 @@ import importlib import cephfs from .subvolume_base import SubvolumeBase -from ..op_sm import OpSm +from ..op_sm import SubvolumeOpSm +from ..op_sm import SubvolumeTypes from ...exception import MetadataMgrException, OpSmException, VolumeException log = logging.getLogger(__name__) @@ -50,9 +51,9 @@ class SubvolumeLoader(object): fs.mkdirs(subvolume.legacy_dir, 0o700) except cephfs.Error as e: raise VolumeException(-e.args[0], "error accessing subvolume") - subvolume_type = SubvolumeBase.SUBVOLUME_TYPE_NORMAL + subvolume_type = SubvolumeTypes.TYPE_NORMAL try: - initial_state = OpSm.get_init_state(subvolume_type) + initial_state = SubvolumeOpSm.get_init_state(subvolume_type) except OpSmException as oe: raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error") qpath = subvolume.base_path.decode('utf-8') diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py index 523f8e3d70e..5108cfd8329 100644 --- a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py +++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py @@ -20,12 +20,23 @@ class SubvolumeFeatures(Enum): FEATURE_SNAPSHOT_CLONE = "snapshot-clone" FEATURE_SNAPSHOT_AUTOPROTECT = "snapshot-autoprotect" +@unique +class SubvolumeTypes(Enum): + TYPE_NORMAL = "subvolume" + TYPE_CLONE = "clone" + + @staticmethod + def from_value(value): + if value == "subvolume": + return SubvolumeTypes.TYPE_NORMAL + if value == "clone": + return SubvolumeTypes.TYPE_CLONE + + raise VolumeException(-errno.EINVAL, "invalid subvolume type '{0}'".format(value)) + class SubvolumeBase(object): LEGACY_CONF_DIR = "_legacy" - SUBVOLUME_TYPE_NORMAL = "subvolume" - SUBVOLUME_TYPE_CLONE = "clone" - def __init__(self, fs, vol_spec, group, subvolname, legacy=False): self.fs = fs self.cmode = None @@ -210,7 +221,7 @@ class SubvolumeBase(object): return pin(self.fs, self.base_path, pin_type, pin_setting) def init_config(self, version, subvolume_type, subvolume_path, subvolume_state): - self.metadata_mgr.init(version, subvolume_type, subvolume_path, subvolume_state) + self.metadata_mgr.init(version, subvolume_type.value, subvolume_path, subvolume_state.value) self.metadata_mgr.flush() def discover(self): @@ -248,7 +259,7 @@ class SubvolumeBase(object): def info (self): subvolpath = self.metadata_mgr.get_global_option('path') - etype = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE) + etype = SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE)) st = self.fs.statx(subvolpath, cephfs.CEPH_STATX_BTIME | cephfs.CEPH_STATX_SIZE | cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID | cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_ATIME | @@ -266,7 +277,7 @@ class SubvolumeBase(object): except cephfs.Error as e: raise VolumeException(-e.args[0], e.args[1]) - return {'path': subvolpath, 'type': etype, 'uid': int(st["uid"]), 'gid': int(st["gid"]), + return {'path': subvolpath, 'type': etype.value, 'uid': int(st["uid"]), 'gid': int(st["gid"]), 'atime': str(st["atime"]), 'mtime': str(st["mtime"]), 'ctime': str(st["ctime"]), 'mode': int(st["mode"]), 'data_pool': data_pool, 'created_at': str(st["btime"]), 'bytes_quota': "infinite" if nsize == 0 else nsize, 'bytes_used': int(usedbytes), diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py index 73363d3dc56..3c0625b8a5d 100644 --- a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py +++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py @@ -9,11 +9,14 @@ import cephfs from .metadata_manager import MetadataManager from .subvolume_base import SubvolumeBase, SubvolumeFeatures -from ..op_sm import OpSm +from ..op_sm import SubvolumeOpSm +from ..op_sm import SubvolumeTypes +from ..op_sm import SubvolumeStates from ..template import SubvolumeTemplate from ..snapshot_util import mksnap, rmsnap from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException from ...fs_util import listdir +from ..template import SubvolumeOpType from ..clone_index import open_clone_index, create_clone_index @@ -39,9 +42,9 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value, SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value] def create(self, size, isolate_nspace, pool, mode, uid, gid): - subvolume_type = SubvolumeBase.SUBVOLUME_TYPE_NORMAL + subvolume_type = SubvolumeTypes.TYPE_NORMAL try: - initial_state = OpSm.get_init_state(subvolume_type) + initial_state = SubvolumeOpSm.get_init_state(subvolume_type) except OpSmException as oe: raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error") @@ -84,9 +87,9 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): self.metadata_mgr.flush() def create_clone(self, pool, source_volname, source_subvolume, snapname): - subvolume_type = SubvolumeBase.SUBVOLUME_TYPE_CLONE + subvolume_type = SubvolumeTypes.TYPE_CLONE try: - initial_state = OpSm.get_init_state(subvolume_type) + initial_state = SubvolumeOpSm.get_init_state(subvolume_type) except OpSmException as oe: raise VolumeException(-errno.EINVAL, "clone failed: internal error") @@ -98,7 +101,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): # persist subvolume metadata and clone source qpath = subvol_path.decode('utf-8') - self.metadata_mgr.init(SubvolumeV1.VERSION, subvolume_type, qpath, initial_state) + self.metadata_mgr.init(SubvolumeV1.VERSION, subvolume_type.value, qpath, initial_state.value) self.add_clone_source(source_volname, source_subvolume, snapname) self.metadata_mgr.flush() except (VolumeException, MetadataMgrException, cephfs.Error) as e: @@ -115,21 +118,48 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): e = VolumeException(-e.args[0], e.args[1]) raise e - def open(self, need_complete=True, expected_types=[]): + def allowed_ops_by_type(self, vol_type): + if vol_type == SubvolumeTypes.TYPE_CLONE: + return {op_type for op_type in SubvolumeOpType} + + if vol_type == SubvolumeTypes.TYPE_NORMAL: + return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS, + SubvolumeOpType.CLONE_CANCEL, + SubvolumeOpType.CLONE_INTERNAL} + + return {} + + def allowed_ops_by_state(self, vol_state): + if vol_state == SubvolumeStates.STATE_COMPLETE: + return {op_type for op_type in SubvolumeOpType} + + return {SubvolumeOpType.REMOVE_FORCE, + SubvolumeOpType.CLONE_CREATE, + SubvolumeOpType.CLONE_STATUS, + SubvolumeOpType.CLONE_CANCEL, + SubvolumeOpType.CLONE_INTERNAL} + + def open(self, op_type): + if not isinstance(op_type, SubvolumeOpType): + raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format( + op_type.value, self.subvolname)) try: self.metadata_mgr.refresh() + + etype = SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE)) + if op_type not in self.allowed_ops_by_type(etype): + raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format( + op_type.value, self.subvolname, etype.value)) + + estate = self.state + if op_type not in self.allowed_ops_by_state(estate): + raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format( + self.subvolname, op_type.value)) + subvol_path = self.path log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path)) st = self.fs.stat(subvol_path) - etype = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE) - if len(expected_types) and not etype in expected_types: - raise VolumeException(-errno.ENOTSUP, "subvolume '{0}' is not {1}".format( - self.subvolname, "a {0}".format(expected_types[0]) if len(expected_types) == 1 else \ - "one of types ({0})".format(",".join(expected_types)))) - if need_complete: - estate = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE) - if not OpSm.is_final_state(estate): - raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for use".format(self.subvolname)) + self.uid = int(st.st_uid) self.gid = int(st.st_gid) self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode)) @@ -164,22 +194,22 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): @property def status(self): - state = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE) - subvolume_type = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE) + state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE)) + subvolume_type = SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE)) subvolume_status = { - 'state' : state + 'state' : state.value } - if not OpSm.is_final_state(state) and subvolume_type == SubvolumeBase.SUBVOLUME_TYPE_CLONE: + if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE: subvolume_status["source"] = self._get_clone_source() return subvolume_status @property def state(self): - return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE) + return SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE)) @state.setter def state(self, val): - state = val[0] + state = val[0].value flush = val[1] self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state) if flush: diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py index 065b1fe6463..612cd882cc3 100644 --- a/src/pybind/mgr/volumes/fs/volume.py +++ b/src/pybind/mgr/volumes/fs/volume.py @@ -18,6 +18,7 @@ from .vol_spec import VolSpec from .exception import VolumeException from .async_cloner import Cloner from .purge_queue import ThreadPoolPurgeQueueMixin +from .operations.template import SubvolumeOpType log = logging.getLogger(__name__) @@ -158,7 +159,7 @@ class VolumeClient(CephfsClient): with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: try: - with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume: + with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CREATE) as subvolume: # idempotent creation -- valid. Attributes set is supported. uid = uid if uid else subvolume.uid gid = gid if gid else subvolume.gid @@ -208,7 +209,7 @@ class VolumeClient(CephfsClient): try: with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: - with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume: + with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.RESIZE) as subvolume: nsize, usedbytes = subvolume.resize(newsize, noshrink) ret = 0, json.dumps( [{'bytes_used': usedbytes},{'bytes_quota': nsize}, @@ -229,7 +230,7 @@ class VolumeClient(CephfsClient): try: with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: - with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume: + with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.PIN) as subvolume: subvolume.pin(pin_type, pin_setting) ret = 0, json.dumps({}), "" except VolumeException as ve: @@ -245,7 +246,7 @@ class VolumeClient(CephfsClient): try: with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: - with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume: + with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.GETPATH) as subvolume: subvolpath = subvolume.path ret = 0, subvolpath.decode("utf-8"), "" except VolumeException as ve: @@ -261,7 +262,7 @@ class VolumeClient(CephfsClient): try: with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: - with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume: + with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.INFO) as subvolume: mon_addr_lst = [] mon_map_mons = self.mgr.get('mon_map')['mons'] for mon in mon_map_mons: @@ -301,7 +302,7 @@ class VolumeClient(CephfsClient): try: with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: - with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume: + with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_CREATE) as subvolume: subvolume.create_snapshot(snapname) except VolumeException as ve: ret = self.volume_exception_to_retval(ve) @@ -318,7 +319,7 @@ class VolumeClient(CephfsClient): try: with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: - with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume: + with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_REMOVE) as subvolume: subvolume.remove_snapshot(snapname) except VolumeException as ve: if not (ve.errno == -errno.ENOENT and force): @@ -335,7 +336,7 @@ class VolumeClient(CephfsClient): try: with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: - with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume: + with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_INFO) as subvolume: snap_info_dict = subvolume.snapshot_info(snapname) ret = 0, json.dumps(snap_info_dict, indent=4, sort_keys=True), "" except VolumeException as ve: @@ -351,7 +352,7 @@ class VolumeClient(CephfsClient): try: with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: - with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume: + with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_LIST) as subvolume: snapshots = subvolume.list_snapshots() ret = 0, name_to_json(snapshots), "" except VolumeException as ve: @@ -367,7 +368,7 @@ class VolumeClient(CephfsClient): try: with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: - with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume: + with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT) as subvolume: log.warning("snapshot protect call is deprecated and will be removed in a future release") except VolumeException as ve: ret = self.volume_exception_to_retval(ve) @@ -382,7 +383,7 @@ class VolumeClient(CephfsClient): try: with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: - with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume: + with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT) as subvolume: log.warning("snapshot unprotect call is deprecated and will be removed in a future release") except VolumeException as ve: ret = self.volume_exception_to_retval(ve) @@ -390,7 +391,7 @@ class VolumeClient(CephfsClient): def _prepare_clone_subvolume(self, fs_handle, volname, subvolume, snapname, target_group, target_subvolname, target_pool): create_clone(fs_handle, self.volspec, target_group, target_subvolname, target_pool, volname, subvolume, snapname) - with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, need_complete=False) as target_subvolume: + with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_INTERNAL) as target_subvolume: try: subvolume.attach_snapshot(snapname, target_subvolume) self.cloner.queue_job(volname) @@ -414,7 +415,7 @@ class VolumeClient(CephfsClient): # TODO: when the target group is same as source, reuse group object. with open_group(fs_handle, self.volspec, target_groupname) as target_group: try: - with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, need_complete=False): + with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_CREATE): raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname)) except VolumeException as ve: if ve.errno == -errno.ENOENT: @@ -432,7 +433,7 @@ class VolumeClient(CephfsClient): try: with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: - with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume: + with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CLONE_SOURCE) as subvolume: self._clone_subvolume_snapshot(fs_handle, volname, subvolume, **kwargs) except VolumeException as ve: ret = self.volume_exception_to_retval(ve) @@ -447,8 +448,7 @@ class VolumeClient(CephfsClient): try: with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: - with open_subvol(fs_handle, self.volspec, group, clonename, - need_complete=False, expected_types=["clone"]) as subvolume: + with open_subvol(fs_handle, self.volspec, group, clonename, SubvolumeOpType.CLONE_STATUS) as subvolume: ret = 0, json.dumps({'status' : subvolume.status}, indent=2), "" except VolumeException as ve: ret = self.volume_exception_to_retval(ve)