]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: Add progress capability to completions
authorSebastian Wagner <sebastian.wagner@suse.com>
Fri, 20 Dec 2019 12:10:05 +0000 (13:10 +0100)
committerSebastian Wagner <sebastian.wagner@suse.com>
Tue, 7 Jan 2020 09:39:37 +0000 (10:39 +0100)
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/fixtures.py
src/pybind/mgr/cephadm/tests/test_cephadm.py
src/pybind/mgr/cephadm/tests/test_completion.py
src/pybind/mgr/orchestrator.py
src/pybind/mgr/tests/test_orchestrator.py

index 2821ce389a01b64f408b1db2e369978f0a28342d..2a260c405f54197f231bc0ca875a450dd2555082 100644 (file)
@@ -86,12 +86,14 @@ class AsyncCompletion(orchestrator.Completion):
                  on_complete=None,  # type: Optional[Callable]
                  name=None,  # type: Optional[str]
                  many=False, # type: bool
+                 update_progress=False,  # type: bool
                  ):
 
         assert CephadmOrchestrator.instance is not None
         self.many = many
+        self.update_progress = update_progress
         if name is None and on_complete is not None:
-            name = on_complete.__name__
+            name = getattr(on_complete, '__name__', None)
         super(AsyncCompletion, self).__init__(_first_promise, value, on_complete, name)
 
     @property
@@ -109,6 +111,9 @@ class AsyncCompletion(orchestrator.Completion):
 
         def callback(result):
             try:
+                if self.update_progress:
+                    assert self.progress_reference
+                    self.progress_reference.progress = 1.0
                 self._on_complete_ = None
                 self._finalize(result)
             except Exception as e:
@@ -117,35 +122,42 @@ class AsyncCompletion(orchestrator.Completion):
         def error_callback(e):
             self.fail(e)
 
-        if six.PY3:
-            _callback = self._on_complete_
-        else:
-            def _callback(*args, **kwargs):
-                # Py2 only: _worker_pool doesn't call error_callback
+        def run(value):
+            def do_work(*args, **kwargs):
+                assert self._on_complete_ is not None
                 try:
-                    return self._on_complete_(*args, **kwargs)
+                    res = self._on_complete_(*args, **kwargs)
+                    if self.update_progress and self.many:
+                        assert self.progress_reference
+                        self.progress_reference.progress += 1.0 / len(value)
+                    return res
                 except Exception as e:
-                    self.fail(e)
+                    if six.PY3:
+                        raise
+                    else:
+                        # Py2 only: _worker_pool doesn't call error_callback
+                        self.fail(e)
 
-        def run(value):
             assert CephadmOrchestrator.instance
+            #if self.update_progress:
+            #    self.progress_reference.progress = 0.0
             if self.many:
                 if not value:
                     logger.info('calling map_async without values')
                     callback([])
                 if six.PY3:
-                    CephadmOrchestrator.instance._worker_pool.map_async(_callback, value,
+                    CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
                                                                     callback=callback,
                                                                     error_callback=error_callback)
                 else:
-                    CephadmOrchestrator.instance._worker_pool.map_async(_callback, value,
+                    CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
                                                                     callback=callback)
             else:
                 if six.PY3:
-                    CephadmOrchestrator.instance._worker_pool.apply_async(_callback, (value,),
+                    CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
                                                                       callback=callback, error_callback=error_callback)
                 else:
-                    CephadmOrchestrator.instance._worker_pool.apply_async(_callback, (value,),
+                    CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
                                                                       callback=callback)
             return self.ASYNC_RESULT
 
index 8e119c3f1823afda95e77489ad4c86f955ebbe71..052eb87ba658832f5d7d94c1d359f69b59cd8234 100644 (file)
@@ -1,8 +1,12 @@
-from contextlib import contextmanager
-
+import time
+try:
+    from typing import Any
+except ImportError:
+    pass
 import pytest
 
 from cephadm import CephadmOrchestrator
+from orchestrator import raise_if_exception, Completion
 from tests import mock
 
 
@@ -30,6 +34,7 @@ def get_ceph_option(_, key):
 def cephadm_module():
     with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
             mock.patch("cephadm.module.CephadmOrchestrator._configure_logging", lambda *args: None),\
+            mock.patch("cephadm.module.CephadmOrchestrator.remote"),\
             mock.patch("cephadm.module.CephadmOrchestrator.set_store", set_store),\
             mock.patch("cephadm.module.CephadmOrchestrator.get_store", get_store),\
             mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix", get_store_prefix):
