def trivial_result(val):
+ # type: (Any) -> AsyncCompletion
return AsyncCompletion(value=val, name='trivial_result')
@async_completion
def remove_host(self, host):
+ # type: (str) -> str
"""
Remove a host from orchestrator management.
if daemon_type == 'prometheus':
j = self._generate_prometheus_config()
extra_args.extend(['--config-json', '-'])
+ elif daemon_type == 'node-exporter':
+ j = None
else:
# keyring
if not keyring:
spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='prometheus').load()
return self._update_service('prometheus', self.add_prometheus, spec)
+ def add_node_exporter(self, spec):
+ # type: (orchestrator.ServiceSpec) -> AsyncCompletion
+ # FIXME if no hosts are set (likewise no spec.count?) add node-exporter to all hosts!
+ if not spec.placement.hosts or len(spec.placement.hosts) < spec.count:
+ raise RuntimeError("must specify at least %d hosts" % spec.count)
+ return self._add_new_daemon('node-exporter', spec, self._create_node_exporter)
+
+ def apply_node_exporter(self, spec):
+ spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='node-exporter').load()
+ return self._update_service('node-exporter', self.add_node_exporter, spec)
+
+ @async_map_completion
+ def _create_node_exporter(self, daemon_id, host):
+ return self._create_daemon('node-exporter', daemon_id, host)
+
def _get_container_image_id(self, image_name):
# pick a random host...
host = None
# Leaving this open for the next iteration
# NOTE: This currently queries for all hosts without label restriction
if self.spec.placement.label:
- logger.info("Found labels. Assinging nodes that match the label")
+ logger.info("Found labels. Assigning nodes that match the label")
candidates = [HostPlacementSpec(x[0], '', '') for x in self.get_hosts_func()] # TODO: query for labels
logger.info('Assigning nodes to spec: {}'.format(candidates))
self.spec.placement.set_hosts(candidates)
ServiceSpec, PlacementSpec, RGWSpec, HostSpec, OrchestratorError
from tests import mock
from .fixtures import cephadm_module, wait
+from cephadm.module import CephadmOrchestrator
"""
@contextmanager
def _with_host(self, m, name):
+ # type: (CephadmOrchestrator, str) -> None
wait(m, m.add_host(HostSpec(hostname=name)))
yield
wait(m, m.remove_host(name))
def test_get_unique_name(self, cephadm_module):
+ # type: (CephadmOrchestrator) -> None
existing = [
DaemonDescription(daemon_type='mon', daemon_id='a')
]
@mock.patch("cephadm.module.DaemonCache.save_host")
@mock.patch("cephadm.module.DaemonCache.rm_host")
def test_rbd_mirror(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
+ # type: (mock.Mock, mock.Mock, mock.Mock, mock.Mock, CephadmOrchestrator) -> None
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_rbd_mirror(ServiceSpec(name='name', placement=ps))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed prometheus.* on host 'test'")
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
+ @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
+ @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+ @mock.patch("cephadm.module.DaemonCache.save_host")
+ @mock.patch("cephadm.module.DaemonCache.rm_host")
+ def test_node_exporter(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
+ # type: (mock.Mock, mock.Mock, mock.Mock, mock.Mock, CephadmOrchestrator) -> None
+ with self._with_host(cephadm_module, 'test'):
+ ps = PlacementSpec(hosts=['test'], count=1)
+
+ c = cephadm_module.add_node_exporter(ServiceSpec(placement=ps))
+ [out] = wait(cephadm_module, c)
+ match_glob(out, "Deployed node-exporter.* on host 'test'")
+
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
"""Update prometheus cluster"""
raise NotImplementedError()
+ def add_node_exporter(self, spec):
+ # type: (ServiceSpec) -> Completion
+ """Create a new Node-Exporter service"""
+ raise NotImplementedError()
+
+ def apply_node_exporter(self, spec):
+ # type: (ServiceSpec) -> Completion
+ """Update existing a Node-Exporter daemon(s)"""
+ raise NotImplementedError()
+
def upgrade_check(self, image, version):
# type: (Optional[str], Optional[str]) -> Completion
raise NotImplementedError()
@_cli_read_command(
'orch ps',
"name=host,type=CephString,req=false "
- "name=daemon_type,type=CephChoices,strings=mon|mgr|osd|mds|iscsi|nfs|rgw|rbd-mirror,req=false "
+ "name=daemon_type,type=CephString,req=false "
"name=daemon_id,type=CephString,req=false "
"name=format,type=CephChoices,strings=json|plain,req=false "
"name=refresh,type=CephBool,req=false",
self._orchestrator_wait([completion])
return HandleCommandResult(stdout=completion.result_str())
+ @_cli_write_command(
+ 'orch daemon add node-exporter',
+ 'name=num,type=CephInt,req=false '
+ 'name=hosts,type=CephString,n=N,req=false '
+ 'name=label,type=CephString,req=false',
+ 'Add node-exporter daemon(s)')
+ def _daemon_add_node_exporter(self, num=None, label=None, hosts=[]):
+ # type: (Optional[int], Optional[str], List[str]) -> HandleCommandResult
+ spec = ServiceSpec(
+ placement=PlacementSpec(label=label, hosts=hosts, count=num),
+ )
+ completion = self.add_node_exporter(spec)
+ self._orchestrator_wait([completion])
+ return HandleCommandResult(stdout=completion.result_str())
+
@_cli_write_command(
'orch',
"name=action,type=CephChoices,strings=start|stop|restart|redeploy|reconfig "
self._orchestrator_wait([completion])
return HandleCommandResult(stdout=completion.result_str())
+ @_cli_write_command(
+ 'orch apply node-exporter',
+ 'name=num,type=CephInt,req=false '
+ 'name=hosts,type=CephString,n=N,req=false '
+ 'name=label,type=CephString,req=false',
+ 'Update node_exporter service')
+ def _apply_node_exporter(self, num=None, label=None, hosts=[]):
+ # type: (Optional[int], Optional[str], List[str]) -> HandleCommandResult
+ spec = ServiceSpec(
+ placement=PlacementSpec(label=label, hosts=hosts, count=num),
+ )
+ completion = self.apply_node_exporter(spec)
+ self._orchestrator_wait([completion])
+ return HandleCommandResult(stdout=completion.result_str())
+
@_cli_write_command(
'orch set backend',
"name=module_name,type=CephString,req=true",