]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/dasboard : Injest certificate mgmt API into services API 66465/head
authorAbhishek Desai <abhishek.desai1@ibm.com>
Mon, 1 Dec 2025 08:00:25 +0000 (13:30 +0530)
committerAbhishek Desai <abhishek.desai1@ibm.com>
Wed, 28 Jan 2026 06:48:39 +0000 (12:18 +0530)
fixes : https://tracker.ceph.com/issues/74039
Signed-off-by: Abhishek Desai <abhishek.desai1@ibm.com>
Assisted-by: Cursor
src/pybind/mgr/dashboard/controllers/certificate.py [new file with mode: 0644]
src/pybind/mgr/dashboard/controllers/service.py
src/pybind/mgr/dashboard/model/certificate.py [new file with mode: 0644]
src/pybind/mgr/dashboard/openapi.yaml
src/pybind/mgr/dashboard/services/certificate.py [new file with mode: 0644]
src/pybind/mgr/dashboard/services/orchestrator.py
src/pybind/mgr/dashboard/tests/test_certificate.py [new file with mode: 0644]

diff --git a/src/pybind/mgr/dashboard/controllers/certificate.py b/src/pybind/mgr/dashboard/controllers/certificate.py
new file mode 100644 (file)
index 0000000..ebcdb9e
--- /dev/null
@@ -0,0 +1,181 @@
+from typing import Any, Dict, List, Optional
+
+from ceph.deployment.service_spec import ServiceSpec
+
+from ..exceptions import DashboardException
+from ..model.certificate import CEPHADM_ROOT_CA_CERT, CERTIFICATE_LIST_SCHEMA, \
+    CertificateScope, CertificateStatus
+from ..security import Scope
+from ..services.certificate import CertificateService
+from ..services.exception import handle_orchestrator_error
+from ..services.orchestrator import OrchClient, OrchFeature
+from ..tools import str_to_bool
+from . import APIDoc, APIRouter, EndpointDoc, ReadPermission, RESTController
+from .orchestrator import raise_if_no_orchestrator
+
+
+@APIRouter('/service/certificate', Scope.HOSTS)
+@APIDoc("Service Management API", "Service")
+class Certificate(RESTController):
+
+    @EndpointDoc("List All Certificates",
+                 parameters={
+                     'status': (str, 'Filter by certificate status '
+                                     '(e.g., "expired", "expiring", "valid", "invalid")'),
+                     'scope': (str, 'Filter by certificate scope '
+                                    '(e.g., "service", "host", "global")'),
+                     'service_type': (str, 'Filter by certificate type '
+                                      '(e.g., "rgw*")'),
+                     'include_cephadm_signed': (bool, 'Include cephadm-signed certificates '
+                                                'in the list (default: False)')
+                 },
+                 responses={200: CERTIFICATE_LIST_SCHEMA})
+    @raise_if_no_orchestrator([OrchFeature.SERVICE_LIST])
+    @ReadPermission
+    @handle_orchestrator_error('certificate')
+    def list(self, status: Optional[str] = None, scope: Optional[str] = None,
+             service_type: Optional[str] = None,
+             include_cephadm_signed: bool = False) -> List[Dict]:
+        """
+        List all certificates configured in the cluster.
+
+        This endpoint returns a list of all certificates managed by certmgr,
+        including both user-provided and cephadm-signed certificates.
+
+        :param status: Filter by certificate status. Valid values: 'expired',
+            'expiring', 'valid', 'invalid'
+        :param scope: Filter by certificate scope. Valid values: 'SERVICE',
+            'HOST', 'GLOBAL'
+        :param service_type: Filter by service type. Supports wildcards
+            (e.g., 'rgw*')
+        :param include_cephadm_signed: If True, include cephadm-signed certificates.
+            If False (default), only user-provided certificates are returned.
+        :return: List of certificate objects with their details
+        """
+        orch = OrchClient.instance()
+
+        status_value = None
+        scope_value = None
+
+        if status:
+            try:
+                status_value = CertificateStatus(status.lower()).value
+            except ValueError:
+                valid_vals = ", ".join([s.value for s in CertificateStatus])
+                raise DashboardException(
+                    msg=f'Invalid status: {status}. Valid values are: {valid_vals}')
+
+        if scope:
+            try:
+                scope_value = CertificateScope(scope.lower()).value
+            except ValueError:
+                valid_vals = ", ".join([s.value for s in CertificateScope])
+                raise DashboardException(
+                    msg=f'Invalid scope: {scope}. Valid values are: {valid_vals}')
+
+        include_cephadm_signed = str_to_bool(include_cephadm_signed)
+
+        filter_parts = []
+        if status_value:
+            filter_parts.append(f'status={status_value}')
+        if scope_value:
+            filter_parts.append(f'scope={scope_value}')
+        if service_type:
+            filter_parts.append(f'name=*{service_type.lower()}*')
+
+        filter_by = ','.join(filter_parts)
+
+        cert_ls_data = CertificateService.fetch_all_certificates(
+            orch, filter_by=filter_by or '', show_details=False,
+            include_cephadm_signed=include_cephadm_signed
+        )
+
+        # Transform certificate data into a list format
+        # Note: Filtering is already done by cert_ls via filter_by parameter
+        certificates_list = CertificateService.process_certificates_for_list(cert_ls_data)
+
+        return certificates_list
+
+    @raise_if_no_orchestrator([OrchFeature.SERVICE_LIST, OrchFeature.DAEMON_LIST])
+    @ReadPermission
+    @handle_orchestrator_error('certificate')
+    def get(self, service_name: str) -> Dict[str, Any]:
+        """
+        Get detailed certificate information for a service.
+
+        :param service_name: The service name, e.g. 'rgw.myzone'.
+        :return: Detailed certificate information including full certificate details
+        """
+        orch = OrchClient.instance()
+
+        # Get service information
+        services = orch.services.get(service_name)
+        if not services:
+            raise DashboardException(
+                msg=f'Service {service_name} not found',
+                http_status_code=404,
+                component='certificate'
+            )
+        service = services[0]
+
+        service_type = service.spec.service_type
+        service_name_full = service.spec.service_name()
+
+        cert_config = ServiceSpec.REQUIRES_CERTIFICATES.get(service_type)
+        if not cert_config:
+            return CertificateService.empty_response()
+
+        user_cert_name = f"{service_type.replace('-', '_')}_ssl_cert"
+        cephadm_cert_name = f"cephadm-signed_{service_type}_cert"
+        cert_scope = CertificateScope(cert_config.get('scope', CertificateScope.SERVICE.value))
+
+        cert_ls_data = CertificateService.fetch_certificates_for_service(
+            orch, service_type, user_cert_name, cephadm_cert_name
+        )
+
+        daemon_hostnames, _ = CertificateService.get_daemon_hostnames(orch, service_name_full)
+
+        # try user-provided first, then cephadm-signed
+        cert_details, target_key, cert_name, cert_scope_str = \
+            CertificateService.find_certificate_for_service(
+                cert_ls_data, service_type, service_name_full, cert_scope, daemon_hostnames
+            )
+
+        return CertificateService.build_certificate_status_response(
+            cert_details, cert_name or user_cert_name, cert_scope_str, target_key,
+            include_target=True, include_details=True
+        )
+
+    @EndpointDoc("Get Root CA Certificate",
+                 responses={
+                     200: {
+                         'certificate': (str, 'Root CA certificate in PEM format')
+                     }
+                 })
+    @RESTController.Collection('GET', path='/root-ca')
+    @raise_if_no_orchestrator([OrchFeature.SERVICE_LIST])
+    @ReadPermission
+    @handle_orchestrator_error('certificate')
+    def root_ca(self) -> Dict[str, str]:
+        """
+        Get the cephadm root CA certificate.
+
+        This endpoint returns the root Certificate Authority (CA) certificate
+        used by cephadm to sign other certificates in the cluster.
+
+        :return: Dictionary with certificate field containing root CA certificate in PEM format
+        """
+        orch = OrchClient.instance()
+
+        root_ca_cert_name = CEPHADM_ROOT_CA_CERT
+
+        root_ca_cert = orch.cert_store.get_cert(root_ca_cert_name)
+
+        if not root_ca_cert:
+            raise DashboardException(
+                msg='Root CA certificate not found',
+                http_status_code=404,
+                component='certificate'
+            )
+
+        return root_ca_cert
index 4e419b413844097e8f49d23088b527e203bef1f0..de265d465a0f4a9b53d604ca4dc41de6b6904490 100644 (file)
@@ -4,6 +4,7 @@ import cherrypy
 from ceph.deployment.service_spec import ServiceSpec
 
 from ..security import Scope
