force: bool = False
) -> str:
+ def raise_non_editable_cert_error() -> None:
+ if service_name:
+ context = f"service '{service_name}'"
+ elif hostname:
+ context = f"host '{hostname}'"
+ else:
+ context = f"'{consumer}'"
+
+ raise OrchestratorError(
+ f"Certificate '{cert_name}' for {context} is not editable (defined as inline in the spec or generated by cephadm)."
+ )
+
if consumer not in self.cert_mgr.list_consumers():
raise OrchestratorError(f"Invalid consumer: {consumer}. Please use 'ceph orch certmgr bindings ls' to list valid consumers.")
if (scope == TLSObjectScope.HOST and not hostname) or (scope == TLSObjectScope.SERVICE and not service_name):
raise OrchestratorError(scope_errors[scope])
+ if not self.cert_mgr.is_cert_editable(cert_name, service_name or '', hostname or ''):
+ raise_non_editable_cert_error()
+
key_name = cert_name.replace('_cert', '_key')
- self.cert_mgr.save_cert(cert_name, cert, service_name, hostname, True)
- self.cert_mgr.save_key(key_name, key, service_name, hostname, True)
+ self.cert_mgr.save_cert(cert_name, cert, service_name, hostname, user_made=True, editable=True)
+ self.cert_mgr.save_key(key_name, key, service_name, hostname, user_made=True, editable=True)
return "Certificate/key pair set correctly"
@handle_orch_error
hostname: str = "",
) -> str:
- target = service_name or hostname
- cert_info = self.cert_mgr.check_certificate_state(cert_name, target, cert)
- if not cert_info.is_operationally_valid():
- raise OrchestratorError(cert_info.get_status_description())
+ def raise_non_editable_cert_error() -> None:
+ if service_name:
+ context = f"service '{service_name}'"
+ elif hostname:
+ context = f"host '{hostname}'"
+ raise OrchestratorError(
+ f"Certificate '{cert_name}' for {context} is not editable (defined as inline in the spec or generated by cephadm)."
+ )
- self.cert_mgr.save_cert(cert_name, cert, service_name, hostname, True)
+ debug_mode = self.certificate_check_debug_mode and force
+ if not debug_mode:
+ if not self.cert_mgr.is_cert_editable(cert_name, service_name or '', hostname or ''):
+ raise_non_editable_cert_error()
+ target = service_name or hostname
+ cert_info = self.cert_mgr.check_certificate_state(cert_name, target, cert)
+ if not cert_info.is_operationally_valid():
+ raise OrchestratorError(cert_info.get_status_description())
+
+ self.cert_mgr.save_cert(cert_name, cert, service_name, hostname, user_made=True, editable=True)
return f'Certificate for {cert_name} set correctly'
@handle_orch_error