]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: replace async_map_completion with a simple wrapper
authorSebastian Wagner <sebastian.wagner@suse.com>
Fri, 20 Mar 2020 16:17:39 +0000 (17:17 +0100)
committerSebastian Wagner <sebastian.wagner@suse.com>
Wed, 25 Mar 2020 11:07:25 +0000 (12:07 +0100)
There is no need to wrap everything into completions.

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/test_completion.py

index 38f369c121ca1b35e6ffce82a85cee49408a6129..6e4e2f39388c7eb84f86ba3ac118306d8344549b 100644 (file)
@@ -514,6 +514,36 @@ def ssh_completion(cls=AsyncCompletion, **c_kwargs):
         return wrapper
     return decorator
 
+def forall_hosts(f):
+    @wraps(f)
+    def forall_hosts_wrapper(*args) -> list:
+
+        # Some weired logic to make calling functions with multiple arguments work.
+        if len(args) == 1:
+            vals = args[0]
+            self = None
+        elif len(args) == 2:
+            self, vals = args
+        else:
+            assert 'either f([...]) or self.f([...])'
+
+        def do_work(arg):
+            if not isinstance(arg, tuple):
+                arg = (arg, )
+            try:
+                if self:
+                    return f(self, *arg)
+                return f(*arg)
+            except Exception as e:
+                logger.exception(f'executing {f.__name__}({args}) failed.')
+                raise
+
+        assert CephadmOrchestrator.instance is not None
+        return CephadmOrchestrator.instance._worker_pool.map(do_work, vals)
+
+
+    return forall_hosts_wrapper
+
 
 def async_completion(f):
     # type: (Callable) -> Callable[..., AsyncCompletion]
@@ -1931,6 +1961,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 result.append(dd)
         return result
 
+    @trivial_completion
     def service_action(self, action, service_name):
         args = []
         for host, dm in self.cache.daemons.items():
@@ -1941,7 +1972,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.log.info('%s service %s' % (action.capitalize(), service_name))
         return self._daemon_actions(args)
 
-    @async_map_completion
+    @forall_hosts
     def _daemon_actions(self, daemon_type, daemon_id, host, action):
         return self._daemon_action(daemon_type, daemon_id, host, action)
 
@@ -1983,6 +2014,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             ','.join(['%s.%s' % (a[0], a[1]) for a in args])))
         return self._daemon_actions(args)
 
+    @trivial_completion
     def remove_daemons(self, names):
         # type: (List[str]) -> orchestrator.Completion
         args = []
@@ -2041,8 +2073,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
         return '\n'.join(out + err)
 
+    @trivial_completion
     def blink_device_light(self, ident_fault, on, locs):
-        @async_map_completion
+        @forall_hosts
         def blink(host, dev, path):
             cmd = [
                 'lsmcli',
@@ -2294,7 +2327,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         return "{} {} on host '{}'".format(
             'Reconfigured' if reconfig else 'Deployed', name, host)
 
-    @async_map_completion
+    @forall_hosts
     def _remove_daemons(self, name, host):
         return self._remove_daemon(name, host)
 
@@ -2563,7 +2596,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             )
             daemons.append(sd)
 
-        @async_map_completion
+        @forall_hosts
         def create_func_map(*args):
             return create_func(*args)
 
@@ -2614,6 +2647,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                                    keyring=keyring,
                                    extra_config=extra_config)
 
+    @trivial_completion
     def add_mon(self, spec):
         # type: (ServiceSpec) -> orchestrator.Completion
         return self._add_daemon('mon', spec, self._create_mon)
@@ -2633,6 +2667,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
         return self._create_daemon('mgr', mgr_id, host, keyring=keyring)
 
+    @trivial_completion
     def add_mgr(self, spec):
         # type: (ServiceSpec) -> orchestrator.Completion
         return self._add_daemon('mgr', spec, self._create_mgr)
@@ -2679,6 +2714,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     def apply_mgr(self, spec):
         return self._apply(spec)
 
+    @trivial_completion
     def add_mds(self, spec: ServiceSpec):
         return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
 
@@ -2707,6 +2743,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         })
         return self._create_daemon('mds', mds_id, host, keyring=keyring)
 
+    @trivial_completion
     def add_rgw(self, spec):
         return self._add_daemon('rgw', spec, self._create_rgw, self._config_rgw)
 
@@ -2749,6 +2786,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     def apply_rgw(self, spec):
         return self._apply(spec)
 
+    @trivial_completion
     def add_rbd_mirror(self, spec):
         return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror)
 
@@ -3017,6 +3055,7 @@ receivers:
             "peers": peers
         }, sorted(deps)
 
+    @trivial_completion
     def add_prometheus(self, spec):
         return self._add_daemon('prometheus', spec, self._create_prometheus)
 
@@ -3027,6 +3066,7 @@ receivers:
     def apply_prometheus(self, spec):
         return self._apply(spec)
 
+    @trivial_completion
     def add_node_exporter(self, spec):
         # type: (ServiceSpec) -> AsyncCompletion
         return self._add_daemon('node-exporter', spec,
@@ -3039,6 +3079,7 @@ receivers:
     def _create_node_exporter(self, daemon_id, host):
         return self._create_daemon('node-exporter', daemon_id, host)
 
+    @trivial_completion
     def add_crash(self, spec):
         # type: (ServiceSpec) -> AsyncCompletion
         return self._add_daemon('crash', spec,
@@ -3057,6 +3098,7 @@ receivers:
         })
         return self._create_daemon('crash', daemon_id, host, keyring=keyring)
 
+    @trivial_completion
     def add_grafana(self, spec):
         # type: (ServiceSpec) -> AsyncCompletion
         return self._add_daemon('grafana', spec, self._create_grafana)
@@ -3069,6 +3111,7 @@ receivers:
         # type: (str, str) -> str
         return self._create_daemon('grafana', daemon_id, host)
 
+    @trivial_completion
     def add_alertmanager(self, spec):
         # type: (ServiceSpec) -> AsyncCompletion
         return self._add_daemon('alertmanager', spec, self._create_alertmanager)
index 085ea6c0a803b374570e26f2aa22bff0dcf13b62..f39aa6c18459821a1430c63b6e7254943b168e54 100644 (file)
@@ -12,7 +12,7 @@ import pytest
 
 from tests import mock
 from .fixtures import cephadm_module, wait
-from ..module import trivial_completion, async_completion, async_map_completion
+from ..module import trivial_completion, async_completion, async_map_completion, forall_hosts
 
 
 class TestCompletion(object):
@@ -53,6 +53,11 @@ class TestCompletion(object):
         wait(cephadm_module, c)
         assert c.result == expected
 
+        @forall_hosts
+        def run_forall(*args):
+            return str(args)
+        assert run_forall(input) == expected
+
     def test_async_self(self, cephadm_module):
         class Run(object):
             def __init__(self):
@@ -83,10 +88,17 @@ class TestCompletion(object):
                 assert self.attr == 1
                 return str(args)
 
+            @forall_hosts
+            def run_forall(self, *args):
+                assert self.attr == 1
+                return str(args)
+
         c = Run().run(input)
         wait(cephadm_module, c)
         assert c.result == expected
 
+        assert Run().run_forall(input) == expected
+
     def test_then1(self, cephadm_module):
         @async_map_completion
         def run(x):