from mgr_module import MgrModule
import orchestrator
-
-
-class TestCompletion(orchestrator.Completion):
-    def evaluate(self):
-        self.finalize(None)
-
-
-def deferred_read(f):
-    # type: (Callable) -> Callable[..., TestCompletion]
-    """
-    Decorator to make methods return
-    a completion object that executes themselves.
-    """
-
-    @functools.wraps(f)
-    def wrapper(*args, **kwargs):
-        return TestCompletion(on_complete=lambda _: f(*args, **kwargs))
-
-    return wrapper
-
-
-def deferred_write(message):
-    def inner(f):
-        # type: (Callable) -> Callable[..., TestCompletion]
-
-        @functools.wraps(f)
-        def wrapper(self, *args, **kwargs):
-            return TestCompletion.with_progress(
-                message=message,
-                mgr=self,
-                on_complete=lambda _: f(self, *args, **kwargs),
-            )
-
-        return wrapper
-    return inner
+from orchestrator import handle_orch_error, raise_if_exception
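+# NOTE: handle_orch_error replaces the deferred-completion machinery removed
+# above: it catches exceptions and wraps each method's return value in an
+# OrchResult, which callers unwrap with raise_if_exception.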
class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
    The implementation is similar to the Rook orchestrator, but simpler.
    """
-    def process(self, completions: List[TestCompletion]) -> None:  # type: ignore
-        if completions:
-            self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))
-
-            for p in completions:
-                p.evaluate()
-
    @CLICommand('test_orchestrator load_data', perm='w')
    def _load_data(self, inbuf):
        """
        self._initialized = threading.Event()
        self._shutdown = threading.Event()
        self._init_data({})
-        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
    def shutdown(self):
        self._shutdown.set()
        self._initialized.set()
        while not self._shutdown.is_set():
-            # XXX hack (or is it?) to kick all completions periodically,
-            # in case we had a caller that wait()'ed on them long enough
-            # to get persistence but not long enough to get completion
-
-            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
-            for p in self.all_progress_references:
-                p.update()
-
            self._shutdown.wait(5)
    def _init_data(self, data=None):
        self._daemons = [orchestrator.DaemonDescription.from_json(daemon)
                         for daemon in data.get('daemons', [])]
-    @deferred_read
+    @handle_orch_error
    def get_inventory(self, host_filter=None, refresh=False):
        """
        There is no guarantee which devices are returned by get_inventory.
            daemons.append(daemon)
        return daemons
-    @deferred_read
+    @handle_orch_error
    def describe_service(self, service_type=None, service_name=None, refresh=False):
        if self._services:
            # Dummy data
        return list(filter(_filter_func, services))
-    @deferred_read
+    @handle_orch_error
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False):
        """
        There is no guarantee which daemons are returned by list_daemons, except that
    def preview_drivegroups(self, drive_group_name=None, dg_specs=None):
        return [{}]
+    @handle_orch_error
    def create_osds(self, drive_group):
-        # type: (DriveGroupSpec) -> TestCompletion
+        # type: (DriveGroupSpec) -> str
        """ Creates OSDs from a drive group specification.
        $: ceph orch osd create -i <dg.file>
        The drivegroup file must only contain one spec at a time.
        """
-        def run(all_hosts):
-            # type: (List[orchestrator.HostSpec]) -> None
-            drive_group.validate()
+        drive_group.validate()
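+        # get_hosts() is itself decorated with @handle_orch_error, so it
+        # returns an OrchResult; raise_if_exception unwraps the host list,
+        # re-raising any exception stored in the result.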
+        all_hosts = raise_if_exception(self.get_hosts())
-            def get_hosts_func(label=None, as_hostspec=False):
-                if as_hostspec:
-                    return all_hosts
-                return [h.hostname for h in all_hosts]
-            if not drive_group.placement.filter_matching_hosts(get_hosts_func):
-                raise orchestrator.OrchestratorValidationError('failed to match')
+        def get_hosts_func(label=None, as_hostspec=False):
+            if as_hostspec:
+                return all_hosts
+            return [h.hostname for h in all_hosts]
-        return self.get_hosts().then(run).then(
-            on_complete=orchestrator.ProgressReference(
-                message='create_osds',
-                mgr=self,
-            )
-        )
+        if not drive_group.placement.filter_matching_hosts(get_hosts_func):
+            raise orchestrator.OrchestratorValidationError('failed to match')
+        return ''
+    @handle_orch_error
    def apply_drivegroups(self, specs):
-        # type: (List[DriveGroupSpec]) -> TestCompletion
+        # type: (List[DriveGroupSpec]) -> List[str]
        drive_group = specs[0]
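+        # Only the first spec in the list is exercised here (as in the old code).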
-        def run(all_hosts):
-            # type: (List[orchestrator.HostSpec]) -> None
-            drive_group.validate()
-
-            def get_hosts_func(label=None, as_hostspec=False):
-                if as_hostspec:
-                    return all_hosts
-                return [h.hostname for h in all_hosts]
-
-            if not drive_group.placement.filter_matching_hosts(get_hosts_func):
-                raise orchestrator.OrchestratorValidationError('failed to match')
-        return self.get_hosts().then(run).then(
-            on_complete=orchestrator.ProgressReference(
-                message='apply_drivesgroups',
-                mgr=self,
-            )
-        )
-
-    @deferred_write("remove_daemons")
+        all_hosts = raise_if_exception(self.get_hosts())
+
+        drive_group.validate()
+
+        def get_hosts_func(label=None, as_hostspec=False):
+            if as_hostspec:
+                return all_hosts
+            return [h.hostname for h in all_hosts]
+
+        if not drive_group.placement.filter_matching_hosts(get_hosts_func):
+            raise orchestrator.OrchestratorValidationError('failed to match')
+        return []
+
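+    # The write stubs below now return plain strings instead of scheduling a
+    # deferred completion; @handle_orch_error wraps them in an OrchResult.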
+    @handle_orch_error
    def remove_daemons(self, names):
        assert isinstance(names, list)
+        return 'done'
-    @deferred_write("remove_service")
+    @handle_orch_error
    def remove_service(self, service_name):
        assert isinstance(service_name, str)
+        return 'done'
-    @deferred_write("blink_device_light")
+    @handle_orch_error
    def blink_device_light(self, ident_fault, on, locations):
        assert ident_fault in ("ident", "fault")
        assert len(locations)
        return ''
-    @deferred_write("service_action")
+    @handle_orch_error
    def service_action(self, action, service_name):
-        pass
+        return 'done'
-    @deferred_write("daemon_action")
+    @handle_orch_error
    def daemon_action(self, action, daemon_name, image=None):
-        pass
+        return 'done'
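+    # The add_*/apply_* stubs below echo the spec back via
+    # ServiceSpec.one_line_str() so callers can see which spec was received.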
-    @deferred_write("Adding NFS service")
+    @handle_orch_error
    def add_nfs(self, spec):
-        # type: (NFSServiceSpec) -> None
+        # type: (NFSServiceSpec) -> List[str]
        assert isinstance(spec.pool, str)
+        return [spec.one_line_str()]
-    @deferred_write("apply_nfs")
+    @handle_orch_error
    def apply_nfs(self, spec):
-        pass
+        return spec.one_line_str()
-    @deferred_write("add_iscsi")
+    @handle_orch_error
    def add_iscsi(self, spec):
-        # type: (IscsiServiceSpec) -> None
-        pass
+        # type: (IscsiServiceSpec) -> List[str]
+        return [spec.one_line_str()]
-    @deferred_write("apply_iscsi")
+    @handle_orch_error
    def apply_iscsi(self, spec):
-        # type: (IscsiServiceSpec) -> None
-        pass
+        # type: (IscsiServiceSpec) -> str
+        return spec.one_line_str()
-    @deferred_write("add_mds")
+    @handle_orch_error
    def add_mds(self, spec):
-        pass
+        return 'done'
-    @deferred_write("add_rgw")
+    @handle_orch_error
    def add_rgw(self, spec):
-        pass
+        return 'done'
-    @deferred_read
+    @handle_orch_error
    def get_hosts(self):
        if self._inventory:
            return [orchestrator.HostSpec(i.name, i.addr, i.labels) for i in self._inventory]
        return [orchestrator.HostSpec('localhost')]
-    @deferred_write("add_host")
+    @handle_orch_error
    def add_host(self, spec):
-        # type: (orchestrator.HostSpec) -> None
+        # type: (orchestrator.HostSpec) -> str
        host = spec.hostname
        if host == 'raise_validation_error':
            raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")
        if host == 'raise_import_error':
            raise ImportError("test_orchestrator not enabled")
        assert isinstance(host, str)
+        return ''
-    @deferred_write("remove_host")
+    @handle_orch_error
    def remove_host(self, host):
        assert isinstance(host, str)
+        return 'done'
-    @deferred_write("apply_mgr")
+    @handle_orch_error
    def apply_mgr(self, spec):
-        # type: (ServiceSpec) -> None
+        # type: (ServiceSpec) -> str
        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
        assert all([isinstance(h, str) for h in spec.placement.hosts])
+        return spec.one_line_str()
-    @deferred_write("apply_mon")
+    @handle_orch_error
    def apply_mon(self, spec):
-        # type: (ServiceSpec) -> None
+        # type: (ServiceSpec) -> str
        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
        assert all([isinstance(h[0], str) for h in spec.placement.hosts])
        assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.hosts])
+        return spec.one_line_str()
+