From: Shweta Bhosale Date: Wed, 3 Sep 2025 14:34:54 +0000 (+0530) Subject: mgr/cephadm: Adding genric cert/key name support for get certificates X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=14b1dd781de7088683433dde394ddbdbafedaa0e;p=ceph.git mgr/cephadm: Adding genric cert/key name support for get certificates Fixes: https://tracker.ceph.com/issues/71707 Signed-off-by: Shweta Bhosale --- diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py index 8828c182a256..a92f37c84e5a 100644 --- a/src/pybind/mgr/cephadm/services/cephadmservice.py +++ b/src/pybind/mgr/cephadm/services/cephadmservice.py @@ -333,15 +333,44 @@ class CephadmService(metaclass=ABCMeta): if not self.requires_certificates or not svc_spec.ssl: return EMPTY_TLS_KEYPAIR + return self.get_certificates_generic( + svc_spec=svc_spec, + daemon_spec=daemon_spec, + cert_attr='ssl_cert', + key_attr='ssl_key', + cert_source_attr='certificate_source', + cert_name=self.cert_name, + key_name=self.key_name, + ips=ips, + fqdns=fqdns, + custom_sans=custom_sans + ) + + def get_certificates_generic( + self, + svc_spec: ServiceSpec, + daemon_spec: CephadmDaemonDeploySpec, + cert_attr: str, + key_attr: str, + cert_source_attr: str, + cert_name: str, + key_name: str, + custom_sans: Optional[List[str]] = None, + ips: Optional[List[str]] = None, + fqdns: Optional[List[str]] = None + ) -> CertKeyPair: + ips = ips or [self.mgr.inventory.get_addr(daemon_spec.host)] fqdns = fqdns or [self.mgr.get_fqdn(daemon_spec.host)] + custom_sans = custom_sans or [] - cert_source = svc_spec.certificate_source + cert_source = getattr(svc_spec, cert_source_attr, None) logger.debug(f'Getting certificate for {svc_spec.service_name()} using source: {cert_source}') + if cert_source == CertificateSource.INLINE.value: - return self._get_certificates_from_spec(svc_spec, daemon_spec) + return self._get_certificates_from_spec(svc_spec, daemon_spec, cert_attr, key_attr, cert_name, key_name) elif cert_source == CertificateSource.REFERENCE.value: - return self._get_certificates_from_certmgr_store(svc_spec, fqdns) + return self._get_certificates_from_certmgr_store(svc_spec, fqdns, cert_name, key_name) elif cert_source == CertificateSource.CEPHADM_SIGNED.value: return self._get_cephadm_signed_certificates(svc_spec, daemon_spec, ips, fqdns, custom_sans) else: @@ -351,20 +380,24 @@ class CephadmService(metaclass=ABCMeta): def _get_certificates_from_spec( self, svc_spec: ServiceSpec, - daemon_spec: CephadmDaemonDeploySpec + daemon_spec: CephadmDaemonDeploySpec, + cert_attr: str, + key_attr: str, + cert_name: str, + key_name: str ) -> CertKeyPair: """ Fetch and persist the TLS certificate and key for a service spec. Returns: A CertKeyPair if both are available; otherwise EMPTY_TLS_KEYPAIR. """ - cert = getattr(svc_spec, 'ssl_cert', None) - key = getattr(svc_spec, 'ssl_key', None) + cert = getattr(svc_spec, cert_attr, None) + key = getattr(svc_spec, key_attr, None) if cert and key: service_name = svc_spec.service_name() host = daemon_spec.host - self.mgr.cert_mgr.save_cert(self.cert_name, cert, service_name, host, user_made=True) - self.mgr.cert_mgr.save_key(self.key_name, key, service_name, host, user_made=True) + self.mgr.cert_mgr.save_cert(cert_name, cert, service_name, host, user_made=True) + self.mgr.cert_mgr.save_key(key_name, key, service_name, host, user_made=True) return CertKeyPair(cert=cert, key=key) logger.error( @@ -372,23 +405,30 @@ class CephadmService(metaclass=ABCMeta): ) return EMPTY_TLS_KEYPAIR - def _get_certificates_from_certmgr_store(self, svc_spec: ServiceSpec, fqdns: List[str]) -> CertKeyPair: + def _get_certificates_from_certmgr_store( + self, + svc_spec: ServiceSpec, + fqdns: List[str], + cert_name: str, + key_name: str + ) -> CertKeyPair: host = fqdns[0] if fqdns else None - cert = self.mgr.cert_mgr.get_cert(self.cert_name, svc_spec.service_name(), host) - key = self.mgr.cert_mgr.get_key(self.key_name, svc_spec.service_name(), host) + cert = self.mgr.cert_mgr.get_cert(cert_name, svc_spec.service_name(), host) + key = self.mgr.cert_mgr.get_key(key_name, svc_spec.service_name(), host) if cert and key: return CertKeyPair(cert=cert, key=key) else: - logger.error(f'Failed to get cert/key {self.cert_name} for service {svc_spec.service_name()} host: {host} from the certmgr store.') + logger.error(f'Failed to get cert/key {cert_name} for service {svc_spec.service_name()} host: {host} from the certmgr store.') return EMPTY_TLS_KEYPAIR - def _get_cephadm_signed_certificates(self, - svc_spec: ServiceSpec, - daemon_spec: CephadmDaemonDeploySpec, - ips: Optional[List[str]] = None, - fqdns: Optional[List[str]] = None, - custom_sans: Optional[List[str]] = None, - ) -> CertKeyPair: + def _get_cephadm_signed_certificates( + self, + svc_spec: ServiceSpec, + daemon_spec: CephadmDaemonDeploySpec, + ips: List[str], + fqdns: List[str], + custom_sans: List[str], + ) -> CertKeyPair: custom_sans = custom_sans or svc_spec.custom_sans or [] ips = ips or [self.mgr.inventory.get_addr(daemon_spec.host)]