]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Revert "Merge PR #34091 into master" 34260/head
authorSage Weil <sage@redhat.com>
Fri, 27 Mar 2020 14:00:25 +0000 (09:00 -0500)
committerSage Weil <sage@redhat.com>
Fri, 27 Mar 2020 18:38:45 +0000 (13:38 -0500)
This reverts commit f865f3e0a0f3a646b093b3571ea76713eca1916c, reversing
changes made to 7ef5458e26ec7c0565509a7882fa31fa064eb49d.

Signed-off-by: Sage Weil <sage@redhat.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/test_completion.py

index 7e89a365500f4d2748a3ef6e21d227c0b45a26df..676768ee8cdac29c5ffff9f2f562c8f74d4c26ae 100644 (file)
@@ -386,10 +386,12 @@ class AsyncCompletion(orchestrator.Completion):
                  value=orchestrator._Promise.NO_RESULT,  # type: Any
                  on_complete=None,  # type: Optional[Callable]
                  name=None,  # type: Optional[str]
+                 many=False, # type: bool
                  update_progress=False,  # type: bool
                  ):
 
         assert CephadmOrchestrator.instance is not None
+        self.many = many
         self.update_progress = update_progress
         if name is None and on_complete is not None:
             name = getattr(on_complete, '__name__', None)
@@ -431,14 +433,33 @@ class AsyncCompletion(orchestrator.Completion):
                 assert self._on_complete_ is not None
                 try:
                     res = self._on_complete_(*args, **kwargs)
+                    if self.update_progress and self.many:
+                        assert self.progress_reference
+                        self.progress_reference.progress += 1.0 / len(value)
                     return res
                 except Exception as e:
                     self.fail(e)
                     raise
 
             assert CephadmOrchestrator.instance
-            CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
-                                                              callback=callback, error_callback=error_callback)
+            if self.many:
+                if not value:
+                    logger.info('calling map_async without values')
+                    callback([])
+                if six.PY3:
+                    CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
+                                                                    callback=callback,
+                                                                    error_callback=error_callback)
+                else:
+                    CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
+                                                                    callback=callback)
+            else:
+                if six.PY3:
+                    CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
+                                                                      callback=callback, error_callback=error_callback)
+                else:
+                    CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
+                                                                      callback=callback)
             return self.ASYNC_RESULT
 
         return run
@@ -449,35 +470,66 @@ class AsyncCompletion(orchestrator.Completion):
         self._on_complete_ = inner
 
 
-def forall_hosts(f):
-    @wraps(f)
-    def forall_hosts_wrapper(*args) -> list:
-
-        # Some weired logic to make calling functions with multiple arguments work.
-        if len(args) == 1:
-            vals = args[0]
-            self = None
-        elif len(args) == 2:
-            self, vals = args
-        else:
-            assert 'either f([...]) or self.f([...])'
+def ssh_completion(cls=AsyncCompletion, **c_kwargs):
+    # type: (Type[orchestrator.Completion], Any) -> Callable
+    """
+    See ./HACKING.rst for a how-to
+    """
+    def decorator(f):
+        @wraps(f)
+        def wrapper(*args):
+
+            name = f.__name__
+            many = c_kwargs.get('many', False)
+
+            # Some weired logic to make calling functions with multiple arguments work.
+            if len(args) == 1:
+                [value] = args
+                if many and value and isinstance(value[0], tuple):
+                    return cls(on_complete=lambda x: f(*x), value=value, name=name, **c_kwargs)
+                else:
+                    return cls(on_complete=f, value=value, name=name, **c_kwargs)
+            else:
+                if many:
+                    self, value = args
 
-        def do_work(arg):
-            if not isinstance(arg, tuple):
-                arg = (arg, )
-            try:
-                if self:
-                    return f(self, *arg)
-                return f(*arg)
-            except Exception as e:
-                logger.exception(f'executing {f.__name__}({args}) failed.')
-                raise
+                    def call_self(inner_args):
+                        if not isinstance(inner_args, tuple):
+                            inner_args = (inner_args, )
+                        return f(self, *inner_args)
 
