from typing import (
+ Any,
Collection,
Dict,
Iterator,
List,
Optional,
Set,
- Union,
)
+import functools
import logging
import operator
from .results import ErrorResult, Result, ResultGroup
from .utils import checked
-ClusterRef = Union[resources.Cluster, resources.RemovedCluster]
-ShareRef = Union[resources.Share, resources.RemovedShare]
-
log = logging.getLogger(__name__)
}
+@functools.singledispatch
def cross_check_resource(
resource: SMBResource,
staging: Staging,
path_resolver: PathResolver,
earmark_resolver: EarmarkResolver,
) -> None:
- if isinstance(resource, (resources.Cluster, resources.RemovedCluster)):
- _check_cluster(resource, staging)
- elif isinstance(resource, (resources.Share, resources.RemovedShare)):
- _check_share(
- resource,
- staging,
- path_resolver,
- earmark_resolver,
- )
- elif isinstance(resource, resources.JoinAuth):
- _check_join_auths(resource, staging)
- elif isinstance(resource, resources.UsersAndGroups):
- _check_users_and_groups(resource, staging)
- else:
- raise TypeError('not a valid smb resource')
-
+ raise TypeError(f'not a valid smb resource: {type(resource)}')
-def _check_cluster(cluster: ClusterRef, staging: Staging) -> None:
- """Check that the cluster resource can be updated."""
- if cluster.intent == Intent.PRESENT:
- return _check_cluster_present(cluster, staging)
- return _check_cluster_removed(cluster, staging)
-
-def _check_cluster_removed(cluster: ClusterRef, staging: Staging) -> None:
+@cross_check_resource.register
+def _check_removed_cluster_resource(
+ cluster: resources.RemovedCluster, staging: Staging, **_: Any
+) -> None:
+ assert cluster.intent == Intent.REMOVED
share_ids = ShareEntry.ids(staging)
clusters_used = {cid for cid, _ in share_ids}
if cluster.cluster_id in clusters_used:
)
-def _check_cluster_present(cluster: ClusterRef, staging: Staging) -> None:
- assert isinstance(cluster, resources.Cluster)
+@cross_check_resource.register
+def _check_cluster_resource(
+ cluster: resources.Cluster, staging: Staging, **_: Any
+) -> None:
+ assert cluster.intent == Intent.PRESENT
cluster.validate()
if not staging.is_new(cluster):
_check_cluster_modifications(cluster, staging)
raise ErrorResult(cluster, msg, status={'hint': hint})
-def _check_share(
- share: ShareRef,
+@cross_check_resource.register
+def _check_removed_share_resource(
+ share: resources.RemovedShare, staging: Staging, **_: Any
+) -> None:
+ assert share.intent == Intent.REMOVED
+
+
+@cross_check_resource.register
+def _check_share_resource(
+ share: resources.Share,
staging: Staging,
- resolver: PathResolver,
+ path_resolver: PathResolver,
earmark_resolver: EarmarkResolver,
) -> None:
"""Check that the share resource can be updated."""
- if share.intent == Intent.REMOVED:
- return
+ assert share.intent == Intent.PRESENT
assert isinstance(share, resources.Share)
share.validate()
if share.cluster_id not in ClusterEntry.ids(staging):
)
assert share.cephfs is not None
try:
- volpath = resolver.resolve_exists(
+ volpath = path_resolver.resolve_exists(
share.cephfs.volume,
share.cephfs.subvolumegroup,
share.cephfs.subvolume,
return found_curr[0].get()['share_id']
-def _check_join_auths(
- join_auth: resources.JoinAuth, staging: Staging
+@cross_check_resource.register
+def _check_join_auth_resource(
+ join_auth: resources.JoinAuth, staging: Staging, **_: Any
) -> None:
"""Check that the JoinAuth resource can be updated."""
if join_auth.intent == Intent.PRESENT:
)
-def _check_users_and_groups(
- users_and_groups: resources.UsersAndGroups, staging: Staging
+@cross_check_resource.register
+def _check_users_and_groups_resource(
+ users_and_groups: resources.UsersAndGroups, staging: Staging, **_: Any
) -> None:
"""Check that the UsersAndGroups resource can be updated."""
if users_and_groups.intent == Intent.PRESENT: