from typing import Any, Dict, List, Optional
from ceph.deployment.service_spec import ServiceSpec
-from orchestrator import Completion, DaemonDescription, DeviceLightLoc, \
- HostSpec, InventoryFilter, OrchestratorClientMixin, OrchestratorError, \
+from orchestrator import DaemonDescription, DeviceLightLoc, HostSpec, \
+ InventoryFilter, OrchestratorClientMixin, OrchestratorError, OrchResult, \
ServiceDescription, raise_if_exception
from .. import mgr
available=False,
message='Orchestrator is unavailable: {}'.format(str(e)))
- def orchestrator_wait(self, completions):
- return self._orchestrator_wait(completions)
-
def wait_api_result(method):
@wraps(method)
def inner(self, *args, **kwargs):
completion = method(self, *args, **kwargs)
- self.api.orchestrator_wait([completion])
raise_if_exception(completion)
return completion.result
return inner
return self.api.remove_host(hostname)
@wait_api_result
- def add_label(self, host: str, label: str) -> Completion:
+ def add_label(self, host: str, label: str) -> OrchResult[str]:
return self.api.add_host_label(host, label)
@wait_api_result
- def remove_label(self, host: str, label: str) -> Completion:
+ def remove_label(self, host: str, label: str) -> OrchResult[str]:
return self.api.remove_host_label(host, label)
raise_if_exception(c)
@wait_api_result
- def apply(self, service_spec: Dict) -> Completion:
+ def apply(self, service_spec: Dict) -> OrchResult[List[str]]:
spec = ServiceSpec.from_json(service_spec)
return self.api.apply([spec])
@wait_api_result
def blink_device_light(self, hostname, device, ident_fault, on):
- # type: (str, str, str, bool) -> Completion
+ # type: (str, str, str, bool) -> OrchResult[List[str]]
return self.api.blink_device_light(
ident_fault, on, [DeviceLightLoc(hostname, device, device)])