+from ..services.certificate import CertificateService
 from ..services.exception import handle_custom_error, handle_orchestrator_error
 from ..services.orchestrator import OrchClient, OrchFeature
 from . import APIDoc, APIRouter, CreatePermission, DeletePermission, Endpoint, \
@@ -36,7 +37,12 @@ class Service(RESTController):
         orch = OrchClient.instance()
         services, count = orch.services.list(service_name=service_name, offset=int(offset),
                                              limit=int(limit), search=search, sort=sort)
-        cherrypy.response.headers['X-Total-Count'] = count
+
+        # Get all certificates and enrich services with certificate status
+        cert_ls_data = CertificateService.fetch_all_certificates(orch)
+        CertificateService.enrich_services_with_certificates(orch, services, cert_ls_data)
+
+        cherrypy.response.headers['X-Total-Count'] = str(count)
         return services
 
     @raise_if_no_orchestrator([OrchFeature.SERVICE_LIST])
@@ -45,7 +51,14 @@ class Service(RESTController):
         services = orch.services.get(service_name)
         if not services:
             raise cherrypy.HTTPError(404, 'Service {} not found'.format(service_name))
-        return services[0].to_json()
+
+        service = services[0].to_json()
+
+        # Get all certificates and enrich single service
+        cert_ls_data = CertificateService.fetch_all_certificates(orch)
+        CertificateService.enrich_services_with_certificates(orch, [service], cert_ls_data)
+
+        return service
 
     @RESTController.Resource('GET')
     @raise_if_no_orchestrator([OrchFeature.DAEMON_LIST])
diff --git a/src/pybind/mgr/dashboard/model/certificate.py b/src/pybind/mgr/dashboard/model/certificate.py
new file mode 100644 (file)
index 0000000..9684464
--- /dev/null
@@ -0,0 +1,104 @@
+"""
+Model definitions for certificate API responses.
+
+These classes centralize the enums and response shapes so callers avoid
+constructing ad-hoc dictionaries and can rely on consistent typing.
+"""
+from dataclasses import asdict, dataclass
+from enum import Enum
+from typing import Any, Dict, Optional
+
+
+class CertificateStatus(str, Enum):
+    EXPIRED = 'expired'
+    EXPIRING = 'expiring'
+    VALID = 'valid'
+    INVALID = 'invalid'
+    NOT_CONFIGURED = 'not_configured'
+
+
+class CertificateScope(str, Enum):
+    SERVICE = 'service'
+    HOST = 'host'
+    GLOBAL = 'global'
+
+
+@dataclass
+class CertificateListEntry:
+    STRING_FIELDS = (
+        'cert_name', 'scope', 'signed_by', 'status',
+        'expiry_date', 'issuer', 'common_name', 'target'
+    )
+
+    cert_name: str
+    scope: str
+    signed_by: str
+    status: CertificateStatus
+    days_to_expiration: Optional[int]
+    expiry_date: Optional[str]
+    issuer: Optional[str]
+    common_name: Optional[str]
+    target: Optional[str] = None
+
+    def to_dict(self) -> Dict[str, Any]:
+        data = asdict(self)
+        data['status'] = self.status.value
+        for key in self.STRING_FIELDS:
+            if data.get(key) is None:
+                data[key] = ''
+        return data
+
+
+@dataclass
+class CertificateStatusResponse:
+    STRING_FIELDS = (
+        'cert_name', 'scope', 'status', 'signed_by',
+        'certificate_source', 'expiry_date', 'issuer',
+        'common_name', 'target', 'error'
+    )
+    BOOL_FIELDS = ('requires_certificate', 'has_certificate')
+
+    cert_name: Optional[str]
+    scope: Optional[str]
+    requires_certificate: bool = True
+    status: Optional[CertificateStatus] = None
+    days_to_expiration: Optional[int] = None
+    signed_by: Optional[str] = None
+    has_certificate: bool = False
+    certificate_source: Optional[str] = None
+    expiry_date: Optional[str] = None
+    issuer: Optional[str] = None
+    common_name: Optional[str] = None
+    target: Optional[str] = None
+    details: Optional[Dict[str, Any]] = None
+    error: Optional[str] = None
+
+    def to_dict(self) -> Dict[str, Any]:
+        data = asdict(self)
+        data['status'] = self.status.value if self.status else None
+        for key in self.STRING_FIELDS:
+            if data.get(key) is None:
+                data[key] = ''
+        for key in self.BOOL_FIELDS:
+            if data.get(key) is None:
+                data[key] = False
+        return data
+
+
+# Constants shared with cephadm certificate handling
+CEPHADM_ROOT_CA_CERT = 'cephadm_root_ca_cert'
+CEPHADM_SIGNED_CERT = 'cephadm-signed'
+
+
+# Shared schema for certificate list API responses
+CERTIFICATE_LIST_SCHEMA = [{
+    'cert_name': (str, 'Certificate name'),
+    'scope': (str, 'Certificate scope (SERVICE, HOST, or GLOBAL)'),
+    'signed_by': (str, 'Certificate issuer (user or cephadm)'),
+    'status': (str, 'Certificate status (valid, expiring, expired, invalid)'),
+    'days_to_expiration': (int, 'Days remaining until expiration'),
+    'expiry_date': (str, 'Certificate expiration date'),
+    'issuer': (str, 'Certificate issuer distinguished name'),
+    'common_name': (str, 'Certificate common name (CN)'),
+    'target': (str, 'Certificate target (service name or hostname)'),
+}]
index e7d9677ee486ee432f366023d31df911e518a230..e2631047e05990e486844227b7af18e96838160b 100755 (executable)
@@ -17015,6 +17015,172 @@ paths:
       - jwt: []
       tags:
       - Service
