]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: Set exception context to populate orch events. 35456/head
authorSebastian Wagner <sebastian.wagner@suse.com>
Sat, 6 Jun 2020 23:25:21 +0000 (01:25 +0200)
committerSebastian Wagner <sebastian.wagner@suse.com>
Thu, 16 Jul 2020 09:39:44 +0000 (11:39 +0200)
Like when if daemon deployment fails

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/orchestrator/__init__.py
src/pybind/mgr/orchestrator/_interface.py
src/pybind/mgr/orchestrator/module.py
src/pybind/mgr/selftest/module.py

index caa6de4b4c8bc63a7a909f60eaeb8e9225bc6f57..73459d3f2336b265f7f6fab9ee4664e35c66ee20 100644 (file)
@@ -29,7 +29,7 @@ from cephadm.services.cephadmservice import CephadmDaemonSpec
 from mgr_module import MgrModule, HandleCommandResult
 import orchestrator
 from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
-    CLICommandMeta, OrchestratorEvent
+    CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription
 from orchestrator._interface import GenericSpec
 
 from . import remotes
@@ -472,38 +472,50 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.log.debug("serve starting")
         while self.run:
 
-            # refresh daemons
-            self.log.debug('refreshing hosts and daemons')
-            self._refresh_hosts_and_daemons()
+            try:
 
-            self._check_for_strays()
+                # refresh daemons
+                self.log.debug('refreshing hosts and daemons')
+                self._refresh_hosts_and_daemons()
 
-            if self.paused:
-                self.health_checks['CEPHADM_PAUSED'] = {
-                    'severity': 'warning',
-                    'summary': 'cephadm background work is paused',
-                    'count': 1,
-                    'detail': ["'ceph orch resume' to resume"],
-                }
-                self.set_health_checks(self.health_checks)
-            else:
-                if 'CEPHADM_PAUSED' in self.health_checks:
-                    del self.health_checks['CEPHADM_PAUSED']
+                self._check_for_strays()
+
+                if self.paused:
+                    self.health_checks['CEPHADM_PAUSED'] = {
+                        'severity': 'warning',
+                        'summary': 'cephadm background work is paused',
+                        'count': 1,
+                        'detail': ["'ceph orch resume' to resume"],
+                    }
                     self.set_health_checks(self.health_checks)
+                else:
+                    if 'CEPHADM_PAUSED' in self.health_checks:
+                        del self.health_checks['CEPHADM_PAUSED']
+                        self.set_health_checks(self.health_checks)
 
-                self.rm_util._remove_osds_bg()
+                    self.rm_util._remove_osds_bg()
 
-                self.migration.migrate()
-                if self.migration.is_migration_ongoing():
-                    continue
+                    self.migration.migrate()
+                    if self.migration.is_migration_ongoing():
+                        continue
 
-                if self._apply_all_services():
-                    continue  # did something, refresh
+                    if self._apply_all_services():
+                        continue  # did something, refresh
 
-                self._check_daemons()
+                    self._check_daemons()
 
-                if self.upgrade.continue_upgrade():
-                    continue
+                    if self.upgrade.continue_upgrade():
+                        continue
+
+            except OrchestratorError as e:
+                if e.event_subject:
+                    self.events.add(OrchestratorEvent(
+                        datetime.datetime.utcnow(),
+                        e.event_subject[0],
+                        e.event_subject[1],
+                        "ERROR",
+                        str(e)
+                    ))
 
             self._serve_sleep()
         self.log.debug("serve exit")
@@ -1071,7 +1083,7 @@ you may want to run:
             if err:
                 self.log.debug('err: %s' % '\n'.join(err))
             if code and not error_ok:
-                raise RuntimeError(
+                raise OrchestratorError(
                     'cephadm exited with an error code: %d, stderr:%s' % (
                         code, '\n'.join(err)))
             return out, err, code
@@ -1467,7 +1479,11 @@ you may want to run:
 
     @forall_hosts
     def _daemon_actions(self, daemon_type, daemon_id, host, action):
-        return self._daemon_action(daemon_type, daemon_id, host, action)
+        with set_exception_subject('daemon', DaemonDescription(
+            daemon_type=daemon_type,
+            daemon_id=daemon_id
+        ).name()):
+            return self._daemon_action(daemon_type, daemon_id, host, action)
 
     def _daemon_action(self, daemon_type, daemon_id, host, action):
         daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
@@ -1728,58 +1744,64 @@ you may want to run:
                        ) -> str:
 
 
