return "{0}\n {1}".format(
self._message, self._progress_str(30))
+ def to_json(self):
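+        # Plain-dict form of the event, used both when persisting
+        # completed events and for `progress json` output.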
+ return {
+ "id": self.id,
+ "message": self.message,
+ "refs": self._refs
+ }
+
class GhostEvent(Event):
"""
def progress(self):
return 1.0
- def encode(self):
- return {
- "id": self.id,
- "message": self.message,
- "refs": self._refs
- }
-
class RemoteEvent(Event):
"""
self.id = str(uuid.uuid4())
+ @property
+ def evacuating_osds(self):
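+        # OSDs whose data this event is draining; used by the module
+        # to cancel the event if one of them is marked back in.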
+        return self._evacuate_osds
+
def pg_update(self, pg_dump, log):
# FIXME: O(pg_num) in python
# FIXME: far more fields getting pythonized than we really care about
# Calculating progress as the number of PGs recovered divided by the
# original where partially completed PGs count for something
- # between 0.0-1.0. This is perhaps less faithful than lookign at the
+ # between 0.0-1.0. This is perhaps less faithful than looking at the
# total number of bytes recovered, but it does a better job of
# representing the work still to do if there are a number of very
# few-bytes PGs that still need the housekeeping of their recovery
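+        # e.g. a PG that has recovered 50 of its 100 bytes since the
+        # event began contributes 0.5; the event's progress is the
+        # mean ratio across all of its PGs.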
complete = set()
for pg in self._pgs:
pg_str = str(pg)
- info = pg_to_state[pg_str]
+ try:
+ info = pg_to_state[pg_str]
+ except KeyError:
+ # The PG is gone! Probably a pool was deleted. Drop it.
+ complete.add(pg)
+ continue
+
state = info['state']
states = state.split("+")
if "active" in states and "clean" in states and not unmoved:
complete.add(pg)
else:
- # FIXME: handle apparent negative progress
- # (e.g. if num_bytes_recovered goes backwards)
- # FIXME: handle apparent >1.0 progress
- # (num_bytes_recovered incremented by more than num_bytes)
                if info['stat_sum']['num_bytes'] == 0:
                    # Empty PGs are considered 0% done until they are
                    # in the correct state.
                    ratio = 0.0
                else:
recovered = info['stat_sum']['num_bytes_recovered']
total_bytes = info['stat_sum']['num_bytes']
- ratio = float(recovered -
- self._original_bytes_recovered[pg]) / \
- total_bytes
+ if total_bytes > 0:
+ ratio = float(recovered -
+ self._original_bytes_recovered[pg]) / \
+ total_bytes
+
+ # Since the recovered bytes (over time) could perhaps
+ # exceed the contents of the PG (moment in time), we
+ # must clamp this
+ ratio = min(ratio, 1.0)
+
+ else:
+ # Dataless PGs (e.g. containing only OMAPs) count
+ # as half done.
+ ratio = 0.5
complete_accumulate += ratio
{"cmd": "progress",
"desc": "Show progress of recovery operations",
"perm": "r"},
+ {"cmd": "progress json",
+ "desc": "Show machine readable progress information",
+ "perm": "r"},
{"cmd": "progress clear",
"desc": "Reset progress tracking",
"perm": "rw"}
ev.pg_update(self.get("pg_dump"), self.log)
self._events[ev.id] = ev
+ def _osd_in(self, osd_id):
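+        # An OSD coming back in makes its evacuation moot: the data no
+        # longer has to move, so cancel any event that was tracking it.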
+        # Take a copy: _complete() deletes from self._events
+        for ev_id, ev in list(self._events.items()):
+ if isinstance(ev, PgRecoveryEvent) and osd_id in ev.evacuating_osds:
+ self.log.info("OSD {0} came back in, cancelling event".format(
+ osd_id
+ ))
+ self._complete(ev)
+
def _osdmap_changed(self, old_osdmap, new_osdmap):
old_dump = old_osdmap.dump()
new_dump = new_osdmap.dump()
for osd in new_dump['osds']:
osd_id = osd['osd']
new_weight = osd['in']
- if osd_id in old_osds and new_weight == 0.0:
+ if osd_id in old_osds:
old_weight = old_osds[osd_id]['in']
- if old_weight > new_weight:
+ if new_weight == 0.0 and old_weight > new_weight:
self.log.warn("osd.{0} marked out".format(osd_id))
self._osd_out(old_osdmap, old_dump, new_osdmap, osd_id)
+ elif new_weight >= 1.0 and old_weight == 0.0:
+ # Only consider weight>=1.0 as "in" to avoid spawning
+ # individual recovery events on every adjustment
+ # in a gradual weight-in
+ self.log.warn("osd.{0} marked in".format(osd_id))
+ self._osd_in(osd_id)
def notify(self, notify_type, notify_data):
self._ready.wait()
-        for ev_id, ev in self._events.items():
+        # Take a copy: maybe_complete() can delete from self._events
+        for ev_id, ev in list(self._events.items()):
if isinstance(ev, PgRecoveryEvent):
ev.pg_update(data, self.log)
- else:
- self.log.debug("Ignore notification '{0}'".format(notify_type))
- pass
+ self.maybe_complete(ev)
- def _persist(self):
+ def maybe_complete(self, event):
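+        # Event progress is a ratio in [0, 1]; treat >= 1.0 as finished.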
+ if event.progress >= 1.0:
+ self._complete(event)
+
+ def _save(self):
self.log.info("Writing back {0} completed events".format(
len(self._completed_events)
))
# TODO: bound the number we store.
encoded = json.dumps({
- "events": [ev.encode() for ev in self._completed_events],
+ "events": [ev.to_json() for ev in self._completed_events],
"version": ENCODING_VERSION,
"compat_version": ENCODING_VERSION
})
self.set_store("completed", encoded)
- def _unpersist(self):
+ def _load(self):
stored = self.get_store("completed")
if stored is None:
def serve(self):
self.log.info("Loading...")
- self._unpersist()
+ self._load()
self.log.info("Loaded {0} historic events".format(self._completed_events))
self._latest_osdmap = self.get_osdmap()
self._ready.set()
- while True:
+ while not self._shutdown.is_set():
+ # Lazy periodic write back of completed events
if self._dirty:
- self._persist()
+ self._save()
self._dirty = False
self._shutdown.wait(timeout=PERSIST_PERIOD)
ev.set_progress(ev_progress)
+ def _complete(self, ev):
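+        # Retire a live event: log its duration, keep a GhostEvent copy
+        # for the history, and mark state dirty for the next write-back.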
+        duration = (datetime.datetime.utcnow() - ev.started_at)
+        self.log.info("Completed event {0} ({1}) in {2} seconds".format(
+            ev.id, ev.message, int(duration.total_seconds())
+ ))
+
+ self._completed_events.append(
+ GhostEvent(ev.id, ev.message, ev.refs))
+ del self._events[ev.id]
+ self._dirty = True
+
def complete(self, ev_id):
"""
For calling from other mgr modules
ev.set_progress(1.0)
self.log.info("complete: finished ev {0} ({1})".format(ev_id,
ev.message))
- self._completed_events.append(
- GhostEvent(ev.id, ev.message, ev.refs))
- self._dirty = True
- del self._events[ev_id]
-
+ self._complete(ev)
except KeyError:
self.log.warn("complete: ev {0} does not exist".format(ev_id))
pass
else:
return 0, "", "Nothing in progress"
+ def _json(self):
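+        # Machine-readable dump backing the `progress json` command.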
+ return {
+ 'events': [ev.to_json() for ev in self._events.values()],
+ 'completed': [ev.to_json() for ev in self._completed_events]
+ }
+
def _handle_clear(self):
self._events = {}
+ self._completed_events = []
+ self._dirty = True
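+        # Write back immediately rather than waiting for the next
+        # PERSIST_PERIOD pass in serve()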
+ self._save()
+
return 0, "", ""
- def handle_command(self, inbuf, cmd):
+ def handle_command(self, _, cmd):
if cmd['prefix'] == "progress":
return self._handle_ls()
- if cmd['prefix'] == "progress clear":
+ elif cmd['prefix'] == "progress clear":
# The clear command isn't usually needed - it's to enable
# the admin to "kick" this module if it seems to have done
# something wrong (e.g. we have a bug causing a progress event
# that never finishes)
return self._handle_clear()
+ elif cmd['prefix'] == "progress json":
+ return 0, json.dumps(self._json(), indent=2), ""
else:
raise NotImplementedError(cmd['prefix'])