try:
from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
- Type, Sequence, Dict
+ Type, Sequence, Dict, cast
except ImportError:
pass
Call ``on_complete`` as soon as this promise is finalized.
"""
assert self._state in (self.INITIALIZED, self.RUNNING)
+
+ if self._next_promise is not None:
+ return self._next_promise.then(on_complete)
+
if self._on_complete is not None:
- assert self._next_promise is None
self._set_next_promise(self.__class__(
_first_promise=self._first_promise,
on_complete=on_complete
"""
Applies any spec
"""
- fns = {
+ fns: Dict[str, Callable[[ServiceSpec], Completion]] = {
'alertmanager': self.apply_alertmanager,
'crash': self.apply_crash,
'grafana': self.apply_grafana,
'mds': self.apply_mds,
'mgr': self.apply_mgr,
'mon': self.apply_mon,
- 'nfs': self.apply_nfs,
+ 'nfs': cast(Callable[[ServiceSpec], Completion], self.apply_nfs),
'node-exporter': self.apply_node_exporter,
- 'osd': self.apply_drivegroups,
+ 'osd': cast(Callable[[ServiceSpec], Completion], lambda dg: self.apply_drivegroups([dg])),
'prometheus': self.apply_prometheus,
'rbd-mirror': self.apply_rbd_mirror,
- 'rgw': self.apply_rgw,
+ 'rgw': cast(Callable[[ServiceSpec], Completion], self.apply_rgw),
}
- spec, [specs] = specs
- completion = fns[spec.service_name](spec)
+ def merge(ls, r):
+ if isinstance(ls, list):
+ return ls + [r]
+ return [ls, r]
+
+ spec, *specs = specs
+
+ completion = fns[spec.service_type](spec)
for s in specs:
- completion.then(fns[spec.service_name](spec))
+ def next(ls):
+ return fns[s.service_type](s).then(lambda r: merge(ls, r))
+ completion = completion.then(next)
return completion
def remove_daemons(self, names):
self._ceph_get_version = mock.Mock()
self._ceph_get = mock.MagicMock()
self._ceph_get_module_option = mock.MagicMock()
+ self._ceph_get_option = mock.MagicMock()
+ self._validate_module_option = lambda _: True
+ self._configure_logging = lambda *_: None
+ self._unconfigure_logging = mock.MagicMock()
self._ceph_log = mock.MagicMock()
self._ceph_get_store = lambda _: ''
self._ceph_get_store_prefix = lambda _: {}
+ self._ceph_dispatch_remote = lambda *_: None
cm = mock.Mock()
from __future__ import absolute_import
+from ceph.deployment.service_spec import ServiceSpec
+from test_orchestrator import TestOrchestrator as _TestOrchestrator
from tests import mock
import pytest
assert p.result == 5
+def test_apply():
+ to = _TestOrchestrator('', 0, 0)
+ completion = to.apply([
+ ServiceSpec(service_type='nfs'),
+ ServiceSpec(service_type='nfs'),
+ ServiceSpec(service_type='nfs'),
+ ])
+ completion.finalize(42)
+ assert completion.result == [None, None, None]
\ No newline at end of file