]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/smb: move functions from handler.py to staging.py
authorJohn Mulligan <jmulligan@redhat.com>
Wed, 2 Jul 2025 17:44:39 +0000 (13:44 -0400)
committerAdam King <adking@redhat.com>
Thu, 10 Jul 2025 16:02:39 +0000 (12:02 -0400)
Create a new file staging.py to reduce the size of handler.py and
organize this a bit more. The staging.py file will be responsible for
the special staging area store as well as functions that work directly
on the staging store like the cross-check of resources.

This change is nearly a straight more except for renaming some functions
from _foo to foo.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
(cherry picked from commit 978e92fb68a1717a2713017382bfb5fbe9a24e63)

src/pybind/mgr/smb/handler.py
src/pybind/mgr/smb/staging.py [new file with mode: 0644]

index 5e58a7b8eebbf625731616707057e222b8e08d29..593f7af99d56bd5bf1d947fe6d073d1dacb88d22 100644 (file)
@@ -14,22 +14,17 @@ from typing import (
 
 import contextlib
 import logging
-import operator
 import time
 
 from ceph.deployment.service_spec import SMBSpec
-from ceph.fs.earmarking import EarmarkTopScope
 
 from . import config_store, external, resources
 from .enums import (
     AuthMode,
     CephFSStorageProvider,
-    ConfigNS,
-    Intent,
     JoinSourceType,
     LoginAccess,
     LoginCategory,
-    SMBClustering,
     State,
     UserGroupSourceType,
 )
@@ -38,21 +33,19 @@ from .internal import (
     JoinAuthEntry,
     ShareEntry,
     UsersAndGroupsEntry,
-    resource_entry,
-    resource_key,
 )
 from .proto import (
     AccessAuthorizer,
     ConfigEntry,
     ConfigStore,
     EarmarkResolver,
-    EntryKey,
     OrchSubmitter,
     PathResolver,
     Simplified,
 )
 from .resources import SMBResource
 from .results import ErrorResult, Result, ResultGroup
+from .staging import Staging, auth_refs, cross_check_resource, ug_refs
 from .utils import checked, ynbool
 
 ClusterRef = Union[resources.Cluster, resources.RemovedCluster]
@@ -209,99 +202,6 @@ class _Matcher:
         )
 
 
-class _Staging:
-    def __init__(self, store: ConfigStore) -> None:
-        self.destination_store = store
-        self.incoming: Dict[EntryKey, SMBResource] = {}
-        self.deleted: Dict[EntryKey, SMBResource] = {}
-        self._store_keycache: Set[EntryKey] = set()
-        self._virt_keycache: Set[EntryKey] = set()
-
-    def stage(self, resource: SMBResource) -> None:
-        self._virt_keycache = set()
-        ekey = resource_key(resource)
-        if resource.intent == Intent.REMOVED:
-            self.deleted[ekey] = resource
-        else:
-            self.deleted.pop(ekey, None)
-            self.incoming[ekey] = resource
-
-    def _virtual_keys(self) -> Collection[EntryKey]:
-        if self._virt_keycache:
-            return self._virt_keycache
-        self._virt_keycache = set(self._store_keys()) - set(
-            self.deleted
-        ) | set(self.incoming)
-        return self._virt_keycache
-
-    def _store_keys(self) -> Collection[EntryKey]:
-        if not self._store_keycache:
-            self._store_keycache = set(self.destination_store)
-        return self._store_keycache
-
-    def __iter__(self) -> Iterator[EntryKey]:
-        return iter(self._virtual_keys())
-
-    def namespaces(self) -> Collection[str]:
-        return {k[0] for k in self}
-
-    def contents(self, ns: str) -> Collection[str]:
-        return {kname for kns, kname in self if kns == ns}
-
-    def is_new(self, resource: SMBResource) -> bool:
-        ekey = resource_key(resource)
-        return ekey not in self._store_keys()
-
-    def get_cluster(self, cluster_id: str) -> resources.Cluster:
-        ekey = (str(ClusterEntry.namespace), cluster_id)
-        if ekey in self.incoming:
-            res = self.incoming[ekey]
-            assert isinstance(res, resources.Cluster)
-            return res
-        return ClusterEntry.from_store(
-            self.destination_store, cluster_id
-        ).get_cluster()
-
-    def get_join_auth(self, auth_id: str) -> resources.JoinAuth:
-        ekey = (str(JoinAuthEntry.namespace), auth_id)
-        if ekey in self.incoming:
-            res = self.incoming[ekey]
-            assert isinstance(res, resources.JoinAuth)
-            return res
-        return JoinAuthEntry.from_store(
-            self.destination_store, auth_id
-        ).get_join_auth()
-
-    def get_users_and_groups(self, ug_id: str) -> resources.UsersAndGroups:
-        ekey = (str(UsersAndGroupsEntry.namespace), ug_id)
-        if ekey in self.incoming:
-            res = self.incoming[ekey]
-            assert isinstance(res, resources.UsersAndGroups)
-            return res
-        return UsersAndGroupsEntry.from_store(
-            self.destination_store, ug_id
-        ).get_users_and_groups()
-
-    def save(self) -> ResultGroup:
-        results = ResultGroup()
-        for res in self.deleted.values():
-            results.append(self._save(res))
-        for res in self.incoming.values():
-            results.append(self._save(res))
-        return results
-
-    def _save(self, resource: SMBResource) -> Result:
-        entry = resource_entry(self.destination_store, resource)
-        if resource.intent == Intent.REMOVED:
-            removed = entry.remove()
-            state = State.REMOVED if removed else State.NOT_PRESENT
-        else:
-            state = entry.create_or_update(resource)
-        log.debug('saved resource: %r; state: %s', resource, state)
-        result = Result(resource, success=True, status={'state': state})
-        return result
-
-
 class ClusterConfigHandler:
     """The central class for ingesting and handling smb configuration change
     requests.
@@ -379,7 +279,7 @@ class ClusterConfigHandler:
         """
         log.debug('applying changes to internal data store')
         results = ResultGroup()
