from typing import TYPE_CHECKING, Tuple, Union, List, Dict, Optional, cast, Any
import logging
-import copy
from cephadm.ssl_cert_utils import SSLCerts, SSLConfigException
from mgr_util import verify_tls, ServerConfigException
def rm_key(self, key_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
self.key_store.rm_tlsobject(key_name, service_name, host)
- def cert_ls(self) -> Dict[str, Union[bool, Dict[str, Dict[str, bool]]]]:
- ls: Dict = copy.deepcopy(self.cert_store.get_tlsobjects())
- for k, v in ls.items():
- if isinstance(v, dict):
- tmp: Dict[str, Any] = {key: get_certificate_info(cast(Cert, v[key]).cert) for key in v if isinstance(v[key], Cert)}
- ls[k] = tmp if tmp else {}
- elif isinstance(v, Cert):
- ls[k] = get_certificate_info(cast(Cert, v).cert) if bool(v) else False
+ def cert_ls(self, include_datails: bool = False) -> Dict:
+ cert_objects: List = self.cert_store.list_tlsobjects()
+ ls: Dict = {}
+ for cert_name, cert_obj, target in cert_objects:
+ cert_extended_info = get_certificate_info(cert_obj.cert, include_datails)
+ cert_scope = self.get_cert_scope(cert_name)
+ if cert_name not in ls:
+ ls[cert_name] = {'scope': str(cert_scope), 'certificates': {}}
+ if cert_scope == TLSObjectScope.GLOBAL:
+ ls[cert_name]['certificates'] = cert_extended_info
+ else:
+ ls[cert_name]['certificates'][target] = cert_extended_info
+
return ls
- def key_ls(self) -> Dict[str, Union[bool, Dict[str, bool]]]:
- ls: Dict = copy.deepcopy(self.key_store.get_tlsobjects())
- if self.CEPHADM_ROOT_CA_KEY in ls:
- del ls[self.CEPHADM_ROOT_CA_KEY]
- for k, v in ls.items():
- if isinstance(v, dict) and v:
- tmp: Dict[str, Any] = {key: get_private_key_info(cast(PrivKey, v[key]).key) for key in v if v[key]}
- ls[k] = tmp if tmp else {}
- elif isinstance(v, PrivKey):
- ls[k] = get_private_key_info(cast(PrivKey, v).key)
+ def key_ls(self) -> Dict:
+ key_objects: List = self.key_store.list_tlsobjects()
+ ls: Dict = {}
+ for key_name, key_obj, target in key_objects:
+ priv_key_info = get_private_key_info(key_obj.key)
+ key_scope = self.get_key_scope(key_name)
+ if key_name not in ls:
+ ls[key_name] = {'scope': str(key_scope), 'keys': {}}
+ if key_scope == TLSObjectScope.GLOBAL:
+ ls[key_name]['keys'] = priv_key_info
+ else:
+ ls[key_name]['keys'].update({target: priv_key_info})
+
+ # we don't want this key to be leaked
+ del ls[self.CEPHADM_ROOT_CA_KEY]
+
return ls
def list_entity_known_certificates(self, entity: str) -> List[str]:
- return [cert_name for cert_name, service in self.cert_to_service.items() if service == entity]
+ """
+ Retrieves all certificates associated with a given entity.
- def entity_ls(self, get_scope: bool = False) -> List[Union[str, Tuple[str, str]]]:
- if get_scope:
- return [(entity, self.determine_scope(entity)) for entity in set(self.cert_to_service.values())]
- else:
- return list(self.cert_to_service.values())
-
- def determine_scope(self, entity: str) -> str:
- for cert, service in self.cert_to_service.items():
- if service == entity:
- if cert in self.known_certs[TLSObjectScope.SERVICE]:
- return TLSObjectScope.SERVICE.value
- elif cert in self.known_certs[TLSObjectScope.HOST]:
- return TLSObjectScope.HOST.value
- elif cert in self.known_certs[TLSObjectScope.GLOBAL]:
- return TLSObjectScope.GLOBAL.value
- return TLSObjectScope.UNKNOWN.value
+ :param entity: The entity name.
+ :return: A list of certificate names, or None if the entity is not found.
+ """
+ for scope, entities in self.entities.items():
+ if entity in entities:
+ return entities[entity]['certs'] # Return certs for the entity
+ return []
+
+ def get_entities(self, get_scope: bool = False) -> Dict[str, Any]:
+ return {f'{scope}': entities for scope, entities in self.entities.items()}
+
+ def list_entities(self) -> List[str]:
+ """
+ Retrieves a list of all registered entities across all scopes.
+ :return: A list of entity names.
+ """
+ entities: List[str] = []
+ for scope_entities in self.entities.values():
+ entities.extend(scope_entities.keys())
+ return entities
+
+ def get_cert_scope(self, cert_name: str) -> TLSObjectScope:
+ for scope, certificates in self.known_certs.items():
+ if cert_name in certificates:
+ return scope
+ return TLSObjectScope.UNKNOWN
+
+ def get_key_scope(self, key_name: str) -> TLSObjectScope:
+ for scope, keys in self.known_keys.items():
+ if key_name in keys:
+ return scope
+ return TLSObjectScope.UNKNOWN
def _notify_certificates_health_status(self, problematic_certificates: List[CertInfo]) -> None:
service_name, host = self.cert_store.determine_tlsobject_target(cert_name, target)
key = cast(PrivKey, self.key_store.get_tlsobject(key_name, service_name=service_name, host=host))
return key
- except TLSObjectException as e:
+ except TLSObjectException:
return None
# Filter non-empty entries skipping cephadm root CA cetificate
'certificate': self.cert_mgr.get_root_ca()}
@handle_orch_error
- def cert_store_cert_ls(self) -> Dict[str, Any]:
- return self.cert_mgr.cert_ls()
+ def cert_store_cert_ls(self, show_details: bool = False) -> Dict[str, Any]:
+ return self.cert_mgr.cert_ls(show_details)
+
+ @handle_orch_error
+ def cert_store_entity_ls(self) -> Dict[str, Dict[str, List[str]]]:
+ return self.cert_mgr.get_entities()
+
+ @handle_orch_error
+ def cert_store_reload(self) -> str:
+ self.cert_mgr.load()
+ return "OK"
+
+ @handle_orch_error
+ def cert_store_cert_check(self) -> List[str]:
+ report = []
+ _, certs_with_issues = self.cert_mgr.check_services_certificates(fix_issues=False)
+ for cert_info in certs_with_issues:
+ if not cert_info.is_operationally_valid():
+ report.append(cert_info.get_status_description())
+ return report
@handle_orch_error
def cert_store_key_ls(self) -> Dict[str, Any]:
@handle_orch_error
def cert_store_get_cert(
self,
- entity: str,
+ cert_name: str,
service_name: Optional[str] = None,
hostname: Optional[str] = None,
no_exception_when_missing: bool = False
) -> str:
- cert = self.cert_mgr.get_cert(entity, service_name or '', hostname or '')
+ cert = self.cert_mgr.get_cert(cert_name, service_name or '', hostname or '')
if not cert:
if no_exception_when_missing:
return ''
- raise OrchSecretNotFound(entity=entity, service_name=service_name, hostname=hostname)
+ raise OrchSecretNotFound(entity=cert_name, service_name=service_name, hostname=hostname)
return cert
@handle_orch_error
def cert_store_get_key(
self,
- entity: str,
+ key_name: str,
service_name: Optional[str] = None,
hostname: Optional[str] = None,
no_exception_when_missing: bool = False
) -> str:
- key = self.cert_mgr.get_key(entity, service_name or '', hostname or '')
+ key = self.cert_mgr.get_key(key_name, service_name or '', hostname or '')
if not key:
if no_exception_when_missing:
return ''
- raise OrchSecretNotFound(entity=entity, service_name=service_name, hostname=hostname)
+ raise OrchSecretNotFound(entity=key_name, service_name=service_name, hostname=hostname)
return key
+ @handle_orch_error
+ def cert_store_set_pair(
+ self,
+ cert: str,
+ key: str,
+ entity: str,
+ cert_name: str = "",
+ service_name: str = "",
+ hostname: str = "",
+ force: bool = False
+ ) -> str:
+
+ if entity not in self.cert_mgr.list_entities():
+ raise OrchestratorError(f"Invalid entity: {entity}. Please use 'ceph orch certmgr entity ls' to list valid entities.")
+
+ # Check the certificate validity status
+ target = service_name or hostname
+ cert_info = self.cert_mgr.check_certificate_state(entity, target, cert, key)
+ if not force and not cert_info.is_operationally_valid():
+ raise OrchestratorError(cert_info.get_status_description())
+
+ # Obtain the certificate name (from entity)
+ cert_names = self.cert_mgr.list_entity_known_certificates(entity)
+ if len(cert_names) == 1:
+ cert_name = cert_names[0]
+ elif len(cert_names) > 1 and not cert_name:
+ raise OrchestratorError(f"Entity '{entity}' has many certificates, please use --cert-name argument to specify which one from the list: {cert_names}")
+
+ # Check the certificate scope
+ scope_errors = {
+ TLSObjectScope.HOST: "Certificate is bound to a host. Please specify the host using --hostname.",
+ TLSObjectScope.SERVICE: "Certificate is bound to a service. Please specify the service using --service-name.",
+ TLSObjectScope.UNKNOWN: f"Unknown certificate '{cert_name}'. Use 'ceph orch certmgr cert ls' to list supported certificates.",
+ }
+ scope = self.cert_mgr.get_cert_scope(cert_name)
+ if (scope == TLSObjectScope.HOST and not hostname) or (scope == TLSObjectScope.SERVICE and not service_name):
+ raise OrchestratorError(scope_errors[scope])
+
+ key_name = cert_name.replace('_cert', '_key')
+ self.cert_mgr.save_cert(cert_name, cert, service_name, hostname, True)
+ self.cert_mgr.save_key(key_name, key, service_name, hostname, True)
+ return "Certificate/key pair set correctly"
+
+ @handle_orch_error
+ def cert_store_set_cert(
+ self,
+ cert: str,
+ cert_name: str,
+ service_name: Optional[str] = None,
+ hostname: Optional[str] = None,
+ ) -> str:
+
+ try:
+ days_to_expiration = verify_cacrt_content(cert)
+ if days_to_expiration < self.certificate_renewal_threshold_days:
+ raise OrchestratorError(f'Error: Certificate is about to expire (Remaining days: {days_to_expiration})')
+ except ServerConfigException as e:
+ raise OrchestratorError(f'Error: Invalid certificate for {cert_name}: {e}')
+
+ self.cert_mgr.save_cert(cert_name, cert, service_name, hostname, True)
+ return f'Certificate for {cert_name} set correctly'
+
+ @handle_orch_error
+ def cert_store_set_key(
+ self,
+ key: str,
+ key_name: str,
+ service_name: Optional[str] = None,
+ hostname: Optional[str] = None,
+ ) -> str:
+ self.cert_mgr.save_key(key_name, key, service_name, hostname, True)
+ return f'Key for {key_name} set correctly'
+
@handle_orch_error
def apply_mon(self, spec: ServiceSpec) -> str:
return self._apply(spec)
else:
salt = salt_password.encode('utf8')
return bcrypt.hashpw(password.encode('utf8'), salt).decode('utf8')
+
+def parse_combined_pem_file(pem_data: str) -> Tuple[Optional[str], Optional[str]]:
+
+ # Extract the certificate
+ cert_start = "-----BEGIN CERTIFICATE-----"
+ cert_end = "-----END CERTIFICATE-----"
+ cert = None
+ if cert_start in pem_data and cert_end in pem_data:
+ cert = pem_data[pem_data.index(cert_start):pem_data.index(cert_end) + len(cert_end)]
+
+ # Extract the private key
+ key_start = "-----BEGIN PRIVATE KEY-----"
+ key_end = "-----END PRIVATE KEY-----"
+ private_key = None
+ if key_start in pem_data and key_end in pem_data:
+ private_key = pem_data[pem_data.index(key_start):pem_data.index(key_end) + len(key_end)]
+
+ return cert, private_key
"""
raise NotImplementedError()
- def cert_store_cert_ls(self) -> OrchResult[Dict[str, Any]]:
+ def cert_store_cert_ls(self, show_details: bool = False) -> OrchResult[Dict[str, Any]]:
+ raise NotImplementedError()
+
+ def cert_store_entity_ls(self) -> OrchResult[Dict[Any, Dict[str, List[str]]]]:
+ raise NotImplementedError()
+
+ def cert_store_reload(self) -> OrchResult[str]:
+ raise NotImplementedError()
+
+ def cert_store_cert_check(self) -> OrchResult[List[str]]:
raise NotImplementedError()
def cert_store_key_ls(self) -> OrchResult[Dict[str, Any]]:
def cert_store_get_cert(
self,
- entity: str,
+ cert_name: str,
service_name: Optional[str] = None,
hostname: Optional[str] = None,
no_exception_when_missing: bool = False
def cert_store_get_key(
self,
- entity: str,
+ key_name: str,
service_name: Optional[str] = None,
hostname: Optional[str] = None,
no_exception_when_missing: bool = False
) -> OrchResult[str]:
raise NotImplementedError()
+ def cert_store_set_pair(
+ self,
+ cert: str,
+ key: str,
+ entity: str,
+ cert_name: Optional[str] = None,
+ service_name: Optional[str] = None,
+ hostname: Optional[str] = None,
+ force: Optional[bool] = False
+ ) -> OrchResult[str]:
+ raise NotImplementedError()
+
+ def cert_store_set_cert(
+ self,
+ cert: str,
+ cert_name: str,
+ service_name: Optional[str] = None,
+ hostname: Optional[str] = None,
+ ) -> OrchResult[str]:
+ raise NotImplementedError()
+
+ def cert_store_set_key(
+ self,
+ key: str,
+ key_name: str,
+ service_name: Optional[str] = None,
+ hostname: Optional[str] = None,
+ ) -> OrchResult[str]:
+ raise NotImplementedError()
+
@handle_orch_error
def apply(
self,
from ceph.deployment.utils import unwrap_ipv6
from ceph.utils import datetime_now
from ceph.cephadm.images import NonCephImageServiceTypes
-from mgr_util import to_pretty_timedelta, format_bytes
+from mgr_util import to_pretty_timedelta, format_bytes, parse_combined_pem_file
from mgr_module import MgrModule, HandleCommandResult, Option
from object_format import Format
result_str = ''
indent = ' ' * level
for k, v in d.items():
- if isinstance(v, dict):
+ if isinstance(v, dict) and v:
result_str += f'{indent}{k}\n'
result_str += self._process_cert_store_json(v, level + 1)
- else:
- result_str += f'{indent}{k} - {v}\n'
+ elif v or isinstance(v, int):
+ result_str += f'{indent}{k}: {v}\n'
return result_str
- @_cli_read_command('orch cert-store cert ls')
- def _cert_store_cert_ls(self, format: Format = Format.plain) -> HandleCommandResult:
- completion = self.cert_store_cert_ls()
+ @_cli_read_command('orch certmgr reload')
+ def _cert_store_reload(self, format: Format = Format.plain) -> HandleCommandResult:
+ completion = self.cert_store_reload()
+ output = raise_if_exception(completion)
+ return HandleCommandResult(stdout=output)
+
+ @_cli_read_command('orch certmgr cert ls')
+ def _cert_store_cert_ls(self, show_details: bool = False, format: Format = Format.plain) -> HandleCommandResult:
+ completion = self.cert_store_cert_ls(show_details)
cert_ls = raise_if_exception(completion)
if format != Format.plain:
return HandleCommandResult(stdout=to_format(cert_ls, format, many=False, cls=None))
result_str = self._process_cert_store_json(cert_ls, 0)
return HandleCommandResult(stdout=result_str)
- @_cli_read_command('orch cert-store key ls')
+ @_cli_read_command('orch certmgr entity ls')
+ def _cert_store_entity_ls(self, format: Format = Format.plain) -> HandleCommandResult:
+ completion = self.cert_store_entity_ls()
+ entity_ls = raise_if_exception(completion)
+ if format != Format.plain:
+ return HandleCommandResult(stdout=to_format(entity_ls, format, many=False, cls=None))
+ else:
+ result_str = yaml.dump(entity_ls, default_flow_style=False, sort_keys=False)
+ return HandleCommandResult(stdout=result_str)
+
+ @_cli_read_command('orch certmgr cert check')
+ def _cert_store_cert_check(self, format: Format = Format.plain) -> HandleCommandResult:
+ completion = self.cert_store_cert_check()
+ cert_check_report = raise_if_exception(completion)
+ if format != Format.plain:
+ return HandleCommandResult(stdout=to_format(cert_check_report, format, many=False, cls=None))
+ else:
+ result_str = "\n".join(f"- {e}" for e in cert_check_report)
+ return HandleCommandResult(stdout=result_str)
+
+ @_cli_read_command('orch certmgr key ls')
def _cert_store_key_ls(self, format: Format = Format.plain) -> HandleCommandResult:
completion = self.cert_store_key_ls()
key_ls = raise_if_exception(completion)
result_str = self._process_cert_store_json(key_ls, 0)
return HandleCommandResult(stdout=result_str)
- @_cli_read_command('orch cert-store get cert')
+ @_cli_read_command('orch certmgr cert get')
def _cert_store_get_cert(
self,
- entity: str,
+ cert_name: str,
_end_positional_: int = 0,
service_name: Optional[str] = None,
hostname: Optional[str] = None,
no_exception_when_missing: bool = False
) -> HandleCommandResult:
completion = self.cert_store_get_cert(
- entity,
+ cert_name,
service_name,
hostname,
no_exception_when_missing
cert = raise_if_exception(completion)
return HandleCommandResult(stdout=cert)
- @_cli_read_command('orch cert-store get key')
+ @_cli_read_command('orch certmgr key get')
def _cert_store_get_key(
self,
- entity: str,
+ key_name: str,
_end_positional_: int = 0,
service_name: Optional[str] = None,
hostname: Optional[str] = None,
no_exception_when_missing: bool = False
) -> HandleCommandResult:
completion = self.cert_store_get_key(
- entity,
+ key_name,
service_name,
hostname,
no_exception_when_missing
key = raise_if_exception(completion)
return HandleCommandResult(stdout=key)
+ @_cli_write_command('orch certmgr cert-key set')
+ def _cert_store_cert_key_set(
+ self,
+ entity: str,
+ _end_positional_: int = 0,
+ cert: Optional[str] = None,
+ key: Optional[str] = None,
+ cert_name: Optional[str] = None,
+ service_name: Optional[str] = None,
+ hostname: Optional[str] = None,
+ force: Optional[bool] = False,
+ inbuf: Optional[str] = None
+ ) -> HandleCommandResult:
+ if inbuf:
+ cert_content, key_content = parse_combined_pem_file(inbuf)
+ if not cert_content or not key_content:
+ raise OrchestratorError('Expected a combined PEM file with certificate and key pairs')
+ else:
+ cert_content, key_content = cert, key
+ if not cert_content or not key_content:
+ raise OrchestratorError('This command requires passing cert/key pair by either using --cert/--key parameters or a combined PEM file using "-i" option.')
+
+ completion = self.cert_store_set_pair(
+ cert_content,
+ key_content,
+ entity,
+ cert_name,
+ service_name,
+ hostname,
+ force
+ )
+ output = raise_if_exception(completion)
+ return HandleCommandResult(stdout=output)
+
+ @_cli_write_command('orch certmgr cert set')
+ def _cert_store_set_cert(
+ self,
+ cert_name: str,
+ _end_positional_: int = 0,
+ cert: Optional[str] = None,
+ service_name: Optional[str] = None,
+ hostname: Optional[str] = None,
+ inbuf: Optional[str] = None
+ ) -> HandleCommandResult:
+ cert_content = cert or inbuf
+ if not cert_content:
+ raise OrchestratorError('This command requires passing a certificate using --cert parameter or "-i <filepath>" option')
+
+ completion = self.cert_store_set_cert(
+ cert_content,
+ cert_name,
+ service_name,
+ hostname,
+ )
+ output = raise_if_exception(completion)
+ return HandleCommandResult(stdout=output)
+
+ @_cli_write_command('orch certmgr key set')
+ def _cert_store_set_key(
+ self,
+ key_name: str,
+ _end_positional_: int = 0,
+ key: Optional[str] = None,
+ service_name: Optional[str] = None,
+ hostname: Optional[str] = None,
+ inbuf: Optional[str] = None
+ ) -> HandleCommandResult:
+ key_content = key or inbuf
+ if not key_content:
+ raise OrchestratorError('This command requires passing a key using --key parameter or "-i <filepath>" option')
+
+ completion = self.cert_store_set_key(
+ key_content,
+ key_name,
+ service_name,
+ hostname,
+ )
+ output = raise_if_exception(completion)
+ return HandleCommandResult(stdout=output)
+
def _get_credentials(self, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[str, str]:
_username = username