From 42a6d66346f7095ed6bf7e502fa6af94c827e80a Mon Sep 17 00:00:00 2001 From: Abhishek Desai Date: Mon, 1 Dec 2025 13:30:25 +0530 Subject: [PATCH] mgr/dasboard : Injest certificate mgmt API into services API fixes : https://tracker.ceph.com/issues/74039 Signed-off-by: Abhishek Desai Assisted-by: Cursor --- .../mgr/dashboard/controllers/certificate.py | 181 ++++++ .../mgr/dashboard/controllers/service.py | 17 +- src/pybind/mgr/dashboard/model/certificate.py | 104 ++++ src/pybind/mgr/dashboard/openapi.yaml | 166 +++++ .../mgr/dashboard/services/certificate.py | 527 ++++++++++++++++ .../mgr/dashboard/services/orchestrator.py | 5 + .../mgr/dashboard/tests/test_certificate.py | 589 ++++++++++++++++++ 7 files changed, 1587 insertions(+), 2 deletions(-) create mode 100644 src/pybind/mgr/dashboard/controllers/certificate.py create mode 100644 src/pybind/mgr/dashboard/model/certificate.py create mode 100644 src/pybind/mgr/dashboard/services/certificate.py create mode 100644 src/pybind/mgr/dashboard/tests/test_certificate.py diff --git a/src/pybind/mgr/dashboard/controllers/certificate.py b/src/pybind/mgr/dashboard/controllers/certificate.py new file mode 100644 index 00000000000..ebcdb9e68d3 --- /dev/null +++ b/src/pybind/mgr/dashboard/controllers/certificate.py @@ -0,0 +1,181 @@ +from typing import Any, Dict, List, Optional + +from ceph.deployment.service_spec import ServiceSpec + +from ..exceptions import DashboardException +from ..model.certificate import CEPHADM_ROOT_CA_CERT, CERTIFICATE_LIST_SCHEMA, \ + CertificateScope, CertificateStatus +from ..security import Scope +from ..services.certificate import CertificateService +from ..services.exception import handle_orchestrator_error +from ..services.orchestrator import OrchClient, OrchFeature +from ..tools import str_to_bool +from . import APIDoc, APIRouter, EndpointDoc, ReadPermission, RESTController +from .orchestrator import raise_if_no_orchestrator + + +@APIRouter('/service/certificate', Scope.HOSTS) +@APIDoc("Service Management API", "Service") +class Certificate(RESTController): + + @EndpointDoc("List All Certificates", + parameters={ + 'status': (str, 'Filter by certificate status ' + '(e.g., "expired", "expiring", "valid", "invalid")'), + 'scope': (str, 'Filter by certificate scope ' + '(e.g., "service", "host", "global")'), + 'service_type': (str, 'Filter by certificate type ' + '(e.g., "rgw*")'), + 'include_cephadm_signed': (bool, 'Include cephadm-signed certificates ' + 'in the list (default: False)') + }, + responses={200: CERTIFICATE_LIST_SCHEMA}) + @raise_if_no_orchestrator([OrchFeature.SERVICE_LIST]) + @ReadPermission + @handle_orchestrator_error('certificate') + def list(self, status: Optional[str] = None, scope: Optional[str] = None, + service_type: Optional[str] = None, + include_cephadm_signed: bool = False) -> List[Dict]: + """ + List all certificates configured in the cluster. + + This endpoint returns a list of all certificates managed by certmgr, + including both user-provided and cephadm-signed certificates. + + :param status: Filter by certificate status. Valid values: 'expired', + 'expiring', 'valid', 'invalid' + :param scope: Filter by certificate scope. Valid values: 'SERVICE', + 'HOST', 'GLOBAL' + :param service_type: Filter by service type. Supports wildcards + (e.g., 'rgw*') + :param include_cephadm_signed: If True, include cephadm-signed certificates. + If False (default), only user-provided certificates are returned. + :return: List of certificate objects with their details + """ + orch = OrchClient.instance() + + status_value = None + scope_value = None + + if status: + try: + status_value = CertificateStatus(status.lower()).value + except ValueError: + valid_vals = ", ".join([s.value for s in CertificateStatus]) + raise DashboardException( + msg=f'Invalid status: {status}. Valid values are: {valid_vals}') + + if scope: + try: + scope_value = CertificateScope(scope.lower()).value + except ValueError: + valid_vals = ", ".join([s.value for s in CertificateScope]) + raise DashboardException( + msg=f'Invalid scope: {scope}. Valid values are: {valid_vals}') + + include_cephadm_signed = str_to_bool(include_cephadm_signed) + + filter_parts = [] + if status_value: + filter_parts.append(f'status={status_value}') + if scope_value: + filter_parts.append(f'scope={scope_value}') + if service_type: + filter_parts.append(f'name=*{service_type.lower()}*') + + filter_by = ','.join(filter_parts) + + cert_ls_data = CertificateService.fetch_all_certificates( + orch, filter_by=filter_by or '', show_details=False, + include_cephadm_signed=include_cephadm_signed + ) + + # Transform certificate data into a list format + # Note: Filtering is already done by cert_ls via filter_by parameter + certificates_list = CertificateService.process_certificates_for_list(cert_ls_data) + + return certificates_list + + @raise_if_no_orchestrator([OrchFeature.SERVICE_LIST, OrchFeature.DAEMON_LIST]) + @ReadPermission + @handle_orchestrator_error('certificate') + def get(self, service_name: str) -> Dict[str, Any]: + """ + Get detailed certificate information for a service. + + :param service_name: The service name, e.g. 'rgw.myzone'. + :return: Detailed certificate information including full certificate details + """ + orch = OrchClient.instance() + + # Get service information + services = orch.services.get(service_name) + if not services: + raise DashboardException( + msg=f'Service {service_name} not found', + http_status_code=404, + component='certificate' + ) + service = services[0] + + service_type = service.spec.service_type + service_name_full = service.spec.service_name() + + cert_config = ServiceSpec.REQUIRES_CERTIFICATES.get(service_type) + if not cert_config: + return CertificateService.empty_response() + + user_cert_name = f"{service_type.replace('-', '_')}_ssl_cert" + cephadm_cert_name = f"cephadm-signed_{service_type}_cert" + cert_scope = CertificateScope(cert_config.get('scope', CertificateScope.SERVICE.value)) + + cert_ls_data = CertificateService.fetch_certificates_for_service( + orch, service_type, user_cert_name, cephadm_cert_name + ) + + daemon_hostnames, _ = CertificateService.get_daemon_hostnames(orch, service_name_full) + + # try user-provided first, then cephadm-signed + cert_details, target_key, cert_name, cert_scope_str = \ + CertificateService.find_certificate_for_service( + cert_ls_data, service_type, service_name_full, cert_scope, daemon_hostnames + ) + + return CertificateService.build_certificate_status_response( + cert_details, cert_name or user_cert_name, cert_scope_str, target_key, + include_target=True, include_details=True + ) + + @EndpointDoc("Get Root CA Certificate", + responses={ + 200: { + 'certificate': (str, 'Root CA certificate in PEM format') + } + }) + @RESTController.Collection('GET', path='/root-ca') + @raise_if_no_orchestrator([OrchFeature.SERVICE_LIST]) + @ReadPermission + @handle_orchestrator_error('certificate') + def root_ca(self) -> Dict[str, str]: + """ + Get the cephadm root CA certificate. + + This endpoint returns the root Certificate Authority (CA) certificate + used by cephadm to sign other certificates in the cluster. + + :return: Dictionary with certificate field containing root CA certificate in PEM format + """ + orch = OrchClient.instance() + + root_ca_cert_name = CEPHADM_ROOT_CA_CERT + + root_ca_cert = orch.cert_store.get_cert(root_ca_cert_name) + + if not root_ca_cert: + raise DashboardException( + msg='Root CA certificate not found', + http_status_code=404, + component='certificate' + ) + + return root_ca_cert diff --git a/src/pybind/mgr/dashboard/controllers/service.py b/src/pybind/mgr/dashboard/controllers/service.py index 4e419b41384..de265d465a0 100644 --- a/src/pybind/mgr/dashboard/controllers/service.py +++ b/src/pybind/mgr/dashboard/controllers/service.py @@ -4,6 +4,7 @@ import cherrypy from ceph.deployment.service_spec import ServiceSpec from ..security import Scope +from ..services.certificate import CertificateService from ..services.exception import handle_custom_error, handle_orchestrator_error from ..services.orchestrator import OrchClient, OrchFeature from . import APIDoc, APIRouter, CreatePermission, DeletePermission, Endpoint, \ @@ -36,7 +37,12 @@ class Service(RESTController): orch = OrchClient.instance() services, count = orch.services.list(service_name=service_name, offset=int(offset), limit=int(limit), search=search, sort=sort) - cherrypy.response.headers['X-Total-Count'] = count + + # Get all certificates and enrich services with certificate status + cert_ls_data = CertificateService.fetch_all_certificates(orch) + CertificateService.enrich_services_with_certificates(orch, services, cert_ls_data) + + cherrypy.response.headers['X-Total-Count'] = str(count) return services @raise_if_no_orchestrator([OrchFeature.SERVICE_LIST]) @@ -45,7 +51,14 @@ class Service(RESTController): services = orch.services.get(service_name) if not services: raise cherrypy.HTTPError(404, 'Service {} not found'.format(service_name)) - return services[0].to_json() + + service = services[0].to_json() + + # Get all certificates and enrich single service + cert_ls_data = CertificateService.fetch_all_certificates(orch) + CertificateService.enrich_services_with_certificates(orch, [service], cert_ls_data) + + return service @RESTController.Resource('GET') @raise_if_no_orchestrator([OrchFeature.DAEMON_LIST]) diff --git a/src/pybind/mgr/dashboard/model/certificate.py b/src/pybind/mgr/dashboard/model/certificate.py new file mode 100644 index 00000000000..9684464193c --- /dev/null +++ b/src/pybind/mgr/dashboard/model/certificate.py @@ -0,0 +1,104 @@ +""" +Model definitions for certificate API responses. + +These classes centralize the enums and response shapes so callers avoid +constructing ad-hoc dictionaries and can rely on consistent typing. +""" +from dataclasses import asdict, dataclass +from enum import Enum +from typing import Any, Dict, Optional + + +class CertificateStatus(str, Enum): + EXPIRED = 'expired' + EXPIRING = 'expiring' + VALID = 'valid' + INVALID = 'invalid' + NOT_CONFIGURED = 'not_configured' + + +class CertificateScope(str, Enum): + SERVICE = 'service' + HOST = 'host' + GLOBAL = 'global' + + +@dataclass +class CertificateListEntry: + STRING_FIELDS = ( + 'cert_name', 'scope', 'signed_by', 'status', + 'expiry_date', 'issuer', 'common_name', 'target' + ) + + cert_name: str + scope: str + signed_by: str + status: CertificateStatus + days_to_expiration: Optional[int] + expiry_date: Optional[str] + issuer: Optional[str] + common_name: Optional[str] + target: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + data = asdict(self) + data['status'] = self.status.value + for key in self.STRING_FIELDS: + if data.get(key) is None: + data[key] = '' + return data + + +@dataclass +class CertificateStatusResponse: + STRING_FIELDS = ( + 'cert_name', 'scope', 'status', 'signed_by', + 'certificate_source', 'expiry_date', 'issuer', + 'common_name', 'target', 'error' + ) + BOOL_FIELDS = ('requires_certificate', 'has_certificate') + + cert_name: Optional[str] + scope: Optional[str] + requires_certificate: bool = True + status: Optional[CertificateStatus] = None + days_to_expiration: Optional[int] = None + signed_by: Optional[str] = None + has_certificate: bool = False + certificate_source: Optional[str] = None + expiry_date: Optional[str] = None + issuer: Optional[str] = None + common_name: Optional[str] = None + target: Optional[str] = None + details: Optional[Dict[str, Any]] = None + error: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + data = asdict(self) + data['status'] = self.status.value if self.status else None + for key in self.STRING_FIELDS: + if data.get(key) is None: + data[key] = '' + for key in self.BOOL_FIELDS: + if data.get(key) is None: + data[key] = False + return data + + +# Constants shared with cephadm certificate handling +CEPHADM_ROOT_CA_CERT = 'cephadm_root_ca_cert' +CEPHADM_SIGNED_CERT = 'cephadm-signed' + + +# Shared schema for certificate list API responses +CERTIFICATE_LIST_SCHEMA = [{ + 'cert_name': (str, 'Certificate name'), + 'scope': (str, 'Certificate scope (SERVICE, HOST, or GLOBAL)'), + 'signed_by': (str, 'Certificate issuer (user or cephadm)'), + 'status': (str, 'Certificate status (valid, expiring, expired, invalid)'), + 'days_to_expiration': (int, 'Days remaining until expiration'), + 'expiry_date': (str, 'Certificate expiration date'), + 'issuer': (str, 'Certificate issuer distinguished name'), + 'common_name': (str, 'Certificate common name (CN)'), + 'target': (str, 'Certificate target (service name or hostname)'), +}] diff --git a/src/pybind/mgr/dashboard/openapi.yaml b/src/pybind/mgr/dashboard/openapi.yaml index e7d9677ee48..e2631047e05 100755 --- a/src/pybind/mgr/dashboard/openapi.yaml +++ b/src/pybind/mgr/dashboard/openapi.yaml @@ -17015,6 +17015,172 @@ paths: - jwt: [] tags: - Service + /api/service/certificate: + get: + description: "\n List all certificates configured in the cluster.\n\n\ + \ This endpoint returns a list of all certificates managed by certmgr,\n\ + \ including both user-provided and cephadm-signed certificates.\n\n\ + \ :param status: Filter by certificate status. Valid values: 'expired',\n\ + \ 'expiring', 'valid', 'invalid'\n :param scope: Filter\ + \ by certificate scope. Valid values: 'SERVICE',\n 'HOST', 'GLOBAL'\n\ + \ :param service_type: Filter by service type. Supports wildcards\n\ + \ (e.g., 'rgw*')\n :param include_cephadm_signed: If True,\ + \ include cephadm-signed certificates.\n If False (default), only\ + \ user-provided certificates are returned.\n :return: List of certificate\ + \ objects with their details\n " + parameters: + - allowEmptyValue: true + description: Filter by certificate status (e.g., "expired", "expiring", "valid", + "invalid") + in: query + name: status + schema: + type: string + - allowEmptyValue: true + description: Filter by certificate scope (e.g., "service", "host", "global") + in: query + name: scope + schema: + type: string + - allowEmptyValue: true + description: Filter by certificate type (e.g., "rgw*") + in: query + name: service_type + schema: + type: string + - default: false + description: 'Include cephadm-signed certificates in the list (default: False)' + in: query + name: include_cephadm_signed + schema: + type: boolean + responses: + '200': + content: + application/vnd.ceph.api.v1.0+json: + schema: + items: + properties: + cert_name: + description: Certificate name + type: string + common_name: + description: Certificate common name (CN) + type: string + days_to_expiration: + description: Days remaining until expiration + type: integer + expiry_date: + description: Certificate expiration date + type: string + issuer: + description: Certificate issuer distinguished name + type: string + scope: + description: Certificate scope (SERVICE, HOST, or GLOBAL) + type: string + signed_by: + description: Certificate issuer (user or cephadm) + type: string + status: + description: Certificate status (valid, expiring, expired, invalid) + type: string + target: + description: Certificate target (service name or hostname) + type: string + type: object + required: + - cert_name + - scope + - signed_by + - status + - days_to_expiration + - expiry_date + - issuer + - common_name + - target + type: array + description: OK + '400': + description: Operation exception. Please check the response body for details. + '401': + description: Unauthenticated access. Please login first. + '403': + description: Unauthorized access. Please check your permissions. + '500': + description: Unexpected error. Please check the response body for the stack + trace. + security: + - jwt: [] + summary: List All Certificates + tags: + - Service + /api/service/certificate/root-ca: + get: + description: "\n Get the cephadm root CA certificate.\n\n This\ + \ endpoint returns the root Certificate Authority (CA) certificate\n \ + \ used by cephadm to sign other certificates in the cluster.\n\n \ + \ :return: Dictionary with certificate field containing root CA certificate\ + \ in PEM format\n " + parameters: [] + responses: + '200': + content: + application/vnd.ceph.api.v1.0+json: + schema: + properties: + certificate: + description: Root CA certificate in PEM format + type: string + required: + - certificate + type: object + description: OK + '400': + description: Operation exception. Please check the response body for details. + '401': + description: Unauthenticated access. Please login first. + '403': + description: Unauthorized access. Please check your permissions. + '500': + description: Unexpected error. Please check the response body for the stack + trace. + security: + - jwt: [] + summary: Get Root CA Certificate + tags: + - Service + /api/service/certificate/{service_name}: + get: + description: "\n Get detailed certificate information for a service.\n\ + \n :param service_name: The service name, e.g. 'rgw.myzone'.\n \ + \ :return: Detailed certificate information including full certificate\ + \ details\n " + parameters: + - in: path + name: service_name + required: true + schema: + type: string + responses: + '200': + content: + application/vnd.ceph.api.v1.0+json: + type: object + description: OK + '400': + description: Operation exception. Please check the response body for details. + '401': + description: Unauthenticated access. Please login first. + '403': + description: Unauthorized access. Please check your permissions. + '500': + description: Unexpected error. Please check the response body for the stack + trace. + security: + - jwt: [] + tags: + - Service /api/service/known_types: get: description: "\n Get a list of known service types, e.g. 'alertmanager',\n\ diff --git a/src/pybind/mgr/dashboard/services/certificate.py b/src/pybind/mgr/dashboard/services/certificate.py new file mode 100644 index 00000000000..a880d3c39ad --- /dev/null +++ b/src/pybind/mgr/dashboard/services/certificate.py @@ -0,0 +1,527 @@ +""" +Certificate service for dashboard. + +This service provides certificate management functionality following the +"Thin Controllers, Fat Services" pattern. All business logic for certificate +handling is contained here. +""" +from collections import defaultdict +from typing import Any, Dict, List, Optional, Tuple + +from .. import mgr +from ..model.certificate import CEPHADM_SIGNED_CERT, CertificateListEntry, \ + CertificateScope, CertificateStatus, CertificateStatusResponse +from .orchestrator import OrchClient + + +def _get_certificate_renewal_threshold_days() -> int: + """ + Get the certificate renewal threshold days from cephadm config. + Falls back to default value of 30 if config cannot be retrieved. + + :return: Number of days before expiration to consider certificate as expiring + """ + threshold = mgr.get_module_option_ex('cephadm', 'certificate_renewal_threshold_days', 30) + return int(threshold) + + +def _determine_certificate_status(remaining_days: int) -> CertificateStatus: + """ + Determine certificate status based on remaining days until expiration. + + :param remaining_days: Number of days remaining until certificate expiration + :return: Status string (CertificateStatus.EXPIRED, EXPIRING, or VALID) + """ + renewal_threshold = _get_certificate_renewal_threshold_days() + if remaining_days < 0: + return CertificateStatus.EXPIRED + if remaining_days < renewal_threshold: + return CertificateStatus.EXPIRING + return CertificateStatus.VALID + + +def _extract_certificate_basic_info(cert_details: Dict[str, Any]) -> Dict[str, Any]: + """ + Extract basic certificate information from certificate details. + + :param cert_details: Dictionary containing certificate details + :return: Dictionary with extracted information (validity, remaining_days, expiry_date, + subject, issuer, common_name, issuer_str) + """ + validity = cert_details.get('validity', {}) + remaining_days = validity.get('remaining_days', 0) + expiry_date = validity.get('not_after') + + subject = cert_details.get('subject', {}) + issuer = cert_details.get('issuer', {}) + common_name = subject.get('commonName') or subject.get('CN') + issuer_str = (issuer.get('commonName') or issuer.get('CN') or str(issuer) + if issuer else None) + + return { + 'validity': validity, + 'remaining_days': remaining_days, + 'expiry_date': expiry_date, + 'subject': subject, + 'issuer': issuer, + 'common_name': common_name, + 'issuer_str': issuer_str + } + + +def _determine_signed_by(cert_name: str) -> str: + """ + Determine if certificate is signed by cephadm or user based on certificate name. + + :param cert_name: Certificate name + :return: 'cephadm' if cephadm-signed, 'user' otherwise + """ + return 'cephadm' if cert_name and CEPHADM_SIGNED_CERT in cert_name else 'user' + + +def _build_certificate_list_entry(cert_name: str, cert_details: Dict[str, Any], + cert_scope: CertificateScope, target: Optional[str] = None + ) -> CertificateListEntry: + """ + Build a certificate list entry from certificate details. + + :param cert_name: Certificate name + :param cert_details: Certificate details dictionary + :param cert_scope: Certificate scope ('GLOBAL', 'SERVICE', or 'HOST') + :param target: Optional target (service name or hostname) + :return: Certificate list entry dictionary + """ + cert_info = _extract_certificate_basic_info(cert_details) + remaining_days = cert_info['remaining_days'] + expiry_date = cert_info['expiry_date'] + common_name = cert_info['common_name'] + issuer_str = cert_info['issuer_str'] + + status = _determine_certificate_status(remaining_days) + signed_by = _determine_signed_by(cert_name) + + return CertificateListEntry( + cert_name=cert_name, + scope=cert_scope.value.upper(), + signed_by=signed_by, + status=status, + days_to_expiration=remaining_days, + expiry_date=expiry_date, + issuer=issuer_str, + common_name=common_name, + target=target + ) + + +def _get_certificate_response_template(cert_name: Optional[str], cert_scope_str: Optional[str], + target_key: Optional[str] = None + ) -> CertificateStatusResponse: + """ + Get a certificate response template with all keys initialized. + + :param cert_name: Certificate name (can be None) + :param cert_scope_str: Certificate scope (can be None) + :param target_key: Optional target key (service name or hostname) + :return: Dictionary template with all certificate response keys + """ + return CertificateStatusResponse( + cert_name=cert_name, + scope=cert_scope_str, + target=target_key + ) + + +def _select_service_certificate(certificates: Any, service_name: str + ) -> Tuple[Optional[Dict[str, Any]], Optional[str]]: + """ + Pick certificate details for a service-scoped certificate. + """ + if not isinstance(certificates, dict): + return (None, None) + + target_key: Optional[str] = None + if service_name and service_name in certificates: + target_key = service_name + + cert_details = certificates.get(target_key) if target_key else None + return (cert_details if isinstance(cert_details, dict) else None, target_key) + + +def _select_host_certificate(certificates: Any, daemon_hostnames: Optional[List[str]] + ) -> Tuple[Optional[Dict[str, Any]], Optional[str]]: + """ + Pick certificate details for a host-scoped certificate. + """ + if not isinstance(certificates, dict) or not certificates: + return (None, None) + + target_key: Optional[str] = None + if daemon_hostnames: + for hostname in daemon_hostnames: + if hostname in certificates: + target_key = hostname + break + if target_key is None: + target_key = next(iter(certificates.keys())) + + cert_details = certificates.get(target_key) + return (cert_details if isinstance(cert_details, dict) else None, target_key) + + +def _select_global_certificate(certificates: Any) -> Optional[Dict[str, Any]]: + """ + Pick certificate details for a global certificate. + """ + if isinstance(certificates, dict) and certificates: + return certificates + return None + + +def _get_certificate_status_for_service(service_type: str, service_name: str, + cert_ls_data: Optional[Dict[str, Any]] = None, + daemon_hostnames: Optional[List[str]] = None + ) -> Dict[str, Any]: + """ + Get certificate status information for a service using REQUIRES_CERTIFICATES mapping. + + :param service_type: The service type (e.g., 'rgw', 'grafana') + :param service_name: The service name (e.g., 'rgw.myzone') + :param cert_ls_data: Optional pre-fetched certificate list data (all certificates) + :param daemon_hostnames: Optional list of hostnames where service daemons run + :return: Dictionary with certificate status information + """ + from ceph.deployment.service_spec import ServiceSpec + + cert_config = ServiceSpec.REQUIRES_CERTIFICATES.get(service_type) + requires_cert = cert_config is not None + + if not requires_cert: + response = _get_certificate_response_template(None, None) + response.requires_certificate = False + return response.to_dict() + + assert cert_config is not None + cert_scope = CertificateScope(cert_config.get('scope', CertificateScope.SERVICE.value).lower()) + + # Find certificate in cert_ls_data - try user-provided first, then cephadm-signed + cert_details, _, cert_name, cert_scope_str = CertificateService.find_certificate_for_service( + cert_ls_data, service_type, service_name, cert_scope, daemon_hostnames + ) + + return CertificateService.build_certificate_status_response( + cert_details, cert_name, cert_scope_str + ) + + +def _find_certificate_in_data( + cert_ls_data: Optional[Dict[str, Any]], + cert_name: str, + cert_scope: CertificateScope, + service_name: str, + daemon_hostnames: Optional[List[str]], +) -> Tuple[Optional[Dict[str, Any]], Optional[str]]: + """ + Helper to locate certificate data inside cert_ls response. + """ + if not cert_ls_data or cert_name not in cert_ls_data: + return (None, None) + + cert_data = cert_ls_data[cert_name] + certificates = cert_data.get('certificates', {}) + if cert_scope == CertificateScope.SERVICE: + return _select_service_certificate(certificates, service_name) + if cert_scope == CertificateScope.HOST: + return _select_host_certificate(certificates, daemon_hostnames) + if cert_scope == CertificateScope.GLOBAL: + return (_select_global_certificate(certificates), None) + return (None, None) + + +class CertificateService: + """ + Certificate service class providing certificate management functionality. + + This class encapsulates all certificate-related operations following the + "Thin Controllers, Fat Services" pattern. + """ + + @staticmethod + def process_certificates_for_list(cert_ls_data: Dict[str, Any] + ) -> List[Dict[str, Any]]: + """ + Process certificate list data and return formatted certificate entries. + + :param cert_ls_data: Certificate list data from cert_ls + :return: List of certificate entry dictionaries + """ + + certificates_list: List[CertificateListEntry] = [] + + for cert_name, cert_data in cert_ls_data.items(): + try: + cert_scope = CertificateScope(str(cert_data.get('scope', 'UNKNOWN')).lower()) + except (ValueError, AttributeError): + cert_scope = CertificateScope.SERVICE + certificates = cert_data.get('certificates', {}) + + if cert_scope == CertificateScope.GLOBAL: + cert_details = certificates if isinstance(certificates, dict) else {} + if not isinstance(cert_details, dict) or 'Error' in cert_details: + continue + certificates_list.append( + _build_certificate_list_entry(cert_name, cert_details, cert_scope) + ) + else: + # For SERVICE and HOST scope, iterate through targets + for target, cert_details in certificates.items(): + if isinstance(cert_details, dict) and 'Error' in cert_details: + continue + if not isinstance(cert_details, dict): + continue + certificates_list.append( + _build_certificate_list_entry(cert_name, cert_details, cert_scope, target) + ) + + return [entry.to_dict() for entry in certificates_list] + + @staticmethod + def find_certificate_for_service(cert_ls_data: Optional[Dict[str, Any]], + service_type: str, service_name: str, + cert_scope: CertificateScope, + daemon_hostnames: Optional[List[str]] = None + ) -> Tuple[Optional[Dict[str, Any]], Optional[str], str, str]: + """ + Find certificate for a service, trying user-provided first, then cephadm-signed. + + :param cert_ls_data: Certificate list data from cert_ls + :param service_type: The service type (e.g., 'rgw', 'grafana') + :param service_name: The service name (e.g., 'rgw.myzone') + :param cert_scope: Certificate scope from config ('SERVICE', 'HOST', or 'GLOBAL') + :param daemon_hostnames: Optional list of hostnames where service daemons run + :return: Tuple of (cert_details, target_key, cert_name, actual_scope) + """ + user_cert_name = f"{service_type.replace('-', '_')}_ssl_cert" + cephadm_cert_name = f"cephadm-signed_{service_type}_cert" + cert_details = None + target_key = None + cert_name = user_cert_name + actual_scope = cert_scope.value.upper() + + # Try user-provided certificate first + if cert_ls_data and user_cert_name in cert_ls_data: + cert_data = cert_ls_data[user_cert_name] + cert_scope_from_data = cert_data.get('scope', cert_scope.value) + try: + cert_scope = CertificateScope(cert_scope_from_data.lower()) + except ValueError: + cert_scope = CertificateScope.SERVICE + actual_scope = cert_scope.value.upper() + cert_details, target_key = _find_certificate_in_data( + cert_ls_data, user_cert_name, cert_scope, service_name, daemon_hostnames) + if cert_details: + cert_name = user_cert_name + + # If user-provided cert not found, try cephadm-signed certificate + if not cert_details and cert_ls_data and cephadm_cert_name in cert_ls_data: + cert_details, target_key = _find_certificate_in_data( + cert_ls_data, cephadm_cert_name, CertificateScope.HOST, + service_name, daemon_hostnames) + if cert_details: + cert_name = cephadm_cert_name + actual_scope = CertificateScope.HOST.value.upper() + + return (cert_details, target_key, cert_name, actual_scope) + + @staticmethod + def fetch_certificates_for_service(orch: OrchClient, service_type: str, + user_cert_name: str, cephadm_cert_name: str + ) -> Dict[str, Any]: + """ + Fetch certificates for a specific service, including missing ones. + + :param orch: Orchestrator client instance + :param service_type: Service type for filter pattern + :param user_cert_name: User-provided certificate name + :param cephadm_cert_name: Cephadm-signed certificate name + :return: Dictionary of certificate data + """ + service_type_for_filter = service_type.replace('-', '_') + filter_pattern = f'name=*{service_type_for_filter}*' + + cert_ls_result = orch.cert_store.cert_ls( + filter_by=filter_pattern, + show_details=True, + include_cephadm_signed=True + ) + cert_ls_data = cert_ls_result or {} + + missing_certs: List[str] = [] + if user_cert_name not in cert_ls_data: + missing_certs.append(user_cert_name) + if cephadm_cert_name not in cert_ls_data: + missing_certs.append(cephadm_cert_name) + + # Fetch any missing certificates individually + for cert_name in missing_certs: + individual_result = orch.cert_store.cert_ls( + filter_by=f'name={cert_name}', + show_details=True, + include_cephadm_signed=True + ) + if individual_result and cert_name in individual_result: + cert_ls_data[cert_name] = individual_result[cert_name] + + return cert_ls_data + + @staticmethod + def get_daemon_hostnames(orch: OrchClient, service_name: str + ) -> Tuple[List[str], Optional[str]]: + """ + Get daemon hostnames for a service. + + :param orch: Orchestrator client instance + :param service_name: Service name + :return: Tuple of (daemon_hostnames list, target_hostname or None) + """ + daemons = orch.services.list_daemons(service_name=service_name) + daemon_hostnames = [d.hostname for d in daemons if d.hostname] + target_hostname = daemon_hostnames[0] if daemon_hostnames else None + return (daemon_hostnames, target_hostname) + + @staticmethod + def build_certificate_status_response(cert_details: Optional[Dict[str, Any]], + cert_name: str, cert_scope_str: str, + target_key: Optional[str] = None, + include_target: bool = False, + include_details: bool = False + ) -> Dict[str, Any]: + """ + Build certificate status response dictionary. + + :param cert_details: Certificate details dict or None + :param cert_name: Certificate name + :param cert_scope_str: Certificate scope + :param target_key: Optional target key (service name or hostname) + :param include_target: Whether to include 'target' field in response + :param include_details: Whether to include detailed 'details' field in response + :return: Dictionary with certificate status information + """ + use_target = target_key if (include_target or (target_key and include_details)) else None + response = _get_certificate_response_template(cert_name, cert_scope_str, use_target) + + if not cert_details: + response.status = CertificateStatus.NOT_CONFIGURED + response.has_certificate = False + return response.to_dict() + + if isinstance(cert_details, dict) and 'Error' in cert_details: + response.status = CertificateStatus.INVALID + response.signed_by = _determine_signed_by(cert_name) + response.has_certificate = True + if include_details: + response.error = cert_details.get('Error') + return response.to_dict() + + cert_info = _extract_certificate_basic_info(cert_details) + remaining_days = cert_info['remaining_days'] + expiry_date = cert_info['expiry_date'] + common_name = cert_info['common_name'] + issuer_str = cert_info['issuer_str'] + + status = _determine_certificate_status(remaining_days) + signed_by = _determine_signed_by(cert_name) + + response.status = status + response.days_to_expiration = remaining_days + response.signed_by = signed_by + response.has_certificate = True + response.certificate_source = 'reference' + response.expiry_date = expiry_date + response.issuer = issuer_str + response.common_name = common_name + + if include_details: + subject = cert_info['subject'] + issuer = cert_info['issuer'] + extensions = cert_details.get('extensions', {}) + san_entries = extensions.get('subjectAltName', {}) + + response.details = { + 'subject': subject, + 'issuer': issuer, + 'san_entries': { + 'dns_names': san_entries.get('DNS', []), + 'ip_addresses': san_entries.get('IP', []) + }, + 'key_type': cert_details.get('public_key', {}).get('key_type'), + 'key_size': cert_details.get('public_key', {}).get('key_size'), + 'validity': { + 'not_before': cert_info['validity'].get('not_before'), + 'not_after': cert_info['validity'].get('not_after'), + 'remaining_days': remaining_days + }, + 'extensions': extensions + } + + return response.to_dict() + + @staticmethod + def enrich_services_with_certificates(orch: Any, services: List[Dict[str, Any]], + cert_ls_data: Dict[str, Any]) -> None: + """ + Enrich a list of services with certificate status information. + + This function modifies the services list in place, adding a 'certificate' + key to each service with its certificate status. + + :param orch: Orchestrator client instance + :param services: List of service dictionaries to enrich (modified in place) + :param cert_ls_data: Certificate list data from cert_ls + """ + daemon_hosts_by_service: Dict[str, List[str]] = defaultdict(list) + for daemon in orch.services.list_daemons(): + service_name = daemon.service_name() + if service_name and daemon.hostname: + daemon_hosts_by_service[service_name].append(daemon.hostname) + + for service in services: + svc_name = service.get('service_name', '') + daemon_hostnames = daemon_hosts_by_service.get(svc_name, []) + + service['certificate'] = _get_certificate_status_for_service( + service.get('service_type', ''), + svc_name, + cert_ls_data, + daemon_hostnames + ) + + @staticmethod + def empty_response() -> Dict[str, Any]: + """ + Build a standard response for services that do not require certificates. + """ + return CertificateStatusResponse( + cert_name=None, + scope=None, + requires_certificate=False + ).to_dict() + + @staticmethod + def fetch_all_certificates(orch: Any, filter_by: str = '', + show_details: bool = True, + include_cephadm_signed: bool = True) -> Dict[str, Any]: + """ + Fetch all certificates from the certificate store. + + :param orch: Orchestrator client instance + :param filter_by: Filter string for certificates (default: '') + :param show_details: Whether to include certificate details (default: True) + :param include_cephadm_signed: Whether to include cephadm-signed certs (default: True) + :return: Dictionary of certificate data + """ + cert_ls_result = orch.cert_store.cert_ls( + filter_by=filter_by, + show_details=show_details, + include_cephadm_signed=include_cephadm_signed + ) + return cert_ls_result or {} diff --git a/src/pybind/mgr/dashboard/services/orchestrator.py b/src/pybind/mgr/dashboard/services/orchestrator.py index 769172b9bfa..6adcfa571bd 100644 --- a/src/pybind/mgr/dashboard/services/orchestrator.py +++ b/src/pybind/mgr/dashboard/services/orchestrator.py @@ -221,6 +221,11 @@ class CertStoreManager(ResourceManager): return self.api.cert_store_get_key(entity, service_name, hostname, no_exception_when_missing=ignore_missing_exception) + @wait_api_result + def cert_ls(self, filter_by: str = '', show_details: bool = False, + include_cephadm_signed: bool = False) -> Dict[str, Any]: + return self.api.cert_store_cert_ls(filter_by, show_details, include_cephadm_signed) + class MonitoringManager(ResourceManager): diff --git a/src/pybind/mgr/dashboard/tests/test_certificate.py b/src/pybind/mgr/dashboard/tests/test_certificate.py new file mode 100644 index 00000000000..2382cefb8fa --- /dev/null +++ b/src/pybind/mgr/dashboard/tests/test_certificate.py @@ -0,0 +1,589 @@ +# -*- coding: utf-8 -*- +import unittest +from datetime import datetime, timedelta +from unittest import mock +from unittest.mock import patch + +from orchestrator import DaemonDescription + +from ..controllers._version import APIVersion +from ..controllers.certificate import Certificate +from ..tests import ControllerTestCase + + +class CertificateTestBase(ControllerTestCase): + """Base class for certificate controller tests with shared helpers.""" + + URL_CERTIFICATE = '/api/service/certificate' + URL_ROOT_CA = '/api/service/certificate/root-ca' + + @classmethod + def setup_server(cls): + # Mock mgr.get_module_option_ex to return default threshold of 30 + # Patch it where it's used in the certificate service + patch('dashboard.services.certificate.mgr.get_module_option_ex', return_value=30).start() + cls.setup_controllers([Certificate]) + + def _setup_fake_client(self, fake_client): + """Helper to set up fake orchestrator client with required mocks.""" + fake_client.available.return_value = True + fake_client.get_missing_features.return_value = [] + return fake_client + + def _create_mock_cert_details(self, remaining_days=100, common_name='test.example.com', + issuer='cephadm', key_type='RSA', key_size=2048): + """Helper to create mock certificate details.""" + not_after = datetime.now() + timedelta(days=remaining_days) + return { + 'validity': { + 'not_before': '2024-01-01T00:00:00Z', + 'not_after': not_after.strftime('%Y-%m-%dT%H:%M:%SZ'), + 'remaining_days': remaining_days + }, + 'subject': { + 'commonName': common_name, + 'CN': common_name + }, + 'issuer': { + 'commonName': issuer, + 'CN': issuer + }, + 'public_key': { + 'key_type': key_type, + 'key_size': key_size + }, + 'extensions': { + 'subjectAltName': { + 'DNS': [common_name, '*.example.com'], + 'IP': ['192.168.1.1'] + } + } + } + + def _create_mock_cert_ls_data(self, cert_name='rgw_ssl_cert', scope='SERVICE', + service_name='rgw.test', cert_details=None): + """Helper to create mock cert_ls data structure.""" + if cert_details is None: + cert_details = self._create_mock_cert_details() + return { + cert_name: { + 'scope': scope, + 'certificates': { + service_name: cert_details + } + } + } + + +class CertificateListControllerTest(CertificateTestBase): + """Tests for the certificate list() endpoint.""" + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_list_certificates_empty(self, instance): + """Test listing certificates when none exist.""" + fake_client = self._setup_fake_client(mock.Mock()) + fake_client.cert_store.cert_ls.return_value = {} + instance.return_value = fake_client + + self._get(self.URL_CERTIFICATE, version=APIVersion(1, 0)) + self.assertStatus(200) + self.assertJsonBody([]) + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_list_certificates_basic(self, instance): + """Test listing certificates with basic data.""" + fake_client = self._setup_fake_client(mock.Mock()) + cert_details = self._create_mock_cert_details(remaining_days=100) + cert_ls_data = self._create_mock_cert_ls_data( + cert_name='rgw_ssl_cert', + scope='SERVICE', + service_name='rgw.test', + cert_details=cert_details + ) + fake_client.cert_store.cert_ls.return_value = cert_ls_data + instance.return_value = fake_client + + self._get(self.URL_CERTIFICATE, version=APIVersion(1, 0)) + self.assertStatus(200) + result = self.json_body() + self.assertEqual(len(result), 1) + self.assertEqual(result[0]['cert_name'], 'rgw_ssl_cert') + self.assertEqual(result[0]['scope'], 'SERVICE') + self.assertEqual(result[0]['status'], 'valid') + self.assertEqual(result[0]['target'], 'rgw.test') + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_list_certificates_with_filters(self, instance): + """Test listing certificates with status and scope filters.""" + fake_client = self._setup_fake_client(mock.Mock()) + cert_details_expiring = self._create_mock_cert_details(remaining_days=10) + + # Mock cert_ls to return filtered data (simulating backend filtering) + # When status=expiring filter is applied, cert_ls returns only expiring certs + cert_ls_data_expiring = { + 'grafana_ssl_cert': { + 'scope': 'SERVICE', + 'certificates': { + 'grafana.test': cert_details_expiring + } + } + } + fake_client.cert_store.cert_ls.return_value = cert_ls_data_expiring + instance.return_value = fake_client + + # Test status filter + self._get('{}?status=expiring'.format(self.URL_CERTIFICATE), + version=APIVersion(1, 0)) + self.assertStatus(200) + result = self.json_body() + self.assertEqual(len(result), 1) + self.assertEqual(result[0]['cert_name'], 'grafana_ssl_cert') + self.assertEqual(result[0]['status'], 'expiring') + # Verify filter_by was passed to cert_ls + call_args = fake_client.cert_store.cert_ls.call_args + self.assertIn('status=expiring', call_args[1]['filter_by']) + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_list_certificates_with_service_name_filter(self, instance): + """Test listing certificates with service name filter.""" + fake_client = self._setup_fake_client(mock.Mock()) + cert_details = self._create_mock_cert_details() + # Mock cert_ls to return filtered data (simulating backend filtering) + # When service_name=rgw filter is applied, cert_ls returns only rgw certs + cert_ls_data = { + 'rgw_ssl_cert': { + 'scope': 'SERVICE', + 'certificates': { + 'rgw.test': cert_details + } + } + } + fake_client.cert_store.cert_ls.return_value = cert_ls_data + instance.return_value = fake_client + + self._get('{}?service_type=rgw'.format(self.URL_CERTIFICATE), + version=APIVersion(1, 0)) + self.assertStatus(200) + result = self.json_body() + self.assertEqual(len(result), 1) + self.assertEqual(result[0]['cert_name'], 'rgw_ssl_cert') + # Verify filter_by was passed to cert_ls + call_args = fake_client.cert_store.cert_ls.call_args + self.assertIn('name=*rgw*', call_args[1]['filter_by']) + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_list_certificates_include_cephadm_signed(self, instance): + """Test listing certificates with include_cephadm_signed parameter.""" + fake_client = self._setup_fake_client(mock.Mock()) + cert_ls_data = { + 'rgw_ssl_cert': { + 'scope': 'SERVICE', + 'certificates': { + 'rgw.test': self._create_mock_cert_details() + } + } + } + fake_client.cert_store.cert_ls.return_value = cert_ls_data + instance.return_value = fake_client + + # Test with include_cephadm_signed=True + self._get('{}?include_cephadm_signed=true'.format(self.URL_CERTIFICATE), + version=APIVersion(1, 0)) + self.assertStatus(200) + fake_client.cert_store.cert_ls.assert_called_with( + filter_by='', + show_details=False, + include_cephadm_signed=True + ) + + # Test with include_cephadm_signed=False + self._get('{}?include_cephadm_signed=false'.format(self.URL_CERTIFICATE), + version=APIVersion(1, 0)) + self.assertStatus(200) + fake_client.cert_store.cert_ls.assert_called_with( + filter_by='', + show_details=False, + include_cephadm_signed=False + ) + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_list_certificates_global_scope(self, instance): + """Test listing certificates with GLOBAL scope.""" + fake_client = self._setup_fake_client(mock.Mock()) + cert_details = self._create_mock_cert_details() + cert_ls_data = { + 'cephadm_root_ca_cert': { + 'scope': 'GLOBAL', + 'certificates': cert_details + } + } + fake_client.cert_store.cert_ls.return_value = cert_ls_data + instance.return_value = fake_client + + self._get(self.URL_CERTIFICATE, version=APIVersion(1, 0)) + self.assertStatus(200) + result = self.json_body() + self.assertEqual(len(result), 1) + self.assertEqual(result[0]['scope'], 'GLOBAL') + self.assertEqual(result[0].get('target'), '') + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_list_certificates_error_handling(self, instance): + """Test error handling when cert_ls fails.""" + fake_client = self._setup_fake_client(mock.Mock()) + fake_client.cert_store.cert_ls.side_effect = RuntimeError('Certificate store error') + instance.return_value = fake_client + + self._get(self.URL_CERTIFICATE, version=APIVersion(1, 0)) + self.assertStatus(500) + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_list_certificates_multiple_scopes(self, instance): + """Test listing certificates with multiple scopes.""" + fake_client = self._setup_fake_client(mock.Mock()) + cert_ls_data = { + 'rgw_ssl_cert': { + 'scope': 'SERVICE', + 'certificates': { + 'rgw.test': self._create_mock_cert_details() + } + }, + 'grafana_ssl_cert': { + 'scope': 'HOST', + 'certificates': { + 'node0': self._create_mock_cert_details() + } + }, + 'cephadm_root_ca_cert': { + 'scope': 'GLOBAL', + 'certificates': self._create_mock_cert_details() + } + } + fake_client.cert_store.cert_ls.return_value = cert_ls_data + instance.return_value = fake_client + + self._get(self.URL_CERTIFICATE, version=APIVersion(1, 0)) + self.assertStatus(200) + result = self.json_body() + self.assertEqual(len(result), 3) + scopes = [cert['scope'] for cert in result] + self.assertIn('SERVICE', scopes) + self.assertIn('HOST', scopes) + self.assertIn('GLOBAL', scopes) + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_list_certificates_combined_filters(self, instance): + """Test listing certificates with combined filters.""" + fake_client = self._setup_fake_client(mock.Mock()) + cert_details = self._create_mock_cert_details(remaining_days=10) + cert_ls_data = { + 'rgw_ssl_cert': { + 'scope': 'SERVICE', + 'certificates': { + 'rgw.test': cert_details + } + } + } + fake_client.cert_store.cert_ls.return_value = cert_ls_data + instance.return_value = fake_client + + self._get('{}?status=expiring&scope=SERVICE&service_type=rgw'.format(self.URL_CERTIFICATE), + version=APIVersion(1, 0)) + self.assertStatus(200) + fake_client.cert_store.cert_ls.assert_called() + # Verify filter_by was constructed correctly + call_args = fake_client.cert_store.cert_ls.call_args + filter_by = call_args[1]['filter_by'] + self.assertIn('status=expiring', filter_by) + self.assertIn('scope=service', filter_by) + self.assertIn('name=*rgw*', filter_by) + + +class CertificateGetControllerTest(CertificateTestBase): + """Tests for the certificate get() endpoint.""" + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_get_certificate_service_not_found(self, instance): + """Test getting certificate for non-existent service.""" + fake_client = self._setup_fake_client(mock.Mock()) + fake_client.services.get.return_value = [None] + instance.return_value = fake_client + + self._get('{}/rgw.nonexistent'.format(self.URL_CERTIFICATE)) + self.assertStatus(500) + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_get_certificate_service_no_cert_required(self, instance): + """Test getting certificate for service that doesn't require certificates.""" + fake_client = self._setup_fake_client(mock.Mock()) + # Create a mock service that doesn't require certificates (e.g., mon) + service = mock.Mock() + service.spec = mock.Mock() + service.spec.service_type = 'mon' + service.spec.service_name = mock.Mock(return_value='mon.test') + fake_client.services.get.return_value = [service] + instance.return_value = fake_client + + self._get('{}/mon.test'.format(self.URL_CERTIFICATE)) + self.assertStatus(200) + result = self.json_body() + self.assertFalse(result['requires_certificate']) + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_get_certificate_not_configured(self, instance): + """Test getting certificate when certificate is not configured.""" + fake_client = self._setup_fake_client(mock.Mock()) + service = mock.Mock() + service.spec = mock.Mock() + service.spec.service_type = 'rgw' + service.spec.service_name = mock.Mock(return_value='rgw.test') + fake_client.services.get.return_value = [service] + fake_client.cert_store.cert_ls.return_value = {} + fake_client.services.list_daemons.return_value = [ + DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0') + ] + instance.return_value = fake_client + + self._get('{}/rgw.test'.format(self.URL_CERTIFICATE)) + self.assertStatus(200) + result = self.json_body() + self.assertEqual(result['status'], 'not_configured') + self.assertFalse(result['has_certificate']) + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_get_certificate_user_provided(self, instance): + """Test getting certificate with user-provided certificate.""" + fake_client = self._setup_fake_client(mock.Mock()) + service = mock.Mock() + service.spec = mock.Mock() + service.spec.service_type = 'rgw' + service.spec.service_name = mock.Mock(return_value='rgw.test') + fake_client.services.get.return_value = [service] + + cert_details = self._create_mock_cert_details(remaining_days=100, common_name='rgw.test') + cert_ls_data = self._create_mock_cert_ls_data( + cert_name='rgw_ssl_cert', + scope='SERVICE', + service_name='rgw.test', + cert_details=cert_details + ) + fake_client.cert_store.cert_ls.return_value = cert_ls_data + fake_client.services.list_daemons.return_value = [ + DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0') + ] + instance.return_value = fake_client + + self._get('{}/rgw.test'.format(self.URL_CERTIFICATE)) + self.assertStatus(200) + result = self.json_body() + self.assertEqual(result['cert_name'], 'rgw_ssl_cert') + self.assertEqual(result['status'], 'valid') + self.assertEqual(result['signed_by'], 'user') + self.assertTrue(result['has_certificate']) + self.assertEqual(result['common_name'], 'rgw.test') + self.assertIn('details', result) + self.assertIn('san_entries', result['details']) + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_get_certificate_cephadm_signed(self, instance): + """Test getting certificate with cephadm-signed certificate.""" + fake_client = self._setup_fake_client(mock.Mock()) + service = mock.Mock() + service.spec = mock.Mock() + service.spec.service_type = 'rgw' + service.spec.service_name = mock.Mock(return_value='rgw.test') + fake_client.services.get.return_value = [service] + + cert_details = self._create_mock_cert_details(remaining_days=100) + cert_ls_data = { + 'cephadm-signed_rgw_cert': { + 'scope': 'HOST', + 'certificates': { + 'node0': cert_details + } + } + } + fake_client.cert_store.cert_ls.return_value = cert_ls_data + fake_client.services.list_daemons.return_value = [ + DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0') + ] + instance.return_value = fake_client + + self._get('{}/rgw.test'.format(self.URL_CERTIFICATE)) + self.assertStatus(200) + result = self.json_body() + self.assertEqual(result['cert_name'], 'cephadm-signed_rgw_cert') + self.assertEqual(result['signed_by'], 'cephadm') + self.assertEqual(result['scope'], 'HOST') + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_get_certificate_expiring(self, instance): + """Test getting certificate that is expiring.""" + fake_client = self._setup_fake_client(mock.Mock()) + service = mock.Mock() + service.spec = mock.Mock() + service.spec.service_type = 'rgw' + service.spec.service_name = mock.Mock(return_value='rgw.test') + fake_client.services.get.return_value = [service] + + cert_details = self._create_mock_cert_details(remaining_days=10) + cert_ls_data = self._create_mock_cert_ls_data( + cert_name='rgw_ssl_cert', + scope='SERVICE', + service_name='rgw.test', + cert_details=cert_details + ) + fake_client.cert_store.cert_ls.return_value = cert_ls_data + fake_client.services.list_daemons.return_value = [ + DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0') + ] + instance.return_value = fake_client + + self._get('{}/rgw.test'.format(self.URL_CERTIFICATE)) + self.assertStatus(200) + result = self.json_body() + self.assertEqual(result['status'], 'expiring') + self.assertEqual(result['days_to_expiration'], 10) + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_get_certificate_expired(self, instance): + """Test getting certificate that is expired.""" + fake_client = self._setup_fake_client(mock.Mock()) + service = mock.Mock() + service.spec = mock.Mock() + service.spec.service_type = 'rgw' + service.spec.service_name = mock.Mock(return_value='rgw.test') + fake_client.services.get.return_value = [service] + + cert_details = self._create_mock_cert_details(remaining_days=-10) + cert_ls_data = self._create_mock_cert_ls_data( + cert_name='rgw_ssl_cert', + scope='SERVICE', + service_name='rgw.test', + cert_details=cert_details + ) + fake_client.cert_store.cert_ls.return_value = cert_ls_data + fake_client.services.list_daemons.return_value = [ + DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0') + ] + instance.return_value = fake_client + + self._get('{}/rgw.test'.format(self.URL_CERTIFICATE)) + self.assertStatus(200) + result = self.json_body() + self.assertEqual(result['status'], 'expired') + self.assertEqual(result['days_to_expiration'], -10) + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_get_certificate_invalid(self, instance): + """Test getting certificate that is invalid.""" + fake_client = self._setup_fake_client(mock.Mock()) + service = mock.Mock() + service.spec = mock.Mock() + service.spec.service_type = 'rgw' + service.spec.service_name = mock.Mock(return_value='rgw.test') + fake_client.services.get.return_value = [service] + + cert_ls_data = { + 'rgw_ssl_cert': { + 'scope': 'SERVICE', + 'certificates': { + 'rgw.test': {'Error': 'Invalid certificate format'} + } + } + } + fake_client.cert_store.cert_ls.return_value = cert_ls_data + fake_client.services.list_daemons.return_value = [ + DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0') + ] + instance.return_value = fake_client + + self._get('{}/rgw.test'.format(self.URL_CERTIFICATE)) + self.assertStatus(200) + result = self.json_body() + self.assertEqual(result['status'], 'invalid') + self.assertTrue(result['has_certificate']) + self.assertIn('error', result) + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_get_certificate_host_scope(self, instance): + """Test getting certificate with HOST scope.""" + fake_client = self._setup_fake_client(mock.Mock()) + service = mock.Mock() + service.spec = mock.Mock() + service.spec.service_type = 'grafana' + service.spec.service_name = mock.Mock(return_value='grafana.test') + fake_client.services.get.return_value = [service] + + cert_details = self._create_mock_cert_details() + cert_ls_data = { + 'grafana_ssl_cert': { + 'scope': 'HOST', + 'certificates': { + 'node0': cert_details + } + } + } + fake_client.cert_store.cert_ls.return_value = cert_ls_data + fake_client.services.list_daemons.return_value = [ + DaemonDescription(daemon_type='grafana', daemon_id='test', hostname='node0') + ] + instance.return_value = fake_client + + self._get('{}/grafana.test'.format(self.URL_CERTIFICATE)) + self.assertStatus(200) + result = self.json_body() + self.assertEqual(result['scope'], 'HOST') + self.assertEqual(result['target'], 'node0') + + +class CertificateRootCAControllerTest(CertificateTestBase): + """Tests for the certificate root_ca() endpoint.""" + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_root_ca_success(self, instance): + """Test getting root CA certificate successfully.""" + fake_client = self._setup_fake_client(mock.Mock()) + root_ca_cert = '-----BEGIN CERTIFICATE-----\nMOCK_ROOT_CA_CERT\n-----END CERTIFICATE-----' + fake_client.cert_store.get_cert.return_value = root_ca_cert + instance.return_value = fake_client + + self._get(self.URL_ROOT_CA) + self.assertStatus(200) + result = self.json_body() + self.assertEqual(result, root_ca_cert) + fake_client.cert_store.get_cert.assert_called_with('cephadm_root_ca_cert') + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_root_ca_not_found(self, instance): + """Test getting root CA certificate when it doesn't exist.""" + fake_client = self._setup_fake_client(mock.Mock()) + fake_client.cert_store.get_cert.return_value = None + instance.return_value = fake_client + + self._get(self.URL_ROOT_CA) + self.assertStatus(404) + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_root_ca_error_handling(self, instance): + """Test error handling when getting root CA certificate fails.""" + fake_client = self._setup_fake_client(mock.Mock()) + fake_client.cert_store.get_cert.side_effect = Exception('Certificate store error') + instance.return_value = fake_client + + self._get(self.URL_ROOT_CA) + self.assertStatus(500) + + @mock.patch('dashboard.controllers.certificate.OrchClient.instance') + def test_root_ca_method_not_available(self, instance): + """Test getting root CA when get_cert method is not available.""" + fake_client = self._setup_fake_client(mock.Mock()) + del fake_client.cert_store.get_cert + instance.return_value = fake_client + + self._get(self.URL_ROOT_CA) + self.assertStatus(500) + + +if __name__ == '__main__': + unittest.main() -- 2.47.3