import os
import threading
import functools
-import uuid
from subprocess import check_output, CalledProcessError
+from typing import Callable, TypeVar, List
import six
from ceph.deployment import inventory
from mgr_module import CLICommand, HandleCommandResult
-from mgr_module import MgrModule, PersistentStoreDict
+from mgr_module import MgrModule
import orchestrator
+T = TypeVar('T')
-
-class TestCompletionMixin(object):
- all_completions = [] # type: orchestrator.Completion
-
- def __init__(self, cb, message, *args, **kwargs):
- super(TestCompletionMixin, self).__init__(*args, **kwargs)
- self.cb = cb
- self._result = None
- self._complete = False
-
- self.message = message
- self.id = str(uuid.uuid4())
-
- TestCompletionMixin.all_completions.append(self)
-
- @property
- def result(self):
- return self._result
-
- @property
- def has_result(self):
- return self._complete
-
- def execute(self):
- try:
- self._result = self.cb()
- self.executed = True
- except Exception as e:
- self.exception = e
- finally:
- self._complete = True
-
- def __str__(self):
- return "{}(result={} message={}, exception={})".format(self.__class__.__name__, self.result,
- self.message, self.exception)
-
-
-class TestReadCompletion(TestCompletionMixin, orchestrator.ReadCompletion):
- def __init__(self, cb):
- super(TestReadCompletion, self).__init__(cb, "<read op>")
-
-
-class TestWriteCompletion(TestCompletionMixin, orchestrator.WriteCompletion):
- def __init__(self, cb, message):
- super(TestWriteCompletion, self).__init__(cb, message)
-
- @property
- def has_result(self):
- return (not self.is_errored) and self.executed
-
- @property
- def is_effective(self):
- return self._complete
-
-
-def deferred_write(message):
- def wrapper(f):
- @functools.wraps(f)
- def inner(*args, **kwargs):
- return TestWriteCompletion(lambda: f(*args, **kwargs),
- '{}, args={}, kwargs={}'.format(message, args, kwargs))
- return inner
- return wrapper
+class TestCompletion(orchestrator.Completion[T]):
+ def evaluate(self):
+ self._first_promise.finalize(None)
def deferred_read(f):
+ # type: (Callable[..., T]) -> Callable[..., TestCompletion[T]]
"""
- Decorator to make TestOrchestrator methods return
+ Decorator to make methods return
a completion object that executes themselves.
"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
- return TestReadCompletion(lambda: f(*args, **kwargs))
+ return TestCompletion(on_complete=lambda _: f(*args, **kwargs))
return wrapper
+def deferred_write(message):
+ def inner(f):
+ # type: (Callable[..., T]) -> Callable[..., TestCompletion[T]]
+
+ @functools.wraps(f)
+ def wrapper(self, *args, **kwargs):
+ return TestCompletion.with_progress(
+ message=message,
+ mgr=self,
+ on_complete=lambda _: f(self, *args, **kwargs),
+ )
+
+ return wrapper
+ return inner
+
+
class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
"""
This is an orchestrator implementation used for internal testing. It's meant for
"""
def process(self, completions):
- for c in completions:
- if not isinstance(c, TestReadCompletion) and \
- not isinstance(c, TestWriteCompletion):
- raise TypeError(
- "wait() requires list of completions, not {0}".format(
- c.__class__
- ))
+ # type: (List[TestCompletion]) -> None
+ if completions:
+ self.log.info("wait: promises={0}".format(completions))
- if c.needs_result:
- c.execute()
+ for p in completions:
+ p.evaluate()
@CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
def _load_data(self, inbuf):
self._initialized = threading.Event()
self._shutdown = threading.Event()
self._init_data({})
+ self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
def shutdown(self):
self._shutdown.set()
# in case we had a caller that wait()'ed on them long enough
# to get persistence but not long enough to get completion
- self.wait(TestCompletionMixin.all_completions)
- TestCompletionMixin.all_completions = [c for c in TestCompletionMixin.all_completions if
- c.is_finished]
+ self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
+ for p in self.all_progress_references:
+ p.update()
self._shutdown.wait(5)
return result
+ def create_osds(self, drive_group):
+ def run(all_hosts):
+ drive_group.validate(orchestrator.InventoryNode.get_host_names(all_hosts))
+ return self.get_hosts().then(run).then(
+ on_complete=orchestrator.ProgressReference(
+ message='create_osds',
+ mgr=self,
+ )
+
+ )
- @deferred_write("create_osds")
- def create_osds(self, drive_group, all_hosts):
- drive_group.validate(all_hosts)
@deferred_write("remove_osds")
def remove_osds(self, osd_ids, destroy=False):