+  /api/service/certificate:
+    get:
+      description: "\n        List all certificates configured in the cluster.\n\n\
+        \        This endpoint returns a list of all certificates managed by certmgr,\n\
+        \        including both user-provided and cephadm-signed certificates.\n\n\
+        \        :param status: Filter by certificate status. Valid values: 'expired',\n\
+        \            'expiring', 'valid', 'invalid'\n        :param scope: Filter\
+        \ by certificate scope. Valid values: 'SERVICE',\n            'HOST', 'GLOBAL'\n\
+        \        :param service_type: Filter by service type. Supports wildcards\n\
+        \            (e.g., 'rgw*')\n        :param include_cephadm_signed: If True,\
+        \ include cephadm-signed certificates.\n            If False (default), only\
+        \ user-provided certificates are returned.\n        :return: List of certificate\
+        \ objects with their details\n        "
+      parameters:
+      - allowEmptyValue: true
+        description: Filter by certificate status (e.g., "expired", "expiring", "valid",
+          "invalid")
+        in: query
+        name: status
+        schema:
+          type: string
+      - allowEmptyValue: true
+        description: Filter by certificate scope (e.g., "service", "host", "global")
+        in: query
+        name: scope
+        schema:
+          type: string
+      - allowEmptyValue: true
+        description: Filter by certificate type (e.g., "rgw*")
+        in: query
+        name: service_type
+        schema:
+          type: string
+      - default: false
+        description: 'Include cephadm-signed certificates in the list (default: False)'
+        in: query
+        name: include_cephadm_signed
+        schema:
+          type: boolean
+      responses:
+        '200':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              schema:
+                items:
+                  properties:
+                    cert_name:
+                      description: Certificate name
+                      type: string
+                    common_name:
+                      description: Certificate common name (CN)
+                      type: string
+                    days_to_expiration:
+                      description: Days remaining until expiration
+                      type: integer
+                    expiry_date:
+                      description: Certificate expiration date
+                      type: string
+                    issuer:
+                      description: Certificate issuer distinguished name
+                      type: string
+                    scope:
+                      description: Certificate scope (SERVICE, HOST, or GLOBAL)
+                      type: string
+                    signed_by:
+                      description: Certificate issuer (user or cephadm)
+                      type: string
+                    status:
+                      description: Certificate status (valid, expiring, expired, invalid)
+                      type: string
+                    target:
+                      description: Certificate target (service name or hostname)
+                      type: string
+                  type: object
+                required:
+                - cert_name
+                - scope
+                - signed_by
+                - status
+                - days_to_expiration
+                - expiry_date
+                - issuer
+                - common_name
+                - target
+                type: array
+          description: OK
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      summary: List All Certificates
+      tags:
+      - Service
+  /api/service/certificate/root-ca:
+    get:
+      description: "\n        Get the cephadm root CA certificate.\n\n        This\
+        \ endpoint returns the root Certificate Authority (CA) certificate\n     \
+        \   used by cephadm to sign other certificates in the cluster.\n\n       \
+        \ :return: Dictionary with certificate field containing root CA certificate\
+        \ in PEM format\n        "
+      parameters: []
+      responses:
+        '200':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              schema:
+                properties:
+                  certificate:
+                    description: Root CA certificate in PEM format
+                    type: string
+                required:
+                - certificate
+                type: object
+          description: OK
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      summary: Get Root CA Certificate
+      tags:
+      - Service
+  /api/service/certificate/{service_name}:
+    get:
+      description: "\n        Get detailed certificate information for a service.\n\
+        \n        :param service_name: The service name, e.g. 'rgw.myzone'.\n    \
+        \    :return: Detailed certificate information including full certificate\
+        \ details\n        "
+      parameters:
+      - in: path
+        name: service_name
+        required: true
+        schema:
+          type: string
+      responses:
+        '200':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: OK
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      tags:
+      - Service
   /api/service/known_types:
     get:
       description: "\n        Get a list of known service types, e.g. 'alertmanager',\n\
diff --git a/src/pybind/mgr/dashboard/services/certificate.py b/src/pybind/mgr/dashboard/services/certificate.py
new file mode 100644 (file)
index 0000000..a880d3c
--- /dev/null
@@ -0,0 +1,527 @@
+"""
+Certificate service for dashboard.
+
+This service provides certificate management functionality following the
+"Thin Controllers, Fat Services" pattern. All business logic for certificate
+handling is contained here.
+"""
+from collections import defaultdict
+from typing import Any, Dict, List, Optional, Tuple
+
+from .. import mgr
+from ..model.certificate import CEPHADM_SIGNED_CERT, CertificateListEntry, \
+    CertificateScope, CertificateStatus, CertificateStatusResponse
+from .orchestrator import OrchClient
+
+
+def _get_certificate_renewal_threshold_days() -> int:
+    """
+    Get the certificate renewal threshold days from cephadm config.
+    Falls back to default value of 30 if config cannot be retrieved.
+
+    :return: Number of days before expiration to consider certificate as expiring
+    """
+    threshold = mgr.get_module_option_ex('cephadm', 'certificate_renewal_threshold_days', 30)
+    return int(threshold)
+
+
+def _determine_certificate_status(remaining_days: int) -> CertificateStatus:
+    """
+    Determine certificate status based on remaining days until expiration.
+
+    :param remaining_days: Number of days remaining until certificate expiration
+    :return: Status string (CertificateStatus.EXPIRED, EXPIRING, or VALID)
+    """
+    renewal_threshold = _get_certificate_renewal_threshold_days()
+    if remaining_days < 0:
+        return CertificateStatus.EXPIRED
+    if remaining_days < renewal_threshold:
+        return CertificateStatus.EXPIRING
+    return CertificateStatus.VALID
+
+
+def _extract_certificate_basic_info(cert_details: Dict[str, Any]) -> Dict[str, Any]:
+    """
+    Extract basic certificate information from certificate details.
+
+    :param cert_details: Dictionary containing certificate details
+    :return: Dictionary with extracted information (validity, remaining_days, expiry_date,
+             subject, issuer, common_name, issuer_str)
+    """
+    validity = cert_details.get('validity', {})
+    remaining_days = validity.get('remaining_days', 0)
+    expiry_date = validity.get('not_after')
+
+    subject = cert_details.get('subject', {})
+    issuer = cert_details.get('issuer', {})
+    common_name = subject.get('commonName') or subject.get('CN')
+    issuer_str = (issuer.get('commonName') or issuer.get('CN') or str(issuer)
+                  if issuer else None)
+
+    return {
+        'validity': validity,
+        'remaining_days': remaining_days,
+        'expiry_date': expiry_date,
+        'subject': subject,
+        'issuer': issuer,
+        'common_name': common_name,
+        'issuer_str': issuer_str
+    }
+
+
+def _determine_signed_by(cert_name: str) -> str:
+    """
+    Determine if certificate is signed by cephadm or user based on certificate name.
+
+    :param cert_name: Certificate name
+    :return: 'cephadm' if cephadm-signed, 'user' otherwise
+    """
+    return 'cephadm' if cert_name and CEPHADM_SIGNED_CERT in cert_name else 'user'
+
+
+def _build_certificate_list_entry(cert_name: str, cert_details: Dict[str, Any],
+                                  cert_scope: CertificateScope, target: Optional[str] = None
+                                  ) -> CertificateListEntry:
+    """
+    Build a certificate list entry from certificate details.
+
+    :param cert_name: Certificate name
+    :param cert_details: Certificate details dictionary
+    :param cert_scope: Certificate scope ('GLOBAL', 'SERVICE', or 'HOST')
+    :param target: Optional target (service name or hostname)
+    :return: Certificate list entry dictionary
+    """
+    cert_info = _extract_certificate_basic_info(cert_details)
+    remaining_days = cert_info['remaining_days']
+    expiry_date = cert_info['expiry_date']
+    common_name = cert_info['common_name']
+    issuer_str = cert_info['issuer_str']
+
+    status = _determine_certificate_status(remaining_days)
+    signed_by = _determine_signed_by(cert_name)
+
+    return CertificateListEntry(
+        cert_name=cert_name,
+        scope=cert_scope.value.upper(),
+        signed_by=signed_by,
+        status=status,
+        days_to_expiration=remaining_days,
+        expiry_date=expiry_date,
+        issuer=issuer_str,
+        common_name=common_name,
+        target=target
+    )
+
+
+def _get_certificate_response_template(cert_name: Optional[str], cert_scope_str: Optional[str],
+                                       target_key: Optional[str] = None
+                                       ) -> CertificateStatusResponse:
+    """
+    Get a certificate response template with all keys initialized.
+
+    :param cert_name: Certificate name (can be None)
+    :param cert_scope_str: Certificate scope (can be None)
+    :param target_key: Optional target key (service name or hostname)
+    :return: Dictionary template with all certificate response keys
+    """
+    return CertificateStatusResponse(
+        cert_name=cert_name,
+        scope=cert_scope_str,
+        target=target_key
+    )
+
+
+def _select_service_certificate(certificates: Any, service_name: str
+                                ) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
+    """
+    Pick certificate details for a service-scoped certificate.
+    """
+    if not isinstance(certificates, dict):
+        return (None, None)
+
+    target_key: Optional[str] = None
+    if service_name and service_name in certificates:
+        target_key = service_name
+
+    cert_details = certificates.get(target_key) if target_key else None
+    return (cert_details if isinstance(cert_details, dict) else None, target_key)
+
+
+def _select_host_certificate(certificates: Any, daemon_hostnames: Optional[List[str]]
+                             ) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
+    """
+    Pick certificate details for a host-scoped certificate.
+    """
+    if not isinstance(certificates, dict) or not certificates:
+        return (None, None)
+
+    target_key: Optional[str] = None
+    if daemon_hostnames:
+        for hostname in daemon_hostnames:
+            if hostname in certificates:
+                target_key = hostname
+                break
+    if target_key is None:
+        target_key = next(iter(certificates.keys()))
+
+    cert_details = certificates.get(target_key)
+    return (cert_details if isinstance(cert_details, dict) else None, target_key)
+
+
+def _select_global_certificate(certificates: Any) -> Optional[Dict[str, Any]]:
+    """
+    Pick certificate details for a global certificate.
+    """
+    if isinstance(certificates, dict) and certificates:
+        return certificates
+    return None
+
+
+def _get_certificate_status_for_service(service_type: str, service_name: str,
+                                        cert_ls_data: Optional[Dict[str, Any]] = None,
+                                        daemon_hostnames: Optional[List[str]] = None
+                                        ) -> Dict[str, Any]:
+    """
+    Get certificate status information for a service using REQUIRES_CERTIFICATES mapping.
+
+    :param service_type: The service type (e.g., 'rgw', 'grafana')
+    :param service_name: The service name (e.g., 'rgw.myzone')
+    :param cert_ls_data: Optional pre-fetched certificate list data (all certificates)
+    :param daemon_hostnames: Optional list of hostnames where service daemons run
+    :return: Dictionary with certificate status information
+    """
+    from ceph.deployment.service_spec import ServiceSpec
+
+    cert_config = ServiceSpec.REQUIRES_CERTIFICATES.get(service_type)
+    requires_cert = cert_config is not None
+
+    if not requires_cert:
+        response = _get_certificate_response_template(None, None)
+        response.requires_certificate = False
+        return response.to_dict()
+
+    assert cert_config is not None
+    cert_scope = CertificateScope(cert_config.get('scope', CertificateScope.SERVICE.value).lower())
+
+    # Find certificate in cert_ls_data - try user-provided first, then cephadm-signed
+    cert_details, _, cert_name, cert_scope_str = CertificateService.find_certificate_for_service(
+        cert_ls_data, service_type, service_name, cert_scope, daemon_hostnames
+    )
+
+    return CertificateService.build_certificate_status_response(
+        cert_details, cert_name, cert_scope_str
+    )
+
+
+def _find_certificate_in_data(
+    cert_ls_data: Optional[Dict[str, Any]],
+    cert_name: str,
+    cert_scope: CertificateScope,
+    service_name: str,
+    daemon_hostnames: Optional[List[str]],
+) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
+    """
+    Helper to locate certificate data inside cert_ls response.
+    """
+    if not cert_ls_data or cert_name not in cert_ls_data:
+        return (None, None)
+
+    cert_data = cert_ls_data[cert_name]
+    certificates = cert_data.get('certificates', {})
+    if cert_scope == CertificateScope.SERVICE:
+        return _select_service_certificate(certificates, service_name)
+    if cert_scope == CertificateScope.HOST:
+        return _select_host_certificate(certificates, daemon_hostnames)
+    if cert_scope == CertificateScope.GLOBAL:
+        return (_select_global_certificate(certificates), None)
+    return (None, None)
+
+
+class CertificateService:
+    """
+    Certificate service class providing certificate management functionality.
+
+    This class encapsulates all certificate-related operations following the
+    "Thin Controllers, Fat Services" pattern.
+    """
+
+    @staticmethod
+    def process_certificates_for_list(cert_ls_data: Dict[str, Any]
+                                      ) -> List[Dict[str, Any]]:
+        """
+        Process certificate list data and return formatted certificate entries.
+
+        :param cert_ls_data: Certificate list data from cert_ls
+        :return: List of certificate entry dictionaries
+        """
+
+        certificates_list: List[CertificateListEntry] = []
+
+        for cert_name, cert_data in cert_ls_data.items():
+            try:
+                cert_scope = CertificateScope(str(cert_data.get('scope', 'UNKNOWN')).lower())
+            except (ValueError, AttributeError):
+                cert_scope = CertificateScope.SERVICE
+            certificates = cert_data.get('certificates', {})
+
+            if cert_scope == CertificateScope.GLOBAL:
+                cert_details = certificates if isinstance(certificates, dict) else {}
+                if not isinstance(cert_details, dict) or 'Error' in cert_details:
+                    continue
+                certificates_list.append(
+                    _build_certificate_list_entry(cert_name, cert_details, cert_scope)
+                )
+            else:
+                # For SERVICE and HOST scope, iterate through targets
+                for target, cert_details in certificates.items():
+                    if isinstance(cert_details, dict) and 'Error' in cert_details:
+                        continue
+                    if not isinstance(cert_details, dict):
+                        continue
+                    certificates_list.append(
+                        _build_certificate_list_entry(cert_name, cert_details, cert_scope, target)
+                    )
+
+        return [entry.to_dict() for entry in certificates_list]
+
+    @staticmethod
+    def find_certificate_for_service(cert_ls_data: Optional[Dict[str, Any]],
+                                     service_type: str, service_name: str,
+                                     cert_scope: CertificateScope,
+                                     daemon_hostnames: Optional[List[str]] = None
+                                     ) -> Tuple[Optional[Dict[str, Any]], Optional[str], str, str]:
+        """
+        Find certificate for a service, trying user-provided first, then cephadm-signed.
+
+        :param cert_ls_data: Certificate list data from cert_ls
+        :param service_type: The service type (e.g., 'rgw', 'grafana')
+        :param service_name: The service name (e.g., 'rgw.myzone')
+        :param cert_scope: Certificate scope from config ('SERVICE', 'HOST', or 'GLOBAL')
+        :param daemon_hostnames: Optional list of hostnames where service daemons run
+        :return: Tuple of (cert_details, target_key, cert_name, actual_scope)
+        """
+        user_cert_name = f"{service_type.replace('-', '_')}_ssl_cert"
+        cephadm_cert_name = f"cephadm-signed_{service_type}_cert"
+        cert_details = None
+        target_key = None
+        cert_name = user_cert_name
+        actual_scope = cert_scope.value.upper()
+
+        # Try user-provided certificate first
+        if cert_ls_data and user_cert_name in cert_ls_data:
+            cert_data = cert_ls_data[user_cert_name]
+            cert_scope_from_data = cert_data.get('scope', cert_scope.value)
+            try:
+                cert_scope = CertificateScope(cert_scope_from_data.lower())
+            except ValueError:
+                cert_scope = CertificateScope.SERVICE
+            actual_scope = cert_scope.value.upper()
+            cert_details, target_key = _find_certificate_in_data(
+                cert_ls_data, user_cert_name, cert_scope, service_name, daemon_hostnames)
+            if cert_details:
+                cert_name = user_cert_name
+
+        # If user-provided cert not found, try cephadm-signed certificate
+        if not cert_details and cert_ls_data and cephadm_cert_name in cert_ls_data:
+            cert_details, target_key = _find_certificate_in_data(
+                cert_ls_data, cephadm_cert_name, CertificateScope.HOST,
+                service_name, daemon_hostnames)
+            if cert_details:
+                cert_name = cephadm_cert_name
+                actual_scope = CertificateScope.HOST.value.upper()
+
+        return (cert_details, target_key, cert_name, actual_scope)
+
+    @staticmethod
+    def fetch_certificates_for_service(orch: OrchClient, service_type: str,
+                                       user_cert_name: str, cephadm_cert_name: str
+                                       ) -> Dict[str, Any]:
+        """
+        Fetch certificates for a specific service, including missing ones.
+
+        :param orch: Orchestrator client instance
+        :param service_type: Service type for filter pattern
+        :param user_cert_name: User-provided certificate name
+        :param cephadm_cert_name: Cephadm-signed certificate name
+        :return: Dictionary of certificate data
+        """
+        service_type_for_filter = service_type.replace('-', '_')
+        filter_pattern = f'name=*{service_type_for_filter}*'
+
+        cert_ls_result = orch.cert_store.cert_ls(
+            filter_by=filter_pattern,
+            show_details=True,
+            include_cephadm_signed=True
+        )
+        cert_ls_data = cert_ls_result or {}
+
+        missing_certs: List[str] = []
+        if user_cert_name not in cert_ls_data:
+            missing_certs.append(user_cert_name)
+        if cephadm_cert_name not in cert_ls_data:
+            missing_certs.append(cephadm_cert_name)
+
+        # Fetch any missing certificates individually
+        for cert_name in missing_certs:
+            individual_result = orch.cert_store.cert_ls(
+                filter_by=f'name={cert_name}',
+                show_details=True,
+                include_cephadm_signed=True
+            )
+            if individual_result and cert_name in individual_result:
+                cert_ls_data[cert_name] = individual_result[cert_name]
+
+        return cert_ls_data
+
+    @staticmethod
+    def get_daemon_hostnames(orch: OrchClient, service_name: str
+                             ) -> Tuple[List[str], Optional[str]]:
+        """
+        Get daemon hostnames for a service.
+
+        :param orch: Orchestrator client instance
+        :param service_name: Service name
+        :return: Tuple of (daemon_hostnames list, target_hostname or None)
+        """
+        daemons = orch.services.list_daemons(service_name=service_name)
+        daemon_hostnames = [d.hostname for d in daemons if d.hostname]
+        target_hostname = daemon_hostnames[0] if daemon_hostnames else None
+        return (daemon_hostnames, target_hostname)
+
+    @staticmethod
+    def build_certificate_status_response(cert_details: Optional[Dict[str, Any]],
+                                          cert_name: str, cert_scope_str: str,
+                                          target_key: Optional[str] = None,
+                                          include_target: bool = False,
+                                          include_details: bool = False
+                                          ) -> Dict[str, Any]:
+        """
+        Build certificate status response dictionary.
+
+        :param cert_details: Certificate details dict or None
+        :param cert_name: Certificate name
+        :param cert_scope_str: Certificate scope
+        :param target_key: Optional target key (service name or hostname)
+        :param include_target: Whether to include 'target' field in response
+        :param include_details: Whether to include detailed 'details' field in response
+        :return: Dictionary with certificate status information
+        """
+        use_target = target_key if (include_target or (target_key and include_details)) else None
+        response = _get_certificate_response_template(cert_name, cert_scope_str, use_target)
+
+        if not cert_details:
+            response.status = CertificateStatus.NOT_CONFIGURED
+            response.has_certificate = False
+            return response.to_dict()
+
+        if isinstance(cert_details, dict) and 'Error' in cert_details:
+            response.status = CertificateStatus.INVALID
+            response.signed_by = _determine_signed_by(cert_name)
+            response.has_certificate = True
+            if include_details:
+                response.error = cert_details.get('Error')
+            return response.to_dict()
+
+        cert_info = _extract_certificate_basic_info(cert_details)
+        remaining_days = cert_info['remaining_days']
+        expiry_date = cert_info['expiry_date']
+        common_name = cert_info['common_name']
+        issuer_str = cert_info['issuer_str']
+
+        status = _determine_certificate_status(remaining_days)
+        signed_by = _determine_signed_by(cert_name)
+
+        response.status = status
+        response.days_to_expiration = remaining_days
+        response.signed_by = signed_by
+        response.has_certificate = True
+        response.certificate_source = 'reference'
+        response.expiry_date = expiry_date
+        response.issuer = issuer_str
+        response.common_name = common_name
+
+        if include_details:
+            subject = cert_info['subject']
+            issuer = cert_info['issuer']
+            extensions = cert_details.get('extensions', {})
+            san_entries = extensions.get('subjectAltName', {})
+
+            response.details = {
+                'subject': subject,
+                'issuer': issuer,
+                'san_entries': {
+                    'dns_names': san_entries.get('DNS', []),
+                    'ip_addresses': san_entries.get('IP', [])
+                },
+                'key_type': cert_details.get('public_key', {}).get('key_type'),
+                'key_size': cert_details.get('public_key', {}).get('key_size'),
+                'validity': {
+                    'not_before': cert_info['validity'].get('not_before'),
+                    'not_after': cert_info['validity'].get('not_after'),
+                    'remaining_days': remaining_days
+                },
+                'extensions': extensions
+            }
+
+        return response.to_dict()
+
+    @staticmethod
+    def enrich_services_with_certificates(orch: Any, services: List[Dict[str, Any]],
+                                          cert_ls_data: Dict[str, Any]) -> None:
+        """
+        Enrich a list of services with certificate status information.
+
+        This function modifies the services list in place, adding a 'certificate'
+        key to each service with its certificate status.
+
+        :param orch: Orchestrator client instance
+        :param services: List of service dictionaries to enrich (modified in place)
+        :param cert_ls_data: Certificate list data from cert_ls
+        """
+        daemon_hosts_by_service: Dict[str, List[str]] = defaultdict(list)
+        for daemon in orch.services.list_daemons():
+            service_name = daemon.service_name()
+            if service_name and daemon.hostname:
+                daemon_hosts_by_service[service_name].append(daemon.hostname)
+
+        for service in services:
+            svc_name = service.get('service_name', '')
+            daemon_hostnames = daemon_hosts_by_service.get(svc_name, [])
+
+            service['certificate'] = _get_certificate_status_for_service(
+                service.get('service_type', ''),
+                svc_name,
+                cert_ls_data,
+                daemon_hostnames
+            )
+
+    @staticmethod
+    def empty_response() -> Dict[str, Any]:
+        """
+        Build a standard response for services that do not require certificates.
+        """
+        return CertificateStatusResponse(
+            cert_name=None,
+            scope=None,
+            requires_certificate=False
+        ).to_dict()
+
+    @staticmethod
+    def fetch_all_certificates(orch: Any, filter_by: str = '',
+                               show_details: bool = True,
+                               include_cephadm_signed: bool = True) -> Dict[str, Any]:
+        """
+        Fetch all certificates from the certificate store.
+
+        :param orch: Orchestrator client instance
+        :param filter_by: Filter string for certificates (default: '')
+        :param show_details: Whether to include certificate details (default: True)
+        :param include_cephadm_signed: Whether to include cephadm-signed certs (default: True)
+        :return: Dictionary of certificate data
+        """
+        cert_ls_result = orch.cert_store.cert_ls(
+            filter_by=filter_by,
+            show_details=show_details,
+            include_cephadm_signed=include_cephadm_signed
+        )
+        return cert_ls_result or {}
index 769172b9bfade23655cbaac63f1542af7b7f3eb3..6adcfa571bd0de9430907ad43f21a4d0530cc732 100644 (file)
@@ -221,6 +221,11 @@ class CertStoreManager(ResourceManager):
         return self.api.cert_store_get_key(entity, service_name, hostname,
                                            no_exception_when_missing=ignore_missing_exception)
 
