git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/progress: fix PgRecoveryEvent completion cases
author John Spray <john.spray@redhat.com>
Fri, 20 Jul 2018 12:22:23 +0000 (08:22 -0400)
committer John Spray <john.spray@redhat.com>
Tue, 11 Sep 2018 10:21:35 +0000 (11:21 +0100)
The event was previously not getting moved to the completed
list.  There are a couple more cases too:
 - When some pgs go away (a pool is removed) during the event
 - When the OSD comes back in after going out

Signed-off-by: John Spray <john.spray@redhat.com>
src/pybind/mgr/mgr_module.py
src/pybind/mgr/progress/module.py

index 700f624d1ac5688a22aea9cd12c8683ba12000d8..73e58f049d25efe826492a2045f046ed8c92f611 100644 (file)
@@ -136,7 +136,7 @@ class OSDMap(ceph_module.BasePyOSDMap):
         return self._map_pool_pgs_up(poolid)
 
     def pg_to_up_acting_osds(self, pool_id, ps):
-        return self._pg_to_up_acting_osds(self._handle, pool_id, ps)
+        return self._pg_to_up_acting_osds(pool_id, ps)
 
 
 class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
index cd57d12dc71ffd9a436ac79f0cd6cda5da118f87..b998b1bc1d7b681d3837ff3bc2d8e5bb12207c4b 100644 (file)
@@ -64,6 +64,13 @@ class Event(object):
         return "{0}\n    {1}".format(
             self._message, self._progress_str(30))
 
+    def to_json(self):
+        return {
+            "id": self.id,
+            "message": self.message,
+            "refs": self._refs
+        }
+
 
 class GhostEvent(Event):
     """
@@ -79,13 +86,6 @@ class GhostEvent(Event):
     def progress(self):
         return 1.0
 
-    def encode(self):
-        return {
-            "id": self.id,
-            "message": self.message,
-            "refs": self._refs
-        }
-
 
 class RemoteEvent(Event):
     """
@@ -130,6 +130,10 @@ class PgRecoveryEvent(Event):
 
         self.id = str(uuid.uuid4())
 
+    @property
+    def evacuating_osds(self):
+        return self. _evacuate_osds
+
     def pg_update(self, pg_dump, log):
         # FIXME: O(pg_num) in python
         # FIXME: far more fields getting pythonized than we really care about
@@ -147,7 +151,7 @@ class PgRecoveryEvent(Event):
 
         # Calculating progress as the number of PGs recovered divided by the
         # original where partially completed PGs count for something
-        # between 0.0-1.0. This is perhaps less faithful than lookign at the
+        # between 0.0-1.0. This is perhaps less faithful than looking at the
         # total number of bytes recovered, but it does a better job of
         # representing the work still to do if there are a number of very
         # few-bytes PGs that still need the housekeeping of their recovery
@@ -156,7 +160,13 @@ class PgRecoveryEvent(Event):
         complete = set()
         for pg in self._pgs:
             pg_str = str(pg)
-            info = pg_to_state[pg_str]
+            try:
+                info = pg_to_state[pg_str]
+            except KeyError:
+                # The PG is gone!  Probably a pool was deleted. Drop it.
+                complete.add(pg)
+                continue
+
             state = info['state']
 
             states = state.split("+")
@@ -167,10 +177,6 @@ class PgRecoveryEvent(Event):
             if "active" in states and "clean" in states and not unmoved:
                 complete.add(pg)
             else:
-                # FIXME: handle apparent negative progress
-                #   (e.g. if num_bytes_recovered goes backwards)
-                # FIXME: handle apparent >1.0 progress
-                #   (num_bytes_recovered incremented by more than num_bytes)
                 if info['stat_sum']['num_bytes'] == 0:
                     # Empty PGs are considered 0% done until they are
                     # in the correct state.
@@ -178,9 +184,20 @@ class PgRecoveryEvent(Event):
                 else:
                     recovered = info['stat_sum']['num_bytes_recovered']
                     total_bytes = info['stat_sum']['num_bytes']
-                    ratio = float(recovered -
-                                  self._original_bytes_recovered[pg]) / \
-                        total_bytes
+                    if total_bytes > 0:
+                        ratio = float(recovered -
+                                      self._original_bytes_recovered[pg]) / \
+                            total_bytes
+
+                        # Since the recovered bytes (over time) could perhaps
+                        # exceed the contents of the PG (moment in time), we
+                        # must clamp this
+                        ratio = min(ratio, 1.0)
+
+                    else:
+                        # Dataless PGs (e.g. containing only OMAPs) count
+                        # as half done.
+                        ratio = 0.5
 
                     complete_accumulate += ratio
 
@@ -218,6 +235,9 @@ class Module(MgrModule):
         {"cmd": "progress",
          "desc": "Show progress of recovery operations",
          "perm": "r"},
+        {"cmd": "progress json",
+         "desc": "Show machine readable progress information",
+         "perm": "r"},
         {"cmd": "progress clear",
          "desc": "Reset progress tracking",
          "perm": "rw"}
@@ -269,6 +289,14 @@ class Module(MgrModule):
         ev.pg_update(self.get("pg_dump"), self.log)
         self._events[ev.id] = ev
 
