'other_cluster_id': ug.linked_to_cluster,
},
)
+ _check_cluster_keybridge(cluster)
def _check_cluster_modifications(
raise ErrorResult(cluster, msg, status={'hint': hint})
+def _check_cluster_keybridge(cluster: resources.Cluster) -> None:
+ if cluster.keybridge is None:
+ return
+ cluster.keybridge.validate()
+ names: Set[str] = set()
+ for kb_scope in checked(cluster.keybridge.scopes):
+ kbsi = kb_scope.scope_identity()
+ if str(kbsi) in names:
+ raise ErrorResult(
+ cluster,
+ f"scope name {kb_scope.name} already in use",
+ )
+ names.add(str(kbsi))
+
+
@cross_check_resource.register
def _check_removed_share_resource(
share: resources.RemovedShare, staging: Staging, **_: Any
msg="share name already in use",
status={"conflicting_share_id": name_used_by},
)
+ _check_fscrypt_scopes(share, staging)
def _share_name_in_use(
return found_curr[0].get()['share_id']
+def _check_fscrypt_scopes(share: resources.Share, staging: Staging) -> None:
+ """Validate that the share refers to a vadlid keybridge scope defined
+ on the cluster the share belongs to.
+ """
+ if not share.checked_cephfs.fscrypt_key:
+ return
+ kbsi = share.checked_cephfs.fscrypt_key.scope_identity()
+ cluster = staging.get_cluster(share.cluster_id)
+ known = _keybridge_ids(cluster)
+ if str(kbsi) not in known:
+ raise ErrorResult(
+ share,
+ msg="scope name not known",
+ status={
+ 'invalid_scope': str(kbsi),
+ 'known_scopes': list(known),
+ 'cluster_id': share.cluster_id,
+ },
+ )
+
+
@cross_check_resource.register
def _check_join_auth_resource(
join_auth: resources.JoinAuth, staging: Staging, **_: Any
return ''
-def tls_refs(cluster: resources.Cluster) -> Collection[str]:
+def _keybridge_tls_refs(cluster: resources.Cluster) -> Set[str]:
+ if not cluster.keybridge or not cluster.keybridge.scopes:
+ return set()
+ maybe_refs: Set[Optional[str]] = set()
+ for scope in cluster.keybridge.scopes:
+ maybe_refs.update(
+ _tls_ref(s)
+ for s in (
+ scope.kmip_cert,
+ scope.kmip_key,
+ scope.kmip_ca_cert,
+ )
+ )
+ return {ref for ref in maybe_refs if ref}
+
+
+def _remotectl_tls_refs(cluster: resources.Cluster) -> Set[str]:
if not cluster.remote_control:
return set()
refs = (
return {ref for ref in refs if ref}
+def tls_refs(cluster: resources.Cluster) -> Collection[str]:
+ return _remotectl_tls_refs(cluster) | _keybridge_tls_refs(cluster)
+
+
+def _keybridge_ids(
+ cluster: resources.Cluster,
+) -> Dict[str, resources.KeyBridgeScopeIdentity]:
+ if not cluster.keybridge or not cluster.keybridge.scopes:
+ return {}
+ kbsids = (s.scope_identity() for s in cluster.keybridge.scopes)
+ return {str(kbsi): kbsi for kbsi in kbsids}
+
+
def _parse_earmark(earmark: str) -> dict:
parts = earmark.split('.')