def cert_store_key_ls(self) -> Dict[str, Any]:
return self.cert_key_store.key_ls()
+ @handle_orch_error
+ def cert_store_get_cert(
+ self,
+ entity: str,
+ service_name: Optional[str] = None,
+ hostname: Optional[str] = None
+ ) -> str:
+ cert = self.cert_key_store.get_cert(entity, service_name or '', hostname or '')
+ if not cert:
+ err_msg = f'No cert found for entity {entity}'
+ if service_name:
+ err_msg += f' with service name {service_name}'
+ if hostname:
+ err_msg += f' with hostname {hostname}'
+ raise OrchestratorError(err_msg)
+ return cert
+
+ @handle_orch_error
+ def cert_store_get_key(
+ self,
+ entity: str,
+ service_name: Optional[str] = None,
+ hostname: Optional[str] = None
+ ) -> str:
+ key = self.cert_key_store.get_key(entity, service_name or '', hostname or '')
+ if not key:
+ err_msg = f'No key found for entity {entity}'
+ if service_name:
+ err_msg += f' with service name {service_name}'
+ if hostname:
+ err_msg += f' with hostname {hostname}'
+ raise OrchestratorError(err_msg)
+ return key
+
@handle_orch_error
def apply_mon(self, spec: ServiceSpec) -> str:
return self._apply(spec)
def cert_store_key_ls(self) -> OrchResult[Dict[str, Any]]:
raise NotImplementedError()
+ def cert_store_get_cert(
+ self,
+ entity: str,
+ service_name: Optional[str] = None,
+ hostname: Optional[str] = None
+ ) -> OrchResult[str]:
+ raise NotImplementedError()
+
+ def cert_store_get_key(
+ self,
+ entity: str,
+ service_name: Optional[str] = None,
+ hostname: Optional[str] = None
+ ) -> OrchResult[str]:
+ raise NotImplementedError()
+
@handle_orch_error
def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
"""
result_str = self._process_cert_store_json(key_ls, 0)
return HandleCommandResult(stdout=result_str)
+ @_cli_read_command('orch cert-store get cert')
+ def _cert_store_get_cert(
+ self,
+ entity: str,
+ _end_positional_: int = 0,
+ service_name: Optional[str] = None,
+ hostname: Optional[str] = None
+ ) -> HandleCommandResult:
+ completion = self.cert_store_get_cert(entity, service_name, hostname)
+ cert = raise_if_exception(completion)
+ return HandleCommandResult(stdout=cert)
+
+ @_cli_read_command('orch cert-store get key')
+ def _cert_store_get_key(
+ self,
+ entity: str,
+ _end_positional_: int = 0,
+ service_name: Optional[str] = None,
+ hostname: Optional[str] = None
+ ) -> HandleCommandResult:
+ completion = self.cert_store_get_key(entity, service_name, hostname)
+ key = raise_if_exception(completion)
+ return HandleCommandResult(stdout=key)
+
def _get_credentials(self, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[str, str]:
_username = username