Implements subvolume v2 version.
Following support is added,
- Ability to retain snapshots on subvolume deletion
- Modify directory where snapshot is created to the subvolume
- "features" supported to subvolume info output, specifically ability
for a subvolume to retain snashots
- Current state of subvolume in info output
- Auto upgrade to v2 from eligible v1 subvolumes
- Adjust other functions as needed to support the changes
Signed-off-by: Shyamsundar Ranganathan <srangana@redhat.com>
with open_subvol(fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
yield subvolume
+@contextmanager
+def open_at_group_unique(volume_client, fs_handle, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
+ # if a snapshot of a retained subvolume is being cloned to recreate the same subvolume, return
+ # the clone subvolume as the source subvolume
+ if s_groupname == c_groupname and s_subvolname == c_subvolname:
+ yield c_subvolume
+ else:
+ with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, op_type) as s_subvolume:
+ yield s_subvolume
+
+
+@contextmanager
+def open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname):
+ with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
+ s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
+ if groupname == s_groupname and subvolname == s_subvolname:
+ # use the same subvolume to avoid metadata overwrites
+ yield (clone_subvolume, clone_subvolume, s_snapname)
+ else:
+ with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
+ yield (clone_subvolume, source_subvolume, s_snapname)
+
def get_clone_state(volume_client, volname, groupname, subvolname):
with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
return subvolume.state
def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
with open_volume_lockless(volume_client, volname) as fs_handle:
- with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
- s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
- with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
- src_path = source_subvolume.snapshot_path(s_snapname)
- dst_path = clone_subvolume.path
- bulk_copy(fs_handle, src_path, dst_path, should_cancel)
+ with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+ src_path = clone_volumes[1].snapshot_data_path(clone_volumes[2])
+ dst_path = clone_volumes[0].path
+ bulk_copy(fs_handle, src_path, dst_path, should_cancel)
def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
try:
def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
try:
- # detach source but leave the clone section intact for later inspection
with open_volume(volume_client, volname) as fs_handle:
- with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
- s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
- with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
- source_subvolume.detach_snapshot(s_snapname, index)
+ # detach source but leave the clone section intact for later inspection
+ with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+ clone_volumes[1].detach_snapshot(clone_volumes[2], index)
except (MetadataMgrException, VolumeException) as e:
log.error("failed to detach clone from snapshot: {0}".format(e))
return (None, True)
def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
try:
with open_volume(volume_client, volname) as fs_handle:
- with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
- s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
- with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
- source_subvolume.detach_snapshot(s_snapname, index)
- clone_subvolume.remove_clone_source(flush=True)
+ with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+ clone_volumes[1].detach_snapshot(clone_volumes[2], index)
+ clone_volumes[0].remove_clone_source(flush=True)
except (MetadataMgrException, VolumeException) as e:
log.error("failed to detach clone from snapshot: {0}".format(e))
return (None, True)
with open_clone_index(fs_handle, self.vc.volspec) as index:
return index.find_clone_entry_index(clone_subvolume.base_path)
- def _cancel_pending_clone(self, fs_handle, clone_subvolume, status, track_idx):
+ def _cancel_pending_clone(self, fs_handle, clone_subvolume, clone_subvolname, clone_groupname, status, track_idx):
clone_state = SubvolumeStates.from_value(status['state'])
assert self.is_clone_cancelable(clone_state)
s_subvolname = status['source']['subvolume']
s_snapname = status['source']['snapshot']
- with open_group(fs_handle, self.vc.volspec, s_groupname) as s_group:
- with open_subvol(fs_handle, self.vc.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
- next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
- clone_state,
- SubvolumeActions.ACTION_CANCELLED)
- clone_subvolume.state = (next_state, True)
- s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))
+ with open_at_group_unique(self.vc, fs_handle, s_groupname, s_subvolname, clone_subvolume, clone_groupname,
+ clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
+ next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+ clone_state,
+ SubvolumeActions.ACTION_CANCELLED)
+ clone_subvolume.state = (next_state, True)
+ s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))
def cancel_job(self, volname, job):
"""
raise VolumeException(-errno.EINVAL, "error canceling clone")
if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state):
# clone has not started yet -- cancel right away.
- self._cancel_pending_clone(fs_handle, clone_subvolume, status, track_idx)
+ self._cancel_pending_clone(fs_handle, clone_subvolume, clonename, groupname, status, track_idx)
return
# cancelling an on-going clone would persist "canceled" state in subvolume metadata.
# to persist the new state, async cloner accesses the volume in exclusive mode.
else:
raise VolumeException(-e.args[0], e.args[1])
yield group
+
+@contextmanager
+def open_group_unique(fs, vol_spec, groupname, c_group, c_groupname):
+ if groupname == c_groupname:
+ yield c_group
+ else:
+ with open_group(fs, vol_spec, groupname) as group:
+ yield group
STATE_FAILED = 'failed'
STATE_COMPLETE = 'complete'
STATE_CANCELED = 'canceled'
+ STATE_RETAINED = 'snapshot-retained'
@staticmethod
def from_value(value):
return SubvolumeStates.STATE_COMPLETE
if value == "canceled":
return SubvolumeStates.STATE_CANCELED
+ if value == "snapshot-retained":
+ return SubvolumeStates.STATE_RETAINED
raise OpSmException(-errno.EINVAL, "invalid state '{0}'".format(value))
ACTION_SUCCESS = 1
ACTION_FAILED = 2
ACTION_CANCELLED = 3
+ ACTION_RETAINED = 4
class TransitionKey(object):
def __init__(self, subvol_type, state, action_type):
SubvolumeStates.STATE_INIT,
SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_COMPLETE,
+ TransitionKey(SubvolumeTypes.TYPE_NORMAL,
+ SubvolumeStates.STATE_COMPLETE,
+ SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
+
# state transitions for state machine type TYPE_CLONE
TransitionKey(SubvolumeTypes.TYPE_CLONE,
SubvolumeStates.STATE_INIT,
TransitionKey(SubvolumeTypes.TYPE_CLONE,
SubvolumeStates.STATE_INPROGRESS,
SubvolumeActions.ACTION_FAILED) : SubvolumeStates.STATE_FAILED,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_COMPLETE,
+ SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_CANCELED,
+ SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_FAILED,
+ SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
}
subvolume = loaded_subvolumes.get_subvolume_object_max(fs, vol_spec, group, subvolname)
subvolume.create_clone(pool, source_volume, source_subvolume, snapname)
-def remove_subvol(fs, vol_spec, group, subvolname, force=False):
+def remove_subvol(fs, vol_spec, group, subvolname, force=False, retainsnaps=False):
"""
remove a subvolume.
"""
op_type = SubvolumeOpType.REMOVE if not force else SubvolumeOpType.REMOVE_FORCE
with open_subvol(fs, vol_spec, group, subvolname, op_type) as subvolume:
- if subvolume.list_snapshots():
- raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(subvolname))
- subvolume.remove()
+ subvolume.remove(retainsnaps)
@contextmanager
def open_subvol(fs, vol_spec, group, subvolname, op_type):
import cephfs
from .subvolume_base import SubvolumeBase
+from .subvolume_base import SubvolumeTypes
+from .subvolume_v1 import SubvolumeV1
+from .subvolume_v2 import SubvolumeV2
+from .metadata_manager import MetadataManager
from ..op_sm import SubvolumeOpSm
-from ..op_sm import SubvolumeTypes
+from ..template import SubvolumeOpType
from ...exception import MetadataMgrException, OpSmException, VolumeException
log = logging.getLogger(__name__)
class SubvolumeLoader(object):
INVALID_VERSION = -1
- SUPPORTED_MODULES = ['subvolume_v1.SubvolumeV1']
+ SUPPORTED_MODULES = ['subvolume_v1.SubvolumeV1', 'subvolume_v2.SubvolumeV2']
def __init__(self):
self.max_version = SubvolumeLoader.INVALID_VERSION
def get_subvolume_object_max(self, fs, vol_spec, group, subvolname):
return self._get_subvolume_version(self.max_version)(fs, vol_spec, group, subvolname)
+ def upgrade_to_v2_subvolume(self, subvolume):
+ # legacy mode subvolumes cannot be upgraded to v2
+ if subvolume.legacy_mode:
+ return
+
+ version = int(subvolume.metadata_mgr.get_global_option('version'))
+ if version >= SubvolumeV2.version():
+ return
+
+ v1_subvolume = self._get_subvolume_version(version)(subvolume.fs, subvolume.vol_spec, subvolume.group, subvolume.subvolname)
+ try:
+ v1_subvolume.open(SubvolumeOpType.SNAP_LIST)
+ except VolumeException as ve:
+ # if volume is not ready for snapshot listing, do not upgrade at present
+ if ve.errno == -errno.EAGAIN:
+ return
+ raise
+
+ # v1 subvolumes with snapshots cannot be upgraded to v2
+ if v1_subvolume.list_snapshots():
+ return
+
+ subvolume.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_VERSION, SubvolumeV2.version())
+ subvolume.metadata_mgr.flush()
+
def upgrade_legacy_subvolume(self, fs, subvolume):
assert subvolume.legacy_mode
try:
except OpSmException as oe:
raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
qpath = subvolume.base_path.decode('utf-8')
- subvolume.init_config(self.max_version, subvolume_type, qpath, initial_state)
+ # legacy is only upgradable to v1
+ subvolume.init_config(SubvolumeV1.version(), subvolume_type, qpath, initial_state)
def get_subvolume_object(self, fs, vol_spec, group, subvolname, upgrade=True):
subvolume = SubvolumeBase(fs, vol_spec, group, subvolname)
try:
subvolume.discover()
+ self.upgrade_to_v2_subvolume(subvolume)
version = int(subvolume.metadata_mgr.get_global_option('version'))
return self._get_subvolume_version(version)(fs, vol_spec, group, subvolname, legacy=subvolume.legacy_mode)
except MetadataMgrException as me:
class SubvolumeFeatures(Enum):
FEATURE_SNAPSHOT_CLONE = "snapshot-clone"
FEATURE_SNAPSHOT_AUTOPROTECT = "snapshot-autoprotect"
+ FEATURE_SNAPSHOT_RETENTION = "snapshot-retention"
@unique
class SubvolumeTypes(Enum):
def legacy_mode(self, mode):
self.legacy = mode
+ @property
+ def path(self):
+ raise NotImplementedError
+
@property
def features(self):
raise NotImplementedError
+ @property
+ def state(self):
+ raise NotImplementedError
+
+ @property
+ def subvol_type(self):
+ return SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
+
def load_config(self):
if self.legacy_mode:
self.metadata_mgr = MetadataManager(self.fs, self.legacy_config_path, 0o640)
raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
raise VolumeException(-e.args[0], "error accessing subvolume '{0}'".format(self.subvolname))
+ def _trash_dir(self, path):
+ create_trashcan(self.fs, self.vol_spec)
+ with open_trashcan(self.fs, self.vol_spec) as trashcan:
+ trashcan.dump(path)
+ log.info("subvolume path '{0}' moved to trashcan".format(path))
+
def trash_base_dir(self):
if self.legacy_mode:
self.fs.unlink(self.legacy_config_path)
- subvol_path = self.base_path
- create_trashcan(self.fs, self.vol_spec)
- with open_trashcan(self.fs, self.vol_spec) as trashcan:
- trashcan.dump(subvol_path)
- log.info("subvolume with path '{0}' moved to trashcan".format(subvol_path))
+ self._trash_dir(self.base_path)
def create_base_dir(self, mode):
try:
raise VolumeException(-e.args[0], e.args[1])
def info (self):
- subvolpath = self.metadata_mgr.get_global_option('path')
- etype = SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
+ subvolpath = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH)
+ etype = self.subvol_type
st = self.fs.statx(subvolpath, cephfs.CEPH_STATX_BTIME | cephfs.CEPH_STATX_SIZE |
cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID |
cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_ATIME |
'mode': int(st["mode"]), 'data_pool': data_pool, 'created_at': str(st["btime"]),
'bytes_quota': "infinite" if nsize == 0 else nsize, 'bytes_used': int(usedbytes),
'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0),
- 'pool_namespace': pool_namespace, 'features': self.features}
+ 'pool_namespace': pool_namespace, 'features': self.features, 'state': self.state.value}
import cephfs
from .metadata_manager import MetadataManager
-from .subvolume_base import SubvolumeBase, SubvolumeFeatures
+from .subvolume_base import SubvolumeBase
+from .subvolume_base import SubvolumeTypes
+from .subvolume_base import SubvolumeFeatures
from ..op_sm import SubvolumeOpSm
-from ..op_sm import SubvolumeTypes
from ..op_sm import SubvolumeStates
from ..template import SubvolumeTemplate
from ..snapshot_util import mksnap, rmsnap
log = logging.getLogger(__name__)
class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
+ """
+ Version 1 subvolumes creates a subvolume with path as follows,
+ volumes/<group-name>/<subvolume-name>/<uuid>/
+
+ - The directory under which user data resides is <uuid>
+ - Snapshots of the subvolume are taken within the <uuid> directory
+ - A meta file is maintained under the <subvolume-name> directory as a metadata store, typically storing,
+ - global information about the subvolume (version, path, type, state)
+ - snapshots attached to an ongoing clone operation
+ - clone snapshot source if subvolume is a clone of a snapshot
+ - It retains backward compatability with legacy subvolumes by creating the meta file for legacy subvolumes under
+ /volumes/_legacy/ (see legacy_config_path), thus allowing cloning of older legacy volumes that lack the <uuid>
+ component in the path.
+ """
VERSION = 1
@staticmethod
def path(self):
try:
# no need to stat the path -- open() does that
- return self.metadata_mgr.get_global_option('path').encode('utf-8')
+ return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8')
except MetadataMgrException as me:
raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
try:
self.metadata_mgr.refresh()
- etype = SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
+ etype = self.subvol_type
if op_type not in self.allowed_ops_by_type(etype):
raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
op_type.value, self.subvolname, etype.value))
@property
def status(self):
state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
- subvolume_type = SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
+ subvolume_type = self.subvol_type
subvolume_status = {
'state' : state.value
}
if flush:
self.metadata_mgr.flush()
- def remove(self):
+ def remove(self, retainsnaps=False):
+ if retainsnaps:
+ raise VolumeException(-errno.EINVAL, "subvolume '{0}' does not support snapshot retention on delete".format(self.subvolname))
+ if self.list_snapshots():
+ raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
self.trash_base_dir()
def resize(self, newsize, noshrink):
self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
snapname.encode('utf-8'))
+ def snapshot_data_path(self, snapname):
+ return self.snapshot_path(snapname)
+
def create_snapshot(self, snapname):
snappath = self.snapshot_path(snapname)
mksnap(self.fs, snappath)
except cephfs.Error as e:
if e.errno == errno.ENOENT:
raise VolumeException(-errno.ENOENT,
- "snapshot '{0}' doesnot exist".format(snapname))
+ "snapshot '{0}' does not exist".format(snapname))
raise VolumeException(-e.args[0], e.args[1])
def list_snapshots(self):
--- /dev/null
+import os
+import stat
+import uuid
+import errno
+import logging
+from datetime import datetime
+
+import cephfs
+
+from .metadata_manager import MetadataManager
+from .subvolume_base import SubvolumeBase
+from .subvolume_base import SubvolumeTypes
+from .subvolume_base import SubvolumeFeatures
+from ..op_sm import SubvolumeOpSm
+from ..op_sm import SubvolumeStates
+from ..template import SubvolumeTemplate
+from ..snapshot_util import mksnap, rmsnap
+from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException
+from ...fs_util import listdir
+from ..template import SubvolumeOpType
+
+from ..clone_index import open_clone_index, create_clone_index
+
+log = logging.getLogger(__name__)
+
+class SubvolumeV2(SubvolumeBase, SubvolumeTemplate):
+ """
+ Version 2 subvolumes creates a subvolume with path as follows,
+ volumes/<group-name>/<subvolume-name>/<uuid>/
+
+ The distinguishing feature of V2 subvolume as compared to V1 subvolumes is its ability to retain snapshots
+ of a subvolume on removal. This is done by creating snapshots under the <subvolume-name> directory,
+ rather than under the <uuid> directory, as is the case of V1 subvolumes.
+
+ - The directory under which user data resides is <uuid>
+ - Snapshots of the subvolume are taken within the <subvolume-name> directory
+ - A meta file is maintained under the <subvolume-name> directory as a metadata store, storing information similar
+ to V1 subvolumes
+ - On a request to remove subvolume but retain its snapshots, only the <uuid> directory is moved to trash, retaining
+ the rest of the subvolume and its meta file.
+ - The <uuid> directory, when present, is the current incarnation of the subvolume, which may have snapshots of
+ older incarnations of the same subvolume.
+ - V1 subvolumes that currently do not have any snapshots are upgraded to V2 subvolumes automatically, to support the
+ snapshot retention feature
+ """
+ VERSION = 2
+
+ @staticmethod
+ def version():
+ return SubvolumeV2.VERSION
+
+ @property
+ def path(self):
+ try:
+ # no need to stat the path -- open() does that
+ return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8')
+ except MetadataMgrException as me:
+ raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
+
+ @property
+ def features(self):
+ return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value,
+ SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value,
+ SubvolumeFeatures.FEATURE_SNAPSHOT_RETENTION.value]
+
+ def _set_incarnation_metadata(self, subvolume_type, qpath, initial_state):
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_TYPE, subvolume_type.value)
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, qpath)
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, initial_state.value)
+
+ def create(self, size, isolate_nspace, pool, mode, uid, gid):
+ subvolume_type = SubvolumeTypes.TYPE_NORMAL
+ try:
+ initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
+ except OpSmException as oe:
+ raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
+
+ subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
+ try:
+ self.fs.mkdirs(subvol_path, mode)
+ self.set_attrs(subvol_path, size, isolate_nspace, pool, uid, gid)
+
+ # persist subvolume metadata
+ qpath = subvol_path.decode('utf-8')
+ try:
+ self.metadata_mgr.refresh()
+ if self.state == SubvolumeStates.STATE_RETAINED:
+ self._set_incarnation_metadata(subvolume_type, qpath, initial_state)
+ self.metadata_mgr.flush()
+ else:
+ raise VolumeException(-errno.EINVAL, "invalid state for subvolume '{0}' during create".format(self.subvolname))
+ except MetadataMgrException as me:
+ if me.errno != -errno.ENOENT:
+ raise
+ self.init_config(SubvolumeV2.VERSION, subvolume_type, qpath, initial_state)
+ except (VolumeException, MetadataMgrException, cephfs.Error) as e:
+ try:
+ log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
+ self.remove()
+ except VolumeException as ve:
+ log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
+
+ if isinstance(e, MetadataMgrException):
+ log.error("metadata manager exception: {0}".format(e))
+ e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
+ elif isinstance(e, cephfs.Error):
+ e = VolumeException(-e.args[0], e.args[1])
+ raise e
+
+ def add_clone_source(self, volname, subvolume, snapname, flush=False):
+ self.metadata_mgr.add_section("source")
+ self.metadata_mgr.update_section("source", "volume", volname)
+ if not subvolume.group.is_default_group():
+ self.metadata_mgr.update_section("source", "group", subvolume.group_name)
+ self.metadata_mgr.update_section("source", "subvolume", subvolume.subvol_name)
+ self.metadata_mgr.update_section("source", "snapshot", snapname)
+ if flush:
+ self.metadata_mgr.flush()
+
+ def remove_clone_source(self, flush=False):
+ self.metadata_mgr.remove_section("source")
+ if flush:
+ self.metadata_mgr.flush()
+
+ def create_clone(self, pool, source_volname, source_subvolume, snapname):
+ subvolume_type = SubvolumeTypes.TYPE_CLONE
+ try:
+ initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
+ except OpSmException as oe:
+ raise VolumeException(-errno.EINVAL, "clone failed: internal error")
+
+ subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
+ try:
+ stx = self.fs.statx(source_subvolume.snapshot_data_path(snapname),
+ cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID,
+ cephfs.AT_SYMLINK_NOFOLLOW)
+ uid= stx.get('uid')
+ gid = stx.get('gid')
+ stx_mode = stx.get('mode')
+ if stx_mode is not None:
+ mode = stx_mode & ~stat.S_IFMT(stx_mode)
+ else:
+ mode = None
+
+ # create directory and set attributes
+ self.fs.mkdirs(subvol_path, mode)
+ self.set_attrs(subvol_path, None, None, pool, uid, gid)
+
+ # persist subvolume metadata and clone source
+ qpath = subvol_path.decode('utf-8')
+ try:
+ self.metadata_mgr.refresh()
+ if self.state == SubvolumeStates.STATE_RETAINED:
+ self._set_incarnation_metadata(subvolume_type, qpath, initial_state)
+ else:
+ raise VolumeException(-errno.EINVAL, "invalid state for subvolume '{0}' during clone".format(self.subvolname))
+ except MetadataMgrException as me:
+ if me.errno != -errno.ENOENT:
+ raise
+ self.metadata_mgr.init(SubvolumeV2.VERSION, subvolume_type.value, qpath, initial_state.value)
+ self.add_clone_source(source_volname, source_subvolume, snapname)
+ self.metadata_mgr.flush()
+ except (VolumeException, MetadataMgrException, cephfs.Error) as e:
+ try:
+ log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
+ self.remove()
+ except VolumeException as ve:
+ log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
+
+ if isinstance(e, MetadataMgrException):
+ log.error("metadata manager exception: {0}".format(e))
+ e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
+ elif isinstance(e, cephfs.Error):
+ e = VolumeException(-e.args[0], e.args[1])
+ raise e
+
+ def allowed_ops_by_type(self, vol_type):
+ if vol_type == SubvolumeTypes.TYPE_CLONE:
+ return {op_type for op_type in SubvolumeOpType}
+
+ if vol_type == SubvolumeTypes.TYPE_NORMAL:
+ return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS,
+ SubvolumeOpType.CLONE_CANCEL,
+ SubvolumeOpType.CLONE_INTERNAL}
+
+ return {}
+
+ def allowed_ops_by_state(self, vol_state):
+ if vol_state == SubvolumeStates.STATE_COMPLETE:
+ return {op_type for op_type in SubvolumeOpType}
+
+ if vol_state == SubvolumeStates.STATE_RETAINED:
+ return {
+ SubvolumeOpType.REMOVE,
+ SubvolumeOpType.REMOVE_FORCE,
+ SubvolumeOpType.LIST,
+ SubvolumeOpType.INFO,
+ SubvolumeOpType.SNAP_REMOVE,
+ SubvolumeOpType.SNAP_LIST,
+ SubvolumeOpType.SNAP_INFO,
+ SubvolumeOpType.SNAP_PROTECT,
+ SubvolumeOpType.SNAP_UNPROTECT,
+ SubvolumeOpType.CLONE_SOURCE
+ }
+
+ return {SubvolumeOpType.REMOVE_FORCE,
+ SubvolumeOpType.CLONE_CREATE,
+ SubvolumeOpType.CLONE_STATUS,
+ SubvolumeOpType.CLONE_CANCEL,
+ SubvolumeOpType.CLONE_INTERNAL,
+ SubvolumeOpType.CLONE_SOURCE}
+
+ def open(self, op_type):
+ if not isinstance(op_type, SubvolumeOpType):
+ raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format(
+ op_type.value, self.subvolname))
+ try:
+ self.metadata_mgr.refresh()
+
+ etype = self.subvol_type
+ if op_type not in self.allowed_ops_by_type(etype):
+ raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
+ op_type.value, self.subvolname, etype.value))
+
+ estate = self.state
+ if op_type not in self.allowed_ops_by_state(estate) and estate == SubvolumeStates.STATE_RETAINED:
+ raise VolumeException(-errno.ENOENT, "subvolume '{0}' is removed and has only snapshots retained".format(
+ self.subvolname))
+
+ if op_type not in self.allowed_ops_by_state(estate) and estate != SubvolumeStates.STATE_RETAINED:
+ raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format(
+ self.subvolname, op_type.value))
+
+ if estate != SubvolumeStates.STATE_RETAINED:
+ subvol_path = self.path
+ log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path))
+ st = self.fs.stat(subvol_path)
+
+ self.uid = int(st.st_uid)
+ self.gid = int(st.st_gid)
+ self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode))
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
+ raise VolumeException(me.args[0], me.args[1])
+ except cephfs.ObjectNotFound:
+ log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname))
+ raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def _get_clone_source(self):
+ try:
+ clone_source = {
+ 'volume' : self.metadata_mgr.get_option("source", "volume"),
+ 'subvolume': self.metadata_mgr.get_option("source", "subvolume"),
+ 'snapshot' : self.metadata_mgr.get_option("source", "snapshot"),
+ }
+
+ try:
+ clone_source["group"] = self.metadata_mgr.get_option("source", "group")
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ pass
+ else:
+ raise
+ except MetadataMgrException as me:
+ raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
+ return clone_source
+
+ @property
+ def status(self):
+ state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
+ subvolume_type = self.subvol_type
+ subvolume_status = {
+ 'state' : state.value
+ }
+ if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE:
+ subvolume_status["source"] = self._get_clone_source()
+ return subvolume_status
+
+ @property
+ def state(self):
+ return SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
+
+ @state.setter
+ def state(self, val):
+ state = val[0].value
+ flush = val[1]
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state)
+ if flush:
+ self.metadata_mgr.flush()
+
+ @property
+ def type(self):
+ return SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
+
+ def trash_incarnation_dir(self):
+ self._trash_dir(self.path)
+
+ def remove(self, retainsnaps=False):
+ if self.list_snapshots():
+ if not retainsnaps:
+ raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
+ self.trash_incarnation_dir()
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, "")
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, SubvolumeStates.STATE_RETAINED.value)
+ self.metadata_mgr.flush()
+ else:
+ self.trash_base_dir()
+
+ def resize(self, newsize, noshrink):
+ subvol_path = self.path
+ return self._resize(subvol_path, newsize, noshrink)
+
+ def info(self):
+ if self.state != SubvolumeStates.STATE_RETAINED:
+ return super(SubvolumeV2, self).info()
+
+ return {'type': self.subvol_type.value, 'features': self.features, 'state': SubvolumeStates.STATE_RETAINED.value}
+
+ def snapshot_path(self, snapname):
+ return os.path.join(self.base_path,
+ self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
+ snapname.encode('utf-8'))
+
+ @staticmethod
+ def is_valid_uuid(uuid_str):
+ try:
+ uuid.UUID(uuid_str)
+ return True
+ except ValueError:
+ return False
+
+ def snapshot_data_path(self, snapname):
+ snap_base_path = self.snapshot_path(snapname)
+ uuid_str = None
+ try:
+ with self.fs.opendir(snap_base_path) as dir_handle:
+ d = self.fs.readdir(dir_handle)
+ while d:
+ if d.d_name not in (b".", b".."):
+ d_full_path = os.path.join(snap_base_path, d.d_name)
+ stx = self.fs.statx(d_full_path, cephfs.CEPH_STATX_MODE, cephfs.AT_SYMLINK_NOFOLLOW)
+ if stat.S_ISDIR(stx.get('mode')):
+ if self.is_valid_uuid(d.d_name.decode('utf-8')):
+ uuid_str = d.d_name
+ d = self.fs.readdir(dir_handle)
+ except cephfs.Error as e:
+ if e.errno == errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ raise VolumeException(-e.args[0], e.args[1])
+
+ if not uuid_str:
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+
+ return os.path.join(snap_base_path, uuid_str)
+
+ def create_snapshot(self, snapname):
+ snappath = self.snapshot_path(snapname)
+ mksnap(self.fs, snappath)
+
+ def has_pending_clones(self, snapname):
+ try:
+ return self.metadata_mgr.section_has_item('clone snaps', snapname)
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ return False
+ raise
+
+ def remove_snapshot(self, snapname):
+ if self.has_pending_clones(snapname):
+ raise VolumeException(-errno.EAGAIN, "snapshot '{0}' has pending clones".format(snapname))
+ snappath = self.snapshot_path(snapname)
+ rmsnap(self.fs, snappath)
+ if self.state == SubvolumeStates.STATE_RETAINED and not self.list_snapshots():
+ self.trash_base_dir()
+ # tickle the volume purge job to purge this entry, using ESTALE
+ raise VolumeException(-errno.ESTALE, "subvolume '{0}' has been removed as the last retained snapshot is removed".format(self.subvolname))
+
+ def snapshot_info(self, snapname):
+ snappath = self.snapshot_data_path(snapname)
+ snap_info = {}
+ try:
+ snap_attrs = {'created_at':'ceph.snap.btime', 'size':'ceph.dir.rbytes',
+ 'data_pool':'ceph.dir.layout.pool'}
+ for key, val in snap_attrs.items():
+ snap_info[key] = self.fs.getxattr(snappath, val)
+ return {'size': int(snap_info['size']),
+ 'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))),
+ 'data_pool': snap_info['data_pool'].decode('utf-8'),
+ 'has_pending_clones': "yes" if self.has_pending_clones(snapname) else "no"}
+ except cephfs.Error as e:
+ if e.errno == errno.ENOENT:
+ raise VolumeException(-errno.ENOENT,
+ "snapshot '{0}' does not exist".format(snapname))
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def list_snapshots(self):
+ try:
+ dirpath = os.path.join(self.base_path,
+ self.vol_spec.snapshot_dir_prefix.encode('utf-8'))
+ return listdir(self.fs, dirpath)
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ return []
+ raise
+
+ def _add_snap_clone(self, track_id, snapname):
+ self.metadata_mgr.add_section("clone snaps")
+ self.metadata_mgr.update_section("clone snaps", track_id, snapname)
+ self.metadata_mgr.flush()
+
+ def _remove_snap_clone(self, track_id):
+ self.metadata_mgr.remove_option("clone snaps", track_id)
+ self.metadata_mgr.flush()
+
+ def attach_snapshot(self, snapname, tgt_subvolume):
+ if not snapname.encode('utf-8') in self.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ try:
+ create_clone_index(self.fs, self.vol_spec)
+ with open_clone_index(self.fs, self.vol_spec) as index:
+ track_idx = index.track(tgt_subvolume.base_path)
+ self._add_snap_clone(track_idx, snapname)
+ except (IndexException, MetadataMgrException) as e:
+ log.warning("error creating clone index: {0}".format(e))
+ raise VolumeException(-errno.EINVAL, "error cloning subvolume")
+
+ def detach_snapshot(self, snapname, track_id):
+ if not snapname.encode('utf-8') in self.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ try:
+ with open_clone_index(self.fs, self.vol_spec) as index:
+ index.untrack(track_id)
+ self._remove_snap_clone(track_id)
+ except (IndexException, MetadataMgrException) as e:
+ log.warning("error delining snapshot from clone: {0}".format(e))
+ raise VolumeException(-errno.EINVAL, "error delinking snapshot from clone")
from .operations.volume import create_volume, \
delete_volume, list_volumes, open_volume, get_pool_names
-from .operations.group import open_group, create_group, remove_group
+from .operations.group import open_group, create_group, remove_group, open_group_unique
from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
create_clone
return ret
def remove_subvolume(self, **kwargs):
- ret = 0, "", ""
- volname = kwargs['vol_name']
- subvolname = kwargs['sub_name']
- groupname = kwargs['group_name']
- force = kwargs['force']
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ force = kwargs['force']
+ retainsnaps = kwargs['retain_snapshots']
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- remove_subvol(fs_handle, self.volspec, group, subvolname, force)
+ remove_subvol(fs_handle, self.volspec, group, subvolname, force, retainsnaps)
# kick the purge threads for async removal -- note that this
# assumes that the subvolume is moved to trash can.
# TODO: make purge queue as singleton so that trash can kicks
with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_REMOVE) as subvolume:
subvolume.remove_snapshot(snapname)
except VolumeException as ve:
- if not (ve.errno == -errno.ENOENT and force):
+ # ESTALE serves as an error to state that subvolume is currently stale due to internal removal and,
+ # we should tickle the purge jobs to purge the same
+ if ve.errno == -errno.ESTALE:
+ self.purge_queue.queue_job(volname)
+ elif not (ve.errno == -errno.ENOENT and force):
ret = self.volume_exception_to_retval(ve)
return ret
ret = self.volume_exception_to_retval(ve)
return ret
- def _prepare_clone_subvolume(self, fs_handle, volname, subvolume, snapname, target_group, target_subvolname, target_pool):
- create_clone(fs_handle, self.volspec, target_group, target_subvolname, target_pool, volname, subvolume, snapname)
- with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_INTERNAL) as target_subvolume:
+ def _prepare_clone_subvolume(self, fs_handle, volname, s_subvolume, s_snapname, t_group, t_subvolname, **kwargs):
+ t_pool = kwargs['pool_layout']
+ s_subvolname = kwargs['sub_name']
+ s_groupname = kwargs['group_name']
+ t_groupname = kwargs['target_group_name']
+
+ create_clone(fs_handle, self.volspec, t_group, t_subvolname, t_pool, volname, s_subvolume, s_snapname)
+ with open_subvol(fs_handle, self.volspec, t_group, t_subvolname, SubvolumeOpType.CLONE_INTERNAL) as t_subvolume:
try:
- subvolume.attach_snapshot(snapname, target_subvolume)
+ if t_groupname == s_groupname and t_subvolname == s_subvolname:
+ t_subvolume.attach_snapshot(s_snapname, t_subvolume)
+ else:
+ s_subvolume.attach_snapshot(s_snapname, t_subvolume)
self.cloner.queue_job(volname)
except VolumeException as ve:
try:
- target_subvolume.remove()
+ t_subvolume.remove()
self.purge_queue.queue_job(volname)
except Exception as e:
- log.warning("failed to cleanup clone subvolume '{0}' ({1})".format(target_subvolname, e))
+ log.warning("failed to cleanup clone subvolume '{0}' ({1})".format(t_subvolname, e))
raise ve
- def _clone_subvolume_snapshot(self, fs_handle, volname, subvolume, **kwargs):
- snapname = kwargs['snap_name']
- target_pool = kwargs['pool_layout']
- target_subvolname = kwargs['target_sub_name']
- target_groupname = kwargs['target_group_name']
+ def _clone_subvolume_snapshot(self, fs_handle, volname, s_group, s_subvolume, **kwargs):
+ s_snapname = kwargs['snap_name']
+ target_subvolname = kwargs['target_sub_name']
+ target_groupname = kwargs['target_group_name']
+ s_groupname = kwargs['group_name']
- if not snapname.encode('utf-8') in subvolume.list_snapshots():
- raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ if not s_snapname.encode('utf-8') in s_subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(s_snapname))
- # TODO: when the target group is same as source, reuse group object.
- with open_group(fs_handle, self.volspec, target_groupname) as target_group:
+ with open_group_unique(fs_handle, self.volspec, target_groupname, s_group, s_groupname) as target_group:
try:
with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_CREATE):
raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname))
except VolumeException as ve:
if ve.errno == -errno.ENOENT:
- self._prepare_clone_subvolume(fs_handle, volname, subvolume, snapname,
- target_group, target_subvolname, target_pool)
+ self._prepare_clone_subvolume(fs_handle, volname, s_subvolume, s_snapname,
+ target_group, target_subvolname, **kwargs)
else:
raise
def clone_subvolume_snapshot(self, **kwargs):
ret = 0, "", ""
volname = kwargs['vol_name']
- subvolname = kwargs['sub_name']
- groupname = kwargs['group_name']
+ s_subvolname = kwargs['sub_name']
+ s_groupname = kwargs['group_name']
try:
with open_volume(self, volname) as fs_handle:
- with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CLONE_SOURCE) as subvolume:
- self._clone_subvolume_snapshot(fs_handle, volname, subvolume, **kwargs)
+ with open_group(fs_handle, self.volspec, s_groupname) as s_group:
+ with open_subvol(fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
+ self._clone_subvolume_snapshot(fs_handle, volname, s_group, s_subvolume, **kwargs)
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
'name=vol_name,type=CephString '
'name=sub_name,type=CephString '
'name=group_name,type=CephString,req=false '
- 'name=force,type=CephBool,req=false ',
+ 'name=force,type=CephBool,req=false '
+ 'name=retain_snapshots,type=CephBool,req=false ',
'desc': "Delete a CephFS subvolume in a volume, and optionally, "
- "in a specific subvolume group",
+ "in a specific subvolume group, force deleting a cancelled or failed "
+ "clone, and retaining existing subvolume snapshots",
'perm': 'rw'
},
{
return self.vc.remove_subvolume(vol_name=cmd['vol_name'],
sub_name=cmd['sub_name'],
group_name=cmd.get('group_name', None),
- force=cmd.get('force', False))
+ force=cmd.get('force', False),
+ retain_snapshots=cmd.get('retain_snapshots', False))
def _cmd_fs_subvolume_ls(self, inbuf, cmd):
return self.vc.list_subvolumes(vol_name=cmd['vol_name'],