log.info(json.dumps(p, indent=2))
return p['events']
+ def _completed_events(self):
+ """
+ Return the list of completed events reported by the progress
+ module (the 'completed' key of the progress JSON dump).
+ """
+ p = self._get_progress()
+ log.info(json.dumps(p, indent=2))
+ return p['completed']
+
+ def is_osd_marked_out(self, ev):
+ # True if this progress event was raised by an OSD being
+ # marked out (matched on the event message suffix).
+ return ev['message'].endswith('marked out')
+
+ def is_osd_marked_in(self, ev):
+ # True if this progress event was raised by an OSD being
+ # marked in (matched on the event message suffix).
+ return ev['message'].endswith('marked in')
+
+ def _osd_in_out_events_count(self, marked='both'):
+ """
+ Count the ongoing recovery events that deal with OSDs being
+ marked in, out, or both.
+
+ :param marked: one of 'in', 'out' or 'both' (default); any
+                other value is treated the same as 'out'.
+ """
+ events_in_progress = self._events_in_progress()
+ marked_in_count = 0
+ marked_out_count = 0
+
+ for ev in events_in_progress:
+ if self.is_osd_marked_out(ev):
+ marked_out_count += 1
+ elif self.is_osd_marked_in(ev):
+ marked_in_count += 1
+
+ if marked == 'both':
+ return marked_in_count + marked_out_count
+ elif marked == 'in':
+ return marked_in_count
+ else:
+ return marked_out_count
+
def _setup_pool(self, size=None):
self.mgr_cluster.mon_manager.create_pool(self.POOL)
if size is not None:
self.mgr_cluster.mon_manager.raw_cluster_cmd(
'osd', 'pool', 'set', self.POOL, 'size', str(size))
+ def _osd_in_out_completed_events_count(self, marked='both'):
+ """
+ Count the completed recovery events that deal with OSDs being
+ marked in, out, or both.
+
+ :param marked: one of 'in', 'out' or 'both' (default); any
+                other value is treated the same as 'out'.
+ """
+
+ completed_events = self._completed_events()
+ marked_in_count = 0
+ marked_out_count = 0
+
+ for ev in completed_events:
+ if self.is_osd_marked_out(ev):
+ marked_out_count += 1
+ elif self.is_osd_marked_in(ev):
+ marked_in_count += 1
+
+ if marked == 'both':
+ return marked_in_count + marked_out_count
+ elif marked == 'in':
+ return marked_in_count
+ else:
+ return marked_out_count
+
def _write_some_data(self, t):
"""
To adapt to test systems of varying performance, we write
ev = self._all_events()[0]
log.info(json.dumps(ev, indent=1))
self.assertIn("Rebalancing after osd.0 marked out", ev['message'])
-
return ev
def _simulate_back_in(self, osd_ids, initial_event):
-
for osd_id in osd_ids:
self.mgr_cluster.mon_manager.raw_cluster_cmd(
'osd', 'in', str(osd_id))
-
+
# First Event should complete promptly
self.wait_until_true(lambda: self._is_complete(initial_event['id']),
timeout=self.EVENT_CREATION_PERIOD)
-
try:
# Wait for progress event marked in to pop up
self.wait_until_equal(lambda: len(self._events_in_progress()), 1,
return None
new_event = self._events_in_progress()[0]
- log.info(json.dumps(new_event, indent=1))
- self.assertIn("Rebalancing after osd.0 marked in", new_event['message'])
-
return new_event
def _is_quiet(self):
# Wait for progress event to ultimately reach completion
self.wait_until_true(lambda: self._is_complete(ev['id']),
timeout=self.RECOVERY_PERIOD)
- self.assertTrue(self._is_quiet())
+ self.assertEqual(self._osd_in_out_events_count(), 0)
def test_pool_removal(self):
"""
# Event should complete promptly
self.wait_until_true(lambda: self._is_complete(ev['id']),
timeout=self.EVENT_CREATION_PERIOD)
- self.assertTrue(self._is_quiet())
+ self.assertEqual(self._osd_in_out_events_count(), 0)
def test_osd_came_back(self):
"""
self.wait_until_true(lambda: self._is_complete(ev2['id']),
timeout=self.RECOVERY_PERIOD)
- self.assertTrue(self._is_quiet())
+ self.assertEqual(self._osd_in_out_events_count(), 0)
def test_osd_cannot_recover(self):
"""
# We should see an event for each of the OSDs we took out
self.wait_until_equal(
- lambda: len(self._all_events()),
+ lambda: self._osd_in_out_events_count('out'),
osd_count - pool_size,
- timeout=self.EVENT_CREATION_PERIOD)
+ timeout=self.EVENT_CREATION_PERIOD*(osd_count - pool_size))
# Those should complete cleanly
- self.wait_until_true(
- lambda: self._is_quiet(),
- timeout=self.RECOVERY_PERIOD
+ self.wait_until_equal(
+ lambda: self._osd_in_out_completed_events_count('out'),
+ osd_count - pool_size,
+ timeout=self.RECOVERY_PERIOD*(osd_count - pool_size)
)
# Fail one last OSD, at the point the PGs have nowhere to go
# Check that no event is created
time.sleep(self.EVENT_CREATION_PERIOD)
- self.assertEqual(len(self._all_events()), osd_count - pool_size)
+ self.assertEqual(
+ self._osd_in_out_completed_events_count('out'),
+ osd_count - pool_size)
objects (osds, pools) this relates to.
"""
- def __init__(self, message, refs, started_at=None):
- # type: (str, List[str], Optional[float]) -> None
+ def __init__(self, message, refs, add_to_ceph_s, started_at=None):
+ # type: (str, List[str], bool, Optional[float]) -> None
self._message = message
self._refs = refs
self.started_at = started_at if started_at else time.time()
self.id = None # type: Optional[str]
+ self._add_to_ceph_s = add_to_ceph_s  # forwarded to update_progress_event() on each _refresh()
def _refresh(self):
global _module
_module.log.debug('refreshing mgr for %s (%s) at %f' % (self.id, self._message,
self.progress))
_module.update_progress_event(
- self.id, self.twoline_progress(6), self.progress)
+ self.id, self.twoline_progress(6), self.progress, self._add_to_ceph_s)
@property
def message(self):
def refs(self):
# type: () -> List[str]
return self._refs
+
+ @property
+ def add_to_ceph_s(self):
+ # type: () -> bool
+ return self._add_to_ceph_s
+
@property
def progress(self):
after the event is complete.
"""
- def __init__(self, my_id, message, refs, started_at, finished_at=None,
+ def __init__(self, my_id, message, refs, add_to_ceph_s, started_at, finished_at=None,
failed=False, failure_message=None):
- super(GhostEvent, self).__init__(message, refs, started_at)
+ super().__init__(message, refs, add_to_ceph_s, started_at)
self.finished_at = finished_at if finished_at else time.time()
self.id = my_id
"message": self.message,
"refs": self._refs,
"started_at": self.started_at,
- "finished_at": self.finished_at
+ "finished_at": self.finished_at,
+ "add_to_ceph_s:": self.add_to_ceph_s
}
if self._failed:
d["failed"] = True
d["failure_message"] = self._failure_message
return d
+class GlobalRecoveryEvent(Event):
+ """
+ An event whose completion is determined by active+clean/total_pg_num
+ """
+
+ def __init__(self, message, refs, add_to_ceph_s, start_epoch, active_clean_num):
+ # type: (str, List[Any], bool, int, int) -> None
+ super().__init__(message, refs, add_to_ceph_s)
+ self._add_to_ceph_s = add_to_ceph_s  # NOTE(review): redundant — super().__init__ already stores this
+ self._progress = 0.0
+ self.id = str(uuid.uuid4()) # type: str
+ self._start_epoch = start_epoch
+ self._active_clean_num = active_clean_num
+ self._refresh()
+
+ def global_event_update_progress(self, pg_dump):
+ # type: (Dict) -> None
+ "Update progress of Global Recovery Event"
+
+ pgs = pg_dump['pg_stats']
+ new_active_clean_num = 0
+ for pg in pgs:
+
+ # Ignore PGs whose reported stats predate this event's
+ # start epoch.
+ if int(pg['reported_epoch']) < int(self._start_epoch):
+ continue
+
+ state = pg['state']
+
+ states = state.split("+")
+
+ if "active" in states and "clean" in states:
+ new_active_clean_num += 1
+
+ total_pg_num = len(pgs)
+ # Only recompute progress when the active+clean count changed.
+ # NOTE(review): self._active_clean_num is never updated after
+ # __init__, so this condition stays true once the count drifts
+ # from the initial value — confirm that is intended.
+ if self._active_clean_num != new_active_clean_num:
+ try:
+ # Might be that total_pg_num is 0
+ self._progress = float(new_active_clean_num) / total_pg_num
+ except ZeroDivisionError:
+ self._progress = 0.0
+
+ self._refresh()
+
+ @property
+ def progress(self):
+ # Fraction of PGs counted as active+clean (0.0 .. 1.0).
+ return self._progress
class RemoteEvent(Event):
"""
progress information as it emerges.
"""
- def __init__(self, my_id, message, refs):
- # type: (str, str, List[str]) -> None
- super(RemoteEvent, self).__init__(message, refs)
+ def __init__(self, my_id, message, refs, add_to_ceph_s):
+ # type: (str, str, List[str], bool) -> None
+ super().__init__(message, refs, add_to_ceph_s)
self.id = my_id
self._progress = 0.0
self._failed = False
Always call update() immediately after construction.
"""
- def __init__(self, message, refs, which_pgs, which_osds, start_epoch):
- # type: (str, List[Any], List[PgId], List[str], int) -> None
- super(PgRecoveryEvent, self).__init__(message, refs)
+ def __init__(self, message, refs, which_pgs, which_osds, start_epoch, add_to_ceph_s):
+ # type: (str, List[Any], List[PgId], List[str], int, bool) -> None
+ super().__init__(message, refs, add_to_ceph_s)
self._pgs = which_pgs
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
- self._events = {} # type: Dict[str, Union[RemoteEvent, PgRecoveryEvent]]
+ self._events = {} # type: Dict[str, Union[RemoteEvent, PgRecoveryEvent, GlobalRecoveryEvent]]
self._completed_events = [] # type: List[GhostEvent]
self._old_osd_map = None # type: Optional[OSDMap]
refs=[("osd", osd_id)],
which_pgs=affected_pgs,
which_osds=[osd_id],
- start_epoch=self.get_osdmap().get_epoch()
+ start_epoch=self.get_osdmap().get_epoch(),
+ add_to_ceph_s=False
)
r_ev.pg_update(self.get("pg_stats"), self.get("pg_ready"), self.log)
self._events[r_ev.id] = r_ev
self.log.warning("osd.{0} marked in".format(osd_id))
self._osd_in_out(old_osdmap, old_dump, new_osdmap, osd_id, "in")
+ def _pg_state_changed(self, pg_dump):
+
+ # Construct a new global recovery event when not every PG is
+ # active+clean (progress < 1.0); the event then updates itself
+ # via global_event_update_progress(). Called from notify() only
+ # when no global event currently exists.
+
+ pgs = pg_dump['pg_stats']
+ total_pg_num = len(pgs)
+ active_clean_num = 0
+ for pg in pgs:
+ state = pg['state']
+
+ states = state.split("+")
+
+ if "active" in states and "clean" in states:
+ active_clean_num += 1
+ try:
+ # There might be a case where there is no pg_num;
+ # no PGs means nothing to track.
+ progress = float(active_clean_num) / total_pg_num
+ except ZeroDivisionError:
+ return
+ if progress < 1.0:
+ ev = GlobalRecoveryEvent("Global Recovery Event",
+ refs=[("global","")],
+ add_to_ceph_s=True,
+ start_epoch=self.get_osdmap().get_epoch(),
+ active_clean_num=active_clean_num)
+ ev.global_event_update_progress(pg_dump)
+ self._events[ev.id] = ev
+
def notify(self, notify_type, notify_data):
self._ready.wait()
# expensive get calls
if len(self._events) == 0:
return
+
+ global_event = False
data = self.get("pg_stats")
ready = self.get("pg_ready")
for ev_id in list(self._events):
ev = self._events[ev_id]
+ # Check for types of events
+ # we have to update
if isinstance(ev, PgRecoveryEvent):
ev.pg_update(data, ready, self.log)
self.maybe_complete(ev)
+ elif isinstance(ev, GlobalRecoveryEvent):
+ global_event = True
+ ev.global_event_update_progress(data)
+ self.maybe_complete(ev)
+
+ if not global_event:
+ # If there is no global event
+ # we create one
+ self._pg_state_changed(data)
def maybe_complete(self, event):
# type: (Event) -> None
self._shutdown.set()
self.clear_all_progress_events()
- def update(self, ev_id, ev_msg, ev_progress, refs=None):
- # type: (str, str, float, Optional[list]) -> None
+ def update(self, ev_id, ev_msg, ev_progress, refs=None, add_to_ceph_s=False):
+ # type: (str, str, float, Optional[list], bool) -> None
"""
For calling from other mgr modules
"""
ev = self._events[ev_id]
assert isinstance(ev, RemoteEvent)
except KeyError:
- ev = RemoteEvent(ev_id, ev_msg, refs)
+ ev = RemoteEvent(ev_id, ev_msg, refs, add_to_ceph_s)
self._events[ev_id] = ev
self.log.info("update: starting ev {0} ({1})".format(
ev_id, ev_msg))
self.complete_progress_event(ev.id)
self._completed_events.append(
- GhostEvent(ev.id, ev.message, ev.refs, ev.started_at,
+ GhostEvent(ev.id, ev.message, ev.refs, ev.add_to_ceph_s, ev.started_at,
failed=ev.failed, failure_message=ev.failure_message))
assert ev.id
del self._events[ev.id]