-        assert CephadmOrchestrator.instance is not None
-        return CephadmOrchestrator.instance._worker_pool.map(do_work, vals)
+                    return cls(on_complete=call_self, value=value, name=name, **c_kwargs)
+                else:
+                    return cls(on_complete=lambda x: f(*x), value=args, name=name, **c_kwargs)
+
+        return wrapper
+    return decorator
+
+
+def async_completion(f):
+    # type: (Callable) -> Callable[..., AsyncCompletion]
+    """
+    See ./HACKING.rst for a how-to
+
+    :param f: wrapped function
+    """
+    return ssh_completion()(f)
+
+
+def async_map_completion(f):
+    # type: (Callable) -> Callable[..., AsyncCompletion]
+    """
+    See ./HACKING.rst for a how-to
 
+    :param f: wrapped function
 
-    return forall_hosts_wrapper
+    kind of similar to
+
+    >>> def sync_map(f):
+    ...     return lambda x: map(f, x)
+
+    """
+    return ssh_completion(many=True)(f)
 
 
 def trivial_completion(f):
@@ -1608,7 +1660,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 r.append(h)
         return r
 
-    @trivial_completion
+    @async_completion
     def add_host(self, spec):
         # type: (HostSpec) -> str
         """
@@ -1632,7 +1684,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.log.info('Added host %s' % spec.hostname)
         return "Added host '{}'".format(spec.hostname)
 
-    @trivial_completion
+    @async_completion
     def remove_host(self, host):
         # type: (str) -> str
         """
@@ -1648,7 +1700,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.log.info('Removed host %s' % host)
         return "Removed host '{}'".format(host)
 
-    @trivial_completion
+    @async_completion
     def update_host_addr(self, host, addr):
         if host not in self.inventory:
             raise OrchestratorError('host %s not registered' % host)
@@ -1678,7 +1730,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             ))
         return r
 
-    @trivial_completion
+    @async_completion
     def add_host_label(self, host, label):
         if host not in self.inventory:
             raise OrchestratorError('host %s does not exist' % host)
@@ -1691,7 +1743,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.log.info('Added label %s to host %s' % (label, host))
         return 'Added label %s to host %s' % (label, host)
 
-    @trivial_completion
+    @async_completion
     def remove_host_label(self, host, label):
         if host not in self.inventory:
             raise OrchestratorError('host %s does not exist' % host)
@@ -1879,7 +1931,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 result.append(dd)
         return result
 
-    @trivial_completion
     def service_action(self, action, service_name):
         args = []
         for host, dm in self.cache.daemons.items():
@@ -1890,7 +1941,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.log.info('%s service %s' % (action.capitalize(), service_name))
         return self._daemon_actions(args)
 
-    @forall_hosts
+    @async_map_completion
     def _daemon_actions(self, daemon_type, daemon_id, host, action):
         return self._daemon_action(daemon_type, daemon_id, host, action)
 
@@ -1916,7 +1967,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.cache.invalidate_host_daemons(host)
         return "{} {} from host '{}'".format(action, name, host)
 
-    @trivial_completion
     def daemon_action(self, action, daemon_type, daemon_id):
         args = []
         for host, dm in self.cache.daemons.items():
@@ -1933,7 +1983,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             ','.join(['%s.%s' % (a[0], a[1]) for a in args])))
         return self._daemon_actions(args)
 
-    @trivial_completion
     def remove_daemons(self, names):
         # type: (List[str]) -> orchestrator.Completion
         args = []
@@ -1992,9 +2041,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
         return '\n'.join(out + err)
 
-    @trivial_completion
     def blink_device_light(self, ident_fault, on, locs):
-        @forall_hosts
+        @async_map_completion
         def blink(host, dev, path):
             cmd = [
                 'lsmcli',
@@ -2260,7 +2308,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         return "{} {} on host '{}'".format(
             'Reconfigured' if reconfig else 'Deployed', name, host)
 
-    @forall_hosts
+    @async_map_completion
     def _remove_daemons(self, name, host):
         return self._remove_daemon(name, host)
 
@@ -2535,7 +2583,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             )
             daemons.append(sd)
 
