value=orchestrator._Promise.NO_RESULT, # type: Any
on_complete=None, # type: Optional[Callable]
name=None, # type: Optional[str]
- many=False, # type: bool
update_progress=False, # type: bool
):
assert CephadmOrchestrator.instance is not None
- self.many = many
self.update_progress = update_progress
if name is None and on_complete is not None:
name = getattr(on_complete, '__name__', None)
assert self._on_complete_ is not None
try:
res = self._on_complete_(*args, **kwargs)
- if self.update_progress and self.many:
- assert self.progress_reference
- self.progress_reference.progress += 1.0 / len(value)
return res
except Exception as e:
self.fail(e)
raise
assert CephadmOrchestrator.instance
- if self.many:
- if not value:
- logger.info('calling map_async without values')
- callback([])
- if six.PY3:
- CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
- callback=callback,
- error_callback=error_callback)
- else:
- CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
- callback=callback)
- else:
- if six.PY3:
- CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
- callback=callback, error_callback=error_callback)
- else:
- CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
- callback=callback)
+ CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
+ callback=callback, error_callback=error_callback)
return self.ASYNC_RESULT
return run
self._on_complete_ = inner
-def ssh_completion(cls=AsyncCompletion, **c_kwargs):
- # type: (Type[orchestrator.Completion], Any) -> Callable
- """
- See ./HACKING.rst for a how-to
- """
- def decorator(f):
- @wraps(f)
- def wrapper(*args):
-
- name = f.__name__
- many = c_kwargs.get('many', False)
-
- # Some weired logic to make calling functions with multiple arguments work.
- if len(args) == 1:
- [value] = args
- if many and value and isinstance(value[0], tuple):
- return cls(on_complete=lambda x: f(*x), value=value, name=name, **c_kwargs)
- else:
- return cls(on_complete=f, value=value, name=name, **c_kwargs)
- else:
- if many:
- self, value = args
-
- def call_self(inner_args):
- if not isinstance(inner_args, tuple):
- inner_args = (inner_args, )
- return f(self, *inner_args)
-
- return cls(on_complete=call_self, value=value, name=name, **c_kwargs)
- else:
- return cls(on_complete=lambda x: f(*x), value=args, name=name, **c_kwargs)
-
- return wrapper
- return decorator
-
def forall_hosts(f):
@wraps(f)
def forall_hosts_wrapper(*args) -> list:
return forall_hosts_wrapper
-def async_completion(f):
- # type: (Callable) -> Callable[..., AsyncCompletion]
- """
- See ./HACKING.rst for a how-to
-
- :param f: wrapped function
- """
- return ssh_completion()(f)
-
-
-def async_map_completion(f):
- # type: (Callable) -> Callable[..., AsyncCompletion]
- """
- See ./HACKING.rst for a how-to
-
- :param f: wrapped function
-
- kind of similar to
-
- >>> def sync_map(f):
- ... return lambda x: map(f, x)
-
- """
- return ssh_completion(many=True)(f)
-
-
def trivial_completion(f):
# type: (Callable) -> Callable[..., orchestrator.Completion]
@wraps(f)
from tests import mock
from .fixtures import cephadm_module, wait
-from ..module import trivial_completion, async_completion, async_map_completion, forall_hosts
+from ..module import trivial_completion, forall_hosts
class TestCompletion(object):
return x+1
assert wait(cephadm_module, run(1)) == 2
- @pytest.mark.parametrize("input", [
- ((1, ), ),
- ((1, 2), ),
- (("hallo", ), ),
- (("hallo", "foo"), ),
- ])
- def test_async(self, input, cephadm_module):
- @async_completion
- def run(*args):
- return str(args)
-
- assert wait(cephadm_module, run(*input)) == str(input)
-
@pytest.mark.parametrize("input,expected", [
([], []),
([1], ["(1,)"]),
([(1, 2), (3, 4)], ["(1, 2)", "(3, 4)"]),
])
def test_async_map(self, input, expected, cephadm_module):
- @async_map_completion
- def run(*args):
- return str(args)
-
- c = run(input)
- wait(cephadm_module, c)
- assert c.result == expected
-
@forall_hosts
def run_forall(*args):
return str(args)
assert run_forall(input) == expected
- def test_async_self(self, cephadm_module):
- class Run(object):
- def __init__(self):
- self.attr = 1
-
- @async_completion
- def run(self, x):
- assert self.attr == 1
- return x + 1
-
- assert wait(cephadm_module, Run().run(1)) == 2
@pytest.mark.parametrize("input,expected", [
([], []),
def __init__(self):
self.attr = 1
- @async_map_completion
- def run(self, *args):
- assert self.attr == 1
- return str(args)
-
@forall_hosts
def run_forall(self, *args):
assert self.attr == 1
return str(args)
- c = Run().run(input)
- wait(cephadm_module, c)
- assert c.result == expected
-
assert Run().run_forall(input) == expected
-
- def test_then1(self, cephadm_module):
- @async_map_completion
- def run(x):
- return x+1
-
- assert wait(cephadm_module, run([1,2]).then(str)) == '[2, 3]'
-
- def test_then2(self, cephadm_module):
- @async_map_completion
- def run(x):
- time.sleep(0.1)
- return x+1
-
- @async_completion
- def async_str(results):
- return str(results)
-
- c = run([1,2]).then(async_str)
-
- wait(cephadm_module, c)
- assert c.result == '[2, 3]'
-
- def test_then3(self, cephadm_module):
- @async_map_completion
- def run(x):
- time.sleep(0.1)
- return x+1
-
- def async_str(results):
- return async_completion(str)(results)
-
- c = run([1,2]).then(async_str)
-
- wait(cephadm_module, c)
- assert c.result == '[2, 3]'
-
- def test_then4(self, cephadm_module):
- @async_map_completion
- def run(x):
- time.sleep(0.1)
- return x+1
-
- def async_str(results):
- return async_completion(str)(results).then(lambda x: x + "hello")
-
- c = run([1,2]).then(async_str)
-
- wait(cephadm_module, c)
- assert c.result == '[2, 3]hello'
-
- @pytest.mark.skip(reason="see limitation of async_map_completion")
- def test_then5(self, cephadm_module):
- @async_map_completion
- def run(x):
- time.sleep(0.1)
- return async_completion(str)(x+1)
-
- c = run([1,2])
-
- wait(cephadm_module, c)
- assert c.result == "['2', '3']"
-
- def test_raise(self, cephadm_module):
- @async_completion
- def run(x):
- raise ZeroDivisionError()
-
- with pytest.raises(ZeroDivisionError):
- wait(cephadm_module, run(1))
-
- def test_progress(self, cephadm_module):
- @async_map_completion
- def run(*args):
- return str(args)
-
- c = run(list(range(2)))
- c.update_progress = True
- c.add_progress(
- mgr=cephadm_module,
- message="my progress"
- )
- wait(cephadm_module, c)
- assert c.result == [str((x,)) for x in range(2)]
- assert cephadm_module.remote.mock_calls == [
- mock.call('progress', 'update', mock.ANY, 'my progress', float(i) / 2, [('origin', 'orchestrator')])
- for i in range(2+1)] + [
- mock.call('progress', 'update', mock.ANY, 'my progress', 1.0, [('origin', 'orchestrator')]),
- mock.call('progress', 'complete', mock.ANY),
- ]