def pg_update(self, pg_dump, log):
# FIXME: O(pg_num) in python
# FIXME: far more fields getting pythonized than we really care about
+ # Sanity check for any missing PGs, and initialize an empty list and
+ # dict if there hasn't been any recovery yet
pg_to_state = dict([(p['pgid'], p) for p in pg_dump['pg_stats']])
-
if self._original_bytes_recovered is None:
self._original_bytes_recovered = {}
missing_pgs = []
states = state.split("+")
- unmoved = bool(set(self._evacuate_osds) & (
- set(info['up']) | set(info['acting'])))
-
- if "active" in states and "clean" in states and not unmoved:
+ if "active" in states and "clean" in states:
complete.add(pg)
else:
if info['stat_sum']['num_bytes'] == 0:
ratio = float(recovered -
self._original_bytes_recovered[pg]) / \
total_bytes
-
# Since the recovered bytes (over time) could perhaps
# exceed the contents of the PG (moment in time), we
# must clamp this
# Dataless PGs (e.g. containing only OMAPs) count
# as half done.
ratio = 0.5
-
complete_accumulate += ratio
self._pgs = list(set(self._pgs) ^ complete)
ev.pg_update(self.get("pg_dump"), self.log)
self._events[ev.id] = ev
- def _osd_in(self, osd_id):
+ def _osd_in(self, old_map, old_dump, new_map, osd_id):
+ affected_pgs = []
+ unmoved_pgs = []
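+ # Walk every PG in the old pool dump and compare its mapping in the
+ # old and new OSDMaps to work out which PGs will move now that this
+ # OSD has come in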
+ for pool in old_dump['pools']:
+ pool_id = pool['pool']
+ for ps in range(0, pool['pg_num']):
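+ # Where did this PG map (up/acting OSD sets) before the change?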
+ up_acting = old_map.pg_to_up_acting_osds(pool['pool'], ps)
+
+ # Was this PG affected by the OSD coming in?
+ old_osds = set(up_acting['acting'])
+ was_on_out_osd = osd_id in old_osds
+ if not was_on_out_osd:
+ continue
+
+ self.log.debug("pool_id, ps = {0}, {1}".format(
+ pool_id, ps
+ ))
+
+ self.log.debug(
+ "up_acting: {0}".format(json.dumps(up_acting, indent=2)))
+
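+ # Where does the PG map now that the OSD has been marked in?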
+ new_up_acting = new_map.pg_to_up_acting_osds(pool['pool'], ps)
+ new_osds = set(new_up_acting['acting'])
+
+ # Has this PG been assigned a new location?
+ # (it might not be if there is no suitable place to move
+ # after an OSD is marked in)
+ is_relocated = len(old_osds - new_osds) > 0
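+ # A non-empty difference means at least one OSD from the old acting
+ # set was dropped, so this PG's data will move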
+ self.log.debug(
+ "new_up_acting: {0}".format(json.dumps(new_up_acting,
+ indent=2)))
+
+ if was_on_out_osd and is_relocated:
+ # This PG is now in motion, track its progress
+ affected_pgs.append(PgId(pool_id, ps))
+ elif not is_relocated:
+ # This PG didn't get a new location; log it below
+ unmoved_pgs.append(PgId(pool_id, ps))
+
+ # If we ignored some PGs, log the reason why (we may not end up
+ # creating a progress event)
+ if len(unmoved_pgs):
+ self.log.warn("{0} PGs were on osd.{1}, but didn't get new locations".format(
+ len(unmoved_pgs), osd_id))
+
+ self.log.warn("{0} PGs affected by osd.{1} coming in".format(
+ len(affected_pgs), osd_id))
+
+ if len(affected_pgs) > 0:
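+ # Create a recovery event tracking the moving PGs; pg_update() will
+ # count each of them as complete once it is active+clean again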
+ ev = PgRecoveryEvent(
+ "Rebalancing after osd.{0} marked in".format(osd_id),
+ refs=[("osd", osd_id)],
+ which_pgs=affected_pgs,
+ evacuate_osds=[osd_id]
+ )
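+ # Seed the event with the current PG stats so that later updates
+ # measure recovery progress against this baseline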
+ ev.pg_update(self.get("pg_dump"), self.log)
+ self._events[ev.id] = ev
+
for ev_id, ev in self._events.items():
if isinstance(ev, PgRecoveryEvent) and osd_id in ev.evacuating_osds:
self.log.info("osd.{0} came back in, cancelling event".format(
# individual recovery events on every adjustment
# in a gradual weight-in
self.log.warn("osd.{0} marked in".format(osd_id))
- self._osd_in(osd_id)
+ self._osd_in(old_osdmap, old_dump, new_osdmap, osd_id)
def notify(self, notify_type, notify_data):
self._ready.wait()