From 284ad001449e8fcd181f10453d6d0ba40591aa42 Mon Sep 17 00:00:00 2001 From: Redouane Kachach Date: Wed, 13 Aug 2025 19:11:44 +0200 Subject: [PATCH] mgr/cephadm: adding automated certificates mgmt for cephadmservice Signed-off-by: Redouane Kachach --- .../mgr/cephadm/services/cephadmservice.py | 201 +++++++++++++++--- src/pybind/mgr/cephadm/services/monitoring.py | 16 -- .../mgr/cephadm/services/service_registry.py | 4 + 3 files changed, 171 insertions(+), 50 deletions(-) diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py index 3dbb31eafc4..5205bf2e7f1 100644 --- a/src/pybind/mgr/cephadm/services/cephadmservice.py +++ b/src/pybind/mgr/cephadm/services/cephadmservice.py @@ -20,6 +20,8 @@ from ceph.deployment.service_spec import ( MONSpec, RGWSpec, ServiceSpec, + CertificateSource, + RequiresCertificatesEntry ) from ceph.deployment.utils import is_ipv6, unwrap_ipv6 from mgr_util import build_url, merge_dicts @@ -32,6 +34,8 @@ from orchestrator import ( from orchestrator._interface import daemon_type_to_service from cephadm import utils from .service_registry import register_cephadm_service +from cephadm.tlsobject_types import TLSObjectScope, CertKeyPair, EMPTY_TLS_KEYPAIR +from cephadm.ssl_cert_utils import extract_ips_and_fqdns_from_cert if TYPE_CHECKING: from cephadm.module import CephadmOrchestrator @@ -270,6 +274,30 @@ class CephadmService(metaclass=ABCMeta): def needs_monitoring(self) -> bool: return False + @property + def requires_certificates(self) -> bool: + return self.TYPE in ServiceSpec.REQUIRES_CERTIFICATES + + @property + def allows_user_certificates(self) -> bool: + config = ServiceSpec.REQUIRES_CERTIFICATES.get(self.TYPE) + return config is not None and bool(config.get("user_cert_allowed", False)) + + @property + def SCOPE(self) -> TLSObjectScope: + entry: Optional[RequiresCertificatesEntry] = ServiceSpec.REQUIRES_CERTIFICATES.get(self.TYPE) + if not entry: + return TLSObjectScope.UNKNOWN + return TLSObjectScope(entry['scope']) + + @property + def cert_name(self) -> str: + return f"{self.TYPE.replace('-', '_')}_ssl_cert" + + @property + def key_name(self) -> str: + return f"{self.TYPE.replace('-', '_')}_ssl_key" + @property @abstractmethod def TYPE(self) -> str: @@ -284,6 +312,101 @@ class CephadmService(metaclass=ABCMeta): def __init__(self, mgr: "CephadmOrchestrator"): self.mgr: "CephadmOrchestrator" = mgr + def get_self_signed_certificates_with_label(self, svc_spec: ServiceSpec, daemon_spec: CephadmDaemonDeploySpec, label: str) -> CertKeyPair: + svc_name = svc_spec.service_name() + ip = self.mgr.inventory.get_addr(daemon_spec.host) + host_fqdn = self.mgr.get_fqdn(daemon_spec.host) + tls_pair = self.mgr.cert_mgr.get_self_signed_cert_key_pair(svc_name, host_fqdn, label) + if not tls_pair: + tls_pair = self.mgr.cert_mgr.generate_cert(host_fqdn, ip) + self.mgr.cert_mgr.save_self_signed_cert_key_pair(svc_name, tls_pair, host=daemon_spec.host, label=label) + return tls_pair + + def get_certificates(self, + daemon_spec: CephadmDaemonDeploySpec, + ips: List[str] = [], + fqdns: List[str] = [], + custom_sans: List[str] = [] + ) -> CertKeyPair: + + svc_spec = cast(ServiceSpec, self.mgr.spec_store[daemon_spec.service_name].spec) + if not self.requires_certificates or not svc_spec.ssl: + return EMPTY_TLS_KEYPAIR + + ips = ips or [self.mgr.inventory.get_addr(daemon_spec.host)] + fqdns = fqdns or [self.mgr.get_fqdn(daemon_spec.host)] + + cert_source = svc_spec.certificate_source + logger.debug(f'Getting certificate for {svc_spec.service_name()} using source: {cert_source}') + if cert_source == CertificateSource.INLINE.value: + return self._get_certificates_from_spec(svc_spec, daemon_spec) + elif cert_source == CertificateSource.REFERENCE.value: + return self._get_certificates_from_certmgr_store(svc_spec, fqdns) + elif cert_source == CertificateSource.CEPHADM_SIGNED.value: + return self._get_cephadm_signed_certificates(svc_spec, daemon_spec, ips, fqdns, custom_sans) + else: + logger.error(f'Invalid cert_source: {cert_source}') + return EMPTY_TLS_KEYPAIR + + def _get_certificates_from_spec( + self, + svc_spec: ServiceSpec, + daemon_spec: CephadmDaemonDeploySpec + ) -> CertKeyPair: + """ + Fetch and persist the TLS certificate and key for a service spec. + Returns: + A CertKeyPair if both are available; otherwise EMPTY_TLS_KEYPAIR. + """ + cert = getattr(svc_spec, 'ssl_cert', None) + key = getattr(svc_spec, 'ssl_key', None) + if cert and key: + service_name = svc_spec.service_name() + host = daemon_spec.host + self.mgr.cert_mgr.save_cert(self.cert_name, cert, service_name, host, user_made=True) + self.mgr.cert_mgr.save_key(self.key_name, key, service_name, host, user_made=True) + return CertKeyPair(cert=cert, key=key) + + logger.error( + f"Cannot get cert/key '{self.cert_name}/{self.key_name}' for service '{svc_spec.service_name()}'" + ) + return EMPTY_TLS_KEYPAIR + + def _get_certificates_from_certmgr_store(self, svc_spec: ServiceSpec, fqdns: List[str]) -> CertKeyPair: + host = fqdns[0] if fqdns else None + cert = self.mgr.cert_mgr.get_cert(self.cert_name, svc_spec.service_name(), host) + key = self.mgr.cert_mgr.get_key(self.key_name, svc_spec.service_name(), host) + if cert and key: + return CertKeyPair(cert=cert, key=key) + else: + logger.error(f'Failed to get cert/key {self.cert_name} for service {svc_spec.service_name()} host: {host} from the certmgr store.') + return EMPTY_TLS_KEYPAIR + + def _get_cephadm_signed_certificates(self, + svc_spec: ServiceSpec, + daemon_spec: CephadmDaemonDeploySpec, + ips: Optional[List[str]] = None, + fqdns: Optional[List[str]] = None, + custom_sans: Optional[List[str]] = None, + ) -> CertKeyPair: + + custom_sans = custom_sans or svc_spec.custom_sans or [] + ips = ips or [self.mgr.inventory.get_addr(daemon_spec.host)] + fqdns = fqdns or [self.mgr.get_fqdn(daemon_spec.host)] + tls_pair = self.mgr.cert_mgr.get_self_signed_cert_key_pair(svc_spec.service_name(), daemon_spec.host) + if tls_pair: + combined_fqdns = sorted(set(s.lower() for s in fqdns + custom_sans)) + cert_ips, cert_fqdns = extract_ips_and_fqdns_from_cert(tls_pair.cert) + if sorted(cert_ips) == sorted(ips) and sorted(cert_fqdns) == sorted(combined_fqdns): + # Nothing has changed, use the stored certifiactes + return tls_pair + + # Either there were not certs or ips/fqdns have changed generate new cets + tls_pair = self.mgr.cert_mgr.generate_cert(fqdns, ips, custom_sans) + self.mgr.cert_mgr.save_self_signed_cert_key_pair(svc_spec.service_name(), tls_pair, host=daemon_spec.host) + + return tls_pair + def allow_colo(self) -> bool: """ Return True if multiple daemons of the same type can colocate on @@ -346,8 +469,17 @@ class CephadmService(metaclass=ABCMeta): init_containers=getattr(spec, 'init_containers', None), ) + def register_for_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> None: + if not self.requires_certificates: + return + spec = self.mgr.spec_store[daemon_spec.service_name].spec + if spec.is_using_certificates_source(CertificateSource.CEPHADM_SIGNED): + self.mgr.cert_mgr.register_self_signed_cert_key_pair(spec.service_name()) + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: - raise NotImplementedError() + self.register_for_certificates(daemon_spec) + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + return daemon_spec def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]: raise NotImplementedError() @@ -573,9 +705,33 @@ class CephadmService(metaclass=ABCMeta): Called after the daemon is removed. """ assert daemon.daemon_type is not None + assert daemon.hostname assert self.TYPE == daemon_type_to_service(daemon.daemon_type) logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}') + if self.requires_certificates: + + svc_name = daemon.service_name() + if svc_name not in self.mgr.spec_store: + return + + spec = self.mgr.spec_store[svc_name].spec + if not spec.ssl: + return + + host = daemon.hostname + cert_source = spec.certificate_source + if cert_source == CertificateSource.CEPHADM_SIGNED.value: + logger.info(f"Removing cephadm-signed certificate/key for service: {svc_name}, host: {host}") + self.mgr.cert_mgr.rm_self_signed_cert_key_pair(svc_name, host) + elif cert_source == CertificateSource.INLINE.value: + logger.info(f"Removing inline-saved certificate/key for service: {svc_name}, host: {host}") + self.mgr.cert_mgr.rm_cert(self.cert_name, svc_name, host) + self.mgr.cert_mgr.rm_key(self.key_name, svc_name, host) + else: + # It's a reference cert/key to the certmgr so we must keep them as user may want to use them later + logger.info(f"Keeping referenced certificate/key for service: {svc_name}, host: {host}") + def purge(self, service_name: str) -> None: """Called to carry out any purge tasks following service removal""" logger.debug(f'Purge called for {self.TYPE} - no action taken') @@ -1012,7 +1168,6 @@ class RgwService(CephService): def get_dependencies(cls, mgr: "CephadmOrchestrator", spec: Optional[ServiceSpec] = None, daemon_type: Optional[str] = None) -> List[str]: - deps = [] rgw_spec = cast(RGWSpec, spec) ssl_cert = getattr(rgw_spec, 'rgw_frontend_ssl_certificate', None) @@ -1054,21 +1209,6 @@ class RgwService(CephService): # set rgw_realm rgw_zonegroup and rgw_zone, if present self.set_realm_zg_zone(spec) - if spec.rgw_frontend_ssl_certificate: - if isinstance(spec.rgw_frontend_ssl_certificate, list): - cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate) - elif isinstance(spec.rgw_frontend_ssl_certificate, str): - cert_data = spec.rgw_frontend_ssl_certificate - else: - raise OrchestratorError( - 'Invalid rgw_frontend_ssl_certificate: %s' - % spec.rgw_frontend_ssl_certificate) - ret, out, err = self.mgr.check_mon_command({ - 'prefix': 'config-key set', - 'key': f'rgw/cert/{spec.service_name()}', - 'val': cert_data, - }) - if spec.zonegroup_hostnames: san_list = spec.zonegroup_hostnames or [] hostnames = san_list + [f"*.{h}" for h in san_list] if spec.wildcard_enabled else san_list @@ -1091,6 +1231,7 @@ class RgwService(CephService): def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: assert self.TYPE == daemon_spec.daemon_type + self.register_for_certificates(daemon_spec) rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec) @@ -1108,20 +1249,15 @@ class RgwService(CephService): else: port = ports[0] - if spec.generate_cert: + if spec.ssl: san_list = spec.zonegroup_hostnames or [] - custom_san_list = san_list + [f"*.{h}" for h in san_list] if spec.wildcard_enabled else san_list - - cert, key = self.mgr.cert_mgr.generate_cert( - daemon_spec.host, - self.mgr.inventory.get_addr(daemon_spec.host), - custom_san_list=custom_san_list - ) - pem = ''.join([key, cert]) - self.mgr.cert_mgr.save_cert('rgw_frontend_ssl_cert', pem, service_name=spec.service_name()) + custom_sans = san_list + [f"*.{h}" for h in san_list] if spec.wildcard_enabled else san_list + tls_pair = self.get_certificates(daemon_spec, custom_sans) + pem = f'{tls_pair.key.rstrip()}\n{tls_pair.cert.lstrip()}' + rgw_cert_name = daemon_spec.name() if spec.generate_cert else spec.service_name() ret, out, err = self.mgr.check_mon_command({ 'prefix': 'config-key set', - 'key': f'rgw/cert/{daemon_spec.name()}', + 'key': f'rgw/cert/{rgw_cert_name}', 'val': pem, }) @@ -1438,6 +1574,7 @@ class CephExporterService(CephService): def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: assert self.TYPE == daemon_spec.daemon_type + self.register_for_certificates(daemon_spec) spec = cast(CephExporterSpec, self.mgr.spec_store[daemon_spec.service_name].spec) keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_spec.daemon_id), ['mon', 'profile ceph-exporter', @@ -1469,11 +1606,6 @@ class CephExporterService(CephService): return daemon_spec - def get_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[str, str]: - node_ip = self.mgr.inventory.get_addr(daemon_spec.host) - host_fqdn = self.mgr.get_fqdn(daemon_spec.host) - return self.mgr.cert_mgr.generate_cert(host_fqdn, node_ip) - @register_cephadm_service class CephfsMirrorService(CephService): @@ -1528,6 +1660,7 @@ class CephadmAgent(CephService): def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: assert self.TYPE == daemon_spec.daemon_type + super().prepare_create(daemon_spec) daemon_id, host = daemon_spec.daemon_id, daemon_spec.host daemon_spec.ports = [self.mgr.agent_starting_port] @@ -1558,7 +1691,7 @@ class CephadmAgent(CephService): 'host': daemon_spec.host, 'device_enhanced_scan': str(self.mgr.device_enhanced_scan)} - listener_cert, listener_key = self.mgr.cert_mgr.generate_cert(daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host)) + listener_cert, listener_key = self.get_certificates(daemon_spec) config = { 'agent.json': json.dumps(cfg), 'keyring': daemon_spec.keyring, diff --git a/src/pybind/mgr/cephadm/services/monitoring.py b/src/pybind/mgr/cephadm/services/monitoring.py index ad6d30fdc63..dff6036a95c 100644 --- a/src/pybind/mgr/cephadm/services/monitoring.py +++ b/src/pybind/mgr/cephadm/services/monitoring.py @@ -1,7 +1,6 @@ import errno import logging import os -import socket from typing import List, Any, Tuple, Dict, Optional, cast, TYPE_CHECKING import ipaddress import time @@ -842,11 +841,6 @@ class LokiService(CephadmService): TYPE = 'loki' DEFAULT_SERVICE_PORT = 3100 - def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: - assert self.TYPE == daemon_spec.daemon_type - daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) - return daemon_spec - def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]: assert self.TYPE == daemon_spec.daemon_type deps: List[str] = [] @@ -870,11 +864,6 @@ class PromtailService(CephadmService): daemon_type: Optional[str] = None) -> List[str]: return sorted(mgr.cache.get_daemons_by_types(['loki'])) - def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: - assert self.TYPE == daemon_spec.daemon_type - daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) - return daemon_spec - def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]: assert self.TYPE == daemon_spec.daemon_type deps: List[str] = self.get_dependencies(self.mgr) @@ -901,11 +890,6 @@ class PromtailService(CephadmService): class SNMPGatewayService(CephadmService): TYPE = 'snmp-gateway' - def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: - assert self.TYPE == daemon_spec.daemon_type - daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) - return daemon_spec - def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]: assert self.TYPE == daemon_spec.daemon_type deps: List[str] = [] diff --git a/src/pybind/mgr/cephadm/services/service_registry.py b/src/pybind/mgr/cephadm/services/service_registry.py index 10fb893a3e2..2d344e02508 100644 --- a/src/pybind/mgr/cephadm/services/service_registry.py +++ b/src/pybind/mgr/cephadm/services/service_registry.py @@ -72,6 +72,10 @@ class CephadmServiceRegistry: services_to_monitor.append('ceph') # this is needed for mgr-prometheus targets return sorted(services_to_monitor) + def get_all_services(self) -> List["CephadmService"]: + """Retrieves an initialized service instance by type.""" + return list(self._services.values()) + def register_cephadm_service(cls: Type["CephadmService"]) -> Type["CephadmService"]: """ -- 2.39.5