From 1dd2c5c4fbce49495736c12c3aea2c96f6ad4696 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Sun, 7 Jun 2020 01:25:21 +0200 Subject: [PATCH] mgr/cephadm: Set exception context to populate orch events. For example, when daemon deployment fails. Signed-off-by: Sebastian Wagner --- src/pybind/mgr/cephadm/module.py | 199 +++++++++++++--------- src/pybind/mgr/orchestrator/__init__.py | 2 +- src/pybind/mgr/orchestrator/_interface.py | 15 ++ src/pybind/mgr/orchestrator/module.py | 4 +- src/pybind/mgr/selftest/module.py | 4 +- 5 files changed, 135 insertions(+), 89 deletions(-) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index caa6de4b4c8bc..73459d3f2336b 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -29,7 +29,7 @@ from cephadm.services.cephadmservice import CephadmDaemonSpec from mgr_module import MgrModule, HandleCommandResult import orchestrator from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \ - CLICommandMeta, OrchestratorEvent + CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription from orchestrator._interface import GenericSpec from . 
import remotes @@ -472,38 +472,50 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self.log.debug("serve starting") while self.run: - # refresh daemons - self.log.debug('refreshing hosts and daemons') - self._refresh_hosts_and_daemons() + try: - self._check_for_strays() + # refresh daemons + self.log.debug('refreshing hosts and daemons') + self._refresh_hosts_and_daemons() - if self.paused: - self.health_checks['CEPHADM_PAUSED'] = { - 'severity': 'warning', - 'summary': 'cephadm background work is paused', - 'count': 1, - 'detail': ["'ceph orch resume' to resume"], - } - self.set_health_checks(self.health_checks) - else: - if 'CEPHADM_PAUSED' in self.health_checks: - del self.health_checks['CEPHADM_PAUSED'] + self._check_for_strays() + + if self.paused: + self.health_checks['CEPHADM_PAUSED'] = { + 'severity': 'warning', + 'summary': 'cephadm background work is paused', + 'count': 1, + 'detail': ["'ceph orch resume' to resume"], + } self.set_health_checks(self.health_checks) + else: + if 'CEPHADM_PAUSED' in self.health_checks: + del self.health_checks['CEPHADM_PAUSED'] + self.set_health_checks(self.health_checks) - self.rm_util._remove_osds_bg() + self.rm_util._remove_osds_bg() - self.migration.migrate() - if self.migration.is_migration_ongoing(): - continue + self.migration.migrate() + if self.migration.is_migration_ongoing(): + continue - if self._apply_all_services(): - continue # did something, refresh + if self._apply_all_services(): + continue # did something, refresh - self._check_daemons() + self._check_daemons() - if self.upgrade.continue_upgrade(): - continue + if self.upgrade.continue_upgrade(): + continue + + except OrchestratorError as e: + if e.event_subject: + self.events.add(OrchestratorEvent( + datetime.datetime.utcnow(), + e.event_subject[0], + e.event_subject[1], + "ERROR", + str(e) + )) self._serve_sleep() self.log.debug("serve exit") @@ -1071,7 +1083,7 @@ you may want to run: if err: self.log.debug('err: %s' % '\n'.join(err)) 
if code and not error_ok: - raise RuntimeError( + raise OrchestratorError( 'cephadm exited with an error code: %d, stderr:%s' % ( code, '\n'.join(err))) return out, err, code @@ -1467,7 +1479,11 @@ you may want to run: @forall_hosts def _daemon_actions(self, daemon_type, daemon_id, host, action): - return self._daemon_action(daemon_type, daemon_id, host, action) + with set_exception_subject('daemon', DaemonDescription( + daemon_type=daemon_type, + daemon_id=daemon_id + ).name()): + return self._daemon_action(daemon_type, daemon_id, host, action) def _daemon_action(self, daemon_type, daemon_id, host, action): daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec( @@ -1728,58 +1744,64 @@ you may want to run: ) -> str: - start_time = datetime.datetime.utcnow() - cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(daemon_spec) + with set_exception_subject('service', orchestrator.DaemonDescription( + daemon_type=daemon_spec.daemon_type, + daemon_id=daemon_spec.daemon_id, + hostname=daemon_spec.host, + ).service_id(), overwrite=True): + + start_time = datetime.datetime.utcnow() + cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(daemon_spec) - daemon_spec.extra_args.extend(['--config-json', '-']) + daemon_spec.extra_args.extend(['--config-json', '-']) - # osd deployments needs an --osd-uuid arg - if daemon_spec.daemon_type == 'osd': - if not osd_uuid_map: - osd_uuid_map = self.get_osd_uuid_map() - osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id) - if not osd_uuid: - raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id) - daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid]) + # osd deployments needs an --osd-uuid arg + if daemon_spec.daemon_type == 'osd': + if not osd_uuid_map: + osd_uuid_map = self.get_osd_uuid_map() + osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id) + if not osd_uuid: + raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id) + 
daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid]) - if reconfig: - daemon_spec.extra_args.append('--reconfig') - if self.allow_ptrace: - daemon_spec.extra_args.append('--allow-ptrace') + if reconfig: + daemon_spec.extra_args.append('--reconfig') + if self.allow_ptrace: + daemon_spec.extra_args.append('--allow-ptrace') - self.log.info('%s daemon %s on %s' % ( - 'Reconfiguring' if reconfig else 'Deploying', - daemon_spec.name(), daemon_spec.host)) + self.log.info('%s daemon %s on %s' % ( + 'Reconfiguring' if reconfig else 'Deploying', + daemon_spec.name(), daemon_spec.host)) - out, err, code = self._run_cephadm( - daemon_spec.host, daemon_spec.name(), 'deploy', - [ - '--name', daemon_spec.name(), - ] + daemon_spec.extra_args, - stdin=json.dumps(cephadm_config)) - if not code and daemon_spec.host in self.cache.daemons: - # prime cached service state with what we (should have) - # just created - sd = orchestrator.DaemonDescription() - sd.daemon_type = daemon_spec.daemon_type - sd.daemon_id = daemon_spec.daemon_id - sd.hostname = daemon_spec.host - sd.status = 1 - sd.status_desc = 'starting' - self.cache.add_daemon(daemon_spec.host, sd) - if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']: - self.requires_post_actions.add(daemon_spec.daemon_type) - self.cache.invalidate_host_daemons(daemon_spec.host) - self.cache.update_daemon_config_deps(daemon_spec.host, daemon_spec.name(), deps, start_time) - self.cache.save_host(daemon_spec.host) - msg = "{} {} on host '{}'".format( - 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host) - if not code: - self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg) - else: - what = 'reconfigure' if reconfig else 'deploy' - self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}') - return msg + out, err, code = self._run_cephadm( + daemon_spec.host, daemon_spec.name(), 'deploy', + [ + '--name', daemon_spec.name(), + 
] + daemon_spec.extra_args, + stdin=json.dumps(cephadm_config)) + if not code and daemon_spec.host in self.cache.daemons: + # prime cached service state with what we (should have) + # just created + sd = orchestrator.DaemonDescription() + sd.daemon_type = daemon_spec.daemon_type + sd.daemon_id = daemon_spec.daemon_id + sd.hostname = daemon_spec.host + sd.status = 1 + sd.status_desc = 'starting' + self.cache.add_daemon(daemon_spec.host, sd) + if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']: + self.requires_post_actions.add(daemon_spec.daemon_type) + self.cache.invalidate_host_daemons(daemon_spec.host) + self.cache.update_daemon_config_deps(daemon_spec.host, daemon_spec.name(), deps, start_time) + self.cache.save_host(daemon_spec.host) + msg = "{} {} on host '{}'".format( + 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host) + if not code: + self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg) + else: + what = 'reconfigure' if reconfig else 'deploy' + self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}') + return msg @forall_hosts def _remove_daemons(self, name, host) -> str: @@ -1791,17 +1813,24 @@ you may want to run: """ (daemon_type, daemon_id) = name.split('.', 1) - self.cephadm_services[daemon_type].pre_remove(daemon_id) + with set_exception_subject('service', orchestrator.DaemonDescription( + daemon_type=daemon_type, + daemon_id=daemon_id, + hostname=host, + ).service_id(), overwrite=True): - args = ['--name', name, '--force'] - self.log.info('Removing daemon %s from %s' % (name, host)) - out, err, code = self._run_cephadm( - host, name, 'rm-daemon', args) - if not code: - # remove item from cache - self.cache.rm_daemon(host, name) - self.cache.invalidate_host_daemons(host) - return "Removed {} from host '{}'".format(name, host) + + self.cephadm_services[daemon_type].pre_remove(daemon_id) + + args = ['--name', name, '--force'] + 
self.log.info('Removing daemon %s from %s' % (name, host)) + out, err, code = self._run_cephadm( + host, name, 'rm-daemon', args) + if not code: + # remove item from cache + self.cache.rm_daemon(host, name) + self.cache.invalidate_host_daemons(host) + return "Removed {} from host '{}'".format(name, host) def _config_fn(self, service_type) -> Optional[Callable[[ServiceSpec], None]]: return { @@ -1924,6 +1953,8 @@ you may want to run: except Exception as e: self.log.exception('Failed to apply %s spec %s: %s' % ( spec.service_name(), spec, e)) + self.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e)) + return r def _check_pool_exists(self, pool, service_name): diff --git a/src/pybind/mgr/orchestrator/__init__.py b/src/pybind/mgr/orchestrator/__init__.py index fcb6a1313f2df..2d6cba8a15626 100644 --- a/src/pybind/mgr/orchestrator/__init__.py +++ b/src/pybind/mgr/orchestrator/__init__.py @@ -14,6 +14,6 @@ from ._interface import \ OrchestratorValidationError, OrchestratorError, NoOrchestrator, \ ServiceDescription, InventoryFilter, HostSpec, \ DaemonDescription, \ - OrchestratorEvent, \ + OrchestratorEvent, set_exception_subject, \ InventoryHost, DeviceLightLoc, \ UpgradeStatusSpec diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index 0f0040907307d..ab9b5539b0ee4 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -15,6 +15,7 @@ import time import uuid from collections import namedtuple, OrderedDict +from contextlib import contextmanager from functools import wraps import yaml @@ -46,6 +47,10 @@ class OrchestratorError(Exception): It's not intended for programming errors or orchestrator internal errors. 
""" + def __init__(self, msg, event_kind_subject: Optional[Tuple[str, str]]=None): + super(Exception, self).__init__(msg) + # See OrchestratorEvent.subject + self.event_subject = event_kind_subject class NoOrchestrator(OrchestratorError): @@ -62,6 +67,16 @@ class OrchestratorValidationError(OrchestratorError): """ +@contextmanager +def set_exception_subject(kind, subject, overwrite=False): + try: + yield + except OrchestratorError as e: + if overwrite or hasattr(e, 'event_subject'): + e.event_subject = (kind, subject) + raise + + def handle_exception(prefix, cmd_args, desc, perm, func): @wraps(func) def wrapper(*args, **kwargs): diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py index 3096d0fe89070..93e7b97bfaa10 100644 --- a/src/pybind/mgr/orchestrator/module.py +++ b/src/pybind/mgr/orchestrator/module.py @@ -1278,14 +1278,14 @@ Usage: raise_if_exception(e1) assert False except ZeroDivisionError as e: - assert e.args == ('hello', 'world') + assert e.args == ('hello, world',) e2 = self.remote('selftest', 'remote_from_orchestrator_cli_self_test', "OrchestratorError") try: raise_if_exception(e2) assert False except OrchestratorError as e: - assert e.args == ('hello', 'world') + assert e.args == ('hello, world',) c = TrivialReadCompletion(result=True) assert c.has_result diff --git a/src/pybind/mgr/selftest/module.py b/src/pybind/mgr/selftest/module.py index c6decf2fc8cfa..6603a603e7e1f 100644 --- a/src/pybind/mgr/selftest/module.py +++ b/src/pybind/mgr/selftest/module.py @@ -431,11 +431,11 @@ class Module(MgrModule): import orchestrator if what == 'OrchestratorError': c = orchestrator.TrivialReadCompletion(result=None) - c.fail(orchestrator.OrchestratorError('hello', 'world')) + c.fail(orchestrator.OrchestratorError('hello, world')) return c elif what == "ZeroDivisionError": c = orchestrator.TrivialReadCompletion(result=None) - c.fail(ZeroDivisionError('hello', 'world')) + c.fail(ZeroDivisionError('hello, world')) return c 
assert False, repr(what) -- 2.39.5