]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/smb: add tls credential support to staging
authorJohn Mulligan <jmulligan@redhat.com>
Wed, 2 Jul 2025 21:27:45 +0000 (17:27 -0400)
committerJohn Mulligan <jmulligan@redhat.com>
Tue, 12 Aug 2025 14:49:13 +0000 (10:49 -0400)
Add the necessary cross-check functions to staging.py for tls credential
resources. Add prune function for tls resources because they support
linked_to_cluster.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
src/pybind/mgr/smb/staging.py

index 658e9a03ff95704e47ca3d6f553ec1d8eb3a0929..4c857434454ed2bc0b8fbdacb32504d671e44d1b 100644 (file)
@@ -30,6 +30,7 @@ from .internal import (
     JoinAuthEntry,
     ResourceEntry,
     ShareEntry,
+    TLSCredentialEntry,
     UsersAndGroupsEntry,
     resource_entry,
     resource_key,
@@ -163,6 +164,7 @@ class Staging:
         cids = set(ClusterEntry.ids(self))
         self._prune(cids, JoinAuthEntry, resources.JoinAuth)
         self._prune(cids, UsersAndGroupsEntry, resources.UsersAndGroups)
+        self._prune(cids, TLSCredentialEntry, resources.TLSCredential)
 
 
 def auth_refs(cluster: resources.Cluster) -> Collection[str]:
@@ -532,6 +534,72 @@ def _check_users_and_groups_present(
             )
 
 
+@cross_check_resource.register
+def _check_tls_credential_resource(
+    resource: resources.TLSCredential, staging: Staging, **_kw: Any
+) -> None:
+    """Check that the tls credential resource can be updated."""
+    if resource.intent == Intent.PRESENT:
+        return _check_tls_credential_present(resource, staging)
+    return _check_tls_credential_removed(resource, staging)
+
+
+def _check_tls_credential_removed(
+    tls_cred: resources.TLSCredential, staging: Staging
+) -> None:
+    cids = set(ClusterEntry.ids(staging))
+    refs_in_use: Dict[str, List[str]] = {}
+    for cluster_id in cids:
+        cluster = staging.get_cluster(cluster_id)
+        for ref in tls_refs(cluster):
+            refs_in_use.setdefault(ref, []).append(cluster_id)
+    log.debug('refs_in_use: %r', refs_in_use)
+    if tls_cred.tls_credential_id in refs_in_use:
+        raise ErrorResult(
+            tls_cred,
+            msg='tls credential resource in use by clusters',
+            status={
+                'clusters': refs_in_use[tls_cred.tls_credential_id],
+            },
+        )
+
+
+def _check_tls_credential_present(
+    tls_cred: resources.TLSCredential, staging: Staging
+) -> None:
+    # TODO: dedupe this logic across join auth and usersgroups
+    if tls_cred.linked_to_cluster:
+        cids = set(ClusterEntry.ids(staging))
+        if tls_cred.linked_to_cluster not in cids:
+            raise ErrorResult(
+                tls_cred,
+                msg='linked_to_cluster id not valid',
+                status={
+                    'unknown_id': tls_cred.linked_to_cluster,
+                },
+            )
+
+
+def _tls_ref(src: Optional[resources.TLSSource]) -> str:
+    if src and src.source_type == UserGroupSourceType.RESOURCE and src.ref:
+        return src.ref
+    return ''
+
+
+def tls_refs(cluster: resources.Cluster) -> Collection[str]:
+    if not cluster.remote_control:
+        return set()
+    refs = (
+        _tls_ref(s)
+        for s in (
+            cluster.remote_control.cert,
+            cluster.remote_control.key,
+            cluster.remote_control.ca_cert,
+        )
+    )
+    return {ref for ref in refs if ref}
+
+
 def _parse_earmark(earmark: str) -> dict:
     parts = earmark.split('.')