+    @wait_api_result
+    def cert_ls(self, filter_by: str = '', show_details: bool = False,
+                include_cephadm_signed: bool = False) -> Dict[str, Any]:
+        return self.api.cert_store_cert_ls(filter_by, show_details, include_cephadm_signed)
+
 
 class MonitoringManager(ResourceManager):
 
diff --git a/src/pybind/mgr/dashboard/tests/test_certificate.py b/src/pybind/mgr/dashboard/tests/test_certificate.py
new file mode 100644 (file)
index 0000000..2382cef
--- /dev/null
@@ -0,0 +1,589 @@
+# -*- coding: utf-8 -*-
+import unittest
+from datetime import datetime, timedelta
+from unittest import mock
+from unittest.mock import patch
+
+from orchestrator import DaemonDescription
+
+from ..controllers._version import APIVersion
+from ..controllers.certificate import Certificate
+from ..tests import ControllerTestCase
+
+
+class CertificateTestBase(ControllerTestCase):
+    """Base class for certificate controller tests with shared helpers."""
+
+    URL_CERTIFICATE = '/api/service/certificate'
+    URL_ROOT_CA = '/api/service/certificate/root-ca'
+
+    @classmethod
+    def setup_server(cls):
+        # Mock mgr.get_module_option_ex to return default threshold of 30
+        # Patch it where it's used in the certificate service
+        patch('dashboard.services.certificate.mgr.get_module_option_ex', return_value=30).start()
+        cls.setup_controllers([Certificate])
+
+    def _setup_fake_client(self, fake_client):
+        """Helper to set up fake orchestrator client with required mocks."""
+        fake_client.available.return_value = True
+        fake_client.get_missing_features.return_value = []
+        return fake_client
+
+    def _create_mock_cert_details(self, remaining_days=100, common_name='test.example.com',
+                                  issuer='cephadm', key_type='RSA', key_size=2048):
+        """Helper to create mock certificate details."""
+        not_after = datetime.now() + timedelta(days=remaining_days)
+        return {
+            'validity': {
+                'not_before': '2024-01-01T00:00:00Z',
+                'not_after': not_after.strftime('%Y-%m-%dT%H:%M:%SZ'),
+                'remaining_days': remaining_days
+            },
+            'subject': {
+                'commonName': common_name,
+                'CN': common_name
+            },
+            'issuer': {
+                'commonName': issuer,
+                'CN': issuer
+            },
+            'public_key': {
+                'key_type': key_type,
+                'key_size': key_size
+            },
+            'extensions': {
+                'subjectAltName': {
+                    'DNS': [common_name, '*.example.com'],
+                    'IP': ['192.168.1.1']
+                }
+            }
+        }
+
+    def _create_mock_cert_ls_data(self, cert_name='rgw_ssl_cert', scope='SERVICE',
+                                  service_name='rgw.test', cert_details=None):
+        """Helper to create mock cert_ls data structure."""
+        if cert_details is None:
+            cert_details = self._create_mock_cert_details()
+        return {
+            cert_name: {
+                'scope': scope,
+                'certificates': {
+                    service_name: cert_details
+                }
+            }
+        }
+
+
+class CertificateListControllerTest(CertificateTestBase):
+    """Tests for the certificate list() endpoint."""
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_list_certificates_empty(self, instance):
+        """Test listing certificates when none exist."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        fake_client.cert_store.cert_ls.return_value = {}
+        instance.return_value = fake_client
+
+        self._get(self.URL_CERTIFICATE, version=APIVersion(1, 0))
+        self.assertStatus(200)
+        self.assertJsonBody([])
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_list_certificates_basic(self, instance):
+        """Test listing certificates with basic data."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        cert_details = self._create_mock_cert_details(remaining_days=100)
+        cert_ls_data = self._create_mock_cert_ls_data(
+            cert_name='rgw_ssl_cert',
+            scope='SERVICE',
+            service_name='rgw.test',
+            cert_details=cert_details
+        )
+        fake_client.cert_store.cert_ls.return_value = cert_ls_data
+        instance.return_value = fake_client
+
+        self._get(self.URL_CERTIFICATE, version=APIVersion(1, 0))
+        self.assertStatus(200)
+        result = self.json_body()
+        self.assertEqual(len(result), 1)
+        self.assertEqual(result[0]['cert_name'], 'rgw_ssl_cert')
+        self.assertEqual(result[0]['scope'], 'SERVICE')
+        self.assertEqual(result[0]['status'], 'valid')
+        self.assertEqual(result[0]['target'], 'rgw.test')
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_list_certificates_with_filters(self, instance):
+        """Test listing certificates with status and scope filters."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        cert_details_expiring = self._create_mock_cert_details(remaining_days=10)
+
+        # Mock cert_ls to return filtered data (simulating backend filtering)
+        # When status=expiring filter is applied, cert_ls returns only expiring certs
+        cert_ls_data_expiring = {
+            'grafana_ssl_cert': {
+                'scope': 'SERVICE',
+                'certificates': {
+                    'grafana.test': cert_details_expiring
+                }
+            }
+        }
+        fake_client.cert_store.cert_ls.return_value = cert_ls_data_expiring
+        instance.return_value = fake_client
+
+        # Test status filter
+        self._get('{}?status=expiring'.format(self.URL_CERTIFICATE),
+                  version=APIVersion(1, 0))
+        self.assertStatus(200)
+        result = self.json_body()
+        self.assertEqual(len(result), 1)
+        self.assertEqual(result[0]['cert_name'], 'grafana_ssl_cert')
+        self.assertEqual(result[0]['status'], 'expiring')
+        # Verify filter_by was passed to cert_ls
+        call_args = fake_client.cert_store.cert_ls.call_args
+        self.assertIn('status=expiring', call_args[1]['filter_by'])
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_list_certificates_with_service_name_filter(self, instance):
+        """Test listing certificates with service name filter."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        cert_details = self._create_mock_cert_details()
+        # Mock cert_ls to return filtered data (simulating backend filtering)
+        # When service_name=rgw filter is applied, cert_ls returns only rgw certs
+        cert_ls_data = {
+            'rgw_ssl_cert': {
+                'scope': 'SERVICE',
+                'certificates': {
+                    'rgw.test': cert_details
+                }
+            }
+        }
+        fake_client.cert_store.cert_ls.return_value = cert_ls_data
+        instance.return_value = fake_client
+
+        self._get('{}?service_type=rgw'.format(self.URL_CERTIFICATE),
+                  version=APIVersion(1, 0))
+        self.assertStatus(200)
+        result = self.json_body()
+        self.assertEqual(len(result), 1)
+        self.assertEqual(result[0]['cert_name'], 'rgw_ssl_cert')
+        # Verify filter_by was passed to cert_ls
+        call_args = fake_client.cert_store.cert_ls.call_args
+        self.assertIn('name=*rgw*', call_args[1]['filter_by'])
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_list_certificates_include_cephadm_signed(self, instance):
+        """Test listing certificates with include_cephadm_signed parameter."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        cert_ls_data = {
+            'rgw_ssl_cert': {
+                'scope': 'SERVICE',
+                'certificates': {
+                    'rgw.test': self._create_mock_cert_details()
+                }
+            }
+        }
+        fake_client.cert_store.cert_ls.return_value = cert_ls_data
+        instance.return_value = fake_client
+
+        # Test with include_cephadm_signed=True
+        self._get('{}?include_cephadm_signed=true'.format(self.URL_CERTIFICATE),
+                  version=APIVersion(1, 0))
+        self.assertStatus(200)
+        fake_client.cert_store.cert_ls.assert_called_with(
+            filter_by='',
+            show_details=False,
+            include_cephadm_signed=True
+        )
+
+        # Test with include_cephadm_signed=False
+        self._get('{}?include_cephadm_signed=false'.format(self.URL_CERTIFICATE),
+                  version=APIVersion(1, 0))
+        self.assertStatus(200)
+        fake_client.cert_store.cert_ls.assert_called_with(
+            filter_by='',
+            show_details=False,
+            include_cephadm_signed=False
+        )
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_list_certificates_global_scope(self, instance):
+        """Test listing certificates with GLOBAL scope."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        cert_details = self._create_mock_cert_details()
+        cert_ls_data = {
+            'cephadm_root_ca_cert': {
+                'scope': 'GLOBAL',
+                'certificates': cert_details
+            }
+        }
+        fake_client.cert_store.cert_ls.return_value = cert_ls_data
+        instance.return_value = fake_client
+
+        self._get(self.URL_CERTIFICATE, version=APIVersion(1, 0))
+        self.assertStatus(200)
+        result = self.json_body()
+        self.assertEqual(len(result), 1)
+        self.assertEqual(result[0]['scope'], 'GLOBAL')
+        self.assertEqual(result[0].get('target'), '')
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_list_certificates_error_handling(self, instance):
+        """Test error handling when cert_ls fails."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        fake_client.cert_store.cert_ls.side_effect = RuntimeError('Certificate store error')
+        instance.return_value = fake_client
+
+        self._get(self.URL_CERTIFICATE, version=APIVersion(1, 0))
+        self.assertStatus(500)
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_list_certificates_multiple_scopes(self, instance):
+        """Test listing certificates with multiple scopes."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        cert_ls_data = {
+            'rgw_ssl_cert': {
+                'scope': 'SERVICE',
+                'certificates': {
+                    'rgw.test': self._create_mock_cert_details()
+                }
+            },
+            'grafana_ssl_cert': {
+                'scope': 'HOST',
+                'certificates': {
+                    'node0': self._create_mock_cert_details()
+                }
+            },
+            'cephadm_root_ca_cert': {
+                'scope': 'GLOBAL',
+                'certificates': self._create_mock_cert_details()
+            }
+        }
+        fake_client.cert_store.cert_ls.return_value = cert_ls_data
+        instance.return_value = fake_client
+
+        self._get(self.URL_CERTIFICATE, version=APIVersion(1, 0))
+        self.assertStatus(200)
+        result = self.json_body()
+        self.assertEqual(len(result), 3)
+        scopes = [cert['scope'] for cert in result]
+        self.assertIn('SERVICE', scopes)
+        self.assertIn('HOST', scopes)
+        self.assertIn('GLOBAL', scopes)
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_list_certificates_combined_filters(self, instance):
+        """Test listing certificates with combined filters."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        cert_details = self._create_mock_cert_details(remaining_days=10)
+        cert_ls_data = {
+            'rgw_ssl_cert': {
+                'scope': 'SERVICE',
+                'certificates': {
+                    'rgw.test': cert_details
+                }
+            }
+        }
+        fake_client.cert_store.cert_ls.return_value = cert_ls_data
+        instance.return_value = fake_client
+
+        self._get('{}?status=expiring&scope=SERVICE&service_type=rgw'.format(self.URL_CERTIFICATE),
+                  version=APIVersion(1, 0))
+        self.assertStatus(200)
+        fake_client.cert_store.cert_ls.assert_called()
+        # Verify filter_by was constructed correctly
+        call_args = fake_client.cert_store.cert_ls.call_args
+        filter_by = call_args[1]['filter_by']
+        self.assertIn('status=expiring', filter_by)
+        self.assertIn('scope=service', filter_by)
+        self.assertIn('name=*rgw*', filter_by)
+
+
+class CertificateGetControllerTest(CertificateTestBase):
+    """Tests for the certificate get() endpoint."""
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_get_certificate_service_not_found(self, instance):
+        """Test getting certificate for non-existent service."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        fake_client.services.get.return_value = [None]
+        instance.return_value = fake_client
+
+        self._get('{}/rgw.nonexistent'.format(self.URL_CERTIFICATE))
+        self.assertStatus(500)
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_get_certificate_service_no_cert_required(self, instance):
+        """Test getting certificate for service that doesn't require certificates."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        # Create a mock service that doesn't require certificates (e.g., mon)
+        service = mock.Mock()
+        service.spec = mock.Mock()
+        service.spec.service_type = 'mon'
+        service.spec.service_name = mock.Mock(return_value='mon.test')
+        fake_client.services.get.return_value = [service]
+        instance.return_value = fake_client
+
+        self._get('{}/mon.test'.format(self.URL_CERTIFICATE))
+        self.assertStatus(200)
+        result = self.json_body()
+        self.assertFalse(result['requires_certificate'])
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_get_certificate_not_configured(self, instance):
+        """Test getting certificate when certificate is not configured."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        service = mock.Mock()
+        service.spec = mock.Mock()
+        service.spec.service_type = 'rgw'
+        service.spec.service_name = mock.Mock(return_value='rgw.test')
+        fake_client.services.get.return_value = [service]
+        fake_client.cert_store.cert_ls.return_value = {}
+        fake_client.services.list_daemons.return_value = [
+            DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0')
+        ]
+        instance.return_value = fake_client
+
+        self._get('{}/rgw.test'.format(self.URL_CERTIFICATE))
+        self.assertStatus(200)
+        result = self.json_body()
+        self.assertEqual(result['status'], 'not_configured')
+        self.assertFalse(result['has_certificate'])
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_get_certificate_user_provided(self, instance):
+        """Test getting certificate with user-provided certificate."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        service = mock.Mock()
+        service.spec = mock.Mock()
+        service.spec.service_type = 'rgw'
+        service.spec.service_name = mock.Mock(return_value='rgw.test')
+        fake_client.services.get.return_value = [service]
+
+        cert_details = self._create_mock_cert_details(remaining_days=100, common_name='rgw.test')
+        cert_ls_data = self._create_mock_cert_ls_data(
+            cert_name='rgw_ssl_cert',
+            scope='SERVICE',
+            service_name='rgw.test',
+            cert_details=cert_details
+        )
+        fake_client.cert_store.cert_ls.return_value = cert_ls_data
+        fake_client.services.list_daemons.return_value = [
+            DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0')
+        ]
+        instance.return_value = fake_client
+
+        self._get('{}/rgw.test'.format(self.URL_CERTIFICATE))
+        self.assertStatus(200)
+        result = self.json_body()
+        self.assertEqual(result['cert_name'], 'rgw_ssl_cert')
+        self.assertEqual(result['status'], 'valid')
+        self.assertEqual(result['signed_by'], 'user')
+        self.assertTrue(result['has_certificate'])
+        self.assertEqual(result['common_name'], 'rgw.test')
+        self.assertIn('details', result)
+        self.assertIn('san_entries', result['details'])
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_get_certificate_cephadm_signed(self, instance):
+        """Test getting certificate with cephadm-signed certificate."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        service = mock.Mock()
+        service.spec = mock.Mock()
+        service.spec.service_type = 'rgw'
+        service.spec.service_name = mock.Mock(return_value='rgw.test')
+        fake_client.services.get.return_value = [service]
+
+        cert_details = self._create_mock_cert_details(remaining_days=100)
+        cert_ls_data = {
+            'cephadm-signed_rgw_cert': {
+                'scope': 'HOST',
+                'certificates': {
+                    'node0': cert_details
+                }
+            }
+        }
+        fake_client.cert_store.cert_ls.return_value = cert_ls_data
+        fake_client.services.list_daemons.return_value = [
+            DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0')
+        ]
+        instance.return_value = fake_client
+
+        self._get('{}/rgw.test'.format(self.URL_CERTIFICATE))
+        self.assertStatus(200)
+        result = self.json_body()
+        self.assertEqual(result['cert_name'], 'cephadm-signed_rgw_cert')
+        self.assertEqual(result['signed_by'], 'cephadm')
+        self.assertEqual(result['scope'], 'HOST')
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_get_certificate_expiring(self, instance):
+        """Test getting certificate that is expiring."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        service = mock.Mock()
+        service.spec = mock.Mock()
+        service.spec.service_type = 'rgw'
+        service.spec.service_name = mock.Mock(return_value='rgw.test')
+        fake_client.services.get.return_value = [service]
+
+        cert_details = self._create_mock_cert_details(remaining_days=10)
+        cert_ls_data = self._create_mock_cert_ls_data(
+            cert_name='rgw_ssl_cert',
+            scope='SERVICE',
+            service_name='rgw.test',
+            cert_details=cert_details
+        )
+        fake_client.cert_store.cert_ls.return_value = cert_ls_data
+        fake_client.services.list_daemons.return_value = [
+            DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0')
+        ]
+        instance.return_value = fake_client
+
+        self._get('{}/rgw.test'.format(self.URL_CERTIFICATE))
+        self.assertStatus(200)
+        result = self.json_body()
+        self.assertEqual(result['status'], 'expiring')
+        self.assertEqual(result['days_to_expiration'], 10)
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_get_certificate_expired(self, instance):
+        """Test getting certificate that is expired."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        service = mock.Mock()
+        service.spec = mock.Mock()
+        service.spec.service_type = 'rgw'
+        service.spec.service_name = mock.Mock(return_value='rgw.test')
+        fake_client.services.get.return_value = [service]
+
+        cert_details = self._create_mock_cert_details(remaining_days=-10)
+        cert_ls_data = self._create_mock_cert_ls_data(
+            cert_name='rgw_ssl_cert',
+            scope='SERVICE',
+            service_name='rgw.test',
+            cert_details=cert_details
+        )
+        fake_client.cert_store.cert_ls.return_value = cert_ls_data
+        fake_client.services.list_daemons.return_value = [
+            DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0')
+        ]
+        instance.return_value = fake_client
+
+        self._get('{}/rgw.test'.format(self.URL_CERTIFICATE))
+        self.assertStatus(200)
+        result = self.json_body()
+        self.assertEqual(result['status'], 'expired')
+        self.assertEqual(result['days_to_expiration'], -10)
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_get_certificate_invalid(self, instance):
+        """Test getting certificate that is invalid."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        service = mock.Mock()
+        service.spec = mock.Mock()
+        service.spec.service_type = 'rgw'
+        service.spec.service_name = mock.Mock(return_value='rgw.test')
+        fake_client.services.get.return_value = [service]
+
+        cert_ls_data = {
+            'rgw_ssl_cert': {
+                'scope': 'SERVICE',
+                'certificates': {
+                    'rgw.test': {'Error': 'Invalid certificate format'}
+                }
+            }
+        }
+        fake_client.cert_store.cert_ls.return_value = cert_ls_data
+        fake_client.services.list_daemons.return_value = [
+            DaemonDescription(daemon_type='rgw', daemon_id='test', hostname='node0')
+        ]
+        instance.return_value = fake_client
+
+        self._get('{}/rgw.test'.format(self.URL_CERTIFICATE))
+        self.assertStatus(200)
+        result = self.json_body()
+        self.assertEqual(result['status'], 'invalid')
+        self.assertTrue(result['has_certificate'])
+        self.assertIn('error', result)
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_get_certificate_host_scope(self, instance):
+        """Test getting certificate with HOST scope."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        service = mock.Mock()
+        service.spec = mock.Mock()
+        service.spec.service_type = 'grafana'
+        service.spec.service_name = mock.Mock(return_value='grafana.test')
+        fake_client.services.get.return_value = [service]
+
+        cert_details = self._create_mock_cert_details()
+        cert_ls_data = {
+            'grafana_ssl_cert': {
+                'scope': 'HOST',
+                'certificates': {
+                    'node0': cert_details
+                }
+            }
+        }
+        fake_client.cert_store.cert_ls.return_value = cert_ls_data
+        fake_client.services.list_daemons.return_value = [
+            DaemonDescription(daemon_type='grafana', daemon_id='test', hostname='node0')
+        ]
+        instance.return_value = fake_client
+
+        self._get('{}/grafana.test'.format(self.URL_CERTIFICATE))
+        self.assertStatus(200)
+        result = self.json_body()
+        self.assertEqual(result['scope'], 'HOST')
+        self.assertEqual(result['target'], 'node0')
+
+
+class CertificateRootCAControllerTest(CertificateTestBase):
+    """Tests for the certificate root_ca() endpoint."""
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_root_ca_success(self, instance):
+        """Test getting root CA certificate successfully."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        root_ca_cert = '-----BEGIN CERTIFICATE-----\nMOCK_ROOT_CA_CERT\n-----END CERTIFICATE-----'
+        fake_client.cert_store.get_cert.return_value = root_ca_cert
+        instance.return_value = fake_client
+
+        self._get(self.URL_ROOT_CA)
+        self.assertStatus(200)
+        result = self.json_body()
+        self.assertEqual(result, root_ca_cert)
+        fake_client.cert_store.get_cert.assert_called_with('cephadm_root_ca_cert')
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_root_ca_not_found(self, instance):
+        """Test getting root CA certificate when it doesn't exist."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        fake_client.cert_store.get_cert.return_value = None
+        instance.return_value = fake_client
+
+        self._get(self.URL_ROOT_CA)
+        self.assertStatus(404)
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_root_ca_error_handling(self, instance):
+        """Test error handling when getting root CA certificate fails."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        fake_client.cert_store.get_cert.side_effect = Exception('Certificate store error')
+        instance.return_value = fake_client
+
+        self._get(self.URL_ROOT_CA)
+        self.assertStatus(500)
+
+    @mock.patch('dashboard.controllers.certificate.OrchClient.instance')
+    def test_root_ca_method_not_available(self, instance):
+        """Test getting root CA when get_cert method is not available."""
+        fake_client = self._setup_fake_client(mock.Mock())
+        del fake_client.cert_store.get_cert
+        instance.return_value = fake_client
+
+        self._get(self.URL_ROOT_CA)
+        self.assertStatus(500)
+
+
+if __name__ == '__main__':
+    unittest.main()