]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: improving individual certificates checks
authorRedouane Kachach <rkachach@ibm.com>
Wed, 19 Feb 2025 16:36:15 +0000 (17:36 +0100)
committerRedouane Kachach <rkachach@ibm.com>
Tue, 11 Mar 2025 09:34:23 +0000 (10:34 +0100)
Some refactoring to improve the individual certificates checking and
use the same code for both cases: certs with and without keys are

Signed-off-by: Redouane Kachach <rkachach@ibm.com>
src/pybind/mgr/cephadm/cert_mgr.py
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/test_certmgr.py
src/pybind/mgr/orchestrator/_interface.py
src/pybind/mgr/orchestrator/module.py

index eb8b186a0d3a14ea1cd99383510b1f74a3a3e6f2..b1df36578a865005f995a1390cb08941839eb81a 100644 (file)
@@ -2,7 +2,7 @@ from typing import TYPE_CHECKING, Tuple, Union, List, Dict, Optional, cast, Any
 import logging
 
 from cephadm.ssl_cert_utils import SSLCerts, SSLConfigException
-from mgr_util import verify_tls, ServerConfigException
+from mgr_util import verify_tls, verify_cacrt_content, ServerConfigException
 from cephadm.ssl_cert_utils import get_certificate_info, get_private_key_info
 from cephadm.tlsobject_types import Cert, PrivKey
 from cephadm.tlsobject_store import TLSObjectStore, TLSObjectScope, TLSObjectException
@@ -328,7 +328,7 @@ class CertMgr:
             logger.warning(short_error_msg)
             self.mgr.set_health_warning(CertMgr.CEPHADM_CERTMGR_HEALTH_ERR, short_error_msg, total_issues, detailed_error_msgs)
 
-    def check_certificate_state(self, cert_name: str, target: str, cert: str, key: str) -> CertInfo:
+    def check_certificate_state(self, cert_name: str, target: str, cert: str, key: Optional[str] = None) -> CertInfo:
         """
         Checks if a certificate is valid and close to expiration.
 
@@ -339,17 +339,17 @@ class CertMgr:
             - exception_info: Details of any exception encountered during validation.
         """
         cert_obj = Cert(cert, True)
-        key_obj = PrivKey(key, True)
+        key_obj = PrivKey(key, True) if key else None
         return self._check_certificate_state(cert_name, target, cert_obj, key_obj)
 
-    def _check_certificate_state(self, cert_name: str, target: Optional[str], cert: Cert, key: PrivKey) -> CertInfo:
+    def _check_certificate_state(self, cert_name: str, target: Optional[str], cert: Cert, key: Optional[PrivKey] = None) -> CertInfo:
         """
         Checks if a certificate is valid and close to expiration.
 
         Returns: CertInfo
         """
         try:
-            days_to_expiration = verify_tls(cert.cert, key.key)
+            days_to_expiration = verify_tls(cert.cert, key.key) if key else verify_cacrt_content(cert.cert)
             is_close_to_expiration = days_to_expiration < self.mgr.certificate_renewal_threshold_days
             return CertInfo(cert_name, target, cert.user_made, True, is_close_to_expiration, days_to_expiration, "")
         except ServerConfigException as e:
@@ -393,9 +393,8 @@ class CertMgr:
 
     def get_problematic_certificates(self) -> List[Tuple[CertInfo, Cert]]:
 
-        def get_key(cert_name: str, target: Optional[str]) -> Optional[PrivKey]:
+        def get_key(cert_name: str, key_name: str, target: Optional[str]) -> Optional[PrivKey]:
             try:
-                key_name = cert_name.replace('_cert', '_key')
                 service_name, host = self.cert_store.determine_tlsobject_target(cert_name, target)
                 key = cast(PrivKey, self.key_store.get_tlsobject(key_name, service_name=service_name, host=host))
                 return key
@@ -407,21 +406,28 @@ class CertMgr:
         problematics_certs: List[Tuple[CertInfo, Cert]] = []
         for cert_name, cert_tlsobj, target in certs_tlsobjs:
             cert_obj = cast(Cert, cert_tlsobj)
-            key_obj = get_key(cert_name, target)
-            if cert_obj and key_obj:
+            if not cert_obj:
+                logger.error(f'Cannot find certificate {cert_name} in the TLSObjectStore')
+                continue
+
+            key_name = cert_name.replace('_cert', '_key')
+            key_obj = get_key(cert_name, key_name, target)
+            if key_obj:
+                # certificate has a key, let's check the cert/key pair
                 cert_info = self._check_certificate_state(cert_name, target, cert_obj, key_obj)
