import os
import re
import enum
+from enum import Enum
from collections import OrderedDict
from contextlib import contextmanager
from functools import wraps
TypeVar,
Union,
cast,
+ TypedDict,
+ Literal
)
import yaml
FuncT = TypeVar('FuncT', bound=Callable)
+class TLSBlock(TypedDict, total=False):
+ ssl: bool
+ certificate_source: str
+ ssl_cert: str
+ ssl_key: str
+ custom_sans: List[str]
+
+
+class RequiresCertificatesEntry(TypedDict):
+ user_cert_allowed: bool
+ scope: Literal['service', 'host', 'global']
+
+
+class CertificateSource(Enum):
+ """
+ Describes the source of the certificate used by cephadm:
+
+ - INLINE: Certificate is embedded inline in the spec.
+ - REFEFRENCE: Certificate is provided by the user through the certmgr.
+ - CEPHADM_SIGNED: Certificate is generated and signed by cephadm (via certmgr).
+ """
+ INLINE = "inline"
+ REFERENCE = "reference"
+ CEPHADM_SIGNED = "cephadm-signed"
+
+
def handle_type_error(method: FuncT) -> FuncT:
@wraps(method)
def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
'smb',
]
+ # List of all services that can require TLS certifiactes
+ REQUIRES_CERTIFICATES: Dict[str, RequiresCertificatesEntry] = {
+
+ # Services that support user-provided certificates
+ 'rgw': {'user_cert_allowed': True, 'scope': 'service'},
+ 'ingress': {'user_cert_allowed': True, 'scope': 'service'},
+ 'iscsi': {'user_cert_allowed': True, 'scope': 'service'},
+ 'grafana': {'user_cert_allowed': True, 'scope': 'host'},
+ 'oauth2-proxy': {'user_cert_allowed': True, 'scope': 'host'},
+ 'mgmt-gateway': {'user_cert_allowed': True, 'scope': 'global'},
+ 'nvmeof': {'user_cert_allowed': True, 'scope': 'service'},
+
+ # Services that only support cephadm-signed certificates
+ 'agent': {'user_cert_allowed': False, 'scope': 'host'},
+ 'prometheus': {'user_cert_allowed': False, 'scope': 'host'},
+ 'alertmanager': {'user_cert_allowed': False, 'scope': 'host'},
+ 'ceph-exporter': {'user_cert_allowed': False, 'scope': 'host'},
+ 'node-exporter': {'user_cert_allowed': False, 'scope': 'host'},
+ # 'loki' : {'user_cert_allowed': False, 'scope': 'host'},
+ # 'promtail' : {'user_cert_allowed': False, 'scope': 'host'},
+ # 'jaeger-agent': {'user_cert_allowed': False, 'scope': 'host'},
+ }
+
MANAGED_CONFIG_OPTIONS = [
'mds_join_fs',
]
placement: Optional[PlacementSpec] = None,
count: Optional[int] = None,
config: Optional[Dict[str, str]] = None,
+ ssl: Optional[bool] = False,
+ certificate_source: Optional[str] = None,
+ custom_sans: Optional[List[str]] = None,
+ ssl_cert: Optional[str] = None,
+ ssl_key: Optional[str] = None,
unmanaged: bool = False,
preview_only: bool = False,
networks: Optional[List[str]] = None,
#: ``rgw``, ``container``, ``ingress``
self.service_id = None
+ if self.service_type in self.REQUIRES_CERTIFICATES:
+ self.certificate_source = certificate_source
+ self.ssl = ssl
+ self.ssl_cert = ssl_cert
+ self.ssl_key = ssl_key
+ self.custom_sans = custom_sans
+
if self.service_type in self.REQUIRES_SERVICE_ID or self.service_type == 'osd':
self.service_id = service_id
def get_virtual_ip(self) -> Optional[str]:
return None
+ def is_using_certificates_source(self, source: CertificateSource) -> bool:
+ return getattr(self, 'ssl', False) is True and self.certificate_source == source.value
+
def to_json(self):
# type: () -> OrderedDict[str, Any]
ret: OrderedDict[str, Any] = OrderedDict()
val = val.to_json()
if val:
c[key] = val
+
+ if self.service_type in self.REQUIRES_CERTIFICATES:
+
+ tls: TLSBlock = {}
+ if self.ssl:
+ tls['ssl'] = self.ssl
+ if self.certificate_source:
+ tls['certificate_source'] = self.certificate_source
+ if self.ssl_cert and self.certificate_source == CertificateSource.INLINE.value:
+ tls['ssl_cert'] = self.ssl_cert
+ if self.ssl_key and self.certificate_source == CertificateSource.INLINE.value:
+ tls['ssl_key'] = self.ssl_key
+ if self.custom_sans:
+ tls['custom_sans'] = self.custom_sans
+ c.update(tls)
+
if c:
ret['spec'] = c
return ret
+ def _normalize_and_validate_tls(self) -> None:
+
+ # If service has an 'ssl' toggle and it's off, ignore TLS-related fields.
+ if not self.ssl:
+ return
+
+ valid_sources = {e.value for e in CertificateSource}
+ if self.certificate_source and self.certificate_source not in valid_sources:
+ raise SpecValidationError(
+ f"Invalid certificate_source: '{self.certificate_source}'. "
+ f"Valid values are: {', '.join(sorted(valid_sources))}"
+ )
+
+ has_cert = bool(getattr(self, "ssl_cert", None))
+ has_key = bool(getattr(self, "ssl_key", None))
+ has_cert_src = bool(getattr(self, "certificate_source", None))
+
+ # Pairing rule for legacy inline specs
+ if (has_cert or has_key) and not (has_cert and has_key):
+ raise SpecValidationError(
+ "Inline TLS credentials detected but incomplete: "
+ "both ssl_cert and ssl_key must be provided."
+ )
+
+ # Back-compat inference: old specs implied INLINE if cert+key were embedded
+ if not has_cert_src:
+ if has_cert and has_key:
+ self.certificate_source = CertificateSource.INLINE.value
+ else:
+ # set the default source to cephadm-signed
+ self.certificate_source = CertificateSource.CEPHADM_SIGNED.value
+
+ # Per-source constraints
+ if (
+ self.certificate_source == CertificateSource.INLINE.value
+ and not (has_cert and has_key)
+ ):
+ raise SpecValidationError(
+ f"When using '{CertificateSource.INLINE.value}' certificate_source, "
+ "both an embedded certificate (ssl_cert) and private key"
+ "(ssl_key) must be provided."
+ )
+
+ if (
+ self.certificate_source == CertificateSource.CEPHADM_SIGNED.value
+ and (has_cert or has_key)
+ ):
+ raise SpecValidationError(
+ f"When using certificate_source '{self.certificate_source}', custom "
+ "ssl_cert or ssl_key must not be provided."
+ )
+
def validate(self) -> None:
if not self.service_type:
raise SpecValidationError('Cannot add Service: type required')
+ if self.service_type in self.REQUIRES_CERTIFICATES:
+ self._normalize_and_validate_tls()
+
if self.service_type != 'osd':
if self.service_type in self.REQUIRES_SERVICE_ID and not self.service_id:
raise SpecValidationError('Cannot add Service: id required')
rgw_frontend_extra_args: Optional[List[str]] = None,
unmanaged: bool = False,
ssl: bool = False,
+ certificate_source: Optional[str] = None,
+ ssl_cert: Optional[str] = None,
+ ssl_key: Optional[str] = None,
+ custom_sans: Optional[List[str]] = None,
preview_only: bool = False,
config: Optional[Dict[str, str]] = None,
networks: Optional[List[str]] = None,
super(RGWSpec, self).__init__(
'rgw', service_id=service_id,
+ ssl=ssl,
+ certificate_source=certificate_source,
+ ssl_cert=ssl_cert,
+ ssl_key=ssl_key,
+ custom_sans=custom_sans,
placement=placement, unmanaged=unmanaged,
preview_only=preview_only, config=config, networks=networks,
extra_container_args=extra_container_args, extra_entrypoint_args=extra_entrypoint_args,
port: Optional[int] = None,
pool: Optional[str] = None,
enable_auth: bool = False,
+ ssl: Optional[bool] = False,
+ certificate_source: Optional[str] = None,
+ custom_sans: Optional[List[str]] = None,
+ ssl_cert: Optional[str] = None,
+ ssl_key: Optional[str] = None,
state_update_notify: Optional[bool] = True,
state_update_interval_sec: Optional[int] = 5,
break_update_interval_sec: Optional[int] = 25,
):
assert service_type == 'nvmeof'
super(NvmeofServiceSpec, self).__init__('nvmeof', service_id=service_id,
+ ssl=ssl,
+ certificate_source=certificate_source,
+ ssl_cert=ssl_cert,
+ ssl_key=ssl_key,
+ custom_sans=custom_sans,
placement=placement, unmanaged=unmanaged,
preview_only=preview_only,
config=config, networks=networks,
self.group = group or ''
#: ``enable_auth`` enables user authentication on nvmeof gateway
self.enable_auth = enable_auth
+ self.ssl = enable_auth # to force enabling ssl field when auth is enabled
#: ``state_update_notify`` enables automatic update from OMAP in nvmeof gateway
self.state_update_notify = state_update_notify
#: ``state_update_interval_sec`` number of seconds to check for updates in OMAP
self.server_key = server_key
#: ``server_cert`` gateway server certificate
self.server_cert = server_cert
+ #: ``ssl_cert`` gateway TLS server certificate
+ self.ssl_cert = ssl_cert or server_cert
+ #: ``ssl_key`` gateway TLS server key
+ self.ssl_key = ssl_key or server_key
#: ``client_key`` client key
self.client_key = client_key
#: ``client_cert`` client certificate
verify_positive_int(value, name)
break # only one should be defined, so we can stop after validating it
+ def to_json(self) -> OrderedDict[str, Any]:
+ data = super().to_json()
+ spec = data.setdefault('spec', {})
+
+ if self.ssl:
+ if self.server_cert and self.server_key:
+ spec['server_cert'] = self.server_cert
+ spec['server_key'] = self.server_key
+ else:
+ spec['ssl_cert'] = self.ssl_cert
+ spec['ssl_key'] = self.ssl_key
+
+ if self.enable_auth:
+ spec['client_cert'] = self.client_cert
+ spec['client_key'] = self.client_key
+ spec['root_ca_cert'] = self.root_ca_cert
+
+ return data
+
def validate(self) -> None:
# TODO: what other parameters should be validated as part of this function?
super(NvmeofServiceSpec, self).validate()
raise SpecValidationError('Cannot add NVMEOF: No Pool specified')
verify_boolean(self.enable_auth, "Enable authentication")
- if self.enable_auth:
- if not all([self.server_key, self.server_cert, self.client_key,
- self.client_cert, self.root_ca_cert]):
- err_msg = 'enable_auth is true but '
- for cert_key_attr in ['server_key', 'server_cert', 'client_key',
- 'client_cert', 'root_ca_cert']:
- if not hasattr(self, cert_key_attr):
- err_msg += f'{cert_key_attr}, '
- err_msg += 'attribute(s) not set in the spec'
- raise SpecValidationError(err_msg)
+ if self.enable_auth or self.ssl:
+ if self.certificate_source == CertificateSource.INLINE.value:
+ if not all([self.server_key, self.server_cert, self.client_key,
+ self.client_cert, self.root_ca_cert]):
+ err_msg = 'enable_auth is true but '
+ for cert_key_attr in ['ssl_key', 'ssl_cert', 'client_key',
+ 'client_cert', 'root_ca_cert']:
+ val = getattr(self, cert_key_attr, None)
+ if val is None or val == "":
+ err_msg += f'{cert_key_attr}, '
+ err_msg += 'attribute(s) not set in the spec'
+ raise SpecValidationError(err_msg)
if self.transports not in ['tcp']:
raise SpecValidationError('Invalid transport. Valid values are tcp')
api_user: Optional[str] = 'admin',
api_password: Optional[str] = 'admin',
api_secure: Optional[bool] = None,
+ ssl: Optional[bool] = False,
ssl_cert: Optional[str] = None,
ssl_key: Optional[str] = None,
+ certificate_source: Optional[str] = None,
+ custom_sans: Optional[List[str]] = None,
placement: Optional[PlacementSpec] = None,
unmanaged: bool = False,
preview_only: bool = False,
assert service_type == 'iscsi'
super(IscsiServiceSpec, self).__init__('iscsi', service_id=service_id,
placement=placement, unmanaged=unmanaged,
+ ssl=ssl,
+ ssl_cert=ssl_cert,
+ ssl_key=ssl_key,
+ certificate_source=certificate_source,
+ custom_sans=custom_sans,
preview_only=preview_only,
config=config, networks=networks,
extra_container_args=extra_container_args,
self.api_password = api_password
#: ``api_secure`` as defined in the ``iscsi-gateway.cfg``
self.api_secure = api_secure
- #: SSL certificate
- self.ssl_cert = ssl_cert
- #: SSL private key
- self.ssl_key = ssl_key
if not self.api_secure and self.ssl_cert and self.ssl_key:
self.api_secure = True
+ # for backward compatibility we also pick the value of api_secure
+ # as in old Specs ssl field didn't exist and api_secure was used
+ # to enable/disable API security
+ self.ssl = self.ssl or self.api_secure
+
def get_port_start(self) -> List[int]:
return [self.api_port or 5000]
first_virtual_router_id: Optional[int] = 50,
unmanaged: bool = False,
ssl: bool = False,
+ certificate_source: Optional[str] = None,
+ custom_sans: Optional[List[str]] = None,
keepalive_only: bool = False,
enable_haproxy_protocol: bool = False,
extra_container_args: Optional[GeneralArgList] = None,
'ingress', service_id=service_id,
placement=placement, config=config,
networks=networks,
+ ssl=ssl,
+ certificate_source=certificate_source,
+ ssl_cert=ssl_cert,
+ ssl_key=ssl_key,
+ custom_sans=custom_sans,
extra_container_args=extra_container_args,
extra_entrypoint_args=extra_entrypoint_args,
custom_configs=custom_configs
)
self.backend_service = backend_service
self.frontend_port = frontend_port
- self.ssl_cert = ssl_cert
- self.ssl_key = ssl_key
self.ssl_dh_param = ssl_dh_param
self.ssl_ciphers = ssl_ciphers
self.ssl_options = ssl_options
config: Optional[Dict[str, str]] = None,
networks: Optional[List[str]] = None,
placement: Optional[PlacementSpec] = None,
- ssl: Optional[bool] = True,
enable_auth: Optional[bool] = False,
port: Optional[int] = None,
ssl_cert: Optional[str] = None,
ssl_key: Optional[str] = None,
+ ssl: Optional[bool] = True,
+ certificate_source: Optional[str] = None,
+ custom_sans: Optional[List[str]] = None,
ssl_prefer_server_ciphers: Optional[str] = None,
ssl_session_tickets: Optional[str] = None,
ssl_session_timeout: Optional[str] = None,
'mgmt-gateway', service_id=service_id,
placement=placement, config=config,
networks=networks,
+ ssl=ssl,
+ ssl_cert=ssl_cert,
+ ssl_key=ssl_key,
+ certificate_source=certificate_source,
+ custom_sans=custom_sans,
preview_only=preview_only,
extra_container_args=extra_container_args,
extra_entrypoint_args=extra_entrypoint_args,
self.enable_auth = enable_auth
#: The port number on which the server will listen
self.port = port
- #: A multi-line string that contains the SSL certificate
- self.ssl_cert = ssl_cert
- #: A multi-line string that contains the SSL key
- self.ssl_key = ssl_key
#: Prefer server ciphers over client ciphers: on | off
self.ssl_prefer_server_ciphers = ssl_prefer_server_ciphers
#: A multioption flag to control session tickets: on | off
cookie_secret: Optional[str] = None,
ssl_cert: Optional[str] = None,
ssl_key: Optional[str] = None,
+ ssl: Optional[bool] = True,
+ certificate_source: Optional[str] = None,
+ custom_sans: Optional[List[str]] = None,
allowlist_domains: Optional[List[str]] = None,
unmanaged: bool = False,
extra_container_args: Optional[GeneralArgList] = None,
'oauth2-proxy', service_id=service_id,
placement=placement, config=config,
networks=networks,
+ ssl=ssl,
+ certificate_source=certificate_source,
+ ssl_cert=ssl_cert,
+ ssl_key=ssl_key,
+ custom_sans=custom_sans,
extra_container_args=extra_container_args,
extra_entrypoint_args=extra_entrypoint_args,
custom_configs=custom_configs
#: The secret key used for signing cookies. Its length must be 16,
# 24, or 32 bytes to create an AES cipher.
self.cookie_secret = cookie_secret or self.generate_random_secret()
- #: The multi-line SSL certificate for encrypting communications.
- self.ssl_cert = ssl_cert
- #: The multi-line SSL certificate private key for decrypting communications.
- self.ssl_key = ssl_key
#: List of allowed domains for safe redirection after login or logout,
# preventing unauthorized redirects.
self.allowlist_domains = allowlist_domains
service_type: str,
service_id: Optional[str] = None,
config: Optional[Dict[str, str]] = None,
+ certificate_source: Optional[str] = None,
+ ssl: Optional[bool] = True,
networks: Optional[List[str]] = None,
placement: Optional[PlacementSpec] = None,
unmanaged: bool = False,
super(MonitoringSpec, self).__init__(
service_type, service_id,
placement=placement, unmanaged=unmanaged,
+ ssl=ssl, certificate_source=certificate_source,
preview_only=preview_only, config=config,
networks=networks, extra_container_args=extra_container_args,
extra_entrypoint_args=extra_entrypoint_args,
def __init__(self,
service_type: str = 'alertmanager',
service_id: Optional[str] = None,
+ certificate_source: Optional[str] = None,
+ ssl: Optional[bool] = True,
placement: Optional[PlacementSpec] = None,
unmanaged: bool = False,
preview_only: bool = False,
super(AlertManagerSpec, self).__init__(
'alertmanager', service_id=service_id,
placement=placement, unmanaged=unmanaged,
+ ssl=ssl, certificate_source=certificate_source,
preview_only=preview_only, config=config, networks=networks, port=port,
extra_container_args=extra_container_args, extra_entrypoint_args=extra_entrypoint_args,
custom_configs=custom_configs)
def __init__(self,
service_type: str = 'grafana',
service_id: Optional[str] = None,
+ certificate_source: Optional[str] = None,
+ ssl: Optional[bool] = True,
placement: Optional[PlacementSpec] = None,
unmanaged: bool = False,
preview_only: bool = False,
assert service_type == 'grafana'
super(GrafanaSpec, self).__init__(
'grafana', service_id=service_id,
+ ssl=ssl, certificate_source=certificate_source,
placement=placement, unmanaged=unmanaged,
preview_only=preview_only, config=config, networks=networks, port=port,
extra_container_args=extra_container_args, extra_entrypoint_args=extra_entrypoint_args,
def __init__(self,
service_type: str = 'prometheus',
service_id: Optional[str] = None,
+ certificate_source: Optional[str] = None,
+ ssl: Optional[bool] = True,
placement: Optional[PlacementSpec] = None,
unmanaged: bool = False,
preview_only: bool = False,
super(PrometheusSpec, self).__init__(
'prometheus', service_id=service_id,
placement=placement, unmanaged=unmanaged,
+ ssl=ssl, certificate_source=certificate_source,
preview_only=preview_only, config=config, networks=networks, port=port, targets=targets,
extra_container_args=extra_container_args, extra_entrypoint_args=extra_entrypoint_args,
custom_configs=custom_configs)
prio_limit: Optional[int] = 5,
stats_period: Optional[int] = 5,
placement: Optional[PlacementSpec] = None,
+ certificate_source: Optional[str] = None,
+ ssl: Optional[bool] = True,
unmanaged: bool = False,
preview_only: bool = False,
extra_container_args: Optional[GeneralArgList] = None,
super(CephExporterSpec, self).__init__(
service_type,
+ ssl=ssl,
+ certificate_source=certificate_source,
placement=placement,
unmanaged=unmanaged,
preview_only=preview_only,