from .template import SubvolumeOpType
from .versions import loaded_subvolumes
-def create_subvol(mgr, fs, vol_spec, group, subvolname, size, isolate_nspace, pool, mode, uid, gid, earmark, normalization, casesensitive):
+def create_subvol(mgr, fs, vol_spec, group, subvolname, size, isolate_nspace, pool, mode, uid, gid, earmark, normalization, casesensitive, enctag):
"""
create a subvolume (create a subvolume with the max known version).
:param earmark: metadata string to identify if subvolume is associated with nfs/smb
:param normalization: the unicode normalization form to use (nfd, nfc, nfkd or nfkc)
:param casesensitive: whether to make the subvolume case insensitive or not
+ :param enctag: metadata string to associate subvolume with an encryption tag
:return: None
"""
subvolume = loaded_subvolumes.get_subvolume_object_max(mgr, fs, vol_spec, group, subvolname)
- subvolume.create(size, isolate_nspace, pool, mode, uid, gid, earmark, normalization, casesensitive)
+ subvolume.create(size, isolate_nspace, pool, mode, uid, gid, earmark, normalization, casesensitive, enctag)
def create_clone(mgr, fs, vol_spec, group, subvolname, pool, source_volume, source_subvolume, snapname):
EARMARK_SET = 'earmark-set'
EARMARK_CLEAR = 'earmark-clear'
SNAPSHOT_VISIBILITY = 'snapshot-visibility'
+ ENCTAG_GET = 'enctag-get'
+ ENCTAG_SET = 'enctag-set'
+ ENCTAG_CLEAR = 'enctag-clear'
class SubvolumeTemplate(object):
VERSION = None # type: int
from .subvolume_attrs import SubvolumeStates
from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException
+from ceph.fs.enctag import CephFSVolumeEncryptionTag, EncryptionTagException
log = logging.getLogger(__name__)
except cephfs.NoData:
attrs["casesensitive"] = True
+ try:
+ fs_enctag = CephFSVolumeEncryptionTag(self.fs, pathname)
+ attrs["enctag"] = fs_enctag.get_tag()
+ except cephfs.NoData:
+ attrs["enctag"] = ''
+ except EncryptionTagException:
+ attrs["enctag"] = ''
+
return attrs
def set_attrs(self, path, attrs):
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
+ # set encryption tag string identifier
+ enctag = attrs.get("enctag", None)
+ if enctag is not None:
+ fs_enctag = CephFSVolumeEncryptionTag(self.fs, path)
+ fs_enctag.set_tag(enctag)
+
def _resize(self, path, newsize, noshrink):
try:
newsize = int(newsize)
except cephfs.NoData:
casesensitive = True
+ try:
+ fs_enctag = CephFSVolumeEncryptionTag(self.fs, subvolpath)
+ enctag = fs_enctag.get_tag()
+ except cephfs.NoData:
+ enctag = ''
+ except EncryptionTagException:
+ enctag = ''
+
subvol_info = {
'path': subvolpath,
'type': etype.value,
'earmark': earmark,
'normalization': normalization,
'casesensitive': casesensitive,
+ 'enctag': enctag,
}
subvol_src_info = self._get_clone_source()
""" Path to user data directory within a subvolume snapshot named 'snapname' """
return self.snapshot_path(snapname)
- def create(self, size, isolate_nspace, pool, mode, uid, gid, earmark, normalization, casesensitive):
+ def create(self, size, isolate_nspace, pool, mode, uid, gid, earmark, normalization, casesensitive, enctag):
subvolume_type = SubvolumeTypes.TYPE_NORMAL
try:
initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
'earmark': earmark,
'normalization': normalization,
'casesensitive': casesensitive,
+ 'enctag': enctag,
}
self.set_attrs(subvol_path, attrs)
self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, qpath)
self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, initial_state.value)
- def create(self, size, isolate_nspace, pool, mode, uid, gid, earmark, normalization, casesensitive):
+ def create(self, size, isolate_nspace, pool, mode, uid, gid, earmark, normalization, casesensitive, enctag):
subvolume_type = SubvolumeTypes.TYPE_NORMAL
try:
initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
'earmark': earmark,
'normalization': normalization,
'casesensitive': casesensitive,
+ 'enctag': enctag,
}
self.set_attrs(subvol_path, attrs)
import cephfs
from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException
+from ceph.fs.enctag import CephFSVolumeEncryptionTag, EncryptionTagException
from mgr_util import CephfsClient
earmark = kwargs['earmark'] or '' # if not set, default to empty string --> no earmark
normalization = kwargs['normalization']
casesensitive = kwargs['casesensitive']
+ enctag = kwargs['enctag'] or '' # if not set, default to empty string
oct_mode = octal_str_to_decimal_int(mode)
try:
create_subvol(
- self.mgr, fs_handle, self.volspec, group, subvolname, size, isolate_nspace, pool, oct_mode, uid, gid, earmark, normalization, casesensitive)
+ self.mgr, fs_handle, self.volspec, group, subvolname, size, isolate_nspace, pool, oct_mode, uid, gid, earmark, normalization, casesensitive, enctag)
except VolumeException as ve:
# kick the purge threads for async removal -- note that this
# assumes that the subvolume is moved to trashcan for cleanup on error.
earmark = kwargs['earmark'] or '' # if not set, default to empty string --> no earmark
normalization = kwargs['normalization']
casesensitive = kwargs['casesensitive']
+ enctag = kwargs['enctag'] or '' # if not set, default to empty string --> no encryption tag
try:
with open_volume(self, volname) as fs_handle:
'earmark': earmark,
'normalization': normalization,
'casesensitive': casesensitive,
+ 'enctag': enctag,
}
subvolume.set_attrs(subvolume.path, attrs)
except VolumeException as ve:
ret = ee.to_tuple() # type: ignore
return ret
+ def get_enctag(self, **kwargs) -> Tuple[int, Optional[str], str]:
+ ret: Tuple[int, Optional[str], str] = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.ENCTAG_GET) as subvolume:
+ log.info("Getting enctag for subvolume %s", subvolume.path)
+ fs_enctag = CephFSVolumeEncryptionTag(fs_handle, subvolume.path)
+ enctag = fs_enctag.get_tag()
+ ret = 0, enctag, ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ except EncryptionTagException as ee:
+ log.error(f"EncryptionTag error occurred: {ee}")
+ ret = ee.to_tuple()
+ return ret
+
+ def set_enctag(self, **kwargs): # type: ignore
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ enctag = kwargs['enctag']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.ENCTAG_SET) as subvolume:
+ log.info("Setting enctag %s for subvolume %s", enctag, subvolume.path)
+ fs_enctag = CephFSVolumeEncryptionTag(fs_handle, subvolume.path)
+ fs_enctag.set_tag(enctag)
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ except EncryptionTagException as ee:
+ log.error(f"EncryptionTag error occurred: {ee}")
+ ret = ee.to_tuple() # type: ignore
+ return ret
+
+ def clear_enctag(self, **kwargs): # type: ignore
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.ENCTAG_CLEAR) as subvolume:
+ log.info("Removing enctag for subvolume %s", subvolume.path)
+ fs_enctag = CephFSVolumeEncryptionTag(fs_handle, subvolume.path)
+ fs_enctag.clear_tag()
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ except EncryptionTagException as ee:
+ log.error(f"EncryptionTag error occurred: {ee}")
+ ret = ee.to_tuple() # type: ignore
+ return ret
+
### subvolume snapshot
def create_subvolume_snapshot(self, **kwargs):
'name=namespace_isolated,type=CephBool,req=false '
'name=earmark,type=CephString,req=false '
'name=normalization,type=CephChoices,strings=nfd|nfc|nfkd|nfkc,req=false '
- 'name=casesensitive,type=CephBool,req=false ',
+ 'name=casesensitive,type=CephBool,req=false '
+ 'name=enctag,type=CephString,req=false ',
'desc': "Create a CephFS subvolume in a volume, and optionally, "
"with a specific size (in bytes), a specific data pool layout, "
"a specific mode, in a specific subvolume group and in separate "
'desc': "Remove earmark from a subvolume",
'perm': 'rw'
},
+ {
+ 'cmd': 'fs subvolume enctag get '
+ 'name=vol_name,type=CephString '
+ 'name=sub_name,type=CephString '
+ 'name=group_name,type=CephString,req=false ',
+ 'desc': "Get encryption tag for a subvolume",
+ 'perm': 'r'
+ },
+ {
+ 'cmd': 'fs subvolume enctag set '
+ 'name=vol_name,type=CephString '
+ 'name=sub_name,type=CephString '
+ 'name=group_name,type=CephString,req=false '
+ 'name=enctag,type=CephString ',
+ 'desc': "Set encryption tag for a subvolume",
+ 'perm': 'rw'
+ },
+ {
+ 'cmd': 'fs subvolume enctag rm '
+ 'name=vol_name,type=CephString '
+ 'name=sub_name,type=CephString '
+ 'name=group_name,type=CephString,req=false ',
+ 'desc': "Remove encryption tag from a subvolume",
+ 'perm': 'rw'
+ },
{
'cmd': 'fs quiesce '
'name=vol_name,type=CephString '
namespace_isolated=cmd.get('namespace_isolated', False),
earmark=cmd.get('earmark', None),
normalization=cmd.get('normalization', None),
- casesensitive=cmd.get('casesensitive', None))
+ casesensitive=cmd.get('casesensitive', None),
+ enctag=cmd.get('enctag', None))
@mgr_cmd_wrap
def _cmd_fs_subvolume_rm(self, inbuf, cmd):
sub_name=cmd['sub_name'],
group_name=cmd.get('group_name', None))
+ @mgr_cmd_wrap
+ def _cmd_fs_subvolume_enctag_get(self, inbuf, cmd):
+ return self.vc.get_enctag(vol_name=cmd['vol_name'],
+ sub_name=cmd['sub_name'],
+ group_name=cmd.get('group_name', None))
+
+ @mgr_cmd_wrap
+ def _cmd_fs_subvolume_enctag_set(self, inbuf, cmd):
+ return self.vc.set_enctag(vol_name=cmd['vol_name'],
+ sub_name=cmd['sub_name'],
+ group_name=cmd.get('group_name', None),
+ enctag=cmd['enctag'])
+
+ @mgr_cmd_wrap
+ def _cmd_fs_subvolume_enctag_rm(self, inbuf, cmd):
+ return self.vc.clear_enctag(vol_name=cmd['vol_name'],
+ sub_name=cmd['sub_name'],
+ group_name=cmd.get('group_name', None))
+
@mgr_cmd_wrap
def _cmd_fs_quiesce(self, inbuf, cmd):
return self.vc.quiesce(cmd)
--- /dev/null
+"""
+Module: CephFS Volume Encryption Tag
+
+This module provides the `CephFSVolumeEncryptionTag` class, which is designed to manage encryption tags
+of subvolumes within a CephFS filesystem. The encryption tag mechanism allows
+administrators to tag specific subvolumes with identifiers that indicate encryption information,
+such as a keyid or other itentifier tags.
+
+Key Features:
+- **Set Encryption Tag**: Assigns an tag to a subvolume.
+- **Get Encryption Tag**: Retrieves the existing tag of a subvolume, if any.
+- **Remove Tag**: Removes the tag from a subvolume, making it available for reallocation.
+supported top-level scopes.
+"""
+
+import errno
+import enum
+import logging
+from typing import Optional, Tuple
+
+log = logging.getLogger(__name__)
+
+XATTR_SUBVOLUME_ENCTAG_NAME = 'user.ceph.subvolume.enctag'
+
+
+class EncryptionTagException(Exception):
+ def __init__(self, error_code: int, error_message: str) -> None:
+ self.errno = error_code
+ self.error_str = error_message
+
+ def to_tuple(self) -> Tuple[int, Optional[str], str]:
+ return self.errno, "", self.error_str
+
+ def __str__(self) -> str:
+ return f"{self.errno} ({self.error_str})"
+
+
+class CephFSVolumeEncryptionTag:
+ def __init__(self, fs, path: str) -> None:
+ self.fs = fs
+ self.path = path
+
+ def _handle_cephfs_error(self, e: Exception, action: str) -> None:
+ if isinstance(e, ValueError):
+ raise EncryptionTagException(errno.EINVAL, f"Invalid encryption tag specified: {e}") from e
+ elif isinstance(e, OSError):
+ log.error(f"Error {action} encryption tag: {e}")
+ raise EncryptionTagException(-e.errno, e.strerror) from e
+ else:
+ log.error(f"Unexpected error {action} encryption tag: {e}")
+ raise EncryptionTagException(errno.EIO, "Unexpected error") from e
+
+ def get_tag(self) -> Optional[str]:
+ try:
+ enc_tag_value = (
+ self.fs.getxattr(self.path, XATTR_SUBVOLUME_ENCTAG_NAME)
+ .decode('utf-8')
+ )
+ return enc_tag_value
+ except Exception as e:
+ self._handle_cephfs_error(e, "getting")
+ return None
+
+ def set_tag(self, enc_tag: str):
+ try:
+ self.fs.setxattr(self.path, XATTR_SUBVOLUME_ENCTAG_NAME, enc_tag.encode('utf-8'), 0)
+ log.info(f"Encryption Tag '{enc_tag}' set on {self.path}.")
+ except Exception as e:
+ self._handle_cephfs_error(e, "setting")
+
+ def clear_tag(self) -> None:
+ self.set_tag("")