on_complete=None, # type: Optional[Callable]
name=None, # type: Optional[str]
many=False, # type: bool
+ update_progress=False, # type: bool
):
assert CephadmOrchestrator.instance is not None
self.many = many
+ self.update_progress = update_progress
if name is None and on_complete is not None:
- name = on_complete.__name__
+ name = getattr(on_complete, '__name__', None)
super(AsyncCompletion, self).__init__(_first_promise, value, on_complete, name)
@property
def callback(result):
try:
+ if self.update_progress:
+ assert self.progress_reference
+ self.progress_reference.progress = 1.0
self._on_complete_ = None
self._finalize(result)
except Exception as e:
def error_callback(e):
self.fail(e)
- if six.PY3:
- _callback = self._on_complete_
- else:
- def _callback(*args, **kwargs):
- # Py2 only: _worker_pool doesn't call error_callback
+ def run(value):
+ def do_work(*args, **kwargs):
+ assert self._on_complete_ is not None
try:
- return self._on_complete_(*args, **kwargs)
+ res = self._on_complete_(*args, **kwargs)
+ if self.update_progress and self.many:
+ assert self.progress_reference
+ self.progress_reference.progress += 1.0 / len(value)
+ return res
except Exception as e:
- self.fail(e)
+ if six.PY3:
+ raise
+ else:
+ # Py2 only: _worker_pool doesn't call error_callback
+ self.fail(e)
- def run(value):
assert CephadmOrchestrator.instance
+ #if self.update_progress:
+ # self.progress_reference.progress = 0.0
if self.many:
if not value:
logger.info('calling map_async without values')
callback([])
if six.PY3:
- CephadmOrchestrator.instance._worker_pool.map_async(_callback, value,
+ CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
callback=callback,
error_callback=error_callback)
else:
- CephadmOrchestrator.instance._worker_pool.map_async(_callback, value,
+ CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
callback=callback)
else:
if six.PY3:
- CephadmOrchestrator.instance._worker_pool.apply_async(_callback, (value,),
+ CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
callback=callback, error_callback=error_callback)
else:
- CephadmOrchestrator.instance._worker_pool.apply_async(_callback, (value,),
+ CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
callback=callback)
return self.ASYNC_RESULT
-from contextlib import contextmanager
-
+import time
+try:
+ from typing import Any
+except ImportError:
+ pass
import pytest
from cephadm import CephadmOrchestrator
+from orchestrator import raise_if_exception, Completion
from tests import mock
def cephadm_module():
with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
mock.patch("cephadm.module.CephadmOrchestrator._configure_logging", lambda *args: None),\
+ mock.patch("cephadm.module.CephadmOrchestrator.remote"),\
mock.patch("cephadm.module.CephadmOrchestrator.set_store", set_store),\
mock.patch("cephadm.module.CephadmOrchestrator.get_store", get_store),\
mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix", get_store_prefix):
}
m.__init__('cephadm', 0, 0)
yield m
+
+
+def wait(m, c):
+ # type: (CephadmOrchestrator, Completion) -> Any
+ m.process([c])
+
+ try:
+ import pydevd # if in debugger
+ while True: # don't timeout
+ if c.is_finished:
+ raise_if_exception(c)
+ return c.result
+ time.sleep(0.1)
+ except ImportError: # not in debugger
+ for i in range(30):
+ if i % 10 == 0:
+ m.process([c])
+ if c.is_finished:
+ raise_if_exception(c)
+ return c.result
+ time.sleep(0.1)
+ assert False, "timeout" + str(c._state)
except ImportError:
pass
-from orchestrator import ServiceDescription, raise_if_exception, Completion, InventoryNode, \
- StatelessServiceSpec, PlacementSpec, RGWSpec, parse_host_specs, StatefulServiceSpec
-from ..module import CephadmOrchestrator
+from orchestrator import ServiceDescription, InventoryNode, \
+ StatelessServiceSpec, PlacementSpec, RGWSpec, StatefulServiceSpec
from tests import mock
-from .fixtures import cephadm_module
+from .fixtures import cephadm_module, wait
"""
class TestCephadm(object):
- def _wait(self, m, c):
- # type: (CephadmOrchestrator, Completion) -> Any
- m.process([c])
-
- try:
- import pydevd # if in debugger
- while True: # don't timeout
- if c.is_finished:
- raise_if_exception(c)
- return c.result
- time.sleep(0.1)
- except ImportError: # not in debugger
- for i in range(30):
- if i % 10 == 0:
- m.process([c])
- if c.is_finished:
- raise_if_exception(c)
- return c.result
- time.sleep(0.1)
- assert False, "timeout" + str(c._state)
-
- m.process([c])
- assert False, "timeout" + str(c._state)
@contextmanager
def _with_host(self, m, name):
- self._wait(m, m.add_host(name))
+ wait(m, m.add_host(name))
yield
- self._wait(m, m.remove_host(name))
+ wait(m, m.remove_host(name))
def test_get_unique_name(self, cephadm_module):
existing = [
def test_host(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
- assert self._wait(cephadm_module, cephadm_module.get_hosts()) == [InventoryNode('test')]
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == [InventoryNode('test')]
c = cephadm_module.get_hosts()
- assert self._wait(cephadm_module, c) == []
+ assert wait(cephadm_module, c) == []
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_service_ls(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.describe_service()
- assert self._wait(cephadm_module, c) == []
+ assert wait(cephadm_module, c) == []
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_device_ls(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.get_inventory()
- assert self._wait(cephadm_module, c) == [InventoryNode('test')]
+ assert wait(cephadm_module, c) == [InventoryNode('test')]
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
json.dumps([
cephadm_module.service_cache_timeout = 10
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.service_action('redeploy', 'rgw', service_id='myrgw.foobar')
- assert self._wait(cephadm_module, c) == ["Deployed rgw.myrgw.foobar on host 'test'"]
+ assert wait(cephadm_module, c) == ["Deployed rgw.myrgw.foobar on host 'test'"]
for what in ('start', 'stop', 'restart'):
c = cephadm_module.service_action(what, 'rgw', service_id='myrgw.foobar')
- assert self._wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"]
+ assert wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"]
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
c = cephadm_module.update_mons(StatefulServiceSpec(placement=ps))
- assert self._wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
+ assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
c = cephadm_module.update_mgrs(StatefulServiceSpec(placement=ps))
- [out] = self._wait(cephadm_module, c)
+ [out] = wait(cephadm_module, c)
assert "Deployed mgr." in out
assert " on host 'test'" in out
with self._with_host(cephadm_module, 'test'):
dg = DriveGroupSpec('test', DeviceSelection(paths=['']))
c = cephadm_module.create_osds(dg)
- assert self._wait(cephadm_module, c) == "Created osd(s) on host 'test'"
+ assert wait(cephadm_module, c) == "Created osd(s) on host 'test'"
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
json.dumps([
cephadm_module._cluster_fsid = "fsid"
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.remove_osds(['0'])
- out = self._wait(cephadm_module, c)
+ out = wait(cephadm_module, c)
assert out == ["Removed osd.0 from host 'test'"]
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_mds(StatelessServiceSpec('name', placement=ps))
- [out] = self._wait(cephadm_module, c)
+ [out] = wait(cephadm_module, c)
assert "Deployed mds.name." in out
assert " on host 'test'" in out
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_rgw(RGWSpec('realm', 'zone', placement=ps))
- [out] = self._wait(cephadm_module, c)
+ [out] = wait(cephadm_module, c)
assert "Deployed rgw.realm.zone." in out
assert " on host 'test'" in out
cephadm_module._cluster_fsid = "fsid"
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.remove_rgw('myrgw')
- out = self._wait(cephadm_module, c)
+ out = wait(cephadm_module, c)
assert out == ["Removed rgw.myrgw.foobar from host 'test'"]
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_rbd_mirror(StatelessServiceSpec(name='name', placement=ps))
- [out] = self._wait(cephadm_module, c)
+ [out] = wait(cephadm_module, c)
assert "Deployed rbd-mirror." in out
assert " on host 'test'" in out
def test_blink_device_light(self, _send_command, _get_connection, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.blink_device_light('ident', True, [('test', '', '')])
- assert self._wait(cephadm_module, c) == ['Set ident light for test: on']
+ assert wait(cephadm_module, c) == ['Set ident light for test: on']
import pytest
-from orchestrator import raise_if_exception, Completion
-from .fixtures import cephadm_module
-from ..module import trivial_completion, async_completion, async_map_completion, CephadmOrchestrator
+from tests import mock
+from .fixtures import cephadm_module, wait
+from ..module import trivial_completion, async_completion, async_map_completion
class TestCompletion(object):
- def _wait(self, m, c):
- # type: (CephadmOrchestrator, Completion) -> Any
- m.process([c])
- m.process([c])
-
- for _ in range(30):
- if c.is_finished:
- raise_if_exception(c)
- return c.result
- time.sleep(0.1)
- assert False, "timeout" + str(c._state)
def test_trivial(self, cephadm_module):
@trivial_completion
def run(x):
return x+1
- assert self._wait(cephadm_module, run(1)) == 2
+ assert wait(cephadm_module, run(1)) == 2
@pytest.mark.parametrize("input", [
((1, ), ),
def run(*args):
return str(args)
- assert self._wait(cephadm_module, run(*input)) == str(input)
+ assert wait(cephadm_module, run(*input)) == str(input)
@pytest.mark.parametrize("input,expected", [
([], []),
return str(args)
c = run(input)
- self._wait(cephadm_module, c)
+ wait(cephadm_module, c)
assert c.result == expected
def test_async_self(self, cephadm_module):
assert self.attr == 1
return x + 1
- assert self._wait(cephadm_module, Run().run(1)) == 2
+ assert wait(cephadm_module, Run().run(1)) == 2
@pytest.mark.parametrize("input,expected", [
([], []),
return str(args)
c = Run().run(input)
- self._wait(cephadm_module, c)
+ wait(cephadm_module, c)
assert c.result == expected
def test_then1(self, cephadm_module):
def run(x):
return x+1
- assert self._wait(cephadm_module, run([1,2]).then(str)) == '[2, 3]'
+ assert wait(cephadm_module, run([1,2]).then(str)) == '[2, 3]'
def test_then2(self, cephadm_module):
@async_map_completion
c = run([1,2]).then(async_str)
- self._wait(cephadm_module, c)
+ wait(cephadm_module, c)
assert c.result == '[2, 3]'
def test_then3(self, cephadm_module):
c = run([1,2]).then(async_str)
- self._wait(cephadm_module, c)
+ wait(cephadm_module, c)
assert c.result == '[2, 3]'
def test_then4(self, cephadm_module):
c = run([1,2]).then(async_str)
- self._wait(cephadm_module, c)
+ wait(cephadm_module, c)
assert c.result == '[2, 3]hello'
@pytest.mark.skip(reason="see limitation of async_map_completion")
c = run([1,2])
- self._wait(cephadm_module, c)
+ wait(cephadm_module, c)
assert c.result == "['2', '3']"
def test_raise(self, cephadm_module):
raise ZeroDivisionError()
with pytest.raises(ZeroDivisionError):
- self._wait(cephadm_module, run(1))
+ wait(cephadm_module, run(1))
+
+ def test_progress(self, cephadm_module):
+ @async_map_completion
+ def run(*args):
+ return str(args)
+
+ c = run(list(range(2)))
+ c.update_progress = True
+ c.add_progress(
+ mgr=cephadm_module,
+ message="my progress"
+ )
+ wait(cephadm_module, c)
+ assert c.result == [str((x,)) for x in range(2)]
+ assert cephadm_module.remote.mock_calls == [
+ mock.call('progress', 'update', mock.ANY, 'my progress', float(i) / 2, [('origin', 'orchestrator')])
+ for i in range(2+1)] + [
+ mock.call('progress', 'update', mock.ANY, 'my progress', 1.0, [('origin', 'orchestrator')]),
+ mock.call('progress', 'complete', mock.ANY),
+ ]
:param value: new value.
"""
- assert self._state in (self.INITIALIZED, self.RUNNING)
+ if self._state not in (self.INITIALIZED, self.RUNNING):
+ raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))
self._state = self.RUNNING
def __call__(self, arg):
self._completion_has_result = True
- if self.progress == 0.0:
- self.progress = 0.5
+ self.progress = 1.0
return arg
@property
@progress.setter
def progress(self, progress):
+ assert progress <= 1.0
self._progress = progress
try:
if self.effective:
mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', 0.0, [('origin', 'orchestrator')])
c.finalize()
- mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', 0.5, [('origin', 'orchestrator')])
+ mgr.remote.assert_called_with('progress', 'complete', c.progress_reference.progress_id)
c.progress_reference.update()
mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', progress_val, [('origin', 'orchestrator')])