-        start_time = datetime.datetime.utcnow()
-        cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(daemon_spec)
+        with set_exception_subject('service', orchestrator.DaemonDescription(
+                daemon_type=daemon_spec.daemon_type,
+                daemon_id=daemon_spec.daemon_id,
+                hostname=daemon_spec.host,
+        ).service_id(), overwrite=True):
+
+            start_time = datetime.datetime.utcnow()
+            cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(daemon_spec)
 
-        daemon_spec.extra_args.extend(['--config-json', '-'])
+            daemon_spec.extra_args.extend(['--config-json', '-'])
 
-        # osd deployments needs an --osd-uuid arg
-        if daemon_spec.daemon_type == 'osd':
-            if not osd_uuid_map:
-                osd_uuid_map = self.get_osd_uuid_map()
-            osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
-            if not osd_uuid:
-                raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
-            daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
+            # osd deployments needs an --osd-uuid arg
+            if daemon_spec.daemon_type == 'osd':
+                if not osd_uuid_map:
+                    osd_uuid_map = self.get_osd_uuid_map()
+                osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
+                if not osd_uuid:
+                    raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
+                daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
 
-        if reconfig:
-            daemon_spec.extra_args.append('--reconfig')
-        if self.allow_ptrace:
-            daemon_spec.extra_args.append('--allow-ptrace')
+            if reconfig:
+                daemon_spec.extra_args.append('--reconfig')
+            if self.allow_ptrace:
+                daemon_spec.extra_args.append('--allow-ptrace')
 
-        self.log.info('%s daemon %s on %s' % (
-            'Reconfiguring' if reconfig else 'Deploying',
-            daemon_spec.name(), daemon_spec.host))
+            self.log.info('%s daemon %s on %s' % (
+                'Reconfiguring' if reconfig else 'Deploying',
+                daemon_spec.name(), daemon_spec.host))
 
