while self.run:
self._check_hosts()
self._remove_osds_bg()
- completions = self._apply_services()
- for completion in completions:
- if completion:
- while not completion.has_result:
- self.process([completion])
- self.log.debug(f'Still processing {completion}')
- if completion.needs_result:
+ service_completions = self._apply_services()
+ for service_completion in service_completions:
+ if service_completion:
+ while not service_completion.has_result:
+ self.process([service_completion])
+ self.log.debug(f'Still processing {service_completion}')
+ if service_completion.needs_result:
time.sleep(1)
else:
break
- if completion.exception is not None:
- self.log.error(str(completion.exception))
+ if service_completion.exception is not None:
+ self.log.error(str(service_completion.exception))
# refresh daemons
self.log.debug('refreshing hosts')
self._check_for_strays()
if self.upgrade_state and not self.upgrade_state.get('paused'):
- completion = self._do_upgrade()
- if completion:
- while not completion.has_result:
- self.process([completion])
- if completion.needs_result:
+ upgrade_completion = self._do_upgrade()
+ if upgrade_completion:
+ while not upgrade_completion.has_result:
+ self.process([upgrade_completion])
+ if upgrade_completion.needs_result:
time.sleep(1)
else:
break
- if completion.exception is not None:
- self.log.error(str(completion.exception))
+ if upgrade_completion.exception is not None:
+ self.log.error(str(upgrade_completion.exception))
+ self.log.debug('did _do_upgrade')
else:
self._serve_sleep()
self.log.debug("serve exit")
n: str = dd.service_name()
if service_name and service_name != n:
continue
- service_store = PersistentStoreDict(self, f'service_spec/{dd.daemon_type}')
try:
- _ = service_store[dd.service_name(without_service_type=True)]
+ _ = self.service_spec_store[dd.service_name()]
spec_presence = "present"
except IndexError:
spec_presence = "absent"
# TODO: bail out for OSDs when https://github.com/ceph/ceph/pull/32983 is merged
if dm[name].daemon_type == 'osd':
continue
- self._remove_key_from_store(dm[name].daemon_type,
- dm[name].service_name(without_service_type=True))
+ self._remove_key_from_store(dm[name].service_name())
if not args:
raise OrchestratorError('Unable to find daemon(s) %s' % (names))
self.log.info('Remove daemons %s' % [a[0] for a in args])
args.append(
(d.name(), d.hostname, force)
)
- self._remove_key_from_store(d.daemon_type, d.service_name(without_service_type=True))
+ self._remove_key_from_store(d.service_name())
if not args:
raise OrchestratorError('Unable to find daemons in %s service' % (
service_name))
service_name, [a[0] for a in args]))
return self._remove_daemon(args)
- def _remove_key_from_store(self, service_type: str, name: str) -> None:
- self.log.debug(f"Removing {name} from the service_spec store")
- try:
- store = PersistentStoreDict(self, f'service_spec/{service_type}')
- del store[name]
- except IndexError:
- self.log.debug(f"{service_type}/{name} not found in store. Can't remove.")
+ def _remove_key_from_store(self, spec_name: str) -> None:
+ self.log.debug(f"Removing {spec_name} from the service_spec store")
+ del self.service_spec_store[spec_name]
def get_inventory(self, host_filter=None, refresh=False):
"""
raise OrchestratorError('must specify host(s) to deploy on')
if not spec.placement.count:
spec.placement.count = len(spec.placement.hosts)
- service_name = daemon_type
- if spec.name:
- service_name += '.' + spec.name
+ # TODO: rename service_name to spec.name if works
+ service_name = spec.name
daemons = self.cache.get_daemons_by_service(service_name)
return self._create_daemons(daemon_type, spec, daemons,
create_func, config_func)
def apply_mgr(self, spec):
self.save_spec(spec)
self._kick_serve_loop()
- return trivial_result("scheduled MGR creation..")
+ return trivial_result("Scheduled MGR creation..")
def _apply_mgr(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
def apply_mds(self, spec: orchestrator.ServiceSpec) -> orchestrator.Completion:
self.save_spec(spec)
self._kick_serve_loop()
- return trivial_result("scheduled MDS creation..")
+ return trivial_result("Scheduled MDS creation..")
def _apply_mds(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
def apply_rgw(self, spec):
self.save_spec(spec)
self._kick_serve_loop()
- return trivial_result("scheduled RGW creation..")
+ return trivial_result("Scheduled RGW creation..")
def _apply_rgw(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
def apply_rbd_mirror(self, spec):
self.save_spec(spec)
self._kick_serve_loop()
- return trivial_result("scheduled rbd-mirror creation..")
+ return trivial_result("Scheduled rbd-mirror creation..")
def _apply_rbd_mirror(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
def apply_prometheus(self, spec):
self.save_spec(spec)
self._kick_serve_loop()
- return trivial_result("scheduled prometheus creation..")
+ return trivial_result("Scheduled prometheus creation..")
def add_node_exporter(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
def apply_node_exporter(self, spec):
self.save_spec(spec)
self._kick_serve_loop()
- return trivial_result("scheduled node-exporter creation..")
+ return trivial_result("Scheduled node-exporter creation..")
def _apply_node_exporter(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
Raises `OrchestratorError` if an entry with the same named identifier already exist to prevent overwrites from two different sources.
"""
- store = PersistentStoreDict(self, f'service_spec/{spec.service_type}')
- name: str = spec.name or spec.service_type
+ full_spec_name = f"{spec.service_type}.{spec.name}"
try:
- _ = store[name]
- raise orchestrator.OrchestratorError(f"Specification for {name} already exists. "
+ _ = self.service_spec_store[full_spec_name]
+ raise orchestrator.OrchestratorError(f"Specification for {full_spec_name} already exists. "
"Please review your existing specs with "
- "`ceph orch servicespecs ls` and try again.")
+ "`ceph orch spec dump` and try again.")
except IndexError:
- self.log.info(f"New spec found. Saving <{name}> to the store.")
- store[name] = spec.to_json()
+ self.log.info(f"New spec found. Saving <{full_spec_name}> to the store.")
+ _ = self.service_spec_store[full_spec_name] = spec.to_json()
def list_specs(self) -> orchestrator.Completion:
"""
specs.append(yaml.dump(spec))
return trivial_result(specs)
- def clear_all_specs(self) -> orchestrator.Completion:
- """
- Clears the service_spec root in the mon_store (mgr/cephadm/service_specs/)
- """
- self.service_spec_store.clear()
- return trivial_result(f"Mon store for {self.service_spec_store.prefix} cleared")
-
def apply_service_config(self, spec_document: str) -> orchestrator.Completion:
"""
Parse a multi document yaml file (represented in a inbuf object)
"""
content: Iterator[Any] = yaml.load_all(spec_document)
# Load all specs from a multi document yaml file.
+ loaded_specs: List[ServiceSpec] = list()
for spec in content:
# load ServiceSpec once to validate
spec_o = ServiceSpec.from_json(spec)
- self.save_spec(spec_o)
+ loaded_specs.append(spec_o)
+ for spec in loaded_specs:
+ self.save_spec(spec)
self._kick_serve_loop()
return trivial_result("ServiceSpecs saved")
"""
specs = list()
self.log.debug(f"Checking for type {find_service_type}")
- store = PersistentStoreDict(self, f'service_spec/{find_service_type}')
- for service_type, json_specs in store.items():
-
+ for spec_key, json_specs in self.service_spec_store.items():
+ if not spec_key.split('.')[0].startswith(find_service_type):
+ continue
if isinstance(json_specs, dict):
self.log.debug(f"Found dict in json_specs: No need to decode")
elif isinstance(json_specs, str):
self.log.debug(f"Found str in json_specs: Decoding from json")
json_specs = json.loads(json_specs)
- self.log.debug(f"Found service_type: {service_type} in the k-v store. Adding..")
+ self.log.debug(f"Found service_type: {spec_key} in the k-v store. Adding..")
specs.append(ServiceSpec.from_json(json_specs))
self.log.debug(f"Found {specs} specs.")
return specs
def test_mds(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
- c = cephadm_module.add_mds(ServiceSpec('name', placement=ps, service_type='mds'))
+ c = cephadm_module.add_mds(ServiceSpec(name='name', placement=ps, service_type='mds'))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed mds.name.* on host 'test'")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
@mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store")
- def test_remove_daemon(self, _save_host, _rm_host, _save_spec, cephadm_module):
+ def test_remove_daemon(self, rm_key, _rm_host, _save_spec, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
wait(cephadm_module, c)
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
@mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store")
- def test_remove_service(self, _save_host, _rm_host, _save_spec, cephadm_module):
+ def test_remove_service(self, _rm_key, _rm_host, _save_spec, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
wait(cephadm_module, c)
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
- c = cephadm_module.add_alertmanager(ServiceSpec(placement=ps))
+ c = cephadm_module.add_alertmanager(ServiceSpec(placement=ps, service_type='alertmanager'))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed alertmanager.* on host 'test'")
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.blink_device_light('ident', True, [('test', '', '')])
assert wait(cephadm_module, c) == ['Set ident light for test: on']
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
+ @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
+ @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+ @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+ @mock.patch("cephadm.module.HostCache.save_host")
+ @mock.patch("cephadm.module.HostCache.rm_host")
+ def test_apply_mgr_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
+ with self._with_host(cephadm_module, 'test'):
+ ps = PlacementSpec(hosts=['test'], count=1)
+ spec = ServiceSpec(placement=ps, service_type='mgr')
+ c = cephadm_module.apply_mgr(spec)
+ _save_spec.assert_called_with(spec)
+ assert wait(cephadm_module, c) == 'Scheduled MGR creation..'
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
+ @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
+ @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+ @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+ @mock.patch("cephadm.module.HostCache.save_host")
+ @mock.patch("cephadm.module.HostCache.rm_host")
+ def test_apply_mds_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
+ with self._with_host(cephadm_module, 'test'):
+ ps = PlacementSpec(hosts=['test'], count=1)
+ spec = ServiceSpec(placement=ps, service_type='mds')
+ c = cephadm_module.apply_mds(spec)
+ _save_spec.assert_called_with(spec)
+ assert wait(cephadm_module, c) == 'Scheduled MDS creation..'
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
+ @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
+ @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+ @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+ @mock.patch("cephadm.module.HostCache.save_host")
+ @mock.patch("cephadm.module.HostCache.rm_host")
+ def test_apply_rgw_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
+ with self._with_host(cephadm_module, 'test'):
+ ps = PlacementSpec(hosts=['test'], count=1)
+ spec = ServiceSpec(placement=ps, service_type='rgw')
+ c = cephadm_module.apply_rgw(spec)
+ _save_spec.assert_called_with(spec)
+ assert wait(cephadm_module, c) == 'Scheduled RGW creation..'
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
+ @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
+ @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+ @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+ @mock.patch("cephadm.module.HostCache.save_host")
+ @mock.patch("cephadm.module.HostCache.rm_host")
+ def test_apply_rbd_mirror_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
+ with self._with_host(cephadm_module, 'test'):
+ ps = PlacementSpec(hosts=['test'], count=1)
+ spec = ServiceSpec(placement=ps, service_type='rbd-mirror')
+ c = cephadm_module.apply_rbd_mirror(spec)
+ _save_spec.assert_called_with(spec)
+ assert wait(cephadm_module, c) == 'Scheduled rbd-mirror creation..'
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
+ @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
+ @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+ @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+ @mock.patch("cephadm.module.HostCache.save_host")
+ @mock.patch("cephadm.module.HostCache.rm_host")
+ def test_apply_prometheus_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
+ with self._with_host(cephadm_module, 'test'):
+ ps = PlacementSpec(hosts=['test'], count=1)
+ spec = ServiceSpec(placement=ps, service_type='prometheus')
+ c = cephadm_module.apply_prometheus(spec)
+ _save_spec.assert_called_with(spec)
+ assert wait(cephadm_module, c) == 'Scheduled prometheus creation..'
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
+ @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
+ @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+ @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+ @mock.patch("cephadm.module.HostCache.save_host")
+ @mock.patch("cephadm.module.HostCache.rm_host")
+ def test_apply_node_exporter_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
+ with self._with_host(cephadm_module, 'test'):
+ ps = PlacementSpec(hosts=['test'], count=1)
+ spec = ServiceSpec(placement=ps, service_type='node_exporter')
+ c = cephadm_module.apply_node_exporter(spec)
+ _save_spec.assert_called_with(spec)
+ assert wait(cephadm_module, c) == 'Scheduled node-exporter creation..'
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
+ @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
+ @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+ @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+ @mock.patch("cephadm.module.HostCache.save_host")
+ @mock.patch("cephadm.module.HostCache.rm_host")
+ @mock.patch("cephadm.module.yaml.load_all", return_value=[{'service_type': 'rgw', 'placement': {'count': 1}, 'spec': {'rgw_realm': 'realm1', 'rgw_zone': 'zone1'}}])
+ @mock.patch("cephadm.module.ServiceSpec")
+ def test_apply_service_config(self, _sspec, _yaml, _send_command, _get_connection, _save_spec, _save_host,
+ _rm_host, cephadm_module):
+ with self._with_host(cephadm_module, 'test'):
+ c = cephadm_module.apply_service_config('dummy')
+ _save_spec.assert_called_once()
+ _sspec.from_json.assert_called_once()
+ assert wait(cephadm_module, c) == 'ServiceSpecs saved'
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
+ @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
+ @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+ @mock.patch("cephadm.module.HostCache.save_host")
+ @mock.patch("cephadm.module.HostCache.rm_host")
+ @mock.patch("cephadm.module.CephadmOrchestrator.find_json_specs")
+ def test_trigger_deployment_todo(self, _find_json_spec, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
+ with self._with_host(cephadm_module, 'test'):
+ _find_json_spec.return_value = ['something']
+ c = cephadm_module.trigger_deployment('foo', lambda x: x)
+ _find_json_spec.assert_called_with('foo')
+ assert c == ['something']
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
+ @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
+ @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+ @mock.patch("cephadm.module.HostCache.save_host")
+ @mock.patch("cephadm.module.HostCache.rm_host")
+ @mock.patch("cephadm.module.CephadmOrchestrator.find_json_specs")
+ def test_trigger_deployment_no_todo(self, _find_json_spec, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
+ with self._with_host(cephadm_module, 'test'):
+ _find_json_spec.return_value = []
+ c = cephadm_module.trigger_deployment('foo', lambda x: x)
+ _find_json_spec.assert_called_with('foo')
+ assert wait(cephadm_module, c[0]) == 'Nothing to do..'
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
+ @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
+ @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+ @mock.patch("cephadm.module.HostCache.save_host")
+ @mock.patch("cephadm.module.HostCache.rm_host")
+ @mock.patch("cephadm.module.CephadmOrchestrator.trigger_deployment")
+ def test_apply_services(self, _trigger_deployment, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
+ with self._with_host(cephadm_module, 'test'):
+ c = cephadm_module._apply_services()
+ _trigger_deployment.assert_any_call('mgr', cephadm_module._apply_mgr)
+ _trigger_deployment.assert_any_call('prometheus', cephadm_module._apply_prometheus)
+ _trigger_deployment.assert_any_call('node-exporter', cephadm_module._apply_node_exporter)
+ _trigger_deployment.assert_any_call('mds', cephadm_module._apply_mds)
+ _trigger_deployment.assert_any_call('rgw', cephadm_module._apply_rgw)
+ _trigger_deployment.assert_any_call('rbd-mirror', cephadm_module._apply_rbd_mirror)
+ assert isinstance(c, list)