]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/smb: add cross-check validation for keybridge scopes
authorJohn Mulligan <jmulligan@redhat.com>
Fri, 18 Jul 2025 14:24:45 +0000 (10:24 -0400)
committerAvan Thakkar <athakkar@redhat.com>
Thu, 27 Nov 2025 15:07:27 +0000 (20:37 +0530)
Validate that scope names are not re-used, etc. Check on things that
can't be done in single object validation.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
(cherry picked from commit 259c06b0419e2188bbb6f22e211d7fd3e07e60c0)

src/pybind/mgr/smb/staging.py

index 4c857434454ed2bc0b8fbdacb32504d671e44d1b..053a1e4134abf431c7b5b6c12118daa9468bd51f 100644 (file)
@@ -268,6 +268,7 @@ def _check_cluster_resource(
                     'other_cluster_id': ug.linked_to_cluster,
                 },
             )
+    _check_cluster_keybridge(cluster)
 
 
 def _check_cluster_modifications(
@@ -317,6 +318,21 @@ def _check_cluster_modifications(
         raise ErrorResult(cluster, msg, status={'hint': hint})
 
 
+def _check_cluster_keybridge(cluster: resources.Cluster) -> None:
+    if cluster.keybridge is None:
+        return
+    cluster.keybridge.validate()
+    names: Set[str] = set()
+    for kb_scope in checked(cluster.keybridge.scopes):
+        kbsi = kb_scope.scope_identity()
+        if str(kbsi) in names:
+            raise ErrorResult(
+                cluster,
+                f"scope name {kb_scope.name} already in use",
+            )
+        names.add(str(kbsi))
+
+
 @cross_check_resource.register
 def _check_removed_share_resource(
     share: resources.RemovedShare, staging: Staging, **_: Any
@@ -397,6 +413,7 @@ def _check_share_resource(
             msg="share name already in use",
             status={"conflicting_share_id": name_used_by},
         )
+    _check_fscrypt_scopes(share, staging)
 
 
 def _share_name_in_use(
@@ -444,6 +461,27 @@ def _share_name_in_use(
     return found_curr[0].get()['share_id']
 
 
+def _check_fscrypt_scopes(share: resources.Share, staging: Staging) -> None:
+    """Validate that the share refers to a vadlid keybridge scope defined
+    on the cluster the share belongs to.
+    """
+    if not share.checked_cephfs.fscrypt_key:
+        return
+    kbsi = share.checked_cephfs.fscrypt_key.scope_identity()
+    cluster = staging.get_cluster(share.cluster_id)
+    known = _keybridge_ids(cluster)
+    if str(kbsi) not in known:
+        raise ErrorResult(
+            share,
+            msg="scope name not known",
+            status={
+                'invalid_scope': str(kbsi),
+                'known_scopes': list(known),
+                'cluster_id': share.cluster_id,
+            },
+        )
+
+
 @cross_check_resource.register
 def _check_join_auth_resource(
     join_auth: resources.JoinAuth, staging: Staging, **_: Any
@@ -586,7 +624,23 @@ def _tls_ref(src: Optional[resources.TLSSource]) -> str:
     return ''
 
 
-def tls_refs(cluster: resources.Cluster) -> Collection[str]:
+def _keybridge_tls_refs(cluster: resources.Cluster) -> Set[str]:
+    if not cluster.keybridge or not cluster.keybridge.scopes:
+        return set()
+    maybe_refs: Set[Optional[str]] = set()
+    for scope in cluster.keybridge.scopes:
+        maybe_refs.update(
+            _tls_ref(s)
+            for s in (
+                scope.kmip_cert,
+                scope.kmip_key,
+                scope.kmip_ca_cert,
+            )
+        )
+    return {ref for ref in maybe_refs if ref}
+
+
+def _remotectl_tls_refs(cluster: resources.Cluster) -> Set[str]:
     if not cluster.remote_control:
         return set()
     refs = (
@@ -600,6 +654,19 @@ def tls_refs(cluster: resources.Cluster) -> Collection[str]:
     return {ref for ref in refs if ref}
 
 
+def tls_refs(cluster: resources.Cluster) -> Collection[str]:
+    return _remotectl_tls_refs(cluster) | _keybridge_tls_refs(cluster)
+
+
+def _keybridge_ids(
+    cluster: resources.Cluster,
+) -> Dict[str, resources.KeyBridgeScopeIdentity]:
+    if not cluster.keybridge or not cluster.keybridge.scopes:
+        return {}
+    kbsids = (s.scope_identity() for s in cluster.keybridge.scopes)
+    return {str(kbsi): kbsi for kbsi in kbsids}
+
+
 def _parse_earmark(earmark: str) -> dict:
     parts = earmark.split('.')