import logging
from cephadm.ssl_cert_utils import SSLCerts, SSLConfigException
-from mgr_util import verify_tls, ServerConfigException
+from mgr_util import verify_tls, verify_cacrt_content, ServerConfigException
from cephadm.ssl_cert_utils import get_certificate_info, get_private_key_info
from cephadm.tlsobject_types import Cert, PrivKey
from cephadm.tlsobject_store import TLSObjectStore, TLSObjectScope, TLSObjectException
logger.warning(short_error_msg)
self.mgr.set_health_warning(CertMgr.CEPHADM_CERTMGR_HEALTH_ERR, short_error_msg, total_issues, detailed_error_msgs)
- def check_certificate_state(self, cert_name: str, target: str, cert: str, key: str) -> CertInfo:
+ def check_certificate_state(self, cert_name: str, target: str, cert: str, key: Optional[str] = None) -> CertInfo:
"""
Checks if a certificate is valid and close to expiration.
- exception_info: Details of any exception encountered during validation.
"""
cert_obj = Cert(cert, True)
- key_obj = PrivKey(key, True)
+ key_obj = PrivKey(key, True) if key else None
return self._check_certificate_state(cert_name, target, cert_obj, key_obj)
- def _check_certificate_state(self, cert_name: str, target: Optional[str], cert: Cert, key: PrivKey) -> CertInfo:
+ def _check_certificate_state(self, cert_name: str, target: Optional[str], cert: Cert, key: Optional[PrivKey] = None) -> CertInfo:
"""
Checks if a certificate is valid and close to expiration.
Returns: CertInfo
"""
try:
- days_to_expiration = verify_tls(cert.cert, key.key)
+ days_to_expiration = verify_tls(cert.cert, key.key) if key else verify_cacrt_content(cert.cert)
is_close_to_expiration = days_to_expiration < self.mgr.certificate_renewal_threshold_days
return CertInfo(cert_name, target, cert.user_made, True, is_close_to_expiration, days_to_expiration, "")
except ServerConfigException as e:
def get_problematic_certificates(self) -> List[Tuple[CertInfo, Cert]]:
- def get_key(cert_name: str, target: Optional[str]) -> Optional[PrivKey]:
+ def get_key(cert_name: str, key_name: str, target: Optional[str]) -> Optional[PrivKey]:
try:
- key_name = cert_name.replace('_cert', '_key')
service_name, host = self.cert_store.determine_tlsobject_target(cert_name, target)
key = cast(PrivKey, self.key_store.get_tlsobject(key_name, service_name=service_name, host=host))
return key
problematics_certs: List[Tuple[CertInfo, Cert]] = []
for cert_name, cert_tlsobj, target in certs_tlsobjs:
cert_obj = cast(Cert, cert_tlsobj)
- key_obj = get_key(cert_name, target)
- if cert_obj and key_obj:
+ if not cert_obj:
+ logger.error(f'Cannot find certificate {cert_name} in the TLSObjectStore')
+ continue
+
+ key_name = cert_name.replace('_cert', '_key')
+ key_obj = get_key(cert_name, key_name, target)
+ if key_obj:
+ # certificate has a key, let's check the cert/key pair
cert_info = self._check_certificate_state(cert_name, target, cert_obj, key_obj)
- if not cert_info.is_operationally_valid():
- problematics_certs.append((cert_info, cert_obj))
- else:
- target_info = f" ({target})" if target else ""
- logger.info(f'Certificate for "{cert_name}{target_info}" is still valid for {cert_info.days_to_expiration} days.')
- elif cert_obj:
- # Cert is present but key is None, could only happen if somebody has put manually a bad key!
- logger.warning(f"Key is missing for certificate '{cert_name}'.")
+ elif key_name in self.known_keys:
+ # certificate is supposed to have a key but it's missing
+ logger.error(f"Key '{key_name}' is missing for certificate '{cert_name}'.")
cert_info = CertInfo(cert_name, target, cert_obj.user_made, False, False, 0, "missing key")
+ else:
+ # certificate has no associated key
+ cert_info = self._check_certificate_state(cert_name, target, cert_obj)
+
+ if not cert_info.is_operationally_valid():
problematics_certs.append((cert_info, cert_obj))
else:
- logger.error(f'Cannot get cert/key {cert_name}')
+ target_info = f" ({target})" if target else ""
+ logger.info(f'Certificate for "{cert_name}{target_info}" is still valid for {cert_info.days_to_expiration} days.')
return problematics_certs
NotifyType,
MonCommandFailed,
)
-from mgr_util import build_url, verify_cacrt_content, ServerConfigException
+from mgr_util import build_url
import orchestrator
from orchestrator.module import to_format, Format
self.cert_mgr.register_cert_key_pair('nvmeof', 'nvmeof_server_cert', 'nvmeof_server_key', TLSObjectScope.SERVICE)
self.cert_mgr.register_cert_key_pair('nvmeof', 'nvmeof_client_cert', 'nvmeof_client_key', TLSObjectScope.SERVICE)
+ # register ancilary certificates/keys
self.cert_mgr.register_cert('nvmeof', 'nvmeof_root_ca_cert', TLSObjectScope.SERVICE)
self.cert_mgr.register_cert('rgw', 'rgw_frontend_ssl_cert', TLSObjectScope.SERVICE)
self.cert_mgr.register_key('nvmeof', 'nvmeof_encryption_key', TLSObjectScope.SERVICE)
@handle_orch_error
def cert_store_set_cert(
self,
- cert: str,
cert_name: str,
- service_name: Optional[str] = None,
- hostname: Optional[str] = None,
+ cert: str,
+ service_name: str = "",
+ hostname: str = "",
) -> str:
- try:
- days_to_expiration = verify_cacrt_content(cert)
- if days_to_expiration < self.certificate_renewal_threshold_days:
- raise OrchestratorError(f'Error: Certificate is about to expire (Remaining days: {days_to_expiration})')
- except ServerConfigException as e:
- raise OrchestratorError(f'Error: Invalid certificate for {cert_name}: {e}')
+ target = service_name or hostname
+ cert_info = self.cert_mgr.check_certificate_state(cert_name, target, cert)
+ if not cert_info.is_operationally_valid():
+ raise OrchestratorError(cert_info.get_status_description())
self.cert_mgr.save_cert(cert_name, cert, service_name, hostname, True)
return f'Certificate for {cert_name} set correctly'
@mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix")
def test_tlsobject_store_load(self, _get_store_prefix, cephadm_module: CephadmOrchestrator):
- rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
- grafana_host1_key = 'fake-grafana-host1-cert'
- nvmeof_server_cert = 'nvmeof-server-cert'
- nvmeof_client_cert = 'nvmeof-client-cert'
- nvmeof_root_ca_cert = 'nvmeof-root-ca-cert'
- nvmeof_server_key = 'nvmeof-server-key'
- nvmeof_client_key = 'nvmeof-client-key'
- nvmeof_encryption_key = 'nvmeof-encryption-key'
- unknown_cert_entity = 'unknown_per_service_cert'
- unknown_cert_key = 'unknown_per_service_key'
+ # Define certs and keys with their corresponding scopes
+ certs = {
+ 'rgw_frontend_ssl_cert': ('rgw.foo', 'fake-rgw-cert', TLSObjectScope.SERVICE),
+ 'nvmeof_server_cert': ('nvmeof.foo', 'nvmeof-server-cert', TLSObjectScope.SERVICE),
+ 'nvmeof_client_cert': ('nvmeof.foo', 'nvmeof-client-cert', TLSObjectScope.SERVICE),
+ 'nvmeof_root_ca_cert': ('nvmeof.foo', 'nvmeof-root-ca-cert', TLSObjectScope.SERVICE),
+ 'ingress_ssl_cert': ('ingress', 'ingress-ssl-cert', TLSObjectScope.SERVICE),
+ 'iscsi_ssl_cert': ('iscsi', 'iscsi-ssl-cert', TLSObjectScope.SERVICE),
+ 'grafana_cert': ('host1', 'grafana-cert', TLSObjectScope.HOST),
+ 'mgmt_gw_cert': ('mgmt-gateway', 'mgmt-gw-cert', TLSObjectScope.GLOBAL),
+ 'oauth2_proxy_cert': ('oauth2-proxy', 'oauth2-proxy-cert', TLSObjectScope.GLOBAL),
+ }
+ unknown_certs = {
+ 'unknown_per_service_cert': ('unknown-svc.foo', 'unknown-cert', TLSObjectScope.SERVICE),
+ 'unknown_per_host_cert': ('unknown-host.foo', 'unknown-cert', TLSObjectScope.HOST),
+ 'unknown_global_cert': ('unknown-global.foo', 'unknown-cert', TLSObjectScope.GLOBAL),
+ 'cert_with_unknown_scope': ('unknown-global.foo', 'unknown-cert', TLSObjectScope.UNKNOWN),
+ }
+ keys = {
+ 'grafana_key': ('host1', 'fake-grafana-host1-key', TLSObjectScope.HOST),
+ 'nvmeof_server_key': ('nvmeof.foo', 'nvmeof-server-key', TLSObjectScope.SERVICE),
+ 'nvmeof_client_key': ('nvmeof.foo', 'nvmeof-client-key', TLSObjectScope.SERVICE),
+ 'nvmeof_encryption_key': ('nvmeof.foo', 'nvmeof-encryption-key', TLSObjectScope.SERVICE),
+ 'mgmt_gw_key': ('mgmt-gateway', 'mgmt-gw-key', TLSObjectScope.GLOBAL),
+ 'oauth2_proxy_key': ('oauth2-proxy', 'oauth2-proxy-key', TLSObjectScope.GLOBAL),
+ 'ingress_ssl_key': ('ingress', 'ingress-ssl-key', TLSObjectScope.SERVICE),
+ 'iscsi_ssl_key': ('iscsi', 'iscsi-ssl-key', TLSObjectScope.SERVICE),
+ }
+ unknown_keys = {
+ 'unknown_per_service_key': ('unknown-svc.foo', 'unknown-key', TLSObjectScope.SERVICE),
+ 'unknown_per_host_key': ('unknown-host.foo', 'unknown-key', TLSObjectScope.HOST),
+ 'unknown_global_key': ('unknown-global.foo', 'unknown-key', TLSObjectScope.GLOBAL),
+ 'key_with_unknown_scope': ('unknown-global.foo', 'unknown-key', TLSObjectScope.UNKNOWN),
+ }
+
+ # Mock function to simulate store behavior
def _fake_prefix_store(key):
+ from itertools import chain
if key == 'cert_store.cert.':
return {
- f'{TLSOBJECT_STORE_CERT_PREFIX}rgw_frontend_ssl_cert': json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()}),
- f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_server_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_server_cert, True).to_json()}),
- f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_client_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_client_cert, True).to_json()}),
- f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_root_ca_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_root_ca_cert, True).to_json()}),
- f'{TLSOBJECT_STORE_CERT_PREFIX}{unknown_cert_entity}': json.dumps({'unkonwn.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()}),
+ f'{TLSOBJECT_STORE_CERT_PREFIX}{cert_name}': json.dumps(
+ {target: Cert(cert_value, True).to_json()} if scope != TLSObjectScope.GLOBAL
+ else Cert(cert_value, True).to_json()
+ )
+ for cert_name, (target, cert_value, scope) in chain(certs.items(), unknown_certs.items())
}
elif key == 'cert_store.key.':
return {
- f'{TLSOBJECT_STORE_KEY_PREFIX}grafana_key': json.dumps({'host1': PrivKey(grafana_host1_key).to_json()}),
- f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_server_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_server_key).to_json()}),
- f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_client_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_client_key).to_json()}),
- f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_encryption_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_encryption_key).to_json()}),
- f'{TLSOBJECT_STORE_KEY_PREFIX}{unknown_cert_key}': json.dumps({'unkonwn.foo': PrivKey(nvmeof_encryption_key).to_json()}),
+ f'{TLSOBJECT_STORE_KEY_PREFIX}{key_name}': json.dumps(
+ {target: PrivKey(key_value).to_json()} if scope != TLSObjectScope.GLOBAL
+ else PrivKey(key_value).to_json()
+ )
+ for key_name, (target, key_value, scope) in chain(keys.items(), unknown_keys.items())
}
else:
- raise Exception(f'Get store with unexpected value {key}')
+ raise Exception(f'Unexpected key access in store: {key}')
+ # Inject the mock store behavior and the cert manager
_get_store_prefix.side_effect = _fake_prefix_store
cephadm_module._init_cert_mgr()
- assert cephadm_module.cert_mgr.cert_store.known_entities['rgw_frontend_ssl_cert']['rgw.foo'] == Cert(rgw_frontend_rgw_foo_host2_cert, True)
- assert cephadm_module.cert_mgr.cert_store.known_entities['nvmeof_server_cert']['nvmeof.foo'] == Cert(nvmeof_server_cert, True)
- assert cephadm_module.cert_mgr.cert_store.known_entities['nvmeof_client_cert']['nvmeof.foo'] == Cert(nvmeof_client_cert, True)
- assert cephadm_module.cert_mgr.cert_store.known_entities['nvmeof_root_ca_cert']['nvmeof.foo'] == Cert(nvmeof_root_ca_cert, True)
- assert cephadm_module.cert_mgr.key_store.known_entities['grafana_key']['host1'] == PrivKey(grafana_host1_key)
- assert unknown_cert_entity not in cephadm_module.cert_mgr.cert_store.known_entities
-
- assert cephadm_module.cert_mgr.key_store.known_entities['nvmeof_server_key']['nvmeof.foo'] == PrivKey(nvmeof_server_key)
- assert cephadm_module.cert_mgr.key_store.known_entities['nvmeof_client_key']['nvmeof.foo'] == PrivKey(nvmeof_client_key)
- assert cephadm_module.cert_mgr.key_store.known_entities['nvmeof_encryption_key']['nvmeof.foo'] == PrivKey(nvmeof_encryption_key)
- assert unknown_cert_key not in cephadm_module.cert_mgr.key_store.known_entities
+ # Validate certificates in cert_store
+ for cert_name, (target, cert_value, scope) in certs.items():
+ assert cert_name in cephadm_module.cert_mgr.cert_store.known_entities
+ if scope == TLSObjectScope.GLOBAL:
+ assert cephadm_module.cert_mgr.cert_store.known_entities[cert_name] == Cert(cert_value, True)
+ else:
+ assert cephadm_module.cert_mgr.cert_store.known_entities[cert_name][target] == Cert(cert_value, True)
+
+ # Validate keys in key_store
+ for key_name, (target, key_value, scope) in keys.items():
+ assert key_name in cephadm_module.cert_mgr.key_store.known_entities
+ if scope == TLSObjectScope.GLOBAL:
+ assert cephadm_module.cert_mgr.key_store.known_entities[key_name] == PrivKey(key_value)
+ else:
+ assert cephadm_module.cert_mgr.key_store.known_entities[key_name][target] == PrivKey(key_value)
+
+ # Check unknown certificates are not loaded
+ for unknown_cert in unknown_certs:
+ assert unknown_cert not in cephadm_module.cert_mgr.cert_store.known_entities
+
+ # Check unknown keys are not loaded
+ for unknown_key in unknown_keys:
+ assert unknown_key not in cephadm_module.cert_mgr.key_store.known_entities
def test_tlsobject_store_get_cert_key(self, cephadm_module: CephadmOrchestrator):