return cert_obj is not None
def is_cert_editable(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
- cert_obj = cast(Cert, self.cert_store.get_tlsobject(cert_name, service_name, host))
- return cert_obj.editable if cert_obj else True
+ # By definition a certificate which doesn't not exist it's considered
+ # editable so user can populate its content by using a custom value.
+ if self.cert_store.tlsobject_exists(cert_name):
+ cert_obj = cast(Cert, self.cert_store.get_tlsobject(cert_name, service_name, host))
+ return cert_obj.editable if cert_obj else True
+ else:
+ return True
def get_cert(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Optional[str]:
cert_obj = cast(Cert, self.cert_store.get_tlsobject(cert_name, service_name, host))
self.rm_cert(self.self_signed_cert(service_name, label), service_name, host)
self.rm_key(self.self_signed_key(service_name, label), service_name, host)
+ def rm_cert_if_present(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
+ try:
+ return self.rm_cert(cert_name, service_name, host)
+ except TLSObjectException:
+ logger.debug(
+ "TLS cert %s for service=%s host=%s not found; nothing to remove",
+ cert_name, service_name, host,
+ )
+ return False
+
+ def rm_key_if_present(self, key_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
+ try:
+ return self.rm_key(key_name, service_name, host)
+ except TLSObjectException:
+ logger.debug(
+ "TLS key %s for service=%s host=%s not found; nothing to remove",
+ key_name, service_name, host,
+ )
+ return False
+
+ def rm_self_signed_cert_key_pair_if_present(self, service_name: str, host: str, label: Optional[str] = None) -> None:
+ self.rm_cert_if_present(self.self_signed_cert(service_name, label), service_name, host)
+ self.rm_key_if_present(self.self_signed_key(service_name, label), service_name, host)
+
def cert_ls(self, filter_by: str = '',
include_details: bool = False,
include_cephadm_signed: bool = False) -> Dict:
"""
Called after the daemon is removed.
"""
+
+ def _cleanup_tls_creds_for_host(svc_name: str, host: str, cert_source: Optional[str]) -> None:
+
+ if cert_source == CertificateSource.CEPHADM_SIGNED.value:
+ logger.info(f"Removing cephadm-signed certificate/key for service: {svc_name}, host: {host}")
+ self.mgr.cert_mgr.rm_self_signed_cert_key_pair(svc_name, host)
+ elif cert_source == CertificateSource.INLINE.value:
+ logger.info(f"Removing inline-saved certificate/key for service: {svc_name}, host: {host}")
+ self.mgr.cert_mgr.rm_cert(self.cert_name, svc_name, host)
+ self.mgr.cert_mgr.rm_key(self.key_name, svc_name, host)
+ if self.ca_cert_name:
+ self.mgr.cert_mgr.rm_cert(self.ca_cert_name, svc_name, host)
+ elif cert_source == CertificateSource.REFERENCE.value:
+ # It's a reference cert/key to the certmgr so we must keep them as user may want to use them later
+ logger.info(f"Keeping referenced certificate/key for service: {svc_name}, host: {host}")
+ elif cert_source is None:
+ # This could happen when TLS is being disabled in spec by the user, so we lose the value
+ # of the certificate source but we still have to do our best to cleanup the TLS credentials.
+ logger.info(f"Removing certificate/key for service: {svc_name}, host: {host}")
+ self.mgr.cert_mgr.rm_self_signed_cert_key_pair_if_present(svc_name, host)
+ # try to remove inline certs (if any)
+ if not self.mgr.cert_mgr.is_cert_editable(self.cert_name, svc_name, host):
+ self.mgr.cert_mgr.rm_cert_if_present(self.cert_name, svc_name, host)
+ self.mgr.cert_mgr.rm_key_if_present(self.key_name, svc_name, host)
+ if self.ca_cert_name:
+ self.mgr.cert_mgr.rm_cert_if_present(self.ca_cert_name, svc_name, host)
+ else:
+ logger.error(f"Unknown cert-source {cert_source}. Cannot remove cert/key for: {svc_name}, host: {host}")
+
assert daemon.daemon_type is not None
assert daemon.hostname
assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
if svc_name not in self.mgr.spec_store:
return
+ host = daemon.hostname
spec = self.mgr.spec_store[svc_name].spec
if not spec.ssl:
+ # Service no longer uses TLS; safe to clean up TLS credentials for this host
+ _cleanup_tls_creds_for_host(svc_name, host, spec.certificate_source)
return
- host = daemon.hostname
- cert_source = spec.certificate_source
- if cert_source == CertificateSource.CEPHADM_SIGNED.value:
- logger.info(f"Removing cephadm-signed certificate/key for service: {svc_name}, host: {host}")
- self.mgr.cert_mgr.rm_self_signed_cert_key_pair(svc_name, host)
- elif cert_source == CertificateSource.INLINE.value:
- logger.info(f"Removing inline-saved certificate/key for service: {svc_name}, host: {host}")
- self.mgr.cert_mgr.rm_cert(self.cert_name, svc_name, host)
- self.mgr.cert_mgr.rm_key(self.key_name, svc_name, host)
- if self.ca_cert_name:
- self.mgr.cert_mgr.rm_cert(self.ca_cert_name, svc_name, host)
+ remaining = [
+ d for d in self.mgr.cache.get_daemons_by_service(svc_name)
+ if d.hostname == host
+ ]
+ if remaining:
+ # The service still has daemons running on this host (e.g. during an HTTP -> HTTPS
+ # transition the daemons running on :80 are removed but new daemons running on :443
+ # area created), so the TLS cert/key may still be in use. Do not remove it yet.
+ logger.info(
+ "Not removing TLS cert/key for %s on %s: still %d daemons present",
+ svc_name, host, len(remaining)
+ )
else:
- # It's a reference cert/key to the certmgr so we must keep them as user may want to use them later
- logger.info(f"Keeping referenced certificate/key for service: {svc_name}, host: {host}")
+ _cleanup_tls_creds_for_host(svc_name, host, spec.certificate_source)
def purge(self, service_name: str) -> None:
"""Called to carry out any purge tasks following service removal"""
entity = self.get_auth_entity(daemon_id, host=host)
- logger.info(f'Removing key for {entity}')
+ logger.info(f'Removing keyring for {entity}')
ret, out, err = self.mgr.mon_command({
'prefix': 'auth rm',
'entity': entity,