From 6abc9e95bd7da47791ffa1b97c7e6c9609a2efc7 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Mon, 9 Sep 2019 11:21:54 +0200 Subject: [PATCH] qa/orchestrator: Adapt teuthology test to new completions Also: Adapt mgr/test_orchestrator to new completions Signed-off-by: Sebastian Wagner --- qa/tasks/mgr/test_orchestrator_cli.py | 2 + src/pybind/mgr/test_orchestrator/module.py | 126 +++++++-------------- 2 files changed, 46 insertions(+), 82 deletions(-) diff --git a/qa/tasks/mgr/test_orchestrator_cli.py b/qa/tasks/mgr/test_orchestrator_cli.py index 80829c38efd12..0bcbdfd4ed1cc 100644 --- a/qa/tasks/mgr/test_orchestrator_cli.py +++ b/qa/tasks/mgr/test_orchestrator_cli.py @@ -2,6 +2,7 @@ import errno import json import logging from tempfile import NamedTemporaryFile +from time import sleep from teuthology.exceptions import CommandFailedError @@ -180,6 +181,7 @@ class TestOrchestratorCli(MgrTestCase): evs = json.loads(self._progress_cmd('json'))['completed'] self.assertEqual(len(evs), 0) self._orch_cmd("mgr", "update", "4") + sleep(6) # There is a sleep(5) in the test_orchestrator.module.serve() evs = json.loads(self._progress_cmd('json'))['completed'] self.assertEqual(len(evs), 1) self.assertIn('update_mgrs', evs[0]['message']) diff --git a/src/pybind/mgr/test_orchestrator/module.py b/src/pybind/mgr/test_orchestrator/module.py index ce41ce89ab7fa..43c6c44da21f7 100644 --- a/src/pybind/mgr/test_orchestrator/module.py +++ b/src/pybind/mgr/test_orchestrator/module.py @@ -4,97 +4,55 @@ import re import os import threading import functools -import uuid from subprocess import check_output, CalledProcessError +from typing import Callable, TypeVar, List import six from ceph.deployment import inventory from mgr_module import CLICommand, HandleCommandResult -from mgr_module import MgrModule, PersistentStoreDict +from mgr_module import MgrModule import orchestrator +T = TypeVar('T') - -class TestCompletionMixin(object): - all_completions = [] # type: orchestrator.Completion - - def __init__(self, cb, message, *args, **kwargs): - super(TestCompletionMixin, self).__init__(*args, **kwargs) - self.cb = cb - self._result = None - self._complete = False - - self.message = message - self.id = str(uuid.uuid4()) - - TestCompletionMixin.all_completions.append(self) - - @property - def result(self): - return self._result - - @property - def has_result(self): - return self._complete - - def execute(self): - try: - self._result = self.cb() - self.executed = True - except Exception as e: - self.exception = e - finally: - self._complete = True - - def __str__(self): - return "{}(result={} message={}, exception={})".format(self.__class__.__name__, self.result, - self.message, self.exception) - - -class TestReadCompletion(TestCompletionMixin, orchestrator.ReadCompletion): - def __init__(self, cb): - super(TestReadCompletion, self).__init__(cb, "") - - -class TestWriteCompletion(TestCompletionMixin, orchestrator.WriteCompletion): - def __init__(self, cb, message): - super(TestWriteCompletion, self).__init__(cb, message) - - @property - def has_result(self): - return (not self.is_errored) and self.executed - - @property - def is_effective(self): - return self._complete - - -def deferred_write(message): - def wrapper(f): - @functools.wraps(f) - def inner(*args, **kwargs): - return TestWriteCompletion(lambda: f(*args, **kwargs), - '{}, args={}, kwargs={}'.format(message, args, kwargs)) - return inner - return wrapper +class TestCompletion(orchestrator.Completion[T]): + def evaluate(self): + self._first_promise.finalize(None) def deferred_read(f): + # type: (Callable[..., T]) -> Callable[..., TestCompletion[T]] """ - Decorator to make TestOrchestrator methods return + Decorator to make methods return a completion object that executes themselves. """ @functools.wraps(f) def wrapper(*args, **kwargs): - return TestReadCompletion(lambda: f(*args, **kwargs)) + return TestCompletion(on_complete=lambda _: f(*args, **kwargs)) return wrapper +def deferred_write(message): + def inner(f): + # type: (Callable[..., T]) -> Callable[..., TestCompletion[T]] + + @functools.wraps(f) + def wrapper(self, *args, **kwargs): + return TestCompletion.with_progress( + message=message, + mgr=self, + on_complete=lambda _: f(self, *args, **kwargs), + ) + + return wrapper + return inner + + class TestOrchestrator(MgrModule, orchestrator.Orchestrator): """ This is an orchestrator implementation used for internal testing. It's meant for @@ -106,16 +64,12 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): """ def process(self, completions): - for c in completions: - if not isinstance(c, TestReadCompletion) and \ - not isinstance(c, TestWriteCompletion): - raise TypeError( - "wait() requires list of completions, not {0}".format( - c.__class__ - )) + # type: (List[TestCompletion]) -> None + if completions: + self.log.info("wait: promises={0}".format(completions)) - if c.needs_result: - c.execute() + for p in completions: + p.evaluate() @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w') def _load_data(self, inbuf): @@ -138,6 +92,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): self._initialized = threading.Event() self._shutdown = threading.Event() self._init_data({}) + self.all_progress_references = list() # type: List[orchestrator.ProgressReference] def shutdown(self): self._shutdown.set() @@ -151,9 +106,9 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): # in case we had a caller that wait()'ed on them long enough # to get persistence but not long enough to get completion - self.wait(TestCompletionMixin.all_completions) - TestCompletionMixin.all_completions = [c for c in TestCompletionMixin.all_completions if - c.is_finished] + self.all_progress_references = [p for p in self.all_progress_references if not p.effective] + for p in self.all_progress_references: + p.update() self._shutdown.wait(5) @@ -224,10 +179,17 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): return result + def create_osds(self, drive_group): + def run(all_hosts): + drive_group.validate(orchestrator.InventoryNode.get_host_names(all_hosts)) + return self.get_hosts().then(run).then( + on_complete=orchestrator.ProgressReference( + message='create_osds', + mgr=self, + ) + + ) - @deferred_write("create_osds") - def create_osds(self, drive_group, all_hosts): - drive_group.validate(all_hosts) @deferred_write("remove_osds") def remove_osds(self, osd_ids, destroy=False): -- 2.39.5