from cephadm.ssl_cert_utils import SSLCerts, SSLConfigException
from mgr_util import verify_tls, certificate_days_to_expire, ServerConfigException
from cephadm.ssl_cert_utils import get_certificate_info, get_private_key_info
-from cephadm.tlsobject_types import Cert, PrivKey, TLSObjectScope, TLSObjectException, TLSCredentials
+from cephadm.tlsobject_types import Cert, PrivKey, TLSObjectScope, TLSObjectException, TLSObjectProtocol, TLSCredentials
from cephadm.tlsobject_store import TLSObjectStore
if TYPE_CHECKING:
self.cert_store.save_tlsobject(ss_cert_name, tls_creds.cert, host=host, user_made=False)
self.key_store.save_tlsobject(ss_key_name, tls_creds.key, host=host, user_made=False)
+ def _is_inline_saved_tlsobject(self, obj: Optional[TLSObjectProtocol]) -> bool:
+ # Inline-saved credentials are persisted as user_made=True but editable=False.
+ return bool(obj and getattr(obj, 'user_made', False) and not getattr(obj, 'editable', True))
+
+ def rm_inline_saved_cert_key_pair(
+ self,
+ cert_name: str,
+ key_name: str,
+ service_name: Optional[str] = None,
+ host: Optional[str] = None,
+ ca_cert_name: Optional[str] = None,
+ ) -> None:
+ """Remove inline-saved (non-editable) TLS objects for a given service/host.
+ This intentionally does *not* remove user-provisioned certmgr entries
+ (editable=True), which are considered reusable references.
+ """
+ context = f"service={service_name!r}, host={host!r}" if host else f"service={service_name!r}"
+
+ cert_obj = cast(Cert, self.cert_store.get_tlsobject_if_exists(cert_name, service_name, host))
+ if self._is_inline_saved_tlsobject(cert_obj):
+ logger.info("Removing inline-saved cert %r (%s)", cert_name, context)
+ self.rm_cert_if_present(cert_name, service_name, host)
+ elif cert_obj:
+ logger.info("Skipping cert removal for %r — exists but is not inline-saved (editable=True) (%s)", cert_name, context)
+
+ key_obj = cast(PrivKey, self.key_store.get_tlsobject_if_exists(key_name, service_name, host))
+ if self._is_inline_saved_tlsobject(key_obj):
+ logger.info("Removing inline-saved private key %r (%s)", key_name, context)
+ self.rm_key_if_present(key_name, service_name, host)
+ elif key_obj:
+ logger.info("Skipping key removal for %r — exists but is not inline-saved (editable=True) (%s)", key_name, context)
+
+ if ca_cert_name:
+ ca_obj = cast(Cert, self.cert_store.get_tlsobject_if_exists(ca_cert_name, service_name, host))
+ if self._is_inline_saved_tlsobject(ca_obj):
+ logger.info("Removing inline-saved CA cert %r (%s)", ca_cert_name, context)
+ self.rm_cert_if_present(ca_cert_name, service_name, host)
+ elif ca_obj:
+ logger.info("Skipping CA cert removal for %r — exists but is not inline-saved (editable=True) (%s)", ca_cert_name, context)
+
def rm_cert(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
return self.cert_store.rm_tlsobject(cert_name, service_name, host)
try:
return self.rm_cert(cert_name, service_name, host)
except TLSObjectException:
- logger.debug(
- "TLS cert %s for service=%s host=%s not found; nothing to remove",
- cert_name, service_name, host,
- )
return False
def rm_key_if_present(self, key_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
try:
return self.rm_key(key_name, service_name, host)
except TLSObjectException:
- logger.debug(
- "TLS key %s for service=%s host=%s not found; nothing to remove",
- key_name, service_name, host,
- )
return False
- def rm_self_signed_cert_key_pair_if_present(self, service_name: str, host: str, label: Optional[str] = None) -> None:
- self.rm_cert_if_present(self.self_signed_cert(service_name, label), service_name, host)
- self.rm_key_if_present(self.self_signed_key(service_name, label), service_name, host)
+ def try_rm_self_signed_cert_key_pair(self, service_name: str, host: str, label: Optional[str] = None) -> None:
+ cert_removed = self.rm_cert_if_present(self.self_signed_cert(service_name, label), service_name, host)
+ key_removed = self.rm_key_if_present(self.self_signed_key(service_name, label), service_name, host)
+ if cert_removed or key_removed:
+ logger.info(
+ f"Removing cephadm-signed cert/key for service: {service_name}, host: {host}: "
+ f"key/cert removed: {key_removed}/{cert_removed}"
+ )
def cert_ls(self, filter_by: str = '',
include_details: bool = False,
if spec.is_using_certificates_source(CertificateSource.REFERENCE):
svc = service_registry.get_service(spec.service_type)
if svc.SCOPE == TLSObjectScope.SERVICE:
+ # If the service was previously configured with INLINE, the cert/key
+ # may exist in the certmgr store as non-editable (inline-saved).
+ # When switching to REFERENCE, those inline-saved entries must be
+ # removed to avoid "pinning" the service to the old inline values and
+ # to allow the user to provision editable entries via the certmgr CLI.
+ if self.cert_mgr.cert_exists(svc.cert_name, service_name=spec.service_name(), host=None) and \
+ not self.cert_mgr.is_cert_editable(svc.cert_name, service_name=spec.service_name(), host=None):
+ self.log.info(
+ f"Removing inline-saved (non-editable) cert/key for '{spec.service_name()}' "
+ f"before switching to '{CertificateSource.REFERENCE.value}'."
+ )
+ self.cert_mgr.rm_inline_saved_cert_key_pair(
+ svc.cert_name,
+ svc.key_name,
+ service_name=spec.service_name(),
+ host=None,
+ ca_cert_name=getattr(svc, 'ca_cert_name', None),
+ )
if not self.cert_mgr.cert_exists(svc.cert_name, service_name=spec.service_name(), host=None):
raise OrchestratorError(
f"\n\nSSL is configured with '{CertificateSource.REFERENCE.value}', but cannot find an entry for the service '{spec.service_name()}'"
f"\nunder the certificate '{svc.cert_name}' within the certmgr store. To set the certificate, use:\n"
- f"\n > ceph orch certmgr cert set --cert-name {svc.cert_name} --service-name {spec.service_name()} -i <cert-key-pem-file> \n"
+ f"\n > ceph orch certmgr cert-key set --consumer {spec.service_type} --service-name {spec.service_name()} -i <cert-key-pem-file> \n"
)
else:
cert_warning += (
cert_source = getattr(svc_spec, cert_source_attr, None)
logger.debug(f'Getting certificate for {svc_spec.service_name()} using source: {cert_source}')
+ # Reconcile TLS objects when switching certificate sources.
+ #
+ # - Inline-saved certs/keys are persisted in the certmgr store as user_made=True
+ # but editable=False. These should be garbage-collected once the service no
+ # longer uses INLINE.
+ # - Cephadm-signed certs/keys are stored under cephadm-signed_* entities and
+ # should be removed when the service no longer uses CEPHADM_SIGNED.
+ svc_name = svc_spec.service_name()
+ host = daemon_spec.host
+ if cert_source in (CertificateSource.REFERENCE.value, CertificateSource.CEPHADM_SIGNED.value):
+ self.mgr.cert_mgr.rm_inline_saved_cert_key_pair(
+ cert_name,
+ key_name,
+ service_name=svc_name,
+ host=host,
+ ca_cert_name=ca_cert_name,
+ )
+ if cert_source != CertificateSource.CEPHADM_SIGNED.value:
+ # Best-effort: the cephadm-signed entities might not be registered if the
+ # service never used CEPHADM_SIGNED (or after a manager restart).
+ self.mgr.cert_mgr.try_rm_self_signed_cert_key_pair(svc_name, host)
+
if cert_source == CertificateSource.INLINE.value:
return self._get_certificates_from_spec(svc_spec, daemon_spec, cert_attr, key_attr, cert_name, key_name, ca_cert_attr, ca_cert_name)
elif cert_source == CertificateSource.REFERENCE.value:
Called after the daemon is removed.
"""
- def _cleanup_tls_creds_for_host(svc_name: str, host: str, cert_source: Optional[str]) -> None:
-
- if cert_source == CertificateSource.CEPHADM_SIGNED.value:
- logger.info(f"Removing cephadm-signed certificate/key for service: {svc_name}, host: {host}")
- self.mgr.cert_mgr.rm_self_signed_cert_key_pair(svc_name, host)
- elif cert_source == CertificateSource.INLINE.value:
- logger.info(f"Removing inline-saved certificate/key for service: {svc_name}, host: {host}")
- self.mgr.cert_mgr.rm_cert(self.cert_name, svc_name, host)
- self.mgr.cert_mgr.rm_key(self.key_name, svc_name, host)
- if self.ca_cert_name:
- self.mgr.cert_mgr.rm_cert(self.ca_cert_name, svc_name, host)
- elif cert_source == CertificateSource.REFERENCE.value:
+ def _cleanup_tls_creds_for_host(svc_name: str, host: str, cert_source: Optional[str], other_daemons_in_service: bool) -> None:
+
+ # Daemons can be removed after the service has switched certificate sources. Let's Clean up
+ # any cephadm-signed leftovers for this host regardless of the *current* source:
+ self.mgr.cert_mgr.try_rm_self_signed_cert_key_pair(svc_name, host)
+
+ # Inline-saved cleanup depends on TLS object scope:
+ # - HOST scope: safe to remove for this host when no other daemons remain on this host
+ # - SERVICE/GLOBAL scope: only remove when this is the last daemon of the service
+ if self.SCOPE in (TLSObjectScope.SERVICE, TLSObjectScope.GLOBAL):
+ if other_daemons_in_service:
+ return
+ self.mgr.cert_mgr.rm_inline_saved_cert_key_pair(
+ self.cert_name,
+ self.key_name,
+ service_name=svc_name,
+ host=None,
+ ca_cert_name=self.ca_cert_name,
+ )
+ else: # Host Scope
+ self.mgr.cert_mgr.rm_inline_saved_cert_key_pair(
+ self.cert_name,
+ self.key_name,
+ service_name=svc_name,
+ host=host,
+ ca_cert_name=self.ca_cert_name,
+ )
+
+ if cert_source == CertificateSource.REFERENCE.value:
# It's a reference cert/key to the certmgr so we must keep them as user may want to use them later
- logger.info(f"Keeping referenced certificate/key for service: {svc_name}, host: {host}")
- elif cert_source is None:
- # This could happen when TLS is being disabled in spec by the user, so we lose the value
- # of the certificate source but we still have to do our best to cleanup the TLS credentials.
- logger.info(f"Removing certificate/key for service: {svc_name}, host: {host}")
- self.mgr.cert_mgr.rm_self_signed_cert_key_pair_if_present(svc_name, host)
- # try to remove inline certs (if any)
- if not self.mgr.cert_mgr.is_cert_editable(self.cert_name, svc_name, host):
- self.mgr.cert_mgr.rm_cert_if_present(self.cert_name, svc_name, host)
- self.mgr.cert_mgr.rm_key_if_present(self.key_name, svc_name, host)
- if self.ca_cert_name:
- self.mgr.cert_mgr.rm_cert_if_present(self.ca_cert_name, svc_name, host)
- else:
- logger.error(f"Unknown cert-source {cert_source}. Cannot remove cert/key for: {svc_name}, host: {host}")
+ logger.info(
+ "Certificate source is 'reference'; user-provided certmgr entries (if present) will be kept for service: %s, host: %s",
+ svc_name, host
+ )
assert daemon.daemon_type is not None
assert daemon.hostname
assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
- if self.requires_certificates:
+ if not self.requires_certificates:
+ # no certificates cleanup actions are needed
+ return
- svc_name = daemon.service_name()
- if svc_name not in self.mgr.spec_store:
- return
+ svc_name = daemon.service_name()
+ if svc_name not in self.mgr.spec_store:
+ return
- host = daemon.hostname
- spec = self.mgr.spec_store[svc_name].spec
- if not spec.ssl:
- # Service no longer uses TLS; safe to clean up TLS credentials for this host
- _cleanup_tls_creds_for_host(svc_name, host, spec.certificate_source)
- return
+ host = daemon.hostname
+ spec = self.mgr.spec_store[svc_name].spec
+
+ # If TLS is disabled, we may not have a meaningful source anymore
+ cert_source = spec.certificate_source if spec.ssl else None
+
+ daemons = [
+ d for d in self.mgr.cache.get_daemons_by_service(svc_name)
+ if d.name() != daemon.name()
+ ]
+ remaining_on_host = [d for d in daemons if d.hostname == host]
+ if remaining_on_host:
+ logger.info(
+ "Not removing TLS cert/key for %s on %s: still %d daemons present",
+ svc_name, host, len(remaining_on_host)
+ )
+ return
+
+ _cleanup_tls_creds_for_host(svc_name, host, cert_source, other_daemons_in_service=bool(daemons))
- remaining = [
- d for d in self.mgr.cache.get_daemons_by_service(svc_name)
- if d.hostname == host
- ]
- if remaining:
- # The service still has daemons running on this host (e.g. during an HTTP -> HTTPS
- # transition the daemons running on :80 are removed but new daemons running on :443
- # area created), so the TLS cert/key may still be in use. Do not remove it yet.
- logger.info(
- "Not removing TLS cert/key for %s on %s: still %d daemons present",
- svc_name, host, len(remaining)
- )
- else:
- _cleanup_tls_creds_for_host(svc_name, host, spec.certificate_source)
def purge(self, service_name: str) -> None:
"""Called to carry out any purge tasks following service removal"""
else:
return TLSObjectScope.UNKNOWN, None
+ def get_tlsobject_if_exists(self,
+ obj_name: str,
+ service_name: Optional[str] = None,
+ host: Optional[str] = None) -> Optional[TLSObjectProtocol]:
+ try:
+ return self.get_tlsobject(obj_name, service_name, host)
+ except TLSObjectException:
+ return None
+
def get_tlsobject(self, obj_name: str,
service_name: Optional[str] = None,
host: Optional[str] = None) -> Optional[TLSObjectProtocol]: