]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
qa/orchestrator: Adapt teuthology test to new completions
authorSebastian Wagner <sebastian.wagner@suse.com>
Mon, 9 Sep 2019 09:21:54 +0000 (11:21 +0200)
committerSebastian Wagner <sebastian.wagner@suse.com>
Wed, 27 Nov 2019 12:38:20 +0000 (13:38 +0100)
Also: Adapt mgr/test_orchestrator to new completions

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
qa/tasks/mgr/test_orchestrator_cli.py
src/pybind/mgr/test_orchestrator/module.py

index 80829c38efd121e807e1acbaf0e92596f0da047d..0bcbdfd4ed1cc04f4bbd52afad5e8135bb51d206 100644 (file)
@@ -2,6 +2,7 @@ import errno
 import json
 import logging
 from tempfile import NamedTemporaryFile
+from time import sleep
 
 from teuthology.exceptions import CommandFailedError
 
@@ -180,6 +181,7 @@ class TestOrchestratorCli(MgrTestCase):
         evs = json.loads(self._progress_cmd('json'))['completed']
         self.assertEqual(len(evs), 0)
         self._orch_cmd("mgr", "update", "4")
+        sleep(6)  # There is a sleep(5) in the test_orchestrator.module.serve()
         evs = json.loads(self._progress_cmd('json'))['completed']
         self.assertEqual(len(evs), 1)
         self.assertIn('update_mgrs', evs[0]['message'])
index ce41ce89ab7fab0241820c9581b5ced7a4006c82..43c6c44da21f792340dfc1f0277f7236bd113115 100644 (file)
@@ -4,97 +4,55 @@ import re
 import os
 import threading
 import functools
-import uuid
 from subprocess import check_output, CalledProcessError
+from typing import Callable, TypeVar, List
 
 import six
 
 from ceph.deployment import inventory
 from mgr_module import CLICommand, HandleCommandResult
-from mgr_module import MgrModule, PersistentStoreDict
+from mgr_module import MgrModule
 
 import orchestrator
 
+T = TypeVar('T')
 
 
-
-class TestCompletionMixin(object):
-    all_completions = []  # type: orchestrator.Completion
-
-    def __init__(self, cb, message, *args, **kwargs):
-        super(TestCompletionMixin, self).__init__(*args, **kwargs)
-        self.cb = cb
-        self._result = None
-        self._complete = False
-
-        self.message = message
-        self.id = str(uuid.uuid4())
-
-        TestCompletionMixin.all_completions.append(self)
-
-    @property
-    def result(self):
-        return self._result
-
-    @property
-    def has_result(self):
-        return self._complete
-
-    def execute(self):
-        try:
-            self._result = self.cb()
-            self.executed = True
-        except Exception as e:
-            self.exception = e
-        finally:
-            self._complete = True
-
-    def __str__(self):
-        return "{}(result={} message={}, exception={})".format(self.__class__.__name__, self.result,
-                                                               self.message, self.exception)
-
-
-class TestReadCompletion(TestCompletionMixin, orchestrator.ReadCompletion):
-    def __init__(self, cb):
-        super(TestReadCompletion, self).__init__(cb, "<read op>")
-
-
-class TestWriteCompletion(TestCompletionMixin, orchestrator.WriteCompletion):
-    def __init__(self, cb, message):
-        super(TestWriteCompletion, self).__init__(cb, message)
-
-    @property
-    def has_result(self):
-        return (not self.is_errored) and self.executed
-
-    @property
-    def is_effective(self):
-        return self._complete
-
-
-def deferred_write(message):
-    def wrapper(f):
-        @functools.wraps(f)
-        def inner(*args, **kwargs):
-            return TestWriteCompletion(lambda: f(*args, **kwargs),
-                                       '{}, args={}, kwargs={}'.format(message, args, kwargs))
-        return inner
-    return wrapper
+class TestCompletion(orchestrator.Completion[T]):
+    def evaluate(self):
+        self._first_promise.finalize(None)
 
 
 def deferred_read(f):
+    # type: (Callable[..., T]) -> Callable[..., TestCompletion[T]]
     """
-    Decorator to make TestOrchestrator methods return
+    Decorator to make methods return
     a completion object that executes themselves.
     """
 
     @functools.wraps(f)
     def wrapper(*args, **kwargs):
-        return TestReadCompletion(lambda: f(*args, **kwargs))
+        return TestCompletion(on_complete=lambda _: f(*args, **kwargs))
 
     return wrapper
 
 
+def deferred_write(message):
+    def inner(f):
+        # type: (Callable[..., T]) -> Callable[..., TestCompletion[T]]
+
+        @functools.wraps(f)
+        def wrapper(self, *args, **kwargs):
+            return TestCompletion.with_progress(
+                message=message,
+                mgr=self,
+                on_complete=lambda _: f(self, *args, **kwargs),
+            )
+
+        return wrapper
+    return inner
+
+
 class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
     """
     This is an orchestrator implementation used for internal testing. It's meant for
@@ -106,16 +64,12 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
     """
 
     def process(self, completions):
-        for c in completions:
-            if not isinstance(c, TestReadCompletion) and \
-                    not isinstance(c, TestWriteCompletion):
-                raise TypeError(
-                    "wait() requires list of completions, not {0}".format(
-                        c.__class__
-                    ))
+        # type: (List[TestCompletion]) -> None
+        if completions:
+            self.log.info("wait: promises={0}".format(completions))
 
-            if c.needs_result:
-                c.execute()
+            for p in completions:
+                p.evaluate()
 
     @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
     def _load_data(self, inbuf):
@@ -138,6 +92,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._initialized = threading.Event()
         self._shutdown = threading.Event()
         self._init_data({})
+        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
 
     def shutdown(self):
         self._shutdown.set()
@@ -151,9 +106,9 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
             # in case we had a caller that wait()'ed on them long enough
             # to get persistence but not long enough to get completion
 
-            self.wait(TestCompletionMixin.all_completions)
-            TestCompletionMixin.all_completions = [c for c in TestCompletionMixin.all_completions if
-                                                   c.is_finished]
+            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
+            for p in self.all_progress_references:
+                p.update()
 
             self._shutdown.wait(5)
 
@@ -224,10 +179,17 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return result
 
+    def create_osds(self, drive_group):
+        def run(all_hosts):
+            drive_group.validate(orchestrator.InventoryNode.get_host_names(all_hosts))
+        return self.get_hosts().then(run).then(
+            on_complete=orchestrator.ProgressReference(
+                message='create_osds',
+                mgr=self,
+            )
+
+        )
 
-    @deferred_write("create_osds")
-    def create_osds(self, drive_group, all_hosts):
-        drive_group.validate(all_hosts)
 
     @deferred_write("remove_osds")
     def remove_osds(self, osd_ids, destroy=False):