-                if not cert_info.is_operationally_valid():
-                    problematics_certs.append((cert_info, cert_obj))
-                else:
-                    target_info = f" ({target})" if target else ""
-                    logger.info(f'Certificate for "{cert_name}{target_info}" is still valid for {cert_info.days_to_expiration} days.')
-            elif cert_obj:
-                # Cert is present but key is None, could only happen if somebody has put manually a bad key!
-                logger.warning(f"Key is missing for certificate '{cert_name}'.")
+            elif key_name in self.known_keys:
+                # certificate is supposed to have a key but it's missing
+                logger.error(f"Key '{key_name}' is missing for certificate '{cert_name}'.")
                 cert_info = CertInfo(cert_name, target, cert_obj.user_made, False, False, 0, "missing key")
+            else:
+                # certificate has no associated key
+                cert_info = self._check_certificate_state(cert_name, target, cert_obj)
+
+            if not cert_info.is_operationally_valid():
                 problematics_certs.append((cert_info, cert_obj))
             else:
-                logger.error(f'Cannot get cert/key {cert_name}')
+                target_info = f" ({target})" if target else ""
+                logger.info(f'Certificate for "{cert_name}{target_info}" is still valid for {cert_info.days_to_expiration} days.')
 
         return problematics_certs
 
index f66853ed8dc71df65de62ed4c212293ae2a94c74..752d9ddfe80c9d11f5a8c95bdbfe86fc52a8e380 100644 (file)
@@ -58,7 +58,7 @@ from mgr_module import (
     NotifyType,
     MonCommandFailed,
 )
-from mgr_util import build_url, verify_cacrt_content, ServerConfigException
+from mgr_util import build_url
 import orchestrator
 from orchestrator.module import to_format, Format
 
@@ -710,6 +710,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self.cert_mgr.register_cert_key_pair('nvmeof', 'nvmeof_server_cert', 'nvmeof_server_key', TLSObjectScope.SERVICE)
         self.cert_mgr.register_cert_key_pair('nvmeof', 'nvmeof_client_cert', 'nvmeof_client_key', TLSObjectScope.SERVICE)
 
+        # register ancilary certificates/keys
         self.cert_mgr.register_cert('nvmeof', 'nvmeof_root_ca_cert', TLSObjectScope.SERVICE)
         self.cert_mgr.register_cert('rgw', 'rgw_frontend_ssl_cert', TLSObjectScope.SERVICE)
         self.cert_mgr.register_key('nvmeof', 'nvmeof_encryption_key', TLSObjectScope.SERVICE)
@@ -3291,18 +3292,16 @@ Then run the following:
     @handle_orch_error
     def cert_store_set_cert(
         self,
-        cert: str,
         cert_name: str,
-        service_name: Optional[str] = None,
-        hostname: Optional[str] = None,
+        cert: str,
+        service_name: str = "",
+        hostname: str = "",
     ) -> str:
 
-        try:
-            days_to_expiration = verify_cacrt_content(cert)
-            if days_to_expiration < self.certificate_renewal_threshold_days:
-                raise OrchestratorError(f'Error: Certificate is about to expire (Remaining days: {days_to_expiration})')
-        except ServerConfigException as e:
-            raise OrchestratorError(f'Error: Invalid certificate for {cert_name}: {e}')
+        target = service_name or hostname
+        cert_info = self.cert_mgr.check_certificate_state(cert_name, target, cert)
+        if not cert_info.is_operationally_valid():
+            raise OrchestratorError(cert_info.get_status_description())
 
         self.cert_mgr.save_cert(cert_name, cert, service_name, hostname, True)
         return f'Certificate for {cert_name} set correctly'
