]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: Adding genric cert/key name support for get certificates
authorShweta Bhosale <Shweta.Bhosale1@ibm.com>
Wed, 3 Sep 2025 14:34:54 +0000 (20:04 +0530)
committerShweta Bhosale <Shweta.Bhosale1@ibm.com>
Thu, 25 Sep 2025 16:42:12 +0000 (22:12 +0530)
Fixes: https://tracker.ceph.com/issues/71707
Signed-off-by: Shweta Bhosale <Shweta.Bhosale1@ibm.com>
src/pybind/mgr/cephadm/services/cephadmservice.py

index 8828c182a2564c4633197d448eb1bad4479290e4..a92f37c84e5ad161571715d6b5596be8654b7b8b 100644 (file)
@@ -333,15 +333,44 @@ class CephadmService(metaclass=ABCMeta):
         if not self.requires_certificates or not svc_spec.ssl:
             return EMPTY_TLS_KEYPAIR
 
+        return self.get_certificates_generic(
+            svc_spec=svc_spec,
+            daemon_spec=daemon_spec,
+            cert_attr='ssl_cert',
+            key_attr='ssl_key',
+            cert_source_attr='certificate_source',
+            cert_name=self.cert_name,
+            key_name=self.key_name,
+            ips=ips,
+            fqdns=fqdns,
+            custom_sans=custom_sans
+        )
+
+    def get_certificates_generic(
+        self,
+        svc_spec: ServiceSpec,
+        daemon_spec: CephadmDaemonDeploySpec,
+        cert_attr: str,
+        key_attr: str,
+        cert_source_attr: str,
+        cert_name: str,
+        key_name: str,
+        custom_sans: Optional[List[str]] = None,
+        ips: Optional[List[str]] = None,
+        fqdns: Optional[List[str]] = None
+    ) -> CertKeyPair:
+
         ips = ips or [self.mgr.inventory.get_addr(daemon_spec.host)]
         fqdns = fqdns or [self.mgr.get_fqdn(daemon_spec.host)]
+        custom_sans = custom_sans or []
 
-        cert_source = svc_spec.certificate_source
+        cert_source = getattr(svc_spec, cert_source_attr, None)
         logger.debug(f'Getting certificate for {svc_spec.service_name()} using source: {cert_source}')
+
         if cert_source == CertificateSource.INLINE.value:
-            return self._get_certificates_from_spec(svc_spec, daemon_spec)
+            return self._get_certificates_from_spec(svc_spec, daemon_spec, cert_attr, key_attr, cert_name, key_name)
         elif cert_source == CertificateSource.REFERENCE.value:
-            return self._get_certificates_from_certmgr_store(svc_spec, fqdns)
+            return self._get_certificates_from_certmgr_store(svc_spec, fqdns, cert_name, key_name)
         elif cert_source == CertificateSource.CEPHADM_SIGNED.value:
             return self._get_cephadm_signed_certificates(svc_spec, daemon_spec, ips, fqdns, custom_sans)
         else:
@@ -351,20 +380,24 @@ class CephadmService(metaclass=ABCMeta):
     def _get_certificates_from_spec(
         self,
         svc_spec: ServiceSpec,
-        daemon_spec: CephadmDaemonDeploySpec
+        daemon_spec: CephadmDaemonDeploySpec,
+        cert_attr: str,
+        key_attr: str,
+        cert_name: str,
+        key_name: str
     ) -> CertKeyPair:
         """
         Fetch and persist the TLS certificate and key for a service spec.
         Returns:
             A CertKeyPair if both are available; otherwise EMPTY_TLS_KEYPAIR.
         """
-        cert = getattr(svc_spec, 'ssl_cert', None)
-        key = getattr(svc_spec, 'ssl_key', None)
+        cert = getattr(svc_spec, cert_attr, None)
+        key = getattr(svc_spec, key_attr, None)
         if cert and key:
             service_name = svc_spec.service_name()
             host = daemon_spec.host
-            self.mgr.cert_mgr.save_cert(self.cert_name, cert, service_name, host, user_made=True)
-            self.mgr.cert_mgr.save_key(self.key_name, key, service_name, host, user_made=True)
+            self.mgr.cert_mgr.save_cert(cert_name, cert, service_name, host, user_made=True)
+            self.mgr.cert_mgr.save_key(key_name, key, service_name, host, user_made=True)
             return CertKeyPair(cert=cert, key=key)
 
         logger.error(
@@ -372,23 +405,30 @@ class CephadmService(metaclass=ABCMeta):
         )
         return EMPTY_TLS_KEYPAIR
 
-    def _get_certificates_from_certmgr_store(self, svc_spec: ServiceSpec, fqdns: List[str]) -> CertKeyPair:
+    def _get_certificates_from_certmgr_store(
+        self,
+        svc_spec: ServiceSpec,
+        fqdns: List[str],
+        cert_name: str,
+        key_name: str
+    ) -> CertKeyPair:
         host = fqdns[0] if fqdns else None
-        cert = self.mgr.cert_mgr.get_cert(self.cert_name, svc_spec.service_name(), host)
-        key = self.mgr.cert_mgr.get_key(self.key_name, svc_spec.service_name(), host)
+        cert = self.mgr.cert_mgr.get_cert(cert_name, svc_spec.service_name(), host)
+        key = self.mgr.cert_mgr.get_key(key_name, svc_spec.service_name(), host)
         if cert and key:
             return CertKeyPair(cert=cert, key=key)
         else:
-            logger.error(f'Failed to get cert/key {self.cert_name} for service {svc_spec.service_name()} host: {host} from the certmgr store.')
+            logger.error(f'Failed to get cert/key {cert_name} for service {svc_spec.service_name()} host: {host} from the certmgr store.')
             return EMPTY_TLS_KEYPAIR
 
-    def _get_cephadm_signed_certificates(self,
-                                         svc_spec: ServiceSpec,
-                                         daemon_spec: CephadmDaemonDeploySpec,
-                                         ips: Optional[List[str]] = None,
-                                         fqdns: Optional[List[str]] = None,
-                                         custom_sans: Optional[List[str]] = None,
-                                         ) -> CertKeyPair:
+    def _get_cephadm_signed_certificates(
+        self,
+        svc_spec: ServiceSpec,
+        daemon_spec: CephadmDaemonDeploySpec,
+        ips: List[str],
+        fqdns: List[str],
+        custom_sans: List[str],
+    ) -> CertKeyPair:
 
         custom_sans = custom_sans or svc_spec.custom_sans or []
         ips = ips or [self.mgr.inventory.get_addr(daemon_spec.host)]