CEPHADM_ROOT_CA_CERT = 'cephadm_root_ca_cert'
CEPHADM_ROOT_CA_KEY = 'cephadm_root_ca_key'
- CEPHADM_CERTMGR_HEALTH_ERR = 'CEPHADM_CERT_ERROR'
+
+ CEPHADM_CERT_ERROR = 'CEPHADM_CERT_ERROR'
+ CEPHADM_CERT_WARNING = 'CEPHADM_CERT_WARNING'
+
CEPHADM_SIGNED = 'cephadm-signed'
LABEL_SEPARATOR = "__lbl__"
custom_san_list: Optional[List[str]] = None,
duration_in_days: Optional[int] = None,
) -> TLSCredentials:
- cert, key = self.ssl_certs.generate_cert(host_fqdn, node_ip, custom_san_list=custom_san_list, duration_in_days=duration_in_days)
+ cert, key = self.ssl_certs.generate_cert(
+ host_fqdn,
+ node_ip,
+ custom_san_list=custom_san_list,
+ duration_in_days=duration_in_days
+ )
ca_cert = self.mgr.cert_mgr.get_root_ca()
return TLSCredentials(cert=cert, key=key, ca_cert=ca_cert)
ca_cert = self.mgr.cert_mgr.get_root_ca()
return TLSCredentials(cert=cert, key=key, ca_cert=ca_cert)
- def save_cert(self, cert_name: str, cert: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False, editable: bool = False) -> None:
+ def save_cert(self, cert_name: str, cert: str, service_name: Optional[str] = None, host: Optional[str] = None,
+ user_made: bool = False, editable: bool = False) -> None:
self.cert_store.save_tlsobject(cert_name, cert, service_name, host, user_made, editable)
- def save_key(self, key_name: str, key: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False, editable: bool = False) -> None:
+ def save_key(self, key_name: str, key: str, service_name: Optional[str] = None, host: Optional[str] = None,
+ user_made: bool = False, editable: bool = False) -> None:
self.key_store.save_tlsobject(key_name, key, service_name, host, user_made, editable)
- def save_self_signed_cert_key_pair(self, service_name: str, tls_creds: TLSCredentials, host: str, label: Optional[str] = None) -> None:
+ def save_self_signed_cert_key_pair(self, service_name: str, tls_creds: TLSCredentials, host: str,
+ label: Optional[str] = None) -> None:
ss_cert_name = self.self_signed_cert(service_name, label)
ss_key_name = self.self_signed_key(service_name, label)
self.cert_store.save_tlsobject(ss_cert_name, tls_creds.cert, host=host, user_made=False)
if (cert_info.cert_name, cert_info.target) not in previously_reported_issues:
self.certificates_health_report.append(cert_info)
+ # Nothing to report => clear both checks
if not self.certificates_health_report:
- self.mgr.remove_health_warning(CertMgr.CEPHADM_CERTMGR_HEALTH_ERR)
+ self.mgr.remove_health_warning(self.CEPHADM_CERT_ERROR)
+ self.mgr.remove_health_warning(self.CEPHADM_CERT_WARNING)
return
- detailed_error_msgs = []
- invalid_count = 0
- expired_count = 0
- expiring_count = 0
+ # Split issues by severity
+ error_certs: List[CertInfo] = []
+ warn_certs: List[CertInfo] = []
for cert_info in self.certificates_health_report:
- cert_status = cert_info.get_status_description()
- detailed_error_msgs.append(cert_status)
- if not cert_info.is_valid:
- if 'expired' in cert_info.error_info:
- expired_count += 1
- else:
- invalid_count += 1
- elif cert_info.is_close_to_expiration:
- expiring_count += 1
-
- # Generate a short description with a summary of all the detected issues
- issues = [
- f'{invalid_count} {CertStatus.INVALID}' if invalid_count > 0 else '',
- f'{expired_count} {CertStatus.EXPIRED}' if expired_count > 0 else '',
- f'{expiring_count} {CertStatus.EXPIRING}' if expiring_count > 0 else ''
- ]
- issues_description = ', '.join(filter(None, issues)) # collect only non-empty issues
- total_issues = invalid_count + expired_count + expiring_count
- short_error_msg = (f'Detected {total_issues} cephadm certificate(s) issues: {issues_description}')
-
- if invalid_count > 0 or expired_count > 0:
- logger.error(short_error_msg)
- self.mgr.set_health_error(CertMgr.CEPHADM_CERTMGR_HEALTH_ERR, short_error_msg, total_issues, detailed_error_msgs)
+ if cert_info.status in (CertStatus.INVALID, CertStatus.EXPIRED):
+ error_certs.append(cert_info)
+ elif cert_info.status == CertStatus.EXPIRING:
+ warn_certs.append(cert_info)
+
+ def _emit_health_issue(check_id: str, is_error: bool, certs: List[CertInfo]) -> None:
+ invalid_count = sum(1 for c in certs if c.status == CertStatus.INVALID)
+ expired_count = sum(1 for c in certs if c.status == CertStatus.EXPIRED)
+ expiring_count = sum(1 for c in certs if c.status == CertStatus.EXPIRING)
+ issues = [
+ f'{invalid_count} {CertStatus.INVALID}' if invalid_count else '',
+ f'{expired_count} {CertStatus.EXPIRED}' if expired_count else '',
+ f'{expiring_count} {CertStatus.EXPIRING}' if expiring_count else '',
+ ]
+ total = len(certs)
+ issues_description = ', '.join(filter(None, issues))
+ short_msg = f'Detected {total} cephadm certificate(s) issues: {issues_description}'
+
+ detailed_msgs = [c.get_status_description() for c in certs]
+ if is_error:
+ logger.error(short_msg)
+ self.mgr.set_health_error(check_id, short_msg, total, detailed_msgs)
+ else:
+ logger.warning(short_msg)
+ self.mgr.set_health_warning(check_id, short_msg, total, detailed_msgs)
+
+ # Emit/clear error check
+ if error_certs:
+ _emit_health_issue(self.CEPHADM_CERT_ERROR, True, error_certs)
else:
- logger.warning(short_error_msg)
- self.mgr.set_health_warning(CertMgr.CEPHADM_CERTMGR_HEALTH_ERR, short_error_msg, total_issues, detailed_error_msgs)
+ self.mgr.remove_health_warning(self.CEPHADM_CERT_ERROR)
+
+ # Emit/clear warning check
+ if warn_certs:
+ _emit_health_issue(self.CEPHADM_CERT_WARNING, False, warn_certs)
+ else:
+ self.mgr.remove_health_warning(self.CEPHADM_CERT_WARNING)
def check_certificate_state(self, cert_name: str, target: str, cert: str, key: Optional[str] = None) -> CertInfo:
"""
def get_key(cert_name: str, key_name: str, target: Optional[str]) -> Optional[PrivKey]:
try:
tlsobj_target = self.cert_store.determine_tlsobject_target(cert_name, target)
- key = cast(PrivKey, self.key_store.get_tlsobject(key_name,
- service_name=tlsobj_target.service,
- host=tlsobj_target.host))
+ key = cast(
+ PrivKey,
+ self.key_store.get_tlsobject(
+ key_name,
+ service_name=tlsobj_target.service,
+ host=tlsobj_target.host
+ )
+ )
return key
except TLSObjectException:
return None
if key_obj:
# certificate has a key, let's check the cert/key pair
cert_info = self._check_certificate_state(cert_name, target, cert_obj, key_obj)
-
elif any(key_name in ks for ks in self.known_keys.values()) or self.is_cephadm_signed_object(key_name):
# certificate is supposed to have a key but it's missing
logger.error(f"Key '{key_name}' is missing for certificate '{cert_name}'.")
else:
return False
+ def log_issue(cert_info: CertInfo) -> None:
+ msg = cert_info.get_status_description()
+ if cert_info.status in (CertStatus.INVALID, CertStatus.EXPIRED):
+ logger.error(msg)
+ elif cert_info.status == CertStatus.EXPIRING:
+ logger.warning(msg)
+ else:
+ logger.info(msg)
+
# Process all problematic certificates and try to fix them in case automated certs renewal
# is enabled. Successfully fixed ones are collected to trigger a service reconfiguration.
- certs_with_issues = []
+ certs_with_issues: List[CertInfo] = []
services_to_reconfig = set()
for cert_info, cert_obj in self.get_problematic_certificates():
- logger.warning(cert_info.get_status_description())
+ log_issue(cert_info)
if requires_user_intervention(cert_info, cert_obj):
certs_with_issues.append(cert_info)