value=orchestrator._Promise.NO_RESULT, # type: Any
on_complete=None, # type: Optional[Callable]
name=None, # type: Optional[str]
+ many=False, # type: bool
update_progress=False, # type: bool
):
assert CephadmOrchestrator.instance is not None
+ self.many = many
self.update_progress = update_progress
if name is None and on_complete is not None:
name = getattr(on_complete, '__name__', None)
assert self._on_complete_ is not None
try:
res = self._on_complete_(*args, **kwargs)
+ if self.update_progress and self.many:
+ assert self.progress_reference
+ self.progress_reference.progress += 1.0 / len(value)
return res
except Exception as e:
self.fail(e)
raise
assert CephadmOrchestrator.instance
- CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
- callback=callback, error_callback=error_callback)
+ if self.many:
+ if not value:
+ logger.info('calling map_async without values')
+ callback([])
+ if six.PY3:
+ CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
+ callback=callback,
+ error_callback=error_callback)
+ else:
+ CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
+ callback=callback)
+ else:
+ if six.PY3:
+ CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
+ callback=callback, error_callback=error_callback)
+ else:
+ CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
+ callback=callback)
return self.ASYNC_RESULT
return run
self._on_complete_ = inner
-def forall_hosts(f):
- @wraps(f)
- def forall_hosts_wrapper(*args) -> list:
-
- # Some weired logic to make calling functions with multiple arguments work.
- if len(args) == 1:
- vals = args[0]
- self = None
- elif len(args) == 2:
- self, vals = args
- else:
- assert 'either f([...]) or self.f([...])'
+def ssh_completion(cls=AsyncCompletion, **c_kwargs):
+ # type: (Type[orchestrator.Completion], Any) -> Callable
+ """
+ See ./HACKING.rst for a how-to
+ """
+ def decorator(f):
+ @wraps(f)
+ def wrapper(*args):
+
+ name = f.__name__
+ many = c_kwargs.get('many', False)
+
+ # Some weired logic to make calling functions with multiple arguments work.
+ if len(args) == 1:
+ [value] = args
+ if many and value and isinstance(value[0], tuple):
+ return cls(on_complete=lambda x: f(*x), value=value, name=name, **c_kwargs)
+ else:
+ return cls(on_complete=f, value=value, name=name, **c_kwargs)
+ else:
+ if many:
+ self, value = args
- def do_work(arg):
- if not isinstance(arg, tuple):
- arg = (arg, )
- try:
- if self:
- return f(self, *arg)
- return f(*arg)
- except Exception as e:
- logger.exception(f'executing {f.__name__}({args}) failed.')
- raise
+ def call_self(inner_args):
+ if not isinstance(inner_args, tuple):
+ inner_args = (inner_args, )
+ return f(self, *inner_args)
- assert CephadmOrchestrator.instance is not None
- return CephadmOrchestrator.instance._worker_pool.map(do_work, vals)
+ return cls(on_complete=call_self, value=value, name=name, **c_kwargs)
+ else:
+ return cls(on_complete=lambda x: f(*x), value=args, name=name, **c_kwargs)
+
+ return wrapper
+ return decorator
+
+
+def async_completion(f):
+ # type: (Callable) -> Callable[..., AsyncCompletion]
+ """
+ See ./HACKING.rst for a how-to
+
+ :param f: wrapped function
+ """
+ return ssh_completion()(f)
+
+
+def async_map_completion(f):
+ # type: (Callable) -> Callable[..., AsyncCompletion]
+ """
+ See ./HACKING.rst for a how-to
+ :param f: wrapped function
- return forall_hosts_wrapper
+ kind of similar to
+
+ >>> def sync_map(f):
+ ... return lambda x: map(f, x)
+
+ """
+ return ssh_completion(many=True)(f)
def trivial_completion(f):
r.append(h)
return r
- @trivial_completion
+ @async_completion
def add_host(self, spec):
# type: (HostSpec) -> str
"""
self.log.info('Added host %s' % spec.hostname)
return "Added host '{}'".format(spec.hostname)
- @trivial_completion
+ @async_completion
def remove_host(self, host):
# type: (str) -> str
"""
self.log.info('Removed host %s' % host)
return "Removed host '{}'".format(host)
- @trivial_completion
+ @async_completion
def update_host_addr(self, host, addr):
if host not in self.inventory:
raise OrchestratorError('host %s not registered' % host)
))
return r
- @trivial_completion
+ @async_completion
def add_host_label(self, host, label):
if host not in self.inventory:
raise OrchestratorError('host %s does not exist' % host)
self.log.info('Added label %s to host %s' % (label, host))
return 'Added label %s to host %s' % (label, host)
- @trivial_completion
+ @async_completion
def remove_host_label(self, host, label):
if host not in self.inventory:
raise OrchestratorError('host %s does not exist' % host)
result.append(dd)
return result
- @trivial_completion
def service_action(self, action, service_name):
args = []
for host, dm in self.cache.daemons.items():
self.log.info('%s service %s' % (action.capitalize(), service_name))
return self._daemon_actions(args)
- @forall_hosts
+ @async_map_completion
def _daemon_actions(self, daemon_type, daemon_id, host, action):
return self._daemon_action(daemon_type, daemon_id, host, action)
self.cache.invalidate_host_daemons(host)
return "{} {} from host '{}'".format(action, name, host)
- @trivial_completion
def daemon_action(self, action, daemon_type, daemon_id):
args = []
for host, dm in self.cache.daemons.items():
','.join(['%s.%s' % (a[0], a[1]) for a in args])))
return self._daemon_actions(args)
- @trivial_completion
def remove_daemons(self, names):
# type: (List[str]) -> orchestrator.Completion
args = []
raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
return '\n'.join(out + err)
- @trivial_completion
def blink_device_light(self, ident_fault, on, locs):
- @forall_hosts
+ @async_map_completion
def blink(host, dev, path):
cmd = [
'lsmcli',
return "{} {} on host '{}'".format(
'Reconfigured' if reconfig else 'Deployed', name, host)
- @forall_hosts
+ @async_map_completion
def _remove_daemons(self, name, host):
return self._remove_daemon(name, host)
)
daemons.append(sd)
- @forall_hosts
+ @async_map_completion
def create_func_map(*args):
return create_func(*args)
keyring=keyring,
extra_config=extra_config)
- @trivial_completion
def add_mon(self, spec):
# type: (ServiceSpec) -> orchestrator.Completion
return self._add_daemon('mon', spec, self._create_mon)
return self._create_daemon('mgr', mgr_id, host, keyring=keyring)
- @trivial_completion
def add_mgr(self, spec):
# type: (ServiceSpec) -> orchestrator.Completion
return self._add_daemon('mgr', spec, self._create_mgr)
def apply_mgr(self, spec):
return self._apply(spec)
- @trivial_completion
def add_mds(self, spec: ServiceSpec):
return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
})
return self._create_daemon('mds', mds_id, host, keyring=keyring)
- @trivial_completion
def add_rgw(self, spec):
return self._add_daemon('rgw', spec, self._create_rgw, self._config_rgw)
def apply_rgw(self, spec):
return self._apply(spec)
- @trivial_completion
def add_rbd_mirror(self, spec):
return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror)
"peers": peers
}, sorted(deps)
- @trivial_completion
def add_prometheus(self, spec):
return self._add_daemon('prometheus', spec, self._create_prometheus)
def apply_prometheus(self, spec):
return self._apply(spec)
- @trivial_completion
def add_node_exporter(self, spec):
# type: (ServiceSpec) -> AsyncCompletion
return self._add_daemon('node-exporter', spec,
def _create_node_exporter(self, daemon_id, host):
return self._create_daemon('node-exporter', daemon_id, host)
- @trivial_completion
def add_crash(self, spec):
# type: (ServiceSpec) -> AsyncCompletion
return self._add_daemon('crash', spec,
})
return self._create_daemon('crash', daemon_id, host, keyring=keyring)
- @trivial_completion
def add_grafana(self, spec):
# type: (ServiceSpec) -> AsyncCompletion
return self._add_daemon('grafana', spec, self._create_grafana)
# type: (str, str) -> str
return self._create_daemon('grafana', daemon_id, host)
- @trivial_completion
def add_alertmanager(self, spec):
# type: (ServiceSpec) -> AsyncCompletion
return self._add_daemon('alertmanager', spec, self._create_alertmanager)
from tests import mock
from .fixtures import cephadm_module, wait
-from ..module import trivial_completion, forall_hosts
+from ..module import trivial_completion, async_completion, async_map_completion
class TestCompletion(object):
return x+1
assert wait(cephadm_module, run(1)) == 2
+ @pytest.mark.parametrize("input", [
+ ((1, ), ),
+ ((1, 2), ),
+ (("hallo", ), ),
+ (("hallo", "foo"), ),
+ ])
+ def test_async(self, input, cephadm_module):
+ @async_completion
+ def run(*args):
+ return str(args)
+
+ assert wait(cephadm_module, run(*input)) == str(input)
+
@pytest.mark.parametrize("input,expected", [
([], []),
([1], ["(1,)"]),
([(1, 2), (3, 4)], ["(1, 2)", "(3, 4)"]),
])
def test_async_map(self, input, expected, cephadm_module):
- @forall_hosts
- def run_forall(*args):
+ @async_map_completion
+ def run(*args):
return str(args)
- assert run_forall(input) == expected
+ c = run(input)
+ wait(cephadm_module, c)
+ assert c.result == expected
+
+ def test_async_self(self, cephadm_module):
+ class Run(object):
+ def __init__(self):
+ self.attr = 1
+
+ @async_completion
+ def run(self, x):
+ assert self.attr == 1
+ return x + 1
+
+ assert wait(cephadm_module, Run().run(1)) == 2
@pytest.mark.parametrize("input,expected", [
([], []),
def __init__(self):
self.attr = 1
- @forall_hosts
- def run_forall(self, *args):
+ @async_map_completion
+ def run(self, *args):
assert self.attr == 1
return str(args)
- assert Run().run_forall(input) == expected
+ c = Run().run(input)
+ wait(cephadm_module, c)
+ assert c.result == expected
+
+ def test_then1(self, cephadm_module):
+ @async_map_completion
+ def run(x):
+ return x+1
+
+ assert wait(cephadm_module, run([1,2]).then(str)) == '[2, 3]'
+
+ def test_then2(self, cephadm_module):
+ @async_map_completion
+ def run(x):
+ time.sleep(0.1)
+ return x+1
+
+ @async_completion
+ def async_str(results):
+ return str(results)
+
+ c = run([1,2]).then(async_str)
+
+ wait(cephadm_module, c)
+ assert c.result == '[2, 3]'
+
+ def test_then3(self, cephadm_module):
+ @async_map_completion
+ def run(x):
+ time.sleep(0.1)
+ return x+1
+
+ def async_str(results):
+ return async_completion(str)(results)
+
+ c = run([1,2]).then(async_str)
+
+ wait(cephadm_module, c)
+ assert c.result == '[2, 3]'
+
+ def test_then4(self, cephadm_module):
+ @async_map_completion
+ def run(x):
+ time.sleep(0.1)
+ return x+1
+
+ def async_str(results):
+ return async_completion(str)(results).then(lambda x: x + "hello")
+
+ c = run([1,2]).then(async_str)
+
+ wait(cephadm_module, c)
+ assert c.result == '[2, 3]hello'
+
+ @pytest.mark.skip(reason="see limitation of async_map_completion")
+ def test_then5(self, cephadm_module):
+ @async_map_completion
+ def run(x):
+ time.sleep(0.1)
+ return async_completion(str)(x+1)
+
+ c = run([1,2])
+
+ wait(cephadm_module, c)
+ assert c.result == "['2', '3']"
+
+ def test_raise(self, cephadm_module):
+ @async_completion
+ def run(x):
+ raise ZeroDivisionError()
+
+ with pytest.raises(ZeroDivisionError):
+ wait(cephadm_module, run(1))
+
+ def test_progress(self, cephadm_module):
+ @async_map_completion
+ def run(*args):
+ return str(args)
+
+ c = run(list(range(2)))
+ c.update_progress = True
+ c.add_progress(
+ mgr=cephadm_module,
+ message="my progress"
+ )
+ wait(cephadm_module, c)
+ assert c.result == [str((x,)) for x in range(2)]
+ assert cephadm_module.remote.mock_calls == [
+ mock.call('progress', 'update', mock.ANY, 'my progress', float(i) / 2, [('origin', 'orchestrator')])
+ for i in range(2+1)] + [
+ mock.call('progress', 'update', mock.ANY, 'my progress', 1.0, [('origin', 'orchestrator')]),
+ mock.call('progress', 'complete', mock.ANY),
+ ]