]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/insights: add insights reporting plugin
authorNoah Watkins <noahwatkins@gmail.com>
Mon, 30 Jul 2018 21:47:18 +0000 (14:47 -0700)
committerNoah Watkins <nwatkins@redhat.com>
Thu, 16 Aug 2018 22:24:01 +0000 (15:24 -0700)
Signed-off-by: Noah Watkins <noahwatkins@gmail.com>
src/pybind/mgr/insights/__init__.py [new file with mode: 0644]
src/pybind/mgr/insights/health.py [new file with mode: 0644]
src/pybind/mgr/insights/module.py [new file with mode: 0644]

diff --git a/src/pybind/mgr/insights/__init__.py b/src/pybind/mgr/insights/__init__.py
new file mode 100644 (file)
index 0000000..8f210ac
--- /dev/null
@@ -0,0 +1 @@
+from .module import Module
diff --git a/src/pybind/mgr/insights/health.py b/src/pybind/mgr/insights/health.py
new file mode 100644 (file)
index 0000000..866c4b6
--- /dev/null
@@ -0,0 +1,191 @@
+import json
+import six
+from collections import defaultdict
+import datetime
+
+# freq to write cached state to disk
+PERSIST_PERIOD = datetime.timedelta(seconds = 10)
+# on disk key prefix
+HEALTH_HISTORY_KEY_PREFIX = "health_history/"
+# apply on offset to "now": used for testing
+NOW_OFFSET = None
+
+class HealthEncoder(json.JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, set):
+            return list(obj)
+        return json.JSONEncoder.default(self, obj)
+
+class HealthCheckAccumulator(object):
+    """
+    Deuplicated storage of health checks.
+    """
+    def __init__(self, init_checks = None):
+        # check : severity : { summary, detail }
+        # summary and detail are deduplicated
+        self._checks = defaultdict(lambda:
+            defaultdict(lambda: {
+                "summary": set(),
+                "detail": set()
+            }))
+
+        if init_checks:
+            self._update(init_checks)
+
+    def __str__(self):
+        return "check count {}".format(len(self._checks))
+
+    def add(self, checks):
+        """
+        Add health checks to the current state
+
+        Returns:
+            bool: True if the state changed, False otherwise.
+        """
+        changed = False
+
+        for check, info in six.iteritems(checks):
+
+            # only keep the icky stuff
+            severity = info["severity"]
+            if severity == "HEALTH_OK":
+                continue
+
+            summary = info["summary"]["message"]
+            details = map(lambda d: d["message"], info["detail"])
+
+            if self._add_check(check, severity, [summary], details):
+                changed = True
+
+        return changed
+
+    def checks(self):
+        return self._checks
+
+    def merge(self, other):
+        assert isinstance(other, HealthCheckAccumulator)
+        self._update(other._checks)
+
+    def _update(self, checks):
+        """Merge checks with same structure. Does not set dirty bit"""
+        for check in checks:
+            for severity in checks[check]:
+                summaries = set(checks[check][severity]["summary"])
+                details = set(checks[check][severity]["detail"])
+                self._add_check(check, severity, summaries, details)
+
+    def _add_check(self, check, severity, summaries, details):
+        changed = False
+
+        for summary in summaries:
+            if summary not in self._checks[check][severity]["summary"]:
+                changed = True
+                self._checks[check][severity]["summary"].add(summary)
+
+        for detail in details:
+            if detail not in self._checks[check][severity]["detail"]:
+                changed = True
+                self._checks[check][severity]["detail"].add(detail)
+
+        return changed
+
+class HealthHistorySlot(object):
+    """
+    Manage the life cycle of a health history time slot.
+
+    A time slot is a fixed slice of wall clock time (e.g. every hours, from :00
+    to :59), and all health updates that occur during this time are deduplicated
+    together. A slot is initially in a clean state, and becomes dirty when a new
+    health check is observed. The state of a slot should be persisted when
+    need_flush returns true. Once the state has been flushed, reset the dirty
+    bit by calling mark_flushed.
+    """
+    def __init__(self, init_health = dict()):
+        self._checks = HealthCheckAccumulator(init_health.get("checks"))
+        self._slot = self._curr_slot()
+        self._next_flush = None
+
+    def __str__(self):
+        return "key {} next flush {} checks {}".format(
+            self.key(), self._next_flush, self._checks)
+
+    def health(self):
+        return dict(checks = self._checks.checks())
+
+    def key(self):
+        """Identifer in the persist store"""
+        return self._key(self._slot)
+
+    def expired(self):
+        """True if this slot is the current slot, False otherwise"""
+        return self._slot != self._curr_slot()
+
+    def need_flush(self):
+        """True if this slot needs to be flushed, False otherwise"""
+        now = HealthHistorySlot._now()
+        if self._next_flush is not None:
+            if self._next_flush <= now or self.expired():
+                return True
+        return False
+
+    def mark_flushed(self):
+        """Reset the dirty bit. Caller persists state"""
+        assert self._next_flush
+        self._next_flush = None
+
+    def add(self, health):
+        """
+        Add health to the underlying health accumulator. When the slot
+        transitions from clean to dirty a target flush time is computed.
+        """
+        changed = self._checks.add(health["checks"])
+        if changed and not self._next_flush:
+            self._next_flush = HealthHistorySlot._now() + PERSIST_PERIOD
+        return changed
+
+    def merge(self, other):
+        assert isinstance(other, HealthHistorySlot)
+        self._checks.merge(other._checks)
+
+    @staticmethod
+    def key_range(hours):
+        """Return the time slot keys for the past N hours"""
+        def inner(curr, hours):
+            slot = curr - datetime.timedelta(hours = hours)
+            return HealthHistorySlot._key(slot)
+        curr = HealthHistorySlot._curr_slot()
+        return map(lambda i: inner(curr, i), range(hours))
+
+    @staticmethod
+    def curr_key():
+        """Key for the current UTC time slot"""
+        return HealthHistorySlot._key(HealthHistorySlot._curr_slot())
+
+    @staticmethod
+    def key_to_time(key):
+        """Return key converted into datetime"""
+        timestr = key[len(HEALTH_HISTORY_KEY_PREFIX):]
+        return datetime.datetime.strptime(timestr, "%Y-%m-%d_%H")
+
+    @staticmethod
+    def _key(dt):
+        """Key format. Example: health_2018_11_05_00"""
+        return HEALTH_HISTORY_KEY_PREFIX + dt.strftime("%Y-%m-%d_%H")
+
+    @staticmethod
+    def _now():
+        """Control now time for easier testing"""
+        now = datetime.datetime.utcnow()
+        if NOW_OFFSET is not None:
+            now = now + NOW_OFFSET
+        return now
+
+    @staticmethod
+    def _curr_slot():
+        """Slot for the current UTC time"""
+        dt = HealthHistorySlot._now()
+        return datetime.datetime(
+            year  = dt.year,
+            month = dt.month,
+            day   = dt.day,
+            hour  = dt.hour)
diff --git a/src/pybind/mgr/insights/module.py b/src/pybind/mgr/insights/module.py
new file mode 100644 (file)
index 0000000..eb6e7e2
--- /dev/null
@@ -0,0 +1,298 @@
+import datetime
+import json
+import re
+import threading
+import six
+from mgr_module import MgrModule, CommandResult
+import health as health_util
+
+# hours of crash history to report
+CRASH_HISTORY_HOURS = 24
+# hours of health history to report
+HEALTH_HISTORY_HOURS = 24
+# how many hours of health history to keep
+HEALTH_RETENTION_HOURS = 30
+# health check name for insights health
+INSIGHTS_HEALTH_CHECK = "MGR_INSIGHTS_WARNING"
+# version tag for persistent data format
+ON_DISK_VERSION = 1
+
+class Module(MgrModule):
+    COMMANDS = [
+        {
+            "cmd": "insights",
+            "desc": "Retreive insights report",
+            "perm": "r",
+            "poll": "false",
+        },
+        {
+            'cmd': 'insights prune-health name=hours,type=CephString',
+            'desc': 'Remove health history older than <hours> hours',
+            'perm': 'rw',
+            "poll": "false",
+        },
+    ]
+
+    def __init__(self, *args, **kwargs):
+        super(Module, self).__init__(*args, **kwargs)
+
+        self._shutdown = False
+        self._evt = threading.Event()
+
+        # health history tracking
+        self._pending_health = []
+        self._health_slot = None
+
+    def notify(self, ttype, ident):
+        """Queue updates for processing"""
+        if ttype == "health":
+            self.log.info("Received health check update {} pending".format(
+                len(self._pending_health)))
+            health = json.loads(self.get("health")["json"])
+            self._pending_health.append(health)
+            self._evt.set()
+
+    def serve(self):
+        self._health_reset()
+        while True:
+            self._evt.wait(health_util.PERSIST_PERIOD.total_seconds())
+            self._evt.clear()
+            if self._shutdown:
+                break
+
+            # when the current health slot expires, finalize it by flushing it to
+            # the store, and initializing a new empty slot.
+            if self._health_slot.expired():
+                self.log.info("Health history slot expired {}".format(
+                    self._health_slot))
+                self._health_maybe_flush()
+                self._health_reset()
+                self._health_prune_history(HEALTH_RETENTION_HOURS)
+
+            # fold in pending health snapshots and flush
+            self.log.info("Applying {} health updates to slot {}".format(
+                len(self._pending_health), self._health_slot))
+            for health in self._pending_health:
+                self._health_slot.add(health)
+            self._pending_health = []
+            self._health_maybe_flush()
+
+    def shutdown(self):
+        self._shutdown = True
+        self._evt.set()
+
+    def _health_reset(self):
+        """Initialize the current health slot
+
+        The slot will be initialized with any state found to have already been
+        persisted, otherwise the slot will start empty.
+        """
+        key = health_util.HealthHistorySlot.curr_key()
+        data = self.get_store(key)
+        if data:
+            init_health = json.loads(data)
+            self._health_slot = health_util.HealthHistorySlot(init_health)
+        else:
+            self._health_slot = health_util.HealthHistorySlot()
+        self.log.info("Reset curr health slot {}".format(self._health_slot))
+
+    def _health_maybe_flush(self):
+        """Store the health for the current time slot if needed"""
+
+        self.log.info("Maybe flushing slot {} needed {}".format(
+            self._health_slot, self._health_slot.need_flush()))
+
+        if self._health_slot.need_flush():
+            key = self._health_slot.key()
+
+            # build store data entry
+            slot = self._health_slot.health()
+            assert "version" not in slot
+            slot.update(dict(version = ON_DISK_VERSION))
+            data = json.dumps(slot, cls=health_util.HealthEncoder)
+
+            self.log.debug("Storing health key {} data {}".format(
+                key, json.dumps(slot, indent=2, cls=health_util.HealthEncoder)))
+
+            self.set_store(key, data)
+            self._health_slot.mark_flushed()
+
+    def _health_filter(self, f):
+        """Filter hourly health reports timestamp"""
+        matches = filter(
+            lambda t: f(health_util.HealthHistorySlot.key_to_time(t[0])),
+            six.iteritems(self.get_store_prefix(health_util.HEALTH_HISTORY_KEY_PREFIX)))
+        return map(lambda t: t[0], matches)
+
+    def _health_prune_history(self, hours):
+        """Prune old health entries"""
+        cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours = hours)
+        for key in self._health_filter(lambda ts: ts <= cutoff):
+            self.log.info("Removing old health slot key {}".format(key))
+            self.set_store(key, None)
+
+    def _health_report(self, hours):
+        """
+        Report a consolidated health report for the past N hours.
+        """
+        # roll up the past N hours of health info
+        collector = health_util.HealthHistorySlot()
+        keys = health_util.HealthHistorySlot.key_range(hours)
+        for key in keys:
+            data = self.get_store(key)
+            self.log.info("Reporting health key {} found {}".format(
+                key, bool(data)))
+            health = json.loads(data) if data else {}
+            slot = health_util.HealthHistorySlot(health)
+            collector.merge(slot)
+
+        # include history that hasn't yet been flushed
+        collector.merge(self._health_slot)
+
+        return dict(
+           current = json.loads(self.get("health")["json"]),
+           history = collector.health()
+        )
+
+    def _version_parse(self, version):
+        """
+        Return the components of a Ceph version string.
+
+        This returns nothing when the verison string cannot be parsed into its
+        constituent components, such as when Ceph has been built with
+        ENABLE_GIT_VERSION=OFF.
+        """
+        r = "ceph version (?P<release>\d+)\.(?P<major>\d+)\.(?P<minor>\d+)"
+        m = re.match(r, version)
+        ver = {} if not m else {
+            "release": m.group("release"),
+            "major": m.group("major"),
+            "minor": m.group("minor")
+        }
+        return { k:int(v) for k,v in six.iteritems(ver) }
+
+    def _crash_history(self, hours):
+        """
+        Load crash history for the past N hours from the crash module.
+        """
+        params = dict(
+            prefix = "crash json_report",
+            hours = hours
+        )
+
+        result = dict(
+            summary = {},
+            hours = params["hours"],
+        )
+
+        health_check_details = []
+
+        try:
+            _, _, crashes = self.remote("crash", "handle_command", "", params)
+            result["summary"] = json.loads(crashes)
+        except Exception as e:
+            errmsg = "failed to invoke crash module"
+            self.log.warning("{}: {}".format(errmsg, str(e)))
+            health_check_details.append(errmsg)
+        else:
+            self.log.debug("Crash module invocation succeeded {}".format(
+                json.dumps(result["summary"], indent=2)))
+
+        return result, health_check_details
+
+    def _config_dump(self):
+        """Report cluster configuration
+
+        This report is the standard `config dump` report. It does not include
+        configuration defaults; these can be inferred from the version number.
+        """
+        result = CommandResult("")
+        args = dict(prefix = "config dump", format = "json")
+        self.send_command(result, "mon", "", json.dumps(args), "")
+        ret, outb, outs = result.wait()
+        if ret == 0:
+            return json.loads(outb), []
+        else:
+            self.log.warning("send_command 'config dump' failed. \
+                    ret={}, outs=\"{}\"".format(ret, outs))
+            return [], ["Failed to read monitor config dump"]
+
+    def do_report(self, inbuf, command):
+        health_check_details = []
+        report = {}
+
+        report.update({
+            "version": dict(full = self.version,
+                **self._version_parse(self.version))
+        })
+
+        # crash history
+        crashes, health_details = self._crash_history(CRASH_HISTORY_HOURS)
+        report["crashes"] = crashes
+        health_check_details.extend(health_details)
+
+        # health history
+        report["health"] = self._health_report(HEALTH_HISTORY_HOURS)
+
+        # cluster configuration
+        config, health_details = self._config_dump()
+        report["config"] = config
+        health_check_details.extend(health_details)
+
+        osd_map = self.get("osd_map")
+        del osd_map['pg_temp']
+        report["osd_dump"] = osd_map
+
+        report["df"] = self.get("df")
+        report["osd_tree"] = self.get("osd_map_tree")
+        report["fs_map"] = self.get("fs_map")
+        report["crush_map"] = self.get("osd_map_crush")
+        report["mon_map"] = self.get("mon_map")
+        report["service_map"] = self.get("service_map")
+        report["manager_map"] = self.get("mgr_map")
+        report["mon_status"] = json.loads(self.get("mon_status")["json"])
+        report["pg_summary"] = self.get("pg_summary")
+        report["osd_metadata"] = self.get("osd_metadata")
+
+        report.update({
+            "errors": health_check_details
+        })
+
+        if health_check_details:
+            self.set_health_checks({
+                INSIGHTS_HEALTH_CHECK: {
+                    "severity": "warning",
+                    "summary": "Generated incomplete Insights report",
+                    "detail": health_check_details
+                }
+            })
+
+        return 0, json.dumps(report, indent=2, cls=health_util.HealthEncoder), ""
+
+    def do_prune_health(self, inbuf, command):
+        try:
+            hours = int(command['hours'])
+        except ValueError:
+            return errno.EINVAL, '', 'hours argument must be integer'
+
+        self._health_prune_history(hours)
+
+        return 0, "", ""
+
+    def testing_set_now_time_offset(self, hours):
+        """
+        Control what "now" time it is by applying an offset. This is called from
+        the selftest module to manage testing scenarios related to tracking
+        health history.
+        """
+        hours = long(hours)
+        health_util.NOW_OFFSET = datetime.timedelta(hours = hours)
+        self.log.warning("Setting now time offset {}".format(health_util.NOW_OFFSET))
+
+    def handle_command(self, inbuf, command):
+        if command["prefix"] == "insights":
+            return self.do_report(inbuf, command)
+        elif command["prefix"] == "insights prune-health":
+            return self.do_prune_health(inbuf, command)
+        else:
+            raise NotImplementedError(cmd["prefix"])