From: Redouane Kachach Date: Wed, 19 Feb 2025 16:36:15 +0000 (+0100) Subject: mgr/cephadm: improving individual certificates checks X-Git-Tag: v20.3.0~386^2~5 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=eae45d208a6f97f3abc8bcc0df06f7ca783b52da;p=ceph.git mgr/cephadm: improving individual certificates checks Some refactoring to improve the individual certificates checking and use the same code for both cases: certs with and without keys are Signed-off-by: Redouane Kachach --- diff --git a/src/pybind/mgr/cephadm/cert_mgr.py b/src/pybind/mgr/cephadm/cert_mgr.py index eb8b186a0d3a1..b1df36578a865 100644 --- a/src/pybind/mgr/cephadm/cert_mgr.py +++ b/src/pybind/mgr/cephadm/cert_mgr.py @@ -2,7 +2,7 @@ from typing import TYPE_CHECKING, Tuple, Union, List, Dict, Optional, cast, Any import logging from cephadm.ssl_cert_utils import SSLCerts, SSLConfigException -from mgr_util import verify_tls, ServerConfigException +from mgr_util import verify_tls, verify_cacrt_content, ServerConfigException from cephadm.ssl_cert_utils import get_certificate_info, get_private_key_info from cephadm.tlsobject_types import Cert, PrivKey from cephadm.tlsobject_store import TLSObjectStore, TLSObjectScope, TLSObjectException @@ -328,7 +328,7 @@ class CertMgr: logger.warning(short_error_msg) self.mgr.set_health_warning(CertMgr.CEPHADM_CERTMGR_HEALTH_ERR, short_error_msg, total_issues, detailed_error_msgs) - def check_certificate_state(self, cert_name: str, target: str, cert: str, key: str) -> CertInfo: + def check_certificate_state(self, cert_name: str, target: str, cert: str, key: Optional[str] = None) -> CertInfo: """ Checks if a certificate is valid and close to expiration. @@ -339,17 +339,17 @@ class CertMgr: - exception_info: Details of any exception encountered during validation. """ cert_obj = Cert(cert, True) - key_obj = PrivKey(key, True) + key_obj = PrivKey(key, True) if key else None return self._check_certificate_state(cert_name, target, cert_obj, key_obj) - def _check_certificate_state(self, cert_name: str, target: Optional[str], cert: Cert, key: PrivKey) -> CertInfo: + def _check_certificate_state(self, cert_name: str, target: Optional[str], cert: Cert, key: Optional[PrivKey] = None) -> CertInfo: """ Checks if a certificate is valid and close to expiration. Returns: CertInfo """ try: - days_to_expiration = verify_tls(cert.cert, key.key) + days_to_expiration = verify_tls(cert.cert, key.key) if key else verify_cacrt_content(cert.cert) is_close_to_expiration = days_to_expiration < self.mgr.certificate_renewal_threshold_days return CertInfo(cert_name, target, cert.user_made, True, is_close_to_expiration, days_to_expiration, "") except ServerConfigException as e: @@ -393,9 +393,8 @@ class CertMgr: def get_problematic_certificates(self) -> List[Tuple[CertInfo, Cert]]: - def get_key(cert_name: str, target: Optional[str]) -> Optional[PrivKey]: + def get_key(cert_name: str, key_name: str, target: Optional[str]) -> Optional[PrivKey]: try: - key_name = cert_name.replace('_cert', '_key') service_name, host = self.cert_store.determine_tlsobject_target(cert_name, target) key = cast(PrivKey, self.key_store.get_tlsobject(key_name, service_name=service_name, host=host)) return key @@ -407,21 +406,28 @@ class CertMgr: problematics_certs: List[Tuple[CertInfo, Cert]] = [] for cert_name, cert_tlsobj, target in certs_tlsobjs: cert_obj = cast(Cert, cert_tlsobj) - key_obj = get_key(cert_name, target) - if cert_obj and key_obj: + if not cert_obj: + logger.error(f'Cannot find certificate {cert_name} in the TLSObjectStore') + continue + + key_name = cert_name.replace('_cert', '_key') + key_obj = get_key(cert_name, key_name, target) + if key_obj: + # certificate has a key, let's check the cert/key pair cert_info = self._check_certificate_state(cert_name, target, cert_obj, key_obj) - if not cert_info.is_operationally_valid(): - problematics_certs.append((cert_info, cert_obj)) - else: - target_info = f" ({target})" if target else "" - logger.info(f'Certificate for "{cert_name}{target_info}" is still valid for {cert_info.days_to_expiration} days.') - elif cert_obj: - # Cert is present but key is None, could only happen if somebody has put manually a bad key! - logger.warning(f"Key is missing for certificate '{cert_name}'.") + elif key_name in self.known_keys: + # certificate is supposed to have a key but it's missing + logger.error(f"Key '{key_name}' is missing for certificate '{cert_name}'.") cert_info = CertInfo(cert_name, target, cert_obj.user_made, False, False, 0, "missing key") + else: + # certificate has no associated key + cert_info = self._check_certificate_state(cert_name, target, cert_obj) + + if not cert_info.is_operationally_valid(): problematics_certs.append((cert_info, cert_obj)) else: - logger.error(f'Cannot get cert/key {cert_name}') + target_info = f" ({target})" if target else "" + logger.info(f'Certificate for "{cert_name}{target_info}" is still valid for {cert_info.days_to_expiration} days.') return problematics_certs diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index f66853ed8dc71..752d9ddfe80c9 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -58,7 +58,7 @@ from mgr_module import ( NotifyType, MonCommandFailed, ) -from mgr_util import build_url, verify_cacrt_content, ServerConfigException +from mgr_util import build_url import orchestrator from orchestrator.module import to_format, Format @@ -710,6 +710,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.cert_mgr.register_cert_key_pair('nvmeof', 'nvmeof_server_cert', 'nvmeof_server_key', TLSObjectScope.SERVICE) self.cert_mgr.register_cert_key_pair('nvmeof', 'nvmeof_client_cert', 'nvmeof_client_key', TLSObjectScope.SERVICE) + # register ancilary certificates/keys self.cert_mgr.register_cert('nvmeof', 'nvmeof_root_ca_cert', TLSObjectScope.SERVICE) self.cert_mgr.register_cert('rgw', 'rgw_frontend_ssl_cert', TLSObjectScope.SERVICE) self.cert_mgr.register_key('nvmeof', 'nvmeof_encryption_key', TLSObjectScope.SERVICE) @@ -3291,18 +3292,16 @@ Then run the following: @handle_orch_error def cert_store_set_cert( self, - cert: str, cert_name: str, - service_name: Optional[str] = None, - hostname: Optional[str] = None, + cert: str, + service_name: str = "", + hostname: str = "", ) -> str: - try: - days_to_expiration = verify_cacrt_content(cert) - if days_to_expiration < self.certificate_renewal_threshold_days: - raise OrchestratorError(f'Error: Certificate is about to expire (Remaining days: {days_to_expiration})') - except ServerConfigException as e: - raise OrchestratorError(f'Error: Invalid certificate for {cert_name}: {e}') + target = service_name or hostname + cert_info = self.cert_mgr.check_certificate_state(cert_name, target, cert) + if not cert_info.is_operationally_valid(): + raise OrchestratorError(cert_info.get_status_description()) self.cert_mgr.save_cert(cert_name, cert, service_name, hostname, True) return f'Certificate for {cert_name} set correctly' diff --git a/src/pybind/mgr/cephadm/tests/test_certmgr.py b/src/pybind/mgr/cephadm/tests/test_certmgr.py index ed88de49dd6f6..6d20791a48820 100644 --- a/src/pybind/mgr/cephadm/tests/test_certmgr.py +++ b/src/pybind/mgr/cephadm/tests/test_certmgr.py @@ -535,51 +535,91 @@ class TestCertMgr(object): @mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix") def test_tlsobject_store_load(self, _get_store_prefix, cephadm_module: CephadmOrchestrator): - rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert' - grafana_host1_key = 'fake-grafana-host1-cert' - nvmeof_server_cert = 'nvmeof-server-cert' - nvmeof_client_cert = 'nvmeof-client-cert' - nvmeof_root_ca_cert = 'nvmeof-root-ca-cert' - nvmeof_server_key = 'nvmeof-server-key' - nvmeof_client_key = 'nvmeof-client-key' - nvmeof_encryption_key = 'nvmeof-encryption-key' - unknown_cert_entity = 'unknown_per_service_cert' - unknown_cert_key = 'unknown_per_service_key' + # Define certs and keys with their corresponding scopes + certs = { + 'rgw_frontend_ssl_cert': ('rgw.foo', 'fake-rgw-cert', TLSObjectScope.SERVICE), + 'nvmeof_server_cert': ('nvmeof.foo', 'nvmeof-server-cert', TLSObjectScope.SERVICE), + 'nvmeof_client_cert': ('nvmeof.foo', 'nvmeof-client-cert', TLSObjectScope.SERVICE), + 'nvmeof_root_ca_cert': ('nvmeof.foo', 'nvmeof-root-ca-cert', TLSObjectScope.SERVICE), + 'ingress_ssl_cert': ('ingress', 'ingress-ssl-cert', TLSObjectScope.SERVICE), + 'iscsi_ssl_cert': ('iscsi', 'iscsi-ssl-cert', TLSObjectScope.SERVICE), + 'grafana_cert': ('host1', 'grafana-cert', TLSObjectScope.HOST), + 'mgmt_gw_cert': ('mgmt-gateway', 'mgmt-gw-cert', TLSObjectScope.GLOBAL), + 'oauth2_proxy_cert': ('oauth2-proxy', 'oauth2-proxy-cert', TLSObjectScope.GLOBAL), + } + unknown_certs = { + 'unknown_per_service_cert': ('unknown-svc.foo', 'unknown-cert', TLSObjectScope.SERVICE), + 'unknown_per_host_cert': ('unknown-host.foo', 'unknown-cert', TLSObjectScope.HOST), + 'unknown_global_cert': ('unknown-global.foo', 'unknown-cert', TLSObjectScope.GLOBAL), + 'cert_with_unknown_scope': ('unknown-global.foo', 'unknown-cert', TLSObjectScope.UNKNOWN), + } + keys = { + 'grafana_key': ('host1', 'fake-grafana-host1-key', TLSObjectScope.HOST), + 'nvmeof_server_key': ('nvmeof.foo', 'nvmeof-server-key', TLSObjectScope.SERVICE), + 'nvmeof_client_key': ('nvmeof.foo', 'nvmeof-client-key', TLSObjectScope.SERVICE), + 'nvmeof_encryption_key': ('nvmeof.foo', 'nvmeof-encryption-key', TLSObjectScope.SERVICE), + 'mgmt_gw_key': ('mgmt-gateway', 'mgmt-gw-key', TLSObjectScope.GLOBAL), + 'oauth2_proxy_key': ('oauth2-proxy', 'oauth2-proxy-key', TLSObjectScope.GLOBAL), + 'ingress_ssl_key': ('ingress', 'ingress-ssl-key', TLSObjectScope.SERVICE), + 'iscsi_ssl_key': ('iscsi', 'iscsi-ssl-key', TLSObjectScope.SERVICE), + } + unknown_keys = { + 'unknown_per_service_key': ('unknown-svc.foo', 'unknown-key', TLSObjectScope.SERVICE), + 'unknown_per_host_key': ('unknown-host.foo', 'unknown-key', TLSObjectScope.HOST), + 'unknown_global_key': ('unknown-global.foo', 'unknown-key', TLSObjectScope.GLOBAL), + 'key_with_unknown_scope': ('unknown-global.foo', 'unknown-key', TLSObjectScope.UNKNOWN), + } + + # Mock function to simulate store behavior def _fake_prefix_store(key): + from itertools import chain if key == 'cert_store.cert.': return { - f'{TLSOBJECT_STORE_CERT_PREFIX}rgw_frontend_ssl_cert': json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()}), - f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_server_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_server_cert, True).to_json()}), - f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_client_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_client_cert, True).to_json()}), - f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_root_ca_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_root_ca_cert, True).to_json()}), - f'{TLSOBJECT_STORE_CERT_PREFIX}{unknown_cert_entity}': json.dumps({'unkonwn.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()}), + f'{TLSOBJECT_STORE_CERT_PREFIX}{cert_name}': json.dumps( + {target: Cert(cert_value, True).to_json()} if scope != TLSObjectScope.GLOBAL + else Cert(cert_value, True).to_json() + ) + for cert_name, (target, cert_value, scope) in chain(certs.items(), unknown_certs.items()) } elif key == 'cert_store.key.': return { - f'{TLSOBJECT_STORE_KEY_PREFIX}grafana_key': json.dumps({'host1': PrivKey(grafana_host1_key).to_json()}), - f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_server_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_server_key).to_json()}), - f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_client_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_client_key).to_json()}), - f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_encryption_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_encryption_key).to_json()}), - f'{TLSOBJECT_STORE_KEY_PREFIX}{unknown_cert_key}': json.dumps({'unkonwn.foo': PrivKey(nvmeof_encryption_key).to_json()}), + f'{TLSOBJECT_STORE_KEY_PREFIX}{key_name}': json.dumps( + {target: PrivKey(key_value).to_json()} if scope != TLSObjectScope.GLOBAL + else PrivKey(key_value).to_json() + ) + for key_name, (target, key_value, scope) in chain(keys.items(), unknown_keys.items()) } else: - raise Exception(f'Get store with unexpected value {key}') + raise Exception(f'Unexpected key access in store: {key}') + # Inject the mock store behavior and the cert manager _get_store_prefix.side_effect = _fake_prefix_store cephadm_module._init_cert_mgr() - assert cephadm_module.cert_mgr.cert_store.known_entities['rgw_frontend_ssl_cert']['rgw.foo'] == Cert(rgw_frontend_rgw_foo_host2_cert, True) - assert cephadm_module.cert_mgr.cert_store.known_entities['nvmeof_server_cert']['nvmeof.foo'] == Cert(nvmeof_server_cert, True) - assert cephadm_module.cert_mgr.cert_store.known_entities['nvmeof_client_cert']['nvmeof.foo'] == Cert(nvmeof_client_cert, True) - assert cephadm_module.cert_mgr.cert_store.known_entities['nvmeof_root_ca_cert']['nvmeof.foo'] == Cert(nvmeof_root_ca_cert, True) - assert cephadm_module.cert_mgr.key_store.known_entities['grafana_key']['host1'] == PrivKey(grafana_host1_key) - assert unknown_cert_entity not in cephadm_module.cert_mgr.cert_store.known_entities - - assert cephadm_module.cert_mgr.key_store.known_entities['nvmeof_server_key']['nvmeof.foo'] == PrivKey(nvmeof_server_key) - assert cephadm_module.cert_mgr.key_store.known_entities['nvmeof_client_key']['nvmeof.foo'] == PrivKey(nvmeof_client_key) - assert cephadm_module.cert_mgr.key_store.known_entities['nvmeof_encryption_key']['nvmeof.foo'] == PrivKey(nvmeof_encryption_key) - assert unknown_cert_key not in cephadm_module.cert_mgr.key_store.known_entities + # Validate certificates in cert_store + for cert_name, (target, cert_value, scope) in certs.items(): + assert cert_name in cephadm_module.cert_mgr.cert_store.known_entities + if scope == TLSObjectScope.GLOBAL: + assert cephadm_module.cert_mgr.cert_store.known_entities[cert_name] == Cert(cert_value, True) + else: + assert cephadm_module.cert_mgr.cert_store.known_entities[cert_name][target] == Cert(cert_value, True) + + # Validate keys in key_store + for key_name, (target, key_value, scope) in keys.items(): + assert key_name in cephadm_module.cert_mgr.key_store.known_entities + if scope == TLSObjectScope.GLOBAL: + assert cephadm_module.cert_mgr.key_store.known_entities[key_name] == PrivKey(key_value) + else: + assert cephadm_module.cert_mgr.key_store.known_entities[key_name][target] == PrivKey(key_value) + + # Check unknown certificates are not loaded + for unknown_cert in unknown_certs: + assert unknown_cert not in cephadm_module.cert_mgr.cert_store.known_entities + + # Check unknown keys are not loaded + for unknown_key in unknown_keys: + assert unknown_key not in cephadm_module.cert_mgr.key_store.known_entities def test_tlsobject_store_get_cert_key(self, cephadm_module: CephadmOrchestrator): diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index 3745d8805524a..e9b19a93583c5 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -607,8 +607,8 @@ class Orchestrator(object): def cert_store_set_cert( self, - cert: str, cert_name: str, + cert: str, service_name: Optional[str] = None, hostname: Optional[str] = None, ) -> OrchResult[str]: diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py index c888519b9d786..d362bbcabb585 100644 --- a/src/pybind/mgr/orchestrator/module.py +++ b/src/pybind/mgr/orchestrator/module.py @@ -1287,8 +1287,8 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule, raise OrchestratorError('This command requires passing a certificate using --cert parameter or "-i " option') completion = self.cert_store_set_cert( - cert_content, cert_name, + cert_content, service_name, hostname, )