From f7bf4d0c597080f45bb8466bd6bab3b9cecb03c4 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Tue, 12 May 2020 16:16:39 +0200 Subject: [PATCH] mgr/cephadm: Remove AsyncCompletion Simplyfy things a lot by not using multiprocessing.pool. Signed-off-by: Sebastian Wagner --- src/pybind/mgr/cephadm/module.py | 86 +++---------------- .../mgr/cephadm/tests/test_completion.py | 8 ++ 2 files changed, 21 insertions(+), 73 deletions(-) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index f02f094ac00..7d6554175c1 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -92,75 +92,6 @@ except ImportError: self.cleanup() -class AsyncCompletion(orchestrator.Completion): - def __init__(self, - _first_promise=None, # type: Optional[orchestrator.Completion] - value=orchestrator._Promise.NO_RESULT, # type: Any - on_complete=None, # type: Optional[Callable] - name=None, # type: Optional[str] - update_progress=False, # type: bool - ): - - assert CephadmOrchestrator.instance is not None - self.update_progress = update_progress - if name is None and on_complete is not None: - name = getattr(on_complete, '__name__', None) - super(AsyncCompletion, self).__init__(_first_promise, value, on_complete, name) - - @property - def _progress_reference(self): - # type: () -> Optional[orchestrator.ProgressReference] - if hasattr(self._on_complete_, 'progress_id'): # type: ignore - return self._on_complete_ # type: ignore - return None - - @property - def _on_complete(self): - # type: () -> Optional[Callable] - if self._on_complete_ is None: - return None - - def callback(result): - try: - if self.update_progress: - assert self.progress_reference - self.progress_reference.progress = 1.0 - self._on_complete_ = None - self._finalize(result) - except Exception as e: - try: - self.fail(e) - except Exception: - logger.exception(f'failed to fail AsyncCompletion: >{repr(self)}<') - if 'UNITTEST' in os.environ: - assert False - - def error_callback(e): - pass - - def run(value): - def do_work(*args, **kwargs): - assert self._on_complete_ is not None - try: - res = self._on_complete_(*args, **kwargs) - return res - except Exception as e: - self.fail(e) - raise - - assert CephadmOrchestrator.instance - CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,), - callback=callback, error_callback=error_callback) - return self.ASYNC_RESULT - - return run - - @_on_complete.setter - def _on_complete(self, inner): - # type: (Callable) -> None - self._on_complete_ = inner - - def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]: @wraps(f) def forall_hosts_wrapper(*args) -> List[T]: @@ -192,11 +123,20 @@ def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]: return forall_hosts_wrapper -def trivial_completion(f): - # type: (Callable) -> Callable[..., orchestrator.Completion] +class CephadmCompletion(orchestrator.Completion): + def evaluate(self): + self.finalize(None) + +def trivial_completion(f: Callable) -> Callable[..., CephadmCompletion]: + """ + Decorator to make CephadmCompletion methods return + a completion object that executes themselves. + """ + @wraps(f) def wrapper(*args, **kwargs): - return AsyncCompletion(value=f(*args, **kwargs), name=f.__name__) + return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs)) + return wrapper @@ -729,7 +669,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self.log.debug("process: completions={0}".format(orchestrator.pretty_print(completions))) for p in completions: - p.finalize() + p.evaluate() @orchestrator._cli_write_command( prefix='cephadm set-ssh-config', diff --git a/src/pybind/mgr/cephadm/tests/test_completion.py b/src/pybind/mgr/cephadm/tests/test_completion.py index 41c124677cc..2f7956667fc 100644 --- a/src/pybind/mgr/cephadm/tests/test_completion.py +++ b/src/pybind/mgr/cephadm/tests/test_completion.py @@ -23,6 +23,14 @@ class TestCompletion(object): return x+1 assert wait(cephadm_module, run(1)) == 2 + def test_exception(self, cephadm_module): + @trivial_completion + def run(x): + raise ValueError + c = run(1) + with pytest.raises(ValueError): + wait(cephadm_module, c) + @pytest.mark.parametrize("input,expected", [ ([], []), ([1], ["(1,)"]), -- 2.39.5