-        @forall_hosts
+        @async_map_completion
         def create_func_map(*args):
             return create_func(*args)
 
@@ -2586,7 +2634,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                                    keyring=keyring,
                                    extra_config=extra_config)
 
-    @trivial_completion
     def add_mon(self, spec):
         # type: (ServiceSpec) -> orchestrator.Completion
         return self._add_daemon('mon', spec, self._create_mon)
@@ -2606,7 +2653,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
         return self._create_daemon('mgr', mgr_id, host, keyring=keyring)
 
-    @trivial_completion
     def add_mgr(self, spec):
         # type: (ServiceSpec) -> orchestrator.Completion
         return self._add_daemon('mgr', spec, self._create_mgr)
@@ -2654,7 +2700,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     def apply_mgr(self, spec):
         return self._apply(spec)
 
-    @trivial_completion
     def add_mds(self, spec: ServiceSpec):
         return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
 
@@ -2683,7 +2728,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         })
         return self._create_daemon('mds', mds_id, host, keyring=keyring)
 
-    @trivial_completion
     def add_rgw(self, spec):
         return self._add_daemon('rgw', spec, self._create_rgw, self._config_rgw)
 
@@ -2726,7 +2770,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     def apply_rgw(self, spec):
         return self._apply(spec)
 
-    @trivial_completion
     def add_rbd_mirror(self, spec):
         return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror)
 
@@ -3047,7 +3090,6 @@ receivers:
             "peers": peers
         }, sorted(deps)
 
-    @trivial_completion
     def add_prometheus(self, spec):
         return self._add_daemon('prometheus', spec, self._create_prometheus)
 
@@ -3058,7 +3100,6 @@ receivers:
     def apply_prometheus(self, spec):
         return self._apply(spec)
 
-    @trivial_completion
     def add_node_exporter(self, spec):
         # type: (ServiceSpec) -> AsyncCompletion
         return self._add_daemon('node-exporter', spec,
@@ -3071,7 +3112,6 @@ receivers:
     def _create_node_exporter(self, daemon_id, host):
         return self._create_daemon('node-exporter', daemon_id, host)
 
-    @trivial_completion
     def add_crash(self, spec):
         # type: (ServiceSpec) -> AsyncCompletion
         return self._add_daemon('crash', spec,
@@ -3090,7 +3130,6 @@ receivers:
         })
         return self._create_daemon('crash', daemon_id, host, keyring=keyring)
 
-    @trivial_completion
     def add_grafana(self, spec):
         # type: (ServiceSpec) -> AsyncCompletion
         return self._add_daemon('grafana', spec, self._create_grafana)
@@ -3103,7 +3142,6 @@ receivers:
         # type: (str, str) -> str
         return self._create_daemon('grafana', daemon_id, host)
 
-    @trivial_completion
     def add_alertmanager(self, spec):
         # type: (ServiceSpec) -> AsyncCompletion
         return self._add_daemon('alertmanager', spec, self._create_alertmanager)
index 41c124677cc577ca318989d64bb47ee2513759a8..085ea6c0a803b374570e26f2aa22bff0dcf13b62 100644 (file)
@@ -12,7 +12,7 @@ import pytest
 
 from tests import mock
 from .fixtures import cephadm_module, wait
-from ..module import trivial_completion, forall_hosts
+from ..module import trivial_completion, async_completion, async_map_completion
 
 
 class TestCompletion(object):
@@ -23,6 +23,19 @@ class TestCompletion(object):
             return x+1
         assert wait(cephadm_module, run(1)) == 2
 
+    @pytest.mark.parametrize("input", [
+        ((1, ), ),
+        ((1, 2), ),
+        (("hallo", ), ),
+        (("hallo", "foo"), ),
+    ])
+    def test_async(self, input, cephadm_module):
+        @async_completion
+        def run(*args):
+            return str(args)
+
+        assert wait(cephadm_module, run(*input)) == str(input)
+
     @pytest.mark.parametrize("input,expected", [
         ([], []),
         ([1], ["(1,)"]),
@@ -32,11 +45,25 @@ class TestCompletion(object):
         ([(1, 2), (3, 4)], ["(1, 2)", "(3, 4)"]),
     ])
     def test_async_map(self, input, expected, cephadm_module):
