--- /dev/null
+from typing import Any, Dict, List, Optional
+
+from ceph.deployment.service_spec import ServiceSpec
+
+from ..exceptions import DashboardException
+from ..model.certificate import CEPHADM_ROOT_CA_CERT, CERTIFICATE_LIST_SCHEMA, \
+ CertificateScope, CertificateStatus
+from ..security import Scope
+from ..services.certificate import CertificateService
+from ..services.exception import handle_orchestrator_error
+from ..services.orchestrator import OrchClient, OrchFeature
+from ..tools import str_to_bool
+from . import APIDoc, APIRouter, EndpointDoc, ReadPermission, RESTController
+from .orchestrator import raise_if_no_orchestrator
+
+
+@APIRouter('/service/certificate', Scope.HOSTS)
+@APIDoc("Service Management API", "Service")
+class Certificate(RESTController):
+
+ @EndpointDoc("List All Certificates",
+ parameters={
+ 'status': (str, 'Filter by certificate status '
+ '(e.g., "expired", "expiring", "valid", "invalid")'),
+ 'scope': (str, 'Filter by certificate scope '
+ '(e.g., "service", "host", "global")'),
+ 'service_type': (str, 'Filter by certificate type '
+ '(e.g., "rgw*")'),
+ 'include_cephadm_signed': (bool, 'Include cephadm-signed certificates '
+ 'in the list (default: False)')
+ },
+ responses={200: CERTIFICATE_LIST_SCHEMA})
+ @raise_if_no_orchestrator([OrchFeature.SERVICE_LIST])
+ @ReadPermission
+ @handle_orchestrator_error('certificate')
+ def list(self, status: Optional[str] = None, scope: Optional[str] = None,
+ service_type: Optional[str] = None,
+ include_cephadm_signed: bool = False) -> List[Dict]:
+ """
+ List all certificates configured in the cluster.
+
+ This endpoint returns a list of all certificates managed by certmgr,
+ including both user-provided and cephadm-signed certificates.
+
+ :param status: Filter by certificate status. Valid values: 'expired',
+ 'expiring', 'valid', 'invalid'
+ :param scope: Filter by certificate scope. Valid values: 'SERVICE',
+ 'HOST', 'GLOBAL'
+ :param service_type: Filter by service type. Supports wildcards
+ (e.g., 'rgw*')
+ :param include_cephadm_signed: If True, include cephadm-signed certificates.
+ If False (default), only user-provided certificates are returned.
+ :return: List of certificate objects with their details
+ """
+ orch = OrchClient.instance()
+
+ status_value = None
+ scope_value = None
+
+ if status:
+ try:
+ status_value = CertificateStatus(status.lower()).value
+ except ValueError:
+ valid_vals = ", ".join([s.value for s in CertificateStatus])
+ raise DashboardException(
+ msg=f'Invalid status: {status}. Valid values are: {valid_vals}')
+
+ if scope:
+ try:
+ scope_value = CertificateScope(scope.lower()).value
+ except ValueError:
+ valid_vals = ", ".join([s.value for s in CertificateScope])
+ raise DashboardException(
+ msg=f'Invalid scope: {scope}. Valid values are: {valid_vals}')
+
+ include_cephadm_signed = str_to_bool(include_cephadm_signed)
+
+ filter_parts = []
+ if status_value:
+ filter_parts.append(f'status={status_value}')
+ if scope_value:
+ filter_parts.append(f'scope={scope_value}')
+ if service_type:
+ filter_parts.append(f'name=*{service_type.lower()}*')
+
+ filter_by = ','.join(filter_parts)
+
+ cert_ls_data = CertificateService.fetch_all_certificates(
+ orch, filter_by=filter_by or '', show_details=False,
+ include_cephadm_signed=include_cephadm_signed
+ )
+
+ # Transform certificate data into a list format
+ # Note: Filtering is already done by cert_ls via filter_by parameter
+ certificates_list = CertificateService.process_certificates_for_list(cert_ls_data)
+
+ return certificates_list
+
+ @raise_if_no_orchestrator([OrchFeature.SERVICE_LIST, OrchFeature.DAEMON_LIST])
+ @ReadPermission
+ @handle_orchestrator_error('certificate')
+ def get(self, service_name: str) -> Dict[str, Any]:
+ """
+ Get detailed certificate information for a service.
+
+ :param service_name: The service name, e.g. 'rgw.myzone'.
+ :return: Detailed certificate information including full certificate details
+ """
+ orch = OrchClient.instance()
+
+ # Get service information
+ services = orch.services.get(service_name)
+ if not services:
+ raise DashboardException(
+ msg=f'Service {service_name} not found',
+ http_status_code=404,
+ component='certificate'
+ )
+ service = services[0]
+
+ service_type = service.spec.service_type
+ service_name_full = service.spec.service_name()
+
+ cert_config = ServiceSpec.REQUIRES_CERTIFICATES.get(service_type)
+ if not cert_config:
+ return CertificateService.empty_response()
+
+ user_cert_name = f"{service_type.replace('-', '_')}_ssl_cert"
+ cephadm_cert_name = f"cephadm-signed_{service_type}_cert"
+ cert_scope = CertificateScope(cert_config.get('scope', CertificateScope.SERVICE.value))
+
+ cert_ls_data = CertificateService.fetch_certificates_for_service(
+ orch, service_type, user_cert_name, cephadm_cert_name
+ )
+
+ daemon_hostnames, _ = CertificateService.get_daemon_hostnames(orch, service_name_full)
+
+ # try user-provided first, then cephadm-signed
+ cert_details, target_key, cert_name, cert_scope_str = \
+ CertificateService.find_certificate_for_service(
+ cert_ls_data, service_type, service_name_full, cert_scope, daemon_hostnames
+ )
+
+ return CertificateService.build_certificate_status_response(
+ cert_details, cert_name or user_cert_name, cert_scope_str, target_key,
+ include_target=True, include_details=True
+ )
+
+ @EndpointDoc("Get Root CA Certificate",
+ responses={
+ 200: {
+ 'certificate': (str, 'Root CA certificate in PEM format')
+ }
+ })
+ @RESTController.Collection('GET', path='/root-ca')
+ @raise_if_no_orchestrator([OrchFeature.SERVICE_LIST])
+ @ReadPermission
+ @handle_orchestrator_error('certificate')
+ def root_ca(self) -> Dict[str, str]:
+ """
+ Get the cephadm root CA certificate.
+
+ This endpoint returns the root Certificate Authority (CA) certificate
+ used by cephadm to sign other certificates in the cluster.
+
+ :return: Dictionary with certificate field containing root CA certificate in PEM format
+ """
+ orch = OrchClient.instance()
+
+ root_ca_cert_name = CEPHADM_ROOT_CA_CERT
+
+ root_ca_cert = orch.cert_store.get_cert(root_ca_cert_name)
+
+ if not root_ca_cert:
+ raise DashboardException(
+ msg='Root CA certificate not found',
+ http_status_code=404,
+ component='certificate'
+ )
+
+ return root_ca_cert
from ceph.deployment.service_spec import ServiceSpec
from ..security import Scope
+from ..services.certificate import CertificateService
from ..services.exception import handle_custom_error, handle_orchestrator_error
from ..services.orchestrator import OrchClient, OrchFeature
from . import APIDoc, APIRouter, CreatePermission, DeletePermission, Endpoint, \
orch = OrchClient.instance()
services, count = orch.services.list(service_name=service_name, offset=int(offset),
limit=int(limit), search=search, sort=sort)
- cherrypy.response.headers['X-Total-Count'] = count
+
+ # Get all certificates and enrich services with certificate status
+ cert_ls_data = CertificateService.fetch_all_certificates(orch)
+ CertificateService.enrich_services_with_certificates(orch, services, cert_ls_data)
+
+ cherrypy.response.headers['X-Total-Count'] = str(count)
return services
@raise_if_no_orchestrator([OrchFeature.SERVICE_LIST])
services = orch.services.get(service_name)
if not services:
raise cherrypy.HTTPError(404, 'Service {} not found'.format(service_name))
- return services[0].to_json()
+
+ service = services[0].to_json()
+
+ # Get all certificates and enrich single service
+ cert_ls_data = CertificateService.fetch_all_certificates(orch)
+ CertificateService.enrich_services_with_certificates(orch, [service], cert_ls_data)
+
+ return service
@RESTController.Resource('GET')
@raise_if_no_orchestrator([OrchFeature.DAEMON_LIST])
--- /dev/null
+"""
+Model definitions for certificate API responses.
+
+These classes centralize the enums and response shapes so callers avoid
+constructing ad-hoc dictionaries and can rely on consistent typing.
+"""
+from dataclasses import asdict, dataclass
+from enum import Enum
+from typing import Any, Dict, Optional
+
+
+class CertificateStatus(str, Enum):
+ EXPIRED = 'expired'
+ EXPIRING = 'expiring'
+ VALID = 'valid'
+ INVALID = 'invalid'
+ NOT_CONFIGURED = 'not_configured'
+
+
+class CertificateScope(str, Enum):
+ SERVICE = 'service'
+ HOST = 'host'
+ GLOBAL = 'global'
+
+
+@dataclass
+class CertificateListEntry:
+ STRING_FIELDS = (
+ 'cert_name', 'scope', 'signed_by', 'status',
+ 'expiry_date', 'issuer', 'common_name', 'target'
+ )
+
+ cert_name: str
+ scope: str
+ signed_by: str
+ status: CertificateStatus
+ days_to_expiration: Optional[int]
+ expiry_date: Optional[str]
+ issuer: Optional[str]
+ common_name: Optional[str]
+ target: Optional[str] = None
+
+ def to_dict(self) -> Dict[str, Any]:
+ data = asdict(self)
+ data['status'] = self.status.value
+ for key in self.STRING_FIELDS:
+ if data.get(key) is None:
+ data[key] = ''
+ return data
+
+
+@dataclass
+class CertificateStatusResponse:
+ STRING_FIELDS = (
+ 'cert_name', 'scope', 'status', 'signed_by',
+ 'certificate_source', 'expiry_date', 'issuer',
+ 'common_name', 'target', 'error'
+ )
+ BOOL_FIELDS = ('requires_certificate', 'has_certificate')
+
+ cert_name: Optional[str]
+ scope: Optional[str]
+ requires_certificate: bool = True
+ status: Optional[CertificateStatus] = None
+ days_to_expiration: Optional[int] = None
+ signed_by: Optional[str] = None
+ has_certificate: bool = False
+ certificate_source: Optional[str] = None
+ expiry_date: Optional[str] = None
+ issuer: Optional[str] = None
+ common_name: Optional[str] = None
+ target: Optional[str] = None
+ details: Optional[Dict[str, Any]] = None
+ error: Optional[str] = None
+
+ def to_dict(self) -> Dict[str, Any]:
+ data = asdict(self)
+ data['status'] = self.status.value if self.status else None
+ for key in self.STRING_FIELDS:
+ if data.get(key) is None:
+ data[key] = ''
+ for key in self.BOOL_FIELDS:
+ if data.get(key) is None:
+ data[key] = False
+ return data
+
+
+# Constants shared with cephadm certificate handling
+CEPHADM_ROOT_CA_CERT = 'cephadm_root_ca_cert'
+CEPHADM_SIGNED_CERT = 'cephadm-signed'
+
+
+# Shared schema for certificate list API responses
+CERTIFICATE_LIST_SCHEMA = [{
+ 'cert_name': (str, 'Certificate name'),
+ 'scope': (str, 'Certificate scope (SERVICE, HOST, or GLOBAL)'),
+ 'signed_by': (str, 'Certificate issuer (user or cephadm)'),
+ 'status': (str, 'Certificate status (valid, expiring, expired, invalid)'),
+ 'days_to_expiration': (int, 'Days remaining until expiration'),
+ 'expiry_date': (str, 'Certificate expiration date'),
+ 'issuer': (str, 'Certificate issuer distinguished name'),
+ 'common_name': (str, 'Certificate common name (CN)'),
+ 'target': (str, 'Certificate target (service name or hostname)'),
+}]
- jwt: []
tags:
- Service
+ /api/service/certificate:
+ get:
+ description: "\n List all certificates configured in the cluster.\n\n\
+ \ This endpoint returns a list of all certificates managed by certmgr,\n\
+ \ including both user-provided and cephadm-signed certificates.\n\n\
+ \ :param status: Filter by certificate status. Valid values: 'expired',\n\
+ \ 'expiring', 'valid', 'invalid'\n :param scope: Filter\
+ \ by certificate scope. Valid values: 'SERVICE',\n 'HOST', 'GLOBAL'\n\
+ \ :param service_type: Filter by service type. Supports wildcards\n\
+ \ (e.g., 'rgw*')\n :param include_cephadm_signed: If True,\
+ \ include cephadm-signed certificates.\n If False (default), only\
+ \ user-provided certificates are returned.\n :return: List of certificate\
+ \ objects with their details\n "
+ parameters:
+ - allowEmptyValue: true
+ description: Filter by certificate status (e.g., "expired", "expiring", "valid",
+ "invalid")
+ in: query
+ name: status
+ schema:
+ type: string
+ - allowEmptyValue: true
+ description: Filter by certificate scope (e.g., "service", "host", "global")
+ in: query
+ name: scope
+ schema:
+ type: string
+ - allowEmptyValue: true
+ description: Filter by certificate type (e.g., "rgw*")
+ in: query
+ name: service_type
+ schema:
+ type: string
+ - default: false
+ description: 'Include cephadm-signed certificates in the list (default: False)'
+ in: query
+ name: include_cephadm_signed
+ schema:
+ type: boolean
+ responses:
+ '200':
+ content:
+ application/vnd.ceph.api.v1.0+json:
+ schema:
+ items:
+ properties:
+ cert_name:
+ description: Certificate name
+ type: string
+ common_name:
+ description: Certificate common name (CN)
+ type: string
+ days_to_expiration:
+ description: Days remaining until expiration
+ type: integer
+ expiry_date:
+ description: Certificate expiration date
+ type: string
+ issuer:
+ description: Certificate issuer distinguished name
+ type: string
+ scope:
+ description: Certificate scope (SERVICE, HOST, or GLOBAL)
+ type: string
+ signed_by:
+ description: Certificate issuer (user or cephadm)
+ type: string
+ status:
+ description: Certificate status (valid, expiring, expired, invalid)
+ type: string
+ target:
+ description: Certificate target (service name or hostname)
+ type: string
+ type: object
+ required:
+ - cert_name
+ - scope
+ - signed_by
+ - status
+ - days_to_expiration
+ - expiry_date
+ - issuer
+ - common_name
+ - target
+ type: array
+ description: OK
+ '400':
+ description: Operation exception. Please check the response body for details.
+ '401':
+ description: Unauthenticated access. Please login first.
+ '403':
+ description: Unauthorized access. Please check your permissions.
+ '500':
+ description: Unexpected error. Please check the response body for the stack
+ trace.
+ security:
+ - jwt: []
+ summary: List All Certificates
+ tags:
+ - Service
+ /api/service/certificate/root-ca:
+ get:
+ description: "\n Get the cephadm root CA certificate.\n\n This\
+ \ endpoint returns the root Certificate Authority (CA) certificate\n \
+ \ used by cephadm to sign other certificates in the cluster.\n\n \
+ \ :return: Dictionary with certificate field containing root CA certificate\
+ \ in PEM format\n "
+ parameters: []
+ responses:
+ '200':
+ content:
+ application/vnd.ceph.api.v1.0+json:
+ schema:
+ properties:
+ certificate:
+ description: Root CA certificate in PEM format
+ type: string
+ required:
+ - certificate
+ type: object
+ description: OK
+ '400':
+ description: Operation exception. Please check the response body for details.
+ '401':
+ description: Unauthenticated access. Please login first.
+ '403':
+ description: Unauthorized access. Please check your permissions.
+ '500':
+ description: Unexpected error. Please check the response body for the stack
+ trace.
+ security:
+ - jwt: []
+ summary: Get Root CA Certificate
+ tags:
+ - Service
+ /api/service/certificate/{service_name}:
+ get:
+ description: "\n Get detailed certificate information for a service.\n\
+ \n :param service_name: The service name, e.g. 'rgw.myzone'.\n \
+ \ :return: Detailed certificate information including full certificate\
+ \ details\n "
+ parameters:
+ - in: path
+ name: service_name
+ required: true
+ schema:
+ type: string
+ responses:
+ '200':
+ content:
+ application/vnd.ceph.api.v1.0+json:
+ type: object
+ description: OK
+ '400':
+ description: Operation exception. Please check the response body for details.
+ '401':
+ description: Unauthenticated access. Please login first.
+ '403':
+ description: Unauthorized access. Please check your permissions.
+ '500':
+ description: Unexpected error. Please check the response body for the stack
+ trace.
+ security:
+ - jwt: []
+ tags:
+ - Service
/api/service/known_types:
get:
description: "\n Get a list of known service types, e.g. 'alertmanager',\n\
--- /dev/null
+"""
+Certificate service for dashboard.
+
+This service provides certificate management functionality following the
+"Thin Controllers, Fat Services" pattern. All business logic for certificate
+handling is contained here.
+"""
+from collections import defaultdict
+from typing import Any, Dict, List, Optional, Tuple
+
+from .. import mgr
+from ..model.certificate import CEPHADM_SIGNED_CERT, CertificateListEntry, \
+ CertificateScope, CertificateStatus, CertificateStatusResponse
+from .orchestrator import OrchClient
+
+
+def _get_certificate_renewal_threshold_days() -> int:
+ """
+ Get the certificate renewal threshold days from cephadm config.
+ Falls back to default value of 30 if config cannot be retrieved.
+
+ :return: Number of days before expiration to consider certificate as expiring
+ """
+ threshold = mgr.get_module_option_ex('cephadm', 'certificate_renewal_threshold_days', 30)
+ return int(threshold)
+
+
+def _determine_certificate_status(remaining_days: int) -> CertificateStatus:
+ """
+ Determine certificate status based on remaining days until expiration.
+
+ :param remaining_days: Number of days remaining until certificate expiration
+ :return: Status string (CertificateStatus.EXPIRED, EXPIRING, or VALID)
+ """
+ renewal_threshold = _get_certificate_renewal_threshold_days()
+ if remaining_days < 0:
+ return CertificateStatus.EXPIRED
+ if remaining_days < renewal_threshold:
+ return CertificateStatus.EXPIRING
+ return CertificateStatus.VALID
+
+
+def _extract_certificate_basic_info(cert_details: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ Extract basic certificate information from certificate details.
+
+ :param cert_details: Dictionary containing certificate details
+ :return: Dictionary with extracted information (validity, remaining_days, expiry_date,
+ subject, issuer, common_name, issuer_str)
+ """
+ validity = cert_details.get('validity', {})
+ remaining_days = validity.get('remaining_days', 0)
+ expiry_date = validity.get('not_after')
+
+ subject = cert_details.get('subject', {})
+ issuer = cert_details.get('issuer', {})
+ common_name = subject.get('commonName') or subject.get('CN')
+ issuer_str = (issuer.get('commonName') or issuer.get('CN') or str(issuer)
+ if issuer else None)
+
+ return {
+ 'validity': validity,
+ 'remaining_days': remaining_days,
+ 'expiry_date': expiry_date,
+ 'subject': subject,
+ 'issuer': issuer,
+ 'common_name': common_name,
+ 'issuer_str': issuer_str
+ }
+
+
+def _determine_signed_by(cert_name: str) -> str:
+ """
+ Determine if certificate is signed by cephadm or user based on certificate name.
+
+ :param cert_name: Certificate name
+ :return: 'cephadm' if cephadm-signed, 'user' otherwise
+ """
+ return 'cephadm' if cert_name and CEPHADM_SIGNED_CERT in cert_name else 'user'
+
+
+def _build_certificate_list_entry(cert_name: str, cert_details: Dict[str, Any],
+ cert_scope: CertificateScope, target: Optional[str] = None
+ ) -> CertificateListEntry:
+ """
+ Build a certificate list entry from certificate details.
+
+ :param cert_name: Certificate name
+ :param cert_details: Certificate details dictionary
+ :param cert_scope: Certificate scope ('GLOBAL', 'SERVICE', or 'HOST')
+ :param target: Optional target (service name or hostname)
+ :return: Certificate list entry dictionary
+ """
+ cert_info = _extract_certificate_basic_info(cert_details)
+ remaining_days = cert_info['remaining_days']
+ expiry_date = cert_info['expiry_date']
+ common_name = cert_info['common_name']
+ issuer_str = cert_info['issuer_str']
+
+ status = _determine_certificate_status(remaining_days)
+ signed_by = _determine_signed_by(cert_name)
+
+ return CertificateListEntry(
+ cert_name=cert_name,
+ scope=cert_scope.value.upper(),
+ signed_by=signed_by,
+ status=status,
+ days_to_expiration=remaining_days,
+ expiry_date=expiry_date,
+ issuer=issuer_str,
+ common_name=common_name,
+ target=target
+ )
+
+
+def _get_certificate_response_template(cert_name: Optional[str], cert_scope_str: Optional[str],
+ target_key: Optional[str] = None
+ ) -> CertificateStatusResponse:
+ """
+ Get a certificate response template with all keys initialized.
+
+ :param cert_name: Certificate name (can be None)
+ :param cert_scope_str: Certificate scope (can be None)
+ :param target_key: Optional target key (service name or hostname)
+ :return: Dictionary template with all certificate response keys
+ """
+ return CertificateStatusResponse(
+ cert_name=cert_name,
+ scope=cert_scope_str,
+ target=target_key
+ )
+
+
+def _select_service_certificate(certificates: Any, service_name: str
+ ) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
+ """
+ Pick certificate details for a service-scoped certificate.
+ """
+ if not isinstance(certificates, dict):
+ return (None, None)
+
+ target_key: Optional[str] = None
+ if service_name and service_name in certificates:
+ target_key = service_name
+
+ cert_details = certificates.get(target_key) if target_key else None
+ return (cert_details if isinstance(cert_details, dict) else None, target_key)
+
+
+def _select_host_certificate(certificates: Any, daemon_hostnames: Optional[List[str]]
+ ) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
+ """
+ Pick certificate details for a host-scoped certificate.
+ """
+ if not isinstance(certificates, dict) or not certificates:
+ return (None, None)
+
+ target_key: Optional[str] = None
+ if daemon_hostnames:
+ for hostname in daemon_hostnames:
+ if hostname in certificates:
+ target_key = hostname
+ break
+ if target_key is None:
+ target_key = next(iter(certificates.keys()))
+
+ cert_details = certificates.get(target_key)
+ return (cert_details if isinstance(cert_details, dict) else None, target_key)
+
+
+def _select_global_certificate(certificates: Any) -> Optional[Dict[str, Any]]:
+ """
+ Pick certificate details for a global certificate.
+ """
+ if isinstance(certificates, dict) and certificates:
+ return certificates
+ return None
+
+
+def _get_certificate_status_for_service(service_type: str, service_name: str,
+ cert_ls_data: Optional[Dict[str, Any]] = None,
+ daemon_hostnames: Optional[List[str]] = None
+ ) -> Dict[str, Any]:
+ """
+ Get certificate status information for a service using REQUIRES_CERTIFICATES mapping.
+
+ :param service_type: The service type (e.g., 'rgw', 'grafana')
+ :param service_name: The service name (e.g., 'rgw.myzone')
+ :param cert_ls_data: Optional pre-fetched certificate list data (all certificates)
+ :param daemon_hostnames: Optional list of hostnames where service daemons run
+ :return: Dictionary with certificate status information
+ """
+ from ceph.deployment.service_spec import ServiceSpec
+
+ cert_config = ServiceSpec.REQUIRES_CERTIFICATES.get(service_type)
+ requires_cert = cert_config is not None
+
+ if not requires_cert:
+ response = _get_certificate_response_template(None, None)
+ response.requires_certificate = False
+ return response.to_dict()
+
+ assert cert_config is not None
+ cert_scope = CertificateScope(cert_config.get('scope', CertificateScope.SERVICE.value).lower())
+
+ # Find certificate in cert_ls_data - try user-provided first, then cephadm-signed
+ cert_details, _, cert_name, cert_scope_str = CertificateService.find_certificate_for_service(
+ cert_ls_data, service_type, service_name, cert_scope, daemon_hostnames
+ )
+
+ return CertificateService.build_certificate_status_response(
+ cert_details, cert_name, cert_scope_str
+ )
+
+
+def _find_certificate_in_data(
+ cert_ls_data: Optional[Dict[str, Any]],
+ cert_name: str,
+ cert_scope: CertificateScope,
+ service_name: str,
+ daemon_hostnames: Optional[List[str]],
+) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
+ """
+ Helper to locate certificate data inside cert_ls response.
+ """
+ if not cert_ls_data or cert_name not in cert_ls_data:
+ return (None, None)
+
+ cert_data = cert_ls_data[cert_name]
+ certificates = cert_data.get('certificates', {})
+ if cert_scope == CertificateScope.SERVICE:
+ return _select_service_certificate(certificates, service_name)
+ if cert_scope == CertificateScope.HOST:
+ return _select_host_certificate(certificates, daemon_hostnames)
+ if cert_scope == CertificateScope.GLOBAL:
+ return (_select_global_certificate(certificates), None)
+ return (None, None)
+
+
+class CertificateService:
+ """
+ Certificate service class providing certificate management functionality.
+
+ This class encapsulates all certificate-related operations following the
+ "Thin Controllers, Fat Services" pattern.
+ """
+
+ @staticmethod
+ def process_certificates_for_list(cert_ls_data: Dict[str, Any]
+ ) -> List[Dict[str, Any]]:
+ """
+ Process certificate list data and return formatted certificate entries.
+
+ :param cert_ls_data: Certificate list data from cert_ls
+ :return: List of certificate entry dictionaries
+ """
+
+ certificates_list: List[CertificateListEntry] = []
+
+ for cert_name, cert_data in cert_ls_data.items():
+ try:
+ cert_scope = CertificateScope(str(cert_data.get('scope', 'UNKNOWN')).lower())
+ except (ValueError, AttributeError):
+ cert_scope = CertificateScope.SERVICE
+ certificates = cert_data.get('certificates', {})
+
+ if cert_scope == CertificateScope.GLOBAL:
+ cert_details = certificates if isinstance(certificates, dict) else {}
+ if not isinstance(cert_details, dict) or 'Error' in cert_details:
+ continue
+ certificates_list.append(
+ _build_certificate_list_entry(cert_name, cert_details, cert_scope)
+ )
+ else:
+ # For SERVICE and HOST scope, iterate through targets
+ for target, cert_details in certificates.items():
+ if isinstance(cert_details, dict) and 'Error' in cert_details:
+ continue
+ if not isinstance(cert_details, dict):
+ continue
+ certificates_list.append(
+ _build_certificate_list_entry(cert_name, cert_details, cert_scope, target)
+ )
+
+ return [entry.to_dict() for entry in certificates_list]
+
+ @staticmethod
+ def find_certificate_for_service(cert_ls_data: Optional[Dict[str, Any]],
+ service_type: str, service_name: str,
+ cert_scope: CertificateScope,
+ daemon_hostnames: Optional[List[str]] = None
+ ) -> Tuple[Optional[Dict[str, Any]], Optional[str], str, str]:
+ """
+ Find certificate for a service, trying user-provided first, then cephadm-signed.
+
+ :param cert_ls_data: Certificate list data from cert_ls
+ :param service_type: The service type (e.g., 'rgw', 'grafana')
+ :param service_name: The service name (e.g., 'rgw.myzone')
+ :param cert_scope: Certificate scope from config ('SERVICE', 'HOST', or 'GLOBAL')
+ :param daemon_hostnames: Optional list of hostnames where service daemons run
+ :return: Tuple of (cert_details, target_key, cert_name, actual_scope)
+ """
+ user_cert_name = f"{service_type.replace('-', '_')}_ssl_cert"
+ cephadm_cert_name = f"cephadm-signed_{service_type}_cert"
+ cert_details = None
+ target_key = None
+ cert_name = user_cert_name
+ actual_scope = cert_scope.value.upper()
+
+ # Try user-provided certificate first
+ if cert_ls_data and user_cert_name in cert_ls_data:
+ cert_data = cert_ls_data[user_cert_name]
+ cert_scope_from_data = cert_data.get('scope', cert_scope.value)
+ try:
+ cert_scope = CertificateScope(cert_scope_from_data.lower())
+ except ValueError:
+ cert_scope = CertificateScope.SERVICE
+ actual_scope = cert_scope.value.upper()
+ cert_details, target_key = _find_certificate_in_data(
+ cert_ls_data, user_cert_name, cert_scope, service_name, daemon_hostnames)
+ if cert_details:
+ cert_name = user_cert_name
+
+ # If user-provided cert not found, try cephadm-signed certificate
+ if not cert_details and cert_ls_data and cephadm_cert_name in cert_ls_data:
+ cert_details, target_key = _find_certificate_in_data(
+ cert_ls_data, cephadm_cert_name, CertificateScope.HOST,
+ service_name, daemon_hostnames)
+ if cert_details:
+ cert_name = cephadm_cert_name
+ actual_scope = CertificateScope.HOST.value.upper()
+
+ return (cert_details, target_key, cert_name, actual_scope)
+
+ @staticmethod
+ def fetch_certificates_for_service(orch: OrchClient, service_type: str,
+ user_cert_name: str, cephadm_cert_name: str
+ ) -> Dict[str, Any]:
+ """
+ Fetch certificates for a specific service, including missing ones.
+
+ :param orch: Orchestrator client instance
+ :param service_type: Service type for filter pattern
+ :param user_cert_name: User-provided certificate name
+ :param cephadm_cert_name: Cephadm-signed certificate name
+ :return: Dictionary of certificate data
+ """
+ service_type_for_filter = service_type.replace('-', '_')
+ filter_pattern = f'name=*{service_type_for_filter}*'
+
+ cert_ls_result = orch.cert_store.cert_ls(
+ filter_by=filter_pattern,
+ show_details=True,
+ include_cephadm_signed=True
+ )
+ cert_ls_data = cert_ls_result or {}
+
+ missing_certs: List[str] = []
+ if user_cert_name not in cert_ls_data:
+ missing_certs.append(user_cert_name)
+ if cephadm_cert_name not in cert_ls_data:
+ missing_certs.append(cephadm_cert_name)
+
+ # Fetch any missing certificates individually
+ for cert_name in missing_certs:
+ individual_result = orch.cert_store.cert_ls(
+ filter_by=f'name={cert_name}',
+ show_details=True,
+ include_cephadm_signed=True
+ )
+ if individual_result and cert_name in individual_result:
+ cert_ls_data[cert_name] = individual_result[cert_name]
+
+ return cert_ls_data
+
+ @staticmethod
+ def get_daemon_hostnames(orch: OrchClient, service_name: str
+ ) -> Tuple[List[str], Optional[str]]:
+ """
+ Get daemon hostnames for a service.
+
+ :param orch: Orchestrator client instance
+ :param service_name: Service name
+ :return: Tuple of (daemon_hostnames list, target_hostname or None)
+ """
+ daemons = orch.services.list_daemons(service_name=service_name)
+ daemon_hostnames = [d.hostname for d in daemons if d.hostname]
+ target_hostname = daemon_hostnames[0] if daemon_hostnames else None
+ return (daemon_hostnames, target_hostname)
+
+ @staticmethod
+ def build_certificate_status_response(cert_details: Optional[Dict[str, Any]],
+ cert_name: str, cert_scope_str: str,
+ target_key: Optional[str] = None,
+ include_target: bool = False,
+ include_details: bool = False
+ ) -> Dict[str, Any]:
+ """
+ Build certificate status response dictionary.
+
+ :param cert_details: Certificate details dict or None
+ :param cert_name: Certificate name
+ :param cert_scope_str: Certificate scope
+ :param target_key: Optional target key (service name or hostname)
+ :param include_target: Whether to include 'target' field in response
+ :param include_details: Whether to include detailed 'details' field in response
+ :return: Dictionary with certificate status information
+ """
+ use_target = target_key if (include_target or (target_key and include_details)) else None
+ response = _get_certificate_response_template(cert_name, cert_scope_str, use_target)
+
+ if not cert_details:
+ response.status = CertificateStatus.NOT_CONFIGURED
+ response.has_certificate = False
+ return response.to_dict()
+
+ if isinstance(cert_details, dict) and 'Error' in cert_details:
+ response.status = CertificateStatus.INVALID
+ response.signed_by = _determine_signed_by(cert_name)
+ response.has_certificate = True
+ if include_details:
+ response.error = cert_details.get('Error')
+ return response.to_dict()
+
+ cert_info = _extract_certificate_basic_info(cert_details)
+ remaining_days = cert_info['remaining_days']
+ expiry_date = cert_info['expiry_date']
+ common_name = cert_info['common_name']
+ issuer_str = cert_info['issuer_str']
+
+ status = _determine_certificate_status(remaining_days)
+ signed_by = _determine_signed_by(cert_name)
+
+ response.status = status
+ response.days_to_expiration = remaining_days
+ response.signed_by = signed_by
+ response.has_certificate = True
+ response.certificate_source = 'reference'
+ response.expiry_date = expiry_date
+ response.issuer = issuer_str
+ response.common_name = common_name
+
+ if include_details:
+ subject = cert_info['subject']
+ issuer = cert_info['issuer']
+ extensions = cert_details.get('extensions', {})
+ san_entries = extensions.get('subjectAltName', {})
+
+ response.details = {
+ 'subject': subject,
+ 'issuer': issuer,
+ 'san_entries': {
+ 'dns_names': san_entries.get('DNS', []),
+ 'ip_addresses': san_entries.get('IP', [])
+ },
+ 'key_type': cert_details.get('public_key', {}).get('key_type'),
+ 'key_size': cert_details.get('public_key', {}).get('key_size'),
+ 'validity': {
+ 'not_before': cert_info['validity'].get('not_before'),
+ 'not_after': cert_info['validity'].get('not_after'),
+ 'remaining_days': remaining_days
+ },
+ 'extensions': extensions
+ }
+
+ return response.to_dict()
+
+ @staticmethod
+ def enrich_services_with_certificates(orch: Any, services: List[Dict[str, Any]],
+ cert_ls_data: Dict[str, Any]) -> None:
+ """
+ Enrich a list of services with certificate status information.
+
+ This function modifies the services list in place, adding a 'certificate'
+ key to each service with its certificate status.
+
+ :param orch: Orchestrator client instance
+ :param services: List of service dictionaries to enrich (modified in place)
+ :param cert_ls_data: Certificate list data from cert_ls
+ """
+ daemon_hosts_by_service: Dict[str, List[str]] = defaultdict(list)
+ for daemon in orch.services.list_daemons():
+ service_name = daemon.service_name()
+ if service_name and daemon.hostname:
+ daemon_hosts_by_service[service_name].append(daemon.hostname)
+
+ for service in services:
+ svc_name = service.get('service_name', '')
+ daemon_hostnames = daemon_hosts_by_service.get(svc_name, [])
+
+ service['certificate'] = _get_certificate_status_for_service(
+ service.get('service_type', ''),
+ svc_name,
+ cert_ls_data,
+ daemon_hostnames
+ )
+
+ @staticmethod
+ def empty_response() -> Dict[str, Any]:
+ """
+ Build a standard response for services that do not require certificates.
+ """
+ return CertificateStatusResponse(
+ cert_name=None,
+ scope=None,
+ requires_certificate=False
+ ).to_dict()
+
+ @staticmethod
+ def fetch_all_certificates(orch: Any, filter_by: str = '',
+ show_details: bool = True,
+ include_cephadm_signed: bool = True) -> Dict[str, Any]:
+ """
+ Fetch all certificates from the certificate store.
+
+ :param orch: Orchestrator client instance
+ :param filter_by: Filter string for certificates (default: '')
+ :param show_details: Whether to include certificate details (default: True)
+ :param include_cephadm_signed: Whether to include cephadm-signed certs (default: True)
+ :return: Dictionary of certificate data
+ """
+ cert_ls_result = orch.cert_store.cert_ls(
+ filter_by=filter_by,
+ show_details=show_details,
+ include_cephadm_signed=include_cephadm_signed
+ )
+ return cert_ls_result or {}
return self.api.cert_store_get_key(entity, service_name, hostname,
no_exception_when_missing=ignore_missing_exception)
+ @wait_api_result
+ def cert_ls(self, filter_by: str = '', show_details: bool = False,
+ include_cephadm_signed: bool = False) -> Dict[str, Any]:
+ return self.api.cert_store_cert_ls(filter_by, show_details, include_cephadm_signed)
+
class MonitoringManager(ResourceManager):
--- /dev/null
+# -*- coding: utf-8 -*-
+import unittest
+from datetime import datetime, timedelta
+from unittest import mock
+from unittest.mock import patch
+
+from orchestrator import DaemonDescription
+
+from ..controllers._version import APIVersion
+from ..controllers.certificate import Certificate
+from ..tests import ControllerTestCase
+
+
+class CertificateTestBase(ControllerTestCase):
+ """Base class for certificate controller tests with shared helpers."""
+
+ URL_CERTIFICATE = '/api/service/certificate'
+ URL_ROOT_CA = '/api/service/certificate/root-ca'
+
+ @classmethod
+ def setup_server(cls):
+ # Mock mgr.get_module_option_ex to return default threshold of 30
+ # Patch it where it's used in the certificate service
+ patch('dashboard.services.certificate.mgr.get_module_option_ex', return_value=30).start()
+ cls.setup_controllers([Certificate])
+
+ def _setup_fake_client(self, fake_client):
+ """Helper to set up fake orchestrator client with required mocks."""
+ fake_client.available.return_value = True
+ fake_client.get_missing_features.return_value = []
+ return fake_client
+
+ def _create_mock_cert_details(self, remaining_days=100, common_name='test.example.com',
+ issuer='cephadm', key_type='RSA', key_size=2048):
+ """Helper to create mock certificate details."""
+ not_after = datetime.now() + timedelta(days=remaining_days)
+ return {
+ 'validity': {
+ 'not_before': '2024-01-01T00:00:00Z',
+ 'not_after': not_after.strftime('%Y-%m-%dT%H:%M:%SZ'),
+ 'remaining_days': remaining_days
+ },
+ 'subject': {
+ 'commonName': common_name,
+ 'CN': common_name
+ },
+ 'issuer': {
+ 'commonName': issuer,
+ 'CN': issuer
+ },
+ 'public_key': {
+ 'key_type': key_type,
+ 'key_size': key_size
+ },
+ 'extensions': {
+ 'subjectAltName': {
+ 'DNS': [common_name, '*.example.com'],
+ 'IP': ['192.168.1.1']
+ }
+ }
+ }
+
+ def _create_mock_cert_ls_data(self, cert_name='rgw_ssl_cert', scope='SERVICE',
+ service_name='rgw.test', cert_details=None):
+ """Helper to create mock cert_ls data structure."""
+ if cert_details is None:
+ cert_details = self._create_mock_cert_details()
+ return {
+ cert_name: {
+ 'scope': scope,
+ 'certificates': {
+ service_name: cert_details
+ }
+ }
+ }
+
+
+class CertificateListControllerTest(CertificateTestBase):
+ """Tests for the certificate list() endpoint."""
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_list_certificates_empty(self, instance):
+ """Test listing certificates when none exist."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ fake_client.cert_store.cert_ls.return_value = {}
+ instance.return_value = fake_client
+
+ self._get(self.URL_CERTIFICATE, version=APIVersion(1, 0))
+ self.assertStatus(200)
+ self.assertJsonBody([])
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_list_certificates_basic(self, instance):
+ """Test listing certificates with basic data."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ cert_details = self._create_mock_cert_details(remaining_days=100)
+ cert_ls_data = self._create_mock_cert_ls_data(
+ cert_name='rgw_ssl_cert',
+ scope='SERVICE',
+ service_name='rgw.test',
+ cert_details=cert_details
+ )
+ fake_client.cert_store.cert_ls.return_value = cert_ls_data
+ instance.return_value = fake_client
+
+ self._get(self.URL_CERTIFICATE, version=APIVersion(1, 0))
+ self.assertStatus(200)
+ result = self.json_body()
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0]['cert_name'], 'rgw_ssl_cert')
+ self.assertEqual(result[0]['scope'], 'SERVICE')
+ self.assertEqual(result[0]['status'], 'valid')
+ self.assertEqual(result[0]['target'], 'rgw.test')
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_list_certificates_with_filters(self, instance):
+ """Test listing certificates with status and scope filters."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ cert_details_expiring = self._create_mock_cert_details(remaining_days=10)
+
+ # Mock cert_ls to return filtered data (simulating backend filtering)
+ # When status=expiring filter is applied, cert_ls returns only expiring certs
+ cert_ls_data_expiring = {
+ 'grafana_ssl_cert': {
+ 'scope': 'SERVICE',
+ 'certificates': {
+ 'grafana.test': cert_details_expiring
+ }
+ }
+ }
+ fake_client.cert_store.cert_ls.return_value = cert_ls_data_expiring
+ instance.return_value = fake_client
+
+ # Test status filter
+ self._get('{}?status=expiring'.format(self.URL_CERTIFICATE),
+ version=APIVersion(1, 0))
+ self.assertStatus(200)
+ result = self.json_body()
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0]['cert_name'], 'grafana_ssl_cert')
+ self.assertEqual(result[0]['status'], 'expiring')
+ # Verify filter_by was passed to cert_ls
+ call_args = fake_client.cert_store.cert_ls.call_args
+ self.assertIn('status=expiring', call_args[1]['filter_by'])
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_list_certificates_with_service_name_filter(self, instance):
+ """Test listing certificates with service name filter."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ cert_details = self._create_mock_cert_details()
+ # Mock cert_ls to return filtered data (simulating backend filtering)
+ # When service_name=rgw filter is applied, cert_ls returns only rgw certs
+ cert_ls_data = {
+ 'rgw_ssl_cert': {
+ 'scope': 'SERVICE',
+ 'certificates': {
+ 'rgw.test': cert_details
+ }
+ }
+ }
+ fake_client.cert_store.cert_ls.return_value = cert_ls_data
+ instance.return_value = fake_client
+
+ self._get('{}?service_type=rgw'.format(self.URL_CERTIFICATE),
+ version=APIVersion(1, 0))
+ self.assertStatus(200)
+ result = self.json_body()
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0]['cert_name'], 'rgw_ssl_cert')
+ # Verify filter_by was passed to cert_ls
+ call_args = fake_client.cert_store.cert_ls.call_args
+ self.assertIn('name=*rgw*', call_args[1]['filter_by'])
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_list_certificates_include_cephadm_signed(self, instance):
+ """Test listing certificates with include_cephadm_signed parameter."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ cert_ls_data = {
+ 'rgw_ssl_cert': {
+ 'scope': 'SERVICE',
+ 'certificates': {
+ 'rgw.test': self._create_mock_cert_details()
+ }
+ }
+ }
+ fake_client.cert_store.cert_ls.return_value = cert_ls_data
+ instance.return_value = fake_client
+
+ # Test with include_cephadm_signed=True
+ self._get('{}?include_cephadm_signed=true'.format(self.URL_CERTIFICATE),
+ version=APIVersion(1, 0))
+ self.assertStatus(200)
+ fake_client.cert_store.cert_ls.assert_called_with(
+ filter_by='',
+ show_details=False,
+ include_cephadm_signed=True
+ )
+
+ # Test with include_cephadm_signed=False
+ self._get('{}?include_cephadm_signed=false'.format(self.URL_CERTIFICATE),
+ version=APIVersion(1, 0))
+ self.assertStatus(200)
+ fake_client.cert_store.cert_ls.assert_called_with(
+ filter_by='',
+ show_details=False,
+ include_cephadm_signed=False
+ )
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_list_certificates_global_scope(self, instance):
+ """Test listing certificates with GLOBAL scope."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ cert_details = self._create_mock_cert_details()
+ cert_ls_data = {
+ 'cephadm_root_ca_cert': {
+ 'scope': 'GLOBAL',
+ 'certificates': cert_details
+ }
+ }
+ fake_client.cert_store.cert_ls.return_value = cert_ls_data
+ instance.return_value = fake_client
+
+ self._get(self.URL_CERTIFICATE, version=APIVersion(1, 0))
+ self.assertStatus(200)
+ result = self.json_body()
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0]['scope'], 'GLOBAL')
+ self.assertEqual(result[0].get('target'), '')
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_list_certificates_error_handling(self, instance):
+ """Test error handling when cert_ls fails."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ fake_client.cert_store.cert_ls.side_effect = RuntimeError('Certificate store error')
+ instance.return_value = fake_client
+
+ self._get(self.URL_CERTIFICATE, version=APIVersion(1, 0))
+ self.assertStatus(500)
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_list_certificates_multiple_scopes(self, instance):
+ """Test listing certificates with multiple scopes."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ cert_ls_data = {
+ 'rgw_ssl_cert': {
+ 'scope': 'SERVICE',
+ 'certificates': {
+ 'rgw.test': self._create_mock_cert_details()
+ }
+ },
+ 'grafana_ssl_cert': {
+ 'scope': 'HOST',
+ 'certificates': {
+ 'node0': self._create_mock_cert_details()
+ }
+ },
+ 'cephadm_root_ca_cert': {
+ 'scope': 'GLOBAL',
+ 'certificates': self._create_mock_cert_details()
+ }
+ }
+ fake_client.cert_store.cert_ls.return_value = cert_ls_data
+ instance.return_value = fake_client
+
+ self._get(self.URL_CERTIFICATE, version=APIVersion(1, 0))
+ self.assertStatus(200)
+ result = self.json_body()
+ self.assertEqual(len(result), 3)
+ scopes = [cert['scope'] for cert in result]
+ self.assertIn('SERVICE', scopes)
+ self.assertIn('HOST', scopes)
+ self.assertIn('GLOBAL', scopes)
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_list_certificates_combined_filters(self, instance):
+ """Test listing certificates with combined filters."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ cert_details = self._create_mock_cert_details(remaining_days=10)
+ cert_ls_data = {
+ 'rgw_ssl_cert': {
+ 'scope': 'SERVICE',
+ 'certificates': {
+ 'rgw.test': cert_details
+ }
+ }
+ }
+ fake_client.cert_store.cert_ls.return_value = cert_ls_data
+ instance.return_value = fake_client
+
+ self._get('{}?status=expiring&scope=SERVICE&service_type=rgw'.format(self.URL_CERTIFICATE),
+ version=APIVersion(1, 0))
+ self.assertStatus(200)
+ fake_client.cert_store.cert_ls.assert_called()
+ # Verify filter_by was constructed correctly
+ call_args = fake_client.cert_store.cert_ls.call_args
+ filter_by = call_args[1]['filter_by']
+ self.assertIn('status=expiring', filter_by)
+ self.assertIn('scope=service', filter_by)
+ self.assertIn('name=*rgw*', filter_by)
+
+
+class CertificateGetControllerTest(CertificateTestBase):
+ """Tests for the certificate get() endpoint."""
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_get_certificate_service_not_found(self, instance):
+ """Test getting certificate for non-existent service."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ fake_client.services.get.return_value = [None]
+ instance.return_value = fake_client
+
+ self._get('{}/rgw.nonexistent'.format(self.URL_CERTIFICATE))
+ self.assertStatus(500)
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_get_certificate_service_no_cert_required(self, instance):
+ """Test getting certificate for service that doesn't require certificates."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ # Create a mock service that doesn't require certificates (e.g., mon)
+ service = mock.Mock()
+ service.spec = mock.Mock()
+ service.spec.service_type = 'mon'
+ service.spec.service_name = mock.Mock(return_value='mon.test')
+ fake_client.services.get.return_value = [service]
+ instance.return_value = fake_client
+
+ self._get('{}/mon.test'.format(self.URL_CERTIFICATE))
+ self.assertStatus(200)
+ result = self.json_body()
+ self.assertFalse(result['requires_certificate'])
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_get_certificate_not_configured(self, instance):
+ """Test getting certificate when certificate is not configured."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ service = mock.Mock()
+ service.spec = mock.Mock()
+ service.spec.service_type = 'rgw'
+ service.spec.service_name = mock.Mock(return_value='rgw.test')
+ fake_client.services.get.return_value = [service]
+ fake_client.cert_store.cert_ls.return_value = {}
+ fake_client.services.list_daemons.return_value = [
+ DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0')
+ ]
+ instance.return_value = fake_client
+
+ self._get('{}/rgw.test'.format(self.URL_CERTIFICATE))
+ self.assertStatus(200)
+ result = self.json_body()
+ self.assertEqual(result['status'], 'not_configured')
+ self.assertFalse(result['has_certificate'])
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_get_certificate_user_provided(self, instance):
+ """Test getting certificate with user-provided certificate."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ service = mock.Mock()
+ service.spec = mock.Mock()
+ service.spec.service_type = 'rgw'
+ service.spec.service_name = mock.Mock(return_value='rgw.test')
+ fake_client.services.get.return_value = [service]
+
+ cert_details = self._create_mock_cert_details(remaining_days=100, common_name='rgw.test')
+ cert_ls_data = self._create_mock_cert_ls_data(
+ cert_name='rgw_ssl_cert',
+ scope='SERVICE',
+ service_name='rgw.test',
+ cert_details=cert_details
+ )
+ fake_client.cert_store.cert_ls.return_value = cert_ls_data
+ fake_client.services.list_daemons.return_value = [
+ DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0')
+ ]
+ instance.return_value = fake_client
+
+ self._get('{}/rgw.test'.format(self.URL_CERTIFICATE))
+ self.assertStatus(200)
+ result = self.json_body()
+ self.assertEqual(result['cert_name'], 'rgw_ssl_cert')
+ self.assertEqual(result['status'], 'valid')
+ self.assertEqual(result['signed_by'], 'user')
+ self.assertTrue(result['has_certificate'])
+ self.assertEqual(result['common_name'], 'rgw.test')
+ self.assertIn('details', result)
+ self.assertIn('san_entries', result['details'])
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_get_certificate_cephadm_signed(self, instance):
+ """Test getting certificate with cephadm-signed certificate."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ service = mock.Mock()
+ service.spec = mock.Mock()
+ service.spec.service_type = 'rgw'
+ service.spec.service_name = mock.Mock(return_value='rgw.test')
+ fake_client.services.get.return_value = [service]
+
+ cert_details = self._create_mock_cert_details(remaining_days=100)
+ cert_ls_data = {
+ 'cephadm-signed_rgw_cert': {
+ 'scope': 'HOST',
+ 'certificates': {
+ 'node0': cert_details
+ }
+ }
+ }
+ fake_client.cert_store.cert_ls.return_value = cert_ls_data
+ fake_client.services.list_daemons.return_value = [
+ DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0')
+ ]
+ instance.return_value = fake_client
+
+ self._get('{}/rgw.test'.format(self.URL_CERTIFICATE))
+ self.assertStatus(200)
+ result = self.json_body()
+ self.assertEqual(result['cert_name'], 'cephadm-signed_rgw_cert')
+ self.assertEqual(result['signed_by'], 'cephadm')
+ self.assertEqual(result['scope'], 'HOST')
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_get_certificate_expiring(self, instance):
+ """Test getting certificate that is expiring."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ service = mock.Mock()
+ service.spec = mock.Mock()
+ service.spec.service_type = 'rgw'
+ service.spec.service_name = mock.Mock(return_value='rgw.test')
+ fake_client.services.get.return_value = [service]
+
+ cert_details = self._create_mock_cert_details(remaining_days=10)
+ cert_ls_data = self._create_mock_cert_ls_data(
+ cert_name='rgw_ssl_cert',
+ scope='SERVICE',
+ service_name='rgw.test',
+ cert_details=cert_details
+ )
+ fake_client.cert_store.cert_ls.return_value = cert_ls_data
+ fake_client.services.list_daemons.return_value = [
+ DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0')
+ ]
+ instance.return_value = fake_client
+
+ self._get('{}/rgw.test'.format(self.URL_CERTIFICATE))
+ self.assertStatus(200)
+ result = self.json_body()
+ self.assertEqual(result['status'], 'expiring')
+ self.assertEqual(result['days_to_expiration'], 10)
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_get_certificate_expired(self, instance):
+ """Test getting certificate that is expired."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ service = mock.Mock()
+ service.spec = mock.Mock()
+ service.spec.service_type = 'rgw'
+ service.spec.service_name = mock.Mock(return_value='rgw.test')
+ fake_client.services.get.return_value = [service]
+
+ cert_details = self._create_mock_cert_details(remaining_days=-10)
+ cert_ls_data = self._create_mock_cert_ls_data(
+ cert_name='rgw_ssl_cert',
+ scope='SERVICE',
+ service_name='rgw.test',
+ cert_details=cert_details
+ )
+ fake_client.cert_store.cert_ls.return_value = cert_ls_data
+ fake_client.services.list_daemons.return_value = [
+ DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0')
+ ]
+ instance.return_value = fake_client
+
+ self._get('{}/rgw.test'.format(self.URL_CERTIFICATE))
+ self.assertStatus(200)
+ result = self.json_body()
+ self.assertEqual(result['status'], 'expired')
+ self.assertEqual(result['days_to_expiration'], -10)
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_get_certificate_invalid(self, instance):
+ """Test getting certificate that is invalid."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ service = mock.Mock()
+ service.spec = mock.Mock()
+ service.spec.service_type = 'rgw'
+ service.spec.service_name = mock.Mock(return_value='rgw.test')
+ fake_client.services.get.return_value = [service]
+
+ cert_ls_data = {
+ 'rgw_ssl_cert': {
+ 'scope': 'SERVICE',
+ 'certificates': {
+ 'rgw.test': {'Error': 'Invalid certificate format'}
+ }
+ }
+ }
+ fake_client.cert_store.cert_ls.return_value = cert_ls_data
+ fake_client.services.list_daemons.return_value = [
+ DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0')
+ ]
+ instance.return_value = fake_client
+
+ self._get('{}/rgw.test'.format(self.URL_CERTIFICATE))
+ self.assertStatus(200)
+ result = self.json_body()
+ self.assertEqual(result['status'], 'invalid')
+ self.assertTrue(result['has_certificate'])
+ self.assertIn('error', result)
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_get_certificate_host_scope(self, instance):
+ """Test getting certificate with HOST scope."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ service = mock.Mock()
+ service.spec = mock.Mock()
+ service.spec.service_type = 'grafana'
+ service.spec.service_name = mock.Mock(return_value='grafana.test')
+ fake_client.services.get.return_value = [service]
+
+ cert_details = self._create_mock_cert_details()
+ cert_ls_data = {
+ 'grafana_ssl_cert': {
+ 'scope': 'HOST',
+ 'certificates': {
+ 'node0': cert_details
+ }
+ }
+ }
+ fake_client.cert_store.cert_ls.return_value = cert_ls_data
+ fake_client.services.list_daemons.return_value = [
+ DaemonDescription(daemon_type='grafana', daemon_id='test', hostname='node0')
+ ]
+ instance.return_value = fake_client
+
+ self._get('{}/grafana.test'.format(self.URL_CERTIFICATE))
+ self.assertStatus(200)
+ result = self.json_body()
+ self.assertEqual(result['scope'], 'HOST')
+ self.assertEqual(result['target'], 'node0')
+
+
+class CertificateRootCAControllerTest(CertificateTestBase):
+ """Tests for the certificate root_ca() endpoint."""
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_root_ca_success(self, instance):
+ """Test getting root CA certificate successfully."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ root_ca_cert = '-----BEGIN CERTIFICATE-----\nMOCK_ROOT_CA_CERT\n-----END CERTIFICATE-----'
+ fake_client.cert_store.get_cert.return_value = root_ca_cert
+ instance.return_value = fake_client
+
+ self._get(self.URL_ROOT_CA)
+ self.assertStatus(200)
+ result = self.json_body()
+ self.assertEqual(result, root_ca_cert)
+ fake_client.cert_store.get_cert.assert_called_with('cephadm_root_ca_cert')
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_root_ca_not_found(self, instance):
+ """Test getting root CA certificate when it doesn't exist."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ fake_client.cert_store.get_cert.return_value = None
+ instance.return_value = fake_client
+
+ self._get(self.URL_ROOT_CA)
+ self.assertStatus(404)
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_root_ca_error_handling(self, instance):
+ """Test error handling when getting root CA certificate fails."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ fake_client.cert_store.get_cert.side_effect = Exception('Certificate store error')
+ instance.return_value = fake_client
+
+ self._get(self.URL_ROOT_CA)
+ self.assertStatus(500)
+
+ @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+ def test_root_ca_method_not_available(self, instance):
+ """Test getting root CA when get_cert method is not available."""
+ fake_client = self._setup_fake_client(mock.Mock())
+ del fake_client.cert_store.get_cert
+ instance.return_value = fake_client
+
+ self._get(self.URL_ROOT_CA)
+ self.assertStatus(500)
+
+
+if __name__ == '__main__':
+ unittest.main()