]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: add logic to handle certmgr errors and warnings
authorRedouane Kachach <rkachach@ibm.com>
Tue, 20 Jan 2026 12:05:03 +0000 (13:05 +0100)
committerRedouane Kachach <rkachach@ibm.com>
Fri, 13 Feb 2026 09:00:41 +0000 (10:00 +0100)
Add support in certmgr to distinguish between warnings and
errors. Previously, a single code (CEPHADM_CERT_ERROR) was used for
both, which complicated the Dashboard logic and forced ad-hoc checks
to determine whether a reported event was a warning or an error. In the
new code, CEPHADM_CERT_ERROR and CEPHADM_CERT_WARNING are used to
clearly distinguish between the two cases such as:

CEPHADM_CERT_ERROR: only INVALID/EXPIRED certificates
CEPHADM_CERT_WARNING: only EXPIRING certificates

https://tracker.ceph.com/issues/74467

Signed-off-by: Redouane Kachach <rkachach@ibm.com>
src/pybind/mgr/cephadm/cert_mgr.py

index b636f3ddce98849182fba6161c5ae7317e56b8da..e50463935a8cce59d5f83f4620f7c33724dba86a 100644 (file)
@@ -159,7 +159,10 @@ class CertMgr:
 
     CEPHADM_ROOT_CA_CERT = 'cephadm_root_ca_cert'
     CEPHADM_ROOT_CA_KEY = 'cephadm_root_ca_key'
-    CEPHADM_CERTMGR_HEALTH_ERR = 'CEPHADM_CERT_ERROR'
+
+    CEPHADM_CERT_ERROR = 'CEPHADM_CERT_ERROR'
+    CEPHADM_CERT_WARNING = 'CEPHADM_CERT_WARNING'
+
     CEPHADM_SIGNED = 'cephadm-signed'
     LABEL_SEPARATOR = "__lbl__"
 
@@ -315,7 +318,12 @@ class CertMgr:
         custom_san_list: Optional[List[str]] = None,
         duration_in_days: Optional[int] = None,
     ) -> TLSCredentials:
-        cert, key = self.ssl_certs.generate_cert(host_fqdn, node_ip, custom_san_list=custom_san_list, duration_in_days=duration_in_days)
+        cert, key = self.ssl_certs.generate_cert(
+            host_fqdn,
+            node_ip,
+            custom_san_list=custom_san_list,
+            duration_in_days=duration_in_days
+        )
         ca_cert = self.mgr.cert_mgr.get_root_ca()
         return TLSCredentials(cert=cert, key=key, ca_cert=ca_cert)
 
@@ -348,13 +356,16 @@ class CertMgr:
         ca_cert = self.mgr.cert_mgr.get_root_ca()
         return TLSCredentials(cert=cert, key=key, ca_cert=ca_cert)
 
-    def save_cert(self, cert_name: str, cert: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False, editable: bool = False) -> None:
+    def save_cert(self, cert_name: str, cert: str, service_name: Optional[str] = None, host: Optional[str] = None,
+                  user_made: bool = False, editable: bool = False) -> None:
         self.cert_store.save_tlsobject(cert_name, cert, service_name, host, user_made, editable)
 
-    def save_key(self, key_name: str, key: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False, editable: bool = False) -> None:
+    def save_key(self, key_name: str, key: str, service_name: Optional[str] = None, host: Optional[str] = None,
+                 user_made: bool = False, editable: bool = False) -> None:
         self.key_store.save_tlsobject(key_name, key, service_name, host, user_made, editable)
 
-    def save_self_signed_cert_key_pair(self, service_name: str, tls_creds: TLSCredentials, host: str, label: Optional[str] = None) -> None:
+    def save_self_signed_cert_key_pair(self, service_name: str, tls_creds: TLSCredentials, host: str,
+                                       label: Optional[str] = None) -> None:
         ss_cert_name = self.self_signed_cert(service_name, label)
         ss_key_name = self.self_signed_key(service_name, label)
         self.cert_store.save_tlsobject(ss_cert_name, tls_creds.cert, host=host, user_made=False)
@@ -551,41 +562,53 @@ class CertMgr:
             if (cert_info.cert_name, cert_info.target) not in previously_reported_issues:
                 self.certificates_health_report.append(cert_info)
 
+        # Nothing to report => clear both checks
         if not self.certificates_health_report:
-            self.mgr.remove_health_warning(CertMgr.CEPHADM_CERTMGR_HEALTH_ERR)
+            self.mgr.remove_health_warning(self.CEPHADM_CERT_ERROR)
+            self.mgr.remove_health_warning(self.CEPHADM_CERT_WARNING)
             return
 
-        detailed_error_msgs = []
-        invalid_count = 0
-        expired_count = 0
-        expiring_count = 0
+        # Split issues by severity
+        error_certs: List[CertInfo] = []
+        warn_certs: List[CertInfo] = []
         for cert_info in self.certificates_health_report:
