return wrapper
return decorator
+def forall_hosts(f):
+ @wraps(f)
+ def forall_hosts_wrapper(*args) -> list:
+
+ # Some weired logic to make calling functions with multiple arguments work.
+ if len(args) == 1:
+ vals = args[0]
+ self = None
+ elif len(args) == 2:
+ self, vals = args
+ else:
+ assert 'either f([...]) or self.f([...])'
+
+ def do_work(arg):
+ if not isinstance(arg, tuple):
+ arg = (arg, )
+ try:
+ if self:
+ return f(self, *arg)
+ return f(*arg)
+ except Exception as e:
+ logger.exception(f'executing {f.__name__}({args}) failed.')
+ raise
+
+ assert CephadmOrchestrator.instance is not None
+ return CephadmOrchestrator.instance._worker_pool.map(do_work, vals)
+
+
+ return forall_hosts_wrapper
+
def async_completion(f):
# type: (Callable) -> Callable[..., AsyncCompletion]
result.append(dd)
return result
+ @trivial_completion
def service_action(self, action, service_name):
args = []
for host, dm in self.cache.daemons.items():
self.log.info('%s service %s' % (action.capitalize(), service_name))
return self._daemon_actions(args)
- @async_map_completion
+ @forall_hosts
def _daemon_actions(self, daemon_type, daemon_id, host, action):
return self._daemon_action(daemon_type, daemon_id, host, action)
','.join(['%s.%s' % (a[0], a[1]) for a in args])))
return self._daemon_actions(args)
+ @trivial_completion
def remove_daemons(self, names):
# type: (List[str]) -> orchestrator.Completion
args = []
raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
return '\n'.join(out + err)
+ @trivial_completion
def blink_device_light(self, ident_fault, on, locs):
- @async_map_completion
+ @forall_hosts
def blink(host, dev, path):
cmd = [
'lsmcli',
return "{} {} on host '{}'".format(
'Reconfigured' if reconfig else 'Deployed', name, host)
- @async_map_completion
+ @forall_hosts
def _remove_daemons(self, name, host):
return self._remove_daemon(name, host)
)
daemons.append(sd)
- @async_map_completion
+ @forall_hosts
def create_func_map(*args):
return create_func(*args)
keyring=keyring,
extra_config=extra_config)
+ @trivial_completion
def add_mon(self, spec):
# type: (ServiceSpec) -> orchestrator.Completion
return self._add_daemon('mon', spec, self._create_mon)
return self._create_daemon('mgr', mgr_id, host, keyring=keyring)
+ @trivial_completion
def add_mgr(self, spec):
# type: (ServiceSpec) -> orchestrator.Completion
return self._add_daemon('mgr', spec, self._create_mgr)
def apply_mgr(self, spec):
return self._apply(spec)
+ @trivial_completion
def add_mds(self, spec: ServiceSpec):
return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
})
return self._create_daemon('mds', mds_id, host, keyring=keyring)
+ @trivial_completion
def add_rgw(self, spec):
return self._add_daemon('rgw', spec, self._create_rgw, self._config_rgw)
def apply_rgw(self, spec):
return self._apply(spec)
+ @trivial_completion
def add_rbd_mirror(self, spec):
return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror)
"peers": peers
}, sorted(deps)
+ @trivial_completion
def add_prometheus(self, spec):
return self._add_daemon('prometheus', spec, self._create_prometheus)
def apply_prometheus(self, spec):
return self._apply(spec)
+ @trivial_completion
def add_node_exporter(self, spec):
# type: (ServiceSpec) -> AsyncCompletion
return self._add_daemon('node-exporter', spec,
def _create_node_exporter(self, daemon_id, host):
return self._create_daemon('node-exporter', daemon_id, host)
+ @trivial_completion
def add_crash(self, spec):
# type: (ServiceSpec) -> AsyncCompletion
return self._add_daemon('crash', spec,
})
return self._create_daemon('crash', daemon_id, host, keyring=keyring)
+ @trivial_completion
def add_grafana(self, spec):
# type: (ServiceSpec) -> AsyncCompletion
return self._add_daemon('grafana', spec, self._create_grafana)
# type: (str, str) -> str
return self._create_daemon('grafana', daemon_id, host)
+ @trivial_completion
def add_alertmanager(self, spec):
# type: (ServiceSpec) -> AsyncCompletion
return self._add_daemon('alertmanager', spec, self._create_alertmanager)