]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/smb: earmark resolver for subvolume
authorAvan Thakkar <athakkar@redhat.com>
Wed, 21 Aug 2024 20:08:03 +0000 (01:38 +0530)
committerAvan Thakkar <athakkar@redhat.com>
Tue, 24 Sep 2024 06:59:37 +0000 (12:29 +0530)
Signed-off-by: Avan Thakkar <athakkar@redhat.com>
src/pybind/mgr/mgr_util.py
src/pybind/mgr/smb/handler.py
src/pybind/mgr/smb/module.py
src/pybind/mgr/smb/proto.py
src/pybind/mgr/smb/tests/test_handler.py
src/pybind/mgr/smb/tests/test_smb.py
src/pybind/mgr/tests/test_mgr_util.py
src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py
src/pybind/mgr/volumes/fs/volume.py
src/python-common/ceph/fs/earmarking.py
src/python-common/ceph/tests/test_earmarking.py

index 3497fcdb6559897997d8908fb2b2e316cf7c2055..67246545eea0f8d4638d4e2455d0fc1bf45f4dcd 100644 (file)
@@ -1,5 +1,12 @@
 import os
 
+from ceph.fs.earmarking import (
+    CephFSVolumeEarmarking,
+    EarmarkParseError,
+    EarmarkTopScope,
+    EarmarkException
+)
+
 if 'UNITTEST' in os.environ:
     import tests  # noqa
 
@@ -335,6 +342,93 @@ class CephfsClient(Generic[Module_T]):
         return fs_list
 
 
