]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: adding enahanced support for self-signed certs
authorRedouane Kachach <rkachach@ibm.com>
Tue, 12 Aug 2025 14:37:32 +0000 (16:37 +0200)
committerRedouane Kachach <rkachach@ibm.com>
Sat, 6 Sep 2025 21:39:40 +0000 (23:39 +0200)
Signed-off-by: Redouane Kachach <rkachach@ibm.com>
src/pybind/mgr/cephadm/cert_mgr.py
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tlsobject_store.py
src/pybind/mgr/cephadm/tlsobject_types.py
src/pybind/mgr/orchestrator/_interface.py

index b0514b0695b91a05c465b8e7f9e829d0c9491304..6627017f31d00d435d04dfcc63e3b819b8a7b819 100644 (file)
@@ -1,11 +1,13 @@
-from typing import TYPE_CHECKING, Tuple, Union, List, Dict, Optional, cast, Any
+from typing import TYPE_CHECKING, Tuple, Union, List, Dict, Optional, cast, Any, Callable
 import logging
+from fnmatch import fnmatch
+from enum import Enum
 
 from cephadm.ssl_cert_utils import SSLCerts, SSLConfigException
 from mgr_util import verify_tls, certificate_days_to_expire, ServerConfigException
 from cephadm.ssl_cert_utils import get_certificate_info, get_private_key_info
-from cephadm.tlsobject_types import Cert, PrivKey
-from cephadm.tlsobject_store import TLSObjectStore, TLSObjectScope, TLSObjectException
+from cephadm.tlsobject_types import Cert, PrivKey, TLSObjectScope, TLSObjectException, CertKeyPair
+from cephadm.tlsobject_store import TLSObjectStore
 
 if TYPE_CHECKING:
     from cephadm.module import CephadmOrchestrator
@@ -13,6 +15,27 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+class CertFilterOption(str, Enum):
+    NAME = 'name'
+    STATUS = 'status'
+    SIGNED_BY = 'signed-by'
+    SCOPE = 'scope'
+    SERVICE = 'service'
+
+    def __str__(self) -> str:
+        return self.value
+
+
+class CertStatus(str, Enum):
+    VALID = 'valid'
+    INVALID = 'invalid'
+    EXPIRED = 'expired'
+    EXPIRING = 'expiring'
+
+    def __str__(self) -> str:
+        return self.value
+
+
 class CertInfo:
     """
       - is_valid: True if the certificate is valid.
@@ -35,6 +58,19 @@ class CertInfo:
         self.days_to_expiration = days_to_expiration
         self.error_info = error_info
 
+    @property
+    def signed_by(self) -> str:
+        return "user" if self.user_made else "cephadm"
+
+    @property
+    def status(self) -> CertStatus:
+        """Return certificate status as a CertStatus enum."""
+        if not self.is_valid:
+            return CertStatus.EXPIRED if 'expired' in self.error_info.lower() else CertStatus.INVALID
+        if self.is_close_to_expiration:
+            return CertStatus.EXPIRING
+        return CertStatus.VALID
+
     def __str__(self) -> str:
         return f'{self.cert_name} ({self.target})' if self.target else f'{self.cert_name}'
 
@@ -74,23 +110,58 @@ class CertMgr:
     This class holds the following important mappings:
       - known_certs
       - known_keys
-      - entities
-
-    First ones holds all the known certificates and keys managed by cephadm. Each certificate/key has a
-    pre-defined scope: Global, Host, or Service.
-
-       - Global: The same certificates is used for all the service daemons (e.g mgmt-gateway).
-       - Host: Certificates specific to individual hosts within the cluster (e.g Grafana).
-       - Service: Certificates tied to specific service (e.g RGW).
-
-    The entities mapping associates each scoped entity with its certificates. This information is needed
-    to trigger the corresponding service reconfiguration when updating some certificate and also when
-    setting the cert/key pair from CLI.
+      - consumers_by_scope   (maps *consumers* to the certificate/key names bound at each scope)
+
+    The first two mappings hold all the known certificate/key *names* (logical store identifiers such
+    as "rgw_ssl_cert", "rgw_ssl_key"). These names are not X.509 subjects but certs in PEM (or similar)
+    format in the internal TLS object store.
+
+    Each certificate/key name has a pre-defined scope: Global, Host, or Service.
+
+       - Global: The same certificate is used cluster-wide for the consumer (e.g., mgmt-gateway).
+       - Host: Certificates specific to individual hosts where the consumer runs (e.g., grafana, oauth2-proxy).
+       - Service: Certificates tied to a specific service type (e.g., RGW) and shared by all its daemons.
+
+    In addition to the scope, every scoped certificate/key object may have an associated *target*.
+    A target is the concrete identifier within a given scope:
+
+       - For HOST-scoped objects: the target is the host FQDN or inventory-ip name where the certificate is bound.
+       - For SERVICE-scoped objects: the target is the service instance name (e.g. "rgw.myzone").
+       - For GLOBAL objects, the target is always None because the object applies cluster-wide.
+
+    Scopes define *where* in the system an object conceptually lives, while targets specify the
+    *which one* within that scope. The TLSObjectStore uses both pieces of information when saving,
+    retrieving, or removing certificates/keys, ensuring that a single logical object name can have
+    multiple per-target instances when required.
+
+    Examples:
+
+      - cephadm_root_ca_cert --> scope=GLOBAL, target=None
+      - rgw_ssl_cert for rgw.myzone --> scope=SERVICE, target="rgw.myzone"
+      - cephadm-signed_grafana_cert on host node12 --> scope=HOST, target="node12"
+
+    The consumers_by_scope mapping associates each scoped consumer (service type or subsystem/integration such as
+    "rgw", "nfs", "nvmeof", "grafana", "oauth2-proxy") with the certificate/key *names* it uses.
+    This information is needed to trigger the corresponding service reconfiguration when updating some
+    certificate and also when setting the cert/key pair from CLI.
+
+    Notes and invariants:
+      - The cephadm root CA certificate/key are GLOBAL and unique per cluster (include fsid).
+      - Consumers are always service types or subsystems; hosts are never consumers. At HOST scope,
+        the host is the target under the consumer. It doesn't own the certificate.
+      - cephadm-signed names use a fixed prefix and are treated as HOST-scoped, e.g.:
+            cephadm-signed_<service_name>__<label>_cert
+            cephadm-signed_<service_name>__<label>_key
+        (the label is optional; separator is defined by LABEL_SEPARATOR).
+      - Each cert_name/key_name belongs to exactly one scope; SERVICE/HOST scoped names are stored
+        per target (service name or host), while GLOBAL scoped names store a single object.
     """
 
     CEPHADM_ROOT_CA_CERT = 'cephadm_root_ca_cert'
     CEPHADM_ROOT_CA_KEY = 'cephadm_root_ca_key'
     CEPHADM_CERTMGR_HEALTH_ERR = 'CEPHADM_CERT_ERROR'
+    CEPHADM_SIGNED = 'cephadm-signed'
+    LABEL_SEPARATOR = "__lbl__"
 
     def __init__(self, mgr: "CephadmOrchestrator") -> None:
         self.mgr = mgr
@@ -105,16 +176,43 @@ class CertMgr:
             TLSObjectScope.HOST: [],
             TLSObjectScope.GLOBAL: [self.CEPHADM_ROOT_CA_KEY],
         }
