import os
+from ceph.fs.earmarking import (
+ CephFSVolumeEarmarking,
+ EarmarkParseError,
+ EarmarkTopScope,
+ EarmarkException
+)
+
if 'UNITTEST' in os.environ:
import tests # noqa
return fs_list
+class CephFSEarmarkResolver:
+ def __init__(self, mgr: Module_T, *, client: Optional[CephfsClient] = None) -> None:
+ self._mgr = mgr
+ self._cephfs_client = client or CephfsClient(mgr)
+
+ def _extract_path_component(self, path: str, index: int) -> Optional[str]:
+ """
+ Extracts a specific component from the path based on the given index.
+
+ :param path: The path in the format '/volumes/{subvolumegroup}/{subvolume}/..'
+ :param index: The index of the component to extract (1 for subvolumegroup, 2 for subvolume)
+ :return: The component at the specified index
+ """
+ parts = path.strip('/').split('/')
+ if len(parts) >= 3 and parts[0] == "volumes":
+ return parts[index]
+ return None
+
+ def _fetch_subvolumegroup_from_path(self, path: str) -> Optional[str]:
+ """
+ Extracts and returns the subvolume group name from the given path.
+
+ :param path: The path in the format '/volumes/{subvolumegroup}/{subvolume}/..'
+ :return: The subvolume group name
+ """
+ return self._extract_path_component(path, 1)
+
+ def _fetch_subvolume_from_path(self, path: str) -> Optional[str]:
+ """
+ Extracts and returns the subvolume name from the given path.
+
+ :param path: The path in the format '/volumes/{subvolumegroup}/{subvolume}/..'
+ :return: The subvolume name
+ """
+ return self._extract_path_component(path, 2)
+
+ def _manage_earmark(self, path: str, volume: str, operation: str, earmark: Optional[str] = None) -> Optional[str]:
+ """
+ Manages (get or set) the earmark for a subvolume based on the provided parameters.
+
+ :param path: The path of the subvolume
+ :param volume: The volume name
+ :param earmark: The earmark to set (None if only getting the earmark)
+ :return: The earmark if getting, otherwise None
+ """
+ with open_filesystem(self._cephfs_client, volume) as fs:
+ earmark_manager = CephFSVolumeEarmarking(fs, path)
+ try:
+ if operation == 'set' and earmark is not None:
+ earmark_manager.set_earmark(earmark)
+ return None
+ elif operation == 'get':
+ return earmark_manager.get_earmark()
+ except EarmarkException as e:
+ logger.error(f"Failed to manage earmark: {e}")
+ return None
+ return None
+
+ def get_earmark(self, path: str, volume: str) -> Optional[str]:
+ """
+ Get earmark for a subvolume.
+ """
+ return self._manage_earmark(path, volume, 'get')
+
+ def set_earmark(self, path: str, volume: str, earmark: str) -> None:
+ """
+ Set earmark for a subvolume.
+ """
+ self._manage_earmark(path, volume, 'set', earmark)
+
+ def check_earmark(self, earmark: str, top_level_scope: EarmarkTopScope) -> bool:
+ """
+ Check if the earmark belongs to the mentioned top level scope.
+
+ :param earmark: The earmark string to check.
+ :param top_level_scope: The expected top level scope.
+ :return: True if the earmark matches the top level scope, False otherwise.
+ """
+ try:
+ parsed = CephFSVolumeEarmarking.parse_earmark(earmark)
+ if parsed is None:
+ return False
+ return parsed.top == top_level_scope
+ except EarmarkParseError:
+ return False
+
+
@contextlib.contextmanager
def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCephFS", None, None]:
"""
import time
from ceph.deployment.service_spec import SMBSpec
+from ceph.fs.earmarking import EarmarkTopScope
from . import config_store, external, resources
from .enums import (
AccessAuthorizer,
ConfigEntry,
ConfigStore,
+ EarmarkResolver,
EntryKey,
OrchSubmitter,
PathResolver,
resolve_exists = resolve
+class _FakeEarmarkResolver:
+ """A stub EarmarkResolver for unit testing."""
+
+ def __init__(self) -> None:
+ self._earmarks: Dict[Tuple[str, str], str] = {}
+
+ def get_earmark(self, path: str, volume: str) -> Optional[str]:
+ return None
+
+ def set_earmark(self, path: str, volume: str, earmark: str) -> None:
+ pass
+
+ def check_earmark(self, earmark: str, top_level_scope: str) -> bool:
+ return True
+
+
class _FakeAuthorizer:
"""A stub AccessAuthorizer for unit testing."""
path_resolver: Optional[PathResolver] = None,
authorizer: Optional[AccessAuthorizer] = None,
orch: Optional[OrchSubmitter] = None,
+ earmark_resolver: Optional[EarmarkResolver] = None,
) -> None:
self.internal_store = internal_store
self.public_store = public_store
authorizer = _FakeAuthorizer()
self._authorizer: AccessAuthorizer = authorizer
self._orch = orch # if None, disables updating the spec via orch
+ if earmark_resolver is None:
+ earmark_resolver = cast(EarmarkResolver, _FakeEarmarkResolver())
+ self._earmark_resolver = earmark_resolver
log.info(
'Initialized new ClusterConfigHandler with'
f' internal store {self.internal_store!r},'
f' priv store {self.priv_store!r},'
f' path resolver {self._path_resolver!r},'
f' authorizer {self._authorizer!r},'
- f' orch {self._orch!r}'
+ f' orch {self._orch!r},'
+ f' earmark resolver {self._earmark_resolver!r}'
)
def apply(
elif isinstance(
resource, (resources.Share, resources.RemovedShare)
):
- _check_share(resource, staging, self._path_resolver)
+ _check_share(
+ resource,
+ staging,
+ self._path_resolver,
+ self._earmark_resolver,
+ )
elif isinstance(resource, resources.JoinAuth):
_check_join_auths(resource, staging)
elif isinstance(resource, resources.UsersAndGroups):
def _check_share(
- share: ShareRef, staging: _Staging, resolver: PathResolver
+ share: ShareRef,
+ staging: _Staging,
+ resolver: PathResolver,
+ earmark_resolver: EarmarkResolver,
) -> None:
"""Check that the share resource can be updated."""
if share.intent == Intent.REMOVED:
)
assert share.cephfs is not None
try:
- resolver.resolve_exists(
+ volpath = resolver.resolve_exists(
share.cephfs.volume,
share.cephfs.subvolumegroup,
share.cephfs.subvolume,
raise ErrorResult(
share, msg="path is not a valid directory in volume"
)
+ if earmark_resolver:
+ earmark = earmark_resolver.get_earmark(
+ volpath,
+ share.cephfs.volume,
+ )
+ if not earmark:
+ smb_earmark = (
+ f"{EarmarkTopScope.SMB.value}.cluster.{share.cluster_id}"
+ )
+ earmark_resolver.set_earmark(
+ volpath,
+ share.cephfs.volume,
+ smb_earmark,
+ )
+ else:
+ if not earmark_resolver.check_earmark(
+ earmark, EarmarkTopScope.SMB
+ ):
+ raise ErrorResult(
+ share,
+ msg=f"earmark has already been set by {earmark.split('.')[0]}",
+ )
+ # Check if earmark is set by same cluster
+ if earmark.split('.')[2] != share.cluster_id:
+ raise ErrorResult(
+ share,
+ msg=f"earmark has already been set by smb cluster {earmark.split('.')[2]}",
+ )
name_used_by = _share_name_in_use(staging, share)
if name_used_by:
raise ErrorResult(
import orchestrator
from ceph.deployment.service_spec import PlacementSpec, SMBSpec
from mgr_module import MgrModule, Option, OptionLevel
+from mgr_util import CephFSEarmarkResolver
from . import (
cli,
path_resolver = kwargs.pop('path_resolver', None)
authorizer = kwargs.pop('authorizer', None)
uo = kwargs.pop('update_orchestration', None)
+ earmark_resolver = kwargs.pop('earmark_resolver', None)
super().__init__(*args, **kwargs)
if internal_store is not None:
self._internal_store = internal_store
public_store or rados_store.RADOSConfigStore.init(self)
)
path_resolver = path_resolver or fs.CachingCephFSPathResolver(self)
+ earmark_resolver = earmark_resolver or CephFSEarmarkResolver(self)
# Why the honk is the cast needed but path_resolver doesn't need it??
# Sometimes mypy drives me batty.
authorizer = cast(
path_resolver=path_resolver,
authorizer=authorizer,
orch=self._orch_backend(enable_orch=uo),
+ earmark_resolver=earmark_resolver,
)
def _backend_store(self, store_conf: str = '') -> ConfigStore:
import sys
from ceph.deployment.service_spec import SMBSpec
+from ceph.fs.earmarking import EarmarkTopScope
# this uses a version check as opposed to a try/except because this
# form makes mypy happy and try/except doesn't.
self, volume: str, entity: str, caps: str = ''
) -> None:
... # pragma: no cover
+
+
+class EarmarkResolver(Protocol):
+ """A protocol for a type that can resolve earmarks for subvolumes."""
+
+ def get_earmark(self, path: str, volume: str) -> Optional[str]:
+ ... # pragma: no cover
+
+ def set_earmark(self, path: str, volume: str, earmark: str) -> None:
+ ... # pragma: no cover
+
+ def check_earmark(
+ self, earmark: str, top_level_scope: EarmarkTopScope
+ ) -> bool:
+ ... # pragma: no cover
import pytest
import smb
+from smb.handler import _FakeEarmarkResolver
def _cluster(**kwargs):
self.deployed.remove(service_name)
thandler._orch = FakeOrch()
+ thandler._earmark_resolver = _FakeEarmarkResolver()
test_apply_full_cluster_create(thandler)
to_apply = [
path_resolver=smb.handler._FakePathResolver(),
authorizer=smb.handler._FakeAuthorizer(),
update_orchestration=False,
+ earmark_resolver=smb.handler._FakeEarmarkResolver(),
)
import datetime
+from unittest.mock import MagicMock, patch
import mgr_util
import pytest
)
def test_pretty_timedelta(delta: datetime.timedelta, out: str):
assert mgr_util.to_pretty_timedelta(delta) == out
+
+
+class TestCephFsEarmarkResolver:
+
+ @pytest.fixture
+ def mock_mgr(self):
+ return MagicMock()
+
+ @pytest.fixture
+ def mock_cephfs_client(self):
+ return MagicMock()
+
+ @pytest.fixture
+ def resolver(self, mock_mgr, mock_cephfs_client):
+ return mgr_util.CephFSEarmarkResolver(mgr=mock_mgr, client=mock_cephfs_client)
+
+ @patch('mgr_util.open_filesystem')
+ def test_get_earmark(self, mock_open_filesystem, resolver):
+ path = "/volumes/group1/subvol1"
+
+ mock_fs_handle = MagicMock()
+ mock_open_filesystem.return_value.__enter__.return_value = mock_fs_handle
+ mock_open_filesystem.return_value.__exit__.return_value = False
+
+ mock_earmarking = MagicMock()
+ mock_earmarking.get_earmark.return_value = "smb.test"
+ with patch('mgr_util.CephFSVolumeEarmarking', return_value=mock_earmarking):
+ result = resolver.get_earmark(path, "test_volume")
+
+ assert result == "smb.test"
+
+ @patch('mgr_util.open_filesystem')
+ def test_set_earmark(self, mock_open_filesystem, resolver):
+ path = "/volumes/group1/subvol1"
+
+ mock_fs_handle = MagicMock()
+ mock_open_filesystem.return_value.__enter__.return_value = mock_fs_handle
+ mock_open_filesystem.return_value.__exit__.return_value = False
+
+ mock_earmarking = MagicMock()
+ mock_open_filesystem.return_value.__enter__.return_value = mock_fs_handle
+ with patch('mgr_util.CephFSVolumeEarmarking', return_value=mock_earmarking):
+ resolver.set_earmark(path, "test_volume", "smb.test2")
+
+ mock_earmarking.set_earmark.assert_called_once_with("smb.test2")
+
+ @patch('mgr_util.CephFSVolumeEarmarking.parse_earmark')
+ def test_check_earmark(self, mock_parse_earmark, resolver):
+ # Test that an earmark with the 'smb' top-level scope is correctly identified
+ mock_parse_earmark.return_value = MagicMock(top=mgr_util.EarmarkTopScope.SMB)
+ result = resolver.check_earmark("smb.cluster.cluster1", mgr_util.EarmarkTopScope.SMB)
+ assert result is True
+
+ # Test with a different top-level scope, should return False
+ mock_parse_earmark.return_value = MagicMock(top=mgr_util.EarmarkTopScope.SMB)
+ result = resolver.check_earmark("smb.cluster.cluster1", mgr_util.EarmarkTopScope.NFS)
+ assert result is False
+
+ # Test with an invalid earmark (parse_earmark returns None), should return False
+ mock_parse_earmark.return_value = None
+ result = resolver.check_earmark("invalid.test", mgr_util.EarmarkTopScope.SMB)
+ assert result is False
+
+ # Test with an exception raised by parse_earmark, should return False
+ mock_parse_earmark.side_effect = mgr_util.EarmarkParseError
+ result = resolver.check_earmark("error.test", mgr_util.EarmarkTopScope.SMB)
+ assert result is False
from .auth_metadata import AuthMetadataManager
from .subvolume_attrs import SubvolumeStates
-from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException # type: ignore
+from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException
log = logging.getLogger(__name__)
import cephfs
-from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException # type: ignore
+from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException
from mgr_util import CephfsClient
import errno
import enum
import logging
-from typing import Optional, Tuple
+from typing import List, NamedTuple, Optional, Tuple
log = logging.getLogger(__name__)
return f"{self.errno} ({self.error_str})"
+class EarmarkContents(NamedTuple):
+ top: 'EarmarkTopScope'
+ subsections: List[str]
+
+
+class EarmarkParseError(ValueError):
+ pass
+
+
class CephFSVolumeEarmarking:
def __init__(self, fs, path: str) -> None:
self.fs = fs
raise EarmarkException(-e.errno, e.strerror) from e
else:
log.error(f"Unexpected error {action} earmark: {e}")
- raise EarmarkException(errno.EIO, "Unexpected error") from e
+ raise EarmarkException
+
+ @staticmethod
+ def parse_earmark(value: str) -> Optional[EarmarkContents]:
+ """
+ Parse an earmark value. Returns None if the value is an empty string.
+ Raises EarmarkParseError if the top-level scope is not valid or the earmark
+ string is not properly structured.
+ Returns an EarmarkContents for valid earmark values.
+
+ :param value: The earmark string to parse.
+ :return: An EarmarkContents instance if valid, None if empty.
+ """
+ if not value:
+ return None
+
+ parts = value.split('.')
+
+ # Check if the top-level scope is valid
+ if parts[0] not in (scope.value for scope in EarmarkTopScope):
+ raise EarmarkParseError(f"Invalid top-level scope: {parts[0]}")
+
+ # Check if all parts are non-empty to ensure valid dot-separated format
+ if not all(parts):
+ raise EarmarkParseError("Earmark contains empty sections.")
+
+ # Return parsed earmark with top scope and subsections
+ return EarmarkContents(top=EarmarkTopScope(parts[0]), subsections=parts[1:])
def _validate_earmark(self, earmark: str) -> bool:
"""
- Validates that the earmark string is either empty or composed of parts separated by scopes,
- with the top-level scope being either 'nfs' or 'smb'.
+ Validates the earmark string further by checking specific conditions for scopes like 'smb'.
:param earmark: The earmark string to validate.
:return: True if valid, False otherwise.
"""
- if not earmark or earmark in (scope.value for scope in EarmarkTopScope):
- return True
+ try:
+ parsed = self.parse_earmark(earmark)
+ except EarmarkParseError:
+ return False
- parts = earmark.split('.')
+ # If parsed is None, it's considered valid since the earmark is empty
+ if not parsed:
+ return True
- if parts[0] not in (scope.value for scope in EarmarkTopScope):
- return False
+ # Specific validation for 'smb' scope
+ if parsed.top == EarmarkTopScope.SMB:
+ # Valid formats: 'smb' or 'smb.cluster.{cluster_id}'
+ if not (len(parsed.subsections) == 0 or
+ (len(parsed.subsections) == 2 and
+ parsed.subsections[0] == 'cluster' and parsed.subsections[1])):
+ return False
- # Check if all parts are non-empty (to ensure valid dot-separated format)
- return all(parts)
+ return True
def get_earmark(self) -> Optional[str]:
try:
errno.EINVAL,
f"Invalid earmark specified: '{earmark}'. "
"A valid earmark should either be empty or start with 'nfs' or 'smb', "
- "followed by dot-separated non-empty components."
+ "followed by dot-separated non-empty components or simply set "
+ "'smb.cluster.{cluster_id}' for the smb intra-cluster scope."
)
try:
import errno
from unittest import mock
-from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException, EarmarkTopScope
-# Mock constants
+from ceph.fs.earmarking import (
+ CephFSVolumeEarmarking,
+ EarmarkException,
+ EarmarkParseError,
+ EarmarkTopScope
+)
+
XATTR_SUBVOLUME_EARMARK_NAME = 'user.ceph.subvolume.earmark'
def earmarking(self, mock_fs):
return CephFSVolumeEarmarking(mock_fs, "/test/path")
- def test_get_earmark_success(self, earmarking, mock_fs):
- mock_fs.getxattr.return_value = b"nfs"
- result = earmarking.get_earmark()
- assert result == "nfs"
- mock_fs.getxattr.assert_called_once_with("/test/path", XATTR_SUBVOLUME_EARMARK_NAME)
+ def test_parse_earmark_valid(self):
+ earmark_value = "nfs.subsection1.subsection2"
+ result = CephFSVolumeEarmarking.parse_earmark(earmark_value)
+ assert result.top == EarmarkTopScope.NFS
+ assert result.subsections == ["subsection1", "subsection2"]
- def test_get_earmark_no_earmark_set(self, earmarking, mock_fs):
- mock_fs.getxattr.return_value = b""
- result = earmarking.get_earmark()
+ def test_parse_earmark_empty_string(self):
+ result = CephFSVolumeEarmarking.parse_earmark("")
+ assert result is None
+
+ def test_parse_earmark_invalid_scope(self):
+ with pytest.raises(EarmarkParseError):
+ CephFSVolumeEarmarking.parse_earmark("invalid.scope")
+
+ def test_parse_earmark_empty_sections(self):
+ with pytest.raises(EarmarkParseError):
+ CephFSVolumeEarmarking.parse_earmark("nfs..section")
+
+ def test_validate_earmark_valid_empty(self, earmarking):
+ assert earmarking._validate_earmark("")
- assert result == ""
- mock_fs.getxattr.assert_called_once_with("/test/path", XATTR_SUBVOLUME_EARMARK_NAME)
+ def test_validate_earmark_valid_smb(self, earmarking):
+ assert earmarking._validate_earmark("smb.cluster.cluster_id")
- def test_get_earmark_error(self, earmarking, mock_fs):
- mock_fs.getxattr.side_effect = OSError(errno.EIO, "I/O error")
+ def test_validate_earmark_invalid_smb_format(self, earmarking):
+ assert not earmarking._validate_earmark("smb.invalid.format")
+ def test_get_earmark_success(self, earmarking):
+ earmarking.fs.getxattr.return_value = b'nfs.valid.earmark'
+ result = earmarking.get_earmark()
+ assert result == 'nfs.valid.earmark'
+
+ def test_get_earmark_handle_error(self, earmarking):
+ earmarking.fs.getxattr.side_effect = OSError(errno.EIO, "I/O error")
with pytest.raises(EarmarkException) as excinfo:
earmarking.get_earmark()
-
assert excinfo.value.errno == -errno.EIO
- assert "I/O error" in str(excinfo.value)
- # Ensure that the getxattr method was called exactly once
- mock_fs.getxattr.assert_called_once_with("/test/path", XATTR_SUBVOLUME_EARMARK_NAME)
-
- def test_set_earmark_success(self, earmarking, mock_fs):
- earmarking.set_earmark(EarmarkTopScope.NFS.value)
- mock_fs.setxattr.assert_called_once_with(
- "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, b"nfs", 0
+ def test_set_earmark_valid(self, earmarking):
+ earmark = "nfs.valid.earmark"
+ earmarking.set_earmark(earmark)
+ earmarking.fs.setxattr.assert_called_with(
+ "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, earmark.encode('utf-8'), 0
)
def test_set_earmark_invalid(self, earmarking):
with pytest.raises(EarmarkException) as excinfo:
- earmarking.set_earmark("invalid_scope")
-
+ earmarking.set_earmark("invalid.earmark")
assert excinfo.value.errno == errno.EINVAL
- assert "Invalid earmark specified" in str(excinfo.value)
-
- def test_set_earmark_error(self, earmarking, mock_fs):
- mock_fs.setxattr.side_effect = OSError(errno.EIO, "I/O error")
+ def test_set_earmark_handle_error(self, earmarking):
+ earmarking.fs.setxattr.side_effect = OSError(errno.EIO, "I/O error")
with pytest.raises(EarmarkException) as excinfo:
- earmarking.set_earmark(EarmarkTopScope.NFS.value)
-
+ earmarking.set_earmark("nfs.valid.earmark")
assert excinfo.value.errno == -errno.EIO
- assert "I/O error" in str(excinfo.value)
- mock_fs.setxattr.assert_called_once_with(
- "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, b"nfs", 0
- )
-
- def test_clear_earmark_success(self, earmarking, mock_fs):
- earmarking.clear_earmark()
- mock_fs.setxattr.assert_called_once_with(
- "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, b"", 0
- )
- def test_clear_earmark_error(self, earmarking, mock_fs):
- mock_fs.setxattr.side_effect = OSError(errno.EIO, "I/O error")
-
- with pytest.raises(EarmarkException) as excinfo:
+ def test_clear_earmark(self, earmarking):
+ with mock.patch.object(earmarking, 'set_earmark') as mock_set_earmark:
earmarking.clear_earmark()
-
- assert excinfo.value.errno == -errno.EIO
- assert "I/O error" in str(excinfo.value)
- mock_fs.setxattr.assert_called_once_with(
- "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, b"", 0
- )
+ mock_set_earmark.assert_called_once_with("")