]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/progress: introduce the `progress` module
authorJohn Spray <john.spray@redhat.com>
Wed, 27 Sep 2017 18:17:18 +0000 (14:17 -0400)
committerJohn Spray <john.spray@redhat.com>
Tue, 17 Jul 2018 11:51:32 +0000 (12:51 +0100)
Signed-off-by: John Spray <john.spray@redhat.com>
src/pybind/mgr/progress/__init__.py [new file with mode: 0644]
src/pybind/mgr/progress/module.py [new file with mode: 0644]

diff --git a/src/pybind/mgr/progress/__init__.py b/src/pybind/mgr/progress/__init__.py
new file mode 100644 (file)
index 0000000..bd1113b
--- /dev/null
@@ -0,0 +1,2 @@
+
+from module import *
diff --git a/src/pybind/mgr/progress/module.py b/src/pybind/mgr/progress/module.py
new file mode 100644 (file)
index 0000000..cd57d12
--- /dev/null
@@ -0,0 +1,426 @@
+from mgr_module import MgrModule
+import threading
+import datetime
+import uuid
+
+import json
+
+
+# How often to potentially write back any dirty events (event history
+# persistence is best effort!
+PERSIST_PERIOD = 5
+
+ENCODING_VERSION = 1
+
+
+class Event(object):
+    """
+    A generic "event" that has a start time, completion percentage,
+    and a list of "refs" that are (type, id) tuples describing which
+    objects (osds, pools) this relates to.
+    """
+
+    def __init__(self, message, refs):
+        self._message = message
+        self._refs = refs
+
+        self.started_at = datetime.datetime.utcnow()
+
+        self.id = None
+
+    @property
+    def message(self):
+        return self._message
+
+    @property
+    def refs(self):
+        return self._refs
+
+    @property
+    def progress(self):
+        raise NotImplementedError()
+
+    def summary(self):
+        return "{0} {1}".format(self.progress, self._message)
+
+    def _progress_str(self, width):
+        inner_width = width - 2
+        out = "["
+        done_chars = int(self.progress * inner_width)
+        out += done_chars * '='
+        out += (inner_width - done_chars) * '.'
+        out += "]"
+
+        return out
+
+    def twoline_progress(self):
+        """
+        e.g.
+
+        - Eating my delicious strudel
+            [===============..............]
+
+        """
+        return "{0}\n    {1}".format(
+            self._message, self._progress_str(30))
+
+
+class GhostEvent(Event):
+    """
+    The ghost of a completed event: these are the fields that we persist
+    after the event is complete.
+    """
+
+    def __init__(self, my_id, message, refs):
+        super(GhostEvent, self).__init__(message, refs)
+        self.id = my_id
+
+    @property
+    def progress(self):
+        return 1.0
+
+    def encode(self):
+        return {
+            "id": self.id,
+            "message": self.message,
+            "refs": self._refs
+        }
+
+
+class RemoteEvent(Event):
+    """
+    An event that was published by another module: we know nothing about
+    this, rely on the other module to continuously update us with
+    progress information as it emerges.
+    """
+
+    def __init__(self, my_id, message, refs):
+        super(RemoteEvent, self).__init__(message, refs)
+        self.id = my_id
+        self._progress = 0.0
+
+    def set_progress(self, progress):
+        self._progress = progress
+
+    @property
+    def progress(self):
+        return self._progress
+
+
+class PgRecoveryEvent(Event):
+    """
+    An event whose completion is determined by the recovery of a set of
+    PGs to a healthy state.
+
+    Always call update() immediately after construction.
+    """
+
+    def __init__(self, message, refs, which_pgs, evactuate_osds):
+        super(PgRecoveryEvent, self).__init__(message, refs)
+
+        self._pgs = which_pgs
+
+        self._evacuate_osds = evactuate_osds
+
+        self._original_pg_count = len(self._pgs)
+
+        self._original_bytes_recovered = None
+
+        self._progress = 0.0
+
+        self.id = str(uuid.uuid4())
+
+    def pg_update(self, pg_dump, log):
+        # FIXME: O(pg_num) in python
+        # FIXME: far more fields getting pythonized than we really care about
+        pg_to_state = dict([(p['pgid'], p) for p in pg_dump['pg_stats']])
+
+        if self._original_bytes_recovered is None:
+            self._original_bytes_recovered = {}
+            for pg in self._pgs:
+                pg_str = str(pg)
+                log.debug(json.dumps(pg_to_state[pg_str], indent=2))
+                self._original_bytes_recovered[pg] = \
+                    pg_to_state[pg_str]['stat_sum']['num_bytes_recovered']
+
+        complete_accumulate = 0.0
+
+        # Calculating progress as the number of PGs recovered divided by the
+        # original where partially completed PGs count for something
+        # between 0.0-1.0. This is perhaps less faithful than lookign at the
+        # total number of bytes recovered, but it does a better job of
+        # representing the work still to do if there are a number of very
+        # few-bytes PGs that still need the housekeeping of their recovery
+        # to be done. This is subjective...
+
+        complete = set()
+        for pg in self._pgs:
+            pg_str = str(pg)
+            info = pg_to_state[pg_str]
+            state = info['state']
+
+            states = state.split("+")
+
+            unmoved = bool(set(self._evacuate_osds) & (
+                set(info['up']) | set(info['acting'])))
+
+            if "active" in states and "clean" in states and not unmoved:
+                complete.add(pg)
+            else:
+                # FIXME: handle apparent negative progress
+                #   (e.g. if num_bytes_recovered goes backwards)
+                # FIXME: handle apparent >1.0 progress
+                #   (num_bytes_recovered incremented by more than num_bytes)
+                if info['stat_sum']['num_bytes'] == 0:
+                    # Empty PGs are considered 0% done until they are
+                    # in the correct state.
+                    pass
+                else:
+                    recovered = info['stat_sum']['num_bytes_recovered']
+                    total_bytes = info['stat_sum']['num_bytes']
+                    ratio = float(recovered -
+                                  self._original_bytes_recovered[pg]) / \
+                        total_bytes
+
+                    complete_accumulate += ratio
+
+        self._pgs = list(set(self._pgs) ^ complete)
+        completed_pgs = self._original_pg_count - len(self._pgs)
+        self._progress = (completed_pgs + complete_accumulate)\
+            / self._original_pg_count
+
+        log.info("Updated progress to {0} ({1})".format(
+            self._progress, self._message
+        ))
+
+    @property
+    def progress(self):
+        return self._progress
+
+
+class PgId(object):
+    def __init__(self, pool_id, ps):
+        self.pool_id = pool_id
+        self.ps = ps
+
+    def __cmp__(self, other):
+        return (self.pool_id, self.ps) == (other.pool_id, other.ps)
+
+    def __lt__(self, other):
+        return (self.pool_id, self.ps) < (other.pool_id, other.ps)
+
+    def __str__(self):
+        return "{0}.{1:x}".format(self.pool_id, self.ps)
+
+
+class Module(MgrModule):
+    COMMANDS = [
+        {"cmd": "progress",
+         "desc": "Show progress of recovery operations",
+         "perm": "r"},
+        {"cmd": "progress clear",
+         "desc": "Reset progress tracking",
+         "perm": "rw"}
+    ]
+
+    def __init__(self, *args, **kwargs):
+        super(Module, self).__init__(*args, **kwargs)
+
+        self._events = {}
+        self._completed_events = []
+
+        self._old_osd_map = None
+
+        self._ready = threading.Event()
+        self._shutdown = threading.Event()
+
+        self._latest_osdmap = None
+
+        self._dirty = False
+
+    def _osd_out(self, old_map, old_dump, new_map, osd_id):
+        affected_pgs = []
+        for pool in old_dump['pools']:
+            pool_id = pool['pool']
+            for ps in range(0, pool['pg_num']):
+                self.log.debug("pool_id, ps = {0}, {1}".format(
+                    pool_id, ps
+                ))
+                up_acting = old_map.pg_to_up_acting_osds(pool['pool'], ps)
+                self.log.debug(
+                    "up_acting: {0}".format(json.dumps(up_acting, indent=2)))
+                if osd_id in up_acting['up'] or osd_id in up_acting['acting']:
+                    affected_pgs.append(PgId(pool_id, ps))
+
+        self.log.warn("{0} PGs affected by osd.{1} going out".format(
+            len(affected_pgs), osd_id))
+
+        if len(affected_pgs) == 0:
+            # Don't emit events if there were no PGs
+            return
+
+        # TODO: reconcile with existing events referring to this OSD going out
+        ev = PgRecoveryEvent(
+            "Rebalancing after OSD {0} marked out".format(osd_id),
+            refs=[("osd", osd_id)],
+            which_pgs=affected_pgs,
+            evactuate_osds=[osd_id]
+        )
+        ev.pg_update(self.get("pg_dump"), self.log)
+        self._events[ev.id] = ev
+
+    def _osdmap_changed(self, old_osdmap, new_osdmap):
+        old_dump = old_osdmap.dump()
+        new_dump = new_osdmap.dump()
+
+        old_osds = dict([(o['osd'], o) for o in old_dump['osds']])
+
+        for osd in new_dump['osds']:
+            osd_id = osd['osd']
+            new_weight = osd['in']
+            if osd_id in old_osds and new_weight == 0.0:
+                old_weight = old_osds[osd_id]['in']
+
+                if old_weight > new_weight:
+                    self.log.warn("osd.{0} marked out".format(osd_id))
+                    self._osd_out(old_osdmap, old_dump, new_osdmap, osd_id)
+
+    def notify(self, notify_type, notify_data):
+        self._ready.wait()
+
+        if notify_type == "osd_map":
+            old_osdmap = self._latest_osdmap
+            self._latest_osdmap = self.get_osdmap()
+
+            self.log.info("Processing OSDMap change {0}..{1}".format(
+                old_osdmap.get_epoch(), self._latest_osdmap.get_epoch()
+            ))
+            self._osdmap_changed(old_osdmap, self._latest_osdmap)
+        elif notify_type == "pg_summary":
+            data = self.get("pg_dump")
+            for ev_id, ev in self._events.items():
+                if isinstance(ev, PgRecoveryEvent):
+                    ev.pg_update(data, self.log)
+        else:
+            self.log.debug("Ignore notification '{0}'".format(notify_type))
+            pass
+
+    def _persist(self):
+        self.log.info("Writing back {0} completed events".format(
+            len(self._completed_events)
+        ))
+        # TODO: bound the number we store.
+        encoded = json.dumps({
+            "events": [ev.encode() for ev in self._completed_events],
+            "version": ENCODING_VERSION,
+            "compat_version": ENCODING_VERSION
+        })
+        self.set_store("completed", encoded)
+
+    def _unpersist(self):
+        stored = self.get_store("completed")
+
+        if stored is None:
+            self.log.info("No stored events to load")
+            return
+
+        decoded = json.loads(stored)
+        if decoded['compat_version'] > ENCODING_VERSION:
+            raise RuntimeError("Cannot decode version {0}".format(
+                               decoded['compat_version']))
+
+        for ev in decoded['events']:
+            self._completed_events.append(GhostEvent(ev['id'], ev['message'], ev['refs']))
+
+    def serve(self):
+        self.log.info("Loading...")
+
+        self._unpersist()
+        self.log.info("Loaded {0} historic events".format(self._completed_events))
+
+        self._latest_osdmap = self.get_osdmap()
+        self.log.info("Loaded OSDMap, ready.")
+
+        self._ready.set()
+
+        while True:
+            if self._dirty:
+                self._persist()
+                self._dirty = False
+
+            self._shutdown.wait(timeout=PERSIST_PERIOD)
+
+        self._shutdown.wait()
+
+    def shutdown(self):
+        self._shutdown.set()
+
+    def update(self, ev_id, ev_msg, ev_progress):
+        """
+        For calling from other mgr modules
+        """
+        try:
+            ev = self._events[ev_id]
+        except KeyError:
+            ev = RemoteEvent(ev_id, ev_msg, [])
+            self._events[ev_id] = ev
+            self.log.info("update: starting ev {0} ({1})".format(
+                ev_id, ev_msg))
+        else:
+            self.log.debug("update: {0} on {1}".format(
+                ev_progress, ev_msg))
+
+        ev.set_progress(ev_progress)
+
+    def complete(self, ev_id):
+        """
+        For calling from other mgr modules
+        """
+        try:
+            ev = self._events[ev_id]
+            ev.set_progress(1.0)
+            self.log.info("complete: finished ev {0} ({1})".format(ev_id,
+                                                                   ev.message))
+            self._completed_events.append(
+                GhostEvent(ev.id, ev.message, ev.refs))
+            self._dirty = True
+            del self._events[ev_id]
+
+        except KeyError:
+            self.log.warn("complete: ev {0} does not exist".format(ev_id))
+            pass
+
+    def _handle_ls(self):
+        if len(self._events) or len(self._completed_events):
+            out = ""
+            chrono_order = sorted(self._events.values(),
+                                  key=lambda x: x.started_at, reverse=True)
+            for ev in chrono_order:
+                out += ev.twoline_progress()
+                out += "\n"
+
+            if len(self._completed_events):
+                # TODO: limit number of completed events to show
+                out += "\n"
+                for ev in self._completed_events:
+                    out += "[Complete]: {0}\n".format(ev.twoline_progress())
+
+            return 0, out, ""
+        else:
+            return 0, "", "Nothing in progress"
+
+    def _handle_clear(self):
+        self._events = {}
+        return 0, "", ""
+
+    def handle_command(self, inbuf, cmd):
+        if cmd['prefix'] == "progress":
+            return self._handle_ls()
+        if cmd['prefix'] == "progress clear":
+            # The clear command isn't usually needed - it's to enable
+            # the admin to "kick" this module if it seems to have done
+            # something wrong (e.g. we have a bug causing a progress event
+            # that never finishes)
+            return self._handle_clear()
+        else:
+            raise NotImplementedError(cmd['prefix'])