self.mgr.cert_mgr.save_self_signed_cert_key_pair(svc_name, tls_creds, host=daemon_spec.host, label=label)
return tls_creds
+ def _get_certificates_from_spec_ssl_certificates(
+ self,
+ svc_spec: ServiceSpec,
+ daemon_spec: CephadmDaemonDeploySpec,
+ feature: Optional[str] = None
+ ) -> TLSCredentials:
+ return EMPTY_TLS_CREDENTIALS
+
def get_certificates(self,
daemon_spec: CephadmDaemonDeploySpec,
ips: List[str] = [],
fqdns: List[str] = [],
custom_sans: List[str] = [],
- ca_cert_required: bool = False
+ ca_cert_required: bool = False,
+ feature: Optional[str] = None
) -> TLSCredentials:
svc_spec = cast(ServiceSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
ips=ips,
fqdns=fqdns,
custom_sans=custom_sans,
+ feature=feature,
)
def get_certificates_generic(
custom_sans: Optional[List[str]] = None,
ips: Optional[List[str]] = None,
fqdns: Optional[List[str]] = None,
+ feature: Optional[str] = None,
) -> TLSCredentials:
ips = ips or [self.mgr.inventory.get_addr(daemon_spec.host)]
cert_source = getattr(svc_spec, cert_source_attr, None)
logger.debug(f'Getting certificate for {svc_spec.service_name()} using source: {cert_source}')
- if cert_source == CertificateSource.INLINE.value:
+ if feature is not None:
+ return self._get_certificates_from_spec_ssl_certificates(svc_spec, daemon_spec, feature)
+ elif cert_source == CertificateSource.INLINE.value:
return self._get_certificates_from_spec(svc_spec, daemon_spec, cert_attr, key_attr, cert_name, key_name, ca_cert_attr, ca_cert_name)
elif cert_source == CertificateSource.REFERENCE.value:
return self._get_certificates_from_certmgr_store(svc_spec, fqdns, cert_name, key_name, ca_cert_name)