Intent,
JoinSourceType,
SMBClustering,
+ SourceReferenceType,
State,
UserGroupSourceType,
)
)
+@cross_check_resource.register
+def _check_external_ceph_cluster_resource(
+ ext_cluster: resources.ExternalCephCluster, staging: Staging, **_: Any
+) -> None:
+ """Check that the external ceph cluster resource is valid."""
+ if ext_cluster.intent == Intent.PRESENT:
+ return
+ cids = set(ClusterEntry.ids(staging))
+ refs_in_use: Dict[str, List[str]] = {}
+ for cluster_id in cids:
+ cluster = staging.get_cluster(cluster_id)
+ for ref in ext_cluster_refs(cluster):
+ refs_in_use.setdefault(ref, []).append(cluster_id)
+ log.debug('ext cluster refs in use: %r', refs_in_use)
+ my_id = ext_cluster.external_ceph_cluster_id
+ if my_id in refs_in_use:
+ raise ErrorResult(
+ ext_cluster,
+ msg='external ceph cluster resource in use by clusters',
+ status={
+ 'clusters': refs_in_use[my_id],
+ },
+ )
+
+
def _tls_ref(src: Optional[resources.TLSSource]) -> str:
if src and src.source_type == UserGroupSourceType.RESOURCE and src.ref:
return src.ref
return {ref for ref in refs if ref}
+def ext_cluster_refs(cluster: resources.Cluster) -> Collection[str]:
+ if (
+ cluster.external_ceph_cluster
+ and cluster.external_ceph_cluster.source_type
+ == SourceReferenceType.RESOURCE
+ and cluster.external_ceph_cluster.ref
+ ):
+ return [cluster.external_ceph_cluster.ref]
+ return []
+
+
def _parse_earmark(earmark: str) -> dict:
parts = earmark.split('.')