From: Sebastian Wagner Date: Fri, 20 Mar 2020 16:17:39 +0000 (+0100) Subject: mgr/cephadm: replace async_map_completion with a simple wrapper X-Git-Tag: v16.1.0~2779^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a78f42e225bbc041324dc3e565deb207feddad26;p=ceph.git mgr/cephadm: replace async_map_completion with a simple wrapper There is no need to wrap everything into completions. Signed-off-by: Sebastian Wagner --- diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 38f369c121ca1..6e4e2f39388c7 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -514,6 +514,36 @@ def ssh_completion(cls=AsyncCompletion, **c_kwargs): return wrapper return decorator +def forall_hosts(f): + @wraps(f) + def forall_hosts_wrapper(*args) -> list: + + # Some weired logic to make calling functions with multiple arguments work. + if len(args) == 1: + vals = args[0] + self = None + elif len(args) == 2: + self, vals = args + else: + assert 'either f([...]) or self.f([...])' + + def do_work(arg): + if not isinstance(arg, tuple): + arg = (arg, ) + try: + if self: + return f(self, *arg) + return f(*arg) + except Exception as e: + logger.exception(f'executing {f.__name__}({args}) failed.') + raise + + assert CephadmOrchestrator.instance is not None + return CephadmOrchestrator.instance._worker_pool.map(do_work, vals) + + + return forall_hosts_wrapper + def async_completion(f): # type: (Callable) -> Callable[..., AsyncCompletion] @@ -1931,6 +1961,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): result.append(dd) return result + @trivial_completion def service_action(self, action, service_name): args = [] for host, dm in self.cache.daemons.items(): @@ -1941,7 +1972,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self.log.info('%s service %s' % (action.capitalize(), service_name)) return self._daemon_actions(args) - @async_map_completion + @forall_hosts def _daemon_actions(self, daemon_type, daemon_id, host, action): return self._daemon_action(daemon_type, daemon_id, host, action) @@ -1983,6 +2014,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): ','.join(['%s.%s' % (a[0], a[1]) for a in args]))) return self._daemon_actions(args) + @trivial_completion def remove_daemons(self, names): # type: (List[str]) -> orchestrator.Completion args = [] @@ -2041,8 +2073,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err)) return '\n'.join(out + err) + @trivial_completion def blink_device_light(self, ident_fault, on, locs): - @async_map_completion + @forall_hosts def blink(host, dev, path): cmd = [ 'lsmcli', @@ -2294,7 +2327,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): return "{} {} on host '{}'".format( 'Reconfigured' if reconfig else 'Deployed', name, host) - @async_map_completion + @forall_hosts def _remove_daemons(self, name, host): return self._remove_daemon(name, host) @@ -2563,7 +2596,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): ) daemons.append(sd) - @async_map_completion + @forall_hosts def create_func_map(*args): return create_func(*args) @@ -2614,6 +2647,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): keyring=keyring, extra_config=extra_config) + @trivial_completion def add_mon(self, spec): # type: (ServiceSpec) -> orchestrator.Completion return self._add_daemon('mon', spec, self._create_mon) @@ -2633,6 +2667,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): return self._create_daemon('mgr', mgr_id, host, keyring=keyring) + @trivial_completion def add_mgr(self, spec): # type: (ServiceSpec) -> orchestrator.Completion return self._add_daemon('mgr', spec, self._create_mgr) @@ -2679,6 +2714,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): def apply_mgr(self, spec): return self._apply(spec) + @trivial_completion def add_mds(self, spec: ServiceSpec): return self._add_daemon('mds', spec, self._create_mds, self._config_mds) @@ -2707,6 +2743,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): }) return self._create_daemon('mds', mds_id, host, keyring=keyring) + @trivial_completion def add_rgw(self, spec): return self._add_daemon('rgw', spec, self._create_rgw, self._config_rgw) @@ -2749,6 +2786,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): def apply_rgw(self, spec): return self._apply(spec) + @trivial_completion def add_rbd_mirror(self, spec): return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror) @@ -3017,6 +3055,7 @@ receivers: "peers": peers }, sorted(deps) + @trivial_completion def add_prometheus(self, spec): return self._add_daemon('prometheus', spec, self._create_prometheus) @@ -3027,6 +3066,7 @@ receivers: def apply_prometheus(self, spec): return self._apply(spec) + @trivial_completion def add_node_exporter(self, spec): # type: (ServiceSpec) -> AsyncCompletion return self._add_daemon('node-exporter', spec, @@ -3039,6 +3079,7 @@ receivers: def _create_node_exporter(self, daemon_id, host): return self._create_daemon('node-exporter', daemon_id, host) + @trivial_completion def add_crash(self, spec): # type: (ServiceSpec) -> AsyncCompletion return self._add_daemon('crash', spec, @@ -3057,6 +3098,7 @@ receivers: }) return self._create_daemon('crash', daemon_id, host, keyring=keyring) + @trivial_completion def add_grafana(self, spec): # type: (ServiceSpec) -> AsyncCompletion return self._add_daemon('grafana', spec, self._create_grafana) @@ -3069,6 +3111,7 @@ receivers: # type: (str, str) -> str return self._create_daemon('grafana', daemon_id, host) + @trivial_completion def add_alertmanager(self, spec): # type: (ServiceSpec) -> AsyncCompletion return self._add_daemon('alertmanager', spec, self._create_alertmanager) diff --git a/src/pybind/mgr/cephadm/tests/test_completion.py b/src/pybind/mgr/cephadm/tests/test_completion.py index 085ea6c0a803b..f39aa6c184598 100644 --- a/src/pybind/mgr/cephadm/tests/test_completion.py +++ b/src/pybind/mgr/cephadm/tests/test_completion.py @@ -12,7 +12,7 @@ import pytest from tests import mock from .fixtures import cephadm_module, wait -from ..module import trivial_completion, async_completion, async_map_completion +from ..module import trivial_completion, async_completion, async_map_completion, forall_hosts class TestCompletion(object): @@ -53,6 +53,11 @@ class TestCompletion(object): wait(cephadm_module, c) assert c.result == expected + @forall_hosts + def run_forall(*args): + return str(args) + assert run_forall(input) == expected + def test_async_self(self, cephadm_module): class Run(object): def __init__(self): @@ -83,10 +88,17 @@ class TestCompletion(object): assert self.attr == 1 return str(args) + @forall_hosts + def run_forall(self, *args): + assert self.attr == 1 + return str(args) + c = Run().run(input) wait(cephadm_module, c) assert c.result == expected + assert Run().run_forall(input) == expected + def test_then1(self, cephadm_module): @async_map_completion def run(x):