-from typing import TYPE_CHECKING, Tuple, Union, List, Dict, Optional, cast, Any
+from typing import TYPE_CHECKING, Tuple, Union, List, Dict, Optional, cast, Any, Callable
import logging
+from fnmatch import fnmatch
+from enum import Enum
from cephadm.ssl_cert_utils import SSLCerts, SSLConfigException
from mgr_util import verify_tls, certificate_days_to_expire, ServerConfigException
from cephadm.ssl_cert_utils import get_certificate_info, get_private_key_info
-from cephadm.tlsobject_types import Cert, PrivKey
-from cephadm.tlsobject_store import TLSObjectStore, TLSObjectScope, TLSObjectException
+from cephadm.tlsobject_types import Cert, PrivKey, TLSObjectScope, TLSObjectException, CertKeyPair
+from cephadm.tlsobject_store import TLSObjectStore
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
logger = logging.getLogger(__name__)
+class CertFilterOption(str, Enum):
+ NAME = 'name'
+ STATUS = 'status'
+ SIGNED_BY = 'signed-by'
+ SCOPE = 'scope'
+ SERVICE = 'service'
+
+ def __str__(self) -> str:
+ return self.value
+
+
+class CertStatus(str, Enum):
+ VALID = 'valid'
+ INVALID = 'invalid'
+ EXPIRED = 'expired'
+ EXPIRING = 'expiring'
+
+ def __str__(self) -> str:
+ return self.value
+
+
class CertInfo:
"""
- is_valid: True if the certificate is valid.
self.days_to_expiration = days_to_expiration
self.error_info = error_info
+ @property
+ def signed_by(self) -> str:
+ return "user" if self.user_made else "cephadm"
+
+ @property
+ def status(self) -> CertStatus:
+ """Return certificate status as a CertStatus enum."""
+ if not self.is_valid:
+ return CertStatus.EXPIRED if 'expired' in self.error_info.lower() else CertStatus.INVALID
+ if self.is_close_to_expiration:
+ return CertStatus.EXPIRING
+ return CertStatus.VALID
+
def __str__(self) -> str:
return f'{self.cert_name} ({self.target})' if self.target else f'{self.cert_name}'
This class holds the following important mappings:
- known_certs
- known_keys
- - entities
-
- First ones holds all the known certificates and keys managed by cephadm. Each certificate/key has a
- pre-defined scope: Global, Host, or Service.
-
- - Global: The same certificates is used for all the service daemons (e.g mgmt-gateway).
- - Host: Certificates specific to individual hosts within the cluster (e.g Grafana).
- - Service: Certificates tied to specific service (e.g RGW).
-
- The entities mapping associates each scoped entity with its certificates. This information is needed
- to trigger the corresponding service reconfiguration when updating some certificate and also when
- setting the cert/key pair from CLI.
+ - consumers_by_scope (maps *consumers* to the certificate/key names bound at each scope)
+
+ The first two mappings hold all the known certificate/key *names* (logical store identifiers such
+ as "rgw_ssl_cert", "rgw_ssl_key"). These names are not X.509 subjects but certs in PEM (or similar)
+ format in the internal TLS object store.
+
+ Each certificate/key name has a pre-defined scope: Global, Host, or Service.
+
+ - Global: The same certificate is used cluster-wide for the consumer (e.g., mgmt-gateway).
+ - Host: Certificates specific to individual hosts where the consumer runs (e.g., grafana, oauth2-proxy).
+ - Service: Certificates tied to a specific service type (e.g., RGW) and shared by all its daemons.
+
+ In addition to the scope, every scoped certificate/key object may have an associated *target*.
+ A target is the concrete identifier within a given scope:
+
+ - For HOST-scoped objects: the target is the host FQDN or inventory-ip name where the certificate is bound.
+ - For SERVICE-scoped objects: the target is the service instance name (e.g. "rgw.myzone").
+ - For GLOBAL objects, the target is always None because the object applies cluster-wide.
+
+ Scopes define *where* in the system an object conceptually lives, while targets specify the
+ *which one* within that scope. The TLSObjectStore uses both pieces of information when saving,
+ retrieving, or removing certificates/keys, ensuring that a single logical object name can have
+ multiple per-target instances when required.
+
+ Examples:
+
+ - cephadm_root_ca_cert --> scope=GLOBAL, target=None
+ - rgw_ssl_cert for rgw.myzone --> scope=SERVICE, target="rgw.myzone"
+ - cephadm-signed_grafana_cert on host node12 --> scope=HOST, target="node12"
+
+ The consumers_by_scope mapping associates each scoped consumer (service type or subsystem/integration such as
+ "rgw", "nfs", "nvmeof", "grafana", "oauth2-proxy") with the certificate/key *names* it uses.
+ This information is needed to trigger the corresponding service reconfiguration when updating some
+ certificate and also when setting the cert/key pair from CLI.
+
+ Notes and invariants:
+ - The cephadm root CA certificate/key are GLOBAL and unique per cluster (include fsid).
+ - Consumers are always service types or subsystems; hosts are never consumers. At HOST scope,
+ the host is the target under the consumer. It doesn't own the certificate.
+ - cephadm-signed names use a fixed prefix and are treated as HOST-scoped, e.g.:
+ cephadm-signed_<service_name>__<label>_cert
+ cephadm-signed_<service_name>__<label>_key
+ (the label is optional; separator is defined by LABEL_SEPARATOR).
+ - Each cert_name/key_name belongs to exactly one scope; SERVICE/HOST scoped names are stored
+ per target (service name or host), while GLOBAL scoped names store a single object.
"""
CEPHADM_ROOT_CA_CERT = 'cephadm_root_ca_cert'
CEPHADM_ROOT_CA_KEY = 'cephadm_root_ca_key'
CEPHADM_CERTMGR_HEALTH_ERR = 'CEPHADM_CERT_ERROR'
+ CEPHADM_SIGNED = 'cephadm-signed'
+ LABEL_SEPARATOR = "__lbl__"
def __init__(self, mgr: "CephadmOrchestrator") -> None:
self.mgr = mgr
TLSObjectScope.HOST: [],
TLSObjectScope.GLOBAL: [self.CEPHADM_ROOT_CA_KEY],
}
- self.entities: Dict[TLSObjectScope, Dict[str, Dict[str, List[str]]]] = {
+ self.consumers_by_scope: Dict[TLSObjectScope, Dict[str, Dict[str, List[str]]]] = {
TLSObjectScope.SERVICE: {},
TLSObjectScope.HOST: {},
TLSObjectScope.GLOBAL: {},
}
+ def is_cephadm_signed_object(self, object_name: str) -> bool:
+ return object_name.startswith(self.CEPHADM_SIGNED)
+
+ def self_signed_cert(self, service_name: str, label: Optional[str] = None) -> str:
+ if label:
+ return f'{self.CEPHADM_SIGNED}_{service_name}{self.LABEL_SEPARATOR}{label}_cert'
+ else:
+ return f'{self.CEPHADM_SIGNED}_{service_name}_cert'
+
+ def self_signed_key(self, service_name: str, label: Optional[str] = None) -> str:
+ if label:
+ return f'{self.CEPHADM_SIGNED}_{service_name}{self.LABEL_SEPARATOR}{label}_key'
+ else:
+ return f'{self.CEPHADM_SIGNED}_{service_name}_key'
+
+ def service_name_from_cert(self, cert_name: str) -> str:
+ prefix = f'{self.CEPHADM_SIGNED}_'
+ suffix = '_cert'
+ if cert_name.startswith(prefix) and cert_name.endswith(suffix):
+ middle = cert_name[len(prefix):-len(suffix)]
+ if self.LABEL_SEPARATOR in middle:
+ service_name, _ = middle.split(self.LABEL_SEPARATOR, 1)
+ else:
+ service_name, _ = middle, None
+ return service_name
+ return 'unknown-service'
+
def init_tlsobject_store(self) -> None:
- self.cert_store = TLSObjectStore(self.mgr, Cert, self.known_certs)
+ self.cert_store = TLSObjectStore(self.mgr, Cert, self.known_certs, self.is_cephadm_signed_object)
self.cert_store.load()
- self.key_store = TLSObjectStore(self.mgr, PrivKey, self.known_keys)
+ self.key_store = TLSObjectStore(self.mgr, PrivKey, self.known_keys, self.is_cephadm_signed_object)
self.key_store.load()
self._initialize_root_ca(self.mgr.get_mgr_ip())
def get_root_ca(self) -> str:
return self.ssl_certs.get_root_cert()
- def register_cert_key_pair(self, entity: str, cert_name: str, key_name: str, scope: TLSObjectScope) -> None:
+ def register_self_signed_cert_key_pair(self, service_name: str, label: Optional[str] = None) -> None:
+ """
+ Registers a self-signed certificate/key for a given service under host scope.
+
+ :param service_name: The name of the service.
+ """
+ self.cert_store.register_object_name(self.self_signed_cert(service_name, label), TLSObjectScope.HOST)
+ self.key_store.register_object_name(self.self_signed_key(service_name, label), TLSObjectScope.HOST)
+
+ def register_cert_key_pair(self, consumer: str, cert_name: str, key_name: str, scope: TLSObjectScope) -> None:
"""
- Registers a certificate/key for a given entity under a specific scope.
+ Registers a certificate/key for a given consumer under a specific scope.
- :param entity: The entity (e.g., service, host) owning the certificate.
+ :param consumer: The consumer of the certificate (e.g., service-type, other gobal consumer).
:param cert_name: The name of the certificate.
:param key_name: The name of the key.
:param scope: The TLSObjectScope (SERVICE, HOST, GLOBAL).
"""
- self.register_cert(entity, cert_name, scope)
- self.register_key(entity, key_name, scope)
+ self.register_cert(consumer, cert_name, scope)
+ self.register_key(consumer, key_name, scope)
- def register_cert(self, entity: str, cert_name: str, scope: TLSObjectScope) -> None:
- self._register_tls_object(entity, cert_name, scope, "certs")
+ def register_cert(self, consumer: str, cert_name: str, scope: TLSObjectScope) -> None:
+ self._register_tls_object(consumer, cert_name, scope, "certs")
- def register_key(self, entity: str, key_name: str, scope: TLSObjectScope) -> None:
- self._register_tls_object(entity, key_name, scope, "keys")
+ def register_key(self, consumer: str, key_name: str, scope: TLSObjectScope) -> None:
+ self._register_tls_object(consumer, key_name, scope, "keys")
- def _register_tls_object(self, entity: str, obj_name: str, scope: TLSObjectScope, obj_type: str) -> None:
+ def _register_tls_object(self, consumer: str, obj_name: str, scope: TLSObjectScope, obj_type: str) -> None:
"""
- Registers a TLS-related object (certificate or key) for a given entity under a specific scope.
+ Registers a TLS-related object (certificate or key) for a given consumer under a specific scope.
- :param entity: The entity (service name) owning the TLS object.
+ :param consumer: The consumer of the TLS object.
:param obj_name: The name of the certificate or key.
:param scope: The TLSObjectScope (SERVICE, HOST, GLOBAL).
:param obj_type: either "certs" or "keys".
if obj_name and obj_name not in storage[scope]:
storage[scope].append(obj_name)
- if entity not in self.entities[scope]:
- self.entities[scope][entity] = {"certs": [], "keys": []}
+ if consumer not in self.consumers_by_scope[scope]:
+ self.consumers_by_scope[scope][consumer] = {"certs": [], "keys": []}
- self.entities[scope][entity][obj_type].append(obj_name)
+ if obj_name not in self.consumers_by_scope[scope][consumer][obj_type]:
+ self.consumers_by_scope[scope][consumer][obj_type].append(obj_name)
- def cert_to_entity(self, cert_name: str) -> str:
+ def get_associated_service(self, cert_info: CertInfo) -> Optional[str]:
"""
- Retrieves the entity that owns a given certificate or key name.
-
- :param cert_name: The certificate or key name.
- :return: The entity name if found, otherwise None.
+ Retrieves the service associeted to the certificate
"""
- for scope_entities in self.entities.values():
- for entity, certs in scope_entities.items():
- if cert_name in certs:
- return entity
- return 'unkown'
+ if self.is_cephadm_signed_object(cert_info.cert_name):
+ return self.service_name_from_cert(cert_info.cert_name)
+ for scoped_consumers in self.consumers_by_scope.values():
+ for consumer, bundles in scoped_consumers.items():
+ if cert_info.cert_name in bundles.get('certs', []):
+ cert_scope = self.get_cert_scope(cert_info.cert_name)
+ if cert_scope == TLSObjectScope.SERVICE:
+ return cert_info.target
+ else:
+ return consumer
+ return None
def generate_cert(
self,
host_fqdn: Union[str, List[str]],
node_ip: Union[str, List[str]],
custom_san_list: Optional[List[str]] = None,
- ) -> Tuple[str, str]:
- return self.ssl_certs.generate_cert(host_fqdn, node_ip, custom_san_list=custom_san_list)
+ duration_in_days: Optional[int] = None,
+ ) -> CertKeyPair:
+ cert, key = self.ssl_certs.generate_cert(host_fqdn, node_ip, custom_san_list=custom_san_list, duration_in_days=duration_in_days)
+ return CertKeyPair(cert=cert, key=key)
+
+ def cert_exists(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
+ cert_obj = self.cert_store.get_tlsobject(cert_name, service_name, host)
+ return cert_obj is not None
+
+ def is_cert_editable(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
+ cert_obj = cast(Cert, self.cert_store.get_tlsobject(cert_name, service_name, host))
+ return cert_obj.editable if cert_obj else True
def get_cert(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Optional[str]:
cert_obj = cast(Cert, self.cert_store.get_tlsobject(cert_name, service_name, host))
key_obj = cast(PrivKey, self.key_store.get_tlsobject(key_name, service_name, host))
return key_obj.key if key_obj else None
- def save_cert(self, cert_name: str, cert: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False) -> None:
- self.cert_store.save_tlsobject(cert_name, cert, service_name, host, user_made)
+ def get_self_signed_cert_key_pair(self, service_name: str, hostname: str, label: Optional[str] = None) -> CertKeyPair:
+ cert_obj = cast(Cert, self.cert_store.get_tlsobject(self.self_signed_cert(service_name, label), host=hostname))
+ key_obj = cast(PrivKey, self.key_store.get_tlsobject(self.self_signed_key(service_name, label), host=hostname))
+ cert = cert_obj.cert if cert_obj else ''
+ key = key_obj.key if key_obj else ''
+ return CertKeyPair(cert=cert, key=key)
+
+ def save_cert(self, cert_name: str, cert: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False, editable: bool = False) -> None:
+ self.cert_store.save_tlsobject(cert_name, cert, service_name, host, user_made, editable)
- def save_key(self, key_name: str, key: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False) -> None:
- self.key_store.save_tlsobject(key_name, key, service_name, host, user_made)
+ def save_key(self, key_name: str, key: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False, editable: bool = False) -> None:
+ self.key_store.save_tlsobject(key_name, key, service_name, host, user_made, editable)
- def rm_cert(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
- self.cert_store.rm_tlsobject(cert_name, service_name, host)
+ def save_self_signed_cert_key_pair(self, service_name: str, tls_pair: CertKeyPair, host: str, label: Optional[str] = None) -> None:
+ ss_cert_name = self.self_signed_cert(service_name, label)
+ ss_key_name = self.self_signed_key(service_name, label)
+ self.cert_store.save_tlsobject(ss_cert_name, tls_pair.cert, host=host, user_made=False)
+ self.key_store.save_tlsobject(ss_key_name, tls_pair.key, host=host, user_made=False)
+
+ def rm_cert(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
+ return self.cert_store.rm_tlsobject(cert_name, service_name, host)
+
+ def rm_key(self, key_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
+ return self.key_store.rm_tlsobject(key_name, service_name, host)
+
+ def rm_self_signed_cert_key_pair(self, service_name: str, host: str, label: Optional[str] = None) -> None:
+ self.rm_cert(self.self_signed_cert(service_name, label), service_name, host)
+ self.rm_key(self.self_signed_key(service_name, label), service_name, host)
+
+ def cert_ls(self, filter_by: str = '',
+ include_details: bool = False,
+ include_cephadm_signed: bool = False) -> Dict:
+ """
+ signed-by filtering behavior in `cert_ls`:
+
+ Defaults:
+ - If `include_cephadm_signed` is False and no explicit `signed-by=` is provided,
+ we auto-filter to show only user-made certs (and always include the root CA).
+ - If the caller explicitly filters by `signed-by=...`, that explicit filter wins.
+
+ Behavior matrix:
+ +------------------------+-----------------------------+----------------------------------------------+
+ | include_cephadm_signed | 'signed-by=' in filter_by? | Effective behavior on signed-by |
+ +------------------------+-----------------------------+----------------------------------------------+
+ | False | No | Auto-filter: signed-by=user + root CA |
+ | False | Yes | Use user's explicit selector |
+ | True | No | No auto filter (include user + cephadm) |
+ | True | Yes | Use user's explicit selector |
+ +------------------------+-----------------------------+----------------------------------------------+
+ """
- def rm_key(self, key_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
- self.key_store.rm_tlsobject(key_name, service_name, host)
+ def _lhs(expr: str) -> str:
+ return expr.partition('=')[0].strip().lower()
+
+ def _build_cert_context(cert_info: CertInfo) -> Dict[CertFilterOption, Any]:
+ scope = self.get_cert_scope(cert_info.cert_name)
+ svc = self.get_associated_service(cert_info) or ''
+ return {
+ CertFilterOption.NAME: cert_info.cert_name,
+ CertFilterOption.STATUS: cert_info.status,
+ CertFilterOption.SIGNED_BY: cert_info.signed_by,
+ CertFilterOption.SCOPE: scope,
+ CertFilterOption.SERVICE: svc,
+ }
+
+ def _field_filter(expr: str) -> Callable[[Dict[CertFilterOption, Any]], bool]:
+ key_str, _, value = expr.partition('=')
+ key_str = key_str.strip().lower()
+ value = value.strip()
- def cert_ls(self, include_datails: bool = False) -> Dict:
+ try:
+ key = CertFilterOption(key_str)
+ except ValueError:
+ return lambda cert_ctx: True
+
+ if key in (CertFilterOption.NAME, CertFilterOption.SERVICE):
+ return lambda cert_ctx: fnmatch(cert_ctx.get(key, ''), value)
+
+ if key in (CertFilterOption.SCOPE, CertFilterOption.STATUS, CertFilterOption.SIGNED_BY):
+ return lambda cert_ctx: cert_ctx.get(key) == value
+
+ # Default: unknown field selector -> nop filter (don't exclude)
+ return lambda cert_ctx: True
+
+ def build_filters() -> List[Callable[[Dict[CertFilterOption, Any]], bool]]:
+ filter_exprs = [e.strip() for e in filter_by.split(',') if e.strip()]
+ cert_filters = [_field_filter(expr) for expr in filter_exprs]
+ # By default: filter out cephadm-signed certs unless explicitly included
+ # with the exception of the cephadm root CA cert (CEPHADM_ROOT_CA_CERT) as
+ # technically the user may be interested in adding it to his CA trust chain
+ explicit_signed_by = any(_lhs(e) == str(CertFilterOption.SIGNED_BY) for e in filter_exprs)
+ if not include_cephadm_signed and not explicit_signed_by:
+ cert_filters.append(
+ lambda cert_ctx:
+ cert_ctx.get(CertFilterOption.SIGNED_BY) == 'user'
+ or cert_ctx[CertFilterOption.NAME] == self.CEPHADM_ROOT_CA_CERT
+ )
+ return cert_filters
+
+ filters = build_filters()
cert_objects: List = self.cert_store.list_tlsobjects()
ls: Dict = {}
for cert_name, cert_obj, target in cert_objects:
- cert_extended_info = get_certificate_info(cert_obj.cert, include_datails)
+
+ cert_info = self._check_certificate_state(cert_name, target, cert_obj)
+ ctx = _build_cert_context(cert_info)
+ if not all(f(ctx) for f in filters):
+ continue
+
+ cert_extended_info = get_certificate_info(cert_obj.cert, include_details)
cert_scope = self.get_cert_scope(cert_name)
if cert_name not in ls:
- ls[cert_name] = {'scope': str(cert_scope), 'certificates': {}}
+ ls[cert_name] = {'scope': cert_scope.value, 'certificates': {}}
if cert_scope == TLSObjectScope.GLOBAL:
ls[cert_name]['certificates'] = cert_extended_info
else:
return ls
- def key_ls(self) -> Dict:
+ def key_ls(self, include_cephadm_generated_keys: bool = False) -> Dict:
key_objects: List = self.key_store.list_tlsobjects()
ls: Dict = {}
for key_name, key_obj, target in key_objects:
+ if not include_cephadm_generated_keys and self.is_cephadm_signed_object(key_name):
+ continue
priv_key_info = get_private_key_info(key_obj.key)
key_scope = self.get_key_scope(key_name)
if key_name not in ls:
- ls[key_name] = {'scope': str(key_scope), 'keys': {}}
+ ls[key_name] = {'scope': key_scope.value, 'keys': {}}
if key_scope == TLSObjectScope.GLOBAL:
ls[key_name]['keys'] = priv_key_info
else:
ls[key_name]['keys'].update({target: priv_key_info})
# we don't want this key to be leaked
- del ls[self.CEPHADM_ROOT_CA_KEY]
+ ls.pop(self.CEPHADM_ROOT_CA_KEY, None)
return ls
- def list_entity_known_certificates(self, entity: str) -> List[str]:
+ def list_consumer_known_certificates(self, consumer: str) -> List[str]:
"""
- Retrieves all certificates associated with a given entity.
+ Retrieves all certificates associated with a given consumer.
- :param entity: The entity name.
- :return: A list of certificate names, or None if the entity is not found.
+ :param consumer: The consumer name.
+ :return: A list of certificate names, or None if the consumer is not found.
"""
- for scope, entities in self.entities.items():
- if entity in entities:
- return entities[entity]['certs'] # Return certs for the entity
+ for scope, scoped_consumers in self.consumers_by_scope.items():
+ if consumer in scoped_consumers:
+ return scoped_consumers[consumer]['certs'] # Return certs for the consumer
return []
- def get_entities(self, get_scope: bool = False) -> Dict[str, Any]:
- return {f'{scope}': entities for scope, entities in self.entities.items()}
+ def get_consumers(self, get_scope: bool = False) -> Dict[str, Any]:
+ return {scope.value: consumers for scope, consumers in self.consumers_by_scope.items()}
- def list_entities(self) -> List[str]:
+ def list_consumers(self) -> List[str]:
"""
- Retrieves a list of all registered entities across all scopes.
- :return: A list of entity names.
+ Retrieves a list of all registered consumers across all scopes.
+ :return: A list of consumer names.
"""
- entities: List[str] = []
- for scope_entities in self.entities.values():
- entities.extend(scope_entities.keys())
- return entities
+ consumers: List[str] = []
+ for scoped_consumers in self.consumers_by_scope.values():
+ consumers.extend(scoped_consumers.keys())
+ return consumers
def get_cert_scope(self, cert_name: str) -> TLSObjectScope:
+ if self.is_cephadm_signed_object(cert_name):
+ return TLSObjectScope.HOST
for scope, certificates in self.known_certs.items():
if cert_name in certificates:
return scope
cert_status = cert_info.get_status_description()
detailed_error_msgs.append(cert_status)
if not cert_info.is_valid:
- if "expired" in cert_info.error_info:
+ if 'expired' in cert_info.error_info:
expired_count += 1
else:
invalid_count += 1
elif cert_info.is_close_to_expiration:
expiring_count += 1
- # Generate a short description with a summery of all the detected issues
+ # Generate a short description with a summary of all the detected issues
issues = [
- f'{invalid_count} invalid' if invalid_count > 0 else '',
- f'{expired_count} expired' if expired_count > 0 else '',
- f'{expiring_count} expiring' if expiring_count > 0 else ''
+ f'{invalid_count} {CertStatus.INVALID}' if invalid_count > 0 else '',
+ f'{expired_count} {CertStatus.EXPIRED}' if expired_count > 0 else '',
+ f'{expiring_count} {CertStatus.EXPIRING}' if expiring_count > 0 else ''
]
issues_description = ', '.join(filter(None, issues)) # collect only non-empty issues
total_issues = invalid_count + expired_count + expiring_count
except ServerConfigException as e:
return CertInfo(cert_name, target, cert.user_made, False, False, 0, str(e))
- def prepare_certificate(self,
- cert_name: str,
- key_name: str,
- host_fqdns: Union[str, List[str]],
- host_ips: Union[str, List[str]],
- target_host: str = '',
- target_service: str = '',
- ) -> Tuple[Optional[str], Optional[str]]:
-
- if not cert_name or not key_name:
- logger.error("Certificate name and key name must be provided when calling prepare_certificates.")
- return None, None
-
- cert_obj = cast(Cert, self.cert_store.get_tlsobject(cert_name, target_service, target_host))
- key_obj = cast(PrivKey, self.key_store.get_tlsobject(key_name, target_service, target_host))
- if cert_obj and key_obj:
- target = target_host or target_service
- cert_info = self._check_certificate_state(cert_name, target, cert_obj, key_obj)
- if cert_info.is_operationally_valid():
- return cert_obj.cert, key_obj.key
- elif cert_obj.user_made:
- self._notify_certificates_health_status([cert_info])
- return None, None
- else:
- logger.warning(f'Found invalid cephadm certificate/key pair {cert_name}/{key_name}, '
- f'status: {cert_info.get_status_description()}, '
- f'error: {cert_info.error_info}')
-
- # Reaching this point means either certificates are not present or they are
- # invalid cephadm-signed certificates. Either way, we will just generate new ones.
- logger.info(f'Generating cephadm-signed certificates for {cert_name}/{key_name}')
- cert, pkey = self.generate_cert(host_fqdns, host_ips)
- self.mgr.cert_mgr.save_cert(cert_name, cert, host=target_host, service_name=target_service)
- self.mgr.cert_mgr.save_key(key_name, pkey, host=target_host, service_name=target_service)
- return cert, pkey
-
def get_problematic_certificates(self) -> List[Tuple[CertInfo, Cert]]:
def get_key(cert_name: str, key_name: str, target: Optional[str]) -> Optional[PrivKey]:
try:
- service_name, host = self.cert_store.determine_tlsobject_target(cert_name, target)
- key = cast(PrivKey, self.key_store.get_tlsobject(key_name, service_name=service_name, host=host))
+ tlsobj_target = self.cert_store.determine_tlsobject_target(cert_name, target)
+ key = cast(PrivKey, self.key_store.get_tlsobject(key_name,
+ service_name=tlsobj_target.service,
+ host=tlsobj_target.host))
return key
except TLSObjectException:
return None
if key_obj:
# certificate has a key, let's check the cert/key pair
cert_info = self._check_certificate_state(cert_name, target, cert_obj, key_obj)
- elif key_name in self.known_keys:
+
+ elif any(key_name in ks for ks in self.known_keys.values()) or self.is_cephadm_signed_object(key_name):
# certificate is supposed to have a key but it's missing
logger.error(f"Key '{key_name}' is missing for certificate '{cert_name}'.")
cert_info = CertInfo(cert_name, target, cert_obj.user_made, False, False, 0, "missing key")
try:
logger.info(f'Renewing cephadm-signed certificate for {cert_info.cert_name}')
new_cert, new_key = self.ssl_certs.renew_cert(cert_obj.cert, self.mgr.certificate_duration_days)
- service_name, host = self.cert_store.determine_tlsobject_target(cert_info.cert_name, cert_info.target)
- self.cert_store.save_tlsobject(cert_info.cert_name, new_cert, service_name=service_name, host=host)
+ tlsobj_target = self.cert_store.determine_tlsobject_target(cert_info.cert_name, cert_info.target)
+ self.cert_store.save_tlsobject(cert_info.cert_name, new_cert, service_name=tlsobj_target.service, host=tlsobj_target.host)
key_name = cert_info.cert_name.replace('_cert', '_key')
- self.key_store.save_tlsobject(key_name, new_key, service_name=service_name, host=host)
+ self.key_store.save_tlsobject(key_name, new_key, service_name=tlsobj_target.service, host=tlsobj_target.host)
return True
except SSLConfigException as e:
logger.error(f'Error while trying to renew cephadm-signed certificate for {cert_info.cert_name}: {e}')
# This is a cephadm-signed certificate, let's try to fix it
if not cert_info.is_valid:
# Remove the invalid certificate to force regeneration
- service_name, host = self.cert_store.determine_tlsobject_target(cert_info.cert_name, cert_info.target)
+ tlsobj_target = self.cert_store.determine_tlsobject_target(cert_info.cert_name, cert_info.target)
logger.info(
f'Removing invalid certificate for {cert_info.cert_name} to trigger regeneration '
- f'(service: {service_name}, host: {host}).'
+ f'(service: {tlsobj_target.service}, host: {tlsobj_target.host}).'
)
- self.cert_store.rm_tlsobject(cert_info.cert_name, service_name, host)
+ self.cert_store.rm_tlsobject(cert_info.cert_name, tlsobj_target.service, tlsobj_target.host)
return True
elif cert_info.is_close_to_expiration:
return self._renew_self_signed_certificate(cert_info, cert_obj)
continue
if fix_issues and trigger_auto_fix(cert_info, cert_obj):
- services_to_reconfig.add(self.cert_to_entity(cert_info.cert_name))
+ svc = self.get_associated_service(cert_info)
+ if svc:
+ services_to_reconfig.add(svc)
+ else:
+ logger.error(f'Cannot find the service associated with the certificate {cert_info.cert_name}')
- # Clear previously reported issues as we are newly checking all the certifiactes
+ # Clear previously reported issues as we are newly checking all the certificates
self.certificates_health_report = []
# All problematic certificates have been processed. certs_with_issues now only
-from typing import Any, Dict, Union, List, Tuple, Optional, TYPE_CHECKING, Type
-from enum import Enum
+from typing import (Any,
+ Dict,
+ Union,
+ List,
+ Tuple,
+ Optional,
+ TYPE_CHECKING,
+ Type,
+ Callable)
import json
import logging
-from cephadm.tlsobject_types import TLSObjectProtocol, TLSObjectException
+from cephadm.tlsobject_types import TLSObjectProtocol, TLSObjectException, TLSObjectScope, TLSObjectTarget
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
-TLSOBJECT_STORE_PREFIX = 'cert_store.'
-
-
logger = logging.getLogger(__name__)
-class TLSObjectScope(Enum):
- HOST = "host"
- SERVICE = "service"
- GLOBAL = "global"
- UNKNOWN = "unknown"
-
- def __str__(self) -> str:
- return self.value
-
- def __repr__(self) -> str:
- return self.value
+TLSOBJECT_STORE_PREFIX = 'cert_store.'
class TLSObjectStore():
def __init__(self, mgr: 'CephadmOrchestrator',
tlsobject_class: Type[TLSObjectProtocol],
- known_entities: Dict[TLSObjectScope, List[str]]) -> None:
+ known_objects_names: Dict[TLSObjectScope, List[str]],
+ cephadm_signed_obj_checker: Callable[[str], bool]) -> None:
self.mgr: CephadmOrchestrator = mgr
+ self.cephadm_signed_object_checker = cephadm_signed_obj_checker
self.tlsobject_class = tlsobject_class
- all_known_entities = [item for sublist in known_entities.values() for item in sublist]
- self.known_entities: Dict[str, Any] = {key: {} for key in all_known_entities}
- self.per_service_name_tlsobjects = known_entities[TLSObjectScope.SERVICE]
- self.per_host_tlsobjects = known_entities[TLSObjectScope.HOST]
- self.global_tlsobjects = known_entities[TLSObjectScope.GLOBAL]
+ all_known_objects_names = [item for sublist in known_objects_names.values() for item in sublist]
+ self.objects_by_name: Dict[str, Any] = {key: {} for key in all_known_objects_names}
+ self.service_scoped_objects = known_objects_names[TLSObjectScope.SERVICE]
+ self.host_scoped_objects = known_objects_names[TLSObjectScope.HOST]
+ self.global_scoped_objects = known_objects_names[TLSObjectScope.GLOBAL]
self.store_prefix = f'{TLSOBJECT_STORE_PREFIX}{tlsobject_class.STORAGE_PREFIX}.'
- def determine_tlsobject_target(self, entity: str, target: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
- if entity in self.per_service_name_tlsobjects:
- return (target, None)
- elif entity in self.per_host_tlsobjects:
- return (None, target)
+ def register_object_name(self, obj_name: str, scope: TLSObjectScope) -> None:
+ """
+ Register a new TLS object name under the specified scope if it does not already exist.
+ Args:
+ obj_name (str): The name of the TLS object to add.
+ scope (TLSObjectScope): The scope of the object (SERVICE, HOST, or GLOBAL).
+ Raises:
+ ValueError: If an invalid scope is provided.
+ """
+ if obj_name not in self.objects_by_name:
+ # Initialize an empty dictionary to track TLS objects for this obj_name
+ self.objects_by_name[obj_name] = {}
+
+ # Add to the appropriate scope list
+ if scope == TLSObjectScope.SERVICE and obj_name not in self.service_scoped_objects:
+ self.service_scoped_objects.append(obj_name)
+ elif scope == TLSObjectScope.HOST and obj_name not in self.host_scoped_objects:
+ self.host_scoped_objects.append(obj_name)
+ elif scope == TLSObjectScope.GLOBAL and obj_name not in self.global_scoped_objects:
+ self.global_scoped_objects.append(obj_name)
+ elif scope not in [TLSObjectScope.HOST, TLSObjectScope.SERVICE, TLSObjectScope.GLOBAL]:
+ raise ValueError(f"Invalid TLSObjectScope '{scope}' for obj_name '{obj_name}'")
+
+ def determine_tlsobject_target(self, obj_name: str, target: Optional[str]) -> TLSObjectTarget:
+ if obj_name in self.service_scoped_objects:
+ return TLSObjectTarget(service=target, host=None)
+ elif obj_name in self.host_scoped_objects:
+ return TLSObjectTarget(service=None, host=target)
else:
- return (None, None)
+ return TLSObjectTarget(service=None, host=None)
+
+ def get_tlsobject_scope_and_target(self, obj_name: str,
+ service_name: Optional[str] = None,
+ host: Optional[str] = None) -> Tuple[TLSObjectScope, Optional[Any]]:
- def get_tlsobject_scope_and_target(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Tuple[TLSObjectScope, Optional[Any]]:
- if entity in self.per_service_name_tlsobjects:
+ if obj_name in self.service_scoped_objects:
return TLSObjectScope.SERVICE, service_name
- elif entity in self.per_host_tlsobjects:
+ elif obj_name in self.host_scoped_objects:
return TLSObjectScope.HOST, host
- elif entity in self.global_tlsobjects:
+ elif obj_name in self.global_scoped_objects:
return TLSObjectScope.GLOBAL, None
else:
return TLSObjectScope.UNKNOWN, None
- def get_tlsobject(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Optional[TLSObjectProtocol]:
- self._validate_tlsobject_entity(entity, service_name, host)
- scope, target = self.get_tlsobject_scope_and_target(entity, service_name, host)
+ def get_tlsobject(self, obj_name: str,
+ service_name: Optional[str] = None,
+ host: Optional[str] = None) -> Optional[TLSObjectProtocol]:
+
+ self._validate_tlsobject_name(obj_name, service_name, host)
+ scope, target = self.get_tlsobject_scope_and_target(obj_name, service_name, host)
if scope == TLSObjectScope.GLOBAL:
- return self.known_entities.get(entity)
+ return self.objects_by_name.get(obj_name)
else:
- return self.known_entities.get(entity, {}).get(target)
-
- def save_tlsobject(self, entity: str, tlsobject: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False) -> None:
- self._validate_tlsobject_entity(entity, service_name, host)
- tlsobject = self.tlsobject_class(tlsobject, user_made)
- scope, target = self.get_tlsobject_scope_and_target(entity, service_name, host)
+ return self.objects_by_name.get(obj_name, {}).get(target)
+
+ def save_tlsobject(self, obj_name: str,
+ tlsobject: str,
+ service_name: Optional[str] = None,
+ host: Optional[str] = None,
+ user_made: bool = False,
+ editable: bool = False) -> None:
+
+ self._validate_tlsobject_name(obj_name, service_name, host)
+ tlsobject = self.tlsobject_class(tlsobject, user_made, editable)
+ scope, target = self.get_tlsobject_scope_and_target(obj_name, service_name, host)
j: Union[str, Dict[Any, Any], None] = None
if scope in (TLSObjectScope.SERVICE, TLSObjectScope.HOST):
- self.known_entities[entity][target] = tlsobject
+ self.objects_by_name[obj_name][target] = tlsobject
j = {
- key: self.tlsobject_class.to_json(self.known_entities[entity][key])
- for key in self.known_entities[entity]
+ key: self.tlsobject_class.to_json(self.objects_by_name[obj_name][key])
+ for key in self.objects_by_name[obj_name]
}
- self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+ self.mgr.set_store(self.store_prefix + obj_name, json.dumps(j))
elif scope == TLSObjectScope.GLOBAL:
- self.known_entities[entity] = tlsobject
+ self.objects_by_name[obj_name] = tlsobject
j = self.tlsobject_class.to_json(tlsobject)
- self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+ self.mgr.set_store(self.store_prefix + obj_name, json.dumps(j))
else:
- logger.error(f'Trying to save entity {entity} with a not-supported/unknown TLSObjectScope scope {scope.value}')
+ logger.error(f'Trying to save TLS object name {obj_name} with a not-supported/unknown TLSObjectScope scope {scope.value}')
+
+ def rm_tlsobject(self, obj_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
+ """
+ Remove a TLS object from the in-memory registry and persist the change.
+
+ Behavior by scope:
+ - SERVICE / HOST: Removes the entry for the given target (service_name/host).
+ If this was the last target for that name, the name remains registered with
+ an empty per-target map.
+ - GLOBAL: Resets the object for `obj_name` to an empty instance of
+ `tlsobject_class` and writes it back to the store. The store key is NOT
+ deleted; the empty object acts as a tombstone.
+
+ Keeping the KV key stable allows watchers/consumers to distinguish
+ "known-but-empty" from "unknown", and lets future `save_tlsobject(...)`
+ calls reuse the same name without recreating metadata.
+
+ Args:
+ obj_name: Registered TLS object name to remove.
+ service_name: Required when the name is service-scoped; identifies the target.
+ host: Required when the name is host-scoped; identifies the target.
+
+ Returns:
+ True if a change was persisted to the store; False if there was nothing to remove.
- def rm_tlsobject(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
- """Remove a tlsobjectificate for a specific entity, service, or host."""
- self._validate_tlsobject_entity(entity, service_name, host)
- scope, target = self.get_tlsobject_scope_and_target(entity, service_name, host)
+ Raises:
+ TLSObjectException: If `obj_name` is unknown, or the required `service_name`/`host`
+ target is missing for a scoped name, or the name resolves to an
+ unsupported/unknown scope.
+
+ Notes:
+ - An “empty” TLS object is falsy and serializes to the minimal JSON defined
+ by `tlsobject_class.to_json`.
+ """
+ self._validate_tlsobject_name(obj_name, service_name, host)
+ scope, target = self.get_tlsobject_scope_and_target(obj_name, service_name, host)
j: Union[str, Dict[Any, Any], None] = None
if scope in (TLSObjectScope.SERVICE, TLSObjectScope.HOST):
- if entity in self.known_entities and target in self.known_entities[entity]:
- del self.known_entities[entity][target]
+ if obj_name in self.objects_by_name and target in self.objects_by_name[obj_name]:
+ del self.objects_by_name[obj_name][target]
j = {
- key: self.tlsobject_class.to_json(self.known_entities[entity][key])
- for key in self.known_entities[entity]
+ key: self.tlsobject_class.to_json(self.objects_by_name[obj_name][key])
+ for key in self.objects_by_name[obj_name]
}
- self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+ self.mgr.set_store(self.store_prefix + obj_name, json.dumps(j))
+ return True
elif scope == TLSObjectScope.GLOBAL:
- self.known_entities[entity] = self.tlsobject_class()
- j = self.tlsobject_class.to_json(self.known_entities[entity])
- self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+ self.objects_by_name[obj_name] = self.tlsobject_class()
+ j = self.tlsobject_class.to_json(self.objects_by_name[obj_name])
+ self.mgr.set_store(self.store_prefix + obj_name, json.dumps(j))
+ return True
else:
- raise TLSObjectException(f'Attempted to remove {self.tlsobject_class.__name__.lower()} for unknown entity {entity}')
+ raise TLSObjectException(f'Attempted to remove {self.tlsobject_class.__name__.lower()} for unknown obj_name {obj_name}')
+ return False
- def _validate_tlsobject_entity(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
+ def _validate_tlsobject_name(self, obj_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
cred_type = self.tlsobject_class.__name__.lower()
- if entity not in self.known_entities.keys():
- raise TLSObjectException(f'Attempted to access {cred_type} for unknown entity {entity}')
- if entity in self.per_host_tlsobjects and not host:
- raise TLSObjectException(f'Need host to access {cred_type} for entity {entity}')
- if entity in self.per_service_name_tlsobjects and not service_name:
- raise TLSObjectException(f'Need service name to access {cred_type} for entity {entity}')
-
- def list_tlsobjects(self) -> List[Tuple[str, Type[TLSObjectProtocol], Optional[str]]]:
+ if obj_name not in self.objects_by_name.keys():
+ raise TLSObjectException(f'Attempted to access {cred_type} for unknown TLS object name {obj_name}')
+ if obj_name in self.host_scoped_objects and not host:
+ raise TLSObjectException(f'Need host to access {cred_type} for TLS object {obj_name}')
+ if obj_name in self.service_scoped_objects and not service_name:
+ raise TLSObjectException(f'Need service name to access {cred_type} for TLS object {obj_name}')
+
+ def list_tlsobjects(self) -> List[Tuple[str, TLSObjectProtocol, Optional[str]]]:
"""
Returns a shallow list of all known TLS objects, including their targets.
Returns:
- List of tuples: (entity, tlsobject, target)
- - entity: The TLS object entity name.
+ List of tuples: (obj_name, tlsobject, target)
+ - obj_name: The TLS object name.
- tlsobject: The TLS object itself.
- target: The associated target (service_name, host, or None for global).
"""
tlsobjects = []
- for known_entity, value in self.known_entities.items():
+ for known_obj_name, value in self.objects_by_name.items():
if isinstance(value, dict): # Handle per-service or per-host TLS objects
for target, tlsobject in value.items():
if tlsobject:
- tlsobjects.append((known_entity, tlsobject, target))
+ tlsobjects.append((known_obj_name, tlsobject, target))
elif value: # Handle Global TLS objects
- tlsobjects.append((known_entity, value, None))
+ tlsobjects.append((known_obj_name, value, None))
return tlsobjects
def load(self) -> None:
for k, v in self.mgr.get_store_prefix(self.store_prefix).items():
- entity = k[len(self.store_prefix):]
- if entity not in self.known_entities:
- logger.warning(f"TLSObjectStore: Discarding unknown entity '{entity}'")
+ obj_name = k[len(self.store_prefix):]
+ is_cephadm_signed_object = self.cephadm_signed_object_checker(obj_name)
+ if not is_cephadm_signed_object and obj_name not in self.objects_by_name:
+ logger.warning(f"TLSObjectStore: Discarding unknown obj_name '{obj_name}'")
+ continue
+
+ try:
+ tls_object_targets = json.loads(v)
+ except json.JSONDecodeError as e:
+ logger.warning(
+ f"TLSObjectStore: Cannot parse JSON for '{obj_name}': "
+ f"key={k}, len={len(v) if v else 0}, startswith={v[:20]!r}, error={e}"
+ )
+ continue
+ except Exception as e:
+ logger.error(
+ f"TLSObjectStore: Unexpected error happened while trying to parse JSON for '{obj_name}': "
+ f"key={k}, len={len(v) if v else 0}, startswith={v[:20]!r}, error={e}"
+ )
continue
- entity_targets = json.loads(v)
- if entity in self.per_service_name_tlsobjects or entity in self.per_host_tlsobjects:
- self.known_entities[entity] = {}
- for target in entity_targets:
- tlsobject = self.tlsobject_class.from_json(entity_targets[target])
+
+ if is_cephadm_signed_object or (obj_name in self.service_scoped_objects) or (obj_name in self.host_scoped_objects):
+ if is_cephadm_signed_object and obj_name not in self.host_scoped_objects:
+ self.host_scoped_objects.append(obj_name)
+ self.objects_by_name[obj_name] = {}
+ for target in tls_object_targets:
+ tlsobject = self.tlsobject_class.from_json(tls_object_targets[target])
if tlsobject:
- self.known_entities[entity][target] = tlsobject
- elif entity in self.global_tlsobjects:
- tlsobject = self.tlsobject_class.from_json(entity_targets)
+ self.objects_by_name[obj_name][target] = tlsobject
+ elif obj_name in self.global_scoped_objects:
+ tlsobject = self.tlsobject_class.from_json(tls_object_targets)
if tlsobject:
- self.known_entities[entity] = tlsobject
+ self.objects_by_name[obj_name] = tlsobject
else:
- logger.error(f"TLSObjectStore: Found a known entity {entity} with unknown scope!")
+ logger.error(f"TLSObjectStore: Found a known TLS object name {obj_name} with unknown scope!")