if isinstance(
resource, (resources.Cluster, resources.RemovedCluster)
):
- self._check_cluster(resource)
+ check_cluster(resource, self.internal_store)
entry = self._cluster_entry(resource.cluster_id)
elif isinstance(resource, (resources.Share, resources.RemovedShare)):
- self._check_share(resource)
+ check_share(resource, self.internal_store, self._path_resolver)
entry = self._share_entry(resource.cluster_id, resource.share_id)
elif isinstance(resource, resources.JoinAuth):
- self._check_join_auths(resource)
+ check_join_auths(resource, self.internal_store)
entry = self._join_auth_entry(resource.auth_id)
elif isinstance(resource, resources.UsersAndGroups):
- self._check_users_and_groups(resource)
+ check_users_and_groups(resource, self.internal_store)
entry = self._users_and_groups_entry(resource.users_groups_id)
else:
raise TypeError('not a valid smb resource')
external.rm_cluster(self.priv_store, cluster_id)
external.rm_cluster(self.public_store, cluster_id)
- def _check_cluster(self, cluster: ClusterRef) -> None:
- """Check that the cluster resource can be updated."""
- if cluster.intent == Intent.REMOVED:
- share_ids = ShareEntry.ids(self.internal_store)
- clusters_used = {cid for cid, _ in share_ids}
- if cluster.cluster_id in clusters_used:
- raise ErrorResult(
- cluster,
- msg="cluster in use by shares",
- status={
- 'clusters': [
- shid
- for cid, shid in share_ids
- if cid == cluster.cluster_id
- ]
- },
- )
- return
- assert isinstance(cluster, resources.Cluster)
- cluster.validate()
-
- def _check_share(self, share: ShareRef) -> None:
- """Check that the share resource can be updated."""
- if share.intent == Intent.REMOVED:
- return
- assert isinstance(share, resources.Share)
- share.validate()
- if share.cluster_id not in ClusterEntry.ids(self.internal_store):
- raise ErrorResult(
- share,
- msg="no matching cluster id",
- status={"cluster_id": share.cluster_id},
- )
- assert share.cephfs is not None
- try:
- self._path_resolver.resolve_exists(
- share.cephfs.volume,
- share.cephfs.subvolumegroup,
- share.cephfs.subvolume,
- share.cephfs.path,
- )
- except (FileNotFoundError, NotADirectoryError):
- raise ErrorResult(
- share, msg="path is not a valid directory in volume"
- )
-
- def _check_join_auths(self, join_auth: resources.JoinAuth) -> None:
- """Check that the JoinAuth resource can be updated."""
- if join_auth.intent == Intent.PRESENT:
- return # adding is always ok
- refs_in_use: Dict[str, List[str]] = {}
- for cluster_id in ClusterEntry.ids(self.internal_store):
- cluster = self._cluster_entry(cluster_id).get_cluster()
- for ref in _auth_refs(cluster):
- refs_in_use.setdefault(ref, []).append(cluster_id)
- log.debug('refs_in_use: %r', refs_in_use)
- if join_auth.auth_id in refs_in_use:
- raise ErrorResult(
- join_auth,
- msg='join auth resource in use by clusters',
- status={
- 'clusters': refs_in_use[join_auth.auth_id],
- },
- )
-
- def _check_users_and_groups(
- self, users_and_groups: resources.UsersAndGroups
- ) -> None:
- """Check that the UsersAndGroups resource can be updated."""
- if users_and_groups.intent == Intent.PRESENT:
- return # adding is always ok
- refs_in_use: Dict[str, List[str]] = {}
- for cluster_id in ClusterEntry.ids(self.internal_store):
- cluster = self._cluster_entry(cluster_id).get_cluster()
- for ref in _ug_refs(cluster):
- refs_in_use.setdefault(ref, []).append(cluster_id)
- log.debug('refs_in_use: %r', refs_in_use)
- if users_and_groups.users_groups_id in refs_in_use:
- raise ErrorResult(
- users_and_groups,
- msg='users and groups resource in use by clusters',
- status={
- 'clusters': refs_in_use[users_and_groups.users_groups_id],
- },
- )
-
def _cluster_entry(self, cluster_id: str) -> ClusterEntry:
return ClusterEntry.from_store(self.internal_store, cluster_id)
return sorted(resource_objs, key=_keyfunc)
+def check_cluster(cluster: ClusterRef, store: ConfigStore) -> None:
+ """Check that the cluster resource can be updated."""
+ if cluster.intent == Intent.REMOVED:
+ share_ids = ShareEntry.ids(store)
+ clusters_used = {cid for cid, _ in share_ids}
+ if cluster.cluster_id in clusters_used:
+ raise ErrorResult(
+ cluster,
+ msg="cluster in use by shares",
+ status={
+ 'clusters': [
+ shid
+ for cid, shid in share_ids
+ if cid == cluster.cluster_id
+ ]
+ },
+ )
+ return
+ assert isinstance(cluster, resources.Cluster)
+ cluster.validate()
+
+
+def check_share(
+ share: ShareRef, store: ConfigStore, resolver: PathResolver
+) -> None:
+ """Check that the share resource can be updated."""
+ if share.intent == Intent.REMOVED:
+ return
+ assert isinstance(share, resources.Share)
+ share.validate()
+ if share.cluster_id not in ClusterEntry.ids(store):
+ raise ErrorResult(
+ share,
+ msg="no matching cluster id",
+ status={"cluster_id": share.cluster_id},
+ )
+ assert share.cephfs is not None
+ try:
+ resolver.resolve_exists(
+ share.cephfs.volume,
+ share.cephfs.subvolumegroup,
+ share.cephfs.subvolume,
+ share.cephfs.path,
+ )
+ except (FileNotFoundError, NotADirectoryError):
+ raise ErrorResult(
+ share, msg="path is not a valid directory in volume"
+ )
+
+
+def check_join_auths(
+ join_auth: resources.JoinAuth, store: ConfigStore
+) -> None:
+ """Check that the JoinAuth resource can be updated."""
+ if join_auth.intent == Intent.PRESENT:
+ return # adding is always ok
+ refs_in_use: Dict[str, List[str]] = {}
+ for cluster_id in ClusterEntry.ids(store):
+ cluster = ClusterEntry.from_store(store, cluster_id).get_cluster()
+ for ref in _auth_refs(cluster):
+ refs_in_use.setdefault(ref, []).append(cluster_id)
+ log.debug('refs_in_use: %r', refs_in_use)
+ if join_auth.auth_id in refs_in_use:
+ raise ErrorResult(
+ join_auth,
+ msg='join auth resource in use by clusters',
+ status={
+ 'clusters': refs_in_use[join_auth.auth_id],
+ },
+ )
+
+
+def check_users_and_groups(
+ users_and_groups: resources.UsersAndGroups, store: ConfigStore
+) -> None:
+ """Check that the UsersAndGroups resource can be updated."""
+ if users_and_groups.intent == Intent.PRESENT:
+ return # adding is always ok
+ refs_in_use: Dict[str, List[str]] = {}
+ for cluster_id in ClusterEntry.ids(store):
+ cluster = ClusterEntry.from_store(store, cluster_id).get_cluster()
+ for ref in _ug_refs(cluster):
+ refs_in_use.setdefault(ref, []).append(cluster_id)
+ log.debug('refs_in_use: %r', refs_in_use)
+ if users_and_groups.users_groups_id in refs_in_use:
+ raise ErrorResult(
+ users_and_groups,
+ msg='users and groups resource in use by clusters',
+ status={
+ 'clusters': refs_in_use[users_and_groups.users_groups_id],
+ },
+ )
+
+
def _auth_refs(cluster: resources.Cluster) -> Collection[str]:
if cluster.auth_mode != AuthMode.ACTIVE_DIRECTORY:
return set()