TunedProfileSpec,
MgmtGatewaySpec,
NvmeofServiceSpec,
+ CertificateSource
)
from ceph.deployment.drive_group import DeviceSelection
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
for svc in service_registry.get_all_services():
if svc.allows_user_certificates:
- assert svc.SCOPE != TLSObjectScope.UNKNOWN, f"Service {svc.TYPE} requieres certificates but it has not defined its svc.SCOPE field."
+ if svc.SCOPE == TLSObjectScope.UNKNOWN:
+ OrchestratorError(f"Service {svc.TYPE} requieres certificates but it has not defined its svc.SCOPE field.")
self.cert_mgr.register_cert_key_pair(svc.TYPE, svc.cert_name, svc.key_name, svc.SCOPE)
self.cert_mgr.register_cert_key_pair('nvmeof', 'nvmeof_client_cert', 'nvmeof_client_key', TLSObjectScope.SERVICE)
'certificate': self.cert_mgr.get_root_ca()}
@handle_orch_error
- def cert_store_cert_ls(self, show_details: bool = False) -> Dict[str, Any]:
- return self.cert_mgr.cert_ls(show_details)
+ def cert_store_cert_ls(self,
+ filter_by: str = '',
+ show_details: bool = False,
+ include_cephadm_signed: bool = False) -> Dict[str, Any]:
+ return self.cert_mgr.cert_ls(filter_by, show_details, include_cephadm_signed)
@handle_orch_error
def cert_store_bindings_ls(self) -> Dict[str, Dict[str, List[str]]]:
return report
@handle_orch_error
- def cert_store_key_ls(self) -> Dict[str, Any]:
- return self.cert_mgr.key_ls()
+ def cert_store_key_ls(self, include_cephadm_generated_keys: bool = False) -> Dict[str, Any]:
+ return self.cert_mgr.key_ls(include_cephadm_generated_keys)
@handle_orch_error
def cert_store_get_cert(
if not cert:
if no_exception_when_missing:
return ''
- raise OrchSecretNotFound(entity=cert_name, service_name=service_name, hostname=hostname)
+ raise OrchSecretNotFound(consumer=cert_name, service_name=service_name, hostname=hostname)
return cert
@handle_orch_error
if not key:
if no_exception_when_missing:
return ''
- raise OrchSecretNotFound(entity=key_name, service_name=service_name, hostname=hostname)
+ raise OrchSecretNotFound(consumer=key_name, service_name=service_name, hostname=hostname)
return key
+ def _raise_non_editable_cert_error(self, cert_name: str, consumer: str, service_name: str, hostname: str) -> None:
+ if service_name:
+ context = f"service '{service_name}'"
+ elif hostname:
+ context = f"host '{hostname}'"
+ elif consumer:
+ context = f"'{consumer}'"
+
+ raise OrchestratorError(
+ f"Certificate '{cert_name}' for {context} is not editable (defined as inline in the spec or generated by cephadm)."
+ )
+
@handle_orch_error
def cert_store_set_pair(
self,
cert: str,
key: str,
- entity: str,
+ consumer: str,
cert_name: str = "",
service_name: str = "",
hostname: str = "",
force: bool = False
) -> str:
- def raise_non_editable_cert_error() -> None:
- if service_name:
- context = f"service '{service_name}'"
- elif hostname:
- context = f"host '{hostname}'"
- else:
- context = f"'{consumer}'"
-
- raise OrchestratorError(
- f"Certificate '{cert_name}' for {context} is not editable (defined as inline in the spec or generated by cephadm)."
- )
-
if consumer not in self.cert_mgr.list_consumers():
- raise OrchestratorError(f"Invalid consumer: {consumer}. Please use 'ceph orch certmgr bindings ls' to list valid consumers.")
+ raise OrchestratorError(f"Invalid service: {consumer}. Please use 'ceph orch certmgr bindings ls' to list valid bindings.")
# Check the certificate validity status
target = service_name or hostname
if len(cert_names) == 1:
cert_name = cert_names[0]
elif len(cert_names) > 1 and not cert_name:
- raise OrchestratorError(f"Consumer '{consumer}' has many certificates, please use --cert-name argument to specify which one from the list: {cert_names}")
+ raise OrchestratorError(f"Service '{consumer}' has many certificates, please use the --cert-name argument to specify which one from the list: {cert_names}")
# Check the certificate scope
scope_errors = {
if (scope == TLSObjectScope.HOST and not hostname) or (scope == TLSObjectScope.SERVICE and not service_name):
raise OrchestratorError(scope_errors[scope])
- if not self.cert_mgr.is_cert_editable(cert_name, service_name or '', hostname or ''):
- raise_non_editable_cert_error()
+ if not debug_mode and not self.cert_mgr.is_cert_editable(cert_name, service_name or '', hostname or ''):
+ self._raise_non_editable_cert_error(cert_name, consumer, service_name, hostname)
key_name = cert_name.replace('_cert', '_key')
self.cert_mgr.save_cert(cert_name, cert, service_name, hostname, user_made=True, editable=True)
cert: str,
service_name: str = "",
hostname: str = "",
+ force: bool = False
) -> str:
- def raise_non_editable_cert_error() -> None:
- if service_name:
- context = f"service '{service_name}'"
- elif hostname:
- context = f"host '{hostname}'"
- raise OrchestratorError(
- f"Certificate '{cert_name}' for {context} is not editable (defined as inline in the spec or generated by cephadm)."
- )
-
debug_mode = self.certificate_check_debug_mode and force
if not debug_mode:
if not self.cert_mgr.is_cert_editable(cert_name, service_name or '', hostname or ''):
- raise_non_editable_cert_error()
+ self._raise_non_editable_cert_error(cert_name, '', service_name, hostname)
target = service_name or hostname
cert_info = self.cert_mgr.check_certificate_state(cert_name, target, cert)
if not cert_info.is_operationally_valid():
hostname: Optional[str] = None,
) -> str:
+ cert_err = OrchestratorError("Cannot delete the certificate. Please use 'ceph orch certmgr cert ls' to list available certificates. \n"
+ "Note: for certificates with host/service scope use --service-name or --hostname to specify the target.")
try:
- self.cert_mgr.rm_cert(cert_name, service_name, hostname)
+ if not self.cert_mgr.rm_cert(cert_name, service_name, hostname):
+ raise cert_err
return f'Certificate for {cert_name} removed correctly'
except TLSObjectException:
- raise OrchestratorError("Cannot delete the certificate. Please use 'ceph orch certmgr cert ls' to list available certificates. \n"
- "Note: for certificates with host/service scope use --service-name or --hostname to specify the target.")
+ raise cert_err
@handle_orch_error
def cert_store_rm_key(
results.append(self._plan(cast(ServiceSpec, spec)))
return results
+ def _check_cert_source(self, spec: ServiceSpec) -> str:
+ cert_warning = ''
+ if spec.is_using_certificates_source(CertificateSource.REFERENCE):
+ svc = service_registry.get_service(spec.service_type)
+ if svc.SCOPE == TLSObjectScope.SERVICE:
+ if not self.cert_mgr.cert_exists(svc.cert_name, service_name=spec.service_name(), host=None):
+ raise OrchestratorError(
+ f"\n\nSSL is configured with '{CertificateSource.REFERENCE.value}', but cannot find an entry for the service '{spec.service_name()}'"
+ f"\nunder the certificate '{svc.cert_name}' within the certmgr store. To set the certificate, use:\n"
+ f"\n > ceph orch certmgr cert set --cert-name {svc.cert_name} --service-name {spec.service_name()} -i <cert-key-pem-file> \n"
+ )
+ else:
+ cert_warning = (
+ f"\n\n\nWarning: SSL is configured with '{CertificateSource.REFERENCE.value}', and this service uses per-host certificates.\n\n"
+ f"To configure keys/certificates, run the following commands for each host daemons are deployed on:\n"
+ f" > ceph orch certmgr cert set --cert-name {svc.cert_name} --service-name {spec.service_name()} --hostname <host> -i <cert-file>\n"
+ f" > ceph orch certmgr key set --key-name {svc.cert_name} --service-name {spec.service_name()} --hostname <host> -i <key-file>\n\n"
+ f"Once all certificates are provisioned, run:\n"
+ f" > ceph orch reconfig {spec.service_name()}\n"
+ f"to reconfigure the service with the certificates."
+ )
+ return cert_warning
+
def _apply_service_spec(self, spec: ServiceSpec) -> str:
if spec.placement.is_empty():
# fill in default placement
host_count = len(self.inventory.keys())
max_count = self.max_count_per_host
+ cert_warning = self._check_cert_source(spec)
+
if spec.service_type == 'nvmeof':
nvmeof_spec = cast(NvmeofServiceSpec, spec)
assert nvmeof_spec.pool is not None, "Pool cannot be None for nvmeof services"
spec.service_name(), spec.placement.pretty_str()))
self.spec_store.save(spec)
self._kick_serve_loop()
- return "Scheduled %s update..." % spec.service_name()
+ return f"Scheduled {spec.service_name()} update...{cert_warning}"
@handle_orch_error
def apply(
return HandleCommandResult(stdout=output)
@_cli_read_command('orch certmgr cert ls')
- def _cert_store_cert_ls(self, show_details: bool = False, format: Format = Format.plain) -> HandleCommandResult:
- completion = self.cert_store_cert_ls(show_details)
+ def _cert_store_cert_ls(self,
+ filter_by: str = '',
+ show_details: bool = False,
+ include_cephadm_signed: bool = False,
+ format: Format = Format.plain) -> HandleCommandResult:
+ completion = self.cert_store_cert_ls(filter_by, show_details, include_cephadm_signed)
cert_ls = raise_if_exception(completion)
if format != Format.plain:
return HandleCommandResult(stdout=to_format(cert_ls, format, many=False, cls=None))
result_str = self._process_cert_store_json(cert_ls, 0)
return HandleCommandResult(stdout=result_str)
- @_cli_read_command('orch certmgr entity ls')
- def _cert_store_entity_ls(self, format: Format = Format.plain) -> HandleCommandResult:
- completion = self.cert_store_entity_ls()
- entity_ls = raise_if_exception(completion)
+ @_cli_read_command('orch certmgr bindings ls')
+ def _cert_store_bindings_ls(self, format: Format = Format.plain) -> HandleCommandResult:
+ completion = self.cert_store_bindings_ls()
+ bindings_ls = raise_if_exception(completion)
if format != Format.plain:
- return HandleCommandResult(stdout=to_format(entity_ls, format, many=False, cls=None))
+ return HandleCommandResult(stdout=to_format(bindings_ls, format, many=False, cls=None))
else:
- result_str = yaml.dump(entity_ls, default_flow_style=False, sort_keys=False)
+ result_str = yaml.dump(bindings_ls, default_flow_style=False, sort_keys=False)
return HandleCommandResult(stdout=result_str)
@_cli_read_command('orch certmgr cert check')
return HandleCommandResult(stdout=result_str)
@_cli_read_command('orch certmgr key ls')
- def _cert_store_key_ls(self, format: Format = Format.plain) -> HandleCommandResult:
- completion = self.cert_store_key_ls()
+ def _cert_store_key_ls(self,
+ include_cephadm_generated_keys: bool = False,
+ format: Format = Format.plain) -> HandleCommandResult:
+ completion = self.cert_store_key_ls(include_cephadm_generated_keys)
key_ls = raise_if_exception(completion)
if format != Format.plain:
return HandleCommandResult(stdout=to_format(key_ls, format, many=False, cls=None))
@_cli_write_command('orch certmgr cert-key set')
def _cert_store_cert_key_set(
self,
- entity: str,
+ consumer: str,
_end_positional_: int = 0,
cert: Optional[str] = None,
key: Optional[str] = None,
cert_name: Optional[str] = None,
service_name: Optional[str] = None,
hostname: Optional[str] = None,
- force: Optional[bool] = False,
+ force: bool = False,
inbuf: Optional[str] = None
) -> HandleCommandResult:
"""
completion = self.cert_store_set_pair(
cert_content,
key_content,
- entity,
+ consumer,
cert_name,
service_name,
hostname,
cert: Optional[str] = None,
service_name: Optional[str] = None,
hostname: Optional[str] = None,
+ force: bool = False,
inbuf: Optional[str] = None
) -> HandleCommandResult:
"""
cert_content,
service_name,
hostname,
+ force
)
output = raise_if_exception(completion)
return HandleCommandResult(stdout=output)