from mgr_module import MgrModule, HandleCommandResult
import orchestrator
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
- CLICommandMeta, OrchestratorEvent
+ CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription
from orchestrator._interface import GenericSpec
from . import remotes
self.log.debug("serve starting")
while self.run:
- # refresh daemons
- self.log.debug('refreshing hosts and daemons')
- self._refresh_hosts_and_daemons()
+ try:
- self._check_for_strays()
+ # refresh daemons
+ self.log.debug('refreshing hosts and daemons')
+ self._refresh_hosts_and_daemons()
- if self.paused:
- self.health_checks['CEPHADM_PAUSED'] = {
- 'severity': 'warning',
- 'summary': 'cephadm background work is paused',
- 'count': 1,
- 'detail': ["'ceph orch resume' to resume"],
- }
- self.set_health_checks(self.health_checks)
- else:
- if 'CEPHADM_PAUSED' in self.health_checks:
- del self.health_checks['CEPHADM_PAUSED']
+ self._check_for_strays()
+
+ if self.paused:
+ self.health_checks['CEPHADM_PAUSED'] = {
+ 'severity': 'warning',
+ 'summary': 'cephadm background work is paused',
+ 'count': 1,
+ 'detail': ["'ceph orch resume' to resume"],
+ }
self.set_health_checks(self.health_checks)
+ else:
+ if 'CEPHADM_PAUSED' in self.health_checks:
+ del self.health_checks['CEPHADM_PAUSED']
+ self.set_health_checks(self.health_checks)
- self.rm_util._remove_osds_bg()
+ self.rm_util._remove_osds_bg()
- self.migration.migrate()
- if self.migration.is_migration_ongoing():
- continue
+ self.migration.migrate()
+ if self.migration.is_migration_ongoing():
+ continue
- if self._apply_all_services():
- continue # did something, refresh
+ if self._apply_all_services():
+ continue # did something, refresh
- self._check_daemons()
+ self._check_daemons()
- if self.upgrade.continue_upgrade():
- continue
+ if self.upgrade.continue_upgrade():
+ continue
+
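+ # an OrchestratorError raised inside a set_exception_subject() block carries
+ # an event_subject (kind, name) pair; record it as an ERROR event for that subject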
+ except OrchestratorError as e:
+ if e.event_subject:
+ self.events.add(OrchestratorEvent(
+ datetime.datetime.utcnow(),
+ e.event_subject[0],
+ e.event_subject[1],
+ "ERROR",
+ str(e)
+ ))
self._serve_sleep()
self.log.debug("serve exit")
if err:
self.log.debug('err: %s' % '\n'.join(err))
if code and not error_ok:
- raise RuntimeError(
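+ # raise OrchestratorError instead of RuntimeError so cephadm failures can be
+ # tagged with an event subject and reported through the orchestrator event handling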
+ raise OrchestratorError(
'cephadm exited with an error code: %d, stderr:%s' % (
code, '\n'.join(err)))
return out, err, code
@forall_hosts
def _daemon_actions(self, daemon_type, daemon_id, host, action):
- return self._daemon_action(daemon_type, daemon_id, host, action)
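+ # tag failures with this daemon's name so they are recorded as events for the daemon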
+ with set_exception_subject('daemon', DaemonDescription(
+ daemon_type=daemon_type,
+ daemon_id=daemon_id
+ ).name()):
+ return self._daemon_action(daemon_type, daemon_id, host, action)
def _daemon_action(self, daemon_type, daemon_id, host, action):
daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
) -> str:
- start_time = datetime.datetime.utcnow()
- cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(daemon_spec)
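+ # attribute failures while deploying or reconfiguring this daemon to its service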
+ with set_exception_subject('service', orchestrator.DaemonDescription(
+ daemon_type=daemon_spec.daemon_type,
+ daemon_id=daemon_spec.daemon_id,
+ hostname=daemon_spec.host,
+ ).service_id(), overwrite=True):
+
+ start_time = datetime.datetime.utcnow()
+ cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(daemon_spec)
- daemon_spec.extra_args.extend(['--config-json', '-'])
+ daemon_spec.extra_args.extend(['--config-json', '-'])
- # osd deployments needs an --osd-uuid arg
- if daemon_spec.daemon_type == 'osd':
- if not osd_uuid_map:
- osd_uuid_map = self.get_osd_uuid_map()
- osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
- if not osd_uuid:
- raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
- daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
+ # osd deployments need an --osd-fsid arg
+ if daemon_spec.daemon_type == 'osd':
+ if not osd_uuid_map:
+ osd_uuid_map = self.get_osd_uuid_map()
+ osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
+ if not osd_uuid:
+ raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
+ daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
- if reconfig:
- daemon_spec.extra_args.append('--reconfig')
- if self.allow_ptrace:
- daemon_spec.extra_args.append('--allow-ptrace')
+ if reconfig:
+ daemon_spec.extra_args.append('--reconfig')
+ if self.allow_ptrace:
+ daemon_spec.extra_args.append('--allow-ptrace')
- self.log.info('%s daemon %s on %s' % (
- 'Reconfiguring' if reconfig else 'Deploying',
- daemon_spec.name(), daemon_spec.host))
+ self.log.info('%s daemon %s on %s' % (
+ 'Reconfiguring' if reconfig else 'Deploying',
+ daemon_spec.name(), daemon_spec.host))
- out, err, code = self._run_cephadm(
- daemon_spec.host, daemon_spec.name(), 'deploy',
- [
- '--name', daemon_spec.name(),
- ] + daemon_spec.extra_args,
- stdin=json.dumps(cephadm_config))
- if not code and daemon_spec.host in self.cache.daemons:
- # prime cached service state with what we (should have)
- # just created
- sd = orchestrator.DaemonDescription()
- sd.daemon_type = daemon_spec.daemon_type
- sd.daemon_id = daemon_spec.daemon_id
- sd.hostname = daemon_spec.host
- sd.status = 1
- sd.status_desc = 'starting'
- self.cache.add_daemon(daemon_spec.host, sd)
- if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
- self.requires_post_actions.add(daemon_spec.daemon_type)
- self.cache.invalidate_host_daemons(daemon_spec.host)
- self.cache.update_daemon_config_deps(daemon_spec.host, daemon_spec.name(), deps, start_time)
- self.cache.save_host(daemon_spec.host)
- msg = "{} {} on host '{}'".format(
- 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
- if not code:
- self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
- else:
- what = 'reconfigure' if reconfig else 'deploy'
- self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
- return msg
+ out, err, code = self._run_cephadm(
+ daemon_spec.host, daemon_spec.name(), 'deploy',
+ [
+ '--name', daemon_spec.name(),
+ ] + daemon_spec.extra_args,
+ stdin=json.dumps(cephadm_config))
+ if not code and daemon_spec.host in self.cache.daemons:
+ # prime cached service state with what we (should have)
+ # just created
+ sd = orchestrator.DaemonDescription()
+ sd.daemon_type = daemon_spec.daemon_type
+ sd.daemon_id = daemon_spec.daemon_id
+ sd.hostname = daemon_spec.host
+ sd.status = 1
+ sd.status_desc = 'starting'
+ self.cache.add_daemon(daemon_spec.host, sd)
+ if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
+ self.requires_post_actions.add(daemon_spec.daemon_type)
+ self.cache.invalidate_host_daemons(daemon_spec.host)
+ self.cache.update_daemon_config_deps(daemon_spec.host, daemon_spec.name(), deps, start_time)
+ self.cache.save_host(daemon_spec.host)
+ msg = "{} {} on host '{}'".format(
+ 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
+ if not code:
+ self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
+ else:
+ what = 'reconfigure' if reconfig else 'deploy'
+ self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
+ return msg
@forall_hosts
def _remove_daemons(self, name, host) -> str:
"""
(daemon_type, daemon_id) = name.split('.', 1)
- self.cephadm_services[daemon_type].pre_remove(daemon_id)
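+ # attribute removal failures to the owning service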
+ with set_exception_subject('service', orchestrator.DaemonDescription(
+ daemon_type=daemon_type,
+ daemon_id=daemon_id,
+ hostname=host,
+ ).service_id(), overwrite=True):
- args = ['--name', name, '--force']
- self.log.info('Removing daemon %s from %s' % (name, host))
- out, err, code = self._run_cephadm(
- host, name, 'rm-daemon', args)
- if not code:
- # remove item from cache
- self.cache.rm_daemon(host, name)
- self.cache.invalidate_host_daemons(host)
- return "Removed {} from host '{}'".format(name, host)
+
+ self.cephadm_services[daemon_type].pre_remove(daemon_id)
+
+ args = ['--name', name, '--force']
+ self.log.info('Removing daemon %s from %s' % (name, host))
+ out, err, code = self._run_cephadm(
+ host, name, 'rm-daemon', args)
+ if not code:
+ # remove item from cache
+ self.cache.rm_daemon(host, name)
+ self.cache.invalidate_host_daemons(host)
+ return "Removed {} from host '{}'".format(name, host)
def _config_fn(self, service_type) -> Optional[Callable[[ServiceSpec], None]]:
return {
except Exception as e:
self.log.exception('Failed to apply %s spec %s: %s' % (
spec.service_name(), spec, e))
+ self.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
+
return r
def _check_pool_exists(self, pool, service_name):