+class CephFSEarmarkResolver:
+    def __init__(self, mgr: Module_T, *, client: Optional[CephfsClient] = None) -> None:
+        self._mgr = mgr
+        self._cephfs_client = client or CephfsClient(mgr)
+
+    def _extract_path_component(self, path: str, index: int) -> Optional[str]:
+        """
+        Extracts a specific component from the path based on the given index.
+
+        :param path: The path in the format '/volumes/{subvolumegroup}/{subvolume}/..'
+        :param index: The index of the component to extract (1 for subvolumegroup, 2 for subvolume)
+        :return: The component at the specified index
+        """
+        parts = path.strip('/').split('/')
+        if len(parts) >= 3 and parts[0] == "volumes":
+            return parts[index]
+        return None
+
+    def _fetch_subvolumegroup_from_path(self, path: str) -> Optional[str]:
+        """
+        Extracts and returns the subvolume group name from the given path.
+
+        :param path: The path in the format '/volumes/{subvolumegroup}/{subvolume}/..'
+        :return: The subvolume group name
+        """
+        return self._extract_path_component(path, 1)
+
+    def _fetch_subvolume_from_path(self, path: str) -> Optional[str]:
+        """
+        Extracts and returns the subvolume name from the given path.
+
+        :param path: The path in the format '/volumes/{subvolumegroup}/{subvolume}/..'
+        :return: The subvolume name
+        """
+        return self._extract_path_component(path, 2)
+
+    def _manage_earmark(self, path: str, volume: str, operation: str, earmark: Optional[str] = None) -> Optional[str]:
+        """
+        Manages (get or set) the earmark for a subvolume based on the provided parameters.
+
+        :param path: The path of the subvolume
+        :param volume: The volume name
+        :param earmark: The earmark to set (None if only getting the earmark)
+        :return: The earmark if getting, otherwise None
+        """
+        with open_filesystem(self._cephfs_client, volume) as fs:
+            earmark_manager = CephFSVolumeEarmarking(fs, path)
+            try:
+                if operation == 'set' and earmark is not None:
+                    earmark_manager.set_earmark(earmark)
+                    return None
+                elif operation == 'get':
+                    return earmark_manager.get_earmark()
+            except EarmarkException as e:
+                logger.error(f"Failed to manage earmark: {e}")
+                return None
+        return None
+
+    def get_earmark(self, path: str, volume: str) -> Optional[str]:
+        """
+        Get earmark for a subvolume.
+        """
+        return self._manage_earmark(path, volume, 'get')
+
+    def set_earmark(self, path: str, volume: str, earmark: str) -> None:
+        """
+        Set earmark for a subvolume.
+        """
+        self._manage_earmark(path, volume, 'set', earmark)
+
+    def check_earmark(self, earmark: str, top_level_scope: EarmarkTopScope) -> bool:
+        """
+        Check if the earmark belongs to the mentioned top level scope.
+
+        :param earmark: The earmark string to check.
+        :param top_level_scope: The expected top level scope.
+        :return: True if the earmark matches the top level scope, False otherwise.
+        """
+        try:
+            parsed = CephFSVolumeEarmarking.parse_earmark(earmark)
+            if parsed is None:
+                return False
+            return parsed.top == top_level_scope
+        except EarmarkParseError:
+            return False
+
+
 @contextlib.contextmanager
 def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCephFS", None, None]:
     """
index fc93a7277cb0b83fc2aaf3b2a5323a615e0cb385..670cb15a58766aaf64ce4823a25604bf6c3069ec 100644 (file)
@@ -18,6 +18,7 @@ import operator
 import time
 
 from ceph.deployment.service_spec import SMBSpec
+from ceph.fs.earmarking import EarmarkTopScope
 
 from . import config_store, external, resources
 from .enums import (
@@ -43,6 +44,7 @@ from .proto import (
     AccessAuthorizer,
     ConfigEntry,
     ConfigStore,
+    EarmarkResolver,
     EntryKey,
     OrchSubmitter,
     PathResolver,
@@ -112,6 +114,22 @@ class _FakePathResolver:
     resolve_exists = resolve
 
 
+class _FakeEarmarkResolver:
+    """A stub EarmarkResolver for unit testing."""
+
+    def __init__(self) -> None:
+        self._earmarks: Dict[Tuple[str, str], str] = {}
+
+    def get_earmark(self, path: str, volume: str) -> Optional[str]:
+        return None
+
+    def set_earmark(self, path: str, volume: str, earmark: str) -> None:
+        pass
+
+    def check_earmark(self, earmark: str, top_level_scope: str) -> bool:
+        return True
+
+
 class _FakeAuthorizer:
     """A stub AccessAuthorizer for unit testing."""
 
@@ -325,6 +343,7 @@ class ClusterConfigHandler:
         path_resolver: Optional[PathResolver] = None,
         authorizer: Optional[AccessAuthorizer] = None,
         orch: Optional[OrchSubmitter] = None,
+        earmark_resolver: Optional[EarmarkResolver] = None,
     ) -> None:
         self.internal_store = internal_store
         self.public_store = public_store
@@ -336,6 +355,9 @@ class ClusterConfigHandler:
             authorizer = _FakeAuthorizer()
         self._authorizer: AccessAuthorizer = authorizer
         self._orch = orch  # if None, disables updating the spec via orch
+        if earmark_resolver is None:
+            earmark_resolver = cast(EarmarkResolver, _FakeEarmarkResolver())
+        self._earmark_resolver = earmark_resolver
         log.info(
             'Initialized new ClusterConfigHandler with'
             f' internal store {self.internal_store!r},'
@@ -343,7 +365,8 @@ class ClusterConfigHandler:
             f' priv store {self.priv_store!r},'
             f' path resolver {self._path_resolver!r},'
             f' authorizer {self._authorizer!r},'
-            f' orch {self._orch!r}'
+            f' orch {self._orch!r},'
+            f' earmark resolver {self._earmark_resolver!r}'
         )
 
     def apply(
@@ -474,7 +497,12 @@ class ClusterConfigHandler:
             elif isinstance(
                 resource, (resources.Share, resources.RemovedShare)
             ):
-                _check_share(resource, staging, self._path_resolver)
+                _check_share(
+                    resource,
+                    staging,
+                    self._path_resolver,
+                    self._earmark_resolver,
+                )
             elif isinstance(resource, resources.JoinAuth):
                 _check_join_auths(resource, staging)
             elif isinstance(resource, resources.UsersAndGroups):
@@ -807,7 +835,10 @@ def _check_cluster(cluster: ClusterRef, staging: _Staging) -> None:
 
 
 def _check_share(
-    share: ShareRef, staging: _Staging, resolver: PathResolver
+    share: ShareRef,
+    staging: _Staging,
+    resolver: PathResolver,
+    earmark_resolver: EarmarkResolver,
 ) -> None:
     """Check that the share resource can be updated."""
     if share.intent == Intent.REMOVED:
@@ -822,7 +853,7 @@ def _check_share(
         )
     assert share.cephfs is not None
     try:
-        resolver.resolve_exists(
+        volpath = resolver.resolve_exists(
             share.cephfs.volume,
             share.cephfs.subvolumegroup,
             share.cephfs.subvolume,
@@ -832,6 +863,34 @@ def _check_share(
         raise ErrorResult(
             share, msg="path is not a valid directory in volume"
         )
+    if earmark_resolver:
+        earmark = earmark_resolver.get_earmark(
+            volpath,
+            share.cephfs.volume,
+        )
+        if not earmark:
+            smb_earmark = (
+                f"{EarmarkTopScope.SMB.value}.cluster.{share.cluster_id}"
+            )
+            earmark_resolver.set_earmark(
+                volpath,
+                share.cephfs.volume,
+                smb_earmark,
+            )
+        else:
+            if not earmark_resolver.check_earmark(
+                earmark, EarmarkTopScope.SMB
+            ):
+                raise ErrorResult(
+                    share,
+                    msg=f"earmark has already been set by {earmark.split('.')[0]}",
+                )
+            # Check if earmark is set by same cluster
+            if earmark.split('.')[2] != share.cluster_id:
+                raise ErrorResult(
+                    share,
+                    msg=f"earmark has already been set by smb cluster {earmark.split('.')[2]}",
+                )
     name_used_by = _share_name_in_use(staging, share)
     if name_used_by:
         raise ErrorResult(
index 1e71721202e806434e3ea7f377b02499d677e732..7483eb7964b4ad739d2beebcc98b9960b269d754 100644 (file)
@@ -5,6 +5,7 @@ import logging
 import orchestrator
 from ceph.deployment.service_spec import PlacementSpec, SMBSpec
 from mgr_module import MgrModule, Option, OptionLevel
+from mgr_util import CephFSEarmarkResolver
 
 from . import (
     cli,
@@ -55,6 +56,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         path_resolver = kwargs.pop('path_resolver', None)
         authorizer = kwargs.pop('authorizer', None)
         uo = kwargs.pop('update_orchestration', None)
+        earmark_resolver = kwargs.pop('earmark_resolver', None)
         super().__init__(*args, **kwargs)
         if internal_store is not None:
             self._internal_store = internal_store
@@ -69,6 +71,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
             public_store or rados_store.RADOSConfigStore.init(self)
         )
         path_resolver = path_resolver or fs.CachingCephFSPathResolver(self)
+        earmark_resolver = earmark_resolver or CephFSEarmarkResolver(self)
         # Why the honk is the cast needed but path_resolver doesn't need it??
         # Sometimes mypy drives me batty.
         authorizer = cast(
@@ -81,6 +84,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
             path_resolver=path_resolver,
             authorizer=authorizer,
             orch=self._orch_backend(enable_orch=uo),
+            earmark_resolver=earmark_resolver,
         )
 
     def _backend_store(self, store_conf: str = '') -> ConfigStore:
index 858975f7390dbdcd25bae3400a40679c0fe158ea..847869a3cbe84fd3c9f4fbb532afe39cd64449f6 100644 (file)
@@ -15,6 +15,7 @@ from typing import (
 import sys
 
 from ceph.deployment.service_spec import SMBSpec
+from ceph.fs.earmarking import EarmarkTopScope
 
 # this uses a version check as opposed to a try/except because this
 # form makes mypy happy and try/except doesn't.
@@ -185,3 +186,18 @@ class AccessAuthorizer(Protocol):
         self, volume: str, entity: str, caps: str = ''
     ) -> None:
         ...  # pragma: no cover
+
+
+class EarmarkResolver(Protocol):
+    """A protocol for a type that can resolve earmarks for subvolumes."""
+
+    def get_earmark(self, path: str, volume: str) -> Optional[str]:
+        ...  # pragma: no cover
+
+    def set_earmark(self, path: str, volume: str, earmark: str) -> None:
+        ...  # pragma: no cover
+
+    def check_earmark(
+        self, earmark: str, top_level_scope: EarmarkTopScope
+    ) -> bool:
+        ...  # pragma: no cover
index ec9b6669d13a66594e67ccc30e62127b382b258d..bd9125c2d7be8d813b89569db2fd2489b7bc6c81 100644 (file)
@@ -1,6 +1,7 @@
 import pytest
 
 import smb
+from smb.handler import _FakeEarmarkResolver
 
 
 def _cluster(**kwargs):
@@ -880,6 +881,7 @@ def test_apply_remove_all_clusters(thandler):
             self.deployed.remove(service_name)
 
     thandler._orch = FakeOrch()
+    thandler._earmark_resolver = _FakeEarmarkResolver()
     test_apply_full_cluster_create(thandler)
 
     to_apply = [
index 86a2310a4de9827ca5d81c41934faaf015f37f2b..c9fd02968b9042cdabc5bf5dc804cd808afa4db3 100644 (file)
@@ -26,6 +26,7 @@ def tmodule():
         path_resolver=smb.handler._FakePathResolver(),
         authorizer=smb.handler._FakeAuthorizer(),
         update_orchestration=False,
+        earmark_resolver=smb.handler._FakeEarmarkResolver(),
     )
 
 
index fb7732d5cc801243ae9b0e69a114846b3e8acdcc..b9307ccca4270894ad0334d7c25789fd183c999b 100644 (file)
@@ -1,4 +1,5 @@
 import datetime
+from unittest.mock import MagicMock, patch
 import mgr_util
 
 import pytest
@@ -17,3 +18,70 @@ import pytest
 )
 def test_pretty_timedelta(delta: datetime.timedelta, out: str):
     assert mgr_util.to_pretty_timedelta(delta) == out
+
+
+class TestCephFsEarmarkResolver:
+
+    @pytest.fixture
+    def mock_mgr(self):
+        return MagicMock()
+
+    @pytest.fixture
+    def mock_cephfs_client(self):
+        return MagicMock()
+
+    @pytest.fixture
+    def resolver(self, mock_mgr, mock_cephfs_client):
+        return mgr_util.CephFSEarmarkResolver(mgr=mock_mgr, client=mock_cephfs_client)
+
+    @patch('mgr_util.open_filesystem')
+    def test_get_earmark(self, mock_open_filesystem, resolver):
+        path = "/volumes/group1/subvol1"
+
+        mock_fs_handle = MagicMock()
+        mock_open_filesystem.return_value.__enter__.return_value = mock_fs_handle
+        mock_open_filesystem.return_value.__exit__.return_value = False
+
+        mock_earmarking = MagicMock()
+        mock_earmarking.get_earmark.return_value = "smb.test"
+        with patch('mgr_util.CephFSVolumeEarmarking', return_value=mock_earmarking):
+            result = resolver.get_earmark(path, "test_volume")
+
+        assert result == "smb.test"
+
+    @patch('mgr_util.open_filesystem')
+    def test_set_earmark(self, mock_open_filesystem, resolver):
+        path = "/volumes/group1/subvol1"
+
+        mock_fs_handle = MagicMock()
+        mock_open_filesystem.return_value.__enter__.return_value = mock_fs_handle
+        mock_open_filesystem.return_value.__exit__.return_value = False
+
+        mock_earmarking = MagicMock()
+        mock_open_filesystem.return_value.__enter__.return_value = mock_fs_handle
+        with patch('mgr_util.CephFSVolumeEarmarking', return_value=mock_earmarking):
+            resolver.set_earmark(path, "test_volume", "smb.test2")
+
+        mock_earmarking.set_earmark.assert_called_once_with("smb.test2")
+
+    @patch('mgr_util.CephFSVolumeEarmarking.parse_earmark')
+    def test_check_earmark(self, mock_parse_earmark, resolver):
+        # Test that an earmark with the 'smb' top-level scope is correctly identified
+        mock_parse_earmark.return_value = MagicMock(top=mgr_util.EarmarkTopScope.SMB)
+        result = resolver.check_earmark("smb.cluster.cluster1", mgr_util.EarmarkTopScope.SMB)
+        assert result is True
+
+        # Test with a different top-level scope, should return False
+        mock_parse_earmark.return_value = MagicMock(top=mgr_util.EarmarkTopScope.SMB)
+        result = resolver.check_earmark("smb.cluster.cluster1", mgr_util.EarmarkTopScope.NFS)
+        assert result is False
+
+        # Test with an invalid earmark (parse_earmark returns None), should return False
+        mock_parse_earmark.return_value = None
+        result = resolver.check_earmark("invalid.test", mgr_util.EarmarkTopScope.SMB)
+        assert result is False
+
+        # Test with an exception raised by parse_earmark, should return False
+        mock_parse_earmark.side_effect = mgr_util.EarmarkParseError
+        result = resolver.check_earmark("error.test", mgr_util.EarmarkTopScope.SMB)
+        assert result is False
index 05df31014170a71d3e285162580cca725f4cde1f..75382a1ca7eaba5de485f96bd74fae26a775a21b 100644 (file)
@@ -18,7 +18,7 @@ from ...exception import MetadataMgrException, VolumeException
 from .auth_metadata import AuthMetadataManager
 from .subvolume_attrs import SubvolumeStates
 
-from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException  # type: ignore
+from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException
 
 log = logging.getLogger(__name__)
 
index 43ca060607d0368b00cebbe5b3acbd8d75964948..9679e171e8d2672bda6857788385e8b3ddbf1a57 100644 (file)
@@ -9,7 +9,7 @@ from urllib.parse import urlsplit, urlunsplit
 
 import cephfs
 
-from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException  # type: ignore
+from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException
 
 from mgr_util import CephfsClient
 
index 3d11da933397fcf8368aad7aaaeeca6879b66b6c..238f2d8755f13b9f0381f275b73aaa250a0ff3c3 100644 (file)
@@ -19,7 +19,7 @@ supported top-level scopes.
 import errno
 import enum
 import logging
-from typing import Optional, Tuple
+from typing import List, NamedTuple, Optional, Tuple
 
 log = logging.getLogger(__name__)
 
@@ -43,6 +43,15 @@ class EarmarkException(Exception):
         return f"{self.errno} ({self.error_str})"
 
 
+class EarmarkContents(NamedTuple):
+    top: 'EarmarkTopScope'
+    subsections: List[str]
+
+
+class EarmarkParseError(ValueError):
+    pass
+
+
 class CephFSVolumeEarmarking:
     def __init__(self, fs, path: str) -> None:
         self.fs = fs
@@ -56,26 +65,60 @@ class CephFSVolumeEarmarking:
             raise EarmarkException(-e.errno, e.strerror) from e
         else:
             log.error(f"Unexpected error {action} earmark: {e}")
-            raise EarmarkException(errno.EIO, "Unexpected error") from e
+            raise EarmarkException
+
+    @staticmethod
+    def parse_earmark(value: str) -> Optional[EarmarkContents]:
+        """
+        Parse an earmark value. Returns None if the value is an empty string.
+        Raises EarmarkParseError if the top-level scope is not valid or the earmark
+        string is not properly structured.
+        Returns an EarmarkContents for valid earmark values.
+
+        :param value: The earmark string to parse.
+        :return: An EarmarkContents instance if valid, None if empty.
+        """
+        if not value:
+            return None
+
+        parts = value.split('.')
+
+        # Check if the top-level scope is valid
+        if parts[0] not in (scope.value for scope in EarmarkTopScope):
+            raise EarmarkParseError(f"Invalid top-level scope: {parts[0]}")
+
+        # Check if all parts are non-empty to ensure valid dot-separated format
+        if not all(parts):
+            raise EarmarkParseError("Earmark contains empty sections.")
+
+        # Return parsed earmark with top scope and subsections
+        return EarmarkContents(top=EarmarkTopScope(parts[0]), subsections=parts[1:])
 
     def _validate_earmark(self, earmark: str) -> bool:
         """
