node_name=node_name)
return orchestrator.TrivialReadCompletion(result)
+ def service_action(self, action, service_type,
+ service_name=None,
+ service_id=None):
+ self.log.debug('service_action action %s type %s name %s id %s' % (
+ action, service_type, service_name, service_id))
+ if action == 'reload':
+ return orchestrator.TrivialReadCompletion(
+ ["Reload is a no-op"])
+ daemons = self._get_services(
+ service_type,
+ service_name=service_name,
+ service_id=service_id)
+ results = []
+ for d in daemons:
+ results.append(self._worker_pool.apply_async(
+ self._service_action, (d.service_type, d.service_instance,
+ d.nodename, action)))
+ if not results:
+ n = service_name
+ if n:
+ n += '-*'
+ raise OrchestratorError('Unable to find %s.%s%s daemon(s)' % (
+ service_type, service_id, n))
+ return SSHWriteCompletion(results)
+
+ def _service_action(self, service_type, service_id, host, action):
+ actions = {
+ 'start': ['reset-failed', 'start'],
+ 'stop': ['stop'],
+ }
+ name = '%s.%s' % (service_type, service_id)
+ for a in actions[action]:
+ out, code = self._run_ceph_daemon(
+ host, name, 'unit',
+ ['--name', name, a])
+ self.log.debug('_service_action code %s out %s' % (code, out))
+ return "{} {} from host '{}'".format(action, name, host)
+
+
def get_inventory(self, node_filter=None, refresh=False):
"""
Return the storage inventory of nodes matching the given filter.