index ed88de49dd6f652116a99e580e9b57860db3ba72..6d20791a488205a9acdcb0f7245204ebef0125a3 100644 (file)
@@ -535,51 +535,91 @@ class TestCertMgr(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix")
     def test_tlsobject_store_load(self, _get_store_prefix, cephadm_module: CephadmOrchestrator):
 
-        rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
-        grafana_host1_key = 'fake-grafana-host1-cert'
-        nvmeof_server_cert = 'nvmeof-server-cert'
-        nvmeof_client_cert = 'nvmeof-client-cert'
-        nvmeof_root_ca_cert = 'nvmeof-root-ca-cert'
-        nvmeof_server_key = 'nvmeof-server-key'
-        nvmeof_client_key = 'nvmeof-client-key'
-        nvmeof_encryption_key = 'nvmeof-encryption-key'
-        unknown_cert_entity = 'unknown_per_service_cert'
-        unknown_cert_key = 'unknown_per_service_key'
+        # Define certs and keys with their corresponding scopes
+        certs = {
+            'rgw_frontend_ssl_cert': ('rgw.foo', 'fake-rgw-cert', TLSObjectScope.SERVICE),
+            'nvmeof_server_cert': ('nvmeof.foo', 'nvmeof-server-cert', TLSObjectScope.SERVICE),
+            'nvmeof_client_cert': ('nvmeof.foo', 'nvmeof-client-cert', TLSObjectScope.SERVICE),
+            'nvmeof_root_ca_cert': ('nvmeof.foo', 'nvmeof-root-ca-cert', TLSObjectScope.SERVICE),
+            'ingress_ssl_cert': ('ingress', 'ingress-ssl-cert', TLSObjectScope.SERVICE),
+            'iscsi_ssl_cert': ('iscsi', 'iscsi-ssl-cert', TLSObjectScope.SERVICE),
+            'grafana_cert': ('host1', 'grafana-cert', TLSObjectScope.HOST),
+            'mgmt_gw_cert': ('mgmt-gateway', 'mgmt-gw-cert', TLSObjectScope.GLOBAL),
+            'oauth2_proxy_cert': ('oauth2-proxy', 'oauth2-proxy-cert', TLSObjectScope.GLOBAL),
+        }
+        unknown_certs = {
+            'unknown_per_service_cert': ('unknown-svc.foo', 'unknown-cert', TLSObjectScope.SERVICE),
+            'unknown_per_host_cert': ('unknown-host.foo', 'unknown-cert', TLSObjectScope.HOST),
+            'unknown_global_cert': ('unknown-global.foo', 'unknown-cert', TLSObjectScope.GLOBAL),
+            'cert_with_unknown_scope': ('unknown-global.foo', 'unknown-cert', TLSObjectScope.UNKNOWN),
+        }
 
+        keys = {
+            'grafana_key': ('host1', 'fake-grafana-host1-key', TLSObjectScope.HOST),
+            'nvmeof_server_key': ('nvmeof.foo', 'nvmeof-server-key', TLSObjectScope.SERVICE),
+            'nvmeof_client_key': ('nvmeof.foo', 'nvmeof-client-key', TLSObjectScope.SERVICE),
+            'nvmeof_encryption_key': ('nvmeof.foo', 'nvmeof-encryption-key', TLSObjectScope.SERVICE),
+            'mgmt_gw_key': ('mgmt-gateway', 'mgmt-gw-key', TLSObjectScope.GLOBAL),
+            'oauth2_proxy_key': ('oauth2-proxy', 'oauth2-proxy-key', TLSObjectScope.GLOBAL),
+            'ingress_ssl_key': ('ingress', 'ingress-ssl-key', TLSObjectScope.SERVICE),
+            'iscsi_ssl_key': ('iscsi', 'iscsi-ssl-key', TLSObjectScope.SERVICE),
+        }
+        unknown_keys = {
+            'unknown_per_service_key': ('unknown-svc.foo', 'unknown-key', TLSObjectScope.SERVICE),
+            'unknown_per_host_key': ('unknown-host.foo', 'unknown-key', TLSObjectScope.HOST),
+            'unknown_global_key': ('unknown-global.foo', 'unknown-key', TLSObjectScope.GLOBAL),
+            'key_with_unknown_scope': ('unknown-global.foo', 'unknown-key', TLSObjectScope.UNKNOWN),
+        }
+
+        # Mock function to simulate store behavior
         def _fake_prefix_store(key):
+            from itertools import chain
             if key == 'cert_store.cert.':
                 return {
-                    f'{TLSOBJECT_STORE_CERT_PREFIX}rgw_frontend_ssl_cert': json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()}),
-                    f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_server_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_server_cert, True).to_json()}),
-                    f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_client_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_client_cert, True).to_json()}),
-                    f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_root_ca_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_root_ca_cert, True).to_json()}),
-                    f'{TLSOBJECT_STORE_CERT_PREFIX}{unknown_cert_entity}': json.dumps({'unkonwn.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()}),
+                    f'{TLSOBJECT_STORE_CERT_PREFIX}{cert_name}': json.dumps(
+                        {target: Cert(cert_value, True).to_json()} if scope != TLSObjectScope.GLOBAL
+                        else Cert(cert_value, True).to_json()
+                    )
+                    for cert_name, (target, cert_value, scope) in chain(certs.items(), unknown_certs.items())
                 }
             elif key == 'cert_store.key.':
                 return {
-                    f'{TLSOBJECT_STORE_KEY_PREFIX}grafana_key': json.dumps({'host1': PrivKey(grafana_host1_key).to_json()}),
-                    f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_server_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_server_key).to_json()}),
-                    f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_client_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_client_key).to_json()}),
-                    f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_encryption_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_encryption_key).to_json()}),
-                    f'{TLSOBJECT_STORE_KEY_PREFIX}{unknown_cert_key}': json.dumps({'unkonwn.foo': PrivKey(nvmeof_encryption_key).to_json()}),
+                    f'{TLSOBJECT_STORE_KEY_PREFIX}{key_name}': json.dumps(
+                        {target: PrivKey(key_value).to_json()} if scope != TLSObjectScope.GLOBAL
+                        else PrivKey(key_value).to_json()
+                    )
+                    for key_name, (target, key_value, scope) in chain(keys.items(), unknown_keys.items())
                 }
             else:
