if not self.requires_certificates or not svc_spec.ssl:
return EMPTY_TLS_KEYPAIR
+ return self.get_certificates_generic(
+ svc_spec=svc_spec,
+ daemon_spec=daemon_spec,
+ cert_attr='ssl_cert',
+ key_attr='ssl_key',
+ cert_source_attr='certificate_source',
+ cert_name=self.cert_name,
+ key_name=self.key_name,
+ ips=ips,
+ fqdns=fqdns,
+ custom_sans=custom_sans
+ )
+
+ def get_certificates_generic(
+ self,
+ svc_spec: ServiceSpec,
+ daemon_spec: CephadmDaemonDeploySpec,
+ cert_attr: str,
+ key_attr: str,
+ cert_source_attr: str,
+ cert_name: str,
+ key_name: str,
+ custom_sans: Optional[List[str]] = None,
+ ips: Optional[List[str]] = None,
+ fqdns: Optional[List[str]] = None
+ ) -> CertKeyPair:
+
ips = ips or [self.mgr.inventory.get_addr(daemon_spec.host)]
fqdns = fqdns or [self.mgr.get_fqdn(daemon_spec.host)]
+ custom_sans = custom_sans or []
- cert_source = svc_spec.certificate_source
+ cert_source = getattr(svc_spec, cert_source_attr, None)
logger.debug(f'Getting certificate for {svc_spec.service_name()} using source: {cert_source}')
+
if cert_source == CertificateSource.INLINE.value:
- return self._get_certificates_from_spec(svc_spec, daemon_spec)
+ return self._get_certificates_from_spec(svc_spec, daemon_spec, cert_attr, key_attr, cert_name, key_name)
elif cert_source == CertificateSource.REFERENCE.value:
- return self._get_certificates_from_certmgr_store(svc_spec, fqdns)
+ return self._get_certificates_from_certmgr_store(svc_spec, fqdns, cert_name, key_name)
elif cert_source == CertificateSource.CEPHADM_SIGNED.value:
return self._get_cephadm_signed_certificates(svc_spec, daemon_spec, ips, fqdns, custom_sans)
else:
def _get_certificates_from_spec(
self,
svc_spec: ServiceSpec,
- daemon_spec: CephadmDaemonDeploySpec
+ daemon_spec: CephadmDaemonDeploySpec,
+ cert_attr: str,
+ key_attr: str,
+ cert_name: str,
+ key_name: str
) -> CertKeyPair:
"""
Fetch and persist the TLS certificate and key for a service spec.
Returns:
A CertKeyPair if both are available; otherwise EMPTY_TLS_KEYPAIR.
"""
- cert = getattr(svc_spec, 'ssl_cert', None)
- key = getattr(svc_spec, 'ssl_key', None)
+ cert = getattr(svc_spec, cert_attr, None)
+ key = getattr(svc_spec, key_attr, None)
if cert and key:
service_name = svc_spec.service_name()
host = daemon_spec.host
- self.mgr.cert_mgr.save_cert(self.cert_name, cert, service_name, host, user_made=True)
- self.mgr.cert_mgr.save_key(self.key_name, key, service_name, host, user_made=True)
+ self.mgr.cert_mgr.save_cert(cert_name, cert, service_name, host, user_made=True)
+ self.mgr.cert_mgr.save_key(key_name, key, service_name, host, user_made=True)
return CertKeyPair(cert=cert, key=key)
logger.error(
)
return EMPTY_TLS_KEYPAIR
- def _get_certificates_from_certmgr_store(self, svc_spec: ServiceSpec, fqdns: List[str]) -> CertKeyPair:
+ def _get_certificates_from_certmgr_store(
+ self,
+ svc_spec: ServiceSpec,
+ fqdns: List[str],
+ cert_name: str,
+ key_name: str
+ ) -> CertKeyPair:
host = fqdns[0] if fqdns else None
- cert = self.mgr.cert_mgr.get_cert(self.cert_name, svc_spec.service_name(), host)
- key = self.mgr.cert_mgr.get_key(self.key_name, svc_spec.service_name(), host)
+ cert = self.mgr.cert_mgr.get_cert(cert_name, svc_spec.service_name(), host)
+ key = self.mgr.cert_mgr.get_key(key_name, svc_spec.service_name(), host)
if cert and key:
return CertKeyPair(cert=cert, key=key)
else:
- logger.error(f'Failed to get cert/key {self.cert_name} for service {svc_spec.service_name()} host: {host} from the certmgr store.')
+ logger.error(f'Failed to get cert/key {cert_name} for service {svc_spec.service_name()} host: {host} from the certmgr store.')
return EMPTY_TLS_KEYPAIR
- def _get_cephadm_signed_certificates(self,
- svc_spec: ServiceSpec,
- daemon_spec: CephadmDaemonDeploySpec,
- ips: Optional[List[str]] = None,
- fqdns: Optional[List[str]] = None,
- custom_sans: Optional[List[str]] = None,
- ) -> CertKeyPair:
+ def _get_cephadm_signed_certificates(
+ self,
+ svc_spec: ServiceSpec,
+ daemon_spec: CephadmDaemonDeploySpec,
+ ips: List[str],
+ fqdns: List[str],
+ custom_sans: List[str],
+ ) -> CertKeyPair:
custom_sans = custom_sans or svc_spec.custom_sans or []
ips = ips or [self.mgr.inventory.get_addr(daemon_spec.host)]