-        staging = _Staging(self.internal_store)
+        staging = Staging(self.internal_store)
         try:
             incoming = order_resources(inputs)
             for resource in incoming:
@@ -478,7 +378,7 @@ class ClusterConfigHandler:
     def _check(
         self,
         resource: SMBResource,
-        staging: _Staging,
+        staging: Staging,
         *,
         create_only: bool = False,
     ) -> Result:
@@ -540,11 +440,11 @@ class ClusterConfigHandler:
                 ],
                 [
                     self._join_auth_entry(_id).get_join_auth()
-                    for _id in _auth_refs(cluster)
+                    for _id in auth_refs(cluster)
                 ],
                 [
                     self._users_and_groups_entry(_id).get_users_and_groups()
-                    for _id in _ug_refs(cluster)
+                    for _id in ug_refs(cluster)
                 ],
             )
             change_groups.append(change_group)
@@ -776,356 +676,7 @@ def order_resources(
     return sorted(resource_objs, key=_keyfunc)
 
 
-def cross_check_resource(
-    resource: SMBResource,
-    staging: _Staging,
-    *,
-    path_resolver: PathResolver,
-    earmark_resolver: EarmarkResolver,
-) -> None:
-    if isinstance(resource, (resources.Cluster, resources.RemovedCluster)):
-        _check_cluster(resource, staging)
-    elif isinstance(resource, (resources.Share, resources.RemovedShare)):
-        _check_share(
-            resource,
-            staging,
-            path_resolver,
-            earmark_resolver,
-        )
-    elif isinstance(resource, resources.JoinAuth):
-        _check_join_auths(resource, staging)
-    elif isinstance(resource, resources.UsersAndGroups):
-        _check_users_and_groups(resource, staging)
-    else:
-        raise TypeError('not a valid smb resource')
-
-
-def _check_cluster(cluster: ClusterRef, staging: _Staging) -> None:
-    """Check that the cluster resource can be updated."""
-    if cluster.intent == Intent.PRESENT:
-        return _check_cluster_present(cluster, staging)
-    return _check_cluster_removed(cluster, staging)
-
-
-def _check_cluster_removed(cluster: ClusterRef, staging: _Staging) -> None:
-    share_ids = ShareEntry.ids(staging)
-    clusters_used = {cid for cid, _ in share_ids}
-    if cluster.cluster_id in clusters_used:
-        raise ErrorResult(
-            cluster,
-            msg="cluster in use by shares",
-            status={
-                'shares': [
-                    shid
-                    for cid, shid in share_ids
-                    if cid == cluster.cluster_id
-                ]
-            },
-        )
-
-
-def _check_cluster_present(cluster: ClusterRef, staging: _Staging) -> None:
-    assert isinstance(cluster, resources.Cluster)
-    cluster.validate()
-    if not staging.is_new(cluster):
-        _check_cluster_modifications(cluster, staging)
-    for auth_ref in _auth_refs(cluster):
-        auth = staging.get_join_auth(auth_ref)
-        if (
-            auth.linked_to_cluster
-            and auth.linked_to_cluster != cluster.cluster_id
-        ):
-            raise ErrorResult(
-                cluster,
-                msg="join auth linked to different cluster",
-                status={
-                    'other_cluster_id': auth.linked_to_cluster,
-                },
-            )
-    for ug_ref in _ug_refs(cluster):
-        ug = staging.get_users_and_groups(ug_ref)
-        if (
-            ug.linked_to_cluster
-            and ug.linked_to_cluster != cluster.cluster_id
-        ):
-            raise ErrorResult(
-                cluster,
-                msg="users and groups linked to different cluster",
-                status={
-                    'other_cluster_id': ug.linked_to_cluster,
-                },
-            )
-
-
-def _check_cluster_modifications(
-    cluster: resources.Cluster, staging: _Staging
-) -> None:
-    """cluster has some fields we do not permit changing after the cluster has
-    been created.
-    """
-    prev = ClusterEntry.from_store(
-        staging.destination_store, cluster.cluster_id
-    ).get_cluster()
-    if cluster.auth_mode != prev.auth_mode:
-        raise ErrorResult(
-            cluster,
-            'auth_mode value may not be changed',
-            status={'existing_auth_mode': prev.auth_mode},
-        )
-    if cluster.auth_mode == AuthMode.ACTIVE_DIRECTORY:
-        assert prev.domain_settings
-        if not cluster.domain_settings:
-            # should not occur
-            raise ErrorResult(cluster, "domain settings missing from cluster")
-        if cluster.domain_settings.realm != prev.domain_settings.realm:
-            raise ErrorResult(
-                cluster,
-                'domain/realm value may not be changed',
-                status={'existing_domain_realm': prev.domain_settings.realm},
-            )
-    if cluster.is_clustered() != prev.is_clustered():
-        prev_clustering = prev.is_clustered()
-        cterms = {True: 'enabled', False: 'disabled'}
-        msg = (
-            f'a cluster resource with clustering {cterms[prev_clustering]}'
-            f' may not be changed to clustering {cterms[not prev_clustering]}'
-        )
-        opt_terms = {
-            True: SMBClustering.ALWAYS.value,
-            False: SMBClustering.NEVER.value,
-        }
-        hint = {
-            'note': (
-                'Set "clustering" to an explicit value that matches the'
-                ' current clustering behavior'
-            ),
-            'value': opt_terms[prev_clustering],
-        }
-        raise ErrorResult(cluster, msg, status={'hint': hint})
-
-
-def _parse_earmark(earmark: str) -> dict:
-    parts = earmark.split('.')
-
-    # If it only has one part (e.g., 'smb'), return None for cluster_id
-    if len(parts) == 1:
-        return {'scope': parts[0], 'cluster_id': None}
-
-    return {
-        'scope': parts[0],
-        'cluster_id': parts[2] if len(parts) > 2 else None,
-    }
-
-
-def _check_share(
-    share: ShareRef,
-    staging: _Staging,
-    resolver: PathResolver,
-    earmark_resolver: EarmarkResolver,
-) -> None:
-    """Check that the share resource can be updated."""
-    if share.intent == Intent.REMOVED:
-        return
-    assert isinstance(share, resources.Share)
-    share.validate()
-    if share.cluster_id not in ClusterEntry.ids(staging):
-        raise ErrorResult(
-            share,
-            msg="no matching cluster id",
-            status={"cluster_id": share.cluster_id},
-        )
-    assert share.cephfs is not None
-    try:
-        volpath = resolver.resolve_exists(
-            share.cephfs.volume,
-            share.cephfs.subvolumegroup,
-            share.cephfs.subvolume,
-            share.cephfs.path,
-        )
-    except (FileNotFoundError, NotADirectoryError):
-        raise ErrorResult(
-            share, msg="path is not a valid directory in volume"
-        )
-    if earmark_resolver:
-        earmark = earmark_resolver.get_earmark(
-            volpath,
-            share.cephfs.volume,
-        )
-        if not earmark:
-            smb_earmark = (
-                f"{EarmarkTopScope.SMB.value}.cluster.{share.cluster_id}"
-            )
-            earmark_resolver.set_earmark(
-                volpath,
-                share.cephfs.volume,
-                smb_earmark,
-            )
-        else:
-            parsed_earmark = _parse_earmark(earmark)
-
-            # Check if the top-level scope is not SMB
-            if not earmark_resolver.check_earmark(
-                earmark, EarmarkTopScope.SMB
-            ):
-                raise ErrorResult(
-                    share,
-                    msg=f"earmark has already been set by {parsed_earmark['scope']}",
-                )
-
-            # Check if the earmark is set by a different cluster
-            if (
-                parsed_earmark['cluster_id']
-                and parsed_earmark['cluster_id'] != share.cluster_id
-            ):
-                raise ErrorResult(
-                    share,
-                    msg="earmark has already been set by smb cluster "
-                    f"{parsed_earmark['cluster_id']}",
-                )
-
-    name_used_by = _share_name_in_use(staging, share)
-    if name_used_by:
-        raise ErrorResult(
-            share,
-            msg="share name already in use",
-            status={"conflicting_share_id": name_used_by},
-        )
-
-
-def _share_name_in_use(
-    staging: _Staging, share: resources.Share
-) -> Optional[str]:
-    """Returns the share_id value if the share's name is already in
-    use by a different share in the cluster. Returns None if no other
-    shares are using the name.
-    """
-    share_ids = (share.cluster_id, share.share_id)
-    share_ns = str(ConfigNS.SHARES)
-    # look for any duplicate names in the staging area.
-    # these items are already in memory
-    for ekey, res in staging.incoming.items():
-        if ekey[0] != share_ns:
-            continue  # not a share
-        assert isinstance(res, resources.Share)
-        if (res.cluster_id, res.share_id) == share_ids:
-            continue  # this share
-        if (res.cluster_id, res.name) == (share.cluster_id, share.name):
-            return res.share_id
-    # look for any duplicate names in the underyling store
-    found = config_store.find_in_store(
-        staging.destination_store,
-        share_ns,
-        {'cluster_id': share.cluster_id, 'name': share.name},
-    )
-    # remove any shares that are deleted in staging
-    found_curr = [
-        entry for entry in found if entry.full_key not in staging.deleted
-    ]
-    # remove self-share from list
-    id_pair = operator.itemgetter('cluster_id', 'share_id')
-    found_curr = [
-        entry for entry in found_curr if id_pair(entry.get()) != share_ids
-    ]
-    if not found_curr:
-        return None
-    if len(found_curr) != 1:
-        # this should not normally happen
-        log.warning(
-            'multiple shares with one name in cluster: %s',
-            ' '.join(s.get()['share_id'] for s in found_curr),
-        )
-    return found_curr[0].get()['share_id']
-
-
-def _check_join_auths(
-    join_auth: resources.JoinAuth, staging: _Staging
-) -> None:
-    """Check that the JoinAuth resource can be updated."""
-    if join_auth.intent == Intent.PRESENT:
-        return _check_join_auths_present(join_auth, staging)
-    return _check_join_auths_removed(join_auth, staging)
-
-
-def _check_join_auths_removed(
-    join_auth: resources.JoinAuth, staging: _Staging
-) -> None:
-    cids = set(ClusterEntry.ids(staging))
-    refs_in_use: Dict[str, List[str]] = {}
-    for cluster_id in cids:
-        cluster = staging.get_cluster(cluster_id)
-        for ref in _auth_refs(cluster):
-            refs_in_use.setdefault(ref, []).append(cluster_id)
-    log.debug('refs_in_use: %r', refs_in_use)
-    if join_auth.auth_id in refs_in_use:
-        raise ErrorResult(
-            join_auth,
-            msg='join auth resource in use by clusters',
-            status={
-                'clusters': refs_in_use[join_auth.auth_id],
-            },
-        )
-
-
-def _check_join_auths_present(
-    join_auth: resources.JoinAuth, staging: _Staging
-) -> None:
-    if join_auth.linked_to_cluster:
-        cids = set(ClusterEntry.ids(staging))
-        if join_auth.linked_to_cluster not in cids:
-            raise ErrorResult(
-                join_auth,
-                msg='linked_to_cluster id not valid',
-                status={
-                    'unknown_id': join_auth.linked_to_cluster,
-                },
-            )
-
-
-def _check_users_and_groups(
-    users_and_groups: resources.UsersAndGroups, staging: _Staging
-) -> None:
-    """Check that the UsersAndGroups resource can be updated."""
-    if users_and_groups.intent == Intent.PRESENT:
-        return _check_users_and_groups_present(users_and_groups, staging)
-    return _check_users_and_groups_removed(users_and_groups, staging)
-
-
-def _check_users_and_groups_removed(
-    users_and_groups: resources.UsersAndGroups, staging: _Staging
-) -> None:
-    refs_in_use: Dict[str, List[str]] = {}
-    cids = set(ClusterEntry.ids(staging))
-    for cluster_id in cids:
-        cluster = staging.get_cluster(cluster_id)
-        for ref in _ug_refs(cluster):
-            refs_in_use.setdefault(ref, []).append(cluster_id)
-    log.debug('refs_in_use: %r', refs_in_use)
-    if users_and_groups.users_groups_id in refs_in_use:
-        raise ErrorResult(
-            users_and_groups,
-            msg='users and groups resource in use by clusters',
-            status={
-                'clusters': refs_in_use[users_and_groups.users_groups_id],
-            },
-        )
-
-
-def _check_users_and_groups_present(
-    users_and_groups: resources.UsersAndGroups, staging: _Staging
-) -> None:
-    if users_and_groups.linked_to_cluster:
-        cids = set(ClusterEntry.ids(staging))
-        if users_and_groups.linked_to_cluster not in cids:
-            raise ErrorResult(
-                users_and_groups,
-                msg='linked_to_cluster id not valid',
-                status={
-                    'unknown_id': users_and_groups.linked_to_cluster,
-                },
-            )
-
-
-def _prune_linked_entries(staging: _Staging) -> None:
+def _prune_linked_entries(staging: Staging) -> None:
     cids = set(ClusterEntry.ids(staging))
     for auth_id in JoinAuthEntry.ids(staging):
         join_auth = staging.get_join_auth(auth_id)
@@ -1144,29 +695,6 @@ def _prune_linked_entries(staging: _Staging) -> None:
             ).remove()
 
 
