]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: Remove AsyncCompletion 35022/head
authorSebastian Wagner <sebastian.wagner@suse.com>
Tue, 12 May 2020 14:16:39 +0000 (16:16 +0200)
committerSebastian Wagner <sebastian.wagner@suse.com>
Tue, 12 May 2020 14:16:39 +0000 (16:16 +0200)
Simplyfy things a lot by not using multiprocessing.pool.

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/test_completion.py

index f02f094ac00021570ab7e8ec61ed4c0cd7422c22..7d6554175c1f3909e3c1a2ba702f64844173af5d 100644 (file)
@@ -92,75 +92,6 @@ except ImportError:
             self.cleanup()
 
 
-class AsyncCompletion(orchestrator.Completion):
-    def __init__(self,
-                 _first_promise=None,  # type: Optional[orchestrator.Completion]
-                 value=orchestrator._Promise.NO_RESULT,  # type: Any
-                 on_complete=None,  # type: Optional[Callable]
-                 name=None,  # type: Optional[str]
-                 update_progress=False,  # type: bool
-                 ):
-
-        assert CephadmOrchestrator.instance is not None
-        self.update_progress = update_progress
-        if name is None and on_complete is not None:
-            name = getattr(on_complete, '__name__', None)
-        super(AsyncCompletion, self).__init__(_first_promise, value, on_complete, name)
-
-    @property
-    def _progress_reference(self):
-        # type: () -> Optional[orchestrator.ProgressReference]
-        if hasattr(self._on_complete_, 'progress_id'):  # type: ignore
-            return self._on_complete_  # type: ignore
-        return None
-
-    @property
-    def _on_complete(self):
-        # type: () -> Optional[Callable]
-        if self._on_complete_ is None:
-            return None
-
-        def callback(result):
-            try:
-                if self.update_progress:
-                    assert self.progress_reference
-                    self.progress_reference.progress = 1.0
-                self._on_complete_ = None
-                self._finalize(result)
-            except Exception as e:
-                try:
-                    self.fail(e)
-                except Exception:
-                    logger.exception(f'failed to fail AsyncCompletion: >{repr(self)}<')
-                    if 'UNITTEST' in os.environ:
-                        assert False
-
-        def error_callback(e):
-            pass
-
-        def run(value):
-            def do_work(*args, **kwargs):
-                assert self._on_complete_ is not None
-                try:
-                    res = self._on_complete_(*args, **kwargs)
-                    return res
-                except Exception as e:
-                    self.fail(e)
-                    raise
-
-            assert CephadmOrchestrator.instance
-            CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
-                                                              callback=callback, error_callback=error_callback)
-            return self.ASYNC_RESULT
-
-        return run
-
-    @_on_complete.setter
-    def _on_complete(self, inner):
-        # type: (Callable) -> None
-        self._on_complete_ = inner
-
-
 def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]:
     @wraps(f)
     def forall_hosts_wrapper(*args) -> List[T]:
@@ -192,11 +123,20 @@ def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]:
     return forall_hosts_wrapper
 
 
-def trivial_completion(f):
-    # type: (Callable) -> Callable[..., orchestrator.Completion]
+class CephadmCompletion(orchestrator.Completion):
+    def evaluate(self):
+        self.finalize(None)
+
+def trivial_completion(f: Callable) -> Callable[..., CephadmCompletion]:
+    """
+    Decorator to make CephadmCompletion methods return
+    a completion object that executes themselves.
+    """
+
     @wraps(f)
     def wrapper(*args, **kwargs):
-        return AsyncCompletion(value=f(*args, **kwargs), name=f.__name__)
+        return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))
+
     return wrapper
 
 
@@ -729,7 +669,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             self.log.debug("process: completions={0}".format(orchestrator.pretty_print(completions)))
 
             for p in completions:
-                p.finalize()
+                p.evaluate()
 
     @orchestrator._cli_write_command(
         prefix='cephadm set-ssh-config',
index 41c124677cc577ca318989d64bb47ee2513759a8..2f7956667fc8aca5f9ef781a7ccbd7449f8a461e 100644 (file)
@@ -23,6 +23,14 @@ class TestCompletion(object):
             return x+1
         assert wait(cephadm_module, run(1)) == 2
 
+    def test_exception(self, cephadm_module):
+        @trivial_completion
+        def run(x):
+            raise ValueError
+        c = run(1)
+        with pytest.raises(ValueError):
+            wait(cephadm_module, c)
+
     @pytest.mark.parametrize("input,expected", [
         ([], []),
         ([1], ["(1,)"]),