From: John Mulligan Date: Wed, 2 Jul 2025 17:44:39 +0000 (-0400) Subject: mgr/smb: move functions from handler.py to staging.py X-Git-Tag: v20.1.0~40^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c0b4a4c6edbd2009dc376dfc0badfb6dcc6394dc;p=ceph.git mgr/smb: move functions from handler.py to staging.py Create a new file staging.py to reduce the size of handler.py and organize this a bit more. The staging.py file will be responsible for the special staging area store as well as functions that work directly on the staging store like the cross-check of resources. This change is nearly a straight more except for renaming some functions from _foo to foo. Signed-off-by: John Mulligan (cherry picked from commit 978e92fb68a1717a2713017382bfb5fbe9a24e63) --- diff --git a/src/pybind/mgr/smb/handler.py b/src/pybind/mgr/smb/handler.py index 5e58a7b8eeb..593f7af99d5 100644 --- a/src/pybind/mgr/smb/handler.py +++ b/src/pybind/mgr/smb/handler.py @@ -14,22 +14,17 @@ from typing import ( import contextlib import logging -import operator import time from ceph.deployment.service_spec import SMBSpec -from ceph.fs.earmarking import EarmarkTopScope from . import config_store, external, resources from .enums import ( AuthMode, CephFSStorageProvider, - ConfigNS, - Intent, JoinSourceType, LoginAccess, LoginCategory, - SMBClustering, State, UserGroupSourceType, ) @@ -38,21 +33,19 @@ from .internal import ( JoinAuthEntry, ShareEntry, UsersAndGroupsEntry, - resource_entry, - resource_key, ) from .proto import ( AccessAuthorizer, ConfigEntry, ConfigStore, EarmarkResolver, - EntryKey, OrchSubmitter, PathResolver, Simplified, ) from .resources import SMBResource from .results import ErrorResult, Result, ResultGroup +from .staging import Staging, auth_refs, cross_check_resource, ug_refs from .utils import checked, ynbool ClusterRef = Union[resources.Cluster, resources.RemovedCluster] @@ -209,99 +202,6 @@ class _Matcher: ) -class _Staging: - def __init__(self, store: ConfigStore) -> None: - self.destination_store = store - self.incoming: Dict[EntryKey, SMBResource] = {} - self.deleted: Dict[EntryKey, SMBResource] = {} - self._store_keycache: Set[EntryKey] = set() - self._virt_keycache: Set[EntryKey] = set() - - def stage(self, resource: SMBResource) -> None: - self._virt_keycache = set() - ekey = resource_key(resource) - if resource.intent == Intent.REMOVED: - self.deleted[ekey] = resource - else: - self.deleted.pop(ekey, None) - self.incoming[ekey] = resource - - def _virtual_keys(self) -> Collection[EntryKey]: - if self._virt_keycache: - return self._virt_keycache - self._virt_keycache = set(self._store_keys()) - set( - self.deleted - ) | set(self.incoming) - return self._virt_keycache - - def _store_keys(self) -> Collection[EntryKey]: - if not self._store_keycache: - self._store_keycache = set(self.destination_store) - return self._store_keycache - - def __iter__(self) -> Iterator[EntryKey]: - return iter(self._virtual_keys()) - - def namespaces(self) -> Collection[str]: - return {k[0] for k in self} - - def contents(self, ns: str) -> Collection[str]: - return {kname for kns, kname in self if kns == ns} - - def is_new(self, resource: SMBResource) -> bool: - ekey = resource_key(resource) - return ekey not in self._store_keys() - - def get_cluster(self, cluster_id: str) -> resources.Cluster: - ekey = (str(ClusterEntry.namespace), cluster_id) - if ekey in self.incoming: - res = self.incoming[ekey] - assert isinstance(res, resources.Cluster) - return res - return ClusterEntry.from_store( - self.destination_store, cluster_id - ).get_cluster() - - def get_join_auth(self, auth_id: str) -> resources.JoinAuth: - ekey = (str(JoinAuthEntry.namespace), auth_id) - if ekey in self.incoming: - res = self.incoming[ekey] - assert isinstance(res, resources.JoinAuth) - return res - return JoinAuthEntry.from_store( - self.destination_store, auth_id - ).get_join_auth() - - def get_users_and_groups(self, ug_id: str) -> resources.UsersAndGroups: - ekey = (str(UsersAndGroupsEntry.namespace), ug_id) - if ekey in self.incoming: - res = self.incoming[ekey] - assert isinstance(res, resources.UsersAndGroups) - return res - return UsersAndGroupsEntry.from_store( - self.destination_store, ug_id - ).get_users_and_groups() - - def save(self) -> ResultGroup: - results = ResultGroup() - for res in self.deleted.values(): - results.append(self._save(res)) - for res in self.incoming.values(): - results.append(self._save(res)) - return results - - def _save(self, resource: SMBResource) -> Result: - entry = resource_entry(self.destination_store, resource) - if resource.intent == Intent.REMOVED: - removed = entry.remove() - state = State.REMOVED if removed else State.NOT_PRESENT - else: - state = entry.create_or_update(resource) - log.debug('saved resource: %r; state: %s', resource, state) - result = Result(resource, success=True, status={'state': state}) - return result - - class ClusterConfigHandler: """The central class for ingesting and handling smb configuration change requests. @@ -379,7 +279,7 @@ class ClusterConfigHandler: """ log.debug('applying changes to internal data store') results = ResultGroup() - staging = _Staging(self.internal_store) + staging = Staging(self.internal_store) try: incoming = order_resources(inputs) for resource in incoming: @@ -478,7 +378,7 @@ class ClusterConfigHandler: def _check( self, resource: SMBResource, - staging: _Staging, + staging: Staging, *, create_only: bool = False, ) -> Result: @@ -540,11 +440,11 @@ class ClusterConfigHandler: ], [ self._join_auth_entry(_id).get_join_auth() - for _id in _auth_refs(cluster) + for _id in auth_refs(cluster) ], [ self._users_and_groups_entry(_id).get_users_and_groups() - for _id in _ug_refs(cluster) + for _id in ug_refs(cluster) ], ) change_groups.append(change_group) @@ -776,356 +676,7 @@ def order_resources( return sorted(resource_objs, key=_keyfunc) -def cross_check_resource( - resource: SMBResource, - staging: _Staging, - *, - path_resolver: PathResolver, - earmark_resolver: EarmarkResolver, -) -> None: - if isinstance(resource, (resources.Cluster, resources.RemovedCluster)): - _check_cluster(resource, staging) - elif isinstance(resource, (resources.Share, resources.RemovedShare)): - _check_share( - resource, - staging, - path_resolver, - earmark_resolver, - ) - elif isinstance(resource, resources.JoinAuth): - _check_join_auths(resource, staging) - elif isinstance(resource, resources.UsersAndGroups): - _check_users_and_groups(resource, staging) - else: - raise TypeError('not a valid smb resource') - - -def _check_cluster(cluster: ClusterRef, staging: _Staging) -> None: - """Check that the cluster resource can be updated.""" - if cluster.intent == Intent.PRESENT: - return _check_cluster_present(cluster, staging) - return _check_cluster_removed(cluster, staging) - - -def _check_cluster_removed(cluster: ClusterRef, staging: _Staging) -> None: - share_ids = ShareEntry.ids(staging) - clusters_used = {cid for cid, _ in share_ids} - if cluster.cluster_id in clusters_used: - raise ErrorResult( - cluster, - msg="cluster in use by shares", - status={ - 'shares': [ - shid - for cid, shid in share_ids - if cid == cluster.cluster_id - ] - }, - ) - - -def _check_cluster_present(cluster: ClusterRef, staging: _Staging) -> None: - assert isinstance(cluster, resources.Cluster) - cluster.validate() - if not staging.is_new(cluster): - _check_cluster_modifications(cluster, staging) - for auth_ref in _auth_refs(cluster): - auth = staging.get_join_auth(auth_ref) - if ( - auth.linked_to_cluster - and auth.linked_to_cluster != cluster.cluster_id - ): - raise ErrorResult( - cluster, - msg="join auth linked to different cluster", - status={ - 'other_cluster_id': auth.linked_to_cluster, - }, - ) - for ug_ref in _ug_refs(cluster): - ug = staging.get_users_and_groups(ug_ref) - if ( - ug.linked_to_cluster - and ug.linked_to_cluster != cluster.cluster_id - ): - raise ErrorResult( - cluster, - msg="users and groups linked to different cluster", - status={ - 'other_cluster_id': ug.linked_to_cluster, - }, - ) - - -def _check_cluster_modifications( - cluster: resources.Cluster, staging: _Staging -) -> None: - """cluster has some fields we do not permit changing after the cluster has - been created. - """ - prev = ClusterEntry.from_store( - staging.destination_store, cluster.cluster_id - ).get_cluster() - if cluster.auth_mode != prev.auth_mode: - raise ErrorResult( - cluster, - 'auth_mode value may not be changed', - status={'existing_auth_mode': prev.auth_mode}, - ) - if cluster.auth_mode == AuthMode.ACTIVE_DIRECTORY: - assert prev.domain_settings - if not cluster.domain_settings: - # should not occur - raise ErrorResult(cluster, "domain settings missing from cluster") - if cluster.domain_settings.realm != prev.domain_settings.realm: - raise ErrorResult( - cluster, - 'domain/realm value may not be changed', - status={'existing_domain_realm': prev.domain_settings.realm}, - ) - if cluster.is_clustered() != prev.is_clustered(): - prev_clustering = prev.is_clustered() - cterms = {True: 'enabled', False: 'disabled'} - msg = ( - f'a cluster resource with clustering {cterms[prev_clustering]}' - f' may not be changed to clustering {cterms[not prev_clustering]}' - ) - opt_terms = { - True: SMBClustering.ALWAYS.value, - False: SMBClustering.NEVER.value, - } - hint = { - 'note': ( - 'Set "clustering" to an explicit value that matches the' - ' current clustering behavior' - ), - 'value': opt_terms[prev_clustering], - } - raise ErrorResult(cluster, msg, status={'hint': hint}) - - -def _parse_earmark(earmark: str) -> dict: - parts = earmark.split('.') - - # If it only has one part (e.g., 'smb'), return None for cluster_id - if len(parts) == 1: - return {'scope': parts[0], 'cluster_id': None} - - return { - 'scope': parts[0], - 'cluster_id': parts[2] if len(parts) > 2 else None, - } - - -def _check_share( - share: ShareRef, - staging: _Staging, - resolver: PathResolver, - earmark_resolver: EarmarkResolver, -) -> None: - """Check that the share resource can be updated.""" - if share.intent == Intent.REMOVED: - return - assert isinstance(share, resources.Share) - share.validate() - if share.cluster_id not in ClusterEntry.ids(staging): - raise ErrorResult( - share, - msg="no matching cluster id", - status={"cluster_id": share.cluster_id}, - ) - assert share.cephfs is not None - try: - volpath = resolver.resolve_exists( - share.cephfs.volume, - share.cephfs.subvolumegroup, - share.cephfs.subvolume, - share.cephfs.path, - ) - except (FileNotFoundError, NotADirectoryError): - raise ErrorResult( - share, msg="path is not a valid directory in volume" - ) - if earmark_resolver: - earmark = earmark_resolver.get_earmark( - volpath, - share.cephfs.volume, - ) - if not earmark: - smb_earmark = ( - f"{EarmarkTopScope.SMB.value}.cluster.{share.cluster_id}" - ) - earmark_resolver.set_earmark( - volpath, - share.cephfs.volume, - smb_earmark, - ) - else: - parsed_earmark = _parse_earmark(earmark) - - # Check if the top-level scope is not SMB - if not earmark_resolver.check_earmark( - earmark, EarmarkTopScope.SMB - ): - raise ErrorResult( - share, - msg=f"earmark has already been set by {parsed_earmark['scope']}", - ) - - # Check if the earmark is set by a different cluster - if ( - parsed_earmark['cluster_id'] - and parsed_earmark['cluster_id'] != share.cluster_id - ): - raise ErrorResult( - share, - msg="earmark has already been set by smb cluster " - f"{parsed_earmark['cluster_id']}", - ) - - name_used_by = _share_name_in_use(staging, share) - if name_used_by: - raise ErrorResult( - share, - msg="share name already in use", - status={"conflicting_share_id": name_used_by}, - ) - - -def _share_name_in_use( - staging: _Staging, share: resources.Share -) -> Optional[str]: - """Returns the share_id value if the share's name is already in - use by a different share in the cluster. Returns None if no other - shares are using the name. - """ - share_ids = (share.cluster_id, share.share_id) - share_ns = str(ConfigNS.SHARES) - # look for any duplicate names in the staging area. - # these items are already in memory - for ekey, res in staging.incoming.items(): - if ekey[0] != share_ns: - continue # not a share - assert isinstance(res, resources.Share) - if (res.cluster_id, res.share_id) == share_ids: - continue # this share - if (res.cluster_id, res.name) == (share.cluster_id, share.name): - return res.share_id - # look for any duplicate names in the underyling store - found = config_store.find_in_store( - staging.destination_store, - share_ns, - {'cluster_id': share.cluster_id, 'name': share.name}, - ) - # remove any shares that are deleted in staging - found_curr = [ - entry for entry in found if entry.full_key not in staging.deleted - ] - # remove self-share from list - id_pair = operator.itemgetter('cluster_id', 'share_id') - found_curr = [ - entry for entry in found_curr if id_pair(entry.get()) != share_ids - ] - if not found_curr: - return None - if len(found_curr) != 1: - # this should not normally happen - log.warning( - 'multiple shares with one name in cluster: %s', - ' '.join(s.get()['share_id'] for s in found_curr), - ) - return found_curr[0].get()['share_id'] - - -def _check_join_auths( - join_auth: resources.JoinAuth, staging: _Staging -) -> None: - """Check that the JoinAuth resource can be updated.""" - if join_auth.intent == Intent.PRESENT: - return _check_join_auths_present(join_auth, staging) - return _check_join_auths_removed(join_auth, staging) - - -def _check_join_auths_removed( - join_auth: resources.JoinAuth, staging: _Staging -) -> None: - cids = set(ClusterEntry.ids(staging)) - refs_in_use: Dict[str, List[str]] = {} - for cluster_id in cids: - cluster = staging.get_cluster(cluster_id) - for ref in _auth_refs(cluster): - refs_in_use.setdefault(ref, []).append(cluster_id) - log.debug('refs_in_use: %r', refs_in_use) - if join_auth.auth_id in refs_in_use: - raise ErrorResult( - join_auth, - msg='join auth resource in use by clusters', - status={ - 'clusters': refs_in_use[join_auth.auth_id], - }, - ) - - -def _check_join_auths_present( - join_auth: resources.JoinAuth, staging: _Staging -) -> None: - if join_auth.linked_to_cluster: - cids = set(ClusterEntry.ids(staging)) - if join_auth.linked_to_cluster not in cids: - raise ErrorResult( - join_auth, - msg='linked_to_cluster id not valid', - status={ - 'unknown_id': join_auth.linked_to_cluster, - }, - ) - - -def _check_users_and_groups( - users_and_groups: resources.UsersAndGroups, staging: _Staging -) -> None: - """Check that the UsersAndGroups resource can be updated.""" - if users_and_groups.intent == Intent.PRESENT: - return _check_users_and_groups_present(users_and_groups, staging) - return _check_users_and_groups_removed(users_and_groups, staging) - - -def _check_users_and_groups_removed( - users_and_groups: resources.UsersAndGroups, staging: _Staging -) -> None: - refs_in_use: Dict[str, List[str]] = {} - cids = set(ClusterEntry.ids(staging)) - for cluster_id in cids: - cluster = staging.get_cluster(cluster_id) - for ref in _ug_refs(cluster): - refs_in_use.setdefault(ref, []).append(cluster_id) - log.debug('refs_in_use: %r', refs_in_use) - if users_and_groups.users_groups_id in refs_in_use: - raise ErrorResult( - users_and_groups, - msg='users and groups resource in use by clusters', - status={ - 'clusters': refs_in_use[users_and_groups.users_groups_id], - }, - ) - - -def _check_users_and_groups_present( - users_and_groups: resources.UsersAndGroups, staging: _Staging -) -> None: - if users_and_groups.linked_to_cluster: - cids = set(ClusterEntry.ids(staging)) - if users_and_groups.linked_to_cluster not in cids: - raise ErrorResult( - users_and_groups, - msg='linked_to_cluster id not valid', - status={ - 'unknown_id': users_and_groups.linked_to_cluster, - }, - ) - - -def _prune_linked_entries(staging: _Staging) -> None: +def _prune_linked_entries(staging: Staging) -> None: cids = set(ClusterEntry.ids(staging)) for auth_id in JoinAuthEntry.ids(staging): join_auth = staging.get_join_auth(auth_id) @@ -1144,29 +695,6 @@ def _prune_linked_entries(staging: _Staging) -> None: ).remove() -def _auth_refs(cluster: resources.Cluster) -> Collection[str]: - if cluster.auth_mode != AuthMode.ACTIVE_DIRECTORY: - return set() - return { - j.ref - for j in checked(cluster.domain_settings).join_sources - if j.source_type == JoinSourceType.RESOURCE and j.ref - } - - -def _ug_refs(cluster: resources.Cluster) -> Collection[str]: - if ( - cluster.auth_mode != AuthMode.USER - or cluster.user_group_settings is None - ): - return set() - return { - ug.ref - for ug in cluster.user_group_settings - if ug.source_type == UserGroupSourceType.RESOURCE and ug.ref - } - - def _generate_share( share: resources.Share, resolver: PathResolver, cephx_entity: str ) -> Dict[str, Dict[str, str]]: diff --git a/src/pybind/mgr/smb/staging.py b/src/pybind/mgr/smb/staging.py new file mode 100644 index 00000000000..9c9e11d84e4 --- /dev/null +++ b/src/pybind/mgr/smb/staging.py @@ -0,0 +1,512 @@ +from typing import ( + Collection, + Dict, + Iterator, + List, + Optional, + Set, + Union, +) + +import logging +import operator + +from ceph.fs.earmarking import EarmarkTopScope + +from . import config_store, resources +from .enums import ( + AuthMode, + ConfigNS, + Intent, + JoinSourceType, + SMBClustering, + State, + UserGroupSourceType, +) +from .internal import ( + ClusterEntry, + JoinAuthEntry, + ShareEntry, + UsersAndGroupsEntry, + resource_entry, + resource_key, +) +from .proto import ( + ConfigStore, + EarmarkResolver, + EntryKey, + PathResolver, +) +from .resources import SMBResource +from .results import ErrorResult, Result, ResultGroup +from .utils import checked + +ClusterRef = Union[resources.Cluster, resources.RemovedCluster] +ShareRef = Union[resources.Share, resources.RemovedShare] + +log = logging.getLogger(__name__) + + +class Staging: + def __init__(self, store: ConfigStore) -> None: + self.destination_store = store + self.incoming: Dict[EntryKey, SMBResource] = {} + self.deleted: Dict[EntryKey, SMBResource] = {} + self._store_keycache: Set[EntryKey] = set() + self._virt_keycache: Set[EntryKey] = set() + + def stage(self, resource: SMBResource) -> None: + self._virt_keycache = set() + ekey = resource_key(resource) + if resource.intent == Intent.REMOVED: + self.deleted[ekey] = resource + else: + self.deleted.pop(ekey, None) + self.incoming[ekey] = resource + + def _virtual_keys(self) -> Collection[EntryKey]: + if self._virt_keycache: + return self._virt_keycache + self._virt_keycache = set(self._store_keys()) - set( + self.deleted + ) | set(self.incoming) + return self._virt_keycache + + def _store_keys(self) -> Collection[EntryKey]: + if not self._store_keycache: + self._store_keycache = set(self.destination_store) + return self._store_keycache + + def __iter__(self) -> Iterator[EntryKey]: + return iter(self._virtual_keys()) + + def namespaces(self) -> Collection[str]: + return {k[0] for k in self} + + def contents(self, ns: str) -> Collection[str]: + return {kname for kns, kname in self if kns == ns} + + def is_new(self, resource: SMBResource) -> bool: + ekey = resource_key(resource) + return ekey not in self._store_keys() + + def get_cluster(self, cluster_id: str) -> resources.Cluster: + ekey = (str(ClusterEntry.namespace), cluster_id) + if ekey in self.incoming: + res = self.incoming[ekey] + assert isinstance(res, resources.Cluster) + return res + return ClusterEntry.from_store( + self.destination_store, cluster_id + ).get_cluster() + + def get_join_auth(self, auth_id: str) -> resources.JoinAuth: + ekey = (str(JoinAuthEntry.namespace), auth_id) + if ekey in self.incoming: + res = self.incoming[ekey] + assert isinstance(res, resources.JoinAuth) + return res + return JoinAuthEntry.from_store( + self.destination_store, auth_id + ).get_join_auth() + + def get_users_and_groups(self, ug_id: str) -> resources.UsersAndGroups: + ekey = (str(UsersAndGroupsEntry.namespace), ug_id) + if ekey in self.incoming: + res = self.incoming[ekey] + assert isinstance(res, resources.UsersAndGroups) + return res + return UsersAndGroupsEntry.from_store( + self.destination_store, ug_id + ).get_users_and_groups() + + def save(self) -> ResultGroup: + results = ResultGroup() + for res in self.deleted.values(): + results.append(self._save(res)) + for res in self.incoming.values(): + results.append(self._save(res)) + return results + + def _save(self, resource: SMBResource) -> Result: + entry = resource_entry(self.destination_store, resource) + if resource.intent == Intent.REMOVED: + removed = entry.remove() + state = State.REMOVED if removed else State.NOT_PRESENT + else: + state = entry.create_or_update(resource) + log.debug('saved resource: %r; state: %s', resource, state) + result = Result(resource, success=True, status={'state': state}) + return result + + +def auth_refs(cluster: resources.Cluster) -> Collection[str]: + if cluster.auth_mode != AuthMode.ACTIVE_DIRECTORY: + return set() + return { + j.ref + for j in checked(cluster.domain_settings).join_sources + if j.source_type == JoinSourceType.RESOURCE and j.ref + } + + +def ug_refs(cluster: resources.Cluster) -> Collection[str]: + if ( + cluster.auth_mode != AuthMode.USER + or cluster.user_group_settings is None + ): + return set() + return { + ug.ref + for ug in cluster.user_group_settings + if ug.source_type == UserGroupSourceType.RESOURCE and ug.ref + } + + +def cross_check_resource( + resource: SMBResource, + staging: Staging, + *, + path_resolver: PathResolver, + earmark_resolver: EarmarkResolver, +) -> None: + if isinstance(resource, (resources.Cluster, resources.RemovedCluster)): + _check_cluster(resource, staging) + elif isinstance(resource, (resources.Share, resources.RemovedShare)): + _check_share( + resource, + staging, + path_resolver, + earmark_resolver, + ) + elif isinstance(resource, resources.JoinAuth): + _check_join_auths(resource, staging) + elif isinstance(resource, resources.UsersAndGroups): + _check_users_and_groups(resource, staging) + else: + raise TypeError('not a valid smb resource') + + +def _check_cluster(cluster: ClusterRef, staging: Staging) -> None: + """Check that the cluster resource can be updated.""" + if cluster.intent == Intent.PRESENT: + return _check_cluster_present(cluster, staging) + return _check_cluster_removed(cluster, staging) + + +def _check_cluster_removed(cluster: ClusterRef, staging: Staging) -> None: + share_ids = ShareEntry.ids(staging) + clusters_used = {cid for cid, _ in share_ids} + if cluster.cluster_id in clusters_used: + raise ErrorResult( + cluster, + msg="cluster in use by shares", + status={ + 'shares': [ + shid + for cid, shid in share_ids + if cid == cluster.cluster_id + ] + }, + ) + + +def _check_cluster_present(cluster: ClusterRef, staging: Staging) -> None: + assert isinstance(cluster, resources.Cluster) + cluster.validate() + if not staging.is_new(cluster): + _check_cluster_modifications(cluster, staging) + for auth_ref in auth_refs(cluster): + auth = staging.get_join_auth(auth_ref) + if ( + auth.linked_to_cluster + and auth.linked_to_cluster != cluster.cluster_id + ): + raise ErrorResult( + cluster, + msg="join auth linked to different cluster", + status={ + 'other_cluster_id': auth.linked_to_cluster, + }, + ) + for ug_ref in ug_refs(cluster): + ug = staging.get_users_and_groups(ug_ref) + if ( + ug.linked_to_cluster + and ug.linked_to_cluster != cluster.cluster_id + ): + raise ErrorResult( + cluster, + msg="users and groups linked to different cluster", + status={ + 'other_cluster_id': ug.linked_to_cluster, + }, + ) + + +def _check_cluster_modifications( + cluster: resources.Cluster, staging: Staging +) -> None: + """cluster has some fields we do not permit changing after the cluster has + been created. + """ + prev = ClusterEntry.from_store( + staging.destination_store, cluster.cluster_id + ).get_cluster() + if cluster.auth_mode != prev.auth_mode: + raise ErrorResult( + cluster, + 'auth_mode value may not be changed', + status={'existing_auth_mode': prev.auth_mode}, + ) + if cluster.auth_mode == AuthMode.ACTIVE_DIRECTORY: + assert prev.domain_settings + if not cluster.domain_settings: + # should not occur + raise ErrorResult(cluster, "domain settings missing from cluster") + if cluster.domain_settings.realm != prev.domain_settings.realm: + raise ErrorResult( + cluster, + 'domain/realm value may not be changed', + status={'existing_domain_realm': prev.domain_settings.realm}, + ) + if cluster.is_clustered() != prev.is_clustered(): + prev_clustering = prev.is_clustered() + cterms = {True: 'enabled', False: 'disabled'} + msg = ( + f'a cluster resource with clustering {cterms[prev_clustering]}' + f' may not be changed to clustering {cterms[not prev_clustering]}' + ) + opt_terms = { + True: SMBClustering.ALWAYS.value, + False: SMBClustering.NEVER.value, + } + hint = { + 'note': ( + 'Set "clustering" to an explicit value that matches the' + ' current clustering behavior' + ), + 'value': opt_terms[prev_clustering], + } + raise ErrorResult(cluster, msg, status={'hint': hint}) + + +def _check_share( + share: ShareRef, + staging: Staging, + resolver: PathResolver, + earmark_resolver: EarmarkResolver, +) -> None: + """Check that the share resource can be updated.""" + if share.intent == Intent.REMOVED: + return + assert isinstance(share, resources.Share) + share.validate() + if share.cluster_id not in ClusterEntry.ids(staging): + raise ErrorResult( + share, + msg="no matching cluster id", + status={"cluster_id": share.cluster_id}, + ) + assert share.cephfs is not None + try: + volpath = resolver.resolve_exists( + share.cephfs.volume, + share.cephfs.subvolumegroup, + share.cephfs.subvolume, + share.cephfs.path, + ) + except (FileNotFoundError, NotADirectoryError): + raise ErrorResult( + share, msg="path is not a valid directory in volume" + ) + if earmark_resolver: + earmark = earmark_resolver.get_earmark( + volpath, + share.cephfs.volume, + ) + if not earmark: + smb_earmark = ( + f"{EarmarkTopScope.SMB.value}.cluster.{share.cluster_id}" + ) + earmark_resolver.set_earmark( + volpath, + share.cephfs.volume, + smb_earmark, + ) + else: + parsed_earmark = _parse_earmark(earmark) + + # Check if the top-level scope is not SMB + if not earmark_resolver.check_earmark( + earmark, EarmarkTopScope.SMB + ): + raise ErrorResult( + share, + msg=f"earmark has already been set by {parsed_earmark['scope']}", + ) + + # Check if the earmark is set by a different cluster + if ( + parsed_earmark['cluster_id'] + and parsed_earmark['cluster_id'] != share.cluster_id + ): + raise ErrorResult( + share, + msg="earmark has already been set by smb cluster " + f"{parsed_earmark['cluster_id']}", + ) + + name_used_by = _share_name_in_use(staging, share) + if name_used_by: + raise ErrorResult( + share, + msg="share name already in use", + status={"conflicting_share_id": name_used_by}, + ) + + +def _share_name_in_use( + staging: Staging, share: resources.Share +) -> Optional[str]: + """Returns the share_id value if the share's name is already in + use by a different share in the cluster. Returns None if no other + shares are using the name. + """ + share_ids = (share.cluster_id, share.share_id) + share_ns = str(ConfigNS.SHARES) + # look for any duplicate names in the staging area. + # these items are already in memory + for ekey, res in staging.incoming.items(): + if ekey[0] != share_ns: + continue # not a share + assert isinstance(res, resources.Share) + if (res.cluster_id, res.share_id) == share_ids: + continue # this share + if (res.cluster_id, res.name) == (share.cluster_id, share.name): + return res.share_id + # look for any duplicate names in the underyling store + found = config_store.find_in_store( + staging.destination_store, + share_ns, + {'cluster_id': share.cluster_id, 'name': share.name}, + ) + # remove any shares that are deleted in staging + found_curr = [ + entry for entry in found if entry.full_key not in staging.deleted + ] + # remove self-share from list + id_pair = operator.itemgetter('cluster_id', 'share_id') + found_curr = [ + entry for entry in found_curr if id_pair(entry.get()) != share_ids + ] + if not found_curr: + return None + if len(found_curr) != 1: + # this should not normally happen + log.warning( + 'multiple shares with one name in cluster: %s', + ' '.join(s.get()['share_id'] for s in found_curr), + ) + return found_curr[0].get()['share_id'] + + +def _check_join_auths( + join_auth: resources.JoinAuth, staging: Staging +) -> None: + """Check that the JoinAuth resource can be updated.""" + if join_auth.intent == Intent.PRESENT: + return _check_join_auths_present(join_auth, staging) + return _check_join_auths_removed(join_auth, staging) + + +def _check_join_auths_removed( + join_auth: resources.JoinAuth, staging: Staging +) -> None: + cids = set(ClusterEntry.ids(staging)) + refs_in_use: Dict[str, List[str]] = {} + for cluster_id in cids: + cluster = staging.get_cluster(cluster_id) + for ref in auth_refs(cluster): + refs_in_use.setdefault(ref, []).append(cluster_id) + log.debug('refs_in_use: %r', refs_in_use) + if join_auth.auth_id in refs_in_use: + raise ErrorResult( + join_auth, + msg='join auth resource in use by clusters', + status={ + 'clusters': refs_in_use[join_auth.auth_id], + }, + ) + + +def _check_join_auths_present( + join_auth: resources.JoinAuth, staging: Staging +) -> None: + if join_auth.linked_to_cluster: + cids = set(ClusterEntry.ids(staging)) + if join_auth.linked_to_cluster not in cids: + raise ErrorResult( + join_auth, + msg='linked_to_cluster id not valid', + status={ + 'unknown_id': join_auth.linked_to_cluster, + }, + ) + + +def _check_users_and_groups( + users_and_groups: resources.UsersAndGroups, staging: Staging +) -> None: + """Check that the UsersAndGroups resource can be updated.""" + if users_and_groups.intent == Intent.PRESENT: + return _check_users_and_groups_present(users_and_groups, staging) + return _check_users_and_groups_removed(users_and_groups, staging) + + +def _check_users_and_groups_removed( + users_and_groups: resources.UsersAndGroups, staging: Staging +) -> None: + refs_in_use: Dict[str, List[str]] = {} + cids = set(ClusterEntry.ids(staging)) + for cluster_id in cids: + cluster = staging.get_cluster(cluster_id) + for ref in ug_refs(cluster): + refs_in_use.setdefault(ref, []).append(cluster_id) + log.debug('refs_in_use: %r', refs_in_use) + if users_and_groups.users_groups_id in refs_in_use: + raise ErrorResult( + users_and_groups, + msg='users and groups resource in use by clusters', + status={ + 'clusters': refs_in_use[users_and_groups.users_groups_id], + }, + ) + + +def _check_users_and_groups_present( + users_and_groups: resources.UsersAndGroups, staging: Staging +) -> None: + if users_and_groups.linked_to_cluster: + cids = set(ClusterEntry.ids(staging)) + if users_and_groups.linked_to_cluster not in cids: + raise ErrorResult( + users_and_groups, + msg='linked_to_cluster id not valid', + status={ + 'unknown_id': users_and_groups.linked_to_cluster, + }, + ) + + +def _parse_earmark(earmark: str) -> dict: + parts = earmark.split('.') + + # If it only has one part (e.g., 'smb'), return None for cluster_id + if len(parts) == 1: + return {'scope': parts[0], 'cluster_id': None} + + return { + 'scope': parts[0], + 'cluster_id': parts[2] if len(parts) > 2 else None, + }