]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/orchestrator: add progress events to all orchestrators 26654/head
authorSebastian Wagner <sebastian.wagner@suse.com>
Tue, 26 Feb 2019 16:27:53 +0000 (17:27 +0100)
committerSebastian Wagner <sebastian.wagner@suse.com>
Wed, 8 May 2019 09:07:02 +0000 (11:07 +0200)
Now, progress events are part of `WriteCompletion` istead of part of the orchestrator module.

It does not yet provide a way to just show orchestrator events.

Also fixes issue in the SSH orchestrator

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
doc/mgr/orchestrator_modules.rst
qa/tasks/mgr/test_orchestrator_cli.py
src/pybind/mgr/orchestrator.py
src/pybind/mgr/progress/module.py
src/pybind/mgr/rook/module.py
src/pybind/mgr/ssh/module.py
src/pybind/mgr/test_orchestrator/module.py

index 76e426d7b7f393311ca1118c33f62a91a7f931d2..6e6005718f530f96ce073547e5a96b64f4e5b673 100644 (file)
@@ -139,7 +139,10 @@ effect.  Second, the completion becomes *effective*, meaning that the operation
    :members:
 
 .. autoclass:: ReadCompletion
+   :members:
+
 .. autoclass:: WriteCompletion
+   :members:
 
 Placement
 ---------
index c91238c4135309eed237542734cf7415b5ca80cc..86f72678926b1bfd465eee700df8cf3b7698f768 100644 (file)
@@ -17,6 +17,9 @@ class TestOrchestratorCli(MgrTestCase):
     def _orch_cmd(self, *args):
         return self.mgr_cluster.mon_manager.raw_cluster_cmd("orchestrator", *args)
 
+    def _progress_cmd(self, *args):
+        return self.mgr_cluster.mon_manager.raw_cluster_cmd("progress", *args)
+
     def _orch_cmd_result(self, *args, **kwargs):
         """
         raw_cluster_cmd doesn't support kwargs.
@@ -141,3 +144,12 @@ class TestOrchestratorCli(MgrTestCase):
         self.assertEqual(ret, errno.ENOENT)
         ret = self._orch_cmd_result("host", "add", "raise_import_error")
         self.assertEqual(ret, errno.ENOENT)
+
+    def test_progress(self):
+        self._progress_cmd('clear')
+        evs = json.loads(self._progress_cmd('json'))['completed']
+        self.assertEqual(len(evs), 0)
+        self._orch_cmd("mgr", "update", "4")
+        evs = json.loads(self._progress_cmd('json'))['completed']
+        self.assertEqual(len(evs), 1)
+        self.assertIn('update_mgrs', evs[0]['message'])
index e66f926a3f33f669a61b84f841319bf3e14e4565..4a23f1be28d6197791d032ab91379294d05c162e 100644 (file)
@@ -7,6 +7,11 @@ Please see the ceph-mgr module developer's guide for more information.
 import sys
 import time
 import fnmatch
+import uuid
+
+import six
+
+from mgr_util import format_bytes
 
 try:
     from typing import TypeVar, Generic, List, Optional, Union, Tuple
@@ -15,10 +20,6 @@ try:
 except ImportError:
     T, G = object, object
 
-import six
-
-from mgr_util import format_bytes
-
 
 class OrchestratorError(Exception):
     """
@@ -150,7 +151,17 @@ class WriteCompletion(_Completion):
     """
 
     def __init__(self):
-        pass
+        self.progress_id = str(uuid.uuid4())
+
+        #: if a orchestrator module can provide a more detailed
+        #: progress information, it needs to also call ``progress.update()``.
+        self.progress = 0.5
+
+    def __str__(self):
+        """
+        ``__str__()`` is used for determining the message for progress events.
+        """
+        return super(WriteCompletion, self).__str__()
 
     @property
     def is_persistent(self):
@@ -835,7 +846,9 @@ def _mk_orch_methods(cls):
     # Otherwise meth is always bound to last key
     def shim(method_name):
         def inner(self, *args, **kwargs):
-            return self._oremote(method_name, args, kwargs)
+            completion = self._oremote(method_name, args, kwargs)
+            self._update_completion_progress(completion, 0)
+            return completion
         return inner
 
     for meth in Orchestrator.__dict__:
@@ -879,6 +892,23 @@ class OrchestratorClientMixin(Orchestrator):
         self.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(self.module_name, o, meth, args, kwargs))
         return self.remote(o, meth, *args, **kwargs)
 
+    def _update_completion_progress(self, completion, force_progress=None):
+        # type: (WriteCompletion, Optional[float]) -> None
+        try:
+            progress = force_progress if force_progress is not None else completion.progress
+            if completion.is_complete:
+                self.remote("progress", "complete", completion.progress_id)
+            else:
+                self.remote("progress", "update", completion.progress_id, str(completion), progress,
+                            ["orchestrator"])
+        except AttributeError:
+            # No WriteCompletion. Ignore.
+            pass
+        except ImportError:
+            # If the progress module is disabled that's fine,
+            # they just won't see the output.
+            pass
+
     def _orchestrator_wait(self, completions):
         # type: (List[_Completion]) -> None
         """
