import tempfile
from cephadm.services.service_registry import service_registry
from cephadm.services.cephadmservice import CephadmAgent
-from cephadm.tlsobject_types import CertKeyPair
+from cephadm.tlsobject_types import TLSCredentials
from urllib.error import HTTPError, URLError
from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional, MutableMapping, IO
verify_tls_files(self.cert_file.name, self.key_file.name)
server.ssl_certificate, server.ssl_private_key = self.cert_file.name, self.key_file.name
- def _get_agent_certificates(self) -> CertKeyPair:
+ def _get_agent_certificates(self) -> TLSCredentials:
host = self.mgr.get_hostname()
- tls_pair = self.mgr.cert_mgr.get_self_signed_cert_key_pair(CephadmAgent.TYPE, host)
- if not tls_pair:
- tls_pair = self.mgr.cert_mgr.generate_cert(host, self.mgr.get_mgr_ip(), duration_in_days=CEPHADM_AGENT_CERT_DURATION)
- self.mgr.cert_mgr.save_self_signed_cert_key_pair(CephadmAgent.TYPE, tls_pair, host=host)
- return tls_pair
+ tls_creds = self.mgr.cert_mgr.get_self_signed_tls_credentials(CephadmAgent.TYPE, host)
+ if not tls_creds:
+ tls_creds = self.mgr.cert_mgr.generate_cert(host, self.mgr.get_mgr_ip(), duration_in_days=CEPHADM_AGENT_CERT_DURATION)
+ self.mgr.cert_mgr.save_self_signed_cert_key_pair(CephadmAgent.TYPE, tls_creds, host=host)
+ return tls_creds
def find_free_port(self) -> None:
max_port = self.server_port + 150
from cephadm.ssl_cert_utils import SSLCerts, SSLConfigException
from mgr_util import verify_tls, certificate_days_to_expire, ServerConfigException
from cephadm.ssl_cert_utils import get_certificate_info, get_private_key_info
-from cephadm.tlsobject_types import Cert, PrivKey, TLSObjectScope, TLSObjectException, CertKeyPair
+from cephadm.tlsobject_types import Cert, PrivKey, TLSObjectScope, TLSObjectException, TLSCredentials
from cephadm.tlsobject_store import TLSObjectStore
if TYPE_CHECKING:
self.cert_store.register_object_name(self.self_signed_cert(service_name, label), TLSObjectScope.HOST)
self.key_store.register_object_name(self.self_signed_key(service_name, label), TLSObjectScope.HOST)
- def register_cert_key_pair(self, consumer: str, cert_name: str, key_name: str, scope: TLSObjectScope) -> None:
+ def register_cert_key_pair(
+ self,
+ consumer: str,
+ cert_name: str,
+ key_name: str,
+ scope: TLSObjectScope,
+ ca_cert_name: Optional[str] = None
+ ) -> None:
"""
Registers a certificate/key for a given consumer under a specific scope.
"""
self.register_cert(consumer, cert_name, scope)
self.register_key(consumer, key_name, scope)
+ if ca_cert_name:
+ self.register_cert(consumer, ca_cert_name, scope)
def register_cert(self, consumer: str, cert_name: str, scope: TLSObjectScope) -> None:
self._register_tls_object(consumer, cert_name, scope, "certs")
node_ip: Union[str, List[str]],
custom_san_list: Optional[List[str]] = None,
duration_in_days: Optional[int] = None,
- ) -> CertKeyPair:
+ ) -> TLSCredentials:
cert, key = self.ssl_certs.generate_cert(host_fqdn, node_ip, custom_san_list=custom_san_list, duration_in_days=duration_in_days)
- return CertKeyPair(cert=cert, key=key)
+ ca_cert = self.mgr.cert_mgr.get_root_ca()
+ return TLSCredentials(cert=cert, key=key, ca_cert=ca_cert)
def cert_exists(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
cert_obj = self.cert_store.get_tlsobject(cert_name, service_name, host)
key_obj = cast(PrivKey, self.key_store.get_tlsobject(key_name, service_name, host))
return key_obj.key if key_obj else None
- def get_self_signed_cert_key_pair(self, service_name: str, hostname: str, label: Optional[str] = None) -> CertKeyPair:
+ def get_self_signed_tls_credentials(self, service_name: str, hostname: str, label: Optional[str] = None) -> TLSCredentials:
cert_obj = cast(Cert, self.cert_store.get_tlsobject(self.self_signed_cert(service_name, label), host=hostname))
key_obj = cast(PrivKey, self.key_store.get_tlsobject(self.self_signed_key(service_name, label), host=hostname))
cert = cert_obj.cert if cert_obj else ''
key = key_obj.key if key_obj else ''
- return CertKeyPair(cert=cert, key=key)
+ ca_cert = self.mgr.cert_mgr.get_root_ca()
+ return TLSCredentials(cert=cert, key=key, ca_cert=ca_cert)
def save_cert(self, cert_name: str, cert: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False, editable: bool = False) -> None:
self.cert_store.save_tlsobject(cert_name, cert, service_name, host, user_made, editable)
def save_key(self, key_name: str, key: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False, editable: bool = False) -> None:
self.key_store.save_tlsobject(key_name, key, service_name, host, user_made, editable)
- def save_self_signed_cert_key_pair(self, service_name: str, tls_pair: CertKeyPair, host: str, label: Optional[str] = None) -> None:
+ def save_self_signed_cert_key_pair(self, service_name: str, tls_creds: TLSCredentials, host: str, label: Optional[str] = None) -> None:
ss_cert_name = self.self_signed_cert(service_name, label)
ss_key_name = self.self_signed_key(service_name, label)
- self.cert_store.save_tlsobject(ss_cert_name, tls_pair.cert, host=host, user_made=False)
- self.key_store.save_tlsobject(ss_key_name, tls_pair.key, host=host, user_made=False)
+ self.cert_store.save_tlsobject(ss_cert_name, tls_creds.cert, host=host, user_made=False)
+ self.key_store.save_tlsobject(ss_key_name, tls_creds.key, host=host, user_made=False)
def rm_cert(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
return self.cert_store.rm_tlsobject(cert_name, service_name, host)
if svc.allows_user_certificates:
if svc.SCOPE == TLSObjectScope.UNKNOWN:
OrchestratorError(f"Service {svc.TYPE} requieres certificates but it has not defined its svc.SCOPE field.")
- self.cert_mgr.register_cert_key_pair(svc.TYPE, svc.cert_name, svc.key_name, svc.SCOPE)
+ self.cert_mgr.register_cert_key_pair(svc.TYPE, svc.cert_name, svc.key_name, svc.SCOPE, svc.ca_cert_name)
self.cert_mgr.register_cert_key_pair('nvmeof', 'nvmeof_client_cert', 'nvmeof_client_key', TLSObjectScope.SERVICE)
self.cert_mgr.register_cert('nvmeof', 'nvmeof_root_ca_cert', TLSObjectScope.SERVICE)
if module_name == 'dashboard':
host_fqdns.append('dashboard_servers')
- cert, key = self.cert_mgr.generate_cert(host_fqdns, self.get_mgr_ip())
- return {'cert': cert, 'key': key}
+ tls_creds = self.cert_mgr.generate_cert(host_fqdns, self.get_mgr_ip())
+ return {'cert': tls_creds.cert, 'key': tls_creds.key}
@handle_orch_error
def set_prometheus_access_info(self, user: str, password: str) -> str:
from orchestrator._interface import daemon_type_to_service
from cephadm import utils
from .service_registry import register_cephadm_service
-from cephadm.tlsobject_types import TLSObjectScope, CertKeyPair, EMPTY_TLS_KEYPAIR
+from cephadm.tlsobject_types import TLSObjectScope, TLSCredentials, EMPTY_TLS_CREDENTIALS
from cephadm.ssl_cert_utils import extract_ips_and_fqdns_from_cert
if TYPE_CHECKING:
return TLSObjectScope.UNKNOWN
return TLSObjectScope(entry['scope'])
+ @property
+ def requires_ca_cert(self) -> bool:
+ config = ServiceSpec.REQUIRES_CERTIFICATES.get(self.TYPE)
+ return config is not None and bool(config.get("requires_ca_cert", False))
+
@property
def cert_name(self) -> str:
return f"{self.TYPE.replace('-', '_')}_ssl_cert"
def key_name(self) -> str:
return f"{self.TYPE.replace('-', '_')}_ssl_key"
+ @property
+ def ca_cert_name(self) -> Optional[str]:
+ if self.requires_ca_cert:
+ return f"{self.TYPE.replace('-', '_')}_ssl_ca_cert"
+ return None
+
@property
@abstractmethod
def TYPE(self) -> str:
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr: "CephadmOrchestrator" = mgr
- def get_self_signed_certificates_with_label(self, svc_spec: ServiceSpec, daemon_spec: CephadmDaemonDeploySpec, label: str) -> CertKeyPair:
+ def get_self_signed_certificates_with_label(self, svc_spec: ServiceSpec, daemon_spec: CephadmDaemonDeploySpec, label: str) -> TLSCredentials:
svc_name = svc_spec.service_name()
ip = self.mgr.inventory.get_addr(daemon_spec.host)
host_fqdn = self.mgr.get_fqdn(daemon_spec.host)
- tls_pair = self.mgr.cert_mgr.get_self_signed_cert_key_pair(svc_name, host_fqdn, label)
- if not tls_pair:
- tls_pair = self.mgr.cert_mgr.generate_cert(host_fqdn, ip)
- self.mgr.cert_mgr.save_self_signed_cert_key_pair(svc_name, tls_pair, host=daemon_spec.host, label=label)
- return tls_pair
+ tls_creds = self.mgr.cert_mgr.get_self_signed_tls_credentials(svc_name, host_fqdn, label)
+ if not tls_creds:
+ tls_creds = self.mgr.cert_mgr.generate_cert(host_fqdn, ip)
+ self.mgr.cert_mgr.save_self_signed_cert_key_pair(svc_name, tls_creds, host=daemon_spec.host, label=label)
+ return tls_creds
def get_certificates(self,
daemon_spec: CephadmDaemonDeploySpec,
ips: List[str] = [],
fqdns: List[str] = [],
- custom_sans: List[str] = []
- ) -> CertKeyPair:
+ custom_sans: List[str] = [],
+ ca_cert_required: bool = False
+ ) -> TLSCredentials:
svc_spec = cast(ServiceSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
if not self.requires_certificates or not svc_spec.ssl:
- return EMPTY_TLS_KEYPAIR
+ return EMPTY_TLS_CREDENTIALS
return self.get_certificates_generic(
svc_spec=svc_spec,
daemon_spec=daemon_spec,
- cert_attr='ssl_cert',
- key_attr='ssl_key',
cert_source_attr='certificate_source',
+ cert_attr='ssl_cert',
cert_name=self.cert_name,
+ key_attr='ssl_key',
key_name=self.key_name,
+ ca_cert_attr='ssl_ca_cert' if ca_cert_required else None,
+ ca_cert_name=self.ca_cert_name if ca_cert_required else None,
ips=ips,
fqdns=fqdns,
- custom_sans=custom_sans
+ custom_sans=custom_sans,
)
def get_certificates_generic(
self,
svc_spec: ServiceSpec,
daemon_spec: CephadmDaemonDeploySpec,
- cert_attr: str,
- key_attr: str,
cert_source_attr: str,
+ cert_attr: str,
cert_name: str,
+ key_attr: str,
key_name: str,
+ ca_cert_attr: Optional[str] = None,
+ ca_cert_name: Optional[str] = None,
custom_sans: Optional[List[str]] = None,
ips: Optional[List[str]] = None,
- fqdns: Optional[List[str]] = None
- ) -> CertKeyPair:
+ fqdns: Optional[List[str]] = None,
+ ) -> TLSCredentials:
ips = ips or [self.mgr.inventory.get_addr(daemon_spec.host)]
fqdns = fqdns or [self.mgr.get_fqdn(daemon_spec.host)]
logger.debug(f'Getting certificate for {svc_spec.service_name()} using source: {cert_source}')
if cert_source == CertificateSource.INLINE.value:
- return self._get_certificates_from_spec(svc_spec, daemon_spec, cert_attr, key_attr, cert_name, key_name)
+ return self._get_certificates_from_spec(svc_spec, daemon_spec, cert_attr, key_attr, cert_name, key_name, ca_cert_attr, ca_cert_name)
elif cert_source == CertificateSource.REFERENCE.value:
- return self._get_certificates_from_certmgr_store(svc_spec, fqdns, cert_name, key_name)
+ return self._get_certificates_from_certmgr_store(svc_spec, fqdns, cert_name, key_name, ca_cert_name)
elif cert_source == CertificateSource.CEPHADM_SIGNED.value:
return self._get_cephadm_signed_certificates(svc_spec, daemon_spec, ips, fqdns, custom_sans)
else:
logger.error(f'Invalid cert_source: {cert_source}')
- return EMPTY_TLS_KEYPAIR
+ return EMPTY_TLS_CREDENTIALS
def _get_certificates_from_spec(
self,
cert_attr: str,
key_attr: str,
cert_name: str,
- key_name: str
- ) -> CertKeyPair:
+ key_name: str,
+ ca_cert_attr: Optional[str] = None,
+ ca_cert_name: Optional[str] = None
+ ) -> TLSCredentials:
"""
Fetch and persist the TLS certificate and key for a service spec.
Returns:
- A CertKeyPair if both are available; otherwise EMPTY_TLS_KEYPAIR.
+ A TLSCredentials if both are available; otherwise EMPTY_TLS_CREDENTIALS.
"""
cert = getattr(svc_spec, cert_attr, None)
key = getattr(svc_spec, key_attr, None)
- if cert and key:
- service_name = svc_spec.service_name()
- host = daemon_spec.host
- self.mgr.cert_mgr.save_cert(cert_name, cert, service_name, host, user_made=True)
- self.mgr.cert_mgr.save_key(key_name, key, service_name, host, user_made=True)
- return CertKeyPair(cert=cert, key=key)
-
- logger.error(
- f"Cannot get cert/key '{self.cert_name}/{self.key_name}' for service '{svc_spec.service_name()}'"
- )
- return EMPTY_TLS_KEYPAIR
+ needs_ca = bool(ca_cert_attr and ca_cert_name)
+ ca_cert: Optional[str] = None
+ if needs_ca:
+ assert ca_cert_attr
+ ca_cert = getattr(svc_spec, ca_cert_attr, None)
+
+ service_name = svc_spec.service_name()
+ host = daemon_spec.host
+
+ missing = []
+ if not cert:
+ missing.append(cert_attr)
+ if not key:
+ missing.append(key_attr)
+ if needs_ca and not ca_cert:
+ assert ca_cert_attr
+ missing.append(ca_cert_attr)
+ if missing:
+ logger.error(
+ f"Cannot get required TLS fields {', '.join(missing)} for service "
+ f"'{service_name}' (cert/key names: {cert_name}/{key_name})"
+ )
+ return EMPTY_TLS_CREDENTIALS
+ # Save TLS credentials
+ if needs_ca:
+ assert ca_cert_name and ca_cert
+ self.mgr.cert_mgr.save_cert(ca_cert_name, ca_cert, service_name, host, user_made=True)
+ assert cert and key
+ self.mgr.cert_mgr.save_cert(cert_name, cert, service_name, host, user_made=True)
+ self.mgr.cert_mgr.save_key(key_name, key, service_name, host, user_made=True)
+ return TLSCredentials(cert=cert, key=key, ca_cert=ca_cert)
def _get_certificates_from_certmgr_store(
self,
svc_spec: ServiceSpec,
fqdns: List[str],
cert_name: str,
- key_name: str
- ) -> CertKeyPair:
+ key_name: str,
+ ca_cert_name: Optional[str] = None
+ ) -> TLSCredentials:
host = fqdns[0] if fqdns else None
cert = self.mgr.cert_mgr.get_cert(cert_name, svc_spec.service_name(), host)
key = self.mgr.cert_mgr.get_key(key_name, svc_spec.service_name(), host)
+ ca_cert = self.mgr.cert_mgr.get_cert(ca_cert_name, svc_spec.service_name(), host) if ca_cert_name else ''
if cert and key:
- return CertKeyPair(cert=cert, key=key)
+ if ca_cert_name and not ca_cert:
+ logger.error(f'Failed to get CA cert {ca_cert_name} for service {svc_spec.service_name()} host: {host} from the certmgr store.')
+ return EMPTY_TLS_CREDENTIALS
+ return TLSCredentials(cert=cert, key=key, ca_cert=ca_cert)
else:
logger.error(f'Failed to get cert/key {cert_name} for service {svc_spec.service_name()} host: {host} from the certmgr store.')
- return EMPTY_TLS_KEYPAIR
+ return EMPTY_TLS_CREDENTIALS
def _get_cephadm_signed_certificates(
self,
ips: List[str],
fqdns: List[str],
custom_sans: List[str],
- ) -> CertKeyPair:
+ ) -> TLSCredentials:
custom_sans = custom_sans or svc_spec.custom_sans or []
ips = ips or [self.mgr.inventory.get_addr(daemon_spec.host)]
fqdns = fqdns or [self.mgr.get_fqdn(daemon_spec.host)]
- tls_pair = self.mgr.cert_mgr.get_self_signed_cert_key_pair(svc_spec.service_name(), daemon_spec.host)
- if tls_pair:
+ tls_creds = self.mgr.cert_mgr.get_self_signed_tls_credentials(svc_spec.service_name(), daemon_spec.host)
+ if tls_creds:
combined_fqdns = sorted(set(s.lower() for s in fqdns + custom_sans))
- cert_ips, cert_fqdns = extract_ips_and_fqdns_from_cert(tls_pair.cert)
+ cert_ips, cert_fqdns = extract_ips_and_fqdns_from_cert(tls_creds.cert)
if sorted(cert_ips) == sorted(ips) and sorted(cert_fqdns) == sorted(combined_fqdns):
# Nothing has changed, use the stored certifiactes
- return tls_pair
+ return tls_creds
# Either there were not certs or ips/fqdns have changed generate new cets
- tls_pair = self.mgr.cert_mgr.generate_cert(fqdns, ips, custom_sans)
- self.mgr.cert_mgr.save_self_signed_cert_key_pair(svc_spec.service_name(), tls_pair, host=daemon_spec.host)
+ tls_creds = self.mgr.cert_mgr.generate_cert(fqdns, ips, custom_sans)
+ self.mgr.cert_mgr.save_self_signed_cert_key_pair(svc_spec.service_name(), tls_creds, host=daemon_spec.host)
- return tls_pair
+ return tls_creds
def allow_colo(self) -> bool:
"""
logger.info(f"Removing inline-saved certificate/key for service: {svc_name}, host: {host}")
self.mgr.cert_mgr.rm_cert(self.cert_name, svc_name, host)
self.mgr.cert_mgr.rm_key(self.key_name, svc_name, host)
+ if self.ca_cert_name:
+ self.mgr.cert_mgr.rm_cert(self.ca_cert_name, svc_name, host)
else:
# It's a reference cert/key to the certmgr so we must keep them as user may want to use them later
logger.info(f"Keeping referenced certificate/key for service: {svc_name}, host: {host}")
if spec.ssl:
san_list = spec.zonegroup_hostnames or []
custom_sans = san_list + [f"*.{h}" for h in san_list] if spec.wildcard_enabled else san_list
- tls_pair = self.get_certificates(daemon_spec, custom_sans)
- pem = f'{tls_pair.key.rstrip()}\n{tls_pair.cert.lstrip()}'
+ tls_creds = self.get_certificates(daemon_spec, custom_sans)
+ pem = f'{tls_creds.key.rstrip()}\n{tls_creds.cert.lstrip()}'
rgw_cert_name = daemon_spec.name() if spec.generate_cert else spec.service_name()
ret, out, err = self.mgr.check_mon_command({
'prefix': 'config-key set',
security_enabled, _, _ = self.mgr._get_security_config()
if security_enabled:
exporter_config.update({'https_enabled': True})
- crt, key = self.get_certificates(daemon_spec)
+ tls_creds = self.get_certificates(daemon_spec)
exporter_config['files'] = {
- 'ceph-exporter.crt': crt,
- 'ceph-exporter.key': key
+ 'ceph-exporter.crt': tls_creds.cert,
+ 'ceph-exporter.key': tls_creds.key
}
daemon_spec.keyring = keyring
'host': daemon_spec.host,
'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
- listener_cert, listener_key = self.get_certificates(daemon_spec)
+ tls_creds = self.get_certificates(daemon_spec)
config = {
'agent.json': json.dumps(cfg),
'keyring': daemon_spec.keyring,
'root_cert.pem': self.mgr.cert_mgr.get_root_ca(),
- 'listener.crt': listener_cert,
- 'listener.key': listener_key,
+ 'listener.crt': tls_creds.cert,
+ 'listener.key': tls_creds.key,
}
return config, sorted([str(self.mgr.get_mgr_ip()), str(agent.server_port),
from orchestrator import OrchestratorError, DaemonDescription
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
from .service_registry import register_cephadm_service
+from cephadm.tlsobject_types import TLSCredentials
if TYPE_CHECKING:
from ..module import CephadmOrchestrator
config_files['files']['haproxy.pem'] = combined_pem
if spec.monitor_ssl and spec.monitor_cert_source != MonitorCertSource.REUSE_SERVICE_CERT.value:
- stats_cert, stats_key = self.get_stats_certs(spec, daemon_spec, monitor_ips)
- monitor_ssl_cert = [stats_cert, stats_key]
+ tls_creds = self.get_stats_certs(spec, daemon_spec, monitor_ips)
+ monitor_ssl_cert = [tls_creds.cert, tls_creds.key]
config_files['files']['stats_haproxy.pem'] = '\n'.join(monitor_ssl_cert)
return config_files, self.get_haproxy_dependencies(self.mgr, spec)
svc_spec: IngressSpec,
daemon_spec: CephadmDaemonDeploySpec,
ips: Optional[List[str]] = None,
- ) -> Tuple[str, str]:
+ ) -> TLSCredentials:
return self.get_certificates_generic(
svc_spec=svc_spec,
daemon_spec=daemon_spec,
'enable_oauth2_proxy': bool(oauth2_proxy_endpoints),
}
- internal_cert, internal_pkey = self.get_self_signed_certificates_with_label(svc_spec, daemon_spec, INTERNAL_CERT_LABEL)
+ tls_creds = self.get_self_signed_certificates_with_label(svc_spec, daemon_spec, INTERNAL_CERT_LABEL)
daemon_config = {
"files": {
"nginx.conf": self.mgr.template.render(self.SVC_TEMPLATE_PATH, main_context),
"nginx_external_server.conf": self.mgr.template.render(self.EXTERNAL_SVC_TEMPLATE_PATH, server_context),
"nginx_internal_server.conf": self.mgr.template.render(self.INTERNAL_SVC_TEMPLATE_PATH, server_context),
- "nginx_internal.crt": internal_cert,
- "nginx_internal.key": internal_pkey,
+ "nginx_internal.crt": tls_creds.cert,
+ "nginx_internal.key": tls_creds.key,
"ca.crt": self.mgr.cert_mgr.get_root_ca()
}
}
from mgr_module import HandleCommandResult
from .service_registry import register_cephadm_service
from cephadm.services.service_registry import service_registry
-from cephadm.tlsobject_types import CertKeyPair
+from cephadm.tlsobject_types import TLSCredentials
from orchestrator import DaemonDescription
from ceph.deployment.service_spec import AlertManagerSpec, GrafanaSpec, ServiceSpec, \
return ''
- def get_grafana_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> CertKeyPair:
+ def get_grafana_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> TLSCredentials:
host_ips = [self.mgr.inventory.get_addr(daemon_spec.host)]
host_fqdns = [self.mgr.get_fqdn(daemon_spec.host), 'grafana_servers']
return self.get_certificates(daemon_spec, host_ips, host_fqdns)
def needs_monitoring(self) -> bool:
return True
- def get_alertmanager_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> CertKeyPair:
+ def get_alertmanager_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> TLSCredentials:
host_ips = [self.mgr.inventory.get_addr(daemon_spec.host)]
host_fqdns = [self.mgr.get_fqdn(daemon_spec.host), 'alertmanager_servers']
return self.get_certificates(daemon_spec, host_ips, host_fqdns)
# we shouldn't get here (mon will tell the mgr to respawn), but no
# harm done if we do.
- def get_prometheus_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> CertKeyPair:
+ def get_prometheus_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> TLSCredentials:
host_ips = [self.mgr.inventory.get_addr(daemon_spec.host)]
host_fqdns = [self.mgr.get_fqdn(daemon_spec.host), 'prometheus_servers']
return self.get_certificates(daemon_spec, host_ips, host_fqdns)
def needs_monitoring(self) -> bool:
return True
+ @property
+ def client_cert_name(self) -> str:
+ return 'nvmeof_client_cert'
+
+ @property
+ def client_key_name(self) -> str:
+ return 'nvmeof_client_key'
+
+ @property
+ def ca_cert_name(self) -> str:
+ return 'nvmeof_root_ca_cert'
+
def config(self, spec: NvmeofServiceSpec) -> None: # type: ignore
assert self.TYPE == spec.service_type
# Looking at src/pybind/mgr/cephadm/services/iscsi.py
self.mgr.log.info(f"TLS for nvmeof service {svc_name} is disabled.")
return
- host = daemon_spec.host
-
# Attach server-side certificates
- tls_pair = self.get_certificates(daemon_spec)
+ tls_creds = self.get_certificates(daemon_spec)
daemon_spec.extra_files.update({
- 'server_cert': tls_pair.cert,
- 'server_key': tls_pair.key,
+ 'server_cert': tls_creds.cert,
+ 'server_key': tls_creds.key,
})
# If mTLS is not enabled, we're done
if not spec.enable_auth:
return
- client_cert = client_key = root_ca_cert = None
-
+ tls_creds = None
if spec.certificate_source == CertificateSource.CEPHADM_SIGNED.value:
- client_tls_pair = self.get_self_signed_certificates_with_label(
+ tls_creds = self.get_self_signed_certificates_with_label(
spec, daemon_spec, NVMEOF_CLIENT_CERT_LABEL
)
- client_cert = client_tls_pair.cert
- client_key = client_tls_pair.key
- root_ca_cert = self.mgr.cert_mgr.get_root_ca()
-
- elif spec.certificate_source == CertificateSource.REFERENCE.value:
- client_cert = self.mgr.cert_mgr.get_cert('nvmeof_client_cert', service_name=svc_name, host=host)
- client_key = self.mgr.cert_mgr.get_key('nvmeof_client_key', service_name=svc_name, host=host)
- root_ca_cert = self.mgr.cert_mgr.get_cert('nvmeof_root_ca_cert', service_name=svc_name, host=host)
-
- elif spec.certificate_source == CertificateSource.INLINE.value:
- assert spec.client_cert and spec.client_key and spec.root_ca_cert # for mypy
- client_cert, client_key, root_ca_cert = spec.client_cert, spec.client_key, spec.root_ca_cert
- self.mgr.cert_mgr.save_cert('nvmeof_client_cert', client_cert, svc_name, daemon_spec.host, user_made=True)
- self.mgr.cert_mgr.save_key('nvmeof_client_key', client_key, svc_name, daemon_spec.host, user_made=True)
- self.mgr.cert_mgr.save_cert('nvmeof_root_ca_cert', root_ca_cert, svc_name, daemon_spec.host, user_made=True)
-
- if not all([client_cert, client_key, root_ca_cert]):
+ elif spec.certificate_source in [CertificateSource.REFERENCE.value, CertificateSource.INLINE.value]:
+ tls_creds = self.get_certificates_generic(
+ svc_spec=spec,
+ daemon_spec=daemon_spec,
+ cert_source_attr='certificate_source',
+ cert_attr='client_cert',
+ cert_name=self.client_cert_name,
+ key_attr='client_key',
+ key_name=self.client_key_name,
+ ca_cert_attr='root_ca_cert',
+ ca_cert_name=self.ca_cert_name
+ )
+
+ if not (tls_creds and tls_creds.cert and tls_creds.key and tls_creds.ca_cert):
raise OrchestratorError("mTLS is enabled, but or more of client_cert, client_key, or root_ca_cert is missing or was not set correctly.")
daemon_spec.extra_files.update({
- 'client_cert': client_cert,
- 'client_key': client_key,
- 'root_ca_cert': root_ca_cert,
+ 'client_cert': tls_creds.cert,
+ 'client_key': tls_creds.key,
+ 'root_ca_cert': tls_creds.ca_cert,
})
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
self.mgr.cert_mgr.rm_self_signed_cert_key_pair(service_name, daemon.hostname, label=NVMEOF_CLIENT_CERT_LABEL)
if spec.enable_auth and spec.certificate_source == CertificateSource.INLINE.value:
- for entry in ['nvmeof_client_cert', 'nvmeof_client_key', 'nvmeof_root_ca_cert']:
+ for entry in [self.client_cert_name, self.client_key_name, self.ca_cert_name]:
if 'cert' in entry:
self.mgr.cert_mgr.rm_cert(entry, spec.service_name(), daemon.hostname)
elif 'key' in entry:
from tests import mock
import logging
-from cephadm.tlsobject_types import Cert, PrivKey, TLSObjectException, TLSObjectProtocol, CertKeyPair
+from cephadm.tlsobject_types import Cert, PrivKey, TLSObjectException, TLSObjectProtocol, TLSCredentials
from cephadm.tlsobject_store import TLSOBJECT_STORE_PREFIX, TLSObjectStore, TLSObjectScope
from cephadm.module import CephadmOrchestrator
from cephadm.cert_mgr import CertInfo, CertMgr
# Save (simulate cephadm-generated) cert/key at host target
cm.save_self_signed_cert_key_pair(
svc,
- CertKeyPair(CEPHADM_SELF_GENERATED_CERT_1, CEPHADM_SELF_GENERATED_KEY_2048),
+ TLSCredentials(CEPHADM_SELF_GENERATED_CERT_1, CEPHADM_SELF_GENERATED_KEY_2048),
host=host,
label=cert_label,
)
OAuth2ProxySpec
)
from cephadm.tests.fixtures import with_host, with_service, _run_cephadm, async_side_effect, wait
-from cephadm.tlsobject_types import CertKeyPair
+from cephadm.tlsobject_types import TLSCredentials
from ceph.utils import datetime_now
mgr.spec_store.all_specs.get.return_value = iscsi_spec
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ lambda instance, dspec, ips=None: TLSCredentials(ceph_generated_cert, ceph_generated_key))
def test_iscsi_client_caps(self):
iscsi_daemon_spec = CephadmDaemonDeploySpec(
@patch('cephadm.utils.resolve_ip')
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ lambda instance, dspec, ips=None: TLSCredentials(ceph_generated_cert, ceph_generated_key))
def test_iscsi_dashboard_config(self, mock_resolve_ip):
self.mgr.check_mon_command = MagicMock()
@patch("cephadm.module.CephadmOrchestrator.get_unique_name")
@patch("cephadm.services.iscsi.get_trusted_ips")
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ lambda instance, dspec, ips=None: TLSCredentials(ceph_generated_cert, ceph_generated_key))
def test_iscsi_config_with_security_enabled(self, _get_trusted_ips, _get_name, _run_cephadm, cephadm_module: CephadmOrchestrator):
iscsi_daemon_id = 'testpool.test.qwert'
@patch("cephadm.services.monitoring.password_hash", lambda password: 'alertmanager_password_hash')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: 'cephadm_root_cert')
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None, fqdns=None: CertKeyPair('mycert', 'mykey'))
+ lambda instance, dspec, ips=None, fqdns=None: TLSCredentials('mycert', 'mykey'))
def test_alertmanager_config_when_mgmt_gw_enabled(self, _get_fqdn, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
@patch("cephadm.services.monitoring.password_hash", lambda password: 'alertmanager_password_hash')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: 'cephadm_root_cert')
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None, fqdns=None: CertKeyPair('mycert', 'mykey'))
+ lambda instance, dspec, ips=None, fqdns=None: TLSCredentials('mycert', 'mykey'))
def test_alertmanager_config_security_enabled(self, _get_fqdn, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '::1')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: 'cephadm_root_cert')
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None, fqdns=None: CertKeyPair('mycert', 'mykey'))
+ lambda instance, dspec, ips=None, fqdns=None: TLSCredentials('mycert', 'mykey'))
@patch('cephadm.services.cephadmservice.CephExporterService.get_keyring_with_caps', Mock(return_value='[client.ceph-exporter.test]\nkey = fake-secret\n'))
def test_ceph_exporter_config_security_enabled(self, _get_fqdn, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
)
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None, fqdns=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ lambda instance, dspec, ips=None, fqdns=None: TLSCredentials(ceph_generated_cert, ceph_generated_key))
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
@patch("cephadm.serve.CephadmServe._run_cephadm")
@patch("socket.getfqdn")
@patch("cephadm.services.monitoring.password_hash", lambda password: 'prometheus_password_hash')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: 'cephadm_root_cert')
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None, fqdns=None: CertKeyPair('mycert', 'mykey'))
+ lambda instance, dspec, ips=None, fqdns=None: TLSCredentials('mycert', 'mykey'))
def test_prometheus_config_security_enabled(self, _run_cephadm, _get_uname, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
_get_uname.return_value = 'test'
@patch("cephadm.module.CephadmOrchestrator.get_fqdn", lambda a, b: 'host_fqdn')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None, fqdns=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ lambda instance, dspec, ips=None, fqdns=None: TLSCredentials(ceph_generated_cert, ceph_generated_key))
def test_grafana_config_with_mgmt_gw_and_ouath2_proxy(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(("{}", "", 0))
@patch("cephadm.module.CephadmOrchestrator.get_fqdn", lambda a, b: 'host_fqdn')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None, fqdns=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ lambda instance, dspec, ips=None, fqdns=None: TLSCredentials(ceph_generated_cert, ceph_generated_key))
def test_grafana_config_with_mgmt_gw(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(("{}", "", 0))
cephadm_module, GrafanaSpec("grafana")
) as _:
cephadm_module.cert_mgr.save_self_signed_cert_key_pair('grafana',
- CertKeyPair(ceph_generated_cert, ceph_generated_key),
+ TLSCredentials(ceph_generated_cert, ceph_generated_key),
host='test')
files = {
'grafana.ini': dedent("""
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '1::4')
@patch("cephadm.module.CephadmOrchestrator.get_fqdn", lambda a, b: 'host_fqdn')
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None, fqdns=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ lambda instance, dspec, ips=None, fqdns=None: TLSCredentials(ceph_generated_cert, ceph_generated_key))
def test_grafana_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(("{}", "", 0))
cephadm_module, GrafanaSpec("grafana")
) as _:
cephadm_module.cert_mgr.save_self_signed_cert_key_pair('grafana',
- CertKeyPair(ceph_generated_cert, ceph_generated_key),
+ TLSCredentials(ceph_generated_cert, ceph_generated_key),
host='test')
files = {
'grafana.ini': dedent("""
class TestAgent:
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None, fqdns=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ lambda instance, dspec, ips=None, fqdns=None: TLSCredentials(ceph_generated_cert, ceph_generated_key))
@patch("cephadm.serve.CephadmServe._run_cephadm")
def test_deploy_cephadm_agent(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_service_endpoints")
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_service_discovery_endpoints")
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ lambda instance, dspec, ips=None: TLSCredentials(ceph_generated_cert, ceph_generated_key))
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_self_signed_certificates_with_label",
- lambda instance, svc_spec, dspec, label: (ceph_generated_cert, ceph_generated_key))
+ lambda instance, svc_spec, dspec, label: TLSCredentials(ceph_generated_cert, ceph_generated_key))
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '::1')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
@patch("cephadm.services.mgmt_gateway.get_dashboard_endpoints", lambda _: (["ceph-node-2:8443", "ceph-node-2:8443"], "https"))
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_service_endpoints")
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_service_discovery_endpoints")
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ lambda instance, dspec, ips=None: TLSCredentials(ceph_generated_cert, ceph_generated_key))
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_self_signed_certificates_with_label",
- lambda instance, svc_spec, dspec, label: (ceph_generated_cert, ceph_generated_key))
+ lambda instance, svc_spec, dspec, label: TLSCredentials(ceph_generated_cert, ceph_generated_key))
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '::1')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
@patch("cephadm.services.mgmt_gateway.get_dashboard_endpoints", lambda _: (["ceph-node-2:8443", "ceph-node-2:8443"], "https"))
@patch("cephadm.serve.CephadmServe._run_cephadm")
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_service_endpoints")
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ lambda instance, dspec, ips=None: TLSCredentials(ceph_generated_cert, ceph_generated_key))
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_self_signed_certificates_with_label",
- lambda instance, svc_spec, dspec, label: (ceph_generated_cert, ceph_generated_key))
+ lambda instance, svc_spec, dspec, label: TLSCredentials(ceph_generated_cert, ceph_generated_key))
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '::1')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
@patch("cephadm.services.mgmt_gateway.get_dashboard_endpoints", lambda _: (["ceph-node-2:8443", "ceph-node-2:8443"], "https"))
@patch("cephadm.serve.CephadmServe._run_cephadm")
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_service_endpoints")
@patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
- lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ lambda instance, dspec, ips=None: TLSCredentials(ceph_generated_cert, ceph_generated_key))
@patch("cephadm.services.oauth2_proxy.OAuth2ProxyService.get_certificates",
- lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ lambda instance, dspec, ips=None: TLSCredentials(ceph_generated_cert, ceph_generated_key))
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_self_signed_certificates_with_label",
- lambda instance, svc_spec, dspec, label: (ceph_generated_cert, ceph_generated_key))
+ lambda instance, svc_spec, dspec, label: TLSCredentials(ceph_generated_cert, ceph_generated_key))
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '::1')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
@patch("cephadm.services.mgmt_gateway.get_dashboard_endpoints", lambda _: (["ceph-node-2:8443", "ceph-node-2:8443"], "https"))
return self.value
-class CertKeyPair(NamedTuple):
+class TLSCredentials(NamedTuple):
cert: str
key: str
+ ca_cert: Optional[str] = None
def __bool__(self) -> bool:
# Treat the pair as truthy only if both cert and key are non-empty
host: Optional[str]
-EMPTY_TLS_KEYPAIR = CertKeyPair('', '')
+EMPTY_TLS_CREDENTIALS = TLSCredentials('', '', '')
class TLSObjectProtocol(Protocol):
class RequiresCertificatesEntry(TypedDict):
user_cert_allowed: bool
scope: Literal['service', 'host', 'global']
+ requires_ca_cert: bool
class CertificateSource(Enum):
REQUIRES_CERTIFICATES: Dict[str, RequiresCertificatesEntry] = {
# Services that support user-provided certificates
- 'rgw': {'user_cert_allowed': True, 'scope': 'service'},
- 'ingress': {'user_cert_allowed': True, 'scope': 'service'},
- 'iscsi': {'user_cert_allowed': True, 'scope': 'service'},
- 'grafana': {'user_cert_allowed': True, 'scope': 'host'},
- 'oauth2-proxy': {'user_cert_allowed': True, 'scope': 'host'},
- 'mgmt-gateway': {'user_cert_allowed': True, 'scope': 'global'},
- 'nvmeof': {'user_cert_allowed': True, 'scope': 'service'},
+ 'rgw': {'user_cert_allowed': True, 'scope': 'service', 'requires_ca_cert': False},
+ 'ingress': {'user_cert_allowed': True, 'scope': 'service', 'requires_ca_cert': False},
+ 'iscsi': {'user_cert_allowed': True, 'scope': 'service', 'requires_ca_cert': False},
+ 'grafana': {'user_cert_allowed': True, 'scope': 'host', 'requires_ca_cert': False},
+ 'oauth2-proxy': {'user_cert_allowed': True, 'scope': 'host', 'requires_ca_cert': False},
+ 'mgmt-gateway': {'user_cert_allowed': True, 'scope': 'global', 'requires_ca_cert': False},
+ 'nvmeof': {'user_cert_allowed': True, 'scope': 'service', 'requires_ca_cert': False},
+ 'nfs': {'user_cert_allowed': True, 'scope': 'service', 'requires_ca_cert': True},
# Services that only support cephadm-signed certificates
- 'agent': {'user_cert_allowed': False, 'scope': 'host'},
- 'prometheus': {'user_cert_allowed': False, 'scope': 'host'},
- 'alertmanager': {'user_cert_allowed': False, 'scope': 'host'},
- 'ceph-exporter': {'user_cert_allowed': False, 'scope': 'host'},
- 'node-exporter': {'user_cert_allowed': False, 'scope': 'host'},
+ 'agent': {'user_cert_allowed': False, 'scope': 'host', 'requires_ca_cert': False},
+ 'prometheus': {'user_cert_allowed': False, 'scope': 'host', 'requires_ca_cert': False},
+ 'alertmanager': {'user_cert_allowed': False, 'scope': 'host', 'requires_ca_cert': False},
+ 'ceph-exporter': {'user_cert_allowed': False, 'scope': 'host', 'requires_ca_cert': False},
+ 'node-exporter': {'user_cert_allowed': False, 'scope': 'host', 'requires_ca_cert': False},
# 'loki' : {'user_cert_allowed': False, 'scope': 'host'},
# 'promtail' : {'user_cert_allowed': False, 'scope': 'host'},
# 'jaeger-agent': {'user_cert_allowed': False, 'scope': 'host'},
extra_entrypoint_args: Optional[GeneralArgList] = None,
custom_configs: Optional[List[CustomConfig]] = None,
ip_addrs: Optional[Dict[str, str]] = None,
+ ssl_ca_cert: Optional[str] = None,
):
#: See :ref:`orchestrator-cli-placement-spec`.
self.ssl = ssl
self.ssl_cert = ssl_cert
self.ssl_key = ssl_key
+ self.ssl_ca_cert = ssl_ca_cert
self.custom_sans = custom_sans
if self.service_type in self.REQUIRES_SERVICE_ID or self.service_type == 'osd':
has_cert = bool(getattr(self, "ssl_cert", None))
has_key = bool(getattr(self, "ssl_key", None))
+ has_ca_cert = bool(getattr(self, "ssl_ca_cert", None))
has_cert_src = bool(getattr(self, "certificate_source", None))
# Pairing rule for legacy inline specs
self.certificate_source = CertificateSource.CEPHADM_SIGNED.value
# Per-source constraints
- if (
- self.certificate_source == CertificateSource.INLINE.value
- and not (has_cert and has_key)
- ):
- raise SpecValidationError(
- f"When using '{CertificateSource.INLINE.value}' certificate_source, "
- "both an embedded certificate (ssl_cert) and private key"
- "(ssl_key) must be provided."
- )
-
+ if self.certificate_source == CertificateSource.INLINE.value:
+ if not (has_cert and has_key):
+ raise SpecValidationError(
+ f"When using '{CertificateSource.INLINE.value}' certificate_source, "
+ "both an embedded certificate (ssl_cert) and private key"
+ "(ssl_key) must be provided."
+ )
+ if (
+ self.REQUIRES_CERTIFICATES[self.service_type].get('requires_ca_cert', False)
+ and not has_ca_cert
+ ):
+ raise SpecValidationError(
+ f'CA certificate required for {self.service_type} service'
+ )
if (
self.certificate_source == CertificateSource.CEPHADM_SIGNED.value
and (has_cert or has_key)