-        out, err, code = self._run_cephadm(
-            daemon_spec.host, daemon_spec.name(), 'deploy',
-            [
-                '--name', daemon_spec.name(),
-            ] + daemon_spec.extra_args,
-            stdin=json.dumps(cephadm_config))
-        if not code and daemon_spec.host in self.cache.daemons:
-            # prime cached service state with what we (should have)
-            # just created
-            sd = orchestrator.DaemonDescription()
-            sd.daemon_type = daemon_spec.daemon_type
-            sd.daemon_id = daemon_spec.daemon_id
-            sd.hostname = daemon_spec.host
-            sd.status = 1
-            sd.status_desc = 'starting'
-            self.cache.add_daemon(daemon_spec.host, sd)
-            if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
-                self.requires_post_actions.add(daemon_spec.daemon_type)
-        self.cache.invalidate_host_daemons(daemon_spec.host)
-        self.cache.update_daemon_config_deps(daemon_spec.host, daemon_spec.name(), deps, start_time)
-        self.cache.save_host(daemon_spec.host)
-        msg = "{} {} on host '{}'".format(
-            'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
-        if not code:
-            self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
-        else:
-            what = 'reconfigure' if reconfig else 'deploy'
-            self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
-        return msg
+            out, err, code = self._run_cephadm(
+                daemon_spec.host, daemon_spec.name(), 'deploy',
+                [
+                    '--name', daemon_spec.name(),
+                ] + daemon_spec.extra_args,
+                stdin=json.dumps(cephadm_config))
+            if not code and daemon_spec.host in self.cache.daemons:
+                # prime cached service state with what we (should have)
+                # just created
+                sd = orchestrator.DaemonDescription()
+                sd.daemon_type = daemon_spec.daemon_type
+                sd.daemon_id = daemon_spec.daemon_id
+                sd.hostname = daemon_spec.host
+                sd.status = 1
+                sd.status_desc = 'starting'
+                self.cache.add_daemon(daemon_spec.host, sd)
+                if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
+                    self.requires_post_actions.add(daemon_spec.daemon_type)
+            self.cache.invalidate_host_daemons(daemon_spec.host)
+            self.cache.update_daemon_config_deps(daemon_spec.host, daemon_spec.name(), deps, start_time)
+            self.cache.save_host(daemon_spec.host)
+            msg = "{} {} on host '{}'".format(
+                'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
+            if not code:
+                self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
+            else:
+                what = 'reconfigure' if reconfig else 'deploy'
+                self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
+            return msg
 
     @forall_hosts
     def _remove_daemons(self, name, host) -> str:
@@ -1791,17 +1813,24 @@ you may want to run:
         """
         (daemon_type, daemon_id) = name.split('.', 1)
 
-        self.cephadm_services[daemon_type].pre_remove(daemon_id)
+        with set_exception_subject('service', orchestrator.DaemonDescription(
+                daemon_type=daemon_type,
+                daemon_id=daemon_id,
+                hostname=host,
+        ).service_id(), overwrite=True):
 
-        args = ['--name', name, '--force']
-        self.log.info('Removing daemon %s from %s' % (name, host))
-        out, err, code = self._run_cephadm(
-            host, name, 'rm-daemon', args)
-        if not code:
-            # remove item from cache
-            self.cache.rm_daemon(host, name)
-        self.cache.invalidate_host_daemons(host)
-        return "Removed {} from host '{}'".format(name, host)
+
+            self.cephadm_services[daemon_type].pre_remove(daemon_id)
+
+            args = ['--name', name, '--force']
+            self.log.info('Removing daemon %s from %s' % (name, host))
+            out, err, code = self._run_cephadm(
+                host, name, 'rm-daemon', args)
+            if not code:
+                # remove item from cache
+                self.cache.rm_daemon(host, name)
+            self.cache.invalidate_host_daemons(host)
+            return "Removed {} from host '{}'".format(name, host)
 
     def _config_fn(self, service_type) -> Optional[Callable[[ServiceSpec], None]]:
         return {
@@ -1924,6 +1953,8 @@ you may want to run:
             except Exception as e:
                 self.log.exception('Failed to apply %s spec %s: %s' % (
                     spec.service_name(), spec, e))
+                self.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
+
         return r
 
     def _check_pool_exists(self, pool, service_name):
index fcb6a1313f2df15b9eddf3e3dc71e143bd10db9c..2d6cba8a15626bdf0cad9e33cf01ab8b6203f4e0 100644 (file)
@@ -14,6 +14,6 @@ from ._interface import \
     OrchestratorValidationError, OrchestratorError, NoOrchestrator, \
     ServiceDescription, InventoryFilter, HostSpec, \
     DaemonDescription, \
-    OrchestratorEvent, \
+    OrchestratorEvent, set_exception_subject, \
     InventoryHost, DeviceLightLoc, \
     UpgradeStatusSpec
index 0f0040907307d4f9a8cc63f3eb774434392e4171..ab9b5539b0ee4a58f4cf9e5be663854c57082e76 100644 (file)
@@ -15,6 +15,7 @@ import time
 import uuid
 
 from collections import namedtuple, OrderedDict
+from contextlib import contextmanager
 from functools import wraps
 
 import yaml
@@ -46,6 +47,10 @@ class OrchestratorError(Exception):
 
     It's not intended for programming errors or orchestrator internal errors.
     """
+    def __init__(self, msg, event_kind_subject: Optional[Tuple[str, str]]=None):
+        super(Exception, self).__init__(msg)
+        # See OrchestratorEvent.subject
+        self.event_subject = event_kind_subject
 
 
 class NoOrchestrator(OrchestratorError):
@@ -62,6 +67,16 @@ class OrchestratorValidationError(OrchestratorError):
     """
 
 
+@contextmanager
+def set_exception_subject(kind, subject, overwrite=False):
+    try:
+        yield
+    except OrchestratorError as e:
+        if overwrite or hasattr(e, 'event_subject'):
+            e.event_subject = (kind, subject)
+        raise
+
+
 def handle_exception(prefix, cmd_args, desc, perm, func):
     @wraps(func)
     def wrapper(*args, **kwargs):
index 3096d0fe8907080d8261e0bc4ac1f38213f65d87..93e7b97bfaa10b1e254699576045c90bd0a77d66 100644 (file)
@@ -1278,14 +1278,14 @@ Usage:
             raise_if_exception(e1)
             assert False
         except ZeroDivisionError as e:
-            assert e.args == ('hello', 'world')
+            assert e.args == ('hello, world',)
 
         e2 = self.remote('selftest', 'remote_from_orchestrator_cli_self_test', "OrchestratorError")
         try:
             raise_if_exception(e2)
             assert False
         except OrchestratorError as e:
-            assert e.args == ('hello', 'world')
+            assert e.args == ('hello, world',)
 
         c = TrivialReadCompletion(result=True)
         assert c.has_result
index c6decf2fc8cfaa008f7929d3926140efd8ef71e5..6603a603e7e1fb70e0ad81c3d203eed289db92e9 100644 (file)
@@ -431,11 +431,11 @@ class Module(MgrModule):
         import orchestrator
         if what == 'OrchestratorError':
             c = orchestrator.TrivialReadCompletion(result=None)
-            c.fail(orchestrator.OrchestratorError('hello', 'world'))
+            c.fail(orchestrator.OrchestratorError('helloworld'))
             return c
         elif what == "ZeroDivisionError":
             c = orchestrator.TrivialReadCompletion(result=None)
-            c.fail(ZeroDivisionError('hello', 'world'))
+            c.fail(ZeroDivisionError('helloworld'))
             return c
         assert False, repr(what)