From 1ecb93a47066aff0f68d9dd7abd145c2ca22dcd1 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Mon, 8 Feb 2021 01:56:16 +0100 Subject: [PATCH] mgr/test_orchestrator: adapt to now orch interface Signed-off-by: Sebastian Wagner --- src/pybind/mgr/test_orchestrator/module.py | 185 ++++++++------------- 1 file changed, 67 insertions(+), 118 deletions(-) diff --git a/src/pybind/mgr/test_orchestrator/module.py b/src/pybind/mgr/test_orchestrator/module.py index 8a62ba5c1fb98..1f527cba07657 100644 --- a/src/pybind/mgr/test_orchestrator/module.py +++ b/src/pybind/mgr/test_orchestrator/module.py @@ -20,41 +20,7 @@ from mgr_module import CLICommand, HandleCommandResult from mgr_module import MgrModule import orchestrator - - -class TestCompletion(orchestrator.Completion): - def evaluate(self): - self.finalize(None) - - -def deferred_read(f): - # type: (Callable) -> Callable[..., TestCompletion] - """ - Decorator to make methods return - a completion object that executes themselves. - """ - - @functools.wraps(f) - def wrapper(*args, **kwargs): - return TestCompletion(on_complete=lambda _: f(*args, **kwargs)) - - return wrapper - - -def deferred_write(message): - def inner(f): - # type: (Callable) -> Callable[..., TestCompletion] - - @functools.wraps(f) - def wrapper(self, *args, **kwargs): - return TestCompletion.with_progress( - message=message, - mgr=self, - on_complete=lambda _: f(self, *args, **kwargs), - ) - - return wrapper - return inner +from orchestrator import handle_orch_error, raise_if_exception class TestOrchestrator(MgrModule, orchestrator.Orchestrator): @@ -67,13 +33,6 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): The implementation is similar to the Rook orchestrator, but simpler. """ - def process(self, completions: List[TestCompletion]) -> None: # type: ignore - if completions: - self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions))) - - for p in completions: - p.evaluate() - @CLICommand('test_orchestrator load_data', perm='w') def _load_data(self, inbuf): """ @@ -98,7 +57,6 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): self._initialized = threading.Event() self._shutdown = threading.Event() self._init_data({}) - self.all_progress_references = list() # type: List[orchestrator.ProgressReference] def shutdown(self): self._shutdown.set() @@ -108,14 +66,6 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): self._initialized.set() while not self._shutdown.is_set(): - # XXX hack (or is it?) to kick all completions periodically, - # in case we had a caller that wait()'ed on them long enough - # to get persistence but not long enough to get completion - - self.all_progress_references = [p for p in self.all_progress_references if not p.effective] - for p in self.all_progress_references: - p.update() - self._shutdown.wait(5) def _init_data(self, data=None): @@ -126,7 +76,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): self._daemons = [orchestrator.DaemonDescription.from_json(daemon) for daemon in data.get('daemons', [])] - @deferred_read + @handle_orch_error def get_inventory(self, host_filter=None, refresh=False): """ There is no guarantee which devices are returned by get_inventory. @@ -190,7 +140,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): daemons.append(daemon) return daemons - @deferred_read + @handle_orch_error def describe_service(self, service_type=None, service_name=None, refresh=False): if self._services: # Dummy data @@ -218,7 +168,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): return list(filter(_filter_func, services)) - @deferred_read + @handle_orch_error def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False): """ There is no guarantee which daemons are returned by describe_service, except that @@ -246,8 +196,9 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): def preview_drivegroups(self, drive_group_name=None, dg_specs=None): return [{}] + @handle_orch_error def create_osds(self, drive_group): - # type: (DriveGroupSpec) -> TestCompletion + # type: (DriveGroupSpec) -> str """ Creates OSDs from a drive group specification. $: ceph orch osd create -i @@ -255,105 +206,98 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): The drivegroup file must only contain one spec at a time. """ - def run(all_hosts): - # type: (List[orchestrator.HostSpec]) -> None - drive_group.validate() + drive_group.validate() + all_hosts = raise_if_exception(self.get_hosts()) - def get_hosts_func(label=None, as_hostspec=False): - if as_hostspec: - return all_hosts - return [h.hostname for h in all_hosts] - if not drive_group.placement.filter_matching_hosts(get_hosts_func): - raise orchestrator.OrchestratorValidationError('failed to match') + def get_hosts_func(label=None, as_hostspec=False): + if as_hostspec: + return all_hosts + return [h.hostname for h in all_hosts] - return self.get_hosts().then(run).then( - on_complete=orchestrator.ProgressReference( - message='create_osds', - mgr=self, - ) - ) + if not drive_group.placement.filter_matching_hosts(get_hosts_func): + raise orchestrator.OrchestratorValidationError('failed to match') + return '' + @handle_orch_error def apply_drivegroups(self, specs): - # type: (List[DriveGroupSpec]) -> TestCompletion + # type: (List[DriveGroupSpec]) -> List[str] drive_group = specs[0] - def run(all_hosts): - # type: (List[orchestrator.HostSpec]) -> None - drive_group.validate() - - def get_hosts_func(label=None, as_hostspec=False): - if as_hostspec: - return all_hosts - return [h.hostname for h in all_hosts] - - if not drive_group.placement.filter_matching_hosts(get_hosts_func): - raise orchestrator.OrchestratorValidationError('failed to match') - return self.get_hosts().then(run).then( - on_complete=orchestrator.ProgressReference( - message='apply_drivesgroups', - mgr=self, - ) - ) - - @deferred_write("remove_daemons") + all_hosts = raise_if_exception(self.get_hosts()) + + drive_group.validate() + + def get_hosts_func(label=None, as_hostspec=False): + if as_hostspec: + return all_hosts + return [h.hostname for h in all_hosts] + + if not drive_group.placement.filter_matching_hosts(get_hosts_func): + raise orchestrator.OrchestratorValidationError('failed to match') + return [] + + @handle_orch_error def remove_daemons(self, names): assert isinstance(names, list) + return 'done' - @deferred_write("remove_service") + @handle_orch_error def remove_service(self, service_name): assert isinstance(service_name, str) + return 'done' - @deferred_write("blink_device_light") + @handle_orch_error def blink_device_light(self, ident_fault, on, locations): assert ident_fault in ("ident", "fault") assert len(locations) return '' - @deferred_write("service_action") + @handle_orch_error def service_action(self, action, service_name): - pass + return 'done' - @deferred_write("daemon_action") + @handle_orch_error def daemon_action(self, action, daemon_name, image=None): - pass + return 'done' - @deferred_write("Adding NFS service") + @handle_orch_error def add_nfs(self, spec): - # type: (NFSServiceSpec) -> None + # type: (NFSServiceSpec) -> List[str] assert isinstance(spec.pool, str) + return [spec.one_line_str()] - @deferred_write("apply_nfs") + @handle_orch_error def apply_nfs(self, spec): - pass + return spec.one_line_str() - @deferred_write("add_iscsi") + @handle_orch_error def add_iscsi(self, spec): - # type: (IscsiServiceSpec) -> None - pass + # type: (IscsiServiceSpec) -> List[str] + return [spec.one_line_str()] - @deferred_write("apply_iscsi") + @handle_orch_error def apply_iscsi(self, spec): - # type: (IscsiServiceSpec) -> None - pass + # type: (IscsiServiceSpec) -> str + return spec.one_line_str() - @deferred_write("add_mds") + @handle_orch_error def add_mds(self, spec): - pass + return 'done' - @deferred_write("add_rgw") + @handle_orch_error def add_rgw(self, spec): - pass + return 'done' - @deferred_read + @handle_orch_error def get_hosts(self): if self._inventory: return [orchestrator.HostSpec(i.name, i.addr, i.labels) for i in self._inventory] return [orchestrator.HostSpec('localhost')] - @deferred_write("add_host") + @handle_orch_error def add_host(self, spec): - # type: (orchestrator.HostSpec) -> None + # type: (orchestrator.HostSpec) -> str host = spec.hostname if host == 'raise_validation_error': raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5") @@ -368,22 +312,27 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): if host == 'raise_import_error': raise ImportError("test_orchestrator not enabled") assert isinstance(host, str) + return '' - @deferred_write("remove_host") + @handle_orch_error def remove_host(self, host): assert isinstance(host, str) + return 'done' - @deferred_write("apply_mgr") + @handle_orch_error def apply_mgr(self, spec): - # type: (ServiceSpec) -> None + # type: (ServiceSpec) -> str assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count assert all([isinstance(h, str) for h in spec.placement.hosts]) + return spec.one_line_str() - @deferred_write("apply_mon") + @handle_orch_error def apply_mon(self, spec): - # type: (ServiceSpec) -> None + # type: (ServiceSpec) -> str assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count assert all([isinstance(h[0], str) for h in spec.placement.hosts]) assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.hosts]) + return spec.one_line_str() + -- 2.39.5