-        @forall_hosts
-        def run_forall(*args):
+        @async_map_completion
+        def run(*args):
             return str(args)
-        assert run_forall(input) == expected
 
+        c = run(input)
+        wait(cephadm_module, c)
+        assert c.result == expected
+
+    def test_async_self(self, cephadm_module):
+        class Run(object):
+            def __init__(self):
+                self.attr = 1
+
+            @async_completion
+            def run(self, x):
+                assert self.attr == 1
+                return x + 1
+
+        assert wait(cephadm_module, Run().run(1)) == 2
 
     @pytest.mark.parametrize("input,expected", [
         ([], []),
@@ -51,9 +78,101 @@ class TestCompletion(object):
             def __init__(self):
                 self.attr = 1
 
-            @forall_hosts
-            def run_forall(self, *args):
+            @async_map_completion
+            def run(self, *args):
                 assert self.attr == 1
                 return str(args)
 
-        assert Run().run_forall(input) == expected
+        c = Run().run(input)
+        wait(cephadm_module, c)
+        assert c.result == expected
+
+    def test_then1(self, cephadm_module):
+        @async_map_completion
+        def run(x):
+            return x+1
+
+        assert wait(cephadm_module, run([1,2]).then(str)) == '[2, 3]'
+
+    def test_then2(self, cephadm_module):
+        @async_map_completion
+        def run(x):
+            time.sleep(0.1)
+            return x+1
+
+        @async_completion
+        def async_str(results):
+            return str(results)
+
+        c = run([1,2]).then(async_str)
+
+        wait(cephadm_module, c)
+        assert c.result == '[2, 3]'
+
+    def test_then3(self, cephadm_module):
+        @async_map_completion
+        def run(x):
+            time.sleep(0.1)
+            return x+1
+
+        def async_str(results):
+            return async_completion(str)(results)
+
+        c = run([1,2]).then(async_str)
+
+        wait(cephadm_module, c)
+        assert c.result == '[2, 3]'
+
+    def test_then4(self, cephadm_module):
+        @async_map_completion
+        def run(x):
+            time.sleep(0.1)
+            return x+1
+
+        def async_str(results):
+            return async_completion(str)(results).then(lambda x: x + "hello")
+
+        c = run([1,2]).then(async_str)
+
+        wait(cephadm_module, c)
+        assert c.result == '[2, 3]hello'
+
+    @pytest.mark.skip(reason="see limitation of async_map_completion")
+    def test_then5(self, cephadm_module):
+        @async_map_completion
+        def run(x):
+            time.sleep(0.1)
+            return async_completion(str)(x+1)
+
+        c = run([1,2])
+
+        wait(cephadm_module, c)
+        assert c.result == "['2', '3']"
+
+    def test_raise(self, cephadm_module):
+        @async_completion
+        def run(x):
+            raise ZeroDivisionError()
+
+        with pytest.raises(ZeroDivisionError):
+            wait(cephadm_module, run(1))
+
+    def test_progress(self, cephadm_module):
+        @async_map_completion
+        def run(*args):
+            return str(args)
+
+        c = run(list(range(2)))
+        c.update_progress = True
+        c.add_progress(
+            mgr=cephadm_module,
+            message="my progress"
+        )
+        wait(cephadm_module, c)
+        assert c.result == [str((x,)) for x in range(2)]
+        assert cephadm_module.remote.mock_calls == [
+            mock.call('progress', 'update', mock.ANY, 'my progress', float(i) / 2, [('origin', 'orchestrator')])
+            for i in range(2+1)] + [
+            mock.call('progress', 'update', mock.ANY, 'my progress', 1.0, [('origin', 'orchestrator')]),
+            mock.call('progress', 'complete', mock.ANY),
+        ]