raise OrchestratorError(f'Cannot find pool "{pool}" for '
f'service {service_name}')
- def _check_daemons(self):
-
- daemons = self.cache.get_daemons()
- daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
- for dd in daemons:
- # orphan?
- spec = self.spec_store.specs.get(dd.service_name(), None)
- if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
- # (mon and mgr specs should always exist; osds aren't matched
- # to a service spec)
- self.log.info('Removing orphan daemon %s...' % dd.name())
- self._remove_daemon(dd.name(), dd.hostname)
-
- # ignore unmanaged services
- if spec and spec.unmanaged:
- continue
-
- # These daemon types require additional configs after creation
- if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
- daemons_post[dd.daemon_type].append(dd)
-
- if self.cephadm_services[dd.daemon_type].get_active_daemon(
- self.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
- dd.is_active = True
- else:
- dd.is_active = False
-
- deps = self._calc_daemon_deps(dd.daemon_type, dd.daemon_id)
- last_deps, last_config = self.cache.get_daemon_last_config_deps(
- dd.hostname, dd.name())
- if last_deps is None:
- last_deps = []
- action = self.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
- if not last_config:
- self.log.info('Reconfiguring %s (unknown last config time)...' % (
- dd.name()))
- action = 'reconfig'
- elif last_deps != deps:
- self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
- deps))
- self.log.info('Reconfiguring %s (dependencies changed)...' % (
- dd.name()))
- action = 'reconfig'
- elif self.last_monmap and \
- self.last_monmap > last_config and \
- dd.daemon_type in CEPH_TYPES:
- self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
- action = 'reconfig'
- elif self.extra_ceph_conf_is_newer(last_config) and \
- dd.daemon_type in CEPH_TYPES:
- self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
- action = 'reconfig'
- if action:
- if self.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
- and action == 'reconfig':
- action = 'redeploy'
- try:
- self._daemon_action(
- daemon_type=dd.daemon_type,
- daemon_id=dd.daemon_id,
- host=dd.hostname,
- action=action
- )
- self.cache.rm_scheduled_daemon_action(dd.hostname, dd.name())
- except OrchestratorError as e:
- self.events.from_orch_error(e)
- if dd.daemon_type in daemons_post:
- del daemons_post[dd.daemon_type]
- # continue...
- except Exception as e:
- self.events.for_daemon_from_exception(dd.name(), e)
- if dd.daemon_type in daemons_post:
- del daemons_post[dd.daemon_type]
- # continue...
-
- # do daemon post actions
- for daemon_type, daemon_descs in daemons_post.items():
- if daemon_type in self.requires_post_actions:
- self.requires_post_actions.remove(daemon_type)
- self._get_cephadm_service(daemon_type).daemon_check_post(daemon_descs)
-
def _add_daemon(self, daemon_type, spec,
create_func: Callable[..., CephadmDaemonSpec], config_func=None) -> List[str]:
"""
import json
import logging
from collections import defaultdict
-from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set
+from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set, Dict
try:
import remoto
import orchestrator
from cephadm.schedule import HostAssignment
+from cephadm.upgrade import CEPH_UPGRADE_ORDER
from cephadm.utils import forall_hosts, cephadmNoImage, str_to_datetime
from orchestrator import OrchestratorError
logger = logging.getLogger(__name__)
+CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
+
class CephadmServe:
"""
if self._apply_all_services():
continue # did something, refresh
- self.mgr._check_daemons()
+ self._check_daemons()
if self.mgr.upgrade.continue_upgrade():
continue
if r is None:
r = False
return r
+
+ def _check_daemons(self) -> None:
+
+ daemons = self.mgr.cache.get_daemons()
+ daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
+ for dd in daemons:
+ # orphan?
+ spec = self.mgr.spec_store.specs.get(dd.service_name(), None)
+ if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
+ # (mon and mgr specs should always exist; osds aren't matched
+ # to a service spec)
+ self.log.info('Removing orphan daemon %s...' % dd.name())
+ self.mgr._remove_daemon(dd.name(), dd.hostname)
+
+ # ignore unmanaged services
+ if spec and spec.unmanaged:
+ continue
+
+ # These daemon types require additional configs after creation
+ if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
+ daemons_post[dd.daemon_type].append(dd)
+
+ if self.mgr.cephadm_services[dd.daemon_type].get_active_daemon(
+ self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
+ dd.is_active = True
+ else:
+ dd.is_active = False
+
+ deps = self.mgr._calc_daemon_deps(dd.daemon_type, dd.daemon_id)
+ last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
+ dd.hostname, dd.name())
+ if last_deps is None:
+ last_deps = []
+ action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
+ if not last_config:
+ self.log.info('Reconfiguring %s (unknown last config time)...' % (
+ dd.name()))
+ action = 'reconfig'
+ elif last_deps != deps:
+ self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
+ deps))
+ self.log.info('Reconfiguring %s (dependencies changed)...' % (
+ dd.name()))
+ action = 'reconfig'
+ elif self.mgr.last_monmap and \
+ self.mgr.last_monmap > last_config and \
+ dd.daemon_type in CEPH_TYPES:
+ self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
+ action = 'reconfig'
+ elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
+ dd.daemon_type in CEPH_TYPES:
+ self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
+ action = 'reconfig'
+ if action:
+ if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
+ and action == 'reconfig':
+ action = 'redeploy'
+ try:
+ self.mgr._daemon_action(
+ daemon_type=dd.daemon_type,
+ daemon_id=dd.daemon_id,
+ host=dd.hostname,
+ action=action
+ )
+ self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name())
+ except OrchestratorError as e:
+ self.mgr.events.from_orch_error(e)
+ if dd.daemon_type in daemons_post:
+ del daemons_post[dd.daemon_type]
+ # continue...
+ except Exception as e:
+ self.mgr.events.for_daemon_from_exception(dd.name(), e)
+ if dd.daemon_type in daemons_post:
+ del daemons_post[dd.daemon_type]
+ # continue...
+
+ # do daemon post actions
+ for daemon_type, daemon_descs in daemons_post.items():
+ if daemon_type in self.mgr.requires_post_actions:
+ self.mgr.requires_post_actions.remove(daemon_type)
+ self.mgr._get_cephadm_service(daemon_type).daemon_check_post(daemon_descs)
}
cephadm_module.notify('mon_map', None)
- cephadm_module._check_daemons()
+ CephadmServe(cephadm_module)._check_daemons()
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
@mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
})
cephadm_module.notify('mon_map', None)
- cephadm_module._check_daemons()
+ CephadmServe(cephadm_module)._check_daemons()
evs = [e.message for e in cephadm_module.events.get_for_daemon(
f'rgw.{daemon_id}')]
assert cephadm_module.cache.get_scheduled_daemon_action(
'test', daemon_name) == action
- cephadm_module._check_daemons()
+ CephadmServe(cephadm_module)._check_daemons()
assert cephadm_module.cache.get_scheduled_daemon_action('test', daemon_name) is None
cephadm_module._set_extra_ceph_conf('[mon]\nk=v')
- cephadm_module._check_daemons()
+ CephadmServe(cephadm_module)._check_daemons()
_run_cephadm.assert_called_with('test', 'mon.test', 'deploy', [
'--name', 'mon.test', '--reconfig', '--config-json', '-'],
})
with mock.patch("cephadm.module.CephadmOrchestrator.mon_command") as _mon_cmd:
-
- cephadm_module._check_daemons()
+ CephadmServe(cephadm_module)._check_daemons()
_mon_cmd.assert_any_call(
{'prefix': 'dashboard set-grafana-api-url', 'value': 'https://test:3000'})