From: Kamoltat Date: Tue, 22 Sep 2020 13:12:00 +0000 (+0000) Subject: mgr/progress: Global Recovery Event in ceph -s X-Git-Tag: v16.1.0~718^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2af2afa5e9191115bb6f0b36194830ffb91938bf;p=ceph.git mgr/progress: Global Recovery Event in ceph -s Modified the progress module and BaseMgrModule to support the Global Recovery Event. Added more arguments (add_to_ceph_s) to update_progress_event and ceph_update_progress_event so that only the global recovery event progress is shown with `ceph -s`. All sub events have been moved to `ceph progress`. Signed-off-by: Kamoltat --- diff --git a/qa/tasks/mgr/test_progress.py b/qa/tasks/mgr/test_progress.py index 16e5df170612..fa73b951096c 100644 --- a/qa/tasks/mgr/test_progress.py +++ b/qa/tasks/mgr/test_progress.py @@ -44,12 +44,71 @@ class TestProgress(MgrTestCase): log.info(json.dumps(p, indent=2)) return p['events'] + def _completed_events(self): + """ + This function returns all events that are completed + """ + p = self._get_progress() + log.info(json.dumps(p, indent=2)) + return p['completed'] + + def is_osd_marked_out(self, ev): + return ev['message'].endswith('marked out') + + def is_osd_marked_in(self, ev): + return ev['message'].endswith('marked in') + + def _osd_in_out_events_count(self, marked='both'): + """ + Count the number of on going recovery events that deals with + OSDs being marked in, out or both. 
+ """ + events_in_progress = self._events_in_progress() + marked_in_count = 0 + marked_out_count = 0 + + for ev in events_in_progress: + if self.is_osd_marked_out(ev): + marked_out_count += 1 + elif self.is_osd_marked_in(ev): + marked_in_count += 1 + + if marked == 'both': + return marked_in_count + marked_out_count + elif marked == 'in': + return marked_in_count + else: + return marked_out_count + def _setup_pool(self, size=None): self.mgr_cluster.mon_manager.create_pool(self.POOL) if size is not None: self.mgr_cluster.mon_manager.raw_cluster_cmd( 'osd', 'pool', 'set', self.POOL, 'size', str(size)) + def _osd_in_out_completed_events_count(self, marked='both'): + """ + Count the number of completed recovery events that deals with + OSDs being marked in, out, or both. + """ + + completed_events = self._completed_events() + marked_in_count = 0 + marked_out_count = 0 + + for ev in completed_events: + if self.is_osd_marked_out(ev): + marked_out_count += 1 + elif self.is_osd_marked_in(ev): + marked_in_count += 1 + + if marked == 'both': + return marked_in_count + marked_out_count + elif marked == 'in': + return marked_in_count + else: + return marked_out_count + def _write_some_data(self, t): """ To adapt to test systems of varying performance, we write @@ -110,19 +169,16 @@ class TestProgress(MgrTestCase): ev = self._all_events()[0] log.info(json.dumps(ev, indent=1)) self.assertIn("Rebalancing after osd.0 marked out", ev['message']) - return ev def _simulate_back_in(self, osd_ids, initial_event): - for osd_id in osd_ids: self.mgr_cluster.mon_manager.raw_cluster_cmd( 'osd', 'in', str(osd_id)) - + # First Event should complete promptly self.wait_until_true(lambda: self._is_complete(initial_event['id']), timeout=self.EVENT_CREATION_PERIOD) - try: # Wait for progress event marked in to pop up self.wait_until_equal(lambda: len(self._events_in_progress()), 1, @@ -135,9 +191,6 @@ class TestProgress(MgrTestCase): return None new_event = self._events_in_progress()[0] - 
log.info(json.dumps(new_event, indent=1)) - self.assertIn("Rebalancing after osd.0 marked in", new_event['message']) - return new_event def _is_quiet(self): @@ -180,7 +233,7 @@ class TestProgress(MgrTestCase): # Wait for progress event to ultimately reach completion self.wait_until_true(lambda: self._is_complete(ev['id']), timeout=self.RECOVERY_PERIOD) - self.assertTrue(self._is_quiet()) + self.assertEqual(self._osd_in_out_events_count(), 0) def test_pool_removal(self): """ @@ -195,7 +248,7 @@ class TestProgress(MgrTestCase): # Event should complete promptly self.wait_until_true(lambda: self._is_complete(ev['id']), timeout=self.EVENT_CREATION_PERIOD) - self.assertTrue(self._is_quiet()) + self.assertEqual(self._osd_in_out_events_count(), 0) def test_osd_came_back(self): """ @@ -213,7 +266,7 @@ class TestProgress(MgrTestCase): self.wait_until_true(lambda: self._is_complete(ev2['id']), timeout=self.RECOVERY_PERIOD) - self.assertTrue(self._is_quiet()) + self.assertEqual(self._osd_in_out_events_count(), 0) def test_osd_cannot_recover(self): """ @@ -243,14 +296,15 @@ class TestProgress(MgrTestCase): # We should see an event for each of the OSDs we took out self.wait_until_equal( - lambda: len(self._all_events()), + lambda: self._osd_in_out_events_count('out'), osd_count - pool_size, - timeout=self.EVENT_CREATION_PERIOD) + timeout=self.EVENT_CREATION_PERIOD*(osd_count - pool_size)) # Those should complete cleanly - self.wait_until_true( - lambda: self._is_quiet(), - timeout=self.RECOVERY_PERIOD + self.wait_until_equal( + lambda: self._osd_in_out_completed_events_count('out'), + osd_count - pool_size, + timeout=self.RECOVERY_PERIOD*(osd_count - pool_size) ) # Fail one last OSD, at the point the PGs have nowhere to go @@ -261,4 +315,6 @@ class TestProgress(MgrTestCase): # Check that no event is created time.sleep(self.EVENT_CREATION_PERIOD) - self.assertEqual(len(self._all_events()), osd_count - pool_size) + self.assertEqual( + 
self._osd_in_out_completed_events_count('out'), + osd_count - pool_size) diff --git a/src/mgr/ActivePyModules.cc b/src/mgr/ActivePyModules.cc index a7fc09407673..d08a9fd1c2f4 100644 --- a/src/mgr/ActivePyModules.cc +++ b/src/mgr/ActivePyModules.cc @@ -986,12 +986,14 @@ void ActivePyModules::get_health_checks(health_check_map_t *checks) void ActivePyModules::update_progress_event( const std::string& evid, const std::string& desc, - float progress) + float progress, + bool add_to_ceph_s) { std::lock_guard l(lock); auto& pe = progress_events[evid]; pe.message = desc; pe.progress = progress; + pe.add_to_ceph_s = add_to_ceph_s; } void ActivePyModules::complete_progress_event(const std::string& evid) diff --git a/src/mgr/ActivePyModules.h b/src/mgr/ActivePyModules.h index 4892f2705fcd..a6d334bbdded 100644 --- a/src/mgr/ActivePyModules.h +++ b/src/mgr/ActivePyModules.h @@ -132,7 +132,8 @@ public: void update_progress_event(const std::string& evid, const std::string& desc, - float progress); + float progress, + bool add_to_ceph_s); void complete_progress_event(const std::string& evid); void clear_all_progress_events(); void get_progress_events(std::map* events); diff --git a/src/mgr/BaseMgrModule.cc b/src/mgr/BaseMgrModule.cc index b62b681d6417..db9afc244c86 100644 --- a/src/mgr/BaseMgrModule.cc +++ b/src/mgr/BaseMgrModule.cc @@ -675,13 +675,14 @@ ceph_update_progress_event(BaseMgrModule *self, PyObject *args) char *evid = nullptr; char *desc = nullptr; float progress = 0.0; - if (!PyArg_ParseTuple(args, "ssf:ceph_update_progress_event", - &evid, &desc, &progress)) { + bool add_to_ceph_s = false; + if (!PyArg_ParseTuple(args, "ssfb:ceph_update_progress_event", + &evid, &desc, &progress, &add_to_ceph_s)) { return nullptr; } PyThreadState *tstate = PyEval_SaveThread(); - self->py_modules->update_progress_event(evid, desc, progress); + self->py_modules->update_progress_event(evid, desc, progress, add_to_ceph_s); PyEval_RestoreThread(tstate); Py_RETURN_NONE; diff --git 
a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 08e0cbf7d095..8f127ca8b4ea 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -3083,7 +3083,9 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f, if (!pem.empty()) { ss << "\n \n progress:\n"; for (auto& i : pem) { + if (i.second.add_to_ceph_s){ ss << " " << i.second.message << "\n"; + } } } ss << "\n "; diff --git a/src/mon/mon_types.h b/src/mon/mon_types.h index e106d1a7b75d..b6d916ae998b 100644 --- a/src/mon/mon_types.h +++ b/src/mon/mon_types.h @@ -641,22 +641,31 @@ inline std::ostream& operator<<(std::ostream& out, const mon_feature_t& f) { struct ProgressEvent { std::string message; ///< event description float progress; ///< [0..1] - + bool add_to_ceph_s; void encode(ceph::buffer::list& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); encode(message, bl); encode(progress, bl); + encode(add_to_ceph_s, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& p) { - DECODE_START(1, p); + DECODE_START(2, p); decode(message, p); decode(progress, p); + if (struct_v >= 2){ + decode(add_to_ceph_s, p); + } else { + if (!message.empty()) { + add_to_ceph_s = true; + } + } DECODE_FINISH(p); } void dump(ceph::Formatter *f) const { f->dump_string("message", message); f->dump_float("progress", progress); + f->dump_bool("add_to_ceph_s", add_to_ceph_s); } }; WRITE_CLASS_ENCODER(ProgressEvent) diff --git a/src/pybind/mgr/ceph_module.pyi b/src/pybind/mgr/ceph_module.pyi index d71259c0dade..0bb7ae4bad6d 100644 --- a/src/pybind/mgr/ceph_module.pyi +++ b/src/pybind/mgr/ceph_module.pyi @@ -61,7 +61,7 @@ class BaseMgrModule(object): def _ceph_get_osdmap(self):... def _ceph_set_uri(self, uri):... def _ceph_have_mon_connection(self):... - def _ceph_update_progress_event(self, evid, desc, progress):... + def _ceph_update_progress_event(self, evid, desc, progress, add_to_ceph_s):... def _ceph_complete_progress_event(self, evid):... def _ceph_clear_all_progress_events(self):... 
def _ceph_dispatch_remote(self, module_name, method_name, *args, **kwargs):... diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py index c30e5638c4f8..0ba0f4531e5a 100644 --- a/src/pybind/mgr/mgr_module.py +++ b/src/pybind/mgr/mgr_module.py @@ -1491,10 +1491,11 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return self._ceph_have_mon_connection() - def update_progress_event(self, evid, desc, progress): + def update_progress_event(self, evid, desc, progress, add_to_ceph_s): return self._ceph_update_progress_event(str(evid), str(desc), - float(progress)) + float(progress), + bool(add_to_ceph_s)) def complete_progress_event(self, evid): return self._ceph_complete_progress_event(str(evid)) diff --git a/src/pybind/mgr/progress/module.py b/src/pybind/mgr/progress/module.py index 78aa278d8f95..40098d7b43bc 100644 --- a/src/pybind/mgr/progress/module.py +++ b/src/pybind/mgr/progress/module.py @@ -29,12 +29,13 @@ class Event(object): objects (osds, pools) this relates to. 
""" - def __init__(self, message, refs, started_at=None): - # type: (str, List[str], Optional[float]) -> None + def __init__(self, message, refs, add_to_ceph_s, started_at=None): + # type: (str, List[str], bool, Optional[float]) -> None self._message = message self._refs = refs self.started_at = started_at if started_at else time.time() self.id = None # type: Optional[str] + self._add_to_ceph_s = add_to_ceph_s def _refresh(self): global _module @@ -42,7 +43,7 @@ class Event(object): _module.log.debug('refreshing mgr for %s (%s) at %f' % (self.id, self._message, self.progress)) _module.update_progress_event( - self.id, self.twoline_progress(6), self.progress) + self.id, self.twoline_progress(6), self.progress, self._add_to_ceph_s) @property def message(self): @@ -53,6 +54,12 @@ class Event(object): def refs(self): # type: () -> List[str] return self._refs + + @property + def add_to_ceph_s(self): + # type: () -> bool + return self._add_to_ceph_s + @property def progress(self): @@ -133,9 +140,9 @@ class GhostEvent(Event): after the event is complete. 
""" - def __init__(self, my_id, message, refs, started_at, finished_at=None, + def __init__(self, my_id, message, refs, add_to_ceph_s, started_at, finished_at=None, failed=False, failure_message=None): - super(GhostEvent, self).__init__(message, refs, started_at) + super().__init__(message, refs, add_to_ceph_s, started_at) self.finished_at = finished_at if finished_at else time.time() self.id = my_id @@ -163,13 +170,62 @@ class GhostEvent(Event): "message": self.message, "refs": self._refs, "started_at": self.started_at, - "finished_at": self.finished_at + "finished_at": self.finished_at, + "add_to_ceph_s:": self.add_to_ceph_s } if self._failed: d["failed"] = True d["failure_message"] = self._failure_message return d +class GlobalRecoveryEvent(Event): + """ + An event whoese completion is determined by active+clean/total_pg_num + """ + + def __init__(self, message, refs, add_to_ceph_s, start_epoch, active_clean_num): + # type: (str, List[Any], bool, int, int) -> None + super().__init__(message, refs, add_to_ceph_s) + self._add_to_ceph_s = add_to_ceph_s + self._progress = 0.0 + self.id = str(uuid.uuid4()) # type: str + self._start_epoch = start_epoch + self._active_clean_num = active_clean_num + self._refresh() + + def global_event_update_progress(self, pg_dump): + # type: (Dict) -> None + "Update progress of Global Recovery Event" + + pgs = pg_dump['pg_stats'] + new_active_clean_num = 0 + for pg in pgs: + + if int(pg['reported_epoch']) < int(self._start_epoch): + continue + + state = pg['state'] + + states = state.split("+") + + if "active" in states and "clean" in states: + new_active_clean_num += 1 + + total_pg_num = len(pgs) + if self._active_clean_num != new_active_clean_num: + # Have this case to know when need to update + # the progress + try: + # Might be that total_pg_num is 0 + self._progress = float(new_active_clean_num) / total_pg_num + except ZeroDivisionError: + self._progress = 0.0 + + self._refresh() + + @property + def progress(self): + return 
self._progress class RemoteEvent(Event): """ @@ -178,9 +234,9 @@ class RemoteEvent(Event): progress information as it emerges. """ - def __init__(self, my_id, message, refs): - # type: (str, str, List[str]) -> None - super(RemoteEvent, self).__init__(message, refs) + def __init__(self, my_id, message, refs, add_to_ceph_s): + # type: (str, str, List[str], bool) -> None + super().__init__(message, refs, add_to_ceph_s) self.id = my_id self._progress = 0.0 self._failed = False @@ -222,9 +278,9 @@ class PgRecoveryEvent(Event): Always call update() immediately after construction. """ - def __init__(self, message, refs, which_pgs, which_osds, start_epoch): - # type: (str, List[Any], List[PgId], List[str], int) -> None - super(PgRecoveryEvent, self).__init__(message, refs) + def __init__(self, message, refs, which_pgs, which_osds, start_epoch, add_to_ceph_s): + # type: (str, List[Any], List[PgId], List[str], int, bool) -> None + super().__init__(message, refs, add_to_ceph_s) self._pgs = which_pgs @@ -383,7 +439,7 @@ class Module(MgrModule): def __init__(self, *args, **kwargs): super(Module, self).__init__(*args, **kwargs) - self._events = {} # type: Dict[str, Union[RemoteEvent, PgRecoveryEvent]] + self._events = {} # type: Dict[str, Union[RemoteEvent, PgRecoveryEvent, GlobalRecoveryEvent]] self._completed_events = [] # type: List[GhostEvent] self._old_osd_map = None # type: Optional[OSDMap] @@ -488,7 +544,8 @@ class Module(MgrModule): refs=[("osd", osd_id)], which_pgs=affected_pgs, which_osds=[osd_id], - start_epoch=self.get_osdmap().get_epoch() + start_epoch=self.get_osdmap().get_epoch(), + add_to_ceph_s=False ) r_ev.pg_update(self.get("pg_stats"), self.get("pg_ready"), self.log) self._events[r_ev.id] = r_ev @@ -516,6 +573,36 @@ class Module(MgrModule): self.log.warning("osd.{0} marked in".format(osd_id)) self._osd_in_out(old_osdmap, old_dump, new_osdmap, osd_id, "in") + def _pg_state_changed(self, pg_dump): + + # This function both constructs and updates + # the global 
recovery event if one of the + # PGs is not at active+clean state + + pgs = pg_dump['pg_stats'] + total_pg_num = len(pgs) + active_clean_num = 0 + for pg in pgs: + state = pg['state'] + + states = state.split("+") + + if "active" in states and "clean" in states: + active_clean_num += 1 + try: + # There might be a case where there is no pg_num + progress = float(active_clean_num) / total_pg_num + except ZeroDivisionError: + return + if progress < 1.0: + ev = GlobalRecoveryEvent("Global Recovery Event", + refs=[("global","")], + add_to_ceph_s=True, + start_epoch=self.get_osdmap().get_epoch(), + active_clean_num=active_clean_num) + ev.global_event_update_progress(pg_dump) + self._events[ev.id] = ev + def notify(self, notify_type, notify_data): self._ready.wait() @@ -534,13 +621,26 @@ class Module(MgrModule): # expensive get calls if len(self._events) == 0: return + + global_event = False data = self.get("pg_stats") ready = self.get("pg_ready") for ev_id in list(self._events): ev = self._events[ev_id] + # Check for types of events + # we have to update if isinstance(ev, PgRecoveryEvent): ev.pg_update(data, ready, self.log) self.maybe_complete(ev) + elif isinstance(ev, GlobalRecoveryEvent): + global_event = True + ev.global_event_update_progress(data) + self.maybe_complete(ev) + + if not global_event: + # If there is no global event + # we create one + self._pg_state_changed(data) def maybe_complete(self, event): # type: (Event) -> None @@ -619,8 +719,8 @@ class Module(MgrModule): self._shutdown.set() self.clear_all_progress_events() - def update(self, ev_id, ev_msg, ev_progress, refs=None): - # type: (str, str, float, Optional[list]) -> None + def update(self, ev_id, ev_msg, ev_progress, refs=None, add_to_ceph_s=False): + # type: (str, str, float, Optional[list], bool) -> None """ For calling from other mgr modules """ @@ -631,7 +731,7 @@ class Module(MgrModule): ev = self._events[ev_id] assert isinstance(ev, RemoteEvent) except KeyError: - ev = RemoteEvent(ev_id, 
ev_msg, refs) + ev = RemoteEvent(ev_id, ev_msg, refs, add_to_ceph_s) self._events[ev_id] = ev self.log.info("update: starting ev {0} ({1})".format( ev_id, ev_msg)) @@ -651,7 +751,7 @@ class Module(MgrModule): self.complete_progress_event(ev.id) self._completed_events.append( - GhostEvent(ev.id, ev.message, ev.refs, ev.started_at, + GhostEvent(ev.id, ev.message, ev.refs, ev.add_to_ceph_s, ev.started_at, failed=ev.failed, failure_message=ev.failure_message)) assert ev.id del self._events[ev.id] diff --git a/src/pybind/mgr/progress/test_progress.py b/src/pybind/mgr/progress/test_progress.py index 12a23f4a7bae..2eab07fa39f4 100644 --- a/src/pybind/mgr/progress/test_progress.py +++ b/src/pybind/mgr/progress/test_progress.py @@ -17,7 +17,7 @@ class TestPgRecoveryEvent(object): # Creating the class and Mocking # a bunch of attributes for testing module._module = mock.Mock() # just so Event._refresh() works - self.test_event = module.PgRecoveryEvent(None, None, [module.PgId(1,i) for i in range(3)], [0], 30) + self.test_event = module.PgRecoveryEvent(None, None, [module.PgId(1,i) for i in range(3)], [0], 30, False) def test_pg_update(self): # Test for a completed event when the pg states show active+clear