def set_quota_on_clone(fs_handle, clone_volumes_pair):
src_path = clone_volumes_pair[1].snapshot_data_path(clone_volumes_pair[2])
dst_path = clone_volumes_pair[0].path
- quota = None # type: Optional[int]
+ quota: Optional[int] = None
try:
quota = int(fs_handle.getxattr(src_path, 'ceph.quota.max_bytes').decode('utf-8'))
except cephfs.NoData:
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
- quota_files = None # type: Optional[int]
+ quota_files: Optional[int] = None
try:
quota_files = int(fs_handle.getxattr(src_path, 'ceph.quota.max_files').decode('utf-8'))
except cephfs.NoData:
def prepare_updated_caps_list(existing_caps, mds_cap_str, osd_cap_str, authorize=True):
- caps_list = [] # type: List[str]
+ caps_list: List[str] = []
for k, v in existing_caps['caps'].items():
if k == 'mds' or k == 'osd':
continue
See: https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F14/projects/reports/project6_report.pdf
"""
- _shared_state = {
+ _shared_state: Dict = {
'lock' : Lock(),
'init' : False
- } # type: Dict
+ }
def __init__(self):
with self._shared_state['lock']:
-import os
import errno
import cephfs
try:
pin_setting = _pin_value[pin_type](pin_setting)
- except ValueError as e:
+ except ValueError:
raise VolumeException(-errno.EINVAL, f"pin value wrong type: {pin_setting}")
try:
import cephfs
from .template import GroupTemplate
-from ..fs_util import listdir
from ..exception import VolumeException
log = logging.getLogger(__name__)
subvolume_type = SubvolumeTypes.TYPE_NORMAL
try:
initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
- except OpSmException as oe:
+ except OpSmException:
raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
qpath = subvolume.base_path.decode('utf-8')
# legacy is only upgradable to v1
import os
import errno
import logging
-import sys
import threading
import configparser
import re
return not(self == other)
class SubvolumeOpSm(object):
- transition_table = {} # type: Dict
+ transition_table: Dict = {}
@staticmethod
def is_complete_state(state):
try:
self.fs.stat(self.legacy_config_path)
self.legacy_mode = True
- except cephfs.Error as e:
+ except cephfs.Error:
pass
log.debug("loading config "
def get_attrs(self, pathname):
# get subvolume attributes
- attrs = {} # type: Dict[str, Union[int, str, None]]
+ attrs: Dict[str, Union[int, str, None]] = {}
stx = self.fs.statx(pathname,
cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID
| cephfs.CEPH_STATX_MODE,
try:
# no need to stat the path -- open() does that
return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8')
- except MetadataMgrException as me:
+ except MetadataMgrException:
raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
@property
try:
# MDS treats this as a noop for already marked subvolume
self.fs.setxattr(self.path, 'ceph.dir.subvolume', b'1', 0)
- except cephfs.InvalidValue as e:
+ except cephfs.InvalidValue:
raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume")
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
subvolume_type = SubvolumeTypes.TYPE_NORMAL
try:
initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
- except OpSmException as oe:
+ except OpSmException:
raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
subvolume_type = SubvolumeTypes.TYPE_CLONE
try:
initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
- except OpSmException as oe:
+ except OpSmException:
raise VolumeException(-errno.EINVAL, "clone failed: internal error")
subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
"""
with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
- auths = [] # type: List[Dict[str,str]]
+ auths: List[Dict[str,str]] = []
if not meta or not meta['auths']:
return auths
pass
else:
raise
- except MetadataMgrException as me:
+ except MetadataMgrException:
raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
return clone_source
raise
def get_pending_clones(self, snapname):
- pending_clones_info = {"has_pending_clones": "no"} # type: Dict[str, Any]
+ pending_clones_info: Dict[str, Any] = {"has_pending_clones": "no"}
pending_track_id_list = []
pending_clone_list = []
index_path = ""
# If clone is completed between 'list_all_keys_with_specified_values_from_section'
# and readlink(track_id_path) call then readlink will fail with error ENOENT (2)
# Hence we double check whether track_id is exist in .meta file or not.
- value = self.metadata_mgr.get_option('clone snaps', track_id)
+ self.metadata_mgr.get_option('clone snaps', track_id)
# Edge case scenario.
# If track_id for clone exist but path /volumes/_index/clone/{track_id} not found
# then clone is orphan.
path = Path(link_path.decode('utf-8'))
clone_name = os.path.basename(link_path).decode('utf-8')
group_name = os.path.basename(path.parent.absolute())
- details = {"name": clone_name} # type: Dict[str, str]
+ details: Dict[str, str] = {"name": clone_name}
if group_name != Group.NO_GROUP_NAME:
details["target_group"] = group_name
pending_clone_list.append(details)
snap_info[key] = self.fs.getxattr(snappath, val)
pending_clones_info = self.get_pending_clones(snapname)
- info_dict = {'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))),
- 'data_pool': snap_info['data_pool'].decode('utf-8')} # type: Dict[str, Any]
+ info_dict: Dict[str, Any] = {'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))),
+ 'data_pool': snap_info['data_pool'].decode('utf-8')}
info_dict.update(pending_clones_info);
return info_dict
except cephfs.Error as e:
from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures
from .op_sm import SubvolumeOpSm
from .subvolume_v1 import SubvolumeV1
-from ..template import SubvolumeTemplate
from ...exception import OpSmException, VolumeException, MetadataMgrException
from ...fs_util import listdir, create_base_dir
from ..template import SubvolumeOpType
try:
# MDS treats this as a noop for already marked subvolume
self.fs.setxattr(self.base_path, 'ceph.dir.subvolume', b'1', 0)
- except cephfs.InvalidValue as e:
+ except cephfs.InvalidValue:
raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume")
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
subvolume_type = SubvolumeTypes.TYPE_NORMAL
try:
initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
- except OpSmException as oe:
+ except OpSmException:
raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
retained = self.retained
subvolume_type = SubvolumeTypes.TYPE_CLONE
try:
initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
- except OpSmException as oe:
+ except OpSmException:
raise VolumeException(-errno.EINVAL, "clone failed: internal error")
retained = self.retained
"""
fs_map = mgr.get("fs_map")
metadata_pool_id = None
- data_pool_ids = [] # type: List[int]
+ data_pool_ids: List[int] = []
for f in fs_map['filesystems']:
if volname == f['mdsmap']['fs_name']:
metadata_pool_id = f['mdsmap']['metadata_pool']
"""
fs_map = mgr.get("fs_map")
metadata_pool_id = None
- data_pool_ids = [] # type: List[int]
+ data_pool_ids: List[int] = []
for f in fs_map['filesystems']:
if volname == f['mdsmap']['fs_name']:
metadata_pool_id = f['mdsmap']['metadata_pool']
import json
import errno
import logging
-import os
import mgr_util
from typing import TYPE_CHECKING
list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count
from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
create_clone
-from .operations.trash import Trash
from .vol_spec import VolSpec
from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.EVICT) as subvolume:
- key = subvolume.evict(volname, authid)
+ subvolume.evict(volname, authid)
ret = 0, "", ""
except (VolumeException, ClusterTimeout, ClusterError, EvictionError) as e:
if isinstance(e, VolumeException):
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT) as subvolume:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT):
log.warning("snapshot protect call is deprecated and will be removed in a future release")
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT) as subvolume:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT):
log.warning("snapshot unprotect call is deprecated and will be removed in a future release")
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
try:
with open_volume(self, volname) as fs_handle:
- with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_group(fs_handle, self.volspec, groupname):
# as subvolumes are marked with the vxattr ceph.dir.subvolume deny snapshots
# at the subvolume group (see: https://tracker.ceph.com/issues/46074)
# group.create_snapshot(snapname)