From bc168d7eda98d59295bcd749543d2a9747de77a8 Mon Sep 17 00:00:00 2001 From: Avan Thakkar Date: Thu, 22 Aug 2024 01:38:03 +0530 Subject: [PATCH] mgr/smb: earmark resolver for subvolume Signed-off-by: Avan Thakkar --- src/pybind/mgr/mgr_util.py | 94 ++++++++++++++++ src/pybind/mgr/smb/handler.py | 67 +++++++++++- src/pybind/mgr/smb/module.py | 4 + src/pybind/mgr/smb/proto.py | 16 +++ src/pybind/mgr/smb/tests/test_handler.py | 2 + src/pybind/mgr/smb/tests/test_smb.py | 1 + src/pybind/mgr/tests/test_mgr_util.py | 68 ++++++++++++ .../fs/operations/versions/subvolume_base.py | 2 +- src/pybind/mgr/volumes/fs/volume.py | 2 +- src/python-common/ceph/fs/earmarking.py | 68 +++++++++--- .../ceph/tests/test_earmarking.py | 100 +++++++++--------- 11 files changed, 355 insertions(+), 69 deletions(-) diff --git a/src/pybind/mgr/mgr_util.py b/src/pybind/mgr/mgr_util.py index 3497fcdb65598..67246545eea0f 100644 --- a/src/pybind/mgr/mgr_util.py +++ b/src/pybind/mgr/mgr_util.py @@ -1,5 +1,12 @@ import os +from ceph.fs.earmarking import ( + CephFSVolumeEarmarking, + EarmarkParseError, + EarmarkTopScope, + EarmarkException +) + if 'UNITTEST' in os.environ: import tests # noqa @@ -335,6 +342,93 @@ class CephfsClient(Generic[Module_T]): return fs_list +class CephFSEarmarkResolver: + def __init__(self, mgr: Module_T, *, client: Optional[CephfsClient] = None) -> None: + self._mgr = mgr + self._cephfs_client = client or CephfsClient(mgr) + + def _extract_path_component(self, path: str, index: int) -> Optional[str]: + """ + Extracts a specific component from the path based on the given index. + + :param path: The path in the format '/volumes/{subvolumegroup}/{subvolume}/..' + :param index: The index of the component to extract (1 for subvolumegroup, 2 for subvolume) + :return: The component at the specified index + """ + parts = path.strip('/').split('/') + if len(parts) >= 3 and parts[0] == "volumes": + return parts[index] + return None + + def _fetch_subvolumegroup_from_path(self, path: str) -> Optional[str]: + """ + Extracts and returns the subvolume group name from the given path. + + :param path: The path in the format '/volumes/{subvolumegroup}/{subvolume}/..' + :return: The subvolume group name + """ + return self._extract_path_component(path, 1) + + def _fetch_subvolume_from_path(self, path: str) -> Optional[str]: + """ + Extracts and returns the subvolume name from the given path. + + :param path: The path in the format '/volumes/{subvolumegroup}/{subvolume}/..' + :return: The subvolume name + """ + return self._extract_path_component(path, 2) + + def _manage_earmark(self, path: str, volume: str, operation: str, earmark: Optional[str] = None) -> Optional[str]: + """ + Manages (get or set) the earmark for a subvolume based on the provided parameters. + + :param path: The path of the subvolume + :param volume: The volume name + :param earmark: The earmark to set (None if only getting the earmark) + :return: The earmark if getting, otherwise None + """ + with open_filesystem(self._cephfs_client, volume) as fs: + earmark_manager = CephFSVolumeEarmarking(fs, path) + try: + if operation == 'set' and earmark is not None: + earmark_manager.set_earmark(earmark) + return None + elif operation == 'get': + return earmark_manager.get_earmark() + except EarmarkException as e: + logger.error(f"Failed to manage earmark: {e}") + return None + return None + + def get_earmark(self, path: str, volume: str) -> Optional[str]: + """ + Get earmark for a subvolume. + """ + return self._manage_earmark(path, volume, 'get') + + def set_earmark(self, path: str, volume: str, earmark: str) -> None: + """ + Set earmark for a subvolume. + """ + self._manage_earmark(path, volume, 'set', earmark) + + def check_earmark(self, earmark: str, top_level_scope: EarmarkTopScope) -> bool: + """ + Check if the earmark belongs to the mentioned top level scope. + + :param earmark: The earmark string to check. + :param top_level_scope: The expected top level scope. + :return: True if the earmark matches the top level scope, False otherwise. + """ + try: + parsed = CephFSVolumeEarmarking.parse_earmark(earmark) + if parsed is None: + return False + return parsed.top == top_level_scope + except EarmarkParseError: + return False + + @contextlib.contextmanager def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCephFS", None, None]: """ diff --git a/src/pybind/mgr/smb/handler.py b/src/pybind/mgr/smb/handler.py index fc93a7277cb0b..670cb15a58766 100644 --- a/src/pybind/mgr/smb/handler.py +++ b/src/pybind/mgr/smb/handler.py @@ -18,6 +18,7 @@ import operator import time from ceph.deployment.service_spec import SMBSpec +from ceph.fs.earmarking import EarmarkTopScope from . import config_store, external, resources from .enums import ( @@ -43,6 +44,7 @@ from .proto import ( AccessAuthorizer, ConfigEntry, ConfigStore, + EarmarkResolver, EntryKey, OrchSubmitter, PathResolver, @@ -112,6 +114,22 @@ class _FakePathResolver: resolve_exists = resolve +class _FakeEarmarkResolver: + """A stub EarmarkResolver for unit testing.""" + + def __init__(self) -> None: + self._earmarks: Dict[Tuple[str, str], str] = {} + + def get_earmark(self, path: str, volume: str) -> Optional[str]: + return None + + def set_earmark(self, path: str, volume: str, earmark: str) -> None: + pass + + def check_earmark(self, earmark: str, top_level_scope: str) -> bool: + return True + + class _FakeAuthorizer: """A stub AccessAuthorizer for unit testing.""" @@ -325,6 +343,7 @@ class ClusterConfigHandler: path_resolver: Optional[PathResolver] = None, authorizer: Optional[AccessAuthorizer] = None, orch: Optional[OrchSubmitter] = None, + earmark_resolver: Optional[EarmarkResolver] = None, ) -> None: self.internal_store = internal_store self.public_store = public_store @@ -336,6 +355,9 @@ class ClusterConfigHandler: authorizer = _FakeAuthorizer() self._authorizer: AccessAuthorizer = authorizer self._orch = orch # if None, disables updating the spec via orch + if earmark_resolver is None: + earmark_resolver = cast(EarmarkResolver, _FakeEarmarkResolver()) + self._earmark_resolver = earmark_resolver log.info( 'Initialized new ClusterConfigHandler with' f' internal store {self.internal_store!r},' @@ -343,7 +365,8 @@ class ClusterConfigHandler: f' priv store {self.priv_store!r},' f' path resolver {self._path_resolver!r},' f' authorizer {self._authorizer!r},' - f' orch {self._orch!r}' + f' orch {self._orch!r},' + f' earmark resolver {self._earmark_resolver!r}' ) def apply( @@ -474,7 +497,12 @@ class ClusterConfigHandler: elif isinstance( resource, (resources.Share, resources.RemovedShare) ): - _check_share(resource, staging, self._path_resolver) + _check_share( + resource, + staging, + self._path_resolver, + self._earmark_resolver, + ) elif isinstance(resource, resources.JoinAuth): _check_join_auths(resource, staging) elif isinstance(resource, resources.UsersAndGroups): @@ -807,7 +835,10 @@ def _check_cluster(cluster: ClusterRef, staging: _Staging) -> None: def _check_share( - share: ShareRef, staging: _Staging, resolver: PathResolver + share: ShareRef, + staging: _Staging, + resolver: PathResolver, + earmark_resolver: EarmarkResolver, ) -> None: """Check that the share resource can be updated.""" if share.intent == Intent.REMOVED: @@ -822,7 +853,7 @@ def _check_share( ) assert share.cephfs is not None try: - resolver.resolve_exists( + volpath = resolver.resolve_exists( share.cephfs.volume, share.cephfs.subvolumegroup, share.cephfs.subvolume, @@ -832,6 +863,34 @@ def _check_share( raise ErrorResult( share, msg="path is not a valid directory in volume" ) + if earmark_resolver: + earmark = earmark_resolver.get_earmark( + volpath, + share.cephfs.volume, + ) + if not earmark: + smb_earmark = ( + f"{EarmarkTopScope.SMB.value}.cluster.{share.cluster_id}" + ) + earmark_resolver.set_earmark( + volpath, + share.cephfs.volume, + smb_earmark, + ) + else: + if not earmark_resolver.check_earmark( + earmark, EarmarkTopScope.SMB + ): + raise ErrorResult( + share, + msg=f"earmark has already been set by {earmark.split('.')[0]}", + ) + # Check if earmark is set by same cluster + if earmark.split('.')[2] != share.cluster_id: + raise ErrorResult( + share, + msg=f"earmark has already been set by smb cluster {earmark.split('.')[2]}", + ) name_used_by = _share_name_in_use(staging, share) if name_used_by: raise ErrorResult( diff --git a/src/pybind/mgr/smb/module.py b/src/pybind/mgr/smb/module.py index 1e71721202e80..7483eb7964b4a 100644 --- a/src/pybind/mgr/smb/module.py +++ b/src/pybind/mgr/smb/module.py @@ -5,6 +5,7 @@ import logging import orchestrator from ceph.deployment.service_spec import PlacementSpec, SMBSpec from mgr_module import MgrModule, Option, OptionLevel +from mgr_util import CephFSEarmarkResolver from . import ( cli, @@ -55,6 +56,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): path_resolver = kwargs.pop('path_resolver', None) authorizer = kwargs.pop('authorizer', None) uo = kwargs.pop('update_orchestration', None) + earmark_resolver = kwargs.pop('earmark_resolver', None) super().__init__(*args, **kwargs) if internal_store is not None: self._internal_store = internal_store @@ -69,6 +71,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): public_store or rados_store.RADOSConfigStore.init(self) ) path_resolver = path_resolver or fs.CachingCephFSPathResolver(self) + earmark_resolver = earmark_resolver or CephFSEarmarkResolver(self) # Why the honk is the cast needed but path_resolver doesn't need it?? # Sometimes mypy drives me batty. authorizer = cast( @@ -81,6 +84,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): path_resolver=path_resolver, authorizer=authorizer, orch=self._orch_backend(enable_orch=uo), + earmark_resolver=earmark_resolver, ) def _backend_store(self, store_conf: str = '') -> ConfigStore: diff --git a/src/pybind/mgr/smb/proto.py b/src/pybind/mgr/smb/proto.py index 858975f7390db..847869a3cbe84 100644 --- a/src/pybind/mgr/smb/proto.py +++ b/src/pybind/mgr/smb/proto.py @@ -15,6 +15,7 @@ from typing import ( import sys from ceph.deployment.service_spec import SMBSpec +from ceph.fs.earmarking import EarmarkTopScope # this uses a version check as opposed to a try/except because this # form makes mypy happy and try/except doesn't. @@ -185,3 +186,18 @@ class AccessAuthorizer(Protocol): self, volume: str, entity: str, caps: str = '' ) -> None: ... # pragma: no cover + + +class EarmarkResolver(Protocol): + """A protocol for a type that can resolve earmarks for subvolumes.""" + + def get_earmark(self, path: str, volume: str) -> Optional[str]: + ... # pragma: no cover + + def set_earmark(self, path: str, volume: str, earmark: str) -> None: + ... # pragma: no cover + + def check_earmark( + self, earmark: str, top_level_scope: EarmarkTopScope + ) -> bool: + ... # pragma: no cover diff --git a/src/pybind/mgr/smb/tests/test_handler.py b/src/pybind/mgr/smb/tests/test_handler.py index ec9b6669d13a6..bd9125c2d7be8 100644 --- a/src/pybind/mgr/smb/tests/test_handler.py +++ b/src/pybind/mgr/smb/tests/test_handler.py @@ -1,6 +1,7 @@ import pytest import smb +from smb.handler import _FakeEarmarkResolver def _cluster(**kwargs): @@ -880,6 +881,7 @@ def test_apply_remove_all_clusters(thandler): self.deployed.remove(service_name) thandler._orch = FakeOrch() + thandler._earmark_resolver = _FakeEarmarkResolver() test_apply_full_cluster_create(thandler) to_apply = [ diff --git a/src/pybind/mgr/smb/tests/test_smb.py b/src/pybind/mgr/smb/tests/test_smb.py index 86a2310a4de98..c9fd02968b904 100644 --- a/src/pybind/mgr/smb/tests/test_smb.py +++ b/src/pybind/mgr/smb/tests/test_smb.py @@ -26,6 +26,7 @@ def tmodule(): path_resolver=smb.handler._FakePathResolver(), authorizer=smb.handler._FakeAuthorizer(), update_orchestration=False, + earmark_resolver=smb.handler._FakeEarmarkResolver(), ) diff --git a/src/pybind/mgr/tests/test_mgr_util.py b/src/pybind/mgr/tests/test_mgr_util.py index fb7732d5cc801..b9307ccca4270 100644 --- a/src/pybind/mgr/tests/test_mgr_util.py +++ b/src/pybind/mgr/tests/test_mgr_util.py @@ -1,4 +1,5 @@ import datetime +from unittest.mock import MagicMock, patch import mgr_util import pytest @@ -17,3 +18,70 @@ import pytest ) def test_pretty_timedelta(delta: datetime.timedelta, out: str): assert mgr_util.to_pretty_timedelta(delta) == out + + +class TestCephFsEarmarkResolver: + + @pytest.fixture + def mock_mgr(self): + return MagicMock() + + @pytest.fixture + def mock_cephfs_client(self): + return MagicMock() + + @pytest.fixture + def resolver(self, mock_mgr, mock_cephfs_client): + return mgr_util.CephFSEarmarkResolver(mgr=mock_mgr, client=mock_cephfs_client) + + @patch('mgr_util.open_filesystem') + def test_get_earmark(self, mock_open_filesystem, resolver): + path = "/volumes/group1/subvol1" + + mock_fs_handle = MagicMock() + mock_open_filesystem.return_value.__enter__.return_value = mock_fs_handle + mock_open_filesystem.return_value.__exit__.return_value = False + + mock_earmarking = MagicMock() + mock_earmarking.get_earmark.return_value = "smb.test" + with patch('mgr_util.CephFSVolumeEarmarking', return_value=mock_earmarking): + result = resolver.get_earmark(path, "test_volume") + + assert result == "smb.test" + + @patch('mgr_util.open_filesystem') + def test_set_earmark(self, mock_open_filesystem, resolver): + path = "/volumes/group1/subvol1" + + mock_fs_handle = MagicMock() + mock_open_filesystem.return_value.__enter__.return_value = mock_fs_handle + mock_open_filesystem.return_value.__exit__.return_value = False + + mock_earmarking = MagicMock() + mock_open_filesystem.return_value.__enter__.return_value = mock_fs_handle + with patch('mgr_util.CephFSVolumeEarmarking', return_value=mock_earmarking): + resolver.set_earmark(path, "test_volume", "smb.test2") + + mock_earmarking.set_earmark.assert_called_once_with("smb.test2") + + @patch('mgr_util.CephFSVolumeEarmarking.parse_earmark') + def test_check_earmark(self, mock_parse_earmark, resolver): + # Test that an earmark with the 'smb' top-level scope is correctly identified + mock_parse_earmark.return_value = MagicMock(top=mgr_util.EarmarkTopScope.SMB) + result = resolver.check_earmark("smb.cluster.cluster1", mgr_util.EarmarkTopScope.SMB) + assert result is True + + # Test with a different top-level scope, should return False + mock_parse_earmark.return_value = MagicMock(top=mgr_util.EarmarkTopScope.SMB) + result = resolver.check_earmark("smb.cluster.cluster1", mgr_util.EarmarkTopScope.NFS) + assert result is False + + # Test with an invalid earmark (parse_earmark returns None), should return False + mock_parse_earmark.return_value = None + result = resolver.check_earmark("invalid.test", mgr_util.EarmarkTopScope.SMB) + assert result is False + + # Test with an exception raised by parse_earmark, should return False + mock_parse_earmark.side_effect = mgr_util.EarmarkParseError + result = resolver.check_earmark("error.test", mgr_util.EarmarkTopScope.SMB) + assert result is False diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py index 05df31014170a..75382a1ca7eab 100644 --- a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py +++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py @@ -18,7 +18,7 @@ from ...exception import MetadataMgrException, VolumeException from .auth_metadata import AuthMetadataManager from .subvolume_attrs import SubvolumeStates -from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException # type: ignore +from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException log = logging.getLogger(__name__) diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py index 43ca060607d03..9679e171e8d26 100644 --- a/src/pybind/mgr/volumes/fs/volume.py +++ b/src/pybind/mgr/volumes/fs/volume.py @@ -9,7 +9,7 @@ from urllib.parse import urlsplit, urlunsplit import cephfs -from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException # type: ignore +from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException from mgr_util import CephfsClient diff --git a/src/python-common/ceph/fs/earmarking.py b/src/python-common/ceph/fs/earmarking.py index 3d11da933397f..238f2d8755f13 100644 --- a/src/python-common/ceph/fs/earmarking.py +++ b/src/python-common/ceph/fs/earmarking.py @@ -19,7 +19,7 @@ supported top-level scopes. import errno import enum import logging -from typing import Optional, Tuple +from typing import List, NamedTuple, Optional, Tuple log = logging.getLogger(__name__) @@ -43,6 +43,15 @@ class EarmarkException(Exception): return f"{self.errno} ({self.error_str})" +class EarmarkContents(NamedTuple): + top: 'EarmarkTopScope' + subsections: List[str] + + +class EarmarkParseError(ValueError): + pass + + class CephFSVolumeEarmarking: def __init__(self, fs, path: str) -> None: self.fs = fs @@ -56,26 +65,60 @@ class CephFSVolumeEarmarking: raise EarmarkException(-e.errno, e.strerror) from e else: log.error(f"Unexpected error {action} earmark: {e}") - raise EarmarkException(errno.EIO, "Unexpected error") from e + raise EarmarkException + + @staticmethod + def parse_earmark(value: str) -> Optional[EarmarkContents]: + """ + Parse an earmark value. Returns None if the value is an empty string. + Raises EarmarkParseError if the top-level scope is not valid or the earmark + string is not properly structured. + Returns an EarmarkContents for valid earmark values. + + :param value: The earmark string to parse. + :return: An EarmarkContents instance if valid, None if empty. + """ + if not value: + return None + + parts = value.split('.') + + # Check if the top-level scope is valid + if parts[0] not in (scope.value for scope in EarmarkTopScope): + raise EarmarkParseError(f"Invalid top-level scope: {parts[0]}") + + # Check if all parts are non-empty to ensure valid dot-separated format + if not all(parts): + raise EarmarkParseError("Earmark contains empty sections.") + + # Return parsed earmark with top scope and subsections + return EarmarkContents(top=EarmarkTopScope(parts[0]), subsections=parts[1:]) def _validate_earmark(self, earmark: str) -> bool: """ - Validates that the earmark string is either empty or composed of parts separated by scopes, - with the top-level scope being either 'nfs' or 'smb'. + Validates the earmark string further by checking specific conditions for scopes like 'smb'. :param earmark: The earmark string to validate. :return: True if valid, False otherwise. """ - if not earmark or earmark in (scope.value for scope in EarmarkTopScope): - return True + try: + parsed = self.parse_earmark(earmark) + except EarmarkParseError: + return False - parts = earmark.split('.') + # If parsed is None, it's considered valid since the earmark is empty + if not parsed: + return True - if parts[0] not in (scope.value for scope in EarmarkTopScope): - return False + # Specific validation for 'smb' scope + if parsed.top == EarmarkTopScope.SMB: + # Valid formats: 'smb' or 'smb.cluster.{cluster_id}' + if not (len(parsed.subsections) == 0 or + (len(parsed.subsections) == 2 and + parsed.subsections[0] == 'cluster' and parsed.subsections[1])): + return False - # Check if all parts are non-empty (to ensure valid dot-separated format) - return all(parts) + return True def get_earmark(self) -> Optional[str]: try: @@ -95,7 +138,8 @@ class CephFSVolumeEarmarking: errno.EINVAL, f"Invalid earmark specified: '{earmark}'. " "A valid earmark should either be empty or start with 'nfs' or 'smb', " - "followed by dot-separated non-empty components." + "followed by dot-separated non-empty components or simply set " + "'smb.cluster.{cluster_id}' for the smb intra-cluster scope." ) try: diff --git a/src/python-common/ceph/tests/test_earmarking.py b/src/python-common/ceph/tests/test_earmarking.py index 2add5d620649b..28c54f0770c97 100644 --- a/src/python-common/ceph/tests/test_earmarking.py +++ b/src/python-common/ceph/tests/test_earmarking.py @@ -2,8 +2,13 @@ import pytest import errno from unittest import mock -from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException, EarmarkTopScope -# Mock constants +from ceph.fs.earmarking import ( + CephFSVolumeEarmarking, + EarmarkException, + EarmarkParseError, + EarmarkTopScope +) + XATTR_SUBVOLUME_EARMARK_NAME = 'user.ceph.subvolume.earmark' @@ -17,70 +22,63 @@ class TestCephFSVolumeEarmarking: def earmarking(self, mock_fs): return CephFSVolumeEarmarking(mock_fs, "/test/path") - def test_get_earmark_success(self, earmarking, mock_fs): - mock_fs.getxattr.return_value = b"nfs" - result = earmarking.get_earmark() - assert result == "nfs" - mock_fs.getxattr.assert_called_once_with("/test/path", XATTR_SUBVOLUME_EARMARK_NAME) + def test_parse_earmark_valid(self): + earmark_value = "nfs.subsection1.subsection2" + result = CephFSVolumeEarmarking.parse_earmark(earmark_value) + assert result.top == EarmarkTopScope.NFS + assert result.subsections == ["subsection1", "subsection2"] - def test_get_earmark_no_earmark_set(self, earmarking, mock_fs): - mock_fs.getxattr.return_value = b"" - result = earmarking.get_earmark() + def test_parse_earmark_empty_string(self): + result = CephFSVolumeEarmarking.parse_earmark("") + assert result is None + + def test_parse_earmark_invalid_scope(self): + with pytest.raises(EarmarkParseError): + CephFSVolumeEarmarking.parse_earmark("invalid.scope") + + def test_parse_earmark_empty_sections(self): + with pytest.raises(EarmarkParseError): + CephFSVolumeEarmarking.parse_earmark("nfs..section") + + def test_validate_earmark_valid_empty(self, earmarking): + assert earmarking._validate_earmark("") - assert result == "" - mock_fs.getxattr.assert_called_once_with("/test/path", XATTR_SUBVOLUME_EARMARK_NAME) + def test_validate_earmark_valid_smb(self, earmarking): + assert earmarking._validate_earmark("smb.cluster.cluster_id") - def test_get_earmark_error(self, earmarking, mock_fs): - mock_fs.getxattr.side_effect = OSError(errno.EIO, "I/O error") + def test_validate_earmark_invalid_smb_format(self, earmarking): + assert not earmarking._validate_earmark("smb.invalid.format") + def test_get_earmark_success(self, earmarking): + earmarking.fs.getxattr.return_value = b'nfs.valid.earmark' + result = earmarking.get_earmark() + assert result == 'nfs.valid.earmark' + + def test_get_earmark_handle_error(self, earmarking): + earmarking.fs.getxattr.side_effect = OSError(errno.EIO, "I/O error") with pytest.raises(EarmarkException) as excinfo: earmarking.get_earmark() - assert excinfo.value.errno == -errno.EIO - assert "I/O error" in str(excinfo.value) - # Ensure that the getxattr method was called exactly once - mock_fs.getxattr.assert_called_once_with("/test/path", XATTR_SUBVOLUME_EARMARK_NAME) - - def test_set_earmark_success(self, earmarking, mock_fs): - earmarking.set_earmark(EarmarkTopScope.NFS.value) - mock_fs.setxattr.assert_called_once_with( - "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, b"nfs", 0 + def test_set_earmark_valid(self, earmarking): + earmark = "nfs.valid.earmark" + earmarking.set_earmark(earmark) + earmarking.fs.setxattr.assert_called_with( + "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, earmark.encode('utf-8'), 0 ) def test_set_earmark_invalid(self, earmarking): with pytest.raises(EarmarkException) as excinfo: - earmarking.set_earmark("invalid_scope") - + earmarking.set_earmark("invalid.earmark") assert excinfo.value.errno == errno.EINVAL - assert "Invalid earmark specified" in str(excinfo.value) - - def test_set_earmark_error(self, earmarking, mock_fs): - mock_fs.setxattr.side_effect = OSError(errno.EIO, "I/O error") + def test_set_earmark_handle_error(self, earmarking): + earmarking.fs.setxattr.side_effect = OSError(errno.EIO, "I/O error") with pytest.raises(EarmarkException) as excinfo: - earmarking.set_earmark(EarmarkTopScope.NFS.value) - + earmarking.set_earmark("nfs.valid.earmark") assert excinfo.value.errno == -errno.EIO - assert "I/O error" in str(excinfo.value) - mock_fs.setxattr.assert_called_once_with( - "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, b"nfs", 0 - ) - - def test_clear_earmark_success(self, earmarking, mock_fs): - earmarking.clear_earmark() - mock_fs.setxattr.assert_called_once_with( - "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, b"", 0 - ) - def test_clear_earmark_error(self, earmarking, mock_fs): - mock_fs.setxattr.side_effect = OSError(errno.EIO, "I/O error") - - with pytest.raises(EarmarkException) as excinfo: + def test_clear_earmark(self, earmarking): + with mock.patch.object(earmarking, 'set_earmark') as mock_set_earmark: earmarking.clear_earmark() - - assert excinfo.value.errno == -errno.EIO - assert "I/O error" in str(excinfo.value) - mock_fs.setxattr.assert_called_once_with( - "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, b"", 0 - ) + mock_set_earmark.assert_called_once_with("") -- 2.39.5