MONSpec,
RGWSpec,
ServiceSpec,
+ CertificateSource,
+ RequiresCertificatesEntry
)
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
from mgr_util import build_url, merge_dicts
from orchestrator._interface import daemon_type_to_service
from cephadm import utils
from .service_registry import register_cephadm_service
+from cephadm.tlsobject_types import TLSObjectScope, CertKeyPair, EMPTY_TLS_KEYPAIR
+from cephadm.ssl_cert_utils import extract_ips_and_fqdns_from_cert
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
def needs_monitoring(self) -> bool:
return False
+ @property
+ def requires_certificates(self) -> bool:
+ return self.TYPE in ServiceSpec.REQUIRES_CERTIFICATES
+
+ @property
+ def allows_user_certificates(self) -> bool:
+ config = ServiceSpec.REQUIRES_CERTIFICATES.get(self.TYPE)
+ return config is not None and bool(config.get("user_cert_allowed", False))
+
+ @property
+ def SCOPE(self) -> TLSObjectScope:
+ entry: Optional[RequiresCertificatesEntry] = ServiceSpec.REQUIRES_CERTIFICATES.get(self.TYPE)
+ if not entry:
+ return TLSObjectScope.UNKNOWN
+ return TLSObjectScope(entry['scope'])
+
+ @property
+ def cert_name(self) -> str:
+ return f"{self.TYPE.replace('-', '_')}_ssl_cert"
+
+ @property
+ def key_name(self) -> str:
+ return f"{self.TYPE.replace('-', '_')}_ssl_key"
+
@property
@abstractmethod
def TYPE(self) -> str:
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr: "CephadmOrchestrator" = mgr
+ def get_self_signed_certificates_with_label(self, svc_spec: ServiceSpec, daemon_spec: CephadmDaemonDeploySpec, label: str) -> CertKeyPair:
+ svc_name = svc_spec.service_name()
+ ip = self.mgr.inventory.get_addr(daemon_spec.host)
+ host_fqdn = self.mgr.get_fqdn(daemon_spec.host)
+ tls_pair = self.mgr.cert_mgr.get_self_signed_cert_key_pair(svc_name, host_fqdn, label)
+ if not tls_pair:
+ tls_pair = self.mgr.cert_mgr.generate_cert(host_fqdn, ip)
+ self.mgr.cert_mgr.save_self_signed_cert_key_pair(svc_name, tls_pair, host=daemon_spec.host, label=label)
+ return tls_pair
+
+ def get_certificates(self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ ips: List[str] = [],
+ fqdns: List[str] = [],
+ custom_sans: List[str] = []
+ ) -> CertKeyPair:
+
+ svc_spec = cast(ServiceSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+ if not self.requires_certificates or not svc_spec.ssl:
+ return EMPTY_TLS_KEYPAIR
+
+ ips = ips or [self.mgr.inventory.get_addr(daemon_spec.host)]
+ fqdns = fqdns or [self.mgr.get_fqdn(daemon_spec.host)]
+
+ cert_source = svc_spec.certificate_source
+ logger.debug(f'Getting certificate for {svc_spec.service_name()} using source: {cert_source}')
+ if cert_source == CertificateSource.INLINE.value:
+ return self._get_certificates_from_spec(svc_spec, daemon_spec)
+ elif cert_source == CertificateSource.REFERENCE.value:
+ return self._get_certificates_from_certmgr_store(svc_spec, fqdns)
+ elif cert_source == CertificateSource.CEPHADM_SIGNED.value:
+ return self._get_cephadm_signed_certificates(svc_spec, daemon_spec, ips, fqdns, custom_sans)
+ else:
+ logger.error(f'Invalid cert_source: {cert_source}')
+ return EMPTY_TLS_KEYPAIR
+
+ def _get_certificates_from_spec(
+ self,
+ svc_spec: ServiceSpec,
+ daemon_spec: CephadmDaemonDeploySpec
+ ) -> CertKeyPair:
+ """
+ Fetch and persist the TLS certificate and key for a service spec.
+ Returns:
+ A CertKeyPair if both are available; otherwise EMPTY_TLS_KEYPAIR.
+ """
+ cert = getattr(svc_spec, 'ssl_cert', None)
+ key = getattr(svc_spec, 'ssl_key', None)
+ if cert and key:
+ service_name = svc_spec.service_name()
+ host = daemon_spec.host
+ self.mgr.cert_mgr.save_cert(self.cert_name, cert, service_name, host, user_made=True)
+ self.mgr.cert_mgr.save_key(self.key_name, key, service_name, host, user_made=True)
+ return CertKeyPair(cert=cert, key=key)
+
+ logger.error(
+ f"Cannot get cert/key '{self.cert_name}/{self.key_name}' for service '{svc_spec.service_name()}'"
+ )
+ return EMPTY_TLS_KEYPAIR
+
+ def _get_certificates_from_certmgr_store(self, svc_spec: ServiceSpec, fqdns: List[str]) -> CertKeyPair:
+ host = fqdns[0] if fqdns else None
+ cert = self.mgr.cert_mgr.get_cert(self.cert_name, svc_spec.service_name(), host)
+ key = self.mgr.cert_mgr.get_key(self.key_name, svc_spec.service_name(), host)
+ if cert and key:
+ return CertKeyPair(cert=cert, key=key)
+ else:
+ logger.error(f'Failed to get cert/key {self.cert_name} for service {svc_spec.service_name()} host: {host} from the certmgr store.')
+ return EMPTY_TLS_KEYPAIR
+
+ def _get_cephadm_signed_certificates(self,
+ svc_spec: ServiceSpec,
+ daemon_spec: CephadmDaemonDeploySpec,
+ ips: Optional[List[str]] = None,
+ fqdns: Optional[List[str]] = None,
+ custom_sans: Optional[List[str]] = None,
+ ) -> CertKeyPair:
+
+ custom_sans = custom_sans or svc_spec.custom_sans or []
+ ips = ips or [self.mgr.inventory.get_addr(daemon_spec.host)]
+ fqdns = fqdns or [self.mgr.get_fqdn(daemon_spec.host)]
+ tls_pair = self.mgr.cert_mgr.get_self_signed_cert_key_pair(svc_spec.service_name(), daemon_spec.host)
+ if tls_pair:
+ combined_fqdns = sorted(set(s.lower() for s in fqdns + custom_sans))
+ cert_ips, cert_fqdns = extract_ips_and_fqdns_from_cert(tls_pair.cert)
+ if sorted(cert_ips) == sorted(ips) and sorted(cert_fqdns) == sorted(combined_fqdns):
+ # Nothing has changed, use the stored certifiactes
+ return tls_pair
+
+ # Either there were not certs or ips/fqdns have changed generate new cets
+ tls_pair = self.mgr.cert_mgr.generate_cert(fqdns, ips, custom_sans)
+ self.mgr.cert_mgr.save_self_signed_cert_key_pair(svc_spec.service_name(), tls_pair, host=daemon_spec.host)
+
+ return tls_pair
+
def allow_colo(self) -> bool:
"""
Return True if multiple daemons of the same type can colocate on
init_containers=getattr(spec, 'init_containers', None),
)
+ def register_for_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
+ if not self.requires_certificates:
+ return
+ spec = self.mgr.spec_store[daemon_spec.service_name].spec
+ if spec.is_using_certificates_source(CertificateSource.CEPHADM_SIGNED):
+ self.mgr.cert_mgr.register_self_signed_cert_key_pair(spec.service_name())
+
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
- raise NotImplementedError()
+ self.register_for_certificates(daemon_spec)
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ return daemon_spec
def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
raise NotImplementedError()
Called after the daemon is removed.
"""
assert daemon.daemon_type is not None
+ assert daemon.hostname
assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
+ if self.requires_certificates:
+
+ svc_name = daemon.service_name()
+ if svc_name not in self.mgr.spec_store:
+ return
+
+ spec = self.mgr.spec_store[svc_name].spec
+ if not spec.ssl:
+ return
+
+ host = daemon.hostname
+ cert_source = spec.certificate_source
+ if cert_source == CertificateSource.CEPHADM_SIGNED.value:
+ logger.info(f"Removing cephadm-signed certificate/key for service: {svc_name}, host: {host}")
+ self.mgr.cert_mgr.rm_self_signed_cert_key_pair(svc_name, host)
+ elif cert_source == CertificateSource.INLINE.value:
+ logger.info(f"Removing inline-saved certificate/key for service: {svc_name}, host: {host}")
+ self.mgr.cert_mgr.rm_cert(self.cert_name, svc_name, host)
+ self.mgr.cert_mgr.rm_key(self.key_name, svc_name, host)
+ else:
+ # It's a reference cert/key to the certmgr so we must keep them as user may want to use them later
+ logger.info(f"Keeping referenced certificate/key for service: {svc_name}, host: {host}")
+
def purge(self, service_name: str) -> None:
"""Called to carry out any purge tasks following service removal"""
logger.debug(f'Purge called for {self.TYPE} - no action taken')
def get_dependencies(cls, mgr: "CephadmOrchestrator",
spec: Optional[ServiceSpec] = None,
daemon_type: Optional[str] = None) -> List[str]:
-
deps = []
rgw_spec = cast(RGWSpec, spec)
ssl_cert = getattr(rgw_spec, 'rgw_frontend_ssl_certificate', None)
# set rgw_realm rgw_zonegroup and rgw_zone, if present
self.set_realm_zg_zone(spec)
- if spec.rgw_frontend_ssl_certificate:
- if isinstance(spec.rgw_frontend_ssl_certificate, list):
- cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
- elif isinstance(spec.rgw_frontend_ssl_certificate, str):
- cert_data = spec.rgw_frontend_ssl_certificate
- else:
- raise OrchestratorError(
- 'Invalid rgw_frontend_ssl_certificate: %s'
- % spec.rgw_frontend_ssl_certificate)
- ret, out, err = self.mgr.check_mon_command({
- 'prefix': 'config-key set',
- 'key': f'rgw/cert/{spec.service_name()}',
- 'val': cert_data,
- })
-
if spec.zonegroup_hostnames:
san_list = spec.zonegroup_hostnames or []
hostnames = san_list + [f"*.{h}" for h in san_list] if spec.wildcard_enabled else san_list
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
assert self.TYPE == daemon_spec.daemon_type
+ self.register_for_certificates(daemon_spec)
rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
else:
port = ports[0]
- if spec.generate_cert:
+ if spec.ssl:
san_list = spec.zonegroup_hostnames or []
- custom_san_list = san_list + [f"*.{h}" for h in san_list] if spec.wildcard_enabled else san_list
-
- cert, key = self.mgr.cert_mgr.generate_cert(
- daemon_spec.host,
- self.mgr.inventory.get_addr(daemon_spec.host),
- custom_san_list=custom_san_list
- )
- pem = ''.join([key, cert])
- self.mgr.cert_mgr.save_cert('rgw_frontend_ssl_cert', pem, service_name=spec.service_name())
+ custom_sans = san_list + [f"*.{h}" for h in san_list] if spec.wildcard_enabled else san_list
+ tls_pair = self.get_certificates(daemon_spec, custom_sans)
+ pem = f'{tls_pair.key.rstrip()}\n{tls_pair.cert.lstrip()}'
+ rgw_cert_name = daemon_spec.name() if spec.generate_cert else spec.service_name()
ret, out, err = self.mgr.check_mon_command({
'prefix': 'config-key set',
- 'key': f'rgw/cert/{daemon_spec.name()}',
+ 'key': f'rgw/cert/{rgw_cert_name}',
'val': pem,
})
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
assert self.TYPE == daemon_spec.daemon_type
+ self.register_for_certificates(daemon_spec)
spec = cast(CephExporterSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_spec.daemon_id),
['mon', 'profile ceph-exporter',
return daemon_spec
- def get_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[str, str]:
- node_ip = self.mgr.inventory.get_addr(daemon_spec.host)
- host_fqdn = self.mgr.get_fqdn(daemon_spec.host)
- return self.mgr.cert_mgr.generate_cert(host_fqdn, node_ip)
-
@register_cephadm_service
class CephfsMirrorService(CephService):
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
assert self.TYPE == daemon_spec.daemon_type
+ super().prepare_create(daemon_spec)
daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
daemon_spec.ports = [self.mgr.agent_starting_port]
'host': daemon_spec.host,
'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
- listener_cert, listener_key = self.mgr.cert_mgr.generate_cert(daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host))
+ listener_cert, listener_key = self.get_certificates(daemon_spec)
config = {
'agent.json': json.dumps(cfg),
'keyring': daemon_spec.keyring,