from .async_job import AsyncJobs
from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
from .fs_util import copy_file
-from .operations.op_sm import OpSm
+from .operations.op_sm import SubvolumeOpSm
+from .operations.op_sm import SubvolumeTypes
+from .operations.op_sm import SubvolumeActions
+from .operations.op_sm import SubvolumeStates
from .operations.resolver import resolve
from .operations.volume import open_volume, open_volume_lockless
from .operations.group import open_group
from .operations.subvolume import open_subvol
from .operations.clone_index import open_clone_index
+from .operations.template import SubvolumeOpType
+from .operations.versions import SubvolumeBase
log = logging.getLogger(__name__)
return ve.errno, None
@contextmanager
-def open_at_volume(volume_client, volname, groupname, subvolname, need_complete=False, expected_types=[]):
+def open_at_volume(volume_client, volname, groupname, subvolname, op_type):
with open_volume(volume_client, volname) as fs_handle:
with open_group(fs_handle, volume_client.volspec, groupname) as group:
- with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
- need_complete, expected_types) as subvolume:
+ with open_subvol(fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
yield subvolume
@contextmanager
-def open_at_group(volume_client, fs_handle, groupname, subvolname, need_complete=False, expected_types=[]):
+def open_at_group(volume_client, fs_handle, groupname, subvolname, op_type):
with open_group(fs_handle, volume_client.volspec, groupname) as group:
- with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
- need_complete, expected_types) as subvolume:
+ with open_subvol(fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
yield subvolume
def get_clone_state(volume_client, volname, groupname, subvolname):
- with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
+ with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
return subvolume.state
def set_clone_state(volume_client, volname, groupname, subvolname, state):
- with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
+ with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
subvolume.state = (state, True)
def get_clone_source(clone_subvolume):
source = clone_subvolume._get_clone_source()
return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot'])
+def get_next_state_on_error(errnum):
+ if errnum == -errno.EINTR:
+ next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INPROGRESS,
+ SubvolumeActions.ACTION_CANCELLED)
+ else:
+ # jump to failed state, on all other errors
+ next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INPROGRESS,
+ SubvolumeActions.ACTION_FAILED)
+ return next_state
+
def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
try:
if should_cancel():
- next_state = OpSm.get_next_state("clone", "pending", -errno.EINTR)
+ next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_PENDING,
+ SubvolumeActions.ACTION_CANCELLED)
else:
- next_state = OpSm.get_next_state("clone", "pending", 0)
+ next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_PENDING,
+ SubvolumeActions.ACTION_SUCCESS)
except OpSmException as oe:
raise VolumeException(oe.errno, oe.error_str)
return (next_state, False)
def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
with open_volume_lockless(volume_client, volname) as fs_handle:
- with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
+ with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
- with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
+ with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
src_path = source_subvolume.snapshot_path(s_snapname)
dst_path = clone_subvolume.path
bulk_copy(fs_handle, src_path, dst_path, should_cancel)
def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
try:
do_clone(volume_client, volname, groupname, subvolname, should_cancel)
- next_state = OpSm.get_next_state("clone", "in-progress", 0)
+ next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INPROGRESS,
+ SubvolumeActions.ACTION_SUCCESS)
except VolumeException as ve:
- # jump to failed state
- next_state = OpSm.get_next_state("clone", "in-progress", ve.errno)
+ next_state = get_next_state_on_error(ve.errno)
except OpSmException as oe:
raise VolumeException(oe.errno, oe.error_str)
return (next_state, False)
try:
# detach source but leave the clone section intact for later inspection
with open_volume(volume_client, volname) as fs_handle:
- with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
+ with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
- with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
+ with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
source_subvolume.detach_snapshot(s_snapname, index)
except (MetadataMgrException, VolumeException) as e:
log.error("failed to detach clone from snapshot: {0}".format(e))
def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
try:
with open_volume(volume_client, volname) as fs_handle:
- with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
+ with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
- with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
+ with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
source_subvolume.detach_snapshot(s_snapname, index)
clone_subvolume.remove_clone_source(flush=True)
except (MetadataMgrException, VolumeException) as e:
class Cloner(AsyncJobs):
"""
Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume.
- this relies on a simple state machine (which mimics states from OpSm class) as
+ this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
the driver. file types supported are directories, symbolic links and regular files.
"""
def __init__(self, volume_client, tp_size):
self.vc = volume_client
self.state_table = {
- 'pending' : handle_clone_pending,
- 'in-progress' : handle_clone_in_progress,
- 'complete' : handle_clone_complete,
- 'failed' : handle_clone_failed,
- 'canceled' : handle_clone_failed,
+ SubvolumeStates.STATE_PENDING : handle_clone_pending,
+ SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
+ SubvolumeStates.STATE_COMPLETE : handle_clone_complete,
+ SubvolumeStates.STATE_FAILED : handle_clone_failed,
+ SubvolumeStates.STATE_CANCELED : handle_clone_failed,
}
super(Cloner, self).__init__(volume_client, "cloner", tp_size)
def is_clone_cancelable(self, clone_state):
- return not (OpSm.is_final_state(clone_state) or OpSm.is_failed_state(clone_state))
+ return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
def get_clone_tracking_index(self, fs_handle, clone_subvolume):
with open_clone_index(fs_handle, self.vc.volspec) as index:
return index.find_clone_entry_index(clone_subvolume.base_path)
def _cancel_pending_clone(self, fs_handle, clone_subvolume, status, track_idx):
- clone_state = status['state']
+ clone_state = SubvolumeStates.from_value(status['state'])
assert self.is_clone_cancelable(clone_state)
s_groupname = status['source'].get('group', None)
s_snapname = status['source']['snapshot']
with open_group(fs_handle, self.vc.volspec, s_groupname) as s_group:
- with open_subvol(fs_handle, self.vc.volspec, s_group, s_subvolname) as s_subvolume:
- next_state = OpSm.get_next_state("clone", clone_state, -errno.EINTR)
+ with open_subvol(fs_handle, self.vc.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
+ next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+ clone_state,
+ SubvolumeActions.ACTION_CANCELLED)
clone_subvolume.state = (next_state, True)
s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))
try:
with open_volume(self.vc, volname) as fs_handle:
with open_group(fs_handle, self.vc.volspec, groupname) as group:
- with open_subvol(fs_handle, self.vc.volspec, group, clonename,
- need_complete=False, expected_types=["clone"]) as clone_subvolume:
+ with open_subvol(fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
status = clone_subvolume.status
- clone_state = status['state']
+ clone_state = SubvolumeStates.from_value(status['state'])
if not self.is_clone_cancelable(clone_state):
raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
if not track_idx:
log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
raise VolumeException(-errno.EINVAL, "error canceling clone")
- if OpSm.is_init_state("clone", clone_state):
+ if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state):
# clone has not started yet -- cancel right away.
self._cancel_pending_clone(fs_handle, clone_subvolume, status, track_idx)
return
with self.lock:
with open_volume_lockless(self.vc, volname) as fs_handle:
with open_group(fs_handle, self.vc.volspec, groupname) as group:
- with open_subvol(fs_handle, self.vc.volspec, group, clonename,
- need_complete=False, expected_types=["clone"]) as clone_subvolume:
+ with open_subvol(fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
except (IndexException, MetadataMgrException) as e:
import errno
+from enum import Enum, unique
from typing import Dict
+from .versions.subvolume_base import SubvolumeTypes
from ..exception import OpSmException
-class OpSm(object):
- INIT_STATE_KEY = 'init'
+@unique
+class SubvolumeStates(Enum):
+ STATE_INIT = 'init'
+ STATE_PENDING = 'pending'
+ STATE_INPROGRESS = 'in-progress'
+ STATE_FAILED = 'failed'
+ STATE_COMPLETE = 'complete'
+ STATE_CANCELED = 'canceled'
- FAILED_STATE = 'failed'
- FINAL_STATE = 'complete'
- CANCEL_STATE = 'canceled'
+ @staticmethod
+ def from_value(value):
+ if value == "init":
+ return SubvolumeStates.STATE_INIT
+ if value == "pending":
+ return SubvolumeStates.STATE_PENDING
+ if value == "in-progress":
+ return SubvolumeStates.STATE_INPROGRESS
+ if value == "failed":
+ return SubvolumeStates.STATE_FAILED
+ if value == "complete":
+ return SubvolumeStates.STATE_COMPLETE
+ if value == "canceled":
+ return SubvolumeStates.STATE_CANCELED
+
+ raise OpSmException(-errno.EINVAL, "invalid state '{0}'".format(value))
+
+@unique
+class SubvolumeActions(Enum):
+ ACTION_NONE = 0
+ ACTION_SUCCESS = 1
+ ACTION_FAILED = 2
+ ACTION_CANCELLED = 3
+
+class TransitionKey(object):
+ def __init__(self, subvol_type, state, action_type):
+ self.transition_key = [subvol_type, state, action_type]
- OP_SM_SUBVOLUME = {
- INIT_STATE_KEY : FINAL_STATE,
- }
+ def __hash__(self):
+ return hash(tuple(self.transition_key))
- OP_SM_CLONE = {
- INIT_STATE_KEY : 'pending',
- 'pending' : ('in-progress', (FAILED_STATE, CANCEL_STATE)),
- 'in-progress' : (FINAL_STATE, (FAILED_STATE, CANCEL_STATE)),
- } # type: Dict
+ def __eq__(self, other):
+ return self.transition_key == other.transition_key
- STATE_MACHINES_TYPES = {
- "subvolume" : OP_SM_SUBVOLUME,
- "clone" : OP_SM_CLONE,
- } # type: Dict
+ def __neq__(self, other):
+ return not(self == other)
+
+class SubvolumeOpSm(object):
+ transition_table = {}
@staticmethod
- def is_final_state(state):
- return state == OpSm.FINAL_STATE
+ def is_complete_state(state):
+ if not isinstance(state, SubvolumeStates):
+ raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state))
+ return state == SubvolumeStates.STATE_COMPLETE
@staticmethod
def is_failed_state(state):
- return state == OpSm.FAILED_STATE or state == OpSm.CANCEL_STATE
+ if not isinstance(state, SubvolumeStates):
+ raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state))
+ return state == SubvolumeStates.STATE_FAILED or state == SubvolumeStates.STATE_CANCELED
@staticmethod
def is_init_state(stm_type, state):
- stm = OpSm.STATE_MACHINES_TYPES.get(stm_type, None)
- if not stm:
- raise OpSmException(-errno.ENOENT, "state machine type '{0}' not found".format(stm_type))
- init_state = stm.get(OpSm.INIT_STATE_KEY, None)
- return init_state == state
+ if not isinstance(state, SubvolumeStates):
+ raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state))
+ return state == SubvolumeOpSm.get_init_state(stm_type)
@staticmethod
def get_init_state(stm_type):
- stm = OpSm.STATE_MACHINES_TYPES.get(stm_type, None)
- if not stm:
- raise OpSmException(-errno.ENOENT, "state machine type '{0}' not found".format(stm_type))
- init_state = stm.get(OpSm.INIT_STATE_KEY, None)
+ if not isinstance(stm_type, SubvolumeTypes):
+ raise OpSmException(-errno.EINVAL, "unknown state machine '{0}'".format(stm_type))
+ init_state = SubvolumeOpSm.transition_table[TransitionKey(stm_type,
+ SubvolumeStates.STATE_INIT,
+ SubvolumeActions.ACTION_NONE)]
if not init_state:
- raise OpSmException(-errno.ENOENT, "initial state unavailable for state machine '{0}'".format(stm_type))
+ raise OpSmException(-errno.ENOENT, "initial state for state machine '{0}' not found".format(stm_type))
return init_state
@staticmethod
- def get_next_state(stm_type, current_state, ret):
- stm = OpSm.STATE_MACHINES_TYPES.get(stm_type, None)
- if not stm:
- raise OpSmException(-errno.ENOENT, "state machine type '{0}' not found".format(stm_type))
- next_state = stm.get(current_state, None)
- if not next_state:
- raise OpSmException(-errno.EINVAL, "invalid current state '{0}'".format(current_state))
- if ret == 0:
- return next_state[0]
- elif ret == -errno.EINTR:
- return next_state[1][1]
- else:
- return next_state[1][0]
+ def transition(stm_type, current_state, action):
+ if not isinstance(stm_type, SubvolumeTypes):
+ raise OpSmException(-errno.EINVAL, "unknown state machine '{0}'".format(stm_type))
+ if not isinstance(current_state, SubvolumeStates):
+ raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(current_state))
+ if not isinstance(action, SubvolumeActions):
+ raise OpSmException(-errno.EINVAL, "unknown action '{0}'".format(action))
+
+ transition = SubvolumeOpSm.transition_table[TransitionKey(stm_type, current_state, action)]
+ if not transition:
+ raise OpSmException(-errno.EINVAL, "invalid action '{0}' on current state {1} for state machine '{2}'".format(action, current_state, stm_type))
+
+ return transition
+
+SubvolumeOpSm.transition_table = {
+ # state transitions for state machine type TYPE_NORMAL
+ TransitionKey(SubvolumeTypes.TYPE_NORMAL,
+ SubvolumeStates.STATE_INIT,
+ SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_COMPLETE,
+
+ # state transitions for state machine type TYPE_CLONE
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INIT,
+ SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_PENDING,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_PENDING,
+ SubvolumeActions.ACTION_SUCCESS) : SubvolumeStates.STATE_INPROGRESS,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_PENDING,
+ SubvolumeActions.ACTION_CANCELLED) : SubvolumeStates.STATE_CANCELED,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INPROGRESS,
+ SubvolumeActions.ACTION_SUCCESS) : SubvolumeStates.STATE_COMPLETE,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INPROGRESS,
+ SubvolumeActions.ACTION_CANCELLED) : SubvolumeStates.STATE_CANCELED,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INPROGRESS,
+ SubvolumeActions.ACTION_FAILED) : SubvolumeStates.STATE_FAILED,
+}
from .snapshot_util import mksnap, rmsnap
from ..fs_util import listdir, get_ancestor_xattr
from ..exception import VolumeException
+from .template import SubvolumeOpType
from .versions import loaded_subvolumes
:param force: force remove subvolumes
:return: None
"""
- nc_flag = True if not force else False
- with open_subvol(fs, vol_spec, group, subvolname, need_complete=nc_flag) as subvolume:
+ op_type = SubvolumeOpType.REMOVE if not force else SubvolumeOpType.REMOVE_FORCE
+ with open_subvol(fs, vol_spec, group, subvolname, op_type) as subvolume:
if subvolume.list_snapshots():
raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(subvolname))
subvolume.remove()
@contextmanager
-def open_subvol(fs, vol_spec, group, subvolname, need_complete=True, expected_types=[]):
+def open_subvol(fs, vol_spec, group, subvolname, op_type):
"""
open a subvolume. This API is to be used as a context manager.
:param vol_spec: volume specification
:param group: group object for the subvolume
:param subvolname: subvolume name
- :param need_complete: check if the subvolume is usable (since cloned subvolumes can
- be in transient state). defaults to True.
- :param expected_types: check if the subvolume is one the provided types. defaults to
- all.
+ :param op_type: operation type for which subvolume is being opened
:return: yields a subvolume object (subclass of SubvolumeTemplate)
"""
subvolume = loaded_subvolumes.get_subvolume_object(fs, vol_spec, group, subvolname)
- subvolume.open(need_complete, expected_types)
+ subvolume.open(op_type)
yield subvolume
import errno
+from enum import Enum, unique
+
from ..exception import VolumeException
class GroupTemplate(object):
"""
raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+@unique
+class SubvolumeOpType(Enum):
+ CREATE = 'create'
+ REMOVE = 'rm'
+ REMOVE_FORCE = 'rm-force'
+ PIN = 'pin'
+ LIST = 'ls'
+ GETPATH = 'getpath'
+ INFO = 'info'
+ RESIZE = 'resize'
+ SNAP_CREATE = 'snap-create'
+ SNAP_REMOVE = 'snap-rm'
+ SNAP_LIST = 'snap-ls'
+ SNAP_INFO = 'snap-info'
+ SNAP_PROTECT = 'snap-protect'
+ SNAP_UNPROTECT = 'snap-unprotect'
+ CLONE_SOURCE = 'clone-source'
+ CLONE_CREATE = 'clone-create'
+ CLONE_STATUS = 'clone-status'
+ CLONE_CANCEL = 'clone-cancel'
+ CLONE_INTERNAL = 'clone_internal'
+
class SubvolumeTemplate(object):
VERSION = None
def version():
return SubvolumeTemplate.VERSION
- def open(self, need_complete=True, expected_types=[]):
+ def open(self, op_type):
raise VolumeException(-errno.ENOTSUP, "operation not supported.")
def status(self):
import cephfs
from .subvolume_base import SubvolumeBase
-from ..op_sm import OpSm
+from ..op_sm import SubvolumeOpSm
+from ..op_sm import SubvolumeTypes
from ...exception import MetadataMgrException, OpSmException, VolumeException
log = logging.getLogger(__name__)
fs.mkdirs(subvolume.legacy_dir, 0o700)
except cephfs.Error as e:
raise VolumeException(-e.args[0], "error accessing subvolume")
- subvolume_type = SubvolumeBase.SUBVOLUME_TYPE_NORMAL
+ subvolume_type = SubvolumeTypes.TYPE_NORMAL
try:
- initial_state = OpSm.get_init_state(subvolume_type)
+ initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
except OpSmException as oe:
raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
qpath = subvolume.base_path.decode('utf-8')
FEATURE_SNAPSHOT_CLONE = "snapshot-clone"
FEATURE_SNAPSHOT_AUTOPROTECT = "snapshot-autoprotect"
+@unique
+class SubvolumeTypes(Enum):
+ TYPE_NORMAL = "subvolume"
+ TYPE_CLONE = "clone"
+
+ @staticmethod
+ def from_value(value):
+ if value == "subvolume":
+ return SubvolumeTypes.TYPE_NORMAL
+ if value == "clone":
+ return SubvolumeTypes.TYPE_CLONE
+
+ raise VolumeException(-errno.EINVAL, "invalid subvolume type '{0}'".format(value))
+
class SubvolumeBase(object):
LEGACY_CONF_DIR = "_legacy"
- SUBVOLUME_TYPE_NORMAL = "subvolume"
- SUBVOLUME_TYPE_CLONE = "clone"
-
def __init__(self, fs, vol_spec, group, subvolname, legacy=False):
self.fs = fs
self.cmode = None
return pin(self.fs, self.base_path, pin_type, pin_setting)
def init_config(self, version, subvolume_type, subvolume_path, subvolume_state):
- self.metadata_mgr.init(version, subvolume_type, subvolume_path, subvolume_state)
+ self.metadata_mgr.init(version, subvolume_type.value, subvolume_path, subvolume_state.value)
self.metadata_mgr.flush()
def discover(self):
def info (self):
subvolpath = self.metadata_mgr.get_global_option('path')
- etype = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE)
+ etype = SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
st = self.fs.statx(subvolpath, cephfs.CEPH_STATX_BTIME | cephfs.CEPH_STATX_SIZE |
cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID |
cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_ATIME |
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
- return {'path': subvolpath, 'type': etype, 'uid': int(st["uid"]), 'gid': int(st["gid"]),
+ return {'path': subvolpath, 'type': etype.value, 'uid': int(st["uid"]), 'gid': int(st["gid"]),
'atime': str(st["atime"]), 'mtime': str(st["mtime"]), 'ctime': str(st["ctime"]),
'mode': int(st["mode"]), 'data_pool': data_pool, 'created_at': str(st["btime"]),
'bytes_quota': "infinite" if nsize == 0 else nsize, 'bytes_used': int(usedbytes),
from .metadata_manager import MetadataManager
from .subvolume_base import SubvolumeBase, SubvolumeFeatures
-from ..op_sm import OpSm
+from ..op_sm import SubvolumeOpSm
+from ..op_sm import SubvolumeTypes
+from ..op_sm import SubvolumeStates
from ..template import SubvolumeTemplate
from ..snapshot_util import mksnap, rmsnap
from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException
from ...fs_util import listdir
+from ..template import SubvolumeOpType
from ..clone_index import open_clone_index, create_clone_index
return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value, SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value]
def create(self, size, isolate_nspace, pool, mode, uid, gid):
- subvolume_type = SubvolumeBase.SUBVOLUME_TYPE_NORMAL
+ subvolume_type = SubvolumeTypes.TYPE_NORMAL
try:
- initial_state = OpSm.get_init_state(subvolume_type)
+ initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
except OpSmException as oe:
raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
self.metadata_mgr.flush()
def create_clone(self, pool, source_volname, source_subvolume, snapname):
- subvolume_type = SubvolumeBase.SUBVOLUME_TYPE_CLONE
+ subvolume_type = SubvolumeTypes.TYPE_CLONE
try:
- initial_state = OpSm.get_init_state(subvolume_type)
+ initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
except OpSmException as oe:
raise VolumeException(-errno.EINVAL, "clone failed: internal error")
# persist subvolume metadata and clone source
qpath = subvol_path.decode('utf-8')
- self.metadata_mgr.init(SubvolumeV1.VERSION, subvolume_type, qpath, initial_state)
+ self.metadata_mgr.init(SubvolumeV1.VERSION, subvolume_type.value, qpath, initial_state.value)
self.add_clone_source(source_volname, source_subvolume, snapname)
self.metadata_mgr.flush()
except (VolumeException, MetadataMgrException, cephfs.Error) as e:
e = VolumeException(-e.args[0], e.args[1])
raise e
- def open(self, need_complete=True, expected_types=[]):
+ def allowed_ops_by_type(self, vol_type):
+ if vol_type == SubvolumeTypes.TYPE_CLONE:
+ return {op_type for op_type in SubvolumeOpType}
+
+ if vol_type == SubvolumeTypes.TYPE_NORMAL:
+ return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS,
+ SubvolumeOpType.CLONE_CANCEL,
+ SubvolumeOpType.CLONE_INTERNAL}
+
+ return {}
+
+ def allowed_ops_by_state(self, vol_state):
+ if vol_state == SubvolumeStates.STATE_COMPLETE:
+ return {op_type for op_type in SubvolumeOpType}
+
+ return {SubvolumeOpType.REMOVE_FORCE,
+ SubvolumeOpType.CLONE_CREATE,
+ SubvolumeOpType.CLONE_STATUS,
+ SubvolumeOpType.CLONE_CANCEL,
+ SubvolumeOpType.CLONE_INTERNAL}
+
+ def open(self, op_type):
+ if not isinstance(op_type, SubvolumeOpType):
+ raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format(
+ op_type.value, self.subvolname))
try:
self.metadata_mgr.refresh()
+
+ etype = SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
+ if op_type not in self.allowed_ops_by_type(etype):
+ raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
+ op_type.value, self.subvolname, etype.value))
+
+ estate = self.state
+ if op_type not in self.allowed_ops_by_state(estate):
+ raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format(
+ self.subvolname, op_type.value))
+
subvol_path = self.path
log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path))
st = self.fs.stat(subvol_path)
- etype = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE)
- if len(expected_types) and not etype in expected_types:
- raise VolumeException(-errno.ENOTSUP, "subvolume '{0}' is not {1}".format(
- self.subvolname, "a {0}".format(expected_types[0]) if len(expected_types) == 1 else \
- "one of types ({0})".format(",".join(expected_types))))
- if need_complete:
- estate = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE)
- if not OpSm.is_final_state(estate):
- raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for use".format(self.subvolname))
+
self.uid = int(st.st_uid)
self.gid = int(st.st_gid)
self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode))
@property
def status(self):
- state = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE)
- subvolume_type = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE)
+ state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
+ subvolume_type = SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
subvolume_status = {
- 'state' : state
+ 'state' : state.value
}
- if not OpSm.is_final_state(state) and subvolume_type == SubvolumeBase.SUBVOLUME_TYPE_CLONE:
+ if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE:
subvolume_status["source"] = self._get_clone_source()
return subvolume_status
@property
def state(self):
- return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE)
+ return SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
@state.setter
def state(self, val):
- state = val[0]
+ state = val[0].value
flush = val[1]
self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state)
if flush:
from .exception import VolumeException
from .async_cloner import Cloner
from .purge_queue import ThreadPoolPurgeQueueMixin
+from .operations.template import SubvolumeOpType
log = logging.getLogger(__name__)
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
try:
- with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+ with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CREATE) as subvolume:
# idempotent creation -- valid. Attributes set is supported.
uid = uid if uid else subvolume.uid
gid = gid if gid else subvolume.gid
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+ with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.RESIZE) as subvolume:
nsize, usedbytes = subvolume.resize(newsize, noshrink)
ret = 0, json.dumps(
[{'bytes_used': usedbytes},{'bytes_quota': nsize},
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+ with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.PIN) as subvolume:
subvolume.pin(pin_type, pin_setting)
ret = 0, json.dumps({}), ""
except VolumeException as ve:
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+ with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.GETPATH) as subvolume:
subvolpath = subvolume.path
ret = 0, subvolpath.decode("utf-8"), ""
except VolumeException as ve:
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+ with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.INFO) as subvolume:
mon_addr_lst = []
mon_map_mons = self.mgr.get('mon_map')['mons']
for mon in mon_map_mons:
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+ with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_CREATE) as subvolume:
subvolume.create_snapshot(snapname)
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+ with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_REMOVE) as subvolume:
subvolume.remove_snapshot(snapname)
except VolumeException as ve:
if not (ve.errno == -errno.ENOENT and force):
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+ with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_INFO) as subvolume:
snap_info_dict = subvolume.snapshot_info(snapname)
ret = 0, json.dumps(snap_info_dict, indent=4, sort_keys=True), ""
except VolumeException as ve:
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+ with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_LIST) as subvolume:
snapshots = subvolume.list_snapshots()
ret = 0, name_to_json(snapshots), ""
except VolumeException as ve:
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+ with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT) as subvolume:
log.warning("snapshot protect call is deprecated and will be removed in a future release")
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+ with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT) as subvolume:
log.warning("snapshot unprotect call is deprecated and will be removed in a future release")
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
def _prepare_clone_subvolume(self, fs_handle, volname, subvolume, snapname, target_group, target_subvolname, target_pool):
create_clone(fs_handle, self.volspec, target_group, target_subvolname, target_pool, volname, subvolume, snapname)
- with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, need_complete=False) as target_subvolume:
+ with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_INTERNAL) as target_subvolume:
try:
subvolume.attach_snapshot(snapname, target_subvolume)
self.cloner.queue_job(volname)
# TODO: when the target group is same as source, reuse group object.
with open_group(fs_handle, self.volspec, target_groupname) as target_group:
try:
- with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, need_complete=False):
+ with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_CREATE):
raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname))
except VolumeException as ve:
if ve.errno == -errno.ENOENT:
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+ with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CLONE_SOURCE) as subvolume:
self._clone_subvolume_snapshot(fs_handle, volname, subvolume, **kwargs)
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(fs_handle, self.volspec, group, clonename,
- need_complete=False, expected_types=["clone"]) as subvolume:
+ with open_subvol(fs_handle, self.volspec, group, clonename, SubvolumeOpType.CLONE_STATUS) as subvolume:
ret = 0, json.dumps({'status' : subvolume.status}, indent=2), ""
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)