-def _auth_refs(cluster: resources.Cluster) -> Collection[str]:
-    if cluster.auth_mode != AuthMode.ACTIVE_DIRECTORY:
-        return set()
-    return {
-        j.ref
-        for j in checked(cluster.domain_settings).join_sources
-        if j.source_type == JoinSourceType.RESOURCE and j.ref
-    }
-
-
-def _ug_refs(cluster: resources.Cluster) -> Collection[str]:
-    if (
-        cluster.auth_mode != AuthMode.USER
-        or cluster.user_group_settings is None
-    ):
-        return set()
-    return {
-        ug.ref
-        for ug in cluster.user_group_settings
-        if ug.source_type == UserGroupSourceType.RESOURCE and ug.ref
-    }
-
-
 def _generate_share(
     share: resources.Share, resolver: PathResolver, cephx_entity: str
 ) -> Dict[str, Dict[str, str]]:
diff --git a/src/pybind/mgr/smb/staging.py b/src/pybind/mgr/smb/staging.py
new file mode 100644 (file)
index 0000000..9c9e11d
--- /dev/null
@@ -0,0 +1,512 @@
+from typing import (
+    Collection,
+    Dict,
+    Iterator,
+    List,
+    Optional,
+    Set,
+    Union,
+)
+
+import logging
+import operator
+
+from ceph.fs.earmarking import EarmarkTopScope
+
+from . import config_store, resources
+from .enums import (
+    AuthMode,
+    ConfigNS,
+    Intent,
+    JoinSourceType,
+    SMBClustering,
+    State,
+    UserGroupSourceType,
+)
+from .internal import (
+    ClusterEntry,
+    JoinAuthEntry,
+    ShareEntry,
+    UsersAndGroupsEntry,
+    resource_entry,
+    resource_key,
+)
+from .proto import (
+    ConfigStore,
+    EarmarkResolver,
+    EntryKey,
+    PathResolver,
+)
+from .resources import SMBResource
+from .results import ErrorResult, Result, ResultGroup
+from .utils import checked
+
+ClusterRef = Union[resources.Cluster, resources.RemovedCluster]
+ShareRef = Union[resources.Share, resources.RemovedShare]
+
+log = logging.getLogger(__name__)
+
+
+class Staging:
+    def __init__(self, store: ConfigStore) -> None:
+        self.destination_store = store
+        self.incoming: Dict[EntryKey, SMBResource] = {}
+        self.deleted: Dict[EntryKey, SMBResource] = {}
+        self._store_keycache: Set[EntryKey] = set()
+        self._virt_keycache: Set[EntryKey] = set()
+
+    def stage(self, resource: SMBResource) -> None:
+        self._virt_keycache = set()
+        ekey = resource_key(resource)
+        if resource.intent == Intent.REMOVED:
+            self.deleted[ekey] = resource
+        else:
+            self.deleted.pop(ekey, None)
+            self.incoming[ekey] = resource
+
+    def _virtual_keys(self) -> Collection[EntryKey]:
+        if self._virt_keycache:
+            return self._virt_keycache
+        self._virt_keycache = set(self._store_keys()) - set(
+            self.deleted
+        ) | set(self.incoming)
+        return self._virt_keycache
+
+    def _store_keys(self) -> Collection[EntryKey]:
+        if not self._store_keycache:
+            self._store_keycache = set(self.destination_store)
+        return self._store_keycache
+
+    def __iter__(self) -> Iterator[EntryKey]:
+        return iter(self._virtual_keys())
+
+    def namespaces(self) -> Collection[str]:
+        return {k[0] for k in self}
+
+    def contents(self, ns: str) -> Collection[str]:
+        return {kname for kns, kname in self if kns == ns}
+
+    def is_new(self, resource: SMBResource) -> bool:
+        ekey = resource_key(resource)
+        return ekey not in self._store_keys()
+
+    def get_cluster(self, cluster_id: str) -> resources.Cluster:
+        ekey = (str(ClusterEntry.namespace), cluster_id)
+        if ekey in self.incoming:
+            res = self.incoming[ekey]
+            assert isinstance(res, resources.Cluster)
+            return res
+        return ClusterEntry.from_store(
+            self.destination_store, cluster_id
+        ).get_cluster()
+
+    def get_join_auth(self, auth_id: str) -> resources.JoinAuth:
+        ekey = (str(JoinAuthEntry.namespace), auth_id)
+        if ekey in self.incoming:
+            res = self.incoming[ekey]
+            assert isinstance(res, resources.JoinAuth)
+            return res
+        return JoinAuthEntry.from_store(
+            self.destination_store, auth_id
+        ).get_join_auth()
+
+    def get_users_and_groups(self, ug_id: str) -> resources.UsersAndGroups:
+        ekey = (str(UsersAndGroupsEntry.namespace), ug_id)
+        if ekey in self.incoming:
+            res = self.incoming[ekey]
+            assert isinstance(res, resources.UsersAndGroups)
+            return res
+        return UsersAndGroupsEntry.from_store(
+            self.destination_store, ug_id
+        ).get_users_and_groups()
+
+    def save(self) -> ResultGroup:
+        results = ResultGroup()
+        for res in self.deleted.values():
+            results.append(self._save(res))
+        for res in self.incoming.values():
+            results.append(self._save(res))
+        return results
+
+    def _save(self, resource: SMBResource) -> Result:
+        entry = resource_entry(self.destination_store, resource)
+        if resource.intent == Intent.REMOVED:
+            removed = entry.remove()
+            state = State.REMOVED if removed else State.NOT_PRESENT
+        else:
+            state = entry.create_or_update(resource)
+        log.debug('saved resource: %r; state: %s', resource, state)
+        result = Result(resource, success=True, status={'state': state})
+        return result
+
+
+def auth_refs(cluster: resources.Cluster) -> Collection[str]:
+    if cluster.auth_mode != AuthMode.ACTIVE_DIRECTORY:
+        return set()
+    return {
+        j.ref
+        for j in checked(cluster.domain_settings).join_sources
+        if j.source_type == JoinSourceType.RESOURCE and j.ref
+    }
+
+
+def ug_refs(cluster: resources.Cluster) -> Collection[str]:
+    if (
+        cluster.auth_mode != AuthMode.USER
+        or cluster.user_group_settings is None
+    ):
+        return set()
+    return {
+        ug.ref
+        for ug in cluster.user_group_settings
+        if ug.source_type == UserGroupSourceType.RESOURCE and ug.ref
+    }
+
+
+def cross_check_resource(
+    resource: SMBResource,
+    staging: Staging,
+    *,
+    path_resolver: PathResolver,
+    earmark_resolver: EarmarkResolver,
+) -> None:
+    if isinstance(resource, (resources.Cluster, resources.RemovedCluster)):
+        _check_cluster(resource, staging)
+    elif isinstance(resource, (resources.Share, resources.RemovedShare)):
+        _check_share(
+            resource,
+            staging,
+            path_resolver,
+            earmark_resolver,
+        )
+    elif isinstance(resource, resources.JoinAuth):
+        _check_join_auths(resource, staging)
+    elif isinstance(resource, resources.UsersAndGroups):
+        _check_users_and_groups(resource, staging)
+    else:
+        raise TypeError('not a valid smb resource')
+
+
+def _check_cluster(cluster: ClusterRef, staging: Staging) -> None:
+    """Check that the cluster resource can be updated."""
+    if cluster.intent == Intent.PRESENT:
+        return _check_cluster_present(cluster, staging)
+    return _check_cluster_removed(cluster, staging)
+
+
+def _check_cluster_removed(cluster: ClusterRef, staging: Staging) -> None:
+    share_ids = ShareEntry.ids(staging)
+    clusters_used = {cid for cid, _ in share_ids}
+    if cluster.cluster_id in clusters_used:
+        raise ErrorResult(
+            cluster,
+            msg="cluster in use by shares",
+            status={
+                'shares': [
+                    shid
+                    for cid, shid in share_ids
+                    if cid == cluster.cluster_id
+                ]
+            },
+        )
+
+
+def _check_cluster_present(cluster: ClusterRef, staging: Staging) -> None:
+    assert isinstance(cluster, resources.Cluster)
+    cluster.validate()
+    if not staging.is_new(cluster):
+        _check_cluster_modifications(cluster, staging)
+    for auth_ref in auth_refs(cluster):
+        auth = staging.get_join_auth(auth_ref)
+        if (
+            auth.linked_to_cluster
+            and auth.linked_to_cluster != cluster.cluster_id
+        ):
+            raise ErrorResult(
+                cluster,
+                msg="join auth linked to different cluster",
+                status={
+                    'other_cluster_id': auth.linked_to_cluster,
+                },
+            )
+    for ug_ref in ug_refs(cluster):
+        ug = staging.get_users_and_groups(ug_ref)
+        if (
+            ug.linked_to_cluster
+            and ug.linked_to_cluster != cluster.cluster_id
+        ):
+            raise ErrorResult(
+                cluster,
+                msg="users and groups linked to different cluster",
+                status={
+                    'other_cluster_id': ug.linked_to_cluster,
+                },
+            )
+
+
+def _check_cluster_modifications(
+    cluster: resources.Cluster, staging: Staging
+) -> None:
+    """cluster has some fields we do not permit changing after the cluster has
+    been created.
+    """
+    prev = ClusterEntry.from_store(
+        staging.destination_store, cluster.cluster_id
+    ).get_cluster()
+    if cluster.auth_mode != prev.auth_mode:
+        raise ErrorResult(
+            cluster,
+            'auth_mode value may not be changed',
+            status={'existing_auth_mode': prev.auth_mode},
+        )
+    if cluster.auth_mode == AuthMode.ACTIVE_DIRECTORY:
+        assert prev.domain_settings
+        if not cluster.domain_settings:
+            # should not occur
+            raise ErrorResult(cluster, "domain settings missing from cluster")
+        if cluster.domain_settings.realm != prev.domain_settings.realm:
+            raise ErrorResult(
+                cluster,
+                'domain/realm value may not be changed',
+                status={'existing_domain_realm': prev.domain_settings.realm},
+            )
+    if cluster.is_clustered() != prev.is_clustered():
+        prev_clustering = prev.is_clustered()
+        cterms = {True: 'enabled', False: 'disabled'}
+        msg = (
+            f'a cluster resource with clustering {cterms[prev_clustering]}'
+            f' may not be changed to clustering {cterms[not prev_clustering]}'
+        )
+        opt_terms = {
+            True: SMBClustering.ALWAYS.value,
+            False: SMBClustering.NEVER.value,
+        }
+        hint = {
+            'note': (
+                'Set "clustering" to an explicit value that matches the'
+                ' current clustering behavior'
+            ),
+            'value': opt_terms[prev_clustering],
+        }
+        raise ErrorResult(cluster, msg, status={'hint': hint})
+
+
+def _check_share(
+    share: ShareRef,
+    staging: Staging,
+    resolver: PathResolver,
+    earmark_resolver: EarmarkResolver,
+) -> None:
+    """Check that the share resource can be updated."""
+    if share.intent == Intent.REMOVED:
+        return
+    assert isinstance(share, resources.Share)
+    share.validate()
+    if share.cluster_id not in ClusterEntry.ids(staging):
+        raise ErrorResult(
+            share,
+            msg="no matching cluster id",
+            status={"cluster_id": share.cluster_id},
+        )
+    assert share.cephfs is not None
+    try:
+        volpath = resolver.resolve_exists(
+            share.cephfs.volume,
+            share.cephfs.subvolumegroup,
+            share.cephfs.subvolume,
+            share.cephfs.path,
+        )
+    except (FileNotFoundError, NotADirectoryError):
+        raise ErrorResult(
+            share, msg="path is not a valid directory in volume"
+        )
+    if earmark_resolver:
+        earmark = earmark_resolver.get_earmark(
+            volpath,
+            share.cephfs.volume,
+        )
+        if not earmark:
+            smb_earmark = (
+                f"{EarmarkTopScope.SMB.value}.cluster.{share.cluster_id}"
+            )
+            earmark_resolver.set_earmark(
+                volpath,
+                share.cephfs.volume,
+                smb_earmark,
+            )
+        else:
+            parsed_earmark = _parse_earmark(earmark)
+
+            # Check if the top-level scope is not SMB
+            if not earmark_resolver.check_earmark(
+                earmark, EarmarkTopScope.SMB
+            ):
+                raise ErrorResult(
+                    share,
+                    msg=f"earmark has already been set by {parsed_earmark['scope']}",
+                )
+
+            # Check if the earmark is set by a different cluster
+            if (
+                parsed_earmark['cluster_id']
+                and parsed_earmark['cluster_id'] != share.cluster_id
+            ):
+                raise ErrorResult(
+                    share,
+                    msg="earmark has already been set by smb cluster "
+                    f"{parsed_earmark['cluster_id']}",
+                )
+
+    name_used_by = _share_name_in_use(staging, share)
+    if name_used_by:
+        raise ErrorResult(
+            share,
+            msg="share name already in use",
+            status={"conflicting_share_id": name_used_by},
+        )
+
+
+def _share_name_in_use(
+    staging: Staging, share: resources.Share
+) -> Optional[str]:
+    """Returns the share_id value if the share's name is already in
+    use by a different share in the cluster. Returns None if no other
+    shares are using the name.
+    """
+    share_ids = (share.cluster_id, share.share_id)
+    share_ns = str(ConfigNS.SHARES)
+    # look for any duplicate names in the staging area.
+    # these items are already in memory
+    for ekey, res in staging.incoming.items():
+        if ekey[0] != share_ns:
+            continue  # not a share
+        assert isinstance(res, resources.Share)
+        if (res.cluster_id, res.share_id) == share_ids:
+            continue  # this share
+        if (res.cluster_id, res.name) == (share.cluster_id, share.name):
+            return res.share_id
+    # look for any duplicate names in the underyling store
+    found = config_store.find_in_store(
+        staging.destination_store,
+        share_ns,
+        {'cluster_id': share.cluster_id, 'name': share.name},
+    )
+    # remove any shares that are deleted in staging
+    found_curr = [
+        entry for entry in found if entry.full_key not in staging.deleted
+    ]
+    # remove self-share from list
+    id_pair = operator.itemgetter('cluster_id', 'share_id')
+    found_curr = [
+        entry for entry in found_curr if id_pair(entry.get()) != share_ids
+    ]
+    if not found_curr:
+        return None
+    if len(found_curr) != 1:
+        # this should not normally happen
+        log.warning(
+            'multiple shares with one name in cluster: %s',
+            ' '.join(s.get()['share_id'] for s in found_curr),
+        )
+    return found_curr[0].get()['share_id']
+
+
+def _check_join_auths(
+    join_auth: resources.JoinAuth, staging: Staging
+) -> None:
+    """Check that the JoinAuth resource can be updated."""
+    if join_auth.intent == Intent.PRESENT:
+        return _check_join_auths_present(join_auth, staging)
+    return _check_join_auths_removed(join_auth, staging)
+
+
+def _check_join_auths_removed(
+    join_auth: resources.JoinAuth, staging: Staging
+) -> None:
+    cids = set(ClusterEntry.ids(staging))
+    refs_in_use: Dict[str, List[str]] = {}
+    for cluster_id in cids:
+        cluster = staging.get_cluster(cluster_id)
+        for ref in auth_refs(cluster):
+            refs_in_use.setdefault(ref, []).append(cluster_id)
+    log.debug('refs_in_use: %r', refs_in_use)
+    if join_auth.auth_id in refs_in_use:
+        raise ErrorResult(
+            join_auth,
+            msg='join auth resource in use by clusters',
+            status={
+                'clusters': refs_in_use[join_auth.auth_id],
+            },
+        )
+
+
+def _check_join_auths_present(
+    join_auth: resources.JoinAuth, staging: Staging
+) -> None:
+    if join_auth.linked_to_cluster:
+        cids = set(ClusterEntry.ids(staging))
+        if join_auth.linked_to_cluster not in cids:
+            raise ErrorResult(
+                join_auth,
+                msg='linked_to_cluster id not valid',
+                status={
+                    'unknown_id': join_auth.linked_to_cluster,
+                },
+            )
+
+
+def _check_users_and_groups(
+    users_and_groups: resources.UsersAndGroups, staging: Staging
+) -> None:
+    """Check that the UsersAndGroups resource can be updated."""
+    if users_and_groups.intent == Intent.PRESENT:
+        return _check_users_and_groups_present(users_and_groups, staging)
+    return _check_users_and_groups_removed(users_and_groups, staging)
+
+
+def _check_users_and_groups_removed(
+    users_and_groups: resources.UsersAndGroups, staging: Staging
+) -> None:
+    refs_in_use: Dict[str, List[str]] = {}
+    cids = set(ClusterEntry.ids(staging))
+    for cluster_id in cids:
+        cluster = staging.get_cluster(cluster_id)
+        for ref in ug_refs(cluster):
+            refs_in_use.setdefault(ref, []).append(cluster_id)
+    log.debug('refs_in_use: %r', refs_in_use)
+    if users_and_groups.users_groups_id in refs_in_use:
+        raise ErrorResult(
+            users_and_groups,
+            msg='users and groups resource in use by clusters',
+            status={
+                'clusters': refs_in_use[users_and_groups.users_groups_id],
+            },
+        )
+
+
+def _check_users_and_groups_present(
+    users_and_groups: resources.UsersAndGroups, staging: Staging
+) -> None:
+    if users_and_groups.linked_to_cluster:
+        cids = set(ClusterEntry.ids(staging))
+        if users_and_groups.linked_to_cluster not in cids:
+            raise ErrorResult(
+                users_and_groups,
+                msg='linked_to_cluster id not valid',
+                status={
+                    'unknown_id': users_and_groups.linked_to_cluster,
+                },
+            )
+
+
+def _parse_earmark(earmark: str) -> dict:
+    parts = earmark.split('.')
+
+    # If it only has one part (e.g., 'smb'), return None for cluster_id
+    if len(parts) == 1:
+        return {'scope': parts[0], 'cluster_id': None}
+
+    return {
+        'scope': parts[0],
+        'cluster_id': parts[2] if len(parts) > 2 else None,
+    }