From af8fa11a1f132db35fcc8051cb1bb848cb800d17 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 27 Mar 2020 09:00:25 -0500 Subject: [PATCH] Revert "Merge PR #34091 into master" This reverts commit f865f3e0a0f3a646b093b3571ea76713eca1916c, reversing changes made to 7ef5458e26ec7c0565509a7882fa31fa064eb49d. Signed-off-by: Sage Weil --- src/pybind/mgr/cephadm/module.py | 138 +++++++++++------- .../mgr/cephadm/tests/test_completion.py | 133 ++++++++++++++++- 2 files changed, 214 insertions(+), 57 deletions(-) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 7e89a365500f4..676768ee8cdac 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -386,10 +386,12 @@ class AsyncCompletion(orchestrator.Completion): value=orchestrator._Promise.NO_RESULT, # type: Any on_complete=None, # type: Optional[Callable] name=None, # type: Optional[str] + many=False, # type: bool update_progress=False, # type: bool ): assert CephadmOrchestrator.instance is not None + self.many = many self.update_progress = update_progress if name is None and on_complete is not None: name = getattr(on_complete, '__name__', None) @@ -431,14 +433,33 @@ class AsyncCompletion(orchestrator.Completion): assert self._on_complete_ is not None try: res = self._on_complete_(*args, **kwargs) + if self.update_progress and self.many: + assert self.progress_reference + self.progress_reference.progress += 1.0 / len(value) return res except Exception as e: self.fail(e) raise assert CephadmOrchestrator.instance - CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,), - callback=callback, error_callback=error_callback) + if self.many: + if not value: + logger.info('calling map_async without values') + callback([]) + if six.PY3: + CephadmOrchestrator.instance._worker_pool.map_async(do_work, value, + callback=callback, + error_callback=error_callback) + else: + CephadmOrchestrator.instance._worker_pool.map_async(do_work, value, + callback=callback) + else: + if six.PY3: + CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,), + callback=callback, error_callback=error_callback) + else: + CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,), + callback=callback) return self.ASYNC_RESULT return run @@ -449,35 +470,66 @@ class AsyncCompletion(orchestrator.Completion): self._on_complete_ = inner -def forall_hosts(f): - @wraps(f) - def forall_hosts_wrapper(*args) -> list: - - # Some weired logic to make calling functions with multiple arguments work. - if len(args) == 1: - vals = args[0] - self = None - elif len(args) == 2: - self, vals = args - else: - assert 'either f([...]) or self.f([...])' +def ssh_completion(cls=AsyncCompletion, **c_kwargs): + # type: (Type[orchestrator.Completion], Any) -> Callable + """ + See ./HACKING.rst for a how-to + """ + def decorator(f): + @wraps(f) + def wrapper(*args): + + name = f.__name__ + many = c_kwargs.get('many', False) + + # Some weired logic to make calling functions with multiple arguments work. + if len(args) == 1: + [value] = args + if many and value and isinstance(value[0], tuple): + return cls(on_complete=lambda x: f(*x), value=value, name=name, **c_kwargs) + else: + return cls(on_complete=f, value=value, name=name, **c_kwargs) + else: + if many: + self, value = args - def do_work(arg): - if not isinstance(arg, tuple): - arg = (arg, ) - try: - if self: - return f(self, *arg) - return f(*arg) - except Exception as e: - logger.exception(f'executing {f.__name__}({args}) failed.') - raise + def call_self(inner_args): + if not isinstance(inner_args, tuple): + inner_args = (inner_args, ) + return f(self, *inner_args) - assert CephadmOrchestrator.instance is not None - return CephadmOrchestrator.instance._worker_pool.map(do_work, vals) + return cls(on_complete=call_self, value=value, name=name, **c_kwargs) + else: + return cls(on_complete=lambda x: f(*x), value=args, name=name, **c_kwargs) + + return wrapper + return decorator + + +def async_completion(f): + # type: (Callable) -> Callable[..., AsyncCompletion] + """ + See ./HACKING.rst for a how-to + + :param f: wrapped function + """ + return ssh_completion()(f) + + +def async_map_completion(f): + # type: (Callable) -> Callable[..., AsyncCompletion] + """ + See ./HACKING.rst for a how-to + :param f: wrapped function - return forall_hosts_wrapper + kind of similar to + + >>> def sync_map(f): + ... return lambda x: map(f, x) + + """ + return ssh_completion(many=True)(f) def trivial_completion(f): @@ -1608,7 +1660,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): r.append(h) return r - @trivial_completion + @async_completion def add_host(self, spec): # type: (HostSpec) -> str """ @@ -1632,7 +1684,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self.log.info('Added host %s' % spec.hostname) return "Added host '{}'".format(spec.hostname) - @trivial_completion + @async_completion def remove_host(self, host): # type: (str) -> str """ @@ -1648,7 +1700,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self.log.info('Removed host %s' % host) return "Removed host '{}'".format(host) - @trivial_completion + @async_completion def update_host_addr(self, host, addr): if host not in self.inventory: raise OrchestratorError('host %s not registered' % host) @@ -1678,7 +1730,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): )) return r - @trivial_completion + @async_completion def add_host_label(self, host, label): if host not in self.inventory: raise OrchestratorError('host %s does not exist' % host) @@ -1691,7 +1743,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self.log.info('Added label %s to host %s' % (label, host)) return 'Added label %s to host %s' % (label, host) - @trivial_completion + @async_completion def remove_host_label(self, host, label): if host not in self.inventory: raise OrchestratorError('host %s does not exist' % host) @@ -1879,7 +1931,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): result.append(dd) return result - @trivial_completion def service_action(self, action, service_name): args = [] for host, dm in self.cache.daemons.items(): @@ -1890,7 +1941,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self.log.info('%s service %s' % (action.capitalize(), service_name)) return self._daemon_actions(args) - @forall_hosts + @async_map_completion def _daemon_actions(self, daemon_type, daemon_id, host, action): return self._daemon_action(daemon_type, daemon_id, host, action) @@ -1916,7 +1967,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self.cache.invalidate_host_daemons(host) return "{} {} from host '{}'".format(action, name, host) - @trivial_completion def daemon_action(self, action, daemon_type, daemon_id): args = [] for host, dm in self.cache.daemons.items(): @@ -1933,7 +1983,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): ','.join(['%s.%s' % (a[0], a[1]) for a in args]))) return self._daemon_actions(args) - @trivial_completion def remove_daemons(self, names): # type: (List[str]) -> orchestrator.Completion args = [] @@ -1992,9 +2041,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err)) return '\n'.join(out + err) - @trivial_completion def blink_device_light(self, ident_fault, on, locs): - @forall_hosts + @async_map_completion def blink(host, dev, path): cmd = [ 'lsmcli', @@ -2260,7 +2308,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): return "{} {} on host '{}'".format( 'Reconfigured' if reconfig else 'Deployed', name, host) - @forall_hosts + @async_map_completion def _remove_daemons(self, name, host): return self._remove_daemon(name, host) @@ -2535,7 +2583,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): ) daemons.append(sd) - @forall_hosts + @async_map_completion def create_func_map(*args): return create_func(*args) @@ -2586,7 +2634,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): keyring=keyring, extra_config=extra_config) - @trivial_completion def add_mon(self, spec): # type: (ServiceSpec) -> orchestrator.Completion return self._add_daemon('mon', spec, self._create_mon) @@ -2606,7 +2653,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): return self._create_daemon('mgr', mgr_id, host, keyring=keyring) - @trivial_completion def add_mgr(self, spec): # type: (ServiceSpec) -> orchestrator.Completion return self._add_daemon('mgr', spec, self._create_mgr) @@ -2654,7 +2700,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): def apply_mgr(self, spec): return self._apply(spec) - @trivial_completion def add_mds(self, spec: ServiceSpec): return self._add_daemon('mds', spec, self._create_mds, self._config_mds) @@ -2683,7 +2728,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): }) return self._create_daemon('mds', mds_id, host, keyring=keyring) - @trivial_completion def add_rgw(self, spec): return self._add_daemon('rgw', spec, self._create_rgw, self._config_rgw) @@ -2726,7 +2770,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): def apply_rgw(self, spec): return self._apply(spec) - @trivial_completion def add_rbd_mirror(self, spec): return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror) @@ -3047,7 +3090,6 @@ receivers: "peers": peers }, sorted(deps) - @trivial_completion def add_prometheus(self, spec): return self._add_daemon('prometheus', spec, self._create_prometheus) @@ -3058,7 +3100,6 @@ receivers: def apply_prometheus(self, spec): return self._apply(spec) - @trivial_completion def add_node_exporter(self, spec): # type: (ServiceSpec) -> AsyncCompletion return self._add_daemon('node-exporter', spec, @@ -3071,7 +3112,6 @@ receivers: def _create_node_exporter(self, daemon_id, host): return self._create_daemon('node-exporter', daemon_id, host) - @trivial_completion def add_crash(self, spec): # type: (ServiceSpec) -> AsyncCompletion return self._add_daemon('crash', spec, @@ -3090,7 +3130,6 @@ receivers: }) return self._create_daemon('crash', daemon_id, host, keyring=keyring) - @trivial_completion def add_grafana(self, spec): # type: (ServiceSpec) -> AsyncCompletion return self._add_daemon('grafana', spec, self._create_grafana) @@ -3103,7 +3142,6 @@ receivers: # type: (str, str) -> str return self._create_daemon('grafana', daemon_id, host) - @trivial_completion def add_alertmanager(self, spec): # type: (ServiceSpec) -> AsyncCompletion return self._add_daemon('alertmanager', spec, self._create_alertmanager) diff --git a/src/pybind/mgr/cephadm/tests/test_completion.py b/src/pybind/mgr/cephadm/tests/test_completion.py index 41c124677cc57..085ea6c0a803b 100644 --- a/src/pybind/mgr/cephadm/tests/test_completion.py +++ b/src/pybind/mgr/cephadm/tests/test_completion.py @@ -12,7 +12,7 @@ import pytest from tests import mock from .fixtures import cephadm_module, wait -from ..module import trivial_completion, forall_hosts +from ..module import trivial_completion, async_completion, async_map_completion class TestCompletion(object): @@ -23,6 +23,19 @@ class TestCompletion(object): return x+1 assert wait(cephadm_module, run(1)) == 2 + @pytest.mark.parametrize("input", [ + ((1, ), ), + ((1, 2), ), + (("hallo", ), ), + (("hallo", "foo"), ), + ]) + def test_async(self, input, cephadm_module): + @async_completion + def run(*args): + return str(args) + + assert wait(cephadm_module, run(*input)) == str(input) + @pytest.mark.parametrize("input,expected", [ ([], []), ([1], ["(1,)"]), @@ -32,11 +45,25 @@ class TestCompletion(object): ([(1, 2), (3, 4)], ["(1, 2)", "(3, 4)"]), ]) def test_async_map(self, input, expected, cephadm_module): - @forall_hosts - def run_forall(*args): + @async_map_completion + def run(*args): return str(args) - assert run_forall(input) == expected + c = run(input) + wait(cephadm_module, c) + assert c.result == expected + + def test_async_self(self, cephadm_module): + class Run(object): + def __init__(self): + self.attr = 1 + + @async_completion + def run(self, x): + assert self.attr == 1 + return x + 1 + + assert wait(cephadm_module, Run().run(1)) == 2 @pytest.mark.parametrize("input,expected", [ ([], []), @@ -51,9 +78,101 @@ class TestCompletion(object): def __init__(self): self.attr = 1 - @forall_hosts - def run_forall(self, *args): + @async_map_completion + def run(self, *args): assert self.attr == 1 return str(args) - assert Run().run_forall(input) == expected + c = Run().run(input) + wait(cephadm_module, c) + assert c.result == expected + + def test_then1(self, cephadm_module): + @async_map_completion + def run(x): + return x+1 + + assert wait(cephadm_module, run([1,2]).then(str)) == '[2, 3]' + + def test_then2(self, cephadm_module): + @async_map_completion + def run(x): + time.sleep(0.1) + return x+1 + + @async_completion + def async_str(results): + return str(results) + + c = run([1,2]).then(async_str) + + wait(cephadm_module, c) + assert c.result == '[2, 3]' + + def test_then3(self, cephadm_module): + @async_map_completion + def run(x): + time.sleep(0.1) + return x+1 + + def async_str(results): + return async_completion(str)(results) + + c = run([1,2]).then(async_str) + + wait(cephadm_module, c) + assert c.result == '[2, 3]' + + def test_then4(self, cephadm_module): + @async_map_completion + def run(x): + time.sleep(0.1) + return x+1 + + def async_str(results): + return async_completion(str)(results).then(lambda x: x + "hello") + + c = run([1,2]).then(async_str) + + wait(cephadm_module, c) + assert c.result == '[2, 3]hello' + + @pytest.mark.skip(reason="see limitation of async_map_completion") + def test_then5(self, cephadm_module): + @async_map_completion + def run(x): + time.sleep(0.1) + return async_completion(str)(x+1) + + c = run([1,2]) + + wait(cephadm_module, c) + assert c.result == "['2', '3']" + + def test_raise(self, cephadm_module): + @async_completion + def run(x): + raise ZeroDivisionError() + + with pytest.raises(ZeroDivisionError): + wait(cephadm_module, run(1)) + + def test_progress(self, cephadm_module): + @async_map_completion + def run(*args): + return str(args) + + c = run(list(range(2))) + c.update_progress = True + c.add_progress( + mgr=cephadm_module, + message="my progress" + ) + wait(cephadm_module, c) + assert c.result == [str((x,)) for x in range(2)] + assert cephadm_module.remote.mock_calls == [ + mock.call('progress', 'update', mock.ANY, 'my progress', float(i) / 2, [('origin', 'orchestrator')]) + for i in range(2+1)] + [ + mock.call('progress', 'update', mock.ANY, 'my progress', 1.0, [('origin', 'orchestrator')]), + mock.call('progress', 'complete', mock.ANY), + ] -- 2.39.5