-                raise Exception(f'Get store with unexpected value {key}')
+                raise Exception(f'Unexpected key access in store: {key}')
 
+        # Inject the mock store behavior and the cert manager
         _get_store_prefix.side_effect = _fake_prefix_store
         cephadm_module._init_cert_mgr()
 
-        assert cephadm_module.cert_mgr.cert_store.known_entities['rgw_frontend_ssl_cert']['rgw.foo'] == Cert(rgw_frontend_rgw_foo_host2_cert, True)
-        assert cephadm_module.cert_mgr.cert_store.known_entities['nvmeof_server_cert']['nvmeof.foo'] == Cert(nvmeof_server_cert, True)
-        assert cephadm_module.cert_mgr.cert_store.known_entities['nvmeof_client_cert']['nvmeof.foo'] == Cert(nvmeof_client_cert, True)
-        assert cephadm_module.cert_mgr.cert_store.known_entities['nvmeof_root_ca_cert']['nvmeof.foo'] == Cert(nvmeof_root_ca_cert, True)
-        assert cephadm_module.cert_mgr.key_store.known_entities['grafana_key']['host1'] == PrivKey(grafana_host1_key)
-        assert unknown_cert_entity not in cephadm_module.cert_mgr.cert_store.known_entities
-
-        assert cephadm_module.cert_mgr.key_store.known_entities['nvmeof_server_key']['nvmeof.foo'] == PrivKey(nvmeof_server_key)
-        assert cephadm_module.cert_mgr.key_store.known_entities['nvmeof_client_key']['nvmeof.foo'] == PrivKey(nvmeof_client_key)
-        assert cephadm_module.cert_mgr.key_store.known_entities['nvmeof_encryption_key']['nvmeof.foo'] == PrivKey(nvmeof_encryption_key)
-        assert unknown_cert_key not in cephadm_module.cert_mgr.key_store.known_entities
+        # Validate certificates in cert_store
+        for cert_name, (target, cert_value, scope) in certs.items():
+            assert cert_name in cephadm_module.cert_mgr.cert_store.known_entities
+            if scope == TLSObjectScope.GLOBAL:
+                assert cephadm_module.cert_mgr.cert_store.known_entities[cert_name] == Cert(cert_value, True)
+            else:
+                assert cephadm_module.cert_mgr.cert_store.known_entities[cert_name][target] == Cert(cert_value, True)
+
+        # Validate keys in key_store
+        for key_name, (target, key_value, scope) in keys.items():
+            assert key_name in cephadm_module.cert_mgr.key_store.known_entities
+            if scope == TLSObjectScope.GLOBAL:
+                assert cephadm_module.cert_mgr.key_store.known_entities[key_name] == PrivKey(key_value)
+            else:
+                assert cephadm_module.cert_mgr.key_store.known_entities[key_name][target] == PrivKey(key_value)
+
+        # Check unknown certificates are not loaded
+        for unknown_cert in unknown_certs:
+            assert unknown_cert not in cephadm_module.cert_mgr.cert_store.known_entities
+
+        # Check unknown keys are not loaded
+        for unknown_key in unknown_keys:
+            assert unknown_key not in cephadm_module.cert_mgr.key_store.known_entities
 
     def test_tlsobject_store_get_cert_key(self, cephadm_module: CephadmOrchestrator):
 
index 3745d8805524a849098a8e4d65c2055195951d3e..e9b19a93583c5b162d1e2b848bfe4cacb98c998e 100644 (file)
@@ -607,8 +607,8 @@ class Orchestrator(object):
 
     def cert_store_set_cert(
         self,
-        cert: str,
         cert_name: str,
+        cert: str,
         service_name: Optional[str] = None,
         hostname: Optional[str] = None,
     ) -> OrchResult[str]:
index c888519b9d786593b465e41a79179144beed3d5e..d362bbcabb585663d67b6a535984176bdd211b1c 100644 (file)
@@ -1287,8 +1287,8 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule,
             raise OrchestratorError('This command requires passing a certificate using --cert parameter or "-i <filepath>" option')
 
         completion = self.cert_store_set_cert(
-            cert_content,
             cert_name,
+            cert_content,
             service_name,
             hostname,
         )