@@ -44,3 +49,25 @@ def cephadm_module():
         }
         m.__init__('cephadm', 0, 0)
         yield m
+
+
+def wait(m, c):
+    # type: (CephadmOrchestrator, Completion) -> Any
+    m.process([c])
+
+    try:
+        import pydevd  # if in debugger
+        while True:    # don't timeout
+            if c.is_finished:
+                raise_if_exception(c)
+                return c.result
+            time.sleep(0.1)
+    except ImportError:  # not in debugger
+        for i in range(30):
+            if i % 10 == 0:
+                m.process([c])
+            if c.is_finished:
+                raise_if_exception(c)
+                return c.result
+            time.sleep(0.1)
+    assert False, "timeout" + str(c._state)
index 76da7561ebc0c151788cf2910b8272948a09b779..e91abf28bc808a413ec579ee4e9915b66bfbef2e 100644 (file)
@@ -9,11 +9,10 @@ try:
 except ImportError:
     pass
 
-from orchestrator import ServiceDescription, raise_if_exception, Completion, InventoryNode, \
-    StatelessServiceSpec, PlacementSpec, RGWSpec, parse_host_specs, StatefulServiceSpec
-from ..module import CephadmOrchestrator
+from orchestrator import ServiceDescription, InventoryNode, \
+    StatelessServiceSpec, PlacementSpec, RGWSpec, StatefulServiceSpec
 from tests import mock
