from .template import SubvolumeOpType
from .versions import loaded_subvolumes
-def create_subvol(mgr, fs, vol_spec, group, subvolname, size, isolate_nspace, pool, mode, uid, gid):
+def create_subvol(mgr, fs, vol_spec, group, subvolname, size, isolate_nspace, pool, mode, uid, gid, earmark):
"""
create a subvolume (create a subvolume with the max known version).
:param mode: the user permissions
:param uid: the user identifier
:param gid: the group identifier
+ :param earmark: metadata string to identify if subvolume is associated with nfs/smb
:return: None
"""
subvolume = loaded_subvolumes.get_subvolume_object_max(mgr, fs, vol_spec, group, subvolname)
- subvolume.create(size, isolate_nspace, pool, mode, uid, gid)
+ subvolume.create(size, isolate_nspace, pool, mode, uid, gid, earmark)
def create_clone(mgr, fs, vol_spec, group, subvolname, pool, source_volume, source_subvolume, snapname):
SNAP_METADATA_GET = 'snap-metadata-get'
SNAP_METADATA_LIST = 'snap-metadata-ls'
SNAP_METADATA_REMOVE = 'snap-metadata-rm'
+ EARMARK_GET = 'earmark-get'
+ EARMARK_SET = 'earmark-set'
+ EARMARK_CLEAR = 'earmark-clear'
class SubvolumeTemplate(object):
VERSION = None # type: int
from .auth_metadata import AuthMetadataManager
from .subvolume_attrs import SubvolumeStates
+from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException # type: ignore
+
log = logging.getLogger(__name__)
except cephfs.NoData:
attrs["quota"] = None
+ try:
+ fs_earmark = CephFSVolumeEarmarking(self.fs, pathname)
+ attrs["earmark"] = fs_earmark.get_earmark()
+ except cephfs.NoData:
+ attrs["earmark"] = ''
+ except EarmarkException:
+ attrs["earmark"] = ''
+
return attrs
def set_attrs(self, path, attrs):
if mode is not None:
self.fs.lchmod(path, mode)
+ # set earmark
+ earmark = attrs.get("earmark")
+ if earmark is not None:
+ fs_earmark = CephFSVolumeEarmarking(self.fs, path)
+ fs_earmark.set_earmark(earmark)
+
def _resize(self, path, newsize, noshrink):
try:
newsize = int(newsize)
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
+ try:
+ fs_earmark = CephFSVolumeEarmarking(self.fs, subvolpath)
+ earmark = fs_earmark.get_earmark()
+ except cephfs.NoData:
+ earmark = ''
+ except EarmarkException:
+ earmark = ''
+
return {'path': subvolpath,
'type': etype.value,
'uid': int(st["uid"]),
if nsize == 0
else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0),
'pool_namespace': pool_namespace,
- 'features': self.features, 'state': self.state.value}
+ 'features': self.features,
+ 'state': self.state.value,
+ 'earmark': earmark}
def set_user_metadata(self, keyname, value):
try:
""" Path to user data directory within a subvolume snapshot named 'snapname' """
return self.snapshot_path(snapname)
- def create(self, size, isolate_nspace, pool, mode, uid, gid):
+ def create(self, size, isolate_nspace, pool, mode, uid, gid, earmark):
subvolume_type = SubvolumeTypes.TYPE_NORMAL
try:
initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
'gid': gid,
'data_pool': pool,
'pool_namespace': self.namespace if isolate_nspace else None,
- 'quota': size
+ 'quota': size,
+ 'earmark': earmark
}
self.set_attrs(subvol_path, attrs)
self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, qpath)
self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, initial_state.value)
- def create(self, size, isolate_nspace, pool, mode, uid, gid):
+ def create(self, size, isolate_nspace, pool, mode, uid, gid, earmark):
subvolume_type = SubvolumeTypes.TYPE_NORMAL
try:
initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
'gid': gid,
'data_pool': pool,
'pool_namespace': self.namespace if isolate_nspace else None,
- 'quota': size
+ 'quota': size,
+ 'earmark': earmark
}
self.set_attrs(subvol_path, attrs)
import mgr_util
import inspect
import functools
-from typing import TYPE_CHECKING, Any, Callable, Optional
+from typing import TYPE_CHECKING, Any, Callable, Optional, Tuple
from urllib.parse import urlsplit, urlunsplit
import cephfs
+from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException # type: ignore
+
from mgr_util import CephfsClient
from .fs_util import listdir, has_subdir
gid = kwargs['gid']
mode = kwargs['mode']
isolate_nspace = kwargs['namespace_isolated']
+ earmark = kwargs['earmark'] or '' # if not set, default to empty string --> no earmark
oct_mode = octal_str_to_decimal_int(mode)
+
try:
create_subvol(
- self.mgr, fs_handle, self.volspec, group, subvolname, size, isolate_nspace, pool, oct_mode, uid, gid)
+ self.mgr, fs_handle, self.volspec, group, subvolname, size, isolate_nspace, pool, oct_mode, uid, gid, earmark)
except VolumeException as ve:
# kick the purge threads for async removal -- note that this
# assumes that the subvolume is moved to trashcan for cleanup on error.
gid = kwargs['gid']
mode = kwargs['mode']
isolate_nspace = kwargs['namespace_isolated']
+ earmark = kwargs['earmark'] or '' # if not set, default to empty string --> no earmark
try:
with open_volume(self, volname) as fs_handle:
'mode': octal_str_to_decimal_int(mode),
'data_pool': pool,
'pool_namespace': subvolume.namespace if isolate_nspace else None,
- 'quota': size
+ 'quota': size,
+ 'earmark': earmark
}
subvolume.set_attrs(subvolume.path, attrs)
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
+ def get_earmark(self, **kwargs) -> Tuple[int, Optional[str], str]:
+ ret: Tuple[int, Optional[str], str] = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.EARMARK_GET) as subvolume:
+ log.info("Getting earmark for subvolume %s", subvolume.path)
+ fs_earmark = CephFSVolumeEarmarking(fs_handle, subvolume.path)
+ earmark = fs_earmark.get_earmark()
+ ret = 0, earmark, ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ except EarmarkException as ee:
+ log.error(f"Earmark error occurred: {ee}")
+ ret = ee.to_tuple()
+ return ret
+
+ def set_earmark(self, **kwargs): # type: ignore
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ earmark = kwargs['earmark']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.EARMARK_SET) as subvolume:
+ log.info("Setting earmark %s for subvolume %s", earmark, subvolume.path)
+ fs_earmark = CephFSVolumeEarmarking(fs_handle, subvolume.path)
+ fs_earmark.set_earmark(earmark)
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ except EarmarkException as ee:
+ log.error(f"Earmark error occurred: {ee}")
+ ret = ee.to_tuple() # type: ignore
+ return ret
+
+ def clear_earmark(self, **kwargs): # type: ignore
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.EARMARK_CLEAR) as subvolume:
+ log.info("Removing earmark for subvolume %s", subvolume.path)
+ fs_earmark = CephFSVolumeEarmarking(fs_handle, subvolume.path)
+ fs_earmark.clear_earmark()
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ except EarmarkException as ee:
+ log.error(f"Earmark error occurred: {ee}")
+ ret = ee.to_tuple() # type: ignore
+ return ret
+
### subvolume snapshot
def create_subvolume_snapshot(self, **kwargs):
'name=uid,type=CephInt,req=false '
'name=gid,type=CephInt,req=false '
'name=mode,type=CephString,req=false '
- 'name=namespace_isolated,type=CephBool,req=false ',
+ 'name=namespace_isolated,type=CephBool,req=false '
+ 'name=earmark,type=CephString,req=false ',
'desc': "Create a CephFS subvolume in a volume, and optionally, "
"with a specific size (in bytes), a specific data pool layout, "
"a specific mode, in a specific subvolume group and in separate "
"and optionally, in a specific subvolume group",
'perm': 'rw'
},
+ {
+ 'cmd': 'fs subvolume earmark get '
+ 'name=vol_name,type=CephString '
+ 'name=sub_name,type=CephString '
+ 'name=group_name,type=CephString,req=false ',
+ 'desc': "Get earmark for a subvolume",
+ 'perm': 'r'
+ },
+ {
+ 'cmd': 'fs subvolume earmark set '
+ 'name=vol_name,type=CephString '
+ 'name=sub_name,type=CephString '
+ 'name=group_name,type=CephString,req=false '
+ 'name=earmark,type=CephString ',
+ 'desc': "Set earmark for a subvolume",
+ 'perm': 'rw'
+ },
+ {
+ 'cmd': 'fs subvolume earmark rm '
+ 'name=vol_name,type=CephString '
+ 'name=sub_name,type=CephString '
+ 'name=group_name,type=CephString,req=false ',
+ 'desc': "Remove earmark from a subvolume",
+ 'perm': 'rw'
+ },
{
'cmd': 'fs quiesce '
'name=vol_name,type=CephString '
group_name=cmd['group_name'],
new_size=cmd['new_size'],
no_shrink=cmd.get('no_shrink', False))
+
@mgr_cmd_wrap
def _cmd_fs_subvolumegroup_ls(self, inbuf, cmd):
return self.vc.list_subvolume_groups(vol_name=cmd['vol_name'])
uid=cmd.get('uid', None),
gid=cmd.get('gid', None),
mode=cmd.get('mode', '755'),
- namespace_isolated=cmd.get('namespace_isolated', False))
+ namespace_isolated=cmd.get('namespace_isolated', False),
+ earmark=cmd.get('earmark', None))
@mgr_cmd_wrap
def _cmd_fs_subvolume_rm(self, inbuf, cmd):
def _cmd_fs_subvolume_exist(self, inbuf, cmd):
return self.vc.subvolume_exists(vol_name=cmd['vol_name'],
group_name=cmd.get('group_name', None))
-
+
@mgr_cmd_wrap
def _cmd_fs_subvolume_metadata_set(self, inbuf, cmd):
return self.vc.set_user_metadata(vol_name=cmd['vol_name'],
key_name=cmd['key_name'],
group_name=cmd.get('group_name', None),
force=cmd.get('force', False))
-
+
+ @mgr_cmd_wrap
+ def _cmd_fs_subvolume_earmark_get(self, inbuf, cmd):
+ return self.vc.get_earmark(vol_name=cmd['vol_name'],
+ sub_name=cmd['sub_name'],
+ group_name=cmd.get('group_name', None))
+
+ @mgr_cmd_wrap
+ def _cmd_fs_subvolume_earmark_set(self, inbuf, cmd):
+ return self.vc.set_earmark(vol_name=cmd['vol_name'],
+ sub_name=cmd['sub_name'],
+ group_name=cmd.get('group_name', None),
+ earmark=cmd['earmark'])
+
+ @mgr_cmd_wrap
+ def _cmd_fs_subvolume_earmark_rm(self, inbuf, cmd):
+ return self.vc.clear_earmark(vol_name=cmd['vol_name'],
+ sub_name=cmd['sub_name'],
+ group_name=cmd.get('group_name', None))
+
@mgr_cmd_wrap
def _cmd_fs_quiesce(self, inbuf, cmd):
return self.vc.quiesce(cmd)
--- /dev/null
+import logging
+
+log = logging.getLogger(__name__)
--- /dev/null
+"""
+Module: CephFS Volume Earmarking
+
+This module provides the `CephFSVolumeEarmarking` class, which is designed to manage the earmarking
+of subvolumes within a CephFS filesystem. The earmarking mechanism allows
+administrators to tag specific subvolumes with identifiers that indicate their intended use
+such as NFS or SMB, ensuring that only one file service is assigned to a particular subvolume
+at a time. This is crucial to prevent data corruption in environments where
+mixed protocol support (NFS and SMB) is not yet available.
+
+Key Features:
+- **Set Earmark**: Assigns an earmark to a subvolume.
+- **Get Earmark**: Retrieves the existing earmark of a subvolume, if any.
+- **Remove Earmark**: Removes the earmark from a subvolume, making it available for reallocation.
+- **Validate Earmark**: Ensures that the earmark follows the correct format and only uses
+supported top-level scopes.
+"""
+
+import errno
+import enum
+import logging
+from typing import Optional, Tuple
+
+log = logging.getLogger(__name__)
+
+XATTR_SUBVOLUME_EARMARK_NAME = 'user.ceph.subvolume.earmark'
+
+
+class EarmarkTopScope(enum.Enum):
+ NFS = "nfs"
+ SMB = "smb"
+
+
+class EarmarkException(Exception):
+ def __init__(self, error_code: int, error_message: str) -> None:
+ self.errno = error_code
+ self.error_str = error_message
+
+ def to_tuple(self) -> Tuple[int, Optional[str], str]:
+ return self.errno, "", self.error_str
+
+ def __str__(self) -> str:
+ return f"{self.errno} ({self.error_str})"
+
+
+class CephFSVolumeEarmarking:
+ def __init__(self, fs, path: str) -> None:
+ self.fs = fs
+ self.path = path
+
+ def _handle_cephfs_error(self, e: Exception, action: str) -> None:
+ if isinstance(e, ValueError):
+ raise EarmarkException(errno.EINVAL, f"Invalid earmark specified: {e}") from e
+ elif isinstance(e, OSError):
+ log.error(f"Error {action} earmark: {e}")
+ raise EarmarkException(-e.errno, e.strerror) from e
+ else:
+ log.error(f"Unexpected error {action} earmark: {e}")
+ raise EarmarkException(errno.EIO, "Unexpected error") from e
+
+ def _validate_earmark(self, earmark: str) -> bool:
+ """
+ Validates that the earmark string is either empty or composed of parts separated by scopes,
+ with the top-level scope being either 'nfs' or 'smb'.
+
+ :param earmark: The earmark string to validate.
+ :return: True if valid, False otherwise.
+ """
+ if not earmark or earmark in (scope.value for scope in EarmarkTopScope):
+ return True
+
+ parts = earmark.split('.')
+
+ if parts[0] not in (scope.value for scope in EarmarkTopScope):
+ return False
+
+ # Check if all parts are non-empty (to ensure valid dot-separated format)
+ return all(parts)
+
+ def get_earmark(self) -> Optional[str]:
+ try:
+ earmark_value = (
+ self.fs.getxattr(self.path, XATTR_SUBVOLUME_EARMARK_NAME)
+ .decode('utf-8')
+ )
+ return earmark_value
+ except Exception as e:
+ self._handle_cephfs_error(e, "getting")
+ return None
+
+ def set_earmark(self, earmark: str):
+ # Validate the earmark before attempting to set it
+ if not self._validate_earmark(earmark):
+ raise EarmarkException(
+ errno.EINVAL,
+ f"Invalid earmark specified: '{earmark}'. "
+ "A valid earmark should either be empty or start with 'nfs' or 'smb', "
+ "followed by dot-separated non-empty components."
+ )
+
+ try:
+ self.fs.setxattr(self.path, XATTR_SUBVOLUME_EARMARK_NAME, earmark.encode('utf-8'), 0)
+ log.info(f"Earmark '{earmark}' set on {self.path}.")
+ except Exception as e:
+ self._handle_cephfs_error(e, "setting")
+
+ def clear_earmark(self) -> None:
+ self.set_earmark("")