+    def _osd_in(self, osd_id):
+        for ev_id, ev in self._events.items():
+            if isinstance(ev, PgRecoveryEvent) and osd_id in ev.evacuating_osds:
+                self.log.info("OSD {0} came back in, cancelling event".format(
+                    osd_id
+                ))
+                self._complete(ev)
+
     def _osdmap_changed(self, old_osdmap, new_osdmap):
         old_dump = old_osdmap.dump()
         new_dump = new_osdmap.dump()
@@ -278,12 +306,18 @@ class Module(MgrModule):
         for osd in new_dump['osds']:
             osd_id = osd['osd']
             new_weight = osd['in']
-            if osd_id in old_osds and new_weight == 0.0:
+            if osd_id in old_osds:
                 old_weight = old_osds[osd_id]['in']
 
-                if old_weight > new_weight:
+                if new_weight == 0.0 and old_weight > new_weight:
                     self.log.warn("osd.{0} marked out".format(osd_id))
                     self._osd_out(old_osdmap, old_dump, new_osdmap, osd_id)
+                elif new_weight >= 1.0 and old_weight == 0.0:
+                    # Only consider weight>=1.0 as "in" to avoid spawning
+                    # individual recovery events on every adjustment
+                    # in a gradual weight-in
+                    self.log.warn("osd.{0} marked in".format(osd_id))
+                    self._osd_in(osd_id)
 
     def notify(self, notify_type, notify_data):
         self._ready.wait()
@@ -301,23 +335,25 @@ class Module(MgrModule):
             for ev_id, ev in self._events.items():
                 if isinstance(ev, PgRecoveryEvent):
                     ev.pg_update(data, self.log)
-        else:
-            self.log.debug("Ignore notification '{0}'".format(notify_type))
-            pass
+                    self.maybe_complete(ev)
 
-    def _persist(self):
+    def maybe_complete(self, event):
+        if event.progress >= 1.0:
+            self._complete(event)
+
+    def _save(self):
         self.log.info("Writing back {0} completed events".format(
             len(self._completed_events)
         ))
         # TODO: bound the number we store.
         encoded = json.dumps({
-            "events": [ev.encode() for ev in self._completed_events],
+            "events": [ev.to_json() for ev in self._completed_events],
             "version": ENCODING_VERSION,
             "compat_version": ENCODING_VERSION
         })
         self.set_store("completed", encoded)
 
-    def _unpersist(self):
+    def _load(self):
         stored = self.get_store("completed")
 
         if stored is None:
@@ -335,7 +371,7 @@ class Module(MgrModule):
     def serve(self):
         self.log.info("Loading...")
 
-        self._unpersist()
+        self._load()
         self.log.info("Loaded {0} historic events".format(self._completed_events))
 
         self._latest_osdmap = self.get_osdmap()
@@ -343,9 +379,10 @@ class Module(MgrModule):
 
         self._ready.set()
 
-        while True:
+        while not self._shutdown.is_set():
+            # Lazy periodic write back of completed events
             if self._dirty:
-                self._persist()
+                self._save()
                 self._dirty = False
 
             self._shutdown.wait(timeout=PERSIST_PERIOD)
@@ -372,6 +409,17 @@ class Module(MgrModule):
 
         ev.set_progress(ev_progress)
 
+    def _complete(self, ev):
+        duration = (datetime.datetime.utcnow() - ev.started_at)
+        self.log.info("Completed event {0} ({1}) in {2} seconds".format(
+            ev.id, ev.message, duration.seconds
+        ))
+
+        self._completed_events.append(
+            GhostEvent(ev.id, ev.message, ev.refs))
+        del self._events[ev.id]
+        self._dirty = True
+
     def complete(self, ev_id):
         """
         For calling from other mgr modules
@@ -381,11 +429,7 @@ class Module(MgrModule):
             ev.set_progress(1.0)
             self.log.info("complete: finished ev {0} ({1})".format(ev_id,
                                                                    ev.message))
-            self._completed_events.append(
-                GhostEvent(ev.id, ev.message, ev.refs))
-            self._dirty = True
-            del self._events[ev_id]
-
+            self._complete(ev)
         except KeyError:
             self.log.warn("complete: ev {0} does not exist".format(ev_id))
             pass
@@ -409,18 +453,30 @@ class Module(MgrModule):
         else:
             return 0, "", "Nothing in progress"
 
+    def _json(self):
+        return {
+            'events': [ev.to_json() for ev in self._events.values()],
+            'completed': [ev.to_json() for ev in self._completed_events]
+        }
+
     def _handle_clear(self):
         self._events = {}
+        self._completed_events = []
+        self._dirty = True
+        self._save()
+
         return 0, "", ""
 
-    def handle_command(self, inbuf, cmd):
+    def handle_command(self, _, cmd):
         if cmd['prefix'] == "progress":
             return self._handle_ls()
-        if cmd['prefix'] == "progress clear":
+        elif cmd['prefix'] == "progress clear":
             # The clear command isn't usually needed - it's to enable
             # the admin to "kick" this module if it seems to have done
             # something wrong (e.g. we have a bug causing a progress event
             # that never finishes)
             return self._handle_clear()
+        elif cmd['prefix'] == "progress json":
+            return 0, json.dumps(self._json(), indent=2), ""
         else:
             raise NotImplementedError(cmd['prefix'])