Now, progress events are part of `WriteCompletion` istead of part of the orchestrator module.
It does not yet provide a way to just show orchestrator events.
Also fixes issue in the SSH orchestrator
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
:members:
.. autoclass:: ReadCompletion
+ :members:
+
.. autoclass:: WriteCompletion
+ :members:
Placement
---------
def _orch_cmd(self, *args):
return self.mgr_cluster.mon_manager.raw_cluster_cmd("orchestrator", *args)
+ def _progress_cmd(self, *args):
+ return self.mgr_cluster.mon_manager.raw_cluster_cmd("progress", *args)
+
def _orch_cmd_result(self, *args, **kwargs):
"""
raw_cluster_cmd doesn't support kwargs.
self.assertEqual(ret, errno.ENOENT)
ret = self._orch_cmd_result("host", "add", "raise_import_error")
self.assertEqual(ret, errno.ENOENT)
+
+ def test_progress(self):
+ self._progress_cmd('clear')
+ evs = json.loads(self._progress_cmd('json'))['completed']
+ self.assertEqual(len(evs), 0)
+ self._orch_cmd("mgr", "update", "4")
+ evs = json.loads(self._progress_cmd('json'))['completed']
+ self.assertEqual(len(evs), 1)
+ self.assertIn('update_mgrs', evs[0]['message'])
import sys
import time
import fnmatch
+import uuid
+
+import six
+
+from mgr_util import format_bytes
try:
from typing import TypeVar, Generic, List, Optional, Union, Tuple
except ImportError:
T, G = object, object
-import six
-
-from mgr_util import format_bytes
-
class OrchestratorError(Exception):
"""
"""
def __init__(self):
- pass
+ self.progress_id = str(uuid.uuid4())
+
+ #: if a orchestrator module can provide a more detailed
+ #: progress information, it needs to also call ``progress.update()``.
+ self.progress = 0.5
+
+ def __str__(self):
+ """
+ ``__str__()`` is used for determining the message for progress events.
+ """
+ return super(WriteCompletion, self).__str__()
@property
def is_persistent(self):
# Otherwise meth is always bound to last key
def shim(method_name):
def inner(self, *args, **kwargs):
- return self._oremote(method_name, args, kwargs)
+ completion = self._oremote(method_name, args, kwargs)
+ self._update_completion_progress(completion, 0)
+ return completion
return inner
for meth in Orchestrator.__dict__:
self.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(self.module_name, o, meth, args, kwargs))
return self.remote(o, meth, *args, **kwargs)
+ def _update_completion_progress(self, completion, force_progress=None):
+ # type: (WriteCompletion, Optional[float]) -> None
+ try:
+ progress = force_progress if force_progress is not None else completion.progress
+ if completion.is_complete:
+ self.remote("progress", "complete", completion.progress_id)
+ else:
+ self.remote("progress", "update", completion.progress_id, str(completion), progress,
+ ["orchestrator"])
+ except AttributeError:
+ # No WriteCompletion. Ignore.
+ pass
+ except ImportError:
+ # If the progress module is disabled that's fine,
+ # they just won't see the output.
+ pass
+
def _orchestrator_wait(self, completions):
# type: (List[_Completion]) -> None
"""
:raises NoOrchestrator:
:raises ImportError: no `orchestrator_cli` module or backend not found.
"""
+ for c in completions:
+ self._update_completion_progress(c)
while not self.wait(completions):
if any(c.should_wait for c in completions):
time.sleep(5)
else:
break
+ for c in completions:
+ self._update_completion_progress(c)
def _refresh(self):
global _module
_module.log.debug('refreshing mgr for %s (%s) at %f' % (self.id, self._message,
- self._progress))
- _module.update_progress_event(self.id, self._message, self._progress)
+ self.progress))
+ _module.update_progress_event(self.id, self._message, self.progress)
@property
def message(self):
raise NotImplementedError()
def summary(self):
- return "{0} {1}".format(self.progress, self._message)
+ return "{0} {1}".format(self.progress, self.message)
def _progress_str(self, width):
inner_width = width - 2
self._shutdown.set()
self.clear_all_progress_events()
- def update(self, ev_id, ev_msg, ev_progress):
+ def update(self, ev_id, ev_msg, ev_progress, refs=None):
"""
For calling from other mgr modules
"""
+ if refs is None:
+ refs = []
try:
ev = self._events[ev_id]
except KeyError:
- ev = RemoteEvent(ev_id, ev_msg, [])
+ ev = RemoteEvent(ev_id, ev_msg, refs)
self._events[ev_id] = ev
self.log.info("update: starting ev {0} ({1})".format(
ev_id, ev_msg))
global all_completions
all_completions.append(self)
+ def __str__(self):
+ return self.message
+
@property
def result(self):
return self._result
# TODO: configure k8s API addr instead of assuming local
]
- def _progress(self, *args, **kwargs):
- try:
- self.remote("progress", *args, **kwargs)
- except ImportError:
- # If the progress module is disabled that's fine,
- # they just won't see the output.
- pass
-
def wait(self, completions):
self.log.info("wait: completions={0}".format(completions))
if c.is_complete:
continue
- if not c.is_read:
- self._progress("update", c.id, c.message, 0.5)
-
try:
c.execute()
except Exception as e:
))
c.error = e
c._complete = True
- if not c.is_read:
- self._progress("complete", c.id)
- else:
- if c.is_complete:
- if not c.is_read:
- self._progress("complete", c.id)
if not c.is_complete:
incomplete = True
class SSHWriteCompletion(orchestrator.WriteCompletion):
def __init__(self, result):
+ super(SSHWriteCompletion, self).__init__()
if isinstance(result, multiprocessing.pool.AsyncResult):
self._result = [result]
else:
class SSHWriteCompletionReady(SSHWriteCompletion):
def __init__(self, result):
+ orchestrator.WriteCompletion.__init__(self)
self._result = result
@property
The implementation is similar to the Rook orchestrator, but simpler.
"""
- def _progress(self, *args, **kwargs):
- try:
- self.remote("progress", *args, **kwargs)
- except ImportError:
- # If the progress module is disabled that's fine,
- # they just won't see the output.
- pass
def wait(self, completions):
self.log.info("wait: completions={0}".format(completions))
- incomplete = False
-
# Our `wait` implementation is very simple because everything's
# just an API call.
for c in completions:
if c.is_complete:
continue
- if not c.is_read:
- self._progress("update", c.id, c.message, 0.5)
-
try:
c.execute()
except Exception as e:
))
c.exception = e
c._complete = True
- if not c.is_read:
- self._progress("complete", c.id)
- else:
- if c.is_complete:
- if not c.is_read:
- self._progress("complete", c.id)
-
- if not c.is_complete:
- incomplete = True
- return not incomplete
+ return all(c.is_complete for c in completions)
def available(self):
return True, ""