-        self.entities: Dict[TLSObjectScope, Dict[str, Dict[str, List[str]]]] = {
+        self.consumers_by_scope: Dict[TLSObjectScope, Dict[str, Dict[str, List[str]]]] = {
             TLSObjectScope.SERVICE: {},
             TLSObjectScope.HOST: {},
             TLSObjectScope.GLOBAL: {},
         }
 
+    def is_cephadm_signed_object(self, object_name: str) -> bool:
+        return object_name.startswith(self.CEPHADM_SIGNED)
+
+    def self_signed_cert(self, service_name: str, label: Optional[str] = None) -> str:
+        if label:
+            return f'{self.CEPHADM_SIGNED}_{service_name}{self.LABEL_SEPARATOR}{label}_cert'
+        else:
+            return f'{self.CEPHADM_SIGNED}_{service_name}_cert'
+
+    def self_signed_key(self, service_name: str, label: Optional[str] = None) -> str:
+        if label:
+            return f'{self.CEPHADM_SIGNED}_{service_name}{self.LABEL_SEPARATOR}{label}_key'
+        else:
+            return f'{self.CEPHADM_SIGNED}_{service_name}_key'
+
+    def service_name_from_cert(self, cert_name: str) -> str:
+        prefix = f'{self.CEPHADM_SIGNED}_'
+        suffix = '_cert'
+        if cert_name.startswith(prefix) and cert_name.endswith(suffix):
+            middle = cert_name[len(prefix):-len(suffix)]
+            if self.LABEL_SEPARATOR in middle:
+                service_name, _ = middle.split(self.LABEL_SEPARATOR, 1)
+            else:
+                service_name, _ = middle, None
+            return service_name
+        return 'unknown-service'
+
     def init_tlsobject_store(self) -> None:
-        self.cert_store = TLSObjectStore(self.mgr, Cert, self.known_certs)
+        self.cert_store = TLSObjectStore(self.mgr, Cert, self.known_certs, self.is_cephadm_signed_object)
         self.cert_store.load()
-        self.key_store = TLSObjectStore(self.mgr, PrivKey, self.known_keys)
+        self.key_store = TLSObjectStore(self.mgr, PrivKey, self.known_keys, self.is_cephadm_signed_object)
         self.key_store.load()
         self._initialize_root_ca(self.mgr.get_mgr_ip())
 
@@ -138,29 +236,38 @@ class CertMgr:
     def get_root_ca(self) -> str:
         return self.ssl_certs.get_root_cert()
 
-    def register_cert_key_pair(self, entity: str, cert_name: str, key_name: str, scope: TLSObjectScope) -> None:
+    def register_self_signed_cert_key_pair(self, service_name: str, label: Optional[str] = None) -> None:
+        """
+        Registers a self-signed certificate/key for a given service under host scope.
+
+        :param service_name: The name of the service.
+        """
+        self.cert_store.register_object_name(self.self_signed_cert(service_name, label), TLSObjectScope.HOST)
+        self.key_store.register_object_name(self.self_signed_key(service_name, label), TLSObjectScope.HOST)
+
+    def register_cert_key_pair(self, consumer: str, cert_name: str, key_name: str, scope: TLSObjectScope) -> None:
         """
-        Registers a certificate/key for a given entity under a specific scope.
+        Registers a certificate/key for a given consumer under a specific scope.
 
-        :param entity: The entity (e.g., service, host) owning the certificate.
+        :param consumer: The consumer of the certificate (e.g., service-type, other gobal consumer).
         :param cert_name: The name of the certificate.
         :param key_name: The name of the key.
         :param scope: The TLSObjectScope (SERVICE, HOST, GLOBAL).
         """
-        self.register_cert(entity, cert_name, scope)
-        self.register_key(entity, key_name, scope)
+        self.register_cert(consumer, cert_name, scope)
+        self.register_key(consumer, key_name, scope)
 
-    def register_cert(self, entity: str, cert_name: str, scope: TLSObjectScope) -> None:
-        self._register_tls_object(entity, cert_name, scope, "certs")
+    def register_cert(self, consumer: str, cert_name: str, scope: TLSObjectScope) -> None:
+        self._register_tls_object(consumer, cert_name, scope, "certs")
 
-    def register_key(self, entity: str, key_name: str, scope: TLSObjectScope) -> None:
-        self._register_tls_object(entity, key_name, scope, "keys")
+    def register_key(self, consumer: str, key_name: str, scope: TLSObjectScope) -> None:
+        self._register_tls_object(consumer, key_name, scope, "keys")
 
-    def _register_tls_object(self, entity: str, obj_name: str, scope: TLSObjectScope, obj_type: str) -> None:
+    def _register_tls_object(self, consumer: str, obj_name: str, scope: TLSObjectScope, obj_type: str) -> None:
         """
-        Registers a TLS-related object (certificate or key) for a given entity under a specific scope.
+        Registers a TLS-related object (certificate or key) for a given consumer under a specific scope.
 
-        :param entity: The entity (service name) owning the TLS object.
+        :param consumer: The consumer of the TLS object.
         :param obj_name: The name of the certificate or key.
         :param scope: The TLSObjectScope (SERVICE, HOST, GLOBAL).
         :param obj_type: either "certs" or "keys".
@@ -170,31 +277,45 @@ class CertMgr:
         if obj_name and obj_name not in storage[scope]:
             storage[scope].append(obj_name)
 
-        if entity not in self.entities[scope]:
-            self.entities[scope][entity] = {"certs": [], "keys": []}
+        if consumer not in self.consumers_by_scope[scope]:
+            self.consumers_by_scope[scope][consumer] = {"certs": [], "keys": []}
 
-        self.entities[scope][entity][obj_type].append(obj_name)
+        if obj_name not in self.consumers_by_scope[scope][consumer][obj_type]:
+            self.consumers_by_scope[scope][consumer][obj_type].append(obj_name)
 
-    def cert_to_entity(self, cert_name: str) -> str:
+    def get_associated_service(self, cert_info: CertInfo) -> Optional[str]:
         """
-        Retrieves the entity that owns a given certificate or key name.
-
-        :param cert_name: The certificate or key name.
-        :return: The entity name if found, otherwise None.
+        Retrieves the service associeted to the certificate
         """
-        for scope_entities in self.entities.values():
-            for entity, certs in scope_entities.items():
-                if cert_name in certs:
-                    return entity
-        return 'unkown'
+        if self.is_cephadm_signed_object(cert_info.cert_name):
+            return self.service_name_from_cert(cert_info.cert_name)
+        for scoped_consumers in self.consumers_by_scope.values():
+            for consumer, bundles in scoped_consumers.items():
+                if cert_info.cert_name in bundles.get('certs', []):
+                    cert_scope = self.get_cert_scope(cert_info.cert_name)
+                    if cert_scope == TLSObjectScope.SERVICE:
+                        return cert_info.target
+                    else:
+                        return consumer
+        return None
 
     def generate_cert(
         self,
         host_fqdn: Union[str, List[str]],
         node_ip: Union[str, List[str]],
         custom_san_list: Optional[List[str]] = None,
-    ) -> Tuple[str, str]:
-        return self.ssl_certs.generate_cert(host_fqdn, node_ip, custom_san_list=custom_san_list)
+        duration_in_days: Optional[int] = None,
+    ) -> CertKeyPair:
+        cert, key = self.ssl_certs.generate_cert(host_fqdn, node_ip, custom_san_list=custom_san_list, duration_in_days=duration_in_days)
+        return CertKeyPair(cert=cert, key=key)
+
+    def cert_exists(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
+        cert_obj = self.cert_store.get_tlsobject(cert_name, service_name, host)
+        return cert_obj is not None
+
+    def is_cert_editable(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
+        cert_obj = cast(Cert, self.cert_store.get_tlsobject(cert_name, service_name, host))
+        return cert_obj.editable if cert_obj else True
 
     def get_cert(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Optional[str]:
         cert_obj = cast(Cert, self.cert_store.get_tlsobject(cert_name, service_name, host))
@@ -204,26 +325,119 @@ class CertMgr:
         key_obj = cast(PrivKey, self.key_store.get_tlsobject(key_name, service_name, host))
         return key_obj.key if key_obj else None
 
-    def save_cert(self, cert_name: str, cert: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False) -> None:
-        self.cert_store.save_tlsobject(cert_name, cert, service_name, host, user_made)
+    def get_self_signed_cert_key_pair(self, service_name: str, hostname: str, label: Optional[str] = None) -> CertKeyPair:
+        cert_obj = cast(Cert, self.cert_store.get_tlsobject(self.self_signed_cert(service_name, label), host=hostname))
+        key_obj = cast(PrivKey, self.key_store.get_tlsobject(self.self_signed_key(service_name, label), host=hostname))
+        cert = cert_obj.cert if cert_obj else ''
+        key = key_obj.key if key_obj else ''
+        return CertKeyPair(cert=cert, key=key)
+
+    def save_cert(self, cert_name: str, cert: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False, editable: bool = False) -> None:
+        self.cert_store.save_tlsobject(cert_name, cert, service_name, host, user_made, editable)
 
-    def save_key(self, key_name: str, key: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False) -> None:
-        self.key_store.save_tlsobject(key_name, key, service_name, host, user_made)
+    def save_key(self, key_name: str, key: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False, editable: bool = False) -> None:
+        self.key_store.save_tlsobject(key_name, key, service_name, host, user_made, editable)
 
-    def rm_cert(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
-        self.cert_store.rm_tlsobject(cert_name, service_name, host)
+    def save_self_signed_cert_key_pair(self, service_name: str, tls_pair: CertKeyPair, host: str, label: Optional[str] = None) -> None:
+        ss_cert_name = self.self_signed_cert(service_name, label)
+        ss_key_name = self.self_signed_key(service_name, label)
+        self.cert_store.save_tlsobject(ss_cert_name, tls_pair.cert, host=host, user_made=False)
+        self.key_store.save_tlsobject(ss_key_name, tls_pair.key, host=host, user_made=False)
+
+    def rm_cert(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
+        return self.cert_store.rm_tlsobject(cert_name, service_name, host)
+
+    def rm_key(self, key_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
+        return self.key_store.rm_tlsobject(key_name, service_name, host)
+
+    def rm_self_signed_cert_key_pair(self, service_name: str, host: str, label: Optional[str] = None) -> None:
+        self.rm_cert(self.self_signed_cert(service_name, label), service_name, host)
+        self.rm_key(self.self_signed_key(service_name, label), service_name, host)
+
+    def cert_ls(self, filter_by: str = '',
+                include_details: bool = False,
+                include_cephadm_signed: bool = False) -> Dict:
+        """
+        signed-by filtering behavior in `cert_ls`:
+
+        Defaults:
+        - If `include_cephadm_signed` is False and no explicit `signed-by=` is provided,
+          we auto-filter to show only user-made certs (and always include the root CA).
+        - If the caller explicitly filters by `signed-by=...`, that explicit filter wins.
+
+        Behavior matrix:
+          +------------------------+-----------------------------+----------------------------------------------+
+          | include_cephadm_signed | 'signed-by=' in filter_by?  | Effective behavior on signed-by               |
+          +------------------------+-----------------------------+----------------------------------------------+
+          | False                  | No                          | Auto-filter: signed-by=user  + root CA        |
+          | False                  | Yes                         | Use user's explicit selector                  |
+          | True                   | No                          | No auto filter (include user + cephadm)       |
+          | True                   | Yes                         | Use user's explicit selector                  |
+          +------------------------+-----------------------------+----------------------------------------------+
+        """
 
-    def rm_key(self, key_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
-        self.key_store.rm_tlsobject(key_name, service_name, host)
+        def _lhs(expr: str) -> str:
+            return expr.partition('=')[0].strip().lower()
+
+        def _build_cert_context(cert_info: CertInfo) -> Dict[CertFilterOption, Any]:
+            scope = self.get_cert_scope(cert_info.cert_name)
+            svc = self.get_associated_service(cert_info) or ''
+            return {
+                CertFilterOption.NAME: cert_info.cert_name,
+                CertFilterOption.STATUS: cert_info.status,
+                CertFilterOption.SIGNED_BY: cert_info.signed_by,
+                CertFilterOption.SCOPE: scope,
+                CertFilterOption.SERVICE: svc,
+            }
+
+        def _field_filter(expr: str) -> Callable[[Dict[CertFilterOption, Any]], bool]:
+            key_str, _, value = expr.partition('=')
+            key_str = key_str.strip().lower()
+            value = value.strip()
 
-    def cert_ls(self, include_datails: bool = False) -> Dict:
+            try:
+                key = CertFilterOption(key_str)
+            except ValueError:
+                return lambda cert_ctx: True
+
+            if key in (CertFilterOption.NAME, CertFilterOption.SERVICE):
+                return lambda cert_ctx: fnmatch(cert_ctx.get(key, ''), value)
+
+            if key in (CertFilterOption.SCOPE, CertFilterOption.STATUS, CertFilterOption.SIGNED_BY):
+                return lambda cert_ctx: cert_ctx.get(key) == value
+
+            # Default: unknown field selector -> nop filter (don't exclude)
+            return lambda cert_ctx: True
+
+        def build_filters() -> List[Callable[[Dict[CertFilterOption, Any]], bool]]:
+            filter_exprs = [e.strip() for e in filter_by.split(',') if e.strip()]
+            cert_filters = [_field_filter(expr) for expr in filter_exprs]
+            # By default: filter out cephadm-signed certs unless explicitly included
+            # with the exception of the cephadm root CA cert (CEPHADM_ROOT_CA_CERT) as
+            # technically the user may be interested in adding it to his CA trust chain
+            explicit_signed_by = any(_lhs(e) == str(CertFilterOption.SIGNED_BY) for e in filter_exprs)
+            if not include_cephadm_signed and not explicit_signed_by:
+                cert_filters.append(
+                    lambda cert_ctx:
+                    cert_ctx.get(CertFilterOption.SIGNED_BY) == 'user'
+                    or cert_ctx[CertFilterOption.NAME] == self.CEPHADM_ROOT_CA_CERT
+                )
+            return cert_filters
+
+        filters = build_filters()
         cert_objects: List = self.cert_store.list_tlsobjects()
         ls: Dict = {}
         for cert_name, cert_obj, target in cert_objects:
-            cert_extended_info = get_certificate_info(cert_obj.cert, include_datails)
+
+            cert_info = self._check_certificate_state(cert_name, target, cert_obj)
+            ctx = _build_cert_context(cert_info)
+            if not all(f(ctx) for f in filters):
+                continue
+
+            cert_extended_info = get_certificate_info(cert_obj.cert, include_details)
             cert_scope = self.get_cert_scope(cert_name)
             if cert_name not in ls:
-                ls[cert_name] = {'scope': str(cert_scope), 'certificates': {}}
+                ls[cert_name] = {'scope': cert_scope.value, 'certificates': {}}
             if cert_scope == TLSObjectScope.GLOBAL:
                 ls[cert_name]['certificates'] = cert_extended_info
             else:
@@ -231,50 +445,54 @@ class CertMgr:
 
         return ls
 
-    def key_ls(self) -> Dict:
+    def key_ls(self, include_cephadm_generated_keys: bool = False) -> Dict:
         key_objects: List = self.key_store.list_tlsobjects()
         ls: Dict = {}
         for key_name, key_obj, target in key_objects:
+            if not include_cephadm_generated_keys and self.is_cephadm_signed_object(key_name):
+                continue
             priv_key_info = get_private_key_info(key_obj.key)
             key_scope = self.get_key_scope(key_name)
             if key_name not in ls:
-                ls[key_name] = {'scope': str(key_scope), 'keys': {}}
+                ls[key_name] = {'scope': key_scope.value, 'keys': {}}
             if key_scope == TLSObjectScope.GLOBAL:
                 ls[key_name]['keys'] = priv_key_info
             else:
                 ls[key_name]['keys'].update({target: priv_key_info})
 
         # we don't want this key to be leaked
-        del ls[self.CEPHADM_ROOT_CA_KEY]
+        ls.pop(self.CEPHADM_ROOT_CA_KEY, None)
 
         return ls
 
-    def list_entity_known_certificates(self, entity: str) -> List[str]:
+    def list_consumer_known_certificates(self, consumer: str) -> List[str]:
         """
-        Retrieves all certificates associated with a given entity.
+        Retrieves all certificates associated with a given consumer.
 
-        :param entity: The entity name.
-        :return: A list of certificate names, or None if the entity is not found.
+        :param consumer: The consumer name.
+        :return: A list of certificate names, or None if the consumer is not found.
         """
-        for scope, entities in self.entities.items():
-            if entity in entities:
-                return entities[entity]['certs']  # Return certs for the entity
+        for scope, scoped_consumers in self.consumers_by_scope.items():
+            if consumer in scoped_consumers:
+                return scoped_consumers[consumer]['certs']  # Return certs for the consumer
         return []
 
-    def get_entities(self, get_scope: bool = False) -> Dict[str, Any]:
-        return {f'{scope}': entities for scope, entities in self.entities.items()}
+    def get_consumers(self, get_scope: bool = False) -> Dict[str, Any]:
+        return {scope.value: consumers for scope, consumers in self.consumers_by_scope.items()}
 
-    def list_entities(self) -> List[str]:
+    def list_consumers(self) -> List[str]:
         """
-        Retrieves a list of all registered entities across all scopes.
-        :return: A list of entity names.
+        Retrieves a list of all registered consumers across all scopes.
+        :return: A list of consumer names.
         """
-        entities: List[str] = []
-        for scope_entities in self.entities.values():
-            entities.extend(scope_entities.keys())
-        return entities
+        consumers: List[str] = []
+        for scoped_consumers in self.consumers_by_scope.values():
+            consumers.extend(scoped_consumers.keys())
+        return consumers
 
     def get_cert_scope(self, cert_name: str) -> TLSObjectScope:
+        if self.is_cephadm_signed_object(cert_name):
+            return TLSObjectScope.HOST
         for scope, certificates in self.known_certs.items():
             if cert_name in certificates:
                 return scope
@@ -305,18 +523,18 @@ class CertMgr:
             cert_status = cert_info.get_status_description()
             detailed_error_msgs.append(cert_status)
             if not cert_info.is_valid:
-                if "expired" in cert_info.error_info:
+                if 'expired' in cert_info.error_info:
                     expired_count += 1
                 else:
                     invalid_count += 1
             elif cert_info.is_close_to_expiration:
                 expiring_count += 1
 
-        # Generate a short description with a summery of all the detected issues
+        # Generate a short description with a summary of all the detected issues
         issues = [
-            f'{invalid_count} invalid' if invalid_count > 0 else '',
-            f'{expired_count} expired' if expired_count > 0 else '',
-            f'{expiring_count} expiring' if expiring_count > 0 else ''
+            f'{invalid_count} {CertStatus.INVALID}' if invalid_count > 0 else '',
+            f'{expired_count} {CertStatus.EXPIRED}' if expired_count > 0 else '',
+            f'{expiring_count} {CertStatus.EXPIRING}' if expiring_count > 0 else ''
         ]
         issues_description = ', '.join(filter(None, issues))  # collect only non-empty issues
         total_issues = invalid_count + expired_count + expiring_count
@@ -356,48 +574,14 @@ class CertMgr:
         except ServerConfigException as e:
             return CertInfo(cert_name, target, cert.user_made, False, False, 0, str(e))
 
-    def prepare_certificate(self,
-                            cert_name: str,
-                            key_name: str,
-                            host_fqdns: Union[str, List[str]],
-                            host_ips: Union[str, List[str]],
-                            target_host: str = '',
-                            target_service: str = '',
-                            ) -> Tuple[Optional[str], Optional[str]]:
-
-        if not cert_name or not key_name:
-            logger.error("Certificate name and key name must be provided when calling prepare_certificates.")
-            return None, None
-
-        cert_obj = cast(Cert, self.cert_store.get_tlsobject(cert_name, target_service, target_host))
-        key_obj = cast(PrivKey, self.key_store.get_tlsobject(key_name, target_service, target_host))
-        if cert_obj and key_obj:
-            target = target_host or target_service
-            cert_info = self._check_certificate_state(cert_name, target, cert_obj, key_obj)
-            if cert_info.is_operationally_valid():
-                return cert_obj.cert, key_obj.key
-            elif cert_obj.user_made:
-                self._notify_certificates_health_status([cert_info])
-                return None, None
-            else:
-                logger.warning(f'Found invalid cephadm certificate/key pair {cert_name}/{key_name}, '
-                               f'status: {cert_info.get_status_description()}, '
-                               f'error: {cert_info.error_info}')
-
-        # Reaching this point means either certificates are not present or they are
-        # invalid cephadm-signed certificates. Either way, we will just generate new ones.
-        logger.info(f'Generating cephadm-signed certificates for {cert_name}/{key_name}')
-        cert, pkey = self.generate_cert(host_fqdns, host_ips)
-        self.mgr.cert_mgr.save_cert(cert_name, cert, host=target_host, service_name=target_service)
-        self.mgr.cert_mgr.save_key(key_name, pkey, host=target_host, service_name=target_service)
-        return cert, pkey
-
     def get_problematic_certificates(self) -> List[Tuple[CertInfo, Cert]]:
 
         def get_key(cert_name: str, key_name: str, target: Optional[str]) -> Optional[PrivKey]:
             try:
-                service_name, host = self.cert_store.determine_tlsobject_target(cert_name, target)
-                key = cast(PrivKey, self.key_store.get_tlsobject(key_name, service_name=service_name, host=host))
+                tlsobj_target = self.cert_store.determine_tlsobject_target(cert_name, target)
+                key = cast(PrivKey, self.key_store.get_tlsobject(key_name,
+                                                                 service_name=tlsobj_target.service,
+                                                                 host=tlsobj_target.host))
                 return key
             except TLSObjectException:
                 return None
@@ -416,7 +600,8 @@ class CertMgr:
             if key_obj:
                 # certificate has a key, let's check the cert/key pair
                 cert_info = self._check_certificate_state(cert_name, target, cert_obj, key_obj)
-            elif key_name in self.known_keys:
+
+            elif any(key_name in ks for ks in self.known_keys.values()) or self.is_cephadm_signed_object(key_name):
                 # certificate is supposed to have a key but it's missing
                 logger.error(f"Key '{key_name}' is missing for certificate '{cert_name}'.")
                 cert_info = CertInfo(cert_name, target, cert_obj.user_made, False, False, 0, "missing key")
@@ -436,10 +621,10 @@ class CertMgr:
         try:
             logger.info(f'Renewing cephadm-signed certificate for {cert_info.cert_name}')
             new_cert, new_key = self.ssl_certs.renew_cert(cert_obj.cert, self.mgr.certificate_duration_days)
-            service_name, host = self.cert_store.determine_tlsobject_target(cert_info.cert_name, cert_info.target)
-            self.cert_store.save_tlsobject(cert_info.cert_name, new_cert, service_name=service_name, host=host)
+            tlsobj_target = self.cert_store.determine_tlsobject_target(cert_info.cert_name, cert_info.target)
+            self.cert_store.save_tlsobject(cert_info.cert_name, new_cert, service_name=tlsobj_target.service, host=tlsobj_target.host)
             key_name = cert_info.cert_name.replace('_cert', '_key')
-            self.key_store.save_tlsobject(key_name, new_key, service_name=service_name, host=host)
+            self.key_store.save_tlsobject(key_name, new_key, service_name=tlsobj_target.service, host=tlsobj_target.host)
             return True
         except SSLConfigException as e:
             logger.error(f'Error while trying to renew cephadm-signed certificate for {cert_info.cert_name}: {e}')
@@ -469,12 +654,12 @@ class CertMgr:
             # This is a cephadm-signed certificate, let's try to fix it
             if not cert_info.is_valid:
                 # Remove the invalid certificate to force regeneration
-                service_name, host = self.cert_store.determine_tlsobject_target(cert_info.cert_name, cert_info.target)
+                tlsobj_target = self.cert_store.determine_tlsobject_target(cert_info.cert_name, cert_info.target)
                 logger.info(
                     f'Removing invalid certificate for {cert_info.cert_name} to trigger regeneration '
-                    f'(service: {service_name}, host: {host}).'
+                    f'(service: {tlsobj_target.service}, host: {tlsobj_target.host}).'
                 )
-                self.cert_store.rm_tlsobject(cert_info.cert_name, service_name, host)
+                self.cert_store.rm_tlsobject(cert_info.cert_name, tlsobj_target.service, tlsobj_target.host)
                 return True
             elif cert_info.is_close_to_expiration:
                 return self._renew_self_signed_certificate(cert_info, cert_obj)
@@ -494,9 +679,13 @@ class CertMgr:
                 continue
 
             if fix_issues and trigger_auto_fix(cert_info, cert_obj):
-                services_to_reconfig.add(self.cert_to_entity(cert_info.cert_name))
+                svc = self.get_associated_service(cert_info)
+                if svc:
+                    services_to_reconfig.add(svc)
+                else:
+                    logger.error(f'Cannot find the service associated with the certificate {cert_info.cert_name}')
 
-        # Clear previously reported issues as we are newly checking all the certifiactes
+        # Clear previously reported issues as we are newly checking all the certificates
         self.certificates_health_report = []
 
         # All problematic certificates have been processed. certs_with_issues now only
index cce2c6ce90fc88e5b8ae5e5cd4f0403ae4338c11..396af6a0566aa635ef6b094f5ea72a644f3cd0e5 100644 (file)
@@ -649,8 +649,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self.tuned_profile_utils = TunedProfileUtils(self)
 
-        self._init_cert_mgr()
-
         # ensure the host lists are in sync
         for h in self.inventory.keys():
             if h not in self.cache.daemons:
@@ -663,9 +661,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self.events = EventStore(self)
         self.offline_hosts: Set[str] = set()
 
-        self.migration = Migrations(self)
-
         service_registry.init_services(self)
+        self._init_cert_mgr()
+
+        self.migration = Migrations(self)
 
         self.mgr_service: MgrService = cast(MgrService, service_registry.get_service('mgr'))
         self.osd_service: OSDService = cast(OSDService, service_registry.get_service('osd'))
@@ -723,23 +722,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self.cert_mgr = CertMgr(self)
 
-        # register global certificates
-        self.cert_mgr.register_cert_key_pair('mgmt-gateway', 'mgmt_gw_cert', 'mgmt_gw_key', TLSObjectScope.GLOBAL)
-        self.cert_mgr.register_cert_key_pair('oauth2-proxy', 'oauth2_proxy_cert', 'oauth2_proxy_key', TLSObjectScope.GLOBAL)
+        for svc in service_registry.get_all_services():
+            if svc.allows_user_certificates:
+                assert svc.SCOPE != TLSObjectScope.UNKNOWN, f"Service {svc.TYPE} requieres certificates but it has not defined its svc.SCOPE field."
+                self.cert_mgr.register_cert_key_pair(svc.TYPE, svc.cert_name, svc.key_name, svc.SCOPE)
 
-        # register per-service certificates
-        self.cert_mgr.register_cert_key_pair('ingress', 'ingress_ssl_cert', 'ingress_ssl_key', TLSObjectScope.SERVICE)
-        self.cert_mgr.register_cert_key_pair('iscsi', 'iscsi_ssl_cert', 'iscsi_ssl_key', TLSObjectScope.SERVICE)
-        self.cert_mgr.register_cert_key_pair('nvmeof', 'nvmeof_server_cert', 'nvmeof_server_key', TLSObjectScope.SERVICE)
         self.cert_mgr.register_cert_key_pair('nvmeof', 'nvmeof_client_cert', 'nvmeof_client_key', TLSObjectScope.SERVICE)
-
-        # register ancilary certificates/keys
         self.cert_mgr.register_cert('nvmeof', 'nvmeof_root_ca_cert', TLSObjectScope.SERVICE)
-        self.cert_mgr.register_cert('rgw', 'rgw_frontend_ssl_cert', TLSObjectScope.SERVICE)
-        self.cert_mgr.register_key('nvmeof', 'nvmeof_encryption_key', TLSObjectScope.SERVICE)
-
-        # register per-host certificates
-        self.cert_mgr.register_cert_key_pair('grafana', 'grafana_cert', 'grafana_key', TLSObjectScope.HOST)
 
         self.cert_mgr.init_tlsobject_store()
 
@@ -3333,8 +3322,8 @@ Then run the following:
         return self.cert_mgr.cert_ls(show_details)
 
     @handle_orch_error
-    def cert_store_entity_ls(self) -> Dict[str, Dict[str, List[str]]]:
-        return self.cert_mgr.get_entities()
+    def cert_store_bindings_ls(self) -> Dict[str, Dict[str, List[str]]]:
+        return self.cert_mgr.get_consumers()
 
     @handle_orch_error
     def cert_store_reload(self) -> str:
@@ -3399,21 +3388,23 @@ Then run the following:
         force: bool = False
     ) -> str:
 
-        if entity not in self.cert_mgr.list_entities():
-            raise OrchestratorError(f"Invalid entity: {entity}. Please use 'ceph orch certmgr entity ls' to list valid entities.")
+        if consumer not in self.cert_mgr.list_consumers():
+            raise OrchestratorError(f"Invalid consumer: {consumer}. Please use 'ceph orch certmgr bindings ls' to list valid consumers.")
 
         # Check the certificate validity status
         target = service_name or hostname
-        cert_info = self.cert_mgr.check_certificate_state(entity, target, cert, key)
-        if not force and not cert_info.is_operationally_valid():
+        cert_info = self.cert_mgr.check_certificate_state(consumer, target, cert, key)
+        debug_mode = self.certificate_check_debug_mode and force
+        if not debug_mode and not cert_info.is_operationally_valid():
             raise OrchestratorError(cert_info.get_status_description())
 
-        # Obtain the certificate name (from entity)
-        cert_names = self.cert_mgr.list_entity_known_certificates(entity)
-        if len(cert_names) == 1:
-            cert_name = cert_names[0]
-        elif len(cert_names) > 1 and not cert_name:
-            raise OrchestratorError(f"Entity '{entity}' has many certificates, please use --cert-name argument to specify which one from the list: {cert_names}")
+        if not cert_name:
+            # If not provided, then obtain the certificate name by using consumer
+            cert_names = self.cert_mgr.list_consumer_known_certificates(consumer)
+            if len(cert_names) == 1:
+                cert_name = cert_names[0]
+            elif len(cert_names) > 1 and not cert_name:
+                raise OrchestratorError(f"Consumer '{consumer}' has many certificates, please use --cert-name argument to specify which one from the list: {cert_names}")
 
         # Check the certificate scope
         scope_errors = {
index 83ff9e14a9cf613d20bb2442d901ec2668616ba2..1b5141b6eeb1fe0a722820708feb872fb5f0a932 100644 (file)
-from typing import Any, Dict, Union, List, Tuple, Optional, TYPE_CHECKING, Type
-from enum import Enum
+from typing import (Any,
+                    Dict,
+                    Union,
+                    List,
+                    Tuple,
+                    Optional,
+                    TYPE_CHECKING,
+                    Type,
+                    Callable)
 import json
 import logging
 
-from cephadm.tlsobject_types import TLSObjectProtocol, TLSObjectException
+from cephadm.tlsobject_types import TLSObjectProtocol, TLSObjectException, TLSObjectScope, TLSObjectTarget
 
 
 if TYPE_CHECKING:
     from cephadm.module import CephadmOrchestrator
 
 
-TLSOBJECT_STORE_PREFIX = 'cert_store.'
-
-
 logger = logging.getLogger(__name__)
 
 
-class TLSObjectScope(Enum):
-    HOST = "host"
-    SERVICE = "service"
-    GLOBAL = "global"
-    UNKNOWN = "unknown"
-
-    def __str__(self) -> str:
-        return self.value
-
-    def __repr__(self) -> str:
-        return self.value
+TLSOBJECT_STORE_PREFIX = 'cert_store.'
 
 
 class TLSObjectStore():
 
     def __init__(self, mgr: 'CephadmOrchestrator',
                  tlsobject_class: Type[TLSObjectProtocol],
-                 known_entities: Dict[TLSObjectScope, List[str]]) -> None:
+                 known_objects_names: Dict[TLSObjectScope, List[str]],
+                 cephadm_signed_obj_checker: Callable[[str], bool]) -> None:
 
         self.mgr: CephadmOrchestrator = mgr
+        self.cephadm_signed_object_checker = cephadm_signed_obj_checker
         self.tlsobject_class = tlsobject_class
-        all_known_entities = [item for sublist in known_entities.values() for item in sublist]
-        self.known_entities: Dict[str, Any] = {key: {} for key in all_known_entities}
-        self.per_service_name_tlsobjects = known_entities[TLSObjectScope.SERVICE]
-        self.per_host_tlsobjects = known_entities[TLSObjectScope.HOST]
-        self.global_tlsobjects = known_entities[TLSObjectScope.GLOBAL]
+        all_known_objects_names = [item for sublist in known_objects_names.values() for item in sublist]
+        self.objects_by_name: Dict[str, Any] = {key: {} for key in all_known_objects_names}
+        self.service_scoped_objects = known_objects_names[TLSObjectScope.SERVICE]
+        self.host_scoped_objects = known_objects_names[TLSObjectScope.HOST]
+        self.global_scoped_objects = known_objects_names[TLSObjectScope.GLOBAL]
         self.store_prefix = f'{TLSOBJECT_STORE_PREFIX}{tlsobject_class.STORAGE_PREFIX}.'
 
-    def determine_tlsobject_target(self, entity: str, target: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
-        if entity in self.per_service_name_tlsobjects:
-            return (target, None)
-        elif entity in self.per_host_tlsobjects:
-            return (None, target)
+    def register_object_name(self, obj_name: str, scope: TLSObjectScope) -> None:
+        """
+        Register a new TLS object name under the specified scope if it does not already exist.
+        Args:
+            obj_name (str): The name of the TLS object to add.
+            scope (TLSObjectScope): The scope of the object (SERVICE, HOST, or GLOBAL).
+        Raises:
+            ValueError: If an invalid scope is provided.
+        """
+        if obj_name not in self.objects_by_name:
+            # Initialize an empty dictionary to track TLS objects for this obj_name
+            self.objects_by_name[obj_name] = {}
+
+        # Add to the appropriate scope list
+        if scope == TLSObjectScope.SERVICE and obj_name not in self.service_scoped_objects:
+            self.service_scoped_objects.append(obj_name)
+        elif scope == TLSObjectScope.HOST and obj_name not in self.host_scoped_objects:
+            self.host_scoped_objects.append(obj_name)
+        elif scope == TLSObjectScope.GLOBAL and obj_name not in self.global_scoped_objects:
+            self.global_scoped_objects.append(obj_name)
+        elif scope not in [TLSObjectScope.HOST, TLSObjectScope.SERVICE, TLSObjectScope.GLOBAL]:
+            raise ValueError(f"Invalid TLSObjectScope '{scope}' for obj_name '{obj_name}'")
+
+    def determine_tlsobject_target(self, obj_name: str, target: Optional[str]) -> TLSObjectTarget:
+        if obj_name in self.service_scoped_objects:
+            return TLSObjectTarget(service=target, host=None)
+        elif obj_name in self.host_scoped_objects:
+            return TLSObjectTarget(service=None, host=target)
         else:
-            return (None, None)
+            return TLSObjectTarget(service=None, host=None)
+
+    def get_tlsobject_scope_and_target(self, obj_name: str,
+                                       service_name: Optional[str] = None,
+                                       host: Optional[str] = None) -> Tuple[TLSObjectScope, Optional[Any]]:
 
-    def get_tlsobject_scope_and_target(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Tuple[TLSObjectScope, Optional[Any]]:
-        if entity in self.per_service_name_tlsobjects:
+        if obj_name in self.service_scoped_objects:
             return TLSObjectScope.SERVICE, service_name
-        elif entity in self.per_host_tlsobjects:
+        elif obj_name in self.host_scoped_objects:
             return TLSObjectScope.HOST, host
-        elif entity in self.global_tlsobjects:
+        elif obj_name in self.global_scoped_objects:
             return TLSObjectScope.GLOBAL, None
         else:
             return TLSObjectScope.UNKNOWN, None
 
-    def get_tlsobject(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Optional[TLSObjectProtocol]:
-        self._validate_tlsobject_entity(entity, service_name, host)
-        scope, target = self.get_tlsobject_scope_and_target(entity, service_name, host)
+    def get_tlsobject(self, obj_name: str,
+                      service_name: Optional[str] = None,
+                      host: Optional[str] = None) -> Optional[TLSObjectProtocol]:
+
+        self._validate_tlsobject_name(obj_name, service_name, host)
+        scope, target = self.get_tlsobject_scope_and_target(obj_name, service_name, host)
         if scope == TLSObjectScope.GLOBAL:
-            return self.known_entities.get(entity)
+            return self.objects_by_name.get(obj_name)
         else:
-            return self.known_entities.get(entity, {}).get(target)
-
-    def save_tlsobject(self, entity: str, tlsobject: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False) -> None:
-        self._validate_tlsobject_entity(entity, service_name, host)
-        tlsobject = self.tlsobject_class(tlsobject, user_made)
-        scope, target = self.get_tlsobject_scope_and_target(entity, service_name, host)
+            return self.objects_by_name.get(obj_name, {}).get(target)
+
+    def save_tlsobject(self, obj_name: str,
+                       tlsobject: str,
+                       service_name: Optional[str] = None,
+                       host: Optional[str] = None,
+                       user_made: bool = False,
+                       editable: bool = False) -> None:
+
+        self._validate_tlsobject_name(obj_name, service_name, host)
+        tlsobject = self.tlsobject_class(tlsobject, user_made, editable)
+        scope, target = self.get_tlsobject_scope_and_target(obj_name, service_name, host)
         j: Union[str, Dict[Any, Any], None] = None
         if scope in (TLSObjectScope.SERVICE, TLSObjectScope.HOST):
-            self.known_entities[entity][target] = tlsobject
+            self.objects_by_name[obj_name][target] = tlsobject
             j = {
-                key: self.tlsobject_class.to_json(self.known_entities[entity][key])
-                for key in self.known_entities[entity]
+                key: self.tlsobject_class.to_json(self.objects_by_name[obj_name][key])
+                for key in self.objects_by_name[obj_name]
             }
-            self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+            self.mgr.set_store(self.store_prefix + obj_name, json.dumps(j))
         elif scope == TLSObjectScope.GLOBAL:
-            self.known_entities[entity] = tlsobject
+            self.objects_by_name[obj_name] = tlsobject
             j = self.tlsobject_class.to_json(tlsobject)
-            self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+            self.mgr.set_store(self.store_prefix + obj_name, json.dumps(j))
         else:
-            logger.error(f'Trying to save entity {entity} with a not-supported/unknown TLSObjectScope scope {scope.value}')
+            logger.error(f'Trying to save TLS object name {obj_name} with a not-supported/unknown TLSObjectScope scope {scope.value}')
+
+    def rm_tlsobject(self, obj_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> bool:
+        """
+        Remove a TLS object from the in-memory registry and persist the change.
+
+        Behavior by scope:
+          - SERVICE / HOST: Removes the entry for the given target (service_name/host).
+            If this was the last target for that name, the name remains registered with
+            an empty per-target map.
+          - GLOBAL: Resets the object for `obj_name` to an empty instance of
+            `tlsobject_class` and writes it back to the store. The store key is NOT
+            deleted; the empty object acts as a tombstone.
+
+            Keeping the KV key stable allows watchers/consumers to distinguish
+            "known-but-empty" from "unknown", and lets future `save_tlsobject(...)`
+            calls reuse the same name without recreating metadata.
+
+        Args:
+            obj_name: Registered TLS object name to remove.
+            service_name: Required when the name is service-scoped; identifies the target.
+            host: Required when the name is host-scoped; identifies the target.
+
+        Returns:
+            True if a change was persisted to the store; False if there was nothing to remove.
 
-    def rm_tlsobject(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
-        """Remove a tlsobjectificate for a specific entity, service, or host."""
-        self._validate_tlsobject_entity(entity, service_name, host)
-        scope, target = self.get_tlsobject_scope_and_target(entity, service_name, host)
+        Raises:
+            TLSObjectException: If `obj_name` is unknown, or the required `service_name`/`host`
+                target is missing for a scoped name, or the name resolves to an
+                unsupported/unknown scope.
+
+        Notes:
+            - An “empty” TLS object is falsy and serializes to the minimal JSON defined
+              by `tlsobject_class.to_json`.
+        """
+        self._validate_tlsobject_name(obj_name, service_name, host)
+        scope, target = self.get_tlsobject_scope_and_target(obj_name, service_name, host)
         j: Union[str, Dict[Any, Any], None] = None
         if scope in (TLSObjectScope.SERVICE, TLSObjectScope.HOST):
-            if entity in self.known_entities and target in self.known_entities[entity]:
-                del self.known_entities[entity][target]
+            if obj_name in self.objects_by_name and target in self.objects_by_name[obj_name]:
+                del self.objects_by_name[obj_name][target]
                 j = {
-                    key: self.tlsobject_class.to_json(self.known_entities[entity][key])
-                    for key in self.known_entities[entity]
+                    key: self.tlsobject_class.to_json(self.objects_by_name[obj_name][key])
+                    for key in self.objects_by_name[obj_name]
                 }
-                self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+                self.mgr.set_store(self.store_prefix + obj_name, json.dumps(j))
+                return True
         elif scope == TLSObjectScope.GLOBAL:
-            self.known_entities[entity] = self.tlsobject_class()
-            j = self.tlsobject_class.to_json(self.known_entities[entity])
-            self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+            self.objects_by_name[obj_name] = self.tlsobject_class()
+            j = self.tlsobject_class.to_json(self.objects_by_name[obj_name])
+            self.mgr.set_store(self.store_prefix + obj_name, json.dumps(j))
+            return True
         else:
-            raise TLSObjectException(f'Attempted to remove {self.tlsobject_class.__name__.lower()} for unknown entity {entity}')
+            raise TLSObjectException(f'Attempted to remove {self.tlsobject_class.__name__.lower()} for unknown obj_name {obj_name}')
+        return False
 
-    def _validate_tlsobject_entity(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
+    def _validate_tlsobject_name(self, obj_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
         cred_type = self.tlsobject_class.__name__.lower()
-        if entity not in self.known_entities.keys():
-            raise TLSObjectException(f'Attempted to access {cred_type} for unknown entity {entity}')
-        if entity in self.per_host_tlsobjects and not host:
-            raise TLSObjectException(f'Need host to access {cred_type} for entity {entity}')
-        if entity in self.per_service_name_tlsobjects and not service_name:
-            raise TLSObjectException(f'Need service name to access {cred_type} for entity {entity}')
-
-    def list_tlsobjects(self) -> List[Tuple[str, Type[TLSObjectProtocol], Optional[str]]]:
+        if obj_name not in self.objects_by_name.keys():
+            raise TLSObjectException(f'Attempted to access {cred_type} for unknown TLS object name {obj_name}')
+        if obj_name in self.host_scoped_objects and not host:
+            raise TLSObjectException(f'Need host to access {cred_type} for TLS object {obj_name}')
+        if obj_name in self.service_scoped_objects and not service_name:
+            raise TLSObjectException(f'Need service name to access {cred_type} for TLS object {obj_name}')
+
+    def list_tlsobjects(self) -> List[Tuple[str, TLSObjectProtocol, Optional[str]]]:
         """
         Returns a shallow list of all known TLS objects, including their targets.
 
         Returns:
-            List of tuples: (entity, tlsobject, target)
-            - entity: The TLS object entity name.
+            List of tuples: (obj_name, tlsobject, target)
+            - obj_name: The TLS object name.
             - tlsobject: The TLS object itself.
             - target: The associated target (service_name, host, or None for global).
         """
         tlsobjects = []
-        for known_entity, value in self.known_entities.items():
+        for known_obj_name, value in self.objects_by_name.items():
             if isinstance(value, dict):  # Handle per-service or per-host TLS objects
                 for target, tlsobject in value.items():
                     if tlsobject:
-                        tlsobjects.append((known_entity, tlsobject, target))
+                        tlsobjects.append((known_obj_name, tlsobject, target))
             elif value:  # Handle Global TLS objects
-                tlsobjects.append((known_entity, value, None))
+                tlsobjects.append((known_obj_name, value, None))
 
         return tlsobjects
 
     def load(self) -> None:
         for k, v in self.mgr.get_store_prefix(self.store_prefix).items():
-            entity = k[len(self.store_prefix):]
-            if entity not in self.known_entities:
-                logger.warning(f"TLSObjectStore: Discarding unknown entity '{entity}'")
+            obj_name = k[len(self.store_prefix):]
+            is_cephadm_signed_object = self.cephadm_signed_object_checker(obj_name)
+            if not is_cephadm_signed_object and obj_name not in self.objects_by_name:
+                logger.warning(f"TLSObjectStore: Discarding unknown obj_name '{obj_name}'")
+                continue
+
+            try:
+                tls_object_targets = json.loads(v)
+            except json.JSONDecodeError as e:
+                logger.warning(
+                    f"TLSObjectStore: Cannot parse JSON for '{obj_name}': "
+                    f"key={k}, len={len(v) if v else 0}, startswith={v[:20]!r}, error={e}"
+                )
+                continue
+            except Exception as e:
+                logger.error(
+                    f"TLSObjectStore: Unexpected error happened while trying to parse JSON for '{obj_name}': "
+                    f"key={k}, len={len(v) if v else 0}, startswith={v[:20]!r}, error={e}"
+                )
                 continue
-            entity_targets = json.loads(v)
-            if entity in self.per_service_name_tlsobjects or entity in self.per_host_tlsobjects:
-                self.known_entities[entity] = {}
-                for target in entity_targets:
-                    tlsobject = self.tlsobject_class.from_json(entity_targets[target])
+
+            if is_cephadm_signed_object or (obj_name in self.service_scoped_objects) or (obj_name in self.host_scoped_objects):
+                if is_cephadm_signed_object and obj_name not in self.host_scoped_objects:
+                    self.host_scoped_objects.append(obj_name)
+                self.objects_by_name[obj_name] = {}
+                for target in tls_object_targets:
+                    tlsobject = self.tlsobject_class.from_json(tls_object_targets[target])
                     if tlsobject:
-                        self.known_entities[entity][target] = tlsobject
-            elif entity in self.global_tlsobjects:
-                tlsobject = self.tlsobject_class.from_json(entity_targets)
+                        self.objects_by_name[obj_name][target] = tlsobject
+            elif obj_name in self.global_scoped_objects:
+                tlsobject = self.tlsobject_class.from_json(tls_object_targets)
                 if tlsobject:
-                    self.known_entities[entity] = tlsobject
+                    self.objects_by_name[obj_name] = tlsobject
             else:
-                logger.error(f"TLSObjectStore: Found a known entity {entity} with unknown scope!")
+                logger.error(f"TLSObjectStore: Found a known TLS object name {obj_name} with unknown scope!")
index fd46a656cc1674e717ef85d4b2339d5400771ced..1c5a02c4a188f26d63143bb9e324a3dbc1b5d31f 100644 (file)
@@ -1,15 +1,59 @@
-from typing import Any, Dict, Protocol, Union
+from enum import Enum
+from typing import Any, Dict, Protocol, Union, NamedTuple, Optional
 from orchestrator import OrchestratorError
+from ceph.utils import strtobool
+
+
+def parse_bool(value: Any) -> bool:
+    """
+    Parses a value as a boolean. Accepts:
+    - bool values (returns as-is)
+    - strings like "true"/"false" (case-insensitive)
+    - numeric values (0 -> False, non-zero -> True)
+    Raises:
+        ValueError if the value cannot be interpreted as a boolean.
+    """
+    try:
+        return strtobool(value) if isinstance(value, str) else bool(value)
+    except Exception:
+        raise ValueError(f"Expected a boolean-compatible value but got: {type(value)}")
 
 
 class TLSObjectException(OrchestratorError):
     pass
 
 
+class TLSObjectScope(str, Enum):
+    HOST = 'host'
+    SERVICE = 'service'
+    GLOBAL = 'global'
+    UNKNOWN = 'unknown'
+
+    def __str__(self) -> str:
+        return self.value
+
+
+class CertKeyPair(NamedTuple):
+    cert: str
+    key: str
+
+    def __bool__(self) -> bool:
+        # Treat the pair as truthy only if both cert and key are non-empty
+        return bool(self.cert and self.key)
+
+
+class TLSObjectTarget(NamedTuple):
+    service: Optional[str]
+    host: Optional[str]
+
+
+EMPTY_TLS_KEYPAIR = CertKeyPair('', '')
+
+
 class TLSObjectProtocol(Protocol):
     STORAGE_PREFIX: str
 
-    def __init__(self, cert: str = '', user_made: bool = False) -> None:
+    def __init__(self, cert: str = '', user_made: bool = False, editable: bool = False) -> None:
         ...
 
     def __bool__(self) -> bool:
@@ -29,9 +73,10 @@ class TLSObjectProtocol(Protocol):
 class Cert(TLSObjectProtocol):
     STORAGE_PREFIX = 'cert'
 
-    def __init__(self, cert: str = '', user_made: bool = False) -> None:
+    def __init__(self, cert: str = '', user_made: bool = False, editable: bool = False) -> None:
         self.cert = cert
         self.user_made = user_made
+        self.editable = editable
 
     def __bool__(self) -> bool:
         return bool(self.cert)
@@ -45,7 +90,8 @@ class Cert(TLSObjectProtocol):
         if (self):
             return {
                 'cert': self.cert,
-                'user_made': self.user_made
+                'user_made': self.user_made,
+                'editable': self.editable
             }
         else:
             return {}
@@ -57,28 +103,25 @@ class Cert(TLSObjectProtocol):
         cert = data['cert']
         if not isinstance(cert, str):
             raise TLSObjectException('Tried to make Cert object with non-string cert')
-        if any(k not in ['cert', 'user_made'] for k in data.keys()):
+        if any(k not in ['cert', 'user_made', 'editable'] for k in data.keys()):
             raise TLSObjectException(f'Got unknown field for Cert object. Fields: {data.keys()}')
-        user_made: Union[str, bool] = data.get('user_made', False)
-        if not isinstance(user_made, bool):
-            if isinstance(user_made, str):
-                if user_made.lower() == 'true':
-                    user_made = True
-                elif user_made.lower() == 'false':
-                    user_made = False
-            try:
-                user_made = bool(user_made)
-            except Exception:
-                raise TLSObjectException(f'Expected user_made field in Cert object to be bool but got {type(user_made)}')
-        return cls(cert=cert, user_made=user_made)
+
+        try:
+            user_made = parse_bool(data.get('user_made', False))
+            editable = parse_bool(data.get('editable', False))
+        except ValueError as e:
+            raise TLSObjectException(f'Expected field in Cert object to be bool but got another type: {e}')
+
+        return cls(cert=cert, user_made=user_made, editable=editable)
 
 
 class PrivKey(TLSObjectProtocol):
     STORAGE_PREFIX = 'key'
 
-    def __init__(self, key: str = '', user_made: bool = False) -> None:
+    def __init__(self, key: str = '', user_made: bool = False, editable: bool = False) -> None:
         self.key = key
         self.user_made = user_made
+        self.editable = editable
 
     def __bool__(self) -> bool:
         return bool(self.key)
@@ -92,7 +135,8 @@ class PrivKey(TLSObjectProtocol):
         if bool(self):
             return {
                 'key': self.key,
-                'user_made': self.user_made
+                'user_made': self.user_made,
+                'editable': self.editable
             }
         else:
             return {}
@@ -104,17 +148,13 @@ class PrivKey(TLSObjectProtocol):
         key = data['key']
         if not isinstance(key, str):
             raise TLSObjectException('Tried to make PrivKey object with non-string key')
-        if any(k not in ['key', 'user_made'] for k in data.keys()):
+        if any(k not in ['key', 'user_made', 'editable'] for k in data.keys()):
             raise TLSObjectException(f'Got unknown field for PrivKey object. Fields: {data.keys()}')
-        user_made: Union[str, bool] = data.get('user_made', False)
-        if not isinstance(user_made, bool):
-            if isinstance(user_made, str):
-                if user_made.lower() == 'true':
-                    user_made = True
-                elif user_made.lower() == 'false':
-                    user_made = False
-            try:
-                user_made = bool(user_made)
-            except Exception:
-                raise TLSObjectException(f'Expected user_made field in PrivKey object to be bool but got {type(user_made)}')
-        return cls(key=key, user_made=user_made)
+
+        try:
+            user_made = parse_bool(data.get('user_made', False))
+            editable = parse_bool(data.get('editable', False))
+        except ValueError as e:
+            raise TLSObjectException(f'Expected field in Cert object to be bool but got another type: {e}')
+
+        return cls(key=key, user_made=user_made, editable=editable)
index 6ea347f7c7ac796405bbf6f1917f814ae7a6040e..767c2a72015d21b2cd946dcdfad1313cee50a439 100644 (file)
@@ -590,7 +590,7 @@ class Orchestrator(object):
     def cert_store_cert_ls(self, show_details: bool = False) -> OrchResult[Dict[str, Any]]:
         raise NotImplementedError()
 
-    def cert_store_entity_ls(self) -> OrchResult[Dict[Any, Dict[str, List[str]]]]:
+    def cert_store_bindings_ls(self) -> OrchResult[Dict[Any, Dict[str, List[str]]]]:
         raise NotImplementedError()
 
     def cert_store_reload(self) -> OrchResult[str]: