]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/progress: Global Recovery Event in ceph -s 37327/head
authorKamoltat <ksirivad@redhat.com>
Tue, 22 Sep 2020 13:12:00 +0000 (13:12 +0000)
committerKamoltat <ksirivad@redhat.com>
Thu, 22 Oct 2020 16:44:50 +0000 (16:44 +0000)
Modified the progress module and BaseMgrModule to
support Global Recovert Event. Adding more arguments
to update_progress_event, ceph_update_progress_event.
To only show global recovery event progress with `ceph -s`.
All sub events have been move to `ceph progress`

Signed-off-by: Kamoltat <ksirivad@redhat.com>
qa/tasks/mgr/test_progress.py
src/mgr/ActivePyModules.cc
src/mgr/ActivePyModules.h
src/mgr/BaseMgrModule.cc
src/mon/Monitor.cc
src/mon/mon_types.h
src/pybind/mgr/ceph_module.pyi
src/pybind/mgr/mgr_module.py
src/pybind/mgr/progress/module.py
src/pybind/mgr/progress/test_progress.py

index 16e5df1706121d3630924cfce0ef9fd1208fde97..fa73b951096c4b31c4e6c63f6d1ecc28f3cc596e 100644 (file)
@@ -44,12 +44,71 @@ class TestProgress(MgrTestCase):
         log.info(json.dumps(p, indent=2))
         return p['events']
 
+    def _completed_events(self):
+        """
+        This function returns all events that are completed
+        """
+        p = self._get_progress()
+        log.info(json.dumps(p, indent=2))
+        return p['completed']
+
+    def is_osd_marked_out(self, ev):
+        return ev['message'].endswith('marked out')
+
+    def is_osd_marked_in(self, ev):
+        return ev['message'].endswith('marked in')
+
+    def _osd_in_out_events_count(self, marked='both'):
+        """
+        Count the number of on going recovery events that deals with
+        OSDs being marked in, out or both.
+        """
+        events_in_progress = self._events_in_progress()
+        marked_in_count = 0
+        marked_out_count = 0
+
+        for ev in events_in_progress:
+            if self.is_osd_marked_out(ev):
+                marked_out_count += 1
+            elif self.is_osd_marked_in(ev):
+                marked_in_count += 1
+
+        if marked == 'both':
+            return marked_in_count + marked_out_count
+        elif marked == 'in':
+            return marked_in_count
+        else:
+            return marked_out_count
+
     def _setup_pool(self, size=None):
         self.mgr_cluster.mon_manager.create_pool(self.POOL)
         if size is not None:
             self.mgr_cluster.mon_manager.raw_cluster_cmd(
                 'osd', 'pool', 'set', self.POOL, 'size', str(size))
 
+    def _osd_in_out_completed_events_count(self, marked='both'):
+        """
+        Count the number of completed recovery events that deals with
+        OSDs being marked in, out, or both.
+        """
+
+        completed_events = self._completed_events()
+        marked_in_count = 0
+        marked_out_count = 0
+
+        for ev in completed_events:
+            if self.is_osd_marked_out(ev):
+                marked_out_count += 1
+            elif self.is_osd_marked_in(ev):
+                marked_in_count += 1
+
+        if marked == 'both':
+            return marked_in_count + marked_out_count
+        elif marked == 'in':
+            return marked_in_count
+        else:
+            return marked_out_count
+
     def _write_some_data(self, t):
         """
         To adapt to test systems of varying performance, we write
@@ -110,19 +169,16 @@ class TestProgress(MgrTestCase):
         ev = self._all_events()[0]
         log.info(json.dumps(ev, indent=1))
         self.assertIn("Rebalancing after osd.0 marked out", ev['message'])
-        
         return ev
 
     def _simulate_back_in(self, osd_ids, initial_event):
-        
         for osd_id in osd_ids:
             self.mgr_cluster.mon_manager.raw_cluster_cmd(
                     'osd', 'in', str(osd_id))
-        
+
         # First Event should complete promptly
         self.wait_until_true(lambda: self._is_complete(initial_event['id']),
                              timeout=self.EVENT_CREATION_PERIOD)
-
         try:
             # Wait for progress event marked in to pop up
             self.wait_until_equal(lambda: len(self._events_in_progress()), 1,
@@ -135,9 +191,6 @@ class TestProgress(MgrTestCase):
             return None
 
         new_event = self._events_in_progress()[0]
-        log.info(json.dumps(new_event, indent=1))
-        self.assertIn("Rebalancing after osd.0 marked in", new_event['message'])    
-        
         return new_event
 
     def _is_quiet(self):
@@ -180,7 +233,7 @@ class TestProgress(MgrTestCase):
         # Wait for progress event to ultimately reach completion
         self.wait_until_true(lambda: self._is_complete(ev['id']),
                              timeout=self.RECOVERY_PERIOD)
-        self.assertTrue(self._is_quiet())
+        self.assertEqual(self._osd_in_out_events_count(), 0)
 
     def test_pool_removal(self):
         """
@@ -195,7 +248,7 @@ class TestProgress(MgrTestCase):
         # Event should complete promptly
         self.wait_until_true(lambda: self._is_complete(ev['id']),
                              timeout=self.EVENT_CREATION_PERIOD)
-        self.assertTrue(self._is_quiet())
+        self.assertEqual(self._osd_in_out_events_count(), 0)
 
     def test_osd_came_back(self):
         """
@@ -213,7 +266,7 @@ class TestProgress(MgrTestCase):
             self.wait_until_true(lambda: self._is_complete(ev2['id']),
                                  timeout=self.RECOVERY_PERIOD)
 
-        self.assertTrue(self._is_quiet())
+        self.assertEqual(self._osd_in_out_events_count(), 0)
 
     def test_osd_cannot_recover(self):
         """
@@ -243,14 +296,15 @@ class TestProgress(MgrTestCase):
 
         # We should see an event for each of the OSDs we took out
         self.wait_until_equal(
-            lambda: len(self._all_events()),
+            lambda: self._osd_in_out_events_count('out'),
             osd_count - pool_size,
-            timeout=self.EVENT_CREATION_PERIOD)
+            timeout=self.EVENT_CREATION_PERIOD*(osd_count - pool_size))
 
         # Those should complete cleanly
-        self.wait_until_true(
-            lambda: self._is_quiet(),
-            timeout=self.RECOVERY_PERIOD
+        self.wait_until_equal(
+            lambda: self._osd_in_out_completed_events_count('out'),
+            osd_count - pool_size,
+            timeout=self.RECOVERY_PERIOD*(osd_count - pool_size)
         )
 
         # Fail one last OSD, at the point the PGs have nowhere to go
@@ -261,4 +315,6 @@ class TestProgress(MgrTestCase):
         # Check that no event is created
         time.sleep(self.EVENT_CREATION_PERIOD)
 
-        self.assertEqual(len(self._all_events()), osd_count - pool_size)
+        self.assertEqual(
+            self._osd_in_out_completed_events_count('out'),
+            osd_count - pool_size)
index a7fc09407673c457a0f0a2c45347978f13c5a9a1..d08a9fd1c2f405ae5ca53ea7407028663e8cede6 100644 (file)
@@ -986,12 +986,14 @@ void ActivePyModules::get_health_checks(health_check_map_t *checks)
 void ActivePyModules::update_progress_event(
   const std::string& evid,
   const std::string& desc,
-  float progress)
+  float progress,
+  bool add_to_ceph_s)
 {
   std::lock_guard l(lock);
   auto& pe = progress_events[evid];
   pe.message = desc;
   pe.progress = progress;
+  pe.add_to_ceph_s = add_to_ceph_s;
 }
 
 void ActivePyModules::complete_progress_event(const std::string& evid)
index 4892f2705fcda596095f2a527395c5e516f76fc8..a6d334bbdded670587330cb1aa75ace270107cad 100644 (file)
@@ -132,7 +132,8 @@ public:
 
   void update_progress_event(const std::string& evid,
                             const std::string& desc,
-                            float progress);
+                            float progress,
+                            bool add_to_ceph_s);
   void complete_progress_event(const std::string& evid);
   void clear_all_progress_events();
   void get_progress_events(std::map<std::string,ProgressEvent>* events);
index b62b681d641739851acfcb3bc71b9ab566055f33..db9afc244c86cda9f2029b0276b8020cc11e8ec5 100644 (file)
@@ -675,13 +675,14 @@ ceph_update_progress_event(BaseMgrModule *self, PyObject *args)
   char *evid = nullptr;
   char *desc = nullptr;
   float progress = 0.0;
