self._remove_osds_bg()
- service_completions = self._apply_services()
+ service_completions = self._apply_all_services()
for service_completion in service_completions:
if service_completion:
while not service_completion.has_result:
self.cache.invalidate_host_daemons(host)
return "Removed {} from host '{}'".format(name, host)
- def _apply_service(self, spec, create_func, config_func=None):
+ def _apply_service(self, spec):
"""
Schedule a service. Deploy new daemons or remove old ones, depending
on the target label and count specified in the placement.
"""
daemon_type = spec.service_type
+ create_fns = {
+ 'mon': self._create_mon,
+ 'mgr': self._create_mgr,
+ 'mds': self._create_mds,
+ 'rgw': self._create_rgw,
+ 'rbd-mirror': self._create_rbd_mirror,
+ 'grafana': self._create_grafana,
+ 'alertmanager': self._create_alertmanager,
+ 'prometheus': self._create_prometheus,
+ 'node-exporter': self._create_node_exporter,
+ }
+ config_fns = {
+ 'mds': self._config_mds,
+ 'rgw': self._config_rgw,
+ }
+ create_func = create_fns.get(daemon_type, None)
+ if not create_func:
+ self.log.debug('unrecognized service type %s' % daemon_type)
+ return trivial_result([])
+ config_func = config_fns.get(daemon_type, None)
+
service_name = spec.service_name()
self.log.debug('Applying service %s spec' % service_name)
daemons = self.cache.get_daemons_by_service(service_name)
create_func, config_func)
return trivial_result([])
+ def _apply_all_services(self):
+ r : List[orchestrator.Completion] = []
+ for sn, spec in self.spec_store.specs.items():
+ try:
+ r.extend(self._apply_service(spec))
+ except Exception as e:
+ self.log.warning('Failed to apply %s spec %s: %s' % (
+ spec.service_name(), spec, e))
+ return r
+
def _add_daemon(self, daemon_type, spec,
create_func, config_func=None):
"""
def apply_mgr(self, spec):
return self._apply(spec)
- def _apply_mgr(self, spec):
- # type: (orchestrator.ServiceSpec) -> AsyncCompletion
- return self._apply_service(spec, self._create_mgr)
-
def add_mds(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
def apply_mds(self, spec: orchestrator.ServiceSpec) -> orchestrator.Completion:
return self._apply(spec)
- def _apply_mds(self, spec):
- # type: (orchestrator.ServiceSpec) -> AsyncCompletion
- return self._apply_service(spec, self._create_mds,
- self._config_mds)
-
def _config_mds(self, spec):
# ensure mds_join_fs is set for these daemons
assert spec.name
def apply_rgw(self, spec):
return self._apply(spec)
- def _apply_rgw(self, spec):
- # type: (orchestrator.ServiceSpec) -> AsyncCompletion
- return self._apply_service(spec, self._create_rgw,
- self._config_rgw)
-
def add_rbd_mirror(self, spec):
return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror)
def apply_rbd_mirror(self, spec):
return self._apply(spec)
- def _apply_rbd_mirror(self, spec):
- # type: (orchestrator.ServiceSpec) -> AsyncCompletion
- return self._apply_service(spec, self._create_rbd_mirror)
-
def _generate_prometheus_config(self):
# scrape mgrs
mgr_scrape_list = []
def _create_prometheus(self, daemon_id, host):
return self._create_daemon('prometheus', daemon_id, host)
- def _apply_prometheus(self, spec):
- # type: (orchestrator.ServiceSpec) -> AsyncCompletion
- return self._apply_service(spec, self._create_prometheus)
-
def apply_prometheus(self, spec):
return self._apply(spec)
def apply_node_exporter(self, spec):
return self._apply(spec)
- def _apply_node_exporter(self, spec):
- # type: (orchestrator.ServiceSpec) -> AsyncCompletion
- return self._apply_service(spec,
- self._create_node_exporter)
-
@async_map_completion
def _create_node_exporter(self, daemon_id, host):
return self._create_daemon('node-exporter', daemon_id, host)
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
return self._apply(spec)
- def _apply_grafana(self, spec):
- return self._apply_service(spec, self._create_grafana)
-
@async_map_completion
def _create_grafana(self, daemon_id, host):
return self._create_daemon('grafana', daemon_id, host)
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
return self._apply(spec)
- def _apply_alertmanager(self, spec):
- # type: (orchestrator.ServiceSpec) -> AsyncCompletion
- return self._apply_service(spec, self._create_alertmanager)
-
@async_map_completion
def _create_alertmanager(self, daemon_id, host):
return self._create_daemon('alertmanager', daemon_id, host)
self._kick_serve_loop()
return trivial_result("ServiceSpecs saved")
- def trigger_deployment(self,
- service_name: str,
- func: Callable[[ServiceSpec], orchestrator.Completion]) -> List[orchestrator.Completion]:
- """
- Triggers a corresponding deployment method `func` to `service_name`
- Services can have multiple entries. (i.e. different RGW configurations)
- """
- self.log.debug(f"starting async {service_name} deployment")
- specs = self.spec_store.find(service_name)
- completions = list()
- for spec in specs:
- try:
- completions.append(func(spec))
- except Exception as e:
- self.log.warning('Failed to apply %s spec %s: %s' % (
- service_name, spec, e))
- if completions:
- return completions
- return [trivial_result("Nothing to do..")]
-
- def _apply_services(self) -> List[orchestrator.Completion]:
- """
- This is a method that is supposed to run continuously in the
- server() thread.
- It will initiate deployments based on the presence of a ServiceSpec
- in the persistent mon_store.
- There is a defined order in which the services should be deployed
- Defined order:
- # mon -> mgr -> osd -> monitoring -> mds -> rgw -> nfs -> iscsi -> rbd-mirror
-
- Special cases:
- * Mons scaling is currently not implemented.
- * OSDs are daemons that are handled differently and may not fit in this paradigm
-
- The serve() thread processes the completions serially, which ensures the adherence to
- the defined order.
- """
-
- super_completions: List[orchestrator.Completion] = list()
- super_completions.extend(self.trigger_deployment('mgr', self._apply_mgr))
- super_completions.extend(self.trigger_deployment('prometheus', self._apply_prometheus))
- super_completions.extend(self.trigger_deployment('node-exporter', self._apply_node_exporter))
- super_completions.extend(self.trigger_deployment('mds', self._apply_mds))
- super_completions.extend(self.trigger_deployment('rgw', self._apply_rgw))
- super_completions.extend(self.trigger_deployment('rbd-mirror', self._apply_rbd_mirror))
- super_completions.extend(self.trigger_deployment('grafana', self._apply_grafana))
- super_completions.extend(self.trigger_deployment('alertmanager', self._apply_alertmanager))
-
- # Not implemented
-
- # super_completions.extend(trigger_deployment('mon', self._apply_mon))
- # super_completions.extend(trigger_deployment('nfs', self._apply_nfs))
- # super_completions.extend(trigger_deployment('grafana', self._apply_grafana))
- # super_completions.extend(trigger_deployment('iscsi', self._apply_iscsi))
-
- # Not implemented
- return super_completions
-
class BaseScheduler(object):
"""
def test_mgr_update(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
- c = cephadm_module._apply_mgr(ServiceSpec(placement=ps, service_type='mgr'))
+ c = cephadm_module._apply_service(ServiceSpec(placement=ps, service_type='mgr'))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed mgr.* on host 'test'")
match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
- c = cephadm_module._apply_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
+ c = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed rgw.realm.zone1.host2.* on host 'host2'")
with pytest.raises(OrchestratorError):
ps = PlacementSpec(hosts=['host1', 'host2'], count=3)
- c = cephadm_module._apply_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
+ c = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
[out] = wait(cephadm_module, c)
_sspec.from_json.assert_called_once()
assert wait(cephadm_module, c) == 'ServiceSpecs saved'
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
- @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
- @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
- @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
- @mock.patch("cephadm.module.HostCache.save_host")
- @mock.patch("cephadm.module.HostCache.rm_host")
- @mock.patch("cephadm.module.SpecStore.find")
- def test_trigger_deployment_todo(self, _find, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
- _find.return_value = ['something']
- c = cephadm_module.trigger_deployment('foo', lambda x: x)
- _find.assert_called_with('foo')
- assert c == ['something']
-
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
- @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
- @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
- @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
- @mock.patch("cephadm.module.HostCache.save_host")
- @mock.patch("cephadm.module.HostCache.rm_host")
- @mock.patch("cephadm.module.SpecStore.find")
- def test_trigger_deployment_no_todo(self, _find, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
- _find.return_value = []
- c = cephadm_module.trigger_deployment('foo', lambda x: x)
- _find.assert_called_with('foo')
- assert wait(cephadm_module, c[0]) == 'Nothing to do..'
-
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
- @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
- @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
- @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
- @mock.patch("cephadm.module.HostCache.save_host")
- @mock.patch("cephadm.module.HostCache.rm_host")
- @mock.patch("cephadm.module.CephadmOrchestrator.trigger_deployment")
- def test_apply_services(self, _trigger_deployment, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
- c = cephadm_module._apply_services()
- _trigger_deployment.assert_any_call('mgr', cephadm_module._apply_mgr)
- _trigger_deployment.assert_any_call('prometheus', cephadm_module._apply_prometheus)
- _trigger_deployment.assert_any_call('node-exporter', cephadm_module._apply_node_exporter)
- _trigger_deployment.assert_any_call('mds', cephadm_module._apply_mds)
- _trigger_deployment.assert_any_call('rgw', cephadm_module._apply_rgw)
- _trigger_deployment.assert_any_call('rbd-mirror', cephadm_module._apply_rbd_mirror)
- assert isinstance(c, list)