import copy
from cephadm.ssl_cert_utils import SSLCerts, SSLConfigException
-from typing import TYPE_CHECKING, Tuple, Union, List, Optional
+from mgr_util import verify_tls, ServerConfigException
+from cephadm.ssl_cert_utils import get_certificate_info, get_private_key_info
from cephadm.tlsobject_types import Cert, PrivKey
-from cephadm.tlsobject_store import TLSObjectStore, TLSObjectScope
+from cephadm.tlsobject_store import TLSObjectStore, TLSObjectScope, TLSObjectException
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
logger = logging.getLogger(__name__)
+class CertInfo:
+ """
+ - is_valid: True if the certificate is valid.
+ - is_close_to_expiration: True if the certificate is close to expiration.
+ - days_to_expiration: Number of days until expiration.
+ - error_info: Details of any exception encountered during validation.
+ """
+ def __init__(self, cert_name: str,
+ target: Optional[str],
+ user_made: bool = False,
+ is_valid: bool = False,
+ is_close_to_expiration: bool = False,
+ days_to_expiration: int = 0,
+ error_info: str = ''):
+ self.user_made = user_made
+ self.cert_name = cert_name
+ self.target = target or ''
+ self.is_valid = is_valid
+ self.is_close_to_expiration = is_close_to_expiration
+ self.days_to_expiration = days_to_expiration
+ self.error_info = error_info
+
+ def __str__(self) -> str:
+ return f'{self.cert_name} ({self.target})' if self.target else f'{self.cert_name}'
+
+ def is_operationally_valid(self) -> bool:
+ return self.is_valid and not self.is_close_to_expiration
+
+ def get_status_description(self) -> str:
+ cert_source = 'user-made' if self.user_made else 'self-signed'
+ cert_details = f"'{self.cert_name} ({self.target})' ({cert_source})"
+ if not self.is_valid:
+ if 'expired' in self.error_info.lower():
+ return f'Certificate {cert_details} has expired'
+ else:
+ return f'Certificate {cert_details} is not valid (error: {self.error_info})'
+ elif self.is_close_to_expiration:
+ return f'Certificate {cert_details} is about to expire (remaining days: {self.days_to_expiration})'
+
+ return 'Certificate is valid'
+
+
class CertMgr:
+ """
+ Cephadm Certificate Manager plays a crucial role in maintaining a secure and automated certificate
+ lifecycle within Cephadm deployments. CertMgr manages SSL/TLS certificates for all services
+ handled by cephadm, acting as the root Certificate Authority (CA) for all certificates.
+ This class provides mechanisms for storing, validating, renewing, and monitoring certificate status.
+
+ It tracks known certificates and private keys, associates them with services, and ensures
+ their validity. If certificates are close to expiration or invalid, depending on the configuration
+ (governed by the mgr/cephadm/certificate_automated_rotation_enabled parameter), CertMgr generates
+ warnings or attempts renewal for self-signed certificates.
+
+ Additionally, CertMgr provides methods for certificate management, including retrieving, saving,
+ and removing certificates and keys, as well as reporting certificate health status in case of issues.
+
+ This class holds the following important mappings:
+ - known_certs
+ - known_keys
+ - entities
+
+ First ones holds all the known certificates and keys managed by cephadm. Each certificate/key has a
+ pre-defined scope: Global, Host, or Service.
+
+ - Global: The same certificates is used for all the service daemons (e.g mgmt-gateway).
+ - Host: Certificates specific to individual hosts within the cluster (e.g Grafana).
+ - Service: Certificates tied to specific service (e.g RGW).
+
+ The entities mapping associates each scoped entity with its certificates. This information is needed
+ to trigger the corresponding service reconfiguration when updating some certificate and also when
+ setting the cert/key pair from CLI.
+ """
CEPHADM_ROOT_CA_CERT = 'cephadm_root_ca_cert'
CEPHADM_ROOT_CA_KEY = 'cephadm_root_ca_key'
+ CEPHADM_CERTMGR_HEALTH_ERR = 'CEPHADM_CERT_ERROR'
- # In an effort to try and track all the certs we manage in cephadm
- # we're being explicit here and listing them out.
-
- ####################################################
- # cephadm certmgr known Certificates section
- known_certs = {
- TLSObjectScope.SERVICE: [
- 'iscsi_ssl_cert',
- 'rgw_frontend_ssl_cert',
- 'ingress_ssl_cert',
- 'nvmeof_server_cert',
- 'nvmeof_client_cert',
- 'nvmeof_root_ca_cert',
- ],
- TLSObjectScope.HOST: [
- 'grafana_cert',
- ],
- TLSObjectScope.GLOBAL: [
- 'mgmt_gw_cert',
- 'oauth2_proxy_cert',
- CEPHADM_ROOT_CA_CERT,
- ],
- }
-
- ####################################################
- # cephadm certmgr known Keys section
- known_keys = {
- TLSObjectScope.SERVICE: [
- 'iscsi_ssl_key',
- 'ingress_ssl_key',
- 'nvmeof_server_key',
- 'nvmeof_client_key',
- 'nvmeof_encryption_key',
- ],
- TLSObjectScope.HOST: [
- 'grafana_key',
- ],
- TLSObjectScope.GLOBAL: [
- 'mgmt_gw_key',
- 'oauth2_proxy_key',
- CEPHADM_ROOT_CA_KEY,
- ],
- }
-
- cert_to_service = {
- 'rgw_frontend_ssl_cert': 'rgw',
- 'iscsi_ssl_cert': 'iscsi',
- 'ingress_ssl_cert': 'ingress',
- 'nvmeof_server_cert': 'nvmeof',
- 'nvmeof_client_cert': 'nvmeof',
- 'nvmeof_root_ca_cert': 'nvmeof',
- 'mgmt_gw_cert': 'mgmt-gateway',
- 'oauth2_proxy_cert': 'oauth2-proxy',
- 'grafana_cert': 'grafana',
- }
-
- def __init__(self,
- mgr: "CephadmOrchestrator",
- certificate_automated_rotation_enabled: bool,
- certificate_duration_days: int,
- renewal_threshold_days: int,
- mgr_ip: str) -> None:
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
self.mgr = mgr
- self.mgr_ip = mgr_ip
- self.certificate_automated_rotation_enabled = certificate_automated_rotation_enabled
- self.certificate_duration_days = certificate_duration_days
- self.renewal_threshold_days = renewal_threshold_days
- self._init_tlsobject_store()
- self._initialize_root_ca(mgr_ip)
-
- def _init_tlsobject_store(self) -> None:
+ self.certificates_health_report: List[CertInfo] = []
+ self.known_certs: Dict[TLSObjectScope, List[str]] = {
+ TLSObjectScope.SERVICE: [],
+ TLSObjectScope.HOST: [],
+ TLSObjectScope.GLOBAL: [self.CEPHADM_ROOT_CA_CERT],
+ }
+ self.known_keys: Dict[TLSObjectScope, List[str]] = {
+ TLSObjectScope.SERVICE: [],
+ TLSObjectScope.HOST: [],
+ TLSObjectScope.GLOBAL: [self.CEPHADM_ROOT_CA_KEY],
+ }
+ self.entities: Dict[TLSObjectScope, Dict[str, Dict[str, List[str]]]] = {
+ TLSObjectScope.SERVICE: {},
+ TLSObjectScope.HOST: {},
+ TLSObjectScope.GLOBAL: {},
+ }
+
+ def init_tlsobject_store(self) -> None:
self.cert_store = TLSObjectStore(self.mgr, Cert, self.known_certs)
self.cert_store.load()
self.key_store = TLSObjectStore(self.mgr, PrivKey, self.known_keys)
self.key_store.load()
+ self._initialize_root_ca(self.mgr.get_mgr_ip())
def load(self) -> None:
- self.cert_store.load()
- self.key_store.load()
+ self.init_tlsobject_store()
def _initialize_root_ca(self, ip: str) -> None:
- self.ssl_certs: SSLCerts = SSLCerts(self.certificate_duration_days)
+ self.ssl_certs: SSLCerts = SSLCerts(self.mgr._cluster_fsid, self.mgr.certificate_duration_days)
old_cert = cast(Cert, self.cert_store.get_tlsobject(self.CEPHADM_ROOT_CA_CERT))
old_key = cast(PrivKey, self.key_store.get_tlsobject(self.CEPHADM_ROOT_CA_KEY))
if old_key and old_cert:
def get_root_ca(self) -> str:
return self.ssl_certs.get_root_cert()
+ def register_cert_key_pair(self, entity: str, cert_name: str, key_name: str, scope: TLSObjectScope) -> None:
+ """
+ Registers a certificate/key for a given entity under a specific scope.
+
+ :param entity: The entity (e.g., service, host) owning the certificate.
+ :param cert_name: The name of the certificate.
+ :param key_name: The name of the key.
+ :param scope: The TLSObjectScope (SERVICE, HOST, GLOBAL).
+ """
+ self.register_cert(entity, cert_name, scope)
+ self.register_key(entity, key_name, scope)
+
+ def register_cert(self, entity: str, cert_name: str, scope: TLSObjectScope) -> None:
+ self._register_tls_object(entity, cert_name, scope, "certs")
+
+ def register_key(self, entity: str, key_name: str, scope: TLSObjectScope) -> None:
+ self._register_tls_object(entity, key_name, scope, "keys")
+
+ def _register_tls_object(self, entity: str, obj_name: str, scope: TLSObjectScope, obj_type: str) -> None:
+ """
+ Registers a TLS-related object (certificate or key) for a given entity under a specific scope.
+
+ :param entity: The entity (service name) owning the TLS object.
+ :param obj_name: The name of the certificate or key.
+ :param scope: The TLSObjectScope (SERVICE, HOST, GLOBAL).
+ :param obj_type: either "certs" or "keys".
+ """
+ storage = self.known_certs if obj_type == "certs" else self.known_keys
+
+ if obj_name and obj_name not in storage[scope]:
+ storage[scope].append(obj_name)
+
+ if entity not in self.entities[scope]:
+ self.entities[scope][entity] = {"certs": [], "keys": []}
+
+ self.entities[scope][entity][obj_type].append(obj_name)
+
+ def cert_to_entity(self, cert_name: str) -> str:
+ """
+ Retrieves the entity that owns a given certificate or key name.
+
+ :param cert_name: The certificate or key name.
+ :return: The entity name if found, otherwise None.
+ """
+ for scope_entities in self.entities.values():
+ for entity, certs in scope_entities.items():
+ if cert_name in certs:
+ return entity
+ return 'unkown'
+
def generate_cert(
self,
host_fqdn: Union[str, List[str]],
elif cert in self.known_certs[TLSObjectScope.GLOBAL]:
return TLSObjectScope.GLOBAL.value
return TLSObjectScope.UNKNOWN.value
+
+ def _notify_certificates_health_status(self, problematic_certificates: List[CertInfo]) -> None:
+
+ previously_reported_issues = [(c.cert_name, c.target) for c in self.certificates_health_report]
+ for cert_info in problematic_certificates:
+ if (cert_info.cert_name, cert_info.target) not in previously_reported_issues:
+ self.certificates_health_report.append(cert_info)
+
+ if not self.certificates_health_report:
+ self.mgr.remove_health_warning(CertMgr.CEPHADM_CERTMGR_HEALTH_ERR)
+ return
+
+ detailed_error_msgs = []
+ invalid_count = 0
+ expired_count = 0
+ expiring_count = 0
+ for cert_info in self.certificates_health_report:
+ cert_status = cert_info.get_status_description()
+ detailed_error_msgs.append(cert_status)
+ if not cert_info.is_valid:
+ if "expired" in cert_info.error_info:
+ expired_count += 1
+ else:
+ invalid_count += 1
+ elif cert_info.is_close_to_expiration:
+ expiring_count += 1
+
+ # Generate a short description with a summery of all the detected issues
+ issues = [
+ f'{invalid_count} invalid' if invalid_count > 0 else '',
+ f'{expired_count} expired' if expired_count > 0 else '',
+ f'{expiring_count} expiring' if expiring_count > 0 else ''
+ ]
+ issues_description = ', '.join(filter(None, issues)) # collect only non-empty issues
+ total_issues = invalid_count + expired_count + expiring_count
+ short_error_msg = (f'Detected {total_issues} cephadm certificate(s) issues: {issues_description}')
+
+ if invalid_count > 0 or expired_count > 0:
+ logger.error(short_error_msg)
+ self.mgr.set_health_error(CertMgr.CEPHADM_CERTMGR_HEALTH_ERR, short_error_msg, total_issues, detailed_error_msgs)
+ else:
+ logger.warning(short_error_msg)
+ self.mgr.set_health_warning(CertMgr.CEPHADM_CERTMGR_HEALTH_ERR, short_error_msg, total_issues, detailed_error_msgs)
+
+ def check_certificate_state(self, cert_name: str, target: str, cert: str, key: str) -> CertInfo:
+ """
+ Checks if a certificate is valid and close to expiration.
+
+ Returns:
+ - is_valid: True if the certificate is valid.
+ - is_close_to_expiration: True if the certificate is close to expiration.
+ - days_to_expiration: Number of days until expiration.
+ - exception_info: Details of any exception encountered during validation.
+ """
+ cert_obj = Cert(cert, True)
+ key_obj = PrivKey(key, True)
+ return self._check_certificate_state(cert_name, target, cert_obj, key_obj)
+
+ def _check_certificate_state(self, cert_name: str, target: Optional[str], cert: Cert, key: PrivKey) -> CertInfo:
+ """
+ Checks if a certificate is valid and close to expiration.
+
+ Returns: CertInfo
+ """
+ try:
+ days_to_expiration = verify_tls(cert.cert, key.key)
+ is_close_to_expiration = days_to_expiration < self.mgr.certificate_renewal_threshold_days
+ return CertInfo(cert_name, target, cert.user_made, True, is_close_to_expiration, days_to_expiration, "")
+ except ServerConfigException as e:
+ return CertInfo(cert_name, target, cert.user_made, False, False, 0, str(e))
+
+ def prepare_certificate(self,
+ cert_name: str,
+ key_name: str,
+ host_fqdns: Union[str, List[str]],
+ host_ips: Union[str, List[str]],
+ target_host: str = '',
+ target_service: str = '',
+ ) -> Tuple[Optional[str], Optional[str]]:
+
+ if not cert_name or not key_name:
+ logger.error("Certificate name and key name must be provided when calling prepare_certificates.")
+ return None, None
+
+ cert_obj = cast(Cert, self.cert_store.get_tlsobject(cert_name, target_service, target_host))
+ key_obj = cast(PrivKey, self.key_store.get_tlsobject(key_name, target_service, target_host))
+ if cert_obj and key_obj:
+ target = target_host or target_service
+ cert_info = self._check_certificate_state(cert_name, target, cert_obj, key_obj)
+ if cert_info.is_operationally_valid():
+ return cert_obj.cert, key_obj.key
+ elif cert_obj.user_made:
+ self._notify_certificates_health_status([cert_info])
+ return None, None
+ else:
+ logger.warning(f'Found invalid cephadm certificate/key pair {cert_name}/{key_name}, '
+ f'status: {cert_info.get_status_description()}, '
+ f'error: {cert_info.error_info}')
+
+ # Reaching this point means either certificates are not present or they are
+ # invalid self-signed certificates. Either way, we will just generate new ones.
+ logger.info(f'Generating cephadm self-signed certificates for {cert_name}/{key_name}')
+ cert, pkey = self.generate_cert(host_fqdns, host_ips)
+ self.mgr.cert_mgr.save_cert(cert_name, cert, host=target_host, service_name=target_service)
+ self.mgr.cert_mgr.save_key(key_name, pkey, host=target_host, service_name=target_service)
+ return cert, pkey
+
+ def get_problematic_certificates(self) -> List[Tuple[CertInfo, Cert]]:
+
+ def get_key(cert_name: str, target: Optional[str]) -> Optional[PrivKey]:
+ try:
+ key_name = cert_name.replace('_cert', '_key')
+ service_name, host = self.cert_store.determine_tlsobject_target(cert_name, target)
+ key = cast(PrivKey, self.key_store.get_tlsobject(key_name, service_name=service_name, host=host))
+ return key
+ except TLSObjectException as e:
+ return None
+
+ # Filter non-empty entries skipping cephadm root CA cetificate
+ certs_tlsobjs = [c for c in self.cert_store.list_tlsobjects() if c[1] and c[0] != self.CEPHADM_ROOT_CA_CERT]
+ problematics_certs: List[Tuple[CertInfo, Cert]] = []
+ for cert_name, cert_tlsobj, target in certs_tlsobjs:
+ cert_obj = cast(Cert, cert_tlsobj)
+ key_obj = get_key(cert_name, target)
+ if cert_obj and key_obj:
+ cert_info = self._check_certificate_state(cert_name, target, cert_obj, key_obj)
+ if not cert_info.is_operationally_valid():
+ problematics_certs.append((cert_info, cert_obj))
+ else:
+ target_info = f" ({target})" if target else ""
+ logger.info(f'Certificate for "{cert_name}{target_info}" is still valid for {cert_info.days_to_expiration} days.')
+ elif cert_obj:
+ # Cert is present but key is None, could only happen if somebody has put manually a bad key!
+ logger.warning(f"Key is missing for certificate '{cert_name}'.")
+ cert_info = CertInfo(cert_name, target, cert_obj.user_made, False, False, 0, "missing key")
+ problematics_certs.append((cert_info, cert_obj))
+ else:
+ logger.error(f'Cannot get cert/key {cert_name}')
+
+ return problematics_certs
+
+ def _renew_self_signed_certificate(self, cert_info: CertInfo, cert_obj: Cert) -> bool:
+ try:
+ logger.info(f'Renewing self-signed certificate for {cert_info.cert_name}')
+ new_cert, new_key = self.ssl_certs.renew_cert(cert_obj.cert, self.mgr.certificate_duration_days)
+ service_name, host = self.cert_store.determine_tlsobject_target(cert_info.cert_name, cert_info.target)
+ self.cert_store.save_tlsobject(cert_info.cert_name, new_cert, service_name=service_name, host=host)
+ key_name = cert_info.cert_name.replace('_cert', '_key')
+ self.key_store.save_tlsobject(key_name, new_key, service_name=service_name, host=host)
+ return True
+ except SSLConfigException as e:
+ logger.error(f'Error while trying to renew self-signed certificate for {cert_info.cert_name}: {e}')
+ return False
+
+ def check_services_certificates(self, fix_issues: bool = False) -> Tuple[List[str], List[CertInfo]]:
+ """
+ Checks services' certificates and optionally attempts to fix issues if fix_issues is True.
+
+ :param fix_issues: Whether to attempt fixing issues automatically.
+ :return: A tuple with:
+ - List of services requiring reconfiguration.
+ - List of certificates that require manual intervention.
+ """
+
+ def requires_user_intervention(cert_info: CertInfo, cert_obj: Cert) -> bool:
+ """Determines if a certificate requires manual user intervention."""
+ close_to_expiry = (not cert_info.is_operationally_valid() and not self.mgr.certificate_automated_rotation_enabled)
+ user_made_and_invalid = cert_obj.user_made and not cert_info.is_operationally_valid()
+ return close_to_expiry or user_made_and_invalid
+
+ def trigger_auto_fix(cert_info: CertInfo, cert_obj: Cert) -> bool:
+ """Attempts to automatically fix certificate issues if possible."""
+ if not self.mgr.certificate_automated_rotation_enabled or cert_obj.user_made:
+ return False
+
+ # This is a self-signed certificate, let's try to fix it
+ if not cert_info.is_valid:
+ # Remove the invalid certificate to force regeneration
+ service_name, host = self.cert_store.determine_tlsobject_target(cert_info.cert_name, cert_info.target)
+ logger.info(
+ f'Removing invalid certificate for {cert_info.cert_name} to trigger regeneration '
+ f'(service: {service_name}, host: {host}).'
+ )
+ self.cert_store.rm_tlsobject(cert_info.cert_name, service_name, host)
+ return True
+ elif cert_info.is_close_to_expiration:
+ return self._renew_self_signed_certificate(cert_info, cert_obj)
+ else:
+ return False
+
+ # Process all problematic certificates and try to fix them in case automated certs renewal
+ # is enabled. Successfully fixed ones are collected to trigger a service reconfiguration.
+ certs_with_issues = []
+ services_to_reconfig = set()
+ for cert_info, cert_obj in self.get_problematic_certificates():
+
+ logger.warning(cert_info.get_status_description())
+
+ if requires_user_intervention(cert_info, cert_obj):
+ certs_with_issues.append(cert_info)
+ continue
+
+ if fix_issues and trigger_auto_fix(cert_info, cert_obj):
+ services_to_reconfig.add(self.cert_to_entity(cert_info.cert_name))
+
+ # Clear previously reported issues as we are newly checking all the certifiactes
+ self.certificates_health_report = []
+
+ # All problematic certificates have been processed. certs_with_issues now only
+ # contains certificates that couldn't be fixed either because they are user-made
+ # or automated rotation is disabled. In these cases, health warning or error
+ # is raised to notify the user.
+ self._notify_certificates_health_status(certs_with_issues)
+
+ return list(services_to_reconfig), certs_with_issues
from ceph.deployment.service_spec import PrometheusSpec
from cephadm.cert_mgr import CertMgr
+from cephadm.tlsobject_store import TLSObjectScope
import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
NotifyType,
MonCommandFailed,
)
-from mgr_util import build_url
+from mgr_util import build_url, verify_cacrt_content, ServerConfigException
import orchestrator
from orchestrator.module import to_format, Format
default=False,
desc='Log all refresh metadata. Includes daemon, device, and host info collected regularly. Only has effect if logging at debug level'
),
+ Option(
+ 'certificate_automated_rotation_enabled',
+ type='bool',
+ default=False,
+ desc='This flag controls whether cephadm automatically rotates certificates upon expiration.',
+ ),
+ Option(
+ 'certificate_check_period',
+ type='int',
+ default=1, # Default to checking certificates once per day
+ desc='Specifies how often (in days) the certificate should be checked for validity.',
+ min=1,
+ max=3, # must be lesr than min of certificate_renewal_threshold_days
+ ),
+ Option(
+ 'certificate_duration_days',
+ type='int',
+ default=(3 * 365),
+ desc='Specifies the duration of self certificates generated and signed by cephadm root CA',
+ min=90,
+ max=(10 * 365)
+ ),
+ Option(
+ 'certificate_renewal_threshold_days',
+ type='int',
+ default=30,
+ desc='Specifies the lead time in days to initiate certificate renewal before expiration.',
+ min=10,
+ max=90
+ ),
Option(
'secure_monitoring_stack',
type='bool',
self.oob_default_addr = ''
self.ssh_keepalive_interval = 0
self.ssh_keepalive_count_max = 0
+ self.certificate_duration_days = 0
+ self.certificate_renewal_threshold_days = 0
+ self.certificate_automated_rotation_enabled = False
+ self.certificate_check_period = 0
self.notify(NotifyType.mon_map, None)
self.config_notify()
self.tuned_profile_utils = TunedProfileUtils(self)
- self.cert_mgr = CertMgr(self, self.get_mgr_ip())
+ self._init_cert_mgr()
# ensure the host lists are in sync
for h in self.inventory.keys():
"""
return self.inventory.get_fqdn(hostname) or self.inventory.get_addr(hostname)
+ def _init_cert_mgr(self) -> None:
+
+ self.cert_mgr = CertMgr(self)
+
+ # register global certificates
+ self.cert_mgr.register_cert_key_pair('mgmt-gateway', 'mgmt_gw_cert', 'mgmt_gw_key', TLSObjectScope.GLOBAL)
+ self.cert_mgr.register_cert_key_pair('oauth2-proxy', 'oauth2_proxy_cert', 'oauth2_proxy_key', TLSObjectScope.GLOBAL)
+
+ # register per-service certificates
+ self.cert_mgr.register_cert_key_pair('ingress', 'ingress_ssl_cert', 'ingress_ssl_key', TLSObjectScope.SERVICE)
+ self.cert_mgr.register_cert_key_pair('iscsi', 'iscsi_ssl_cert', 'iscsi_ssl_key', TLSObjectScope.SERVICE)
+ self.cert_mgr.register_cert_key_pair('nvmeof', 'nvmeof_server_cert', 'nvmeof_server_key', TLSObjectScope.SERVICE)
+ self.cert_mgr.register_cert_key_pair('nvmeof', 'nvmeof_client_cert', 'nvmeof_client_key', TLSObjectScope.SERVICE)
+ self.cert_mgr.register_cert('nvmeof', 'nvmeof_root_ca_cert', TLSObjectScope.SERVICE)
+ self.cert_mgr.register_cert('rgw', 'rgw_frontend_ssl_cert', TLSObjectScope.SERVICE)
+ self.cert_mgr.register_key('nvmeof', 'nvmeof_encryption_key', TLSObjectScope.SERVICE)
+
+ # register per-host certificates
+ self.cert_mgr.register_cert_key_pair('grafana', 'grafana_cert', 'grafana_key', TLSObjectScope.HOST)
+
+ self.cert_mgr.init_tlsobject_store()
+
def _get_security_config(self) -> Tuple[bool, bool, bool]:
oauth2_proxy_enabled = len(self.cache.get_daemons_by_service('oauth2-proxy')) > 0
mgmt_gw_enabled = len(self.cache.get_daemons_by_service('mgmt-gateway')) > 0
@handle_orch_error
def cert_store_cert_ls(self) -> Dict[str, Any]:
- return self.cert_key_store.cert_ls()
+ return self.cert_mgr.cert_ls()
@handle_orch_error
def cert_store_key_ls(self) -> Dict[str, Any]:
-from typing import Any, Tuple, IO, List, Union, Optional
+from typing import Any, Tuple, IO, List, Union, Optional, Dict
import ipaddress
from datetime import datetime, timedelta
from cryptography import x509
-from cryptography.x509.oid import NameOID
+from cryptography.x509 import Certificate
+from cryptography.x509.oid import NameOID, ExtensionOID
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend
pass
+def parse_extensions(cert: Certificate) -> Dict:
+ """Parse extensions into a readable format."""
+ parsed_extensions = {}
+ for ext in cert.extensions:
+ try:
+ if ext.oid == ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
+ san = ext.value
+ parsed_extensions["subjectAltName"] = {
+ "DNSNames": san.get_values_for_type(x509.DNSName),
+ "IPAddresses": [str(ip) for ip in san.get_values_for_type(x509.IPAddress)],
+ }
+ elif ext.oid == ExtensionOID.BASIC_CONSTRAINTS:
+ basic_constraints = ext.value
+ parsed_extensions["basicConstraints"] = {
+ "ca": basic_constraints.ca,
+ "path_length": basic_constraints.path_length,
+ }
+ elif ext.oid == ExtensionOID.SUBJECT_KEY_IDENTIFIER:
+ parsed_extensions["subjectKeyIdentifier"] = {"present": True}
+ elif ext.oid == ExtensionOID.AUTHORITY_KEY_IDENTIFIER:
+ parsed_extensions["authorityKeyIdentifier"] = {"present": True}
+ else:
+ parsed_extensions[ext.oid.dotted_string] = {"value": "present"}
+ except Exception as e:
+ parsed_extensions[ext.oid.dotted_string] = {"error": str(e)}
+
+ return parsed_extensions
+
+
+def get_certificate_info(cert_data: str, include_details: bool = False) -> Dict:
+ """Return detailed information about a certificate as a dictionary."""
+
+ def get_oid_name(oid: Any) -> str:
+ """Return a human-readable name for an OID."""
+ oid_mapping = {
+ NameOID.COMMON_NAME: 'commonName',
+ NameOID.COUNTRY_NAME: 'countryName',
+ NameOID.LOCALITY_NAME: 'localityName',
+ NameOID.STATE_OR_PROVINCE_NAME: 'stateOrProvinceName',
+ NameOID.ORGANIZATION_NAME: 'organizationName',
+ NameOID.ORGANIZATIONAL_UNIT_NAME: 'organizationalUnitName',
+ }
+ return oid_mapping.get(oid, oid.dotted_string)
+
+ try:
+ cert = x509.load_pem_x509_certificate(cert_data.encode('utf-8'), default_backend())
+ remaining_days = (cert.not_valid_after - datetime.utcnow()).days
+ info = {
+ 'subject': {get_oid_name(attr.oid): attr.value for attr in cert.subject},
+ 'validity': {
+ 'remaining_days': remaining_days,
+ }
+ }
+
+ if include_details:
+ info['issuer'] = {get_oid_name(attr.oid): attr.value for attr in cert.issuer}
+ info['validity'] = {
+ 'not_before': cert.not_valid_before.isoformat(),
+ 'not_after': cert.not_valid_after.isoformat(),
+ 'remaining_days': remaining_days,
+ }
+ info['extensions'] = parse_extensions(cert)
+ info['public_key'] = {}
+ public_key = cert.public_key()
+ if isinstance(public_key, rsa.RSAPublicKey):
+ info['public_key'] = {
+ 'key_type': 'RSA',
+ 'key_size': public_key.key_size,
+ }
+ else:
+ info['public_key'] = {
+ 'key_type': 'Unknown',
+ }
+
+ return info
+ except Exception as e:
+ return {'Error': f'Error parsing certificate: {e}'}
+
+
+def get_private_key_info(private_data: str) -> Dict:
+ """Return detailed information about a private key as a dictionary."""
+ try:
+ private_key = serialization.load_pem_private_key(
+ private_data.encode('utf-8'),
+ password=None,
+ backend=default_backend())
+
+ info = {}
+ if isinstance(private_key, rsa.RSAPrivateKey):
+ info = {
+ 'key_type': 'RSA',
+ 'key_size': private_key.key_size,
+ }
+ else:
+ info = {
+ 'key_type': 'Unknown',
+ }
+ return info
+ except Exception as e:
+ return {'Error': f'Error parsing key: {e}'}
+
+
class SSLCerts:
- def __init__(self, fsid: str) -> None:
+ def __init__(self, fsid: str, _certificate_duration_days: int = (365 * 10 + 3)) -> None:
+ self.root_certificate_duration_days = (365 * 10 + 3)
+ self.certificate_duration_days = _certificate_duration_days
self.root_cert: Any
self.root_key: Any
self.key_file: IO[bytes]
public_exponent=65537, key_size=4096, backend=default_backend())
root_public_key = self.root_key.public_key()
root_builder = x509.CertificateBuilder()
- root_builder = root_builder.subject_name(x509.Name([
- x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
- ]))
- root_builder = root_builder.issuer_name(x509.Name([
+ root_ca_name = x509.Name([
+ x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Ceph"),
x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
- ]))
+ ])
+ root_builder = root_builder.subject_name(root_ca_name)
+ root_builder = root_builder.issuer_name(root_ca_name)
root_builder = root_builder.not_valid_before(datetime.now())
- root_builder = root_builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3)))
+ root_builder = root_builder.not_valid_after(datetime.now() + timedelta(days=self.root_certificate_duration_days))
root_builder = root_builder.serial_number(x509.random_serial_number())
root_builder = root_builder.public_key(root_public_key)
public_key = private_key.public_key()
builder = x509.CertificateBuilder()
+ root_ca_name = x509.Name([
+ x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Ceph"),
+ x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
+ ])
builder = builder.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, addrs[0]), ]))
- builder = builder.issuer_name(
- x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'), ]))
+ builder = builder.issuer_name(root_ca_name)
builder = builder.not_valid_before(datetime.now())
- builder = builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3)))
+ builder = builder.not_valid_after(datetime.now() + timedelta(days=self.certificate_duration_days))
builder = builder.serial_number(x509.random_serial_number())
builder = builder.public_key(public_key)
return (cert_str, key_str)
+ def renew_cert(
+ self,
+ old_cert: str,
+ new_duration_days: Optional[int] = None
+ ) -> Tuple[str, str]:
+ """
+ Renews a certificate, generating a new private key and extending its duration.
+
+ :param old_cert: The existing certificate (PEM format) to be renewed.
+ :param new_duration_days: The new validity duration for the certificate in days.
+ If not provided, it defaults to `self.certificate_duration_days`.
+ :return: A tuple containing the renewed certificate and the new private key (PEM format).
+ """
+ try:
+ # Load the old certificate
+ old_certificate = x509.load_pem_x509_certificate(old_cert.encode('utf-8'), backend=default_backend())
+
+ # Generate a new private key
+ new_private_key = rsa.generate_private_key(
+ public_exponent=65537, key_size=4096, backend=default_backend()
+ )
+
+ # Extract existing SANs
+ san_extension = old_certificate.extensions.get_extension_for_class(x509.SubjectAlternativeName)
+ san_list = san_extension.value
+
+ # Build a new certificate with the same attributes
+ builder = x509.CertificateBuilder()
+ builder = builder.subject_name(old_certificate.subject)
+ builder = builder.issuer_name(old_certificate.issuer)
+ builder = builder.not_valid_before(datetime.now())
+ builder = builder.not_valid_after(
+ datetime.now() + timedelta(days=new_duration_days or self.certificate_duration_days)
+ )
+ builder = builder.serial_number(x509.random_serial_number())
+ builder = builder.public_key(new_private_key.public_key())
+
+ # Reuse SANs
+ builder = builder.add_extension(san_list, critical=False)
+
+ # Retain the original basic constraints
+ basic_constraints = old_certificate.extensions.get_extension_for_class(x509.BasicConstraints)
+ builder = builder.add_extension(basic_constraints.value, critical=basic_constraints.critical)
+
+ # Sign the new certificate
+ renewed_cert = builder.sign(private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend())
+
+ # Convert certificate and key to PEM format
+ cert_str = renewed_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
+ key_str = new_private_key.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption()
+ ).decode('utf-8')
+
+ return cert_str, key_str
+
+ except Exception as e:
+ raise SSLConfigException(f"Failed to renew certificate: {e}")
+
def get_root_cert(self) -> str:
try:
return self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')