From 51691d84b91b4c0a08cd52f3175821b4a2f44e6f Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 20 Jul 2018 08:22:23 -0400 Subject: [PATCH] mgr/progress: fix PgRecoveryEvent completion cases The event was previously not getting moved to the completed list. There are a couple more cases too: - When some pgs go away (a pool is removed) during the event - When the OSD comes back in after going out Signed-off-by: John Spray --- src/pybind/mgr/mgr_module.py | 2 +- src/pybind/mgr/progress/module.py | 124 ++++++++++++++++++++++-------- 2 files changed, 91 insertions(+), 35 deletions(-) diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py index 700f624d1ac..73e58f049d2 100644 --- a/src/pybind/mgr/mgr_module.py +++ b/src/pybind/mgr/mgr_module.py @@ -136,7 +136,7 @@ class OSDMap(ceph_module.BasePyOSDMap): return self._map_pool_pgs_up(poolid) def pg_to_up_acting_osds(self, pool_id, ps): - return self._pg_to_up_acting_osds(self._handle, pool_id, ps) + return self._pg_to_up_acting_osds(pool_id, ps) class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental): diff --git a/src/pybind/mgr/progress/module.py b/src/pybind/mgr/progress/module.py index cd57d12dc71..b998b1bc1d7 100644 --- a/src/pybind/mgr/progress/module.py +++ b/src/pybind/mgr/progress/module.py @@ -64,6 +64,13 @@ class Event(object): return "{0}\n {1}".format( self._message, self._progress_str(30)) + def to_json(self): + return { + "id": self.id, + "message": self.message, + "refs": self._refs + } + class GhostEvent(Event): """ @@ -79,13 +86,6 @@ class GhostEvent(Event): def progress(self): return 1.0 - def encode(self): - return { - "id": self.id, - "message": self.message, - "refs": self._refs - } - class RemoteEvent(Event): """ @@ -130,6 +130,10 @@ class PgRecoveryEvent(Event): self.id = str(uuid.uuid4()) + @property + def evacuating_osds(self): + return self. _evacuate_osds + def pg_update(self, pg_dump, log): # FIXME: O(pg_num) in python # FIXME: far more fields getting pythonized than we really care about @@ -147,7 +151,7 @@ class PgRecoveryEvent(Event): # Calculating progress as the number of PGs recovered divided by the # original where partially completed PGs count for something - # between 0.0-1.0. This is perhaps less faithful than lookign at the + # between 0.0-1.0. This is perhaps less faithful than looking at the # total number of bytes recovered, but it does a better job of # representing the work still to do if there are a number of very # few-bytes PGs that still need the housekeeping of their recovery @@ -156,7 +160,13 @@ class PgRecoveryEvent(Event): complete = set() for pg in self._pgs: pg_str = str(pg) - info = pg_to_state[pg_str] + try: + info = pg_to_state[pg_str] + except KeyError: + # The PG is gone! Probably a pool was deleted. Drop it. + complete.add(pg) + continue + state = info['state'] states = state.split("+") @@ -167,10 +177,6 @@ class PgRecoveryEvent(Event): if "active" in states and "clean" in states and not unmoved: complete.add(pg) else: - # FIXME: handle apparent negative progress - # (e.g. if num_bytes_recovered goes backwards) - # FIXME: handle apparent >1.0 progress - # (num_bytes_recovered incremented by more than num_bytes) if info['stat_sum']['num_bytes'] == 0: # Empty PGs are considered 0% done until they are # in the correct state. @@ -178,9 +184,20 @@ class PgRecoveryEvent(Event): else: recovered = info['stat_sum']['num_bytes_recovered'] total_bytes = info['stat_sum']['num_bytes'] - ratio = float(recovered - - self._original_bytes_recovered[pg]) / \ - total_bytes + if total_bytes > 0: + ratio = float(recovered - + self._original_bytes_recovered[pg]) / \ + total_bytes + + # Since the recovered bytes (over time) could perhaps + # exceed the contents of the PG (moment in time), we + # must clamp this + ratio = min(ratio, 1.0) + + else: + # Dataless PGs (e.g. containing only OMAPs) count + # as half done. + ratio = 0.5 complete_accumulate += ratio @@ -218,6 +235,9 @@ class Module(MgrModule): {"cmd": "progress", "desc": "Show progress of recovery operations", "perm": "r"}, + {"cmd": "progress json", + "desc": "Show machine readable progress information", + "perm": "r"}, {"cmd": "progress clear", "desc": "Reset progress tracking", "perm": "rw"} @@ -269,6 +289,14 @@ class Module(MgrModule): ev.pg_update(self.get("pg_dump"), self.log) self._events[ev.id] = ev + def _osd_in(self, osd_id): + for ev_id, ev in self._events.items(): + if isinstance(ev, PgRecoveryEvent) and osd_id in ev.evacuating_osds: + self.log.info("OSD {0} came back in, cancelling event".format( + osd_id + )) + self._complete(ev) + def _osdmap_changed(self, old_osdmap, new_osdmap): old_dump = old_osdmap.dump() new_dump = new_osdmap.dump() @@ -278,12 +306,18 @@ class Module(MgrModule): for osd in new_dump['osds']: osd_id = osd['osd'] new_weight = osd['in'] - if osd_id in old_osds and new_weight == 0.0: + if osd_id in old_osds: old_weight = old_osds[osd_id]['in'] - if old_weight > new_weight: + if new_weight == 0.0 and old_weight > new_weight: self.log.warn("osd.{0} marked out".format(osd_id)) self._osd_out(old_osdmap, old_dump, new_osdmap, osd_id) + elif new_weight >= 1.0 and old_weight == 0.0: + # Only consider weight>=1.0 as "in" to avoid spawning + # individual recovery events on every adjustment + # in a gradual weight-in + self.log.warn("osd.{0} marked in".format(osd_id)) + self._osd_in(osd_id) def notify(self, notify_type, notify_data): self._ready.wait() @@ -301,23 +335,25 @@ class Module(MgrModule): for ev_id, ev in self._events.items(): if isinstance(ev, PgRecoveryEvent): ev.pg_update(data, self.log) - else: - self.log.debug("Ignore notification '{0}'".format(notify_type)) - pass + self.maybe_complete(ev) - def _persist(self): + def maybe_complete(self, event): + if event.progress >= 1.0: + self._complete(event) + + def _save(self): self.log.info("Writing back {0} completed events".format( len(self._completed_events) )) # TODO: bound the number we store. encoded = json.dumps({ - "events": [ev.encode() for ev in self._completed_events], + "events": [ev.to_json() for ev in self._completed_events], "version": ENCODING_VERSION, "compat_version": ENCODING_VERSION }) self.set_store("completed", encoded) - def _unpersist(self): + def _load(self): stored = self.get_store("completed") if stored is None: @@ -335,7 +371,7 @@ class Module(MgrModule): def serve(self): self.log.info("Loading...") - self._unpersist() + self._load() self.log.info("Loaded {0} historic events".format(self._completed_events)) self._latest_osdmap = self.get_osdmap() @@ -343,9 +379,10 @@ class Module(MgrModule): self._ready.set() - while True: + while not self._shutdown.is_set(): + # Lazy periodic write back of completed events if self._dirty: - self._persist() + self._save() self._dirty = False self._shutdown.wait(timeout=PERSIST_PERIOD) @@ -372,6 +409,17 @@ class Module(MgrModule): ev.set_progress(ev_progress) + def _complete(self, ev): + duration = (datetime.datetime.utcnow() - ev.started_at) + self.log.info("Completed event {0} ({1}) in {2} seconds".format( + ev.id, ev.message, duration.seconds + )) + + self._completed_events.append( + GhostEvent(ev.id, ev.message, ev.refs)) + del self._events[ev.id] + self._dirty = True + def complete(self, ev_id): """ For calling from other mgr modules @@ -381,11 +429,7 @@ class Module(MgrModule): ev.set_progress(1.0) self.log.info("complete: finished ev {0} ({1})".format(ev_id, ev.message)) - self._completed_events.append( - GhostEvent(ev.id, ev.message, ev.refs)) - self._dirty = True - del self._events[ev_id] - + self._complete(ev) except KeyError: self.log.warn("complete: ev {0} does not exist".format(ev_id)) pass @@ -409,18 +453,30 @@ class Module(MgrModule): else: return 0, "", "Nothing in progress" + def _json(self): + return { + 'events': [ev.to_json() for ev in self._events.values()], + 'completed': [ev.to_json() for ev in self._completed_events] + } + def _handle_clear(self): self._events = {} + self._completed_events = [] + self._dirty = True + self._save() + return 0, "", "" - def handle_command(self, inbuf, cmd): + def handle_command(self, _, cmd): if cmd['prefix'] == "progress": return self._handle_ls() - if cmd['prefix'] == "progress clear": + elif cmd['prefix'] == "progress clear": # The clear command isn't usually needed - it's to enable # the admin to "kick" this module if it seems to have done # something wrong (e.g. we have a bug causing a progress event # that never finishes) return self._handle_clear() + elif cmd['prefix'] == "progress json": + return 0, json.dumps(self._json(), indent=2), "" else: raise NotImplementedError(cmd['prefix']) -- 2.39.5