-            cert_status = cert_info.get_status_description()
-            detailed_error_msgs.append(cert_status)
-            if not cert_info.is_valid:
-                if 'expired' in cert_info.error_info:
-                    expired_count += 1
-                else:
-                    invalid_count += 1
-            elif cert_info.is_close_to_expiration:
-                expiring_count += 1
-
-        # Generate a short description with a summary of all the detected issues
-        issues = [
-            f'{invalid_count} {CertStatus.INVALID}' if invalid_count > 0 else '',
-            f'{expired_count} {CertStatus.EXPIRED}' if expired_count > 0 else '',
-            f'{expiring_count} {CertStatus.EXPIRING}' if expiring_count > 0 else ''
-        ]
-        issues_description = ', '.join(filter(None, issues))  # collect only non-empty issues
-        total_issues = invalid_count + expired_count + expiring_count
-        short_error_msg = (f'Detected {total_issues} cephadm certificate(s) issues: {issues_description}')
-
-        if invalid_count > 0 or expired_count > 0:
-            logger.error(short_error_msg)
-            self.mgr.set_health_error(CertMgr.CEPHADM_CERTMGR_HEALTH_ERR, short_error_msg, total_issues, detailed_error_msgs)
+            if cert_info.status in (CertStatus.INVALID, CertStatus.EXPIRED):
+                error_certs.append(cert_info)
+            elif cert_info.status == CertStatus.EXPIRING:
+                warn_certs.append(cert_info)
+
+        def _emit_health_issue(check_id: str, is_error: bool, certs: List[CertInfo]) -> None:
+            invalid_count = sum(1 for c in certs if c.status == CertStatus.INVALID)
+            expired_count = sum(1 for c in certs if c.status == CertStatus.EXPIRED)
+            expiring_count = sum(1 for c in certs if c.status == CertStatus.EXPIRING)
+            issues = [
+                f'{invalid_count} {CertStatus.INVALID}' if invalid_count else '',
+                f'{expired_count} {CertStatus.EXPIRED}' if expired_count else '',
+                f'{expiring_count} {CertStatus.EXPIRING}' if expiring_count else '',
+            ]
+            total = len(certs)
+            issues_description = ', '.join(filter(None, issues))
+            short_msg = f'Detected {total} cephadm certificate(s) issues: {issues_description}'
+
+            detailed_msgs = [c.get_status_description() for c in certs]
+            if is_error:
+                logger.error(short_msg)
+                self.mgr.set_health_error(check_id, short_msg, total, detailed_msgs)
+            else:
+                logger.warning(short_msg)
+                self.mgr.set_health_warning(check_id, short_msg, total, detailed_msgs)
+
+        # Emit/clear error check
+        if error_certs:
+            _emit_health_issue(self.CEPHADM_CERT_ERROR, True, error_certs)
         else:
-            logger.warning(short_error_msg)
-            self.mgr.set_health_warning(CertMgr.CEPHADM_CERTMGR_HEALTH_ERR, short_error_msg, total_issues, detailed_error_msgs)
+            self.mgr.remove_health_warning(self.CEPHADM_CERT_ERROR)
+
+        # Emit/clear warning check
+        if warn_certs:
+            _emit_health_issue(self.CEPHADM_CERT_WARNING, False, warn_certs)
+        else:
+            self.mgr.remove_health_warning(self.CEPHADM_CERT_WARNING)
 
     def check_certificate_state(self, cert_name: str, target: str, cert: str, key: Optional[str] = None) -> CertInfo:
         """
@@ -619,9 +642,14 @@ class CertMgr:
         def get_key(cert_name: str, key_name: str, target: Optional[str]) -> Optional[PrivKey]:
             try:
                 tlsobj_target = self.cert_store.determine_tlsobject_target(cert_name, target)
-                key = cast(PrivKey, self.key_store.get_tlsobject(key_name,
-                                                                 service_name=tlsobj_target.service,
-                                                                 host=tlsobj_target.host))
+                key = cast(
+                    PrivKey,
+                    self.key_store.get_tlsobject(
+                        key_name,
+                        service_name=tlsobj_target.service,
+                        host=tlsobj_target.host
+                    )
+                )
                 return key
             except TLSObjectException:
                 return None
@@ -640,7 +668,6 @@ class CertMgr:
             if key_obj:
                 # certificate has a key, let's check the cert/key pair
                 cert_info = self._check_certificate_state(cert_name, target, cert_obj, key_obj)
-
             elif any(key_name in ks for ks in self.known_keys.values()) or self.is_cephadm_signed_object(key_name):
                 # certificate is supposed to have a key but it's missing
                 logger.error(f"Key '{key_name}' is missing for certificate '{cert_name}'.")
@@ -706,13 +733,22 @@ class CertMgr:
             else:
                 return False
 
+        def log_issue(cert_info: CertInfo) -> None:
+            msg = cert_info.get_status_description()
+            if cert_info.status in (CertStatus.INVALID, CertStatus.EXPIRED):
+                logger.error(msg)
+            elif cert_info.status == CertStatus.EXPIRING:
+                logger.warning(msg)
+            else:
+                logger.info(msg)
+
         # Process all problematic certificates and try to fix them in case automated certs renewal
         # is enabled. Successfully fixed ones are collected to trigger a service reconfiguration.
-        certs_with_issues = []
+        certs_with_issues: List[CertInfo] = []
         services_to_reconfig = set()
         for cert_info, cert_obj in self.get_problematic_certificates():
 
-            logger.warning(cert_info.get_status_description())
+            log_issue(cert_info)
 
             if requires_user_intervention(cert_info, cert_obj):
                 certs_with_issues.append(cert_info)