-  if (!PyArg_ParseTuple(args, "ssf:ceph_update_progress_event",
-                       &evid, &desc, &progress)) {
+  bool add_to_ceph_s = false;
+  if (!PyArg_ParseTuple(args, "ssfb:ceph_update_progress_event",
+                       &evid, &desc, &progress, &add_to_ceph_s)) {
     return nullptr;
   }
 
   PyThreadState *tstate = PyEval_SaveThread();
-  self->py_modules->update_progress_event(evid, desc, progress);
+  self->py_modules->update_progress_event(evid, desc, progress, add_to_ceph_s);
   PyEval_RestoreThread(tstate);
 
   Py_RETURN_NONE;
index 08e0cbf7d095cfdf7dc382287837307b60ea60ae..8f127ca8b4ea35668ea23560166b32e3616fc5b0 100644 (file)
@@ -3083,7 +3083,9 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f,
     if (!pem.empty()) {
       ss << "\n \n  progress:\n";
       for (auto& i : pem) {
+       if (i.second.add_to_ceph_s){
        ss << "    " << i.second.message << "\n";
+       }
       }
     }
     ss << "\n ";
index e106d1a7b75d556920f06d08bf47b1771346f716..b6d916ae998bc62473a9a546f9792911b65c9ef0 100644 (file)
@@ -641,22 +641,31 @@ inline std::ostream& operator<<(std::ostream& out, const mon_feature_t& f) {
 struct ProgressEvent {
   std::string message;                  ///< event description
   float progress;                  ///< [0..1]
-
+  bool add_to_ceph_s;
   void encode(ceph::buffer::list& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(message, bl);
     encode(progress, bl);
+    encode(add_to_ceph_s, bl);
     ENCODE_FINISH(bl);
   }
   void decode(ceph::buffer::list::const_iterator& p) {
-    DECODE_START(1, p);
+    DECODE_START(2, p);
     decode(message, p);
     decode(progress, p);
+    if (struct_v >= 2){
+       decode(add_to_ceph_s, p);
+    } else {
+      if (!message.empty()) {
+       add_to_ceph_s = true;
+      }
+    }
     DECODE_FINISH(p);
   }
   void dump(ceph::Formatter *f) const {
     f->dump_string("message", message);
     f->dump_float("progress", progress);
+    f->dump_bool("add_to_ceph_s", add_to_ceph_s);
   }
 };
 WRITE_CLASS_ENCODER(ProgressEvent)
index d71259c0dadee231ef561070b840b316c9f68375..0bb7ae4bad6d38266b70c138f6b48f930df12d7c 100644 (file)
@@ -61,7 +61,7 @@ class BaseMgrModule(object):
     def _ceph_get_osdmap(self):...
     def _ceph_set_uri(self, uri):...
     def _ceph_have_mon_connection(self):...
-    def _ceph_update_progress_event(self, evid, desc, progress):...
+    def _ceph_update_progress_event(self, evid, desc, progress, add_to_ceph_s):...
     def _ceph_complete_progress_event(self, evid):...
     def _ceph_clear_all_progress_events(self):...
     def _ceph_dispatch_remote(self, module_name, method_name, *args, **kwargs):...
index c30e5638c4f8311c14c301c9f3aa5f16c90ec1ce..0ba0f4531e5ad20c64cad071d8f07efbef6bd545 100644 (file)
@@ -1491,10 +1491,11 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
 
         return self._ceph_have_mon_connection()
 
-    def update_progress_event(self, evid, desc, progress):
+    def update_progress_event(self, evid, desc, progress, add_to_ceph_s):
         return self._ceph_update_progress_event(str(evid),
                                                 str(desc),
-                                                float(progress))
+                                                float(progress), 
+                                                bool(add_to_ceph_s))
 
     def complete_progress_event(self, evid):
         return self._ceph_complete_progress_event(str(evid))
index 78aa278d8f950143b59b077ea11f3cadb09dd058..40098d7b43bc44112912bfc02c24bf2c134912c8 100644 (file)
@@ -29,12 +29,13 @@ class Event(object):
     objects (osds, pools) this relates to.
     """
 
-    def __init__(self, message, refs, started_at=None):
-        # type: (str, List[str], Optional[float]) -> None
+    def __init__(self, message, refs, add_to_ceph_s, started_at=None):
+        # type: (str, List[str], bool, Optional[float]) -> None
         self._message = message
         self._refs = refs
         self.started_at = started_at if started_at else time.time()
         self.id = None  # type: Optional[str]
+        self._add_to_ceph_s = add_to_ceph_s
 
     def _refresh(self):
         global _module
@@ -42,7 +43,7 @@ class Event(object):
         _module.log.debug('refreshing mgr for %s (%s) at %f' % (self.id, self._message,
                                                                 self.progress))
         _module.update_progress_event(
-            self.id, self.twoline_progress(6), self.progress)
+            self.id, self.twoline_progress(6), self.progress, self._add_to_ceph_s)
 
     @property
     def message(self):
@@ -53,6 +54,12 @@ class Event(object):
     def refs(self):
         # type: () -> List[str]
         return self._refs
+    @property
+    def add_to_ceph_s(self):
+        # type: () -> bool
+        return self._add_to_ceph_s
+
 
     @property
     def progress(self):
@@ -133,9 +140,9 @@ class GhostEvent(Event):
     after the event is complete.
     """
 
-    def __init__(self, my_id, message, refs, started_at, finished_at=None,
+    def __init__(self, my_id, message, refs, add_to_ceph_s, started_at, finished_at=None,
                  failed=False, failure_message=None):
-        super(GhostEvent, self).__init__(message, refs, started_at)
+        super().__init__(message, refs, add_to_ceph_s, started_at)
         self.finished_at = finished_at if finished_at else time.time()
         self.id = my_id
 
@@ -163,13 +170,62 @@ class GhostEvent(Event):
             "message": self.message,
             "refs": self._refs,
             "started_at": self.started_at,
-            "finished_at": self.finished_at
+            "finished_at": self.finished_at,
+            "add_to_ceph_s:": self.add_to_ceph_s
         }
         if self._failed:
             d["failed"] = True
             d["failure_message"] = self._failure_message
         return d
 
+class GlobalRecoveryEvent(Event):
+    """
+    An event whoese completion is determined by active+clean/total_pg_num
+    """
+
+    def __init__(self, message, refs, add_to_ceph_s, start_epoch, active_clean_num):
+        # type: (str, List[Any], bool, int, int) -> None
+        super().__init__(message, refs, add_to_ceph_s)
+        self._add_to_ceph_s = add_to_ceph_s
+        self._progress = 0.0
+        self.id = str(uuid.uuid4()) # type: str
+        self._start_epoch = start_epoch
+        self._active_clean_num = active_clean_num
+        self._refresh()
+
+    def global_event_update_progress(self, pg_dump):
+        # type: (Dict) -> None
+        "Update progress of Global Recovery Event"
+
+        pgs = pg_dump['pg_stats']
+        new_active_clean_num = 0
+        for pg in pgs:
+
+            if int(pg['reported_epoch']) < int(self._start_epoch):
+                continue
+
+            state = pg['state']
+
+            states = state.split("+")
+
+            if "active" in states and "clean" in states:
+                new_active_clean_num += 1
+
+        total_pg_num = len(pgs)
+        if self._active_clean_num != new_active_clean_num:
+            # Have this case to know when need to update
+            # the progress
+            try:
+                # Might be that total_pg_num is 0
+                self._progress = float(new_active_clean_num) / total_pg_num
+            except ZeroDivisionError:
+                self._progress = 0.0
+
+        self._refresh()
+
+    @property
+    def progress(self):
+        return self._progress
 
 class RemoteEvent(Event):
     """
@@ -178,9 +234,9 @@ class RemoteEvent(Event):
     progress information as it emerges.
     """
 
-    def __init__(self, my_id, message, refs):
-        # type: (str, str, List[str]) -> None
-        super(RemoteEvent, self).__init__(message, refs)
+    def __init__(self, my_id, message, refs, add_to_ceph_s):
+        # type: (str, str, List[str], bool) -> None
+        super().__init__(message, refs, add_to_ceph_s)
         self.id = my_id
         self._progress = 0.0
         self._failed = False
@@ -222,9 +278,9 @@ class PgRecoveryEvent(Event):
     Always call update() immediately after construction.
     """
 
-    def __init__(self, message, refs, which_pgs, which_osds, start_epoch):
-        # type: (str, List[Any], List[PgId], List[str], int) -> None
-        super(PgRecoveryEvent, self).__init__(message, refs)
+    def __init__(self, message, refs, which_pgs, which_osds, start_epoch, add_to_ceph_s):
+        # type: (str, List[Any], List[PgId], List[str], int, bool) -> None
+        super().__init__(message, refs, add_to_ceph_s)
 
         self._pgs = which_pgs
 
@@ -383,7 +439,7 @@ class Module(MgrModule):
     def __init__(self, *args, **kwargs):
         super(Module, self).__init__(*args, **kwargs)
 
-        self._events = {}  # type: Dict[str, Union[RemoteEvent, PgRecoveryEvent]]
+        self._events = {}  # type: Dict[str, Union[RemoteEvent, PgRecoveryEvent, GlobalRecoveryEvent]]
         self._completed_events = [] # type: List[GhostEvent]
 
         self._old_osd_map = None  # type: Optional[OSDMap]
@@ -488,7 +544,8 @@ class Module(MgrModule):
                     refs=[("osd", osd_id)],
                     which_pgs=affected_pgs,
                     which_osds=[osd_id],
-                    start_epoch=self.get_osdmap().get_epoch()
+                    start_epoch=self.get_osdmap().get_epoch(),
+                    add_to_ceph_s=False
                     )
             r_ev.pg_update(self.get("pg_stats"), self.get("pg_ready"), self.log)
             self._events[r_ev.id] = r_ev
@@ -516,6 +573,36 @@ class Module(MgrModule):
                     self.log.warning("osd.{0} marked in".format(osd_id))
                     self._osd_in_out(old_osdmap, old_dump, new_osdmap, osd_id, "in")
 
+    def _pg_state_changed(self, pg_dump):
+
+        # This function both constructs and updates
+        # the global recovery event if one of the
+        # PGs is not at active+clean state
+
+        pgs = pg_dump['pg_stats']
+        total_pg_num = len(pgs)
+        active_clean_num = 0
+        for pg in pgs:
+            state = pg['state']
+
+            states = state.split("+")
+
+            if "active" in states and "clean" in states:
+                active_clean_num += 1
+        try:
+            # There might be a case where there is no pg_num
+            progress = float(active_clean_num) / total_pg_num
+        except ZeroDivisionError:
+            return
+        if progress < 1.0:
+            ev = GlobalRecoveryEvent("Global Recovery Event",
+                    refs=[("global","")],
+                    add_to_ceph_s=True,
+                    start_epoch=self.get_osdmap().get_epoch(),
+                    active_clean_num=active_clean_num)
+            ev.global_event_update_progress(pg_dump)
+            self._events[ev.id] = ev
+
     def notify(self, notify_type, notify_data):
         self._ready.wait()
 
@@ -534,13 +621,26 @@ class Module(MgrModule):
             # expensive get calls
             if len(self._events) == 0:
                 return
+            global_event = False
             data = self.get("pg_stats")
             ready = self.get("pg_ready")
             for ev_id in list(self._events):
                 ev = self._events[ev_id]
+                # Check for types of events 
+                # we have to update
                 if isinstance(ev, PgRecoveryEvent):
                     ev.pg_update(data, ready, self.log)
                     self.maybe_complete(ev)
+                elif isinstance(ev, GlobalRecoveryEvent):
+                    global_event = True
+                    ev.global_event_update_progress(data)
+                    self.maybe_complete(ev)
+
+            if not global_event:
+                # If there is no global event 
+                # we create one
+                self._pg_state_changed(data)
 
     def maybe_complete(self, event):
         # type: (Event) -> None
@@ -619,8 +719,8 @@ class Module(MgrModule):
         self._shutdown.set()
         self.clear_all_progress_events()
 
-    def update(self, ev_id, ev_msg, ev_progress, refs=None):
-        # type: (str, str, float, Optional[list]) -> None
+    def update(self, ev_id, ev_msg, ev_progress, refs=None, add_to_ceph_s=False):
+        # type: (str, str, float, Optional[list], bool) -> None
         """
         For calling from other mgr modules
         """
@@ -631,7 +731,7 @@ class Module(MgrModule):
             ev = self._events[ev_id]
             assert isinstance(ev, RemoteEvent)
         except KeyError:
-            ev = RemoteEvent(ev_id, ev_msg, refs)
+            ev = RemoteEvent(ev_id, ev_msg, refs, add_to_ceph_s)
             self._events[ev_id] = ev
             self.log.info("update: starting ev {0} ({1})".format(
                 ev_id, ev_msg))
@@ -651,7 +751,7 @@ class Module(MgrModule):
         self.complete_progress_event(ev.id)
 
         self._completed_events.append(
-            GhostEvent(ev.id, ev.message, ev.refs, ev.started_at,
+            GhostEvent(ev.id, ev.message, ev.refs, ev.add_to_ceph_s, ev.started_at,
                        failed=ev.failed, failure_message=ev.failure_message))
         assert ev.id
         del self._events[ev.id]
index 12a23f4a7baedd20be5f5a2dd32ba7fae363af41..2eab07fa39f4f5ecfe0a452f18b8328199129be6 100644 (file)
@@ -17,7 +17,7 @@ class TestPgRecoveryEvent(object):
         # Creating the class and Mocking 
         # a bunch of attributes for testing
         module._module = mock.Mock() # just so Event._refresh() works
-        self.test_event = module.PgRecoveryEvent(None, None, [module.PgId(1,i) for i in range(3)], [0], 30)
+        self.test_event = module.PgRecoveryEvent(None, None, [module.PgId(1,i) for i in range(3)], [0], 30, False)
 
     def test_pg_update(self):
         # Test for a completed event when the pg states show active+clear