]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: adding automated certificates mgmt for cephadmservice
authorRedouane Kachach <rkachach@ibm.com>
Wed, 13 Aug 2025 17:11:44 +0000 (19:11 +0200)
committerRedouane Kachach <rkachach@ibm.com>
Sat, 6 Sep 2025 21:39:41 +0000 (23:39 +0200)
Signed-off-by: Redouane Kachach <rkachach@ibm.com>
src/pybind/mgr/cephadm/services/cephadmservice.py
src/pybind/mgr/cephadm/services/monitoring.py
src/pybind/mgr/cephadm/services/service_registry.py

index 3dbb31eafc467663be0f0f4ab40228e984d1b448..5205bf2e7f1c098413259adc83432d20ae215374 100644 (file)
@@ -20,6 +20,8 @@ from ceph.deployment.service_spec import (
     MONSpec,
     RGWSpec,
     ServiceSpec,
+    CertificateSource,
+    RequiresCertificatesEntry
 )
 from ceph.deployment.utils import is_ipv6, unwrap_ipv6
 from mgr_util import build_url, merge_dicts
@@ -32,6 +34,8 @@ from orchestrator import (
 from orchestrator._interface import daemon_type_to_service
 from cephadm import utils
 from .service_registry import register_cephadm_service
+from cephadm.tlsobject_types import TLSObjectScope, CertKeyPair, EMPTY_TLS_KEYPAIR
+from cephadm.ssl_cert_utils import extract_ips_and_fqdns_from_cert
 
 if TYPE_CHECKING:
     from cephadm.module import CephadmOrchestrator
@@ -270,6 +274,30 @@ class CephadmService(metaclass=ABCMeta):
     def needs_monitoring(self) -> bool:
         return False
 
+    @property
+    def requires_certificates(self) -> bool:
+        return self.TYPE in ServiceSpec.REQUIRES_CERTIFICATES
+
+    @property
+    def allows_user_certificates(self) -> bool:
+        config = ServiceSpec.REQUIRES_CERTIFICATES.get(self.TYPE)
+        return config is not None and bool(config.get("user_cert_allowed", False))
+
+    @property
+    def SCOPE(self) -> TLSObjectScope:
+        entry: Optional[RequiresCertificatesEntry] = ServiceSpec.REQUIRES_CERTIFICATES.get(self.TYPE)
+        if not entry:
+            return TLSObjectScope.UNKNOWN
+        return TLSObjectScope(entry['scope'])
+
+    @property
+    def cert_name(self) -> str:
+        return f"{self.TYPE.replace('-', '_')}_ssl_cert"
+
+    @property
+    def key_name(self) -> str:
+        return f"{self.TYPE.replace('-', '_')}_ssl_key"
+
     @property
     @abstractmethod
     def TYPE(self) -> str:
@@ -284,6 +312,101 @@ class CephadmService(metaclass=ABCMeta):
     def __init__(self, mgr: "CephadmOrchestrator"):
         self.mgr: "CephadmOrchestrator" = mgr
 
+    def get_self_signed_certificates_with_label(self, svc_spec: ServiceSpec, daemon_spec: CephadmDaemonDeploySpec, label: str) -> CertKeyPair:
+        svc_name = svc_spec.service_name()
+        ip = self.mgr.inventory.get_addr(daemon_spec.host)
+        host_fqdn = self.mgr.get_fqdn(daemon_spec.host)
+        tls_pair = self.mgr.cert_mgr.get_self_signed_cert_key_pair(svc_name, host_fqdn, label)
+        if not tls_pair:
+            tls_pair = self.mgr.cert_mgr.generate_cert(host_fqdn, ip)
+            self.mgr.cert_mgr.save_self_signed_cert_key_pair(svc_name, tls_pair, host=daemon_spec.host, label=label)
+        return tls_pair
+
+    def get_certificates(self,
+                         daemon_spec: CephadmDaemonDeploySpec,
+                         ips: List[str] = [],
+                         fqdns: List[str] = [],
+                         custom_sans: List[str] = []
+                         ) -> CertKeyPair:
+
+        svc_spec = cast(ServiceSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+        if not self.requires_certificates or not svc_spec.ssl:
+            return EMPTY_TLS_KEYPAIR
+
+        ips = ips or [self.mgr.inventory.get_addr(daemon_spec.host)]
+        fqdns = fqdns or [self.mgr.get_fqdn(daemon_spec.host)]
+
+        cert_source = svc_spec.certificate_source
+        logger.debug(f'Getting certificate for {svc_spec.service_name()} using source: {cert_source}')
+        if cert_source == CertificateSource.INLINE.value:
+            return self._get_certificates_from_spec(svc_spec, daemon_spec)
+        elif cert_source == CertificateSource.REFERENCE.value:
+            return self._get_certificates_from_certmgr_store(svc_spec, fqdns)
+        elif cert_source == CertificateSource.CEPHADM_SIGNED.value:
+            return self._get_cephadm_signed_certificates(svc_spec, daemon_spec, ips, fqdns, custom_sans)
+        else:
+            logger.error(f'Invalid cert_source: {cert_source}')
+            return EMPTY_TLS_KEYPAIR
+
+    def _get_certificates_from_spec(
+        self,
+        svc_spec: ServiceSpec,
+        daemon_spec: CephadmDaemonDeploySpec
+    ) -> CertKeyPair:
+        """
+        Fetch and persist the TLS certificate and key for a service spec.
+        Returns:
+            A CertKeyPair if both are available; otherwise EMPTY_TLS_KEYPAIR.
+        """
+        cert = getattr(svc_spec, 'ssl_cert', None)
+        key = getattr(svc_spec, 'ssl_key', None)
+        if cert and key:
+            service_name = svc_spec.service_name()
+            host = daemon_spec.host
+            self.mgr.cert_mgr.save_cert(self.cert_name, cert, service_name, host, user_made=True)
+            self.mgr.cert_mgr.save_key(self.key_name, key, service_name, host, user_made=True)
+            return CertKeyPair(cert=cert, key=key)
+
+        logger.error(
+            f"Cannot get cert/key '{self.cert_name}/{self.key_name}' for service '{svc_spec.service_name()}'"
+        )
+        return EMPTY_TLS_KEYPAIR
+
+    def _get_certificates_from_certmgr_store(self, svc_spec: ServiceSpec, fqdns: List[str]) -> CertKeyPair:
+        host = fqdns[0] if fqdns else None
+        cert = self.mgr.cert_mgr.get_cert(self.cert_name, svc_spec.service_name(), host)
+        key = self.mgr.cert_mgr.get_key(self.key_name, svc_spec.service_name(), host)
+        if cert and key:
+            return CertKeyPair(cert=cert, key=key)
+        else:
+            logger.error(f'Failed to get cert/key {self.cert_name} for service {svc_spec.service_name()} host: {host} from the certmgr store.')
+            return EMPTY_TLS_KEYPAIR
+
+    def _get_cephadm_signed_certificates(self,
+                                         svc_spec: ServiceSpec,
+                                         daemon_spec: CephadmDaemonDeploySpec,
+                                         ips: Optional[List[str]] = None,
+                                         fqdns: Optional[List[str]] = None,
+                                         custom_sans: Optional[List[str]] = None,
+                                         ) -> CertKeyPair:
+
+        custom_sans = custom_sans or svc_spec.custom_sans or []
+        ips = ips or [self.mgr.inventory.get_addr(daemon_spec.host)]
+        fqdns = fqdns or [self.mgr.get_fqdn(daemon_spec.host)]
+        tls_pair = self.mgr.cert_mgr.get_self_signed_cert_key_pair(svc_spec.service_name(), daemon_spec.host)
+        if tls_pair:
+            combined_fqdns = sorted(set(s.lower() for s in fqdns + custom_sans))
+            cert_ips, cert_fqdns = extract_ips_and_fqdns_from_cert(tls_pair.cert)
+            if sorted(cert_ips) == sorted(ips) and sorted(cert_fqdns) == sorted(combined_fqdns):
+                # Nothing has changed, use the stored certifiactes
+                return tls_pair
+
+        # Either there were not certs or ips/fqdns have changed generate new cets
+        tls_pair = self.mgr.cert_mgr.generate_cert(fqdns, ips, custom_sans)
+        self.mgr.cert_mgr.save_self_signed_cert_key_pair(svc_spec.service_name(), tls_pair, host=daemon_spec.host)
+
+        return tls_pair
+
     def allow_colo(self) -> bool:
         """
         Return True if multiple daemons of the same type can colocate on
@@ -346,8 +469,17 @@ class CephadmService(metaclass=ABCMeta):
             init_containers=getattr(spec, 'init_containers', None),
         )
 
+    def register_for_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
+        if not self.requires_certificates:
+            return
+        spec = self.mgr.spec_store[daemon_spec.service_name].spec
+        if spec.is_using_certificates_source(CertificateSource.CEPHADM_SIGNED):
+            self.mgr.cert_mgr.register_self_signed_cert_key_pair(spec.service_name())
+
     def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
-        raise NotImplementedError()
+        self.register_for_certificates(daemon_spec)
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+        return daemon_spec
 
     def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
         raise NotImplementedError()
@@ -573,9 +705,33 @@ class CephadmService(metaclass=ABCMeta):
         Called after the daemon is removed.
         """
         assert daemon.daemon_type is not None
+        assert daemon.hostname
         assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
         logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
 
+        if self.requires_certificates:
+
+            svc_name = daemon.service_name()
+            if svc_name not in self.mgr.spec_store:
+                return
+
+            spec = self.mgr.spec_store[svc_name].spec
+            if not spec.ssl:
+                return
+
+            host = daemon.hostname
+            cert_source = spec.certificate_source
+            if cert_source == CertificateSource.CEPHADM_SIGNED.value:
+                logger.info(f"Removing cephadm-signed certificate/key for service: {svc_name}, host: {host}")
+                self.mgr.cert_mgr.rm_self_signed_cert_key_pair(svc_name, host)
+            elif cert_source == CertificateSource.INLINE.value:
+                logger.info(f"Removing inline-saved certificate/key for service: {svc_name}, host: {host}")
+                self.mgr.cert_mgr.rm_cert(self.cert_name, svc_name, host)
+                self.mgr.cert_mgr.rm_key(self.key_name, svc_name, host)
+            else:
+                # It's a reference cert/key to the certmgr so we must keep them as user may want to use them later
+                logger.info(f"Keeping referenced certificate/key for service: {svc_name}, host: {host}")
+
     def purge(self, service_name: str) -> None:
         """Called to carry out any purge tasks following service removal"""
         logger.debug(f'Purge called for {self.TYPE} - no action taken')
@@ -1012,7 +1168,6 @@ class RgwService(CephService):
     def get_dependencies(cls, mgr: "CephadmOrchestrator",
                          spec: Optional[ServiceSpec] = None,
                          daemon_type: Optional[str] = None) -> List[str]:
-
         deps = []
         rgw_spec = cast(RGWSpec, spec)
         ssl_cert = getattr(rgw_spec, 'rgw_frontend_ssl_certificate', None)
@@ -1054,21 +1209,6 @@ class RgwService(CephService):
         # set rgw_realm rgw_zonegroup and rgw_zone, if present
         self.set_realm_zg_zone(spec)
 
-        if spec.rgw_frontend_ssl_certificate:
-            if isinstance(spec.rgw_frontend_ssl_certificate, list):
-                cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
-            elif isinstance(spec.rgw_frontend_ssl_certificate, str):
-                cert_data = spec.rgw_frontend_ssl_certificate
-            else:
-                raise OrchestratorError(
-                    'Invalid rgw_frontend_ssl_certificate: %s'
-                    % spec.rgw_frontend_ssl_certificate)
-            ret, out, err = self.mgr.check_mon_command({
-                'prefix': 'config-key set',
-                'key': f'rgw/cert/{spec.service_name()}',
-                'val': cert_data,
-            })
-
         if spec.zonegroup_hostnames:
             san_list = spec.zonegroup_hostnames or []
             hostnames = san_list + [f"*.{h}" for h in san_list] if spec.wildcard_enabled else san_list
@@ -1091,6 +1231,7 @@ class RgwService(CephService):
 
     def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         assert self.TYPE == daemon_spec.daemon_type
+        self.register_for_certificates(daemon_spec)
         rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
         spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
 
@@ -1108,20 +1249,15 @@ class RgwService(CephService):
             else:
                 port = ports[0]
 
-        if spec.generate_cert:
+        if spec.ssl:
             san_list = spec.zonegroup_hostnames or []
-            custom_san_list = san_list + [f"*.{h}" for h in san_list] if spec.wildcard_enabled else san_list
-
-            cert, key = self.mgr.cert_mgr.generate_cert(
-                daemon_spec.host,
-                self.mgr.inventory.get_addr(daemon_spec.host),
-                custom_san_list=custom_san_list
-            )
-            pem = ''.join([key, cert])
-            self.mgr.cert_mgr.save_cert('rgw_frontend_ssl_cert', pem, service_name=spec.service_name())
+            custom_sans = san_list + [f"*.{h}" for h in san_list] if spec.wildcard_enabled else san_list
+            tls_pair = self.get_certificates(daemon_spec, custom_sans)
+            pem = f'{tls_pair.key.rstrip()}\n{tls_pair.cert.lstrip()}'
+            rgw_cert_name = daemon_spec.name() if spec.generate_cert else spec.service_name()
             ret, out, err = self.mgr.check_mon_command({
                 'prefix': 'config-key set',
-                'key': f'rgw/cert/{daemon_spec.name()}',
+                'key': f'rgw/cert/{rgw_cert_name}',
                 'val': pem,
             })
 
@@ -1438,6 +1574,7 @@ class CephExporterService(CephService):
 
     def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         assert self.TYPE == daemon_spec.daemon_type
+        self.register_for_certificates(daemon_spec)
         spec = cast(CephExporterSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
         keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_spec.daemon_id),
                                              ['mon', 'profile ceph-exporter',
@@ -1469,11 +1606,6 @@ class CephExporterService(CephService):
 
         return daemon_spec
 
-    def get_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[str, str]:
-        node_ip = self.mgr.inventory.get_addr(daemon_spec.host)
-        host_fqdn = self.mgr.get_fqdn(daemon_spec.host)
-        return self.mgr.cert_mgr.generate_cert(host_fqdn, node_ip)
-
 
 @register_cephadm_service
 class CephfsMirrorService(CephService):
@@ -1528,6 +1660,7 @@ class CephadmAgent(CephService):
 
     def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         assert self.TYPE == daemon_spec.daemon_type
+        super().prepare_create(daemon_spec)
         daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
         daemon_spec.ports = [self.mgr.agent_starting_port]
 
@@ -1558,7 +1691,7 @@ class CephadmAgent(CephService):
                'host': daemon_spec.host,
                'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
 
-        listener_cert, listener_key = self.mgr.cert_mgr.generate_cert(daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host))
+        listener_cert, listener_key = self.get_certificates(daemon_spec)
         config = {
             'agent.json': json.dumps(cfg),
             'keyring': daemon_spec.keyring,
index ad6d30fdc636be2cb7a05fcda23979c201c38232..dff6036a95c5b341b1f23db19a7968a5ed1a8561 100644 (file)
@@ -1,7 +1,6 @@
 import errno
 import logging
 import os
-import socket
 from typing import List, Any, Tuple, Dict, Optional, cast, TYPE_CHECKING
 import ipaddress
 import time
@@ -842,11 +841,6 @@ class LokiService(CephadmService):
     TYPE = 'loki'
     DEFAULT_SERVICE_PORT = 3100
 
-    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
-        assert self.TYPE == daemon_spec.daemon_type
-        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
-        return daemon_spec
-
     def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
         assert self.TYPE == daemon_spec.daemon_type
         deps: List[str] = []
@@ -870,11 +864,6 @@ class PromtailService(CephadmService):
                          daemon_type: Optional[str] = None) -> List[str]:
         return sorted(mgr.cache.get_daemons_by_types(['loki']))
 
-    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
-        assert self.TYPE == daemon_spec.daemon_type
-        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
-        return daemon_spec
-
     def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
         assert self.TYPE == daemon_spec.daemon_type
         deps: List[str] = self.get_dependencies(self.mgr)
@@ -901,11 +890,6 @@ class PromtailService(CephadmService):
 class SNMPGatewayService(CephadmService):
     TYPE = 'snmp-gateway'
 
-    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
-        assert self.TYPE == daemon_spec.daemon_type
-        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
-        return daemon_spec
-
     def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
         assert self.TYPE == daemon_spec.daemon_type
         deps: List[str] = []
index 10fb893a3e2b549aab6e1b1f733223037be75bfc..2d344e025083a8ee23430ce8f1a07f6d2fefa8b9 100644 (file)
@@ -72,6 +72,10 @@ class CephadmServiceRegistry:
         services_to_monitor.append('ceph')  # this is needed for mgr-prometheus targets
         return sorted(services_to_monitor)
 
+    def get_all_services(self) -> List["CephadmService"]:
+        """Retrieves an initialized service instance by type."""
+        return list(self._services.values())
+
 
 def register_cephadm_service(cls: Type["CephadmService"]) -> Type["CephadmService"]:
     """