-from .fixtures import cephadm_module
+from .fixtures import cephadm_module, wait
 
 
 """
@@ -35,35 +34,12 @@ def mon_command(*args, **kwargs):
 
 
 class TestCephadm(object):
-    def _wait(self, m, c):
-        # type: (CephadmOrchestrator, Completion) -> Any
-        m.process([c])
-
-        try:
-            import pydevd  # if in debugger
-            while True:    # don't timeout
-                if c.is_finished:
-                    raise_if_exception(c)
-                    return c.result
-                time.sleep(0.1)
-        except ImportError:  # not in debugger
-            for i in range(30):
-                if i % 10 == 0:
-                    m.process([c])
-                if c.is_finished:
-                    raise_if_exception(c)
-                    return c.result
-                time.sleep(0.1)
-        assert False, "timeout" + str(c._state)
-
-        m.process([c])
-        assert False, "timeout" + str(c._state)
 
     @contextmanager
     def _with_host(self, m, name):
-        self._wait(m, m.add_host(name))
+        wait(m, m.add_host(name))
         yield
-        self._wait(m, m.remove_host(name))
+        wait(m, m.remove_host(name))
 
     def test_get_unique_name(self, cephadm_module):
         existing = [
@@ -75,21 +51,21 @@ class TestCephadm(object):
 
     def test_host(self, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
-            assert self._wait(cephadm_module, cephadm_module.get_hosts()) == [InventoryNode('test')]
+            assert wait(cephadm_module, cephadm_module.get_hosts()) == [InventoryNode('test')]
         c = cephadm_module.get_hosts()
-        assert self._wait(cephadm_module, c) == []
+        assert wait(cephadm_module, c) == []
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
     def test_service_ls(self, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.describe_service()
-            assert self._wait(cephadm_module, c) == []
+            assert wait(cephadm_module, c) == []
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
     def test_device_ls(self, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.get_inventory()
-            assert self._wait(cephadm_module, c) == [InventoryNode('test')]
+            assert wait(cephadm_module, c) == [InventoryNode('test')]
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
         json.dumps([
@@ -111,11 +87,11 @@ class TestCephadm(object):
         cephadm_module.service_cache_timeout = 10
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.service_action('redeploy', 'rgw', service_id='myrgw.foobar')
-            assert self._wait(cephadm_module, c) == ["Deployed rgw.myrgw.foobar on host 'test'"]
+            assert wait(cephadm_module, c) == ["Deployed rgw.myrgw.foobar on host 'test'"]
 
             for what in ('start', 'stop', 'restart'):
                 c = cephadm_module.service_action(what, 'rgw', service_id='myrgw.foobar')
-                assert self._wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"]
+                assert wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"]
 
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
@@ -126,7 +102,7 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
             c = cephadm_module.update_mons(StatefulServiceSpec(placement=ps))
-            assert self._wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
+            assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@@ -136,7 +112,7 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
             c = cephadm_module.update_mgrs(StatefulServiceSpec(placement=ps))
-            [out] = self._wait(cephadm_module, c)
+            [out] = wait(cephadm_module, c)
             assert "Deployed mgr." in out
             assert " on host 'test'" in out
 
@@ -148,7 +124,7 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'test'):
             dg = DriveGroupSpec('test', DeviceSelection(paths=['']))
             c = cephadm_module.create_osds(dg)
-            assert self._wait(cephadm_module, c) == "Created osd(s) on host 'test'"
+            assert wait(cephadm_module, c) == "Created osd(s) on host 'test'"
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
         json.dumps([
@@ -166,7 +142,7 @@ class TestCephadm(object):
         cephadm_module._cluster_fsid = "fsid"
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.remove_osds(['0'])
-            out = self._wait(cephadm_module, c)
+            out = wait(cephadm_module, c)
             assert out == ["Removed osd.0 from host 'test'"]
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@@ -177,7 +153,7 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
             c = cephadm_module.add_mds(StatelessServiceSpec('name', placement=ps))
-            [out] = self._wait(cephadm_module, c)
+            [out] = wait(cephadm_module, c)
             assert "Deployed mds.name." in out
             assert " on host 'test'" in out
 
@@ -189,7 +165,7 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
             c = cephadm_module.add_rgw(RGWSpec('realm', 'zone', placement=ps))
-            [out] = self._wait(cephadm_module, c)
+            [out] = wait(cephadm_module, c)
             assert "Deployed rgw.realm.zone." in out
             assert " on host 'test'" in out
 
@@ -209,7 +185,7 @@ class TestCephadm(object):
         cephadm_module._cluster_fsid = "fsid"
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.remove_rgw('myrgw')
-            out = self._wait(cephadm_module, c)
+            out = wait(cephadm_module, c)
             assert out == ["Removed rgw.myrgw.foobar from host 'test'"]
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@@ -220,7 +196,7 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
             c = cephadm_module.add_rbd_mirror(StatelessServiceSpec(name='name', placement=ps))
-            [out] = self._wait(cephadm_module, c)
+            [out] = wait(cephadm_module, c)
             assert "Deployed rbd-mirror." in out
             assert " on host 'test'" in out
 
@@ -231,5 +207,5 @@ class TestCephadm(object):
     def test_blink_device_light(self, _send_command, _get_connection, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.blink_device_light('ident', True, [('test', '', '')])
-            assert self._wait(cephadm_module, c) == ['Set ident light for test: on']
+            assert wait(cephadm_module, c) == ['Set ident light for test: on']
 
index 9b858d6a572c4146c2694b5eb381466d467d847b..085ea6c0a803b374570e26f2aa22bff0dcf13b62 100644 (file)
@@ -10,29 +10,18 @@ except ImportError:
 import pytest
 
 
-from orchestrator import raise_if_exception, Completion
-from .fixtures import cephadm_module
-from ..module import trivial_completion, async_completion, async_map_completion, CephadmOrchestrator
+from tests import mock
+from .fixtures import cephadm_module, wait
+from ..module import trivial_completion, async_completion, async_map_completion
 
 
 class TestCompletion(object):
-    def _wait(self, m, c):
-        # type: (CephadmOrchestrator, Completion) -> Any
-        m.process([c])
-        m.process([c])
-
-        for _ in range(30):
-            if c.is_finished:
-                raise_if_exception(c)
-                return c.result
-            time.sleep(0.1)
-        assert False, "timeout" + str(c._state)
 
     def test_trivial(self, cephadm_module):
         @trivial_completion
         def run(x):
             return x+1
-        assert self._wait(cephadm_module, run(1)) == 2
+        assert wait(cephadm_module, run(1)) == 2
 
     @pytest.mark.parametrize("input", [
         ((1, ), ),
@@ -45,7 +34,7 @@ class TestCompletion(object):
         def run(*args):
             return str(args)
 
-        assert self._wait(cephadm_module, run(*input)) == str(input)
+        assert wait(cephadm_module, run(*input)) == str(input)
 
     @pytest.mark.parametrize("input,expected", [
         ([], []),
@@ -61,7 +50,7 @@ class TestCompletion(object):
             return str(args)
 
         c = run(input)
-        self._wait(cephadm_module, c)
+        wait(cephadm_module, c)
         assert c.result == expected
 
     def test_async_self(self, cephadm_module):
@@ -74,7 +63,7 @@ class TestCompletion(object):
                 assert self.attr == 1
                 return x + 1
 
-        assert self._wait(cephadm_module, Run().run(1)) == 2
+        assert wait(cephadm_module, Run().run(1)) == 2
 
     @pytest.mark.parametrize("input,expected", [
         ([], []),
@@ -95,7 +84,7 @@ class TestCompletion(object):
                 return str(args)
 
         c = Run().run(input)
-        self._wait(cephadm_module, c)
+        wait(cephadm_module, c)
         assert c.result == expected
 
     def test_then1(self, cephadm_module):
@@ -103,7 +92,7 @@ class TestCompletion(object):
         def run(x):
             return x+1
 
-        assert self._wait(cephadm_module, run([1,2]).then(str)) == '[2, 3]'
+        assert wait(cephadm_module, run([1,2]).then(str)) == '[2, 3]'
 
     def test_then2(self, cephadm_module):
         @async_map_completion
@@ -117,7 +106,7 @@ class TestCompletion(object):
 
         c = run([1,2]).then(async_str)
 
-        self._wait(cephadm_module, c)
+        wait(cephadm_module, c)
         assert c.result == '[2, 3]'
 
     def test_then3(self, cephadm_module):
@@ -131,7 +120,7 @@ class TestCompletion(object):
 
         c = run([1,2]).then(async_str)
 
-        self._wait(cephadm_module, c)
+        wait(cephadm_module, c)
         assert c.result == '[2, 3]'
 
     def test_then4(self, cephadm_module):
@@ -145,7 +134,7 @@ class TestCompletion(object):
 
         c = run([1,2]).then(async_str)
 
-        self._wait(cephadm_module, c)
+        wait(cephadm_module, c)
         assert c.result == '[2, 3]hello'
 
     @pytest.mark.skip(reason="see limitation of async_map_completion")
@@ -157,7 +146,7 @@ class TestCompletion(object):
 
         c = run([1,2])
 
-        self._wait(cephadm_module, c)
+        wait(cephadm_module, c)
         assert c.result == "['2', '3']"
 
     def test_raise(self, cephadm_module):
@@ -166,4 +155,24 @@ class TestCompletion(object):
             raise ZeroDivisionError()
 
         with pytest.raises(ZeroDivisionError):
-            self._wait(cephadm_module, run(1))
+            wait(cephadm_module, run(1))
+
+    def test_progress(self, cephadm_module):
+        @async_map_completion
+        def run(*args):
+            return str(args)
+
+        c = run(list(range(2)))
+        c.update_progress = True
+        c.add_progress(
+            mgr=cephadm_module,
+            message="my progress"
+        )
+        wait(cephadm_module, c)
+        assert c.result == [str((x,)) for x in range(2)]
+        assert cephadm_module.remote.mock_calls == [
+            mock.call('progress', 'update', mock.ANY, 'my progress', float(i) / 2, [('origin', 'orchestrator')])
+            for i in range(2+1)] + [
+            mock.call('progress', 'update', mock.ANY, 'my progress', 1.0, [('origin', 'orchestrator')]),
+            mock.call('progress', 'complete', mock.ANY),
+        ]
index b1946863909dd9584b848abc6d55d9bcd9055ff8..ac4e6214b3745618098cffef76ae6c5f21a13103 100644 (file)
@@ -273,7 +273,8 @@ class _Promise(object):
 
         :param value: new value.
         """
-        assert self._state in (self.INITIALIZED, self.RUNNING)
+        if self._state not in (self.INITIALIZED, self.RUNNING):
+            raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))
 
         self._state = self.RUNNING
 
@@ -402,8 +403,7 @@ class ProgressReference(object):
 
     def __call__(self, arg):
         self._completion_has_result = True
-        if self.progress == 0.0:
-            self.progress = 0.5
+        self.progress = 1.0
         return arg
 
     @property
@@ -412,6 +412,7 @@ class ProgressReference(object):
 
     @progress.setter
     def progress(self, progress):
+        assert progress <= 1.0
         self._progress = progress
         try:
             if self.effective:
index abbea033cbc26039e6624c2e9e05e8f8d5764645..3687e253d1ef6fd558263ff9495b0cff26b55e23 100644 (file)
@@ -177,7 +177,7 @@ def test_progress():
     mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', 0.0, [('origin', 'orchestrator')])
 
     c.finalize()
-    mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', 0.5, [('origin', 'orchestrator')])
+    mgr.remote.assert_called_with('progress', 'complete', c.progress_reference.progress_id)
 
     c.progress_reference.update()
     mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', progress_val, [('origin', 'orchestrator')])