@@ -891,8 +921,12 @@ class OrchestratorClientMixin(Orchestrator):
         :raises NoOrchestrator:
         :raises ImportError: no `orchestrator_cli` module or backend not found.
         """
+        for c in completions:
+            self._update_completion_progress(c)
         while not self.wait(completions):
             if any(c.should_wait for c in completions):
                 time.sleep(5)
             else:
                 break
+        for c in completions:
+            self._update_completion_progress(c)
index 3dd440ef993b77cc2d652fb107b13ee1960dfa1f..702517495be0dd4772b59c7db9a9aa5cc0412600 100644 (file)
@@ -30,8 +30,8 @@ class Event(object):
     def _refresh(self):
         global _module
         _module.log.debug('refreshing mgr for %s (%s) at %f' % (self.id, self._message,
-                                                                self._progress))
-        _module.update_progress_event(self.id, self._message, self._progress)
+                                                                self.progress))
+        _module.update_progress_event(self.id, self._message, self.progress)
 
     @property
     def message(self):
@@ -46,7 +46,7 @@ class Event(object):
         raise NotImplementedError()
 
     def summary(self):
-        return "{0} {1}".format(self.progress, self._message)
+        return "{0} {1}".format(self.progress, self.message)
 
     def _progress_str(self, width):
         inner_width = width - 2
@@ -477,14 +477,16 @@ class Module(MgrModule):
         self._shutdown.set()
         self.clear_all_progress_events()
 
-    def update(self, ev_id, ev_msg, ev_progress):
+    def update(self, ev_id, ev_msg, ev_progress, refs=None):
         """
         For calling from other mgr modules
         """
+        if refs is None:
+            refs = []
         try:
             ev = self._events[ev_id]
         except KeyError:
-            ev = RemoteEvent(ev_id, ev_msg, [])
+            ev = RemoteEvent(ev_id, ev_msg, refs)
             self._events[ev_id] = ev
             self.log.info("update: starting ev {0} ({1})".format(
                 ev_id, ev_msg))
index f3864f5ba05cc470b0d9bdeda0e7e554f5c633c9..bf377b83ed51d69a265057fce1b9eff24cb890d7 100644 (file)
@@ -92,6 +92,9 @@ class RookWriteCompletion(orchestrator.WriteCompletion):
         global all_completions
         all_completions.append(self)
 
+    def __str__(self):
+        return self.message
+
     @property
     def result(self):
         return self._result
@@ -158,14 +161,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         # TODO: configure k8s API addr instead of assuming local
     ]
 
-    def _progress(self, *args, **kwargs):
-        try:
-            self.remote("progress", *args, **kwargs)
-        except ImportError:
-            # If the progress module is disabled that's fine,
-            # they just won't see the output.
-            pass
-
     def wait(self, completions):
         self.log.info("wait: completions={0}".format(completions))
 
@@ -184,9 +179,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             if c.is_complete:
                 continue
 
-            if not c.is_read:
-                self._progress("update", c.id, c.message, 0.5)
-
             try:
                 c.execute()
             except Exception as e:
@@ -195,12 +187,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                 ))
                 c.error = e
                 c._complete = True
-                if not c.is_read:
-                    self._progress("complete", c.id)
-            else:
-                if c.is_complete:
-                    if not c.is_read:
-                        self._progress("complete", c.id)
 
             if not c.is_complete:
                 incomplete = True
index a9eb060b90a10ac3a10e634226b1cec6fd6f6324..b6ebad6e5ed7aedfb12549421c4214d8e1dd7c4b 100644 (file)
@@ -54,6 +54,7 @@ class SSHReadCompletionReady(SSHReadCompletion):
 
 class SSHWriteCompletion(orchestrator.WriteCompletion):
     def __init__(self, result):
+        super(SSHWriteCompletion, self).__init__()
         if isinstance(result, multiprocessing.pool.AsyncResult):
             self._result = [result]
         else:
@@ -83,6 +84,7 @@ class SSHWriteCompletion(orchestrator.WriteCompletion):
 
 class SSHWriteCompletionReady(SSHWriteCompletion):
     def __init__(self, result):
+        orchestrator.WriteCompletion.__init__(self)
         self._result = result
 
     @property
index be105f69dfcf161a079f5e894bb1f88c62e433d2..236207358c8ab0e751da4f67c14f0cbb458fd344 100644 (file)
@@ -95,19 +95,10 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
 
     The implementation is similar to the Rook orchestrator, but simpler.
     """
-    def _progress(self, *args, **kwargs):
-        try:
-            self.remote("progress", *args, **kwargs)
-        except ImportError:
-            # If the progress module is disabled that's fine,
-            # they just won't see the output.
-            pass
 
     def wait(self, completions):
         self.log.info("wait: completions={0}".format(completions))
 
-        incomplete = False
-
         # Our `wait` implementation is very simple because everything's
         # just an API call.
         for c in completions:
@@ -121,9 +112,6 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
             if c.is_complete:
                 continue
 
-            if not c.is_read:
-                self._progress("update", c.id, c.message, 0.5)
-
             try:
                 c.execute()
             except Exception as e:
@@ -132,17 +120,8 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
                 ))
                 c.exception = e
                 c._complete = True
-                if not c.is_read:
-                    self._progress("complete", c.id)
-            else:
-                if c.is_complete:
-                    if not c.is_read:
-                        self._progress("complete", c.id)
-
-            if not c.is_complete:
-                incomplete = True
 
-        return not incomplete
+        return all(c.is_complete for c in completions)
 
     def available(self):
         return True, ""