-        Validates that the earmark string is either empty or composed of parts separated by scopes,
-        with the top-level scope being either 'nfs' or 'smb'.
+        Validates the earmark string further by checking specific conditions for scopes like 'smb'.
 
         :param earmark: The earmark string to validate.
         :return: True if valid, False otherwise.
         """
-        if not earmark or earmark in (scope.value for scope in EarmarkTopScope):
-            return True
+        try:
+            parsed = self.parse_earmark(earmark)
+        except EarmarkParseError:
+            return False
 
-        parts = earmark.split('.')
+        # If parsed is None, it's considered valid since the earmark is empty
+        if not parsed:
+            return True
 
-        if parts[0] not in (scope.value for scope in EarmarkTopScope):
-            return False
+        # Specific validation for 'smb' scope
+        if parsed.top == EarmarkTopScope.SMB:
+            # Valid formats: 'smb' or 'smb.cluster.{cluster_id}'
+            if not (len(parsed.subsections) == 0 or
+                    (len(parsed.subsections) == 2 and
+                    parsed.subsections[0] == 'cluster' and parsed.subsections[1])):
+                return False
 
-        # Check if all parts are non-empty (to ensure valid dot-separated format)
-        return all(parts)
+        return True
 
     def get_earmark(self) -> Optional[str]:
         try:
@@ -95,7 +138,8 @@ class CephFSVolumeEarmarking:
                 errno.EINVAL,
                 f"Invalid earmark specified: '{earmark}'. "
                 "A valid earmark should either be empty or start with 'nfs' or 'smb', "
-                "followed by dot-separated non-empty components."
+                "followed by dot-separated non-empty components or simply set "
+                "'smb.cluster.{cluster_id}' for the smb intra-cluster scope."
                 )
 
         try:
index 2add5d620649b11e0ebd3abcbff5daa59dd744e0..28c54f0770c97b35a5683c8b76975e778e9545d0 100644 (file)
@@ -2,8 +2,13 @@ import pytest
 import errno
 from unittest import mock
 
-from ceph.fs.earmarking import CephFSVolumeEarmarking, EarmarkException, EarmarkTopScope
-# Mock constants
+from ceph.fs.earmarking import (
+    CephFSVolumeEarmarking,
+    EarmarkException,
+    EarmarkParseError,
+    EarmarkTopScope
+)
+
 XATTR_SUBVOLUME_EARMARK_NAME = 'user.ceph.subvolume.earmark'
 
 
@@ -17,70 +22,63 @@ class TestCephFSVolumeEarmarking:
     def earmarking(self, mock_fs):
         return CephFSVolumeEarmarking(mock_fs, "/test/path")
 
-    def test_get_earmark_success(self, earmarking, mock_fs):
-        mock_fs.getxattr.return_value = b"nfs"
-        result = earmarking.get_earmark()
-        assert result == "nfs"
-        mock_fs.getxattr.assert_called_once_with("/test/path", XATTR_SUBVOLUME_EARMARK_NAME)
+    def test_parse_earmark_valid(self):
+        earmark_value = "nfs.subsection1.subsection2"
+        result = CephFSVolumeEarmarking.parse_earmark(earmark_value)
+        assert result.top == EarmarkTopScope.NFS
+        assert result.subsections == ["subsection1", "subsection2"]
 
-    def test_get_earmark_no_earmark_set(self, earmarking, mock_fs):
-        mock_fs.getxattr.return_value = b""
-        result = earmarking.get_earmark()
+    def test_parse_earmark_empty_string(self):
+        result = CephFSVolumeEarmarking.parse_earmark("")
+        assert result is None
+
+    def test_parse_earmark_invalid_scope(self):
+        with pytest.raises(EarmarkParseError):
+            CephFSVolumeEarmarking.parse_earmark("invalid.scope")
+
+    def test_parse_earmark_empty_sections(self):
+        with pytest.raises(EarmarkParseError):
+            CephFSVolumeEarmarking.parse_earmark("nfs..section")
+
+    def test_validate_earmark_valid_empty(self, earmarking):
+        assert earmarking._validate_earmark("")
 
-        assert result == ""
-        mock_fs.getxattr.assert_called_once_with("/test/path", XATTR_SUBVOLUME_EARMARK_NAME)
+    def test_validate_earmark_valid_smb(self, earmarking):
+        assert earmarking._validate_earmark("smb.cluster.cluster_id")
 
-    def test_get_earmark_error(self, earmarking, mock_fs):
-        mock_fs.getxattr.side_effect = OSError(errno.EIO, "I/O error")
+    def test_validate_earmark_invalid_smb_format(self, earmarking):
+        assert not earmarking._validate_earmark("smb.invalid.format")
 
+    def test_get_earmark_success(self, earmarking):
+        earmarking.fs.getxattr.return_value = b'nfs.valid.earmark'
+        result = earmarking.get_earmark()
+        assert result == 'nfs.valid.earmark'
+
+    def test_get_earmark_handle_error(self, earmarking):
+        earmarking.fs.getxattr.side_effect = OSError(errno.EIO, "I/O error")
         with pytest.raises(EarmarkException) as excinfo:
             earmarking.get_earmark()
-
         assert excinfo.value.errno == -errno.EIO
-        assert "I/O error" in str(excinfo.value)
 
-        # Ensure that the getxattr method was called exactly once
-        mock_fs.getxattr.assert_called_once_with("/test/path", XATTR_SUBVOLUME_EARMARK_NAME)
-
-    def test_set_earmark_success(self, earmarking, mock_fs):
-        earmarking.set_earmark(EarmarkTopScope.NFS.value)
-        mock_fs.setxattr.assert_called_once_with(
-            "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, b"nfs", 0
+    def test_set_earmark_valid(self, earmarking):
+        earmark = "nfs.valid.earmark"
+        earmarking.set_earmark(earmark)
+        earmarking.fs.setxattr.assert_called_with(
+            "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, earmark.encode('utf-8'), 0
         )
 
     def test_set_earmark_invalid(self, earmarking):
         with pytest.raises(EarmarkException) as excinfo:
-            earmarking.set_earmark("invalid_scope")
-
+            earmarking.set_earmark("invalid.earmark")
         assert excinfo.value.errno == errno.EINVAL
-        assert "Invalid earmark specified" in str(excinfo.value)
-
-    def test_set_earmark_error(self, earmarking, mock_fs):
-        mock_fs.setxattr.side_effect = OSError(errno.EIO, "I/O error")
 
+    def test_set_earmark_handle_error(self, earmarking):
+        earmarking.fs.setxattr.side_effect = OSError(errno.EIO, "I/O error")
         with pytest.raises(EarmarkException) as excinfo:
-            earmarking.set_earmark(EarmarkTopScope.NFS.value)
-
+            earmarking.set_earmark("nfs.valid.earmark")
         assert excinfo.value.errno == -errno.EIO
-        assert "I/O error" in str(excinfo.value)
-        mock_fs.setxattr.assert_called_once_with(
-            "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, b"nfs", 0
-        )
-
-    def test_clear_earmark_success(self, earmarking, mock_fs):
-        earmarking.clear_earmark()
-        mock_fs.setxattr.assert_called_once_with(
-            "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, b"", 0
-        )
 
-    def test_clear_earmark_error(self, earmarking, mock_fs):
-        mock_fs.setxattr.side_effect = OSError(errno.EIO, "I/O error")
-
-        with pytest.raises(EarmarkException) as excinfo:
+    def test_clear_earmark(self, earmarking):
+        with mock.patch.object(earmarking, 'set_earmark') as mock_set_earmark:
             earmarking.clear_earmark()
-
-        assert excinfo.value.errno == -errno.EIO
-        assert "I/O error" in str(excinfo.value)
-        mock_fs.setxattr.assert_called_once_with(
-            "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, b"", 0
-        )
+            mock_set_earmark.assert_called_once_with("")