From 38d88fef38a3b2f37f6166df106f3bb2b7512cbd Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Fri, 20 Dec 2019 13:10:05 +0100 Subject: [PATCH] mgr/cephadm: Add progress capability to completions Signed-off-by: Sebastian Wagner --- src/pybind/mgr/cephadm/module.py | 38 +++++++---- src/pybind/mgr/cephadm/tests/fixtures.py | 31 ++++++++- src/pybind/mgr/cephadm/tests/test_cephadm.py | 64 ++++++------------- .../mgr/cephadm/tests/test_completion.py | 59 +++++++++-------- src/pybind/mgr/orchestrator.py | 7 +- src/pybind/mgr/tests/test_orchestrator.py | 2 +- 6 files changed, 113 insertions(+), 88 deletions(-) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 2821ce389a01b..2a260c405f541 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -86,12 +86,14 @@ class AsyncCompletion(orchestrator.Completion): on_complete=None, # type: Optional[Callable] name=None, # type: Optional[str] many=False, # type: bool + update_progress=False, # type: bool ): assert CephadmOrchestrator.instance is not None self.many = many + self.update_progress = update_progress if name is None and on_complete is not None: - name = on_complete.__name__ + name = getattr(on_complete, '__name__', None) super(AsyncCompletion, self).__init__(_first_promise, value, on_complete, name) @property @@ -109,6 +111,9 @@ class AsyncCompletion(orchestrator.Completion): def callback(result): try: + if self.update_progress: + assert self.progress_reference + self.progress_reference.progress = 1.0 self._on_complete_ = None self._finalize(result) except Exception as e: @@ -117,35 +122,42 @@ class AsyncCompletion(orchestrator.Completion): def error_callback(e): self.fail(e) - if six.PY3: - _callback = self._on_complete_ - else: - def _callback(*args, **kwargs): - # Py2 only: _worker_pool doesn't call error_callback + def run(value): + def do_work(*args, **kwargs): + assert self._on_complete_ is not None try: - return self._on_complete_(*args, **kwargs) + res = self._on_complete_(*args, **kwargs) + if self.update_progress and self.many: + assert self.progress_reference + self.progress_reference.progress += 1.0 / len(value) + return res except Exception as e: - self.fail(e) + if six.PY3: + raise + else: + # Py2 only: _worker_pool doesn't call error_callback + self.fail(e) - def run(value): assert CephadmOrchestrator.instance + #if self.update_progress: + # self.progress_reference.progress = 0.0 if self.many: if not value: logger.info('calling map_async without values') callback([]) if six.PY3: - CephadmOrchestrator.instance._worker_pool.map_async(_callback, value, + CephadmOrchestrator.instance._worker_pool.map_async(do_work, value, callback=callback, error_callback=error_callback) else: - CephadmOrchestrator.instance._worker_pool.map_async(_callback, value, + CephadmOrchestrator.instance._worker_pool.map_async(do_work, value, callback=callback) else: if six.PY3: - CephadmOrchestrator.instance._worker_pool.apply_async(_callback, (value,), + CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,), callback=callback, error_callback=error_callback) else: - CephadmOrchestrator.instance._worker_pool.apply_async(_callback, (value,), + CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,), callback=callback) return self.ASYNC_RESULT diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py index 8e119c3f1823a..052eb87ba6588 100644 --- a/src/pybind/mgr/cephadm/tests/fixtures.py +++ b/src/pybind/mgr/cephadm/tests/fixtures.py @@ -1,8 +1,12 @@ -from contextlib import contextmanager - +import time +try: + from typing import Any +except ImportError: + pass import pytest from cephadm import CephadmOrchestrator +from orchestrator import raise_if_exception, Completion from tests import mock @@ -30,6 +34,7 @@ def get_ceph_option(_, key): def cephadm_module(): with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\ mock.patch("cephadm.module.CephadmOrchestrator._configure_logging", lambda *args: None),\ + mock.patch("cephadm.module.CephadmOrchestrator.remote"),\ mock.patch("cephadm.module.CephadmOrchestrator.set_store", set_store),\ mock.patch("cephadm.module.CephadmOrchestrator.get_store", get_store),\ mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix", get_store_prefix): @@ -44,3 +49,25 @@ def cephadm_module(): } m.__init__('cephadm', 0, 0) yield m + + +def wait(m, c): + # type: (CephadmOrchestrator, Completion) -> Any + m.process([c]) + + try: + import pydevd # if in debugger + while True: # don't timeout + if c.is_finished: + raise_if_exception(c) + return c.result + time.sleep(0.1) + except ImportError: # not in debugger + for i in range(30): + if i % 10 == 0: + m.process([c]) + if c.is_finished: + raise_if_exception(c) + return c.result + time.sleep(0.1) + assert False, "timeout" + str(c._state) diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index 76da7561ebc0c..e91abf28bc808 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -9,11 +9,10 @@ try: except ImportError: pass -from orchestrator import ServiceDescription, raise_if_exception, Completion, InventoryNode, \ - StatelessServiceSpec, PlacementSpec, RGWSpec, parse_host_specs, StatefulServiceSpec -from ..module import CephadmOrchestrator +from orchestrator import ServiceDescription, InventoryNode, \ + StatelessServiceSpec, PlacementSpec, RGWSpec, StatefulServiceSpec from tests import mock -from .fixtures import cephadm_module +from .fixtures import cephadm_module, wait """ @@ -35,35 +34,12 @@ def mon_command(*args, **kwargs): class TestCephadm(object): - def _wait(self, m, c): - # type: (CephadmOrchestrator, Completion) -> Any - m.process([c]) - - try: - import pydevd # if in debugger - while True: # don't timeout - if c.is_finished: - raise_if_exception(c) - return c.result - time.sleep(0.1) - except ImportError: # not in debugger - for i in range(30): - if i % 10 == 0: - m.process([c]) - if c.is_finished: - raise_if_exception(c) - return c.result - time.sleep(0.1) - assert False, "timeout" + str(c._state) - - m.process([c]) - assert False, "timeout" + str(c._state) @contextmanager def _with_host(self, m, name): - self._wait(m, m.add_host(name)) + wait(m, m.add_host(name)) yield - self._wait(m, m.remove_host(name)) + wait(m, m.remove_host(name)) def test_get_unique_name(self, cephadm_module): existing = [ @@ -75,21 +51,21 @@ class TestCephadm(object): def test_host(self, cephadm_module): with self._with_host(cephadm_module, 'test'): - assert self._wait(cephadm_module, cephadm_module.get_hosts()) == [InventoryNode('test')] + assert wait(cephadm_module, cephadm_module.get_hosts()) == [InventoryNode('test')] c = cephadm_module.get_hosts() - assert self._wait(cephadm_module, c) == [] + assert wait(cephadm_module, c) == [] @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) def test_service_ls(self, cephadm_module): with self._with_host(cephadm_module, 'test'): c = cephadm_module.describe_service() - assert self._wait(cephadm_module, c) == [] + assert wait(cephadm_module, c) == [] @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) def test_device_ls(self, cephadm_module): with self._with_host(cephadm_module, 'test'): c = cephadm_module.get_inventory() - assert self._wait(cephadm_module, c) == [InventoryNode('test')] + assert wait(cephadm_module, c) == [InventoryNode('test')] @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm( json.dumps([ @@ -111,11 +87,11 @@ class TestCephadm(object): cephadm_module.service_cache_timeout = 10 with self._with_host(cephadm_module, 'test'): c = cephadm_module.service_action('redeploy', 'rgw', service_id='myrgw.foobar') - assert self._wait(cephadm_module, c) == ["Deployed rgw.myrgw.foobar on host 'test'"] + assert wait(cephadm_module, c) == ["Deployed rgw.myrgw.foobar on host 'test'"] for what in ('start', 'stop', 'restart'): c = cephadm_module.service_action(what, 'rgw', service_id='myrgw.foobar') - assert self._wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"] + assert wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"] @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) @@ -126,7 +102,7 @@ class TestCephadm(object): with self._with_host(cephadm_module, 'test'): ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1) c = cephadm_module.update_mons(StatefulServiceSpec(placement=ps)) - assert self._wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"] + assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"] @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) @mock.patch("cephadm.module.CephadmOrchestrator.send_command") @@ -136,7 +112,7 @@ class TestCephadm(object): with self._with_host(cephadm_module, 'test'): ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1) c = cephadm_module.update_mgrs(StatefulServiceSpec(placement=ps)) - [out] = self._wait(cephadm_module, c) + [out] = wait(cephadm_module, c) assert "Deployed mgr." in out assert " on host 'test'" in out @@ -148,7 +124,7 @@ class TestCephadm(object): with self._with_host(cephadm_module, 'test'): dg = DriveGroupSpec('test', DeviceSelection(paths=[''])) c = cephadm_module.create_osds(dg) - assert self._wait(cephadm_module, c) == "Created osd(s) on host 'test'" + assert wait(cephadm_module, c) == "Created osd(s) on host 'test'" @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm( json.dumps([ @@ -166,7 +142,7 @@ class TestCephadm(object): cephadm_module._cluster_fsid = "fsid" with self._with_host(cephadm_module, 'test'): c = cephadm_module.remove_osds(['0']) - out = self._wait(cephadm_module, c) + out = wait(cephadm_module, c) assert out == ["Removed osd.0 from host 'test'"] @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) @@ -177,7 +153,7 @@ class TestCephadm(object): with self._with_host(cephadm_module, 'test'): ps = PlacementSpec(hosts=['test'], count=1) c = cephadm_module.add_mds(StatelessServiceSpec('name', placement=ps)) - [out] = self._wait(cephadm_module, c) + [out] = wait(cephadm_module, c) assert "Deployed mds.name." in out assert " on host 'test'" in out @@ -189,7 +165,7 @@ class TestCephadm(object): with self._with_host(cephadm_module, 'test'): ps = PlacementSpec(hosts=['test'], count=1) c = cephadm_module.add_rgw(RGWSpec('realm', 'zone', placement=ps)) - [out] = self._wait(cephadm_module, c) + [out] = wait(cephadm_module, c) assert "Deployed rgw.realm.zone." in out assert " on host 'test'" in out @@ -209,7 +185,7 @@ class TestCephadm(object): cephadm_module._cluster_fsid = "fsid" with self._with_host(cephadm_module, 'test'): c = cephadm_module.remove_rgw('myrgw') - out = self._wait(cephadm_module, c) + out = wait(cephadm_module, c) assert out == ["Removed rgw.myrgw.foobar from host 'test'"] @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) @@ -220,7 +196,7 @@ class TestCephadm(object): with self._with_host(cephadm_module, 'test'): ps = PlacementSpec(hosts=['test'], count=1) c = cephadm_module.add_rbd_mirror(StatelessServiceSpec(name='name', placement=ps)) - [out] = self._wait(cephadm_module, c) + [out] = wait(cephadm_module, c) assert "Deployed rbd-mirror." in out assert " on host 'test'" in out @@ -231,5 +207,5 @@ class TestCephadm(object): def test_blink_device_light(self, _send_command, _get_connection, cephadm_module): with self._with_host(cephadm_module, 'test'): c = cephadm_module.blink_device_light('ident', True, [('test', '', '')]) - assert self._wait(cephadm_module, c) == ['Set ident light for test: on'] + assert wait(cephadm_module, c) == ['Set ident light for test: on'] diff --git a/src/pybind/mgr/cephadm/tests/test_completion.py b/src/pybind/mgr/cephadm/tests/test_completion.py index 9b858d6a572c4..085ea6c0a803b 100644 --- a/src/pybind/mgr/cephadm/tests/test_completion.py +++ b/src/pybind/mgr/cephadm/tests/test_completion.py @@ -10,29 +10,18 @@ except ImportError: import pytest -from orchestrator import raise_if_exception, Completion -from .fixtures import cephadm_module -from ..module import trivial_completion, async_completion, async_map_completion, CephadmOrchestrator +from tests import mock +from .fixtures import cephadm_module, wait +from ..module import trivial_completion, async_completion, async_map_completion class TestCompletion(object): - def _wait(self, m, c): - # type: (CephadmOrchestrator, Completion) -> Any - m.process([c]) - m.process([c]) - - for _ in range(30): - if c.is_finished: - raise_if_exception(c) - return c.result - time.sleep(0.1) - assert False, "timeout" + str(c._state) def test_trivial(self, cephadm_module): @trivial_completion def run(x): return x+1 - assert self._wait(cephadm_module, run(1)) == 2 + assert wait(cephadm_module, run(1)) == 2 @pytest.mark.parametrize("input", [ ((1, ), ), @@ -45,7 +34,7 @@ class TestCompletion(object): def run(*args): return str(args) - assert self._wait(cephadm_module, run(*input)) == str(input) + assert wait(cephadm_module, run(*input)) == str(input) @pytest.mark.parametrize("input,expected", [ ([], []), @@ -61,7 +50,7 @@ class TestCompletion(object): return str(args) c = run(input) - self._wait(cephadm_module, c) + wait(cephadm_module, c) assert c.result == expected def test_async_self(self, cephadm_module): @@ -74,7 +63,7 @@ class TestCompletion(object): assert self.attr == 1 return x + 1 - assert self._wait(cephadm_module, Run().run(1)) == 2 + assert wait(cephadm_module, Run().run(1)) == 2 @pytest.mark.parametrize("input,expected", [ ([], []), @@ -95,7 +84,7 @@ class TestCompletion(object): return str(args) c = Run().run(input) - self._wait(cephadm_module, c) + wait(cephadm_module, c) assert c.result == expected def test_then1(self, cephadm_module): @@ -103,7 +92,7 @@ class TestCompletion(object): def run(x): return x+1 - assert self._wait(cephadm_module, run([1,2]).then(str)) == '[2, 3]' + assert wait(cephadm_module, run([1,2]).then(str)) == '[2, 3]' def test_then2(self, cephadm_module): @async_map_completion @@ -117,7 +106,7 @@ class TestCompletion(object): c = run([1,2]).then(async_str) - self._wait(cephadm_module, c) + wait(cephadm_module, c) assert c.result == '[2, 3]' def test_then3(self, cephadm_module): @@ -131,7 +120,7 @@ class TestCompletion(object): c = run([1,2]).then(async_str) - self._wait(cephadm_module, c) + wait(cephadm_module, c) assert c.result == '[2, 3]' def test_then4(self, cephadm_module): @@ -145,7 +134,7 @@ class TestCompletion(object): c = run([1,2]).then(async_str) - self._wait(cephadm_module, c) + wait(cephadm_module, c) assert c.result == '[2, 3]hello' @pytest.mark.skip(reason="see limitation of async_map_completion") @@ -157,7 +146,7 @@ class TestCompletion(object): c = run([1,2]) - self._wait(cephadm_module, c) + wait(cephadm_module, c) assert c.result == "['2', '3']" def test_raise(self, cephadm_module): @@ -166,4 +155,24 @@ class TestCompletion(object): raise ZeroDivisionError() with pytest.raises(ZeroDivisionError): - self._wait(cephadm_module, run(1)) + wait(cephadm_module, run(1)) + + def test_progress(self, cephadm_module): + @async_map_completion + def run(*args): + return str(args) + + c = run(list(range(2))) + c.update_progress = True + c.add_progress( + mgr=cephadm_module, + message="my progress" + ) + wait(cephadm_module, c) + assert c.result == [str((x,)) for x in range(2)] + assert cephadm_module.remote.mock_calls == [ + mock.call('progress', 'update', mock.ANY, 'my progress', float(i) / 2, [('origin', 'orchestrator')]) + for i in range(2+1)] + [ + mock.call('progress', 'update', mock.ANY, 'my progress', 1.0, [('origin', 'orchestrator')]), + mock.call('progress', 'complete', mock.ANY), + ] diff --git a/src/pybind/mgr/orchestrator.py b/src/pybind/mgr/orchestrator.py index b1946863909dd..ac4e6214b3745 100644 --- a/src/pybind/mgr/orchestrator.py +++ b/src/pybind/mgr/orchestrator.py @@ -273,7 +273,8 @@ class _Promise(object): :param value: new value. """ - assert self._state in (self.INITIALIZED, self.RUNNING) + if self._state not in (self.INITIALIZED, self.RUNNING): + raise ValueError('finalize: {} already finished. {}'.format(repr(self), value)) self._state = self.RUNNING @@ -402,8 +403,7 @@ class ProgressReference(object): def __call__(self, arg): self._completion_has_result = True - if self.progress == 0.0: - self.progress = 0.5 + self.progress = 1.0 return arg @property @@ -412,6 +412,7 @@ class ProgressReference(object): @progress.setter def progress(self, progress): + assert progress <= 1.0 self._progress = progress try: if self.effective: diff --git a/src/pybind/mgr/tests/test_orchestrator.py b/src/pybind/mgr/tests/test_orchestrator.py index abbea033cbc26..3687e253d1ef6 100644 --- a/src/pybind/mgr/tests/test_orchestrator.py +++ b/src/pybind/mgr/tests/test_orchestrator.py @@ -177,7 +177,7 @@ def test_progress(): mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', 0.0, [('origin', 'orchestrator')]) c.finalize() - mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', 0.5, [('origin', 'orchestrator')]) + mgr.remote.assert_called_with('progress', 'complete', c.progress_reference.progress_id) c.progress_reference.update() mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', progress_val, [('origin', 'orchestrator')]) -- 2.47.3