JoinAuthEntry,
ResourceEntry,
ShareEntry,
+ TLSCredentialEntry,
UsersAndGroupsEntry,
resource_entry,
resource_key,
cids = set(ClusterEntry.ids(self))
self._prune(cids, JoinAuthEntry, resources.JoinAuth)
self._prune(cids, UsersAndGroupsEntry, resources.UsersAndGroups)
+ self._prune(cids, TLSCredentialEntry, resources.TLSCredential)
def auth_refs(cluster: resources.Cluster) -> Collection[str]:
)
+@cross_check_resource.register
+def _check_tls_credential_resource(
+ resource: resources.TLSCredential, staging: Staging, **_kw: Any
+) -> None:
+ """Check that the tls credential resource can be updated."""
+ if resource.intent == Intent.PRESENT:
+ return _check_tls_credential_present(resource, staging)
+ return _check_tls_credential_removed(resource, staging)
+
+
+def _check_tls_credential_removed(
+ tls_cred: resources.TLSCredential, staging: Staging
+) -> None:
+ cids = set(ClusterEntry.ids(staging))
+ refs_in_use: Dict[str, List[str]] = {}
+ for cluster_id in cids:
+ cluster = staging.get_cluster(cluster_id)
+ for ref in tls_refs(cluster):
+ refs_in_use.setdefault(ref, []).append(cluster_id)
+ log.debug('refs_in_use: %r', refs_in_use)
+ if tls_cred.tls_credential_id in refs_in_use:
+ raise ErrorResult(
+ tls_cred,
+ msg='tls credential resource in use by clusters',
+ status={
+ 'clusters': refs_in_use[tls_cred.tls_credential_id],
+ },
+ )
+
+
+def _check_tls_credential_present(
+ tls_cred: resources.TLSCredential, staging: Staging
+) -> None:
+ # TODO: dedupe this logic across join auth and usersgroups
+ if tls_cred.linked_to_cluster:
+ cids = set(ClusterEntry.ids(staging))
+ if tls_cred.linked_to_cluster not in cids:
+ raise ErrorResult(
+ tls_cred,
+ msg='linked_to_cluster id not valid',
+ status={
+ 'unknown_id': tls_cred.linked_to_cluster,
+ },
+ )
+
+
+def _tls_ref(src: Optional[resources.TLSSource]) -> str:
+ if src and src.source_type == UserGroupSourceType.RESOURCE and src.ref:
+ return src.ref
+ return ''
+
+
+def tls_refs(cluster: resources.Cluster) -> Collection[str]:
+ if not cluster.remote_control:
+ return set()
+ refs = (
+ _tls_ref(s)
+ for s in (
+ cluster.remote_control.cert,
+ cluster.remote_control.key,
+ cluster.remote_control.ca_cert,
+ )
+ )
+ return {ref for ref in refs if ref}
+
+
def _parse_earmark(earmark: str) -> dict:
parts = earmark.split('.')