import contextlib
import logging
-import operator
import time
from ceph.deployment.service_spec import SMBSpec
-from ceph.fs.earmarking import EarmarkTopScope
from . import config_store, external, resources
from .enums import (
AuthMode,
CephFSStorageProvider,
- ConfigNS,
- Intent,
JoinSourceType,
LoginAccess,
LoginCategory,
- SMBClustering,
State,
UserGroupSourceType,
)
JoinAuthEntry,
ShareEntry,
UsersAndGroupsEntry,
- resource_entry,
- resource_key,
)
from .proto import (
AccessAuthorizer,
ConfigEntry,
ConfigStore,
EarmarkResolver,
- EntryKey,
OrchSubmitter,
PathResolver,
Simplified,
)
from .resources import SMBResource
from .results import ErrorResult, Result, ResultGroup
+from .staging import Staging, auth_refs, cross_check_resource, ug_refs
from .utils import checked, ynbool
ClusterRef = Union[resources.Cluster, resources.RemovedCluster]
)
-class _Staging:
- def __init__(self, store: ConfigStore) -> None:
- self.destination_store = store
- self.incoming: Dict[EntryKey, SMBResource] = {}
- self.deleted: Dict[EntryKey, SMBResource] = {}
- self._store_keycache: Set[EntryKey] = set()
- self._virt_keycache: Set[EntryKey] = set()
-
- def stage(self, resource: SMBResource) -> None:
- self._virt_keycache = set()
- ekey = resource_key(resource)
- if resource.intent == Intent.REMOVED:
- self.deleted[ekey] = resource
- else:
- self.deleted.pop(ekey, None)
- self.incoming[ekey] = resource
-
- def _virtual_keys(self) -> Collection[EntryKey]:
- if self._virt_keycache:
- return self._virt_keycache
- self._virt_keycache = set(self._store_keys()) - set(
- self.deleted
- ) | set(self.incoming)
- return self._virt_keycache
-
- def _store_keys(self) -> Collection[EntryKey]:
- if not self._store_keycache:
- self._store_keycache = set(self.destination_store)
- return self._store_keycache
-
- def __iter__(self) -> Iterator[EntryKey]:
- return iter(self._virtual_keys())
-
- def namespaces(self) -> Collection[str]:
- return {k[0] for k in self}
-
- def contents(self, ns: str) -> Collection[str]:
- return {kname for kns, kname in self if kns == ns}
-
- def is_new(self, resource: SMBResource) -> bool:
- ekey = resource_key(resource)
- return ekey not in self._store_keys()
-
- def get_cluster(self, cluster_id: str) -> resources.Cluster:
- ekey = (str(ClusterEntry.namespace), cluster_id)
- if ekey in self.incoming:
- res = self.incoming[ekey]
- assert isinstance(res, resources.Cluster)
- return res
- return ClusterEntry.from_store(
- self.destination_store, cluster_id
- ).get_cluster()
-
- def get_join_auth(self, auth_id: str) -> resources.JoinAuth:
- ekey = (str(JoinAuthEntry.namespace), auth_id)
- if ekey in self.incoming:
- res = self.incoming[ekey]
- assert isinstance(res, resources.JoinAuth)
- return res
- return JoinAuthEntry.from_store(
- self.destination_store, auth_id
- ).get_join_auth()
-
- def get_users_and_groups(self, ug_id: str) -> resources.UsersAndGroups:
- ekey = (str(UsersAndGroupsEntry.namespace), ug_id)
- if ekey in self.incoming:
- res = self.incoming[ekey]
- assert isinstance(res, resources.UsersAndGroups)
- return res
- return UsersAndGroupsEntry.from_store(
- self.destination_store, ug_id
- ).get_users_and_groups()
-
- def save(self) -> ResultGroup:
- results = ResultGroup()
- for res in self.deleted.values():
- results.append(self._save(res))
- for res in self.incoming.values():
- results.append(self._save(res))
- return results
-
- def _save(self, resource: SMBResource) -> Result:
- entry = resource_entry(self.destination_store, resource)
- if resource.intent == Intent.REMOVED:
- removed = entry.remove()
- state = State.REMOVED if removed else State.NOT_PRESENT
- else:
- state = entry.create_or_update(resource)
- log.debug('saved resource: %r; state: %s', resource, state)
- result = Result(resource, success=True, status={'state': state})
- return result
-
-
class ClusterConfigHandler:
"""The central class for ingesting and handling smb configuration change
requests.
"""
log.debug('applying changes to internal data store')
results = ResultGroup()
- staging = _Staging(self.internal_store)
+ staging = Staging(self.internal_store)
try:
incoming = order_resources(inputs)
for resource in incoming:
def _check(
self,
resource: SMBResource,
- staging: _Staging,
+ staging: Staging,
*,
create_only: bool = False,
) -> Result:
],
[
self._join_auth_entry(_id).get_join_auth()
- for _id in _auth_refs(cluster)
+ for _id in auth_refs(cluster)
],
[
self._users_and_groups_entry(_id).get_users_and_groups()
- for _id in _ug_refs(cluster)
+ for _id in ug_refs(cluster)
],
)
change_groups.append(change_group)
return sorted(resource_objs, key=_keyfunc)
-def cross_check_resource(
- resource: SMBResource,
- staging: _Staging,
- *,
- path_resolver: PathResolver,
- earmark_resolver: EarmarkResolver,
-) -> None:
- if isinstance(resource, (resources.Cluster, resources.RemovedCluster)):
- _check_cluster(resource, staging)
- elif isinstance(resource, (resources.Share, resources.RemovedShare)):
- _check_share(
- resource,
- staging,
- path_resolver,
- earmark_resolver,
- )
- elif isinstance(resource, resources.JoinAuth):
- _check_join_auths(resource, staging)
- elif isinstance(resource, resources.UsersAndGroups):
- _check_users_and_groups(resource, staging)
- else:
- raise TypeError('not a valid smb resource')
-
-
-def _check_cluster(cluster: ClusterRef, staging: _Staging) -> None:
- """Check that the cluster resource can be updated."""
- if cluster.intent == Intent.PRESENT:
- return _check_cluster_present(cluster, staging)
- return _check_cluster_removed(cluster, staging)
-
-
-def _check_cluster_removed(cluster: ClusterRef, staging: _Staging) -> None:
- share_ids = ShareEntry.ids(staging)
- clusters_used = {cid for cid, _ in share_ids}
- if cluster.cluster_id in clusters_used:
- raise ErrorResult(
- cluster,
- msg="cluster in use by shares",
- status={
- 'shares': [
- shid
- for cid, shid in share_ids
- if cid == cluster.cluster_id
- ]
- },
- )
-
-
-def _check_cluster_present(cluster: ClusterRef, staging: _Staging) -> None:
- assert isinstance(cluster, resources.Cluster)
- cluster.validate()
- if not staging.is_new(cluster):
- _check_cluster_modifications(cluster, staging)
- for auth_ref in _auth_refs(cluster):
- auth = staging.get_join_auth(auth_ref)
- if (
- auth.linked_to_cluster
- and auth.linked_to_cluster != cluster.cluster_id
- ):
- raise ErrorResult(
- cluster,
- msg="join auth linked to different cluster",
- status={
- 'other_cluster_id': auth.linked_to_cluster,
- },
- )
- for ug_ref in _ug_refs(cluster):
- ug = staging.get_users_and_groups(ug_ref)
- if (
- ug.linked_to_cluster
- and ug.linked_to_cluster != cluster.cluster_id
- ):
- raise ErrorResult(
- cluster,
- msg="users and groups linked to different cluster",
- status={
- 'other_cluster_id': ug.linked_to_cluster,
- },
- )
-
-
-def _check_cluster_modifications(
- cluster: resources.Cluster, staging: _Staging
-) -> None:
- """cluster has some fields we do not permit changing after the cluster has
- been created.
- """
- prev = ClusterEntry.from_store(
- staging.destination_store, cluster.cluster_id
- ).get_cluster()
- if cluster.auth_mode != prev.auth_mode:
- raise ErrorResult(
- cluster,
- 'auth_mode value may not be changed',
- status={'existing_auth_mode': prev.auth_mode},
- )
- if cluster.auth_mode == AuthMode.ACTIVE_DIRECTORY:
- assert prev.domain_settings
- if not cluster.domain_settings:
- # should not occur
- raise ErrorResult(cluster, "domain settings missing from cluster")
- if cluster.domain_settings.realm != prev.domain_settings.realm:
- raise ErrorResult(
- cluster,
- 'domain/realm value may not be changed',
- status={'existing_domain_realm': prev.domain_settings.realm},
- )
- if cluster.is_clustered() != prev.is_clustered():
- prev_clustering = prev.is_clustered()
- cterms = {True: 'enabled', False: 'disabled'}
- msg = (
- f'a cluster resource with clustering {cterms[prev_clustering]}'
- f' may not be changed to clustering {cterms[not prev_clustering]}'
- )
- opt_terms = {
- True: SMBClustering.ALWAYS.value,
- False: SMBClustering.NEVER.value,
- }
- hint = {
- 'note': (
- 'Set "clustering" to an explicit value that matches the'
- ' current clustering behavior'
- ),
- 'value': opt_terms[prev_clustering],
- }
- raise ErrorResult(cluster, msg, status={'hint': hint})
-
-
-def _parse_earmark(earmark: str) -> dict:
- parts = earmark.split('.')
-
- # If it only has one part (e.g., 'smb'), return None for cluster_id
- if len(parts) == 1:
- return {'scope': parts[0], 'cluster_id': None}
-
- return {
- 'scope': parts[0],
- 'cluster_id': parts[2] if len(parts) > 2 else None,
- }
-
-
-def _check_share(
- share: ShareRef,
- staging: _Staging,
- resolver: PathResolver,
- earmark_resolver: EarmarkResolver,
-) -> None:
- """Check that the share resource can be updated."""
- if share.intent == Intent.REMOVED:
- return
- assert isinstance(share, resources.Share)
- share.validate()
- if share.cluster_id not in ClusterEntry.ids(staging):
- raise ErrorResult(
- share,
- msg="no matching cluster id",
- status={"cluster_id": share.cluster_id},
- )
- assert share.cephfs is not None
- try:
- volpath = resolver.resolve_exists(
- share.cephfs.volume,
- share.cephfs.subvolumegroup,
- share.cephfs.subvolume,
- share.cephfs.path,
- )
- except (FileNotFoundError, NotADirectoryError):
- raise ErrorResult(
- share, msg="path is not a valid directory in volume"
- )
- if earmark_resolver:
- earmark = earmark_resolver.get_earmark(
- volpath,
- share.cephfs.volume,
- )
- if not earmark:
- smb_earmark = (
- f"{EarmarkTopScope.SMB.value}.cluster.{share.cluster_id}"
- )
- earmark_resolver.set_earmark(
- volpath,
- share.cephfs.volume,
- smb_earmark,
- )
- else:
- parsed_earmark = _parse_earmark(earmark)
-
- # Check if the top-level scope is not SMB
- if not earmark_resolver.check_earmark(
- earmark, EarmarkTopScope.SMB
- ):
- raise ErrorResult(
- share,
- msg=f"earmark has already been set by {parsed_earmark['scope']}",
- )
-
- # Check if the earmark is set by a different cluster
- if (
- parsed_earmark['cluster_id']
- and parsed_earmark['cluster_id'] != share.cluster_id
- ):
- raise ErrorResult(
- share,
- msg="earmark has already been set by smb cluster "
- f"{parsed_earmark['cluster_id']}",
- )
-
- name_used_by = _share_name_in_use(staging, share)
- if name_used_by:
- raise ErrorResult(
- share,
- msg="share name already in use",
- status={"conflicting_share_id": name_used_by},
- )
-
-
-def _share_name_in_use(
- staging: _Staging, share: resources.Share
-) -> Optional[str]:
- """Returns the share_id value if the share's name is already in
- use by a different share in the cluster. Returns None if no other
- shares are using the name.
- """
- share_ids = (share.cluster_id, share.share_id)
- share_ns = str(ConfigNS.SHARES)
- # look for any duplicate names in the staging area.
- # these items are already in memory
- for ekey, res in staging.incoming.items():
- if ekey[0] != share_ns:
- continue # not a share
- assert isinstance(res, resources.Share)
- if (res.cluster_id, res.share_id) == share_ids:
- continue # this share
- if (res.cluster_id, res.name) == (share.cluster_id, share.name):
- return res.share_id
- # look for any duplicate names in the underyling store
- found = config_store.find_in_store(
- staging.destination_store,
- share_ns,
- {'cluster_id': share.cluster_id, 'name': share.name},
- )
- # remove any shares that are deleted in staging
- found_curr = [
- entry for entry in found if entry.full_key not in staging.deleted
- ]
- # remove self-share from list
- id_pair = operator.itemgetter('cluster_id', 'share_id')
- found_curr = [
- entry for entry in found_curr if id_pair(entry.get()) != share_ids
- ]
- if not found_curr:
- return None
- if len(found_curr) != 1:
- # this should not normally happen
- log.warning(
- 'multiple shares with one name in cluster: %s',
- ' '.join(s.get()['share_id'] for s in found_curr),
- )
- return found_curr[0].get()['share_id']
-
-
-def _check_join_auths(
- join_auth: resources.JoinAuth, staging: _Staging
-) -> None:
- """Check that the JoinAuth resource can be updated."""
- if join_auth.intent == Intent.PRESENT:
- return _check_join_auths_present(join_auth, staging)
- return _check_join_auths_removed(join_auth, staging)
-
-
-def _check_join_auths_removed(
- join_auth: resources.JoinAuth, staging: _Staging
-) -> None:
- cids = set(ClusterEntry.ids(staging))
- refs_in_use: Dict[str, List[str]] = {}
- for cluster_id in cids:
- cluster = staging.get_cluster(cluster_id)
- for ref in _auth_refs(cluster):
- refs_in_use.setdefault(ref, []).append(cluster_id)
- log.debug('refs_in_use: %r', refs_in_use)
- if join_auth.auth_id in refs_in_use:
- raise ErrorResult(
- join_auth,
- msg='join auth resource in use by clusters',
- status={
- 'clusters': refs_in_use[join_auth.auth_id],
- },
- )
-
-
-def _check_join_auths_present(
- join_auth: resources.JoinAuth, staging: _Staging
-) -> None:
- if join_auth.linked_to_cluster:
- cids = set(ClusterEntry.ids(staging))
- if join_auth.linked_to_cluster not in cids:
- raise ErrorResult(
- join_auth,
- msg='linked_to_cluster id not valid',
- status={
- 'unknown_id': join_auth.linked_to_cluster,
- },
- )
-
-
-def _check_users_and_groups(
- users_and_groups: resources.UsersAndGroups, staging: _Staging
-) -> None:
- """Check that the UsersAndGroups resource can be updated."""
- if users_and_groups.intent == Intent.PRESENT:
- return _check_users_and_groups_present(users_and_groups, staging)
- return _check_users_and_groups_removed(users_and_groups, staging)
-
-
-def _check_users_and_groups_removed(
- users_and_groups: resources.UsersAndGroups, staging: _Staging
-) -> None:
- refs_in_use: Dict[str, List[str]] = {}
- cids = set(ClusterEntry.ids(staging))
- for cluster_id in cids:
- cluster = staging.get_cluster(cluster_id)
- for ref in _ug_refs(cluster):
- refs_in_use.setdefault(ref, []).append(cluster_id)
- log.debug('refs_in_use: %r', refs_in_use)
- if users_and_groups.users_groups_id in refs_in_use:
- raise ErrorResult(
- users_and_groups,
- msg='users and groups resource in use by clusters',
- status={
- 'clusters': refs_in_use[users_and_groups.users_groups_id],
- },
- )
-
-
-def _check_users_and_groups_present(
- users_and_groups: resources.UsersAndGroups, staging: _Staging
-) -> None:
- if users_and_groups.linked_to_cluster:
- cids = set(ClusterEntry.ids(staging))
- if users_and_groups.linked_to_cluster not in cids:
- raise ErrorResult(
- users_and_groups,
- msg='linked_to_cluster id not valid',
- status={
- 'unknown_id': users_and_groups.linked_to_cluster,
- },
- )
-
-
-def _prune_linked_entries(staging: _Staging) -> None:
+def _prune_linked_entries(staging: Staging) -> None:
cids = set(ClusterEntry.ids(staging))
for auth_id in JoinAuthEntry.ids(staging):
join_auth = staging.get_join_auth(auth_id)
).remove()
-def _auth_refs(cluster: resources.Cluster) -> Collection[str]:
- if cluster.auth_mode != AuthMode.ACTIVE_DIRECTORY:
- return set()
- return {
- j.ref
- for j in checked(cluster.domain_settings).join_sources
- if j.source_type == JoinSourceType.RESOURCE and j.ref
- }
-
-
-def _ug_refs(cluster: resources.Cluster) -> Collection[str]:
- if (
- cluster.auth_mode != AuthMode.USER
- or cluster.user_group_settings is None
- ):
- return set()
- return {
- ug.ref
- for ug in cluster.user_group_settings
- if ug.source_type == UserGroupSourceType.RESOURCE and ug.ref
- }
-
-
def _generate_share(
share: resources.Share, resolver: PathResolver, cephx_entity: str
) -> Dict[str, Dict[str, str]]:
--- /dev/null
+from typing import (
+ Collection,
+ Dict,
+ Iterator,
+ List,
+ Optional,
+ Set,
+ Union,
+)
+
+import logging
+import operator
+
+from ceph.fs.earmarking import EarmarkTopScope
+
+from . import config_store, resources
+from .enums import (
+ AuthMode,
+ ConfigNS,
+ Intent,
+ JoinSourceType,
+ SMBClustering,
+ State,
+ UserGroupSourceType,
+)
+from .internal import (
+ ClusterEntry,
+ JoinAuthEntry,
+ ShareEntry,
+ UsersAndGroupsEntry,
+ resource_entry,
+ resource_key,
+)
+from .proto import (
+ ConfigStore,
+ EarmarkResolver,
+ EntryKey,
+ PathResolver,
+)
+from .resources import SMBResource
+from .results import ErrorResult, Result, ResultGroup
+from .utils import checked
+
+ClusterRef = Union[resources.Cluster, resources.RemovedCluster]
+ShareRef = Union[resources.Share, resources.RemovedShare]
+
+log = logging.getLogger(__name__)
+
+
+class Staging:
+ def __init__(self, store: ConfigStore) -> None:
+ self.destination_store = store
+ self.incoming: Dict[EntryKey, SMBResource] = {}
+ self.deleted: Dict[EntryKey, SMBResource] = {}
+ self._store_keycache: Set[EntryKey] = set()
+ self._virt_keycache: Set[EntryKey] = set()
+
+ def stage(self, resource: SMBResource) -> None:
+ self._virt_keycache = set()
+ ekey = resource_key(resource)
+ if resource.intent == Intent.REMOVED:
+ self.deleted[ekey] = resource
+ else:
+ self.deleted.pop(ekey, None)
+ self.incoming[ekey] = resource
+
+ def _virtual_keys(self) -> Collection[EntryKey]:
+ if self._virt_keycache:
+ return self._virt_keycache
+ self._virt_keycache = set(self._store_keys()) - set(
+ self.deleted
+ ) | set(self.incoming)
+ return self._virt_keycache
+
+ def _store_keys(self) -> Collection[EntryKey]:
+ if not self._store_keycache:
+ self._store_keycache = set(self.destination_store)
+ return self._store_keycache
+
+ def __iter__(self) -> Iterator[EntryKey]:
+ return iter(self._virtual_keys())
+
+ def namespaces(self) -> Collection[str]:
+ return {k[0] for k in self}
+
+ def contents(self, ns: str) -> Collection[str]:
+ return {kname for kns, kname in self if kns == ns}
+
+ def is_new(self, resource: SMBResource) -> bool:
+ ekey = resource_key(resource)
+ return ekey not in self._store_keys()
+
+ def get_cluster(self, cluster_id: str) -> resources.Cluster:
+ ekey = (str(ClusterEntry.namespace), cluster_id)
+ if ekey in self.incoming:
+ res = self.incoming[ekey]
+ assert isinstance(res, resources.Cluster)
+ return res
+ return ClusterEntry.from_store(
+ self.destination_store, cluster_id
+ ).get_cluster()
+
+ def get_join_auth(self, auth_id: str) -> resources.JoinAuth:
+ ekey = (str(JoinAuthEntry.namespace), auth_id)
+ if ekey in self.incoming:
+ res = self.incoming[ekey]
+ assert isinstance(res, resources.JoinAuth)
+ return res
+ return JoinAuthEntry.from_store(
+ self.destination_store, auth_id
+ ).get_join_auth()
+
+ def get_users_and_groups(self, ug_id: str) -> resources.UsersAndGroups:
+ ekey = (str(UsersAndGroupsEntry.namespace), ug_id)
+ if ekey in self.incoming:
+ res = self.incoming[ekey]
+ assert isinstance(res, resources.UsersAndGroups)
+ return res
+ return UsersAndGroupsEntry.from_store(
+ self.destination_store, ug_id
+ ).get_users_and_groups()
+
+ def save(self) -> ResultGroup:
+ results = ResultGroup()
+ for res in self.deleted.values():
+ results.append(self._save(res))
+ for res in self.incoming.values():
+ results.append(self._save(res))
+ return results
+
+ def _save(self, resource: SMBResource) -> Result:
+ entry = resource_entry(self.destination_store, resource)
+ if resource.intent == Intent.REMOVED:
+ removed = entry.remove()
+ state = State.REMOVED if removed else State.NOT_PRESENT
+ else:
+ state = entry.create_or_update(resource)
+ log.debug('saved resource: %r; state: %s', resource, state)
+ result = Result(resource, success=True, status={'state': state})
+ return result
+
+
+def auth_refs(cluster: resources.Cluster) -> Collection[str]:
+ if cluster.auth_mode != AuthMode.ACTIVE_DIRECTORY:
+ return set()
+ return {
+ j.ref
+ for j in checked(cluster.domain_settings).join_sources
+ if j.source_type == JoinSourceType.RESOURCE and j.ref
+ }
+
+
+def ug_refs(cluster: resources.Cluster) -> Collection[str]:
+ if (
+ cluster.auth_mode != AuthMode.USER
+ or cluster.user_group_settings is None
+ ):
+ return set()
+ return {
+ ug.ref
+ for ug in cluster.user_group_settings
+ if ug.source_type == UserGroupSourceType.RESOURCE and ug.ref
+ }
+
+
+def cross_check_resource(
+ resource: SMBResource,
+ staging: Staging,
+ *,
+ path_resolver: PathResolver,
+ earmark_resolver: EarmarkResolver,
+) -> None:
+ if isinstance(resource, (resources.Cluster, resources.RemovedCluster)):
+ _check_cluster(resource, staging)
+ elif isinstance(resource, (resources.Share, resources.RemovedShare)):
+ _check_share(
+ resource,
+ staging,
+ path_resolver,
+ earmark_resolver,
+ )
+ elif isinstance(resource, resources.JoinAuth):
+ _check_join_auths(resource, staging)
+ elif isinstance(resource, resources.UsersAndGroups):
+ _check_users_and_groups(resource, staging)
+ else:
+ raise TypeError('not a valid smb resource')
+
+
+def _check_cluster(cluster: ClusterRef, staging: Staging) -> None:
+ """Check that the cluster resource can be updated."""
+ if cluster.intent == Intent.PRESENT:
+ return _check_cluster_present(cluster, staging)
+ return _check_cluster_removed(cluster, staging)
+
+
+def _check_cluster_removed(cluster: ClusterRef, staging: Staging) -> None:
+ share_ids = ShareEntry.ids(staging)
+ clusters_used = {cid for cid, _ in share_ids}
+ if cluster.cluster_id in clusters_used:
+ raise ErrorResult(
+ cluster,
+ msg="cluster in use by shares",
+ status={
+ 'shares': [
+ shid
+ for cid, shid in share_ids
+ if cid == cluster.cluster_id
+ ]
+ },
+ )
+
+
+def _check_cluster_present(cluster: ClusterRef, staging: Staging) -> None:
+ assert isinstance(cluster, resources.Cluster)
+ cluster.validate()
+ if not staging.is_new(cluster):
+ _check_cluster_modifications(cluster, staging)
+ for auth_ref in auth_refs(cluster):
+ auth = staging.get_join_auth(auth_ref)
+ if (
+ auth.linked_to_cluster
+ and auth.linked_to_cluster != cluster.cluster_id
+ ):
+ raise ErrorResult(
+ cluster,
+ msg="join auth linked to different cluster",
+ status={
+ 'other_cluster_id': auth.linked_to_cluster,
+ },
+ )
+ for ug_ref in ug_refs(cluster):
+ ug = staging.get_users_and_groups(ug_ref)
+ if (
+ ug.linked_to_cluster
+ and ug.linked_to_cluster != cluster.cluster_id
+ ):
+ raise ErrorResult(
+ cluster,
+ msg="users and groups linked to different cluster",
+ status={
+ 'other_cluster_id': ug.linked_to_cluster,
+ },
+ )
+
+
+def _check_cluster_modifications(
+ cluster: resources.Cluster, staging: Staging
+) -> None:
+ """cluster has some fields we do not permit changing after the cluster has
+ been created.
+ """
+ prev = ClusterEntry.from_store(
+ staging.destination_store, cluster.cluster_id
+ ).get_cluster()
+ if cluster.auth_mode != prev.auth_mode:
+ raise ErrorResult(
+ cluster,
+ 'auth_mode value may not be changed',
+ status={'existing_auth_mode': prev.auth_mode},
+ )
+ if cluster.auth_mode == AuthMode.ACTIVE_DIRECTORY:
+ assert prev.domain_settings
+ if not cluster.domain_settings:
+ # should not occur
+ raise ErrorResult(cluster, "domain settings missing from cluster")
+ if cluster.domain_settings.realm != prev.domain_settings.realm:
+ raise ErrorResult(
+ cluster,
+ 'domain/realm value may not be changed',
+ status={'existing_domain_realm': prev.domain_settings.realm},
+ )
+ if cluster.is_clustered() != prev.is_clustered():
+ prev_clustering = prev.is_clustered()
+ cterms = {True: 'enabled', False: 'disabled'}
+ msg = (
+ f'a cluster resource with clustering {cterms[prev_clustering]}'
+ f' may not be changed to clustering {cterms[not prev_clustering]}'
+ )
+ opt_terms = {
+ True: SMBClustering.ALWAYS.value,
+ False: SMBClustering.NEVER.value,
+ }
+ hint = {
+ 'note': (
+ 'Set "clustering" to an explicit value that matches the'
+ ' current clustering behavior'
+ ),
+ 'value': opt_terms[prev_clustering],
+ }
+ raise ErrorResult(cluster, msg, status={'hint': hint})
+
+
+def _check_share(
+ share: ShareRef,
+ staging: Staging,
+ resolver: PathResolver,
+ earmark_resolver: EarmarkResolver,
+) -> None:
+ """Check that the share resource can be updated."""
+ if share.intent == Intent.REMOVED:
+ return
+ assert isinstance(share, resources.Share)
+ share.validate()
+ if share.cluster_id not in ClusterEntry.ids(staging):
+ raise ErrorResult(
+ share,
+ msg="no matching cluster id",
+ status={"cluster_id": share.cluster_id},
+ )
+ assert share.cephfs is not None
+ try:
+ volpath = resolver.resolve_exists(
+ share.cephfs.volume,
+ share.cephfs.subvolumegroup,
+ share.cephfs.subvolume,
+ share.cephfs.path,
+ )
+ except (FileNotFoundError, NotADirectoryError):
+ raise ErrorResult(
+ share, msg="path is not a valid directory in volume"
+ )
+ if earmark_resolver:
+ earmark = earmark_resolver.get_earmark(
+ volpath,
+ share.cephfs.volume,
+ )
+ if not earmark:
+ smb_earmark = (
+ f"{EarmarkTopScope.SMB.value}.cluster.{share.cluster_id}"
+ )
+ earmark_resolver.set_earmark(
+ volpath,
+ share.cephfs.volume,
+ smb_earmark,
+ )
+ else:
+ parsed_earmark = _parse_earmark(earmark)
+
+ # Check if the top-level scope is not SMB
+ if not earmark_resolver.check_earmark(
+ earmark, EarmarkTopScope.SMB
+ ):
+ raise ErrorResult(
+ share,
+ msg=f"earmark has already been set by {parsed_earmark['scope']}",
+ )
+
+ # Check if the earmark is set by a different cluster
+ if (
+ parsed_earmark['cluster_id']
+ and parsed_earmark['cluster_id'] != share.cluster_id
+ ):
+ raise ErrorResult(
+ share,
+ msg="earmark has already been set by smb cluster "
+ f"{parsed_earmark['cluster_id']}",
+ )
+
+ name_used_by = _share_name_in_use(staging, share)
+ if name_used_by:
+ raise ErrorResult(
+ share,
+ msg="share name already in use",
+ status={"conflicting_share_id": name_used_by},
+ )
+
+
+def _share_name_in_use(
+ staging: Staging, share: resources.Share
+) -> Optional[str]:
+ """Returns the share_id value if the share's name is already in
+ use by a different share in the cluster. Returns None if no other
+ shares are using the name.
+ """
+ share_ids = (share.cluster_id, share.share_id)
+ share_ns = str(ConfigNS.SHARES)
+ # look for any duplicate names in the staging area.
+ # these items are already in memory
+ for ekey, res in staging.incoming.items():
+ if ekey[0] != share_ns:
+ continue # not a share
+ assert isinstance(res, resources.Share)
+ if (res.cluster_id, res.share_id) == share_ids:
+ continue # this share
+ if (res.cluster_id, res.name) == (share.cluster_id, share.name):
+ return res.share_id
+ # look for any duplicate names in the underyling store
+ found = config_store.find_in_store(
+ staging.destination_store,
+ share_ns,
+ {'cluster_id': share.cluster_id, 'name': share.name},
+ )
+ # remove any shares that are deleted in staging
+ found_curr = [
+ entry for entry in found if entry.full_key not in staging.deleted
+ ]
+ # remove self-share from list
+ id_pair = operator.itemgetter('cluster_id', 'share_id')
+ found_curr = [
+ entry for entry in found_curr if id_pair(entry.get()) != share_ids
+ ]
+ if not found_curr:
+ return None
+ if len(found_curr) != 1:
+ # this should not normally happen
+ log.warning(
+ 'multiple shares with one name in cluster: %s',
+ ' '.join(s.get()['share_id'] for s in found_curr),
+ )
+ return found_curr[0].get()['share_id']
+
+
+def _check_join_auths(
+ join_auth: resources.JoinAuth, staging: Staging
+) -> None:
+ """Check that the JoinAuth resource can be updated."""
+ if join_auth.intent == Intent.PRESENT:
+ return _check_join_auths_present(join_auth, staging)
+ return _check_join_auths_removed(join_auth, staging)
+
+
+def _check_join_auths_removed(
+ join_auth: resources.JoinAuth, staging: Staging
+) -> None:
+ cids = set(ClusterEntry.ids(staging))
+ refs_in_use: Dict[str, List[str]] = {}
+ for cluster_id in cids:
+ cluster = staging.get_cluster(cluster_id)
+ for ref in auth_refs(cluster):
+ refs_in_use.setdefault(ref, []).append(cluster_id)
+ log.debug('refs_in_use: %r', refs_in_use)
+ if join_auth.auth_id in refs_in_use:
+ raise ErrorResult(
+ join_auth,
+ msg='join auth resource in use by clusters',
+ status={
+ 'clusters': refs_in_use[join_auth.auth_id],
+ },
+ )
+
+
+def _check_join_auths_present(
+ join_auth: resources.JoinAuth, staging: Staging
+) -> None:
+ if join_auth.linked_to_cluster:
+ cids = set(ClusterEntry.ids(staging))
+ if join_auth.linked_to_cluster not in cids:
+ raise ErrorResult(
+ join_auth,
+ msg='linked_to_cluster id not valid',
+ status={
+ 'unknown_id': join_auth.linked_to_cluster,
+ },
+ )
+
+
+def _check_users_and_groups(
+ users_and_groups: resources.UsersAndGroups, staging: Staging
+) -> None:
+ """Check that the UsersAndGroups resource can be updated."""
+ if users_and_groups.intent == Intent.PRESENT:
+ return _check_users_and_groups_present(users_and_groups, staging)
+ return _check_users_and_groups_removed(users_and_groups, staging)
+
+
+def _check_users_and_groups_removed(
+ users_and_groups: resources.UsersAndGroups, staging: Staging
+) -> None:
+ refs_in_use: Dict[str, List[str]] = {}
+ cids = set(ClusterEntry.ids(staging))
+ for cluster_id in cids:
+ cluster = staging.get_cluster(cluster_id)
+ for ref in ug_refs(cluster):
+ refs_in_use.setdefault(ref, []).append(cluster_id)
+ log.debug('refs_in_use: %r', refs_in_use)
+ if users_and_groups.users_groups_id in refs_in_use:
+ raise ErrorResult(
+ users_and_groups,
+ msg='users and groups resource in use by clusters',
+ status={
+ 'clusters': refs_in_use[users_and_groups.users_groups_id],
+ },
+ )
+
+
+def _check_users_and_groups_present(
+ users_and_groups: resources.UsersAndGroups, staging: Staging
+) -> None:
+ if users_and_groups.linked_to_cluster:
+ cids = set(ClusterEntry.ids(staging))
+ if users_and_groups.linked_to_cluster not in cids:
+ raise ErrorResult(
+ users_and_groups,
+ msg='linked_to_cluster id not valid',
+ status={
+ 'unknown_id': users_and_groups.linked_to_cluster,
+ },
+ )
+
+
+def _parse_earmark(earmark: str) -> dict:
+ parts = earmark.split('.')
+
+ # If it only has one part (e.g., 'smb'), return None for cluster_id
+ if len(parts) == 1:
+ return {'scope': parts[0], 'cluster_id': None}
+
+ return {
+ 'scope': parts[0],
+ 'cluster_id': parts[2] if len(parts) > 2 else None,
+ }