mgr, pybind/mgr, mgr/stats: be resilient to offline rank0 MDS
author Jos Collin <jcollin@redhat.com>
Tue, 29 Jun 2021 09:59:01 +0000 (15:29 +0530)
committer Jos Collin <jcollin@redhat.com>
Wed, 16 Feb 2022 05:48:45 +0000 (11:18 +0530)
Reregister the user queries during the rank0 MDS failover event
by calling listener.handle_query_updated(). This enables
`ceph fs perf stats` to receive the updated metrics after the
failover.

Fixes: https://tracker.ceph.com/issues/50033
Signed-off-by: Jos Collin <jcollin@redhat.com>
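
For orientation before the per-file diffs: the fix hooks the stats module into fs_map
notifications and re-registers its MDS perf queries only after a new rank0 MDS has become
up:active and a metrics report newer than the failover has been processed. Below is a
minimal, standalone sketch of that debounce pattern; the class and callback names are
illustrative only (the committed logic lives in FSPerfStats.notify_fsmap() and
re_register_queries() further down), and timestamps here are plain time.monotonic()
floats rather than the utime_t value the collector dumps.

import threading
import time

# Interval between reregistration attempts, mirroring REREGISTER_TIMER_INTERVAL below.
REREGISTER_TIMER_INTERVAL = 1.0

class FailoverReregistration:
    """Debounced re-registration of MDS perf queries after a rank0 failover (sketch)."""

    def __init__(self, reregister_cb):
        self.lock = threading.Lock()
        self.reregister_cb = reregister_cb   # e.g. MgrModule.reregister_mds_perf_queries
        self.prev_rank0_gid = None           # GID of the rank0 MDS we last registered against
        self.mx_last_updated = 0.0           # monotonic time of the newest metrics report seen
        self.rqtimer = None

    def on_metrics(self, mono_ts):
        """Record the monotonic timestamp carried by a processed metrics report."""
        with self.lock:
            self.mx_last_updated = mono_ts

    def on_fs_map(self, rank0_gid, state):
        """React to an fs_map notification: arm (or re-arm) the reregistration timer."""
        with self.lock:
            if rank0_gid and rank0_gid != self.prev_rank0_gid and state == 'up:active':
                ua_last_updated = time.monotonic()
                if self.rqtimer and self.rqtimer.is_alive():
                    self.rqtimer.cancel()
                self.rqtimer = threading.Timer(REREGISTER_TIMER_INTERVAL,
                                               self._maybe_reregister,
                                               args=(rank0_gid, ua_last_updated))
                self.rqtimer.start()

    def _maybe_reregister(self, rank0_gid, ua_last_updated):
        """Re-register only once metrics newer than the failover have been received."""
        with self.lock:
            if self.mx_last_updated >= ua_last_updated:
                self.reregister_cb()
                self.prev_rank0_gid = rank0_gid
            else:
                # Metrics are still stale; retry after another interval.
                self.rqtimer = threading.Timer(REREGISTER_TIMER_INTERVAL,
                                               self._maybe_reregister,
                                               args=(rank0_gid, ua_last_updated))
                self.rqtimer.start()

The timer-based retry matters because, right after a failover, the aggregated metrics the
manager holds may still predate the new rank0 MDS; the check against mx_last_updated
defers re-registration until a report newer than the failover has been processed, so
`ceph fs perf stats` is not re-armed against stale counters.
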
14 files changed:
src/mgr/ActivePyModules.cc
src/mgr/ActivePyModules.h
src/mgr/BaseMgrModule.cc
src/mgr/DaemonServer.cc
src/mgr/DaemonServer.h
src/mgr/MDSPerfMetricCollector.cc
src/mgr/MDSPerfMetricCollector.h
src/mgr/MDSPerfMetricTypes.h
src/mgr/MetricCollector.cc
src/mgr/MetricCollector.h
src/pybind/mgr/ceph_module.pyi
src/pybind/mgr/mgr_module.py
src/pybind/mgr/stats/fs/perf_stats.py
src/pybind/mgr/stats/module.py

src/mgr/ActivePyModules.cc
index 712471ca0076de09bdcca5d08834f8eb8f879958..18b988ef3e00b29163010dafd8ef6090fb971f56 100644 (file)
@@ -1426,6 +1426,11 @@ void ActivePyModules::remove_mds_perf_query(MetricQueryID query_id)
   }
 }
 
+void ActivePyModules::reregister_mds_perf_queries()
+{
+  server.reregister_mds_perf_queries();
+}
+
 PyObject *ActivePyModules::get_mds_perf_counters(MetricQueryID query_id)
 {
   MDSPerfCollector collector(query_id);
@@ -1468,6 +1473,11 @@ PyObject *ActivePyModules::get_mds_perf_counters(MetricQueryID query_id)
     f.close_section(); // i
   }
   f.close_section(); // counters
+
+  f.open_array_section("last_updated");
+  f.dump_float("last_updated_mono", collector.last_updated_mono);
+  f.close_section(); // last_updated
+
   f.close_section(); // metrics
 
   return f.get();
src/mgr/ActivePyModules.h
index 0469c4755e9e024a56d331b1981bf9f15b853672..f283224f5880785c33209bedf7045df1ce6204e5 100644 (file)
@@ -124,6 +124,7 @@ public:
       const MDSPerfMetricQuery &query,
       const std::optional<MDSPerfMetricLimit> &limit);
   void remove_mds_perf_query(MetricQueryID query_id);
+  void reregister_mds_perf_queries();
   PyObject *get_mds_perf_counters(MetricQueryID query_id);
 
   bool get_store(const std::string &module_name,
src/mgr/BaseMgrModule.cc
index 4c905a6f79ecdde3e654ebb4c3d387967af57290..2e894b031d48718c2121633869e7307faa1b6ab2 100644 (file)
@@ -1321,6 +1321,13 @@ ceph_remove_mds_perf_query(BaseMgrModule *self, PyObject *args)
   Py_RETURN_NONE;
 }
 
+static PyObject*
+ceph_reregister_mds_perf_queries(BaseMgrModule *self, PyObject *args)
+{
+  self->py_modules->reregister_mds_perf_queries();
+  Py_RETURN_NONE;
+}
+
 static PyObject*
 ceph_get_mds_perf_counters(BaseMgrModule *self, PyObject *args)
 {
@@ -1512,6 +1519,9 @@ PyMethodDef BaseMgrModule_methods[] = {
   {"_ceph_remove_mds_perf_query", (PyCFunction)ceph_remove_mds_perf_query,
     METH_VARARGS, "Remove an mds perf query"},
 
+  {"_ceph_reregister_mds_perf_queries", (PyCFunction)ceph_reregister_mds_perf_queries,
+    METH_NOARGS, "Re-register mds perf queries"},
+
   {"_ceph_get_mds_perf_counters", (PyCFunction)ceph_get_mds_perf_counters,
     METH_VARARGS, "Get mds perf counters"},
 
src/mgr/DaemonServer.cc
index f5c0376c7ca63366a38949a4229cac79c2f7a609..fd493a8d947d8ba29f0a2b6135d0108e9ee9528e 100644 (file)
@@ -3084,6 +3084,11 @@ int DaemonServer::remove_mds_perf_query(MetricQueryID query_id)
   return mds_perf_metric_collector.remove_query(query_id);
 }
 
+void DaemonServer::reregister_mds_perf_queries()
+{
+  mds_perf_metric_collector.reregister_queries();
+}
+
 int DaemonServer::get_mds_perf_counters(MDSPerfCollector *collector)
 {
   return mds_perf_metric_collector.get_counters(collector);
src/mgr/DaemonServer.h
index b3c9f191aa194de1d2b8030596f0b904a1c1263c..d719a96a7bcc36cd3d051b839063d54bc30e4bf7 100644 (file)
@@ -296,6 +296,7 @@ public:
   MetricQueryID add_mds_perf_query(const MDSPerfMetricQuery &query,
                                    const std::optional<MDSPerfMetricLimit> &limit);
   int remove_mds_perf_query(MetricQueryID query_id);
+  void reregister_mds_perf_queries();
   int get_mds_perf_counters(MDSPerfCollector *collector);
 
   virtual const char** get_tracked_conf_keys() const override;
src/mgr/MDSPerfMetricCollector.cc
index 74404a89d515538519e895a322718be30843fd77..62298aba36a01b00d859327ad05c8332240f56ec 100644 (file)
@@ -33,6 +33,8 @@ void MDSPerfMetricCollector::process_reports(const MetricPayload &payload) {
   // update delayed rank set
   delayed_ranks = metric_report.rank_metrics_delayed;
   dout(20) << ": delayed ranks=[" << delayed_ranks << "]" << dendl;
+
+  clock_gettime(CLOCK_MONOTONIC_COARSE, &last_updated_mono);
 }
 
 int MDSPerfMetricCollector::get_counters(PerfCollector *collector) {
@@ -47,6 +49,7 @@ int MDSPerfMetricCollector::get_counters(PerfCollector *collector) {
 
   get_delayed_ranks(&c->delayed_ranks);
 
+  get_last_updated(&c->last_updated_mono);
   return r;
 }
 
@@ -54,3 +57,8 @@ void MDSPerfMetricCollector::get_delayed_ranks(std::set<mds_rank_t> *ranks) {
   ceph_assert(ceph_mutex_is_locked(lock));
   *ranks = delayed_ranks;
 }
+
+void MDSPerfMetricCollector::get_last_updated(utime_t *ts) {
+  ceph_assert(ceph_mutex_is_locked(lock));
+  *ts = utime_t(last_updated_mono);
+}
src/mgr/MDSPerfMetricCollector.h
index c6b379ec5b78bf6a1ecfc993f11ff5d2a01a9b28..c72bce09103554482bd505f2de246008bec9b6f9 100644 (file)
@@ -13,9 +13,11 @@ class MDSPerfMetricCollector
                            MDSPerfMetrics> {
 private:
   std::set<mds_rank_t> delayed_ranks;
+  struct timespec last_updated_mono;
 
   void get_delayed_ranks(std::set<mds_rank_t> *ranks);
 
+  void get_last_updated(utime_t *ts);
 public:
   MDSPerfMetricCollector(MetricListener &listener);
 
src/mgr/MDSPerfMetricTypes.h
index b778b0347431a1a0eda267c41dcadfc86ea28277..a965e5fa7122f39f04a442ad7732676585d53665 100644 (file)
@@ -317,6 +317,7 @@ std::ostream &operator<<(std::ostream &os, const MDSPerfMetricQuery &query);
 struct MDSPerfCollector : PerfCollector {
   std::map<MDSPerfMetricKey, PerformanceCounters> counters;
   std::set<mds_rank_t> delayed_ranks;
+  utime_t last_updated_mono;
 
   MDSPerfCollector(MetricQueryID query_id)
     : PerfCollector(query_id) {
src/mgr/MetricCollector.cc
index 7da0dae7fe47429d58992334d3bbc5fd2709e251..c31dcf0b90c6d042555f49f3be38e22f75d4d107 100644 (file)
@@ -114,6 +114,12 @@ void MetricCollector<Query, Limit, Key, Report>::remove_all_queries() {
   }
 }
 
+template <typename Query, typename Limit, typename Key, typename Report>
+void MetricCollector<Query, Limit, Key, Report>::reregister_queries() {
+  dout(20) << dendl;
+  listener.handle_query_updated();
+}
+
 template <typename Query, typename Limit, typename Key, typename Report>
 int MetricCollector<Query, Limit, Key, Report>::get_counters_generic(
     MetricQueryID query_id, std::map<Key, PerformanceCounters> *c) {
src/mgr/MetricCollector.h
index 65e71e276e4326ee537d576d3783018f52a8d5a4..91fa78781954218ac85449eb0541626bf5f67ca8 100644 (file)
@@ -34,6 +34,8 @@ public:
 
   void remove_all_queries();
 
+  void reregister_queries();
+
   std::map<Query, Limits> get_queries() const {
     std::lock_guard locker(lock);
 
src/pybind/mgr/ceph_module.pyi
index ca323870e58525868a14e25efcc6e064ef079aba..1719192957611c7bfb34332d98a9afc43a89f33d 100644 (file)
@@ -110,6 +110,7 @@ class BaseMgrModule(object):
     def _ceph_get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]: ...
     def _ceph_add_mds_perf_query(self, query: Dict[str, Dict[str, Any]]) -> Optional[int]: ...
     def _ceph_remove_mds_perf_query(self, query_id: int) -> None: ...
+    def _ceph_reregister_mds_perf_queries(self) -> None: ...
     def _ceph_get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]: ...
     def _ceph_unregister_client(self, addrs: str) -> None: ...
     def _ceph_register_client(self, addrs: str) -> None: ...
src/pybind/mgr/mgr_module.py
index c3a295d33c7eb96e4c82a1c3feec9bcf4904406f..d1076b51be81d252c23f31f05349e556f1f67d84 100644 (file)
@@ -2204,6 +2204,13 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         return self._ceph_remove_mds_perf_query(query_id)
 
     @API.expose
+    def reregister_mds_perf_queries(self) -> None:
+        """
+        Re-register MDS perf queries.
+        """
+        return self._ceph_reregister_mds_perf_queries()
+
+    @API.expose
     def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
         """
         Get stats collected for an MDS perf query.
src/pybind/mgr/stats/fs/perf_stats.py
index 13d2adc8fc1dac596798d355d5ba2187282fabd3..94ef3924cb2c31a93bca6fd618b32b6919d84f61 100644 (file)
@@ -4,13 +4,14 @@ import time
 import uuid
 import errno
 import traceback
+import logging
 from collections import OrderedDict
 from typing import List, Dict, Set
 
 from mgr_module import CommandResult
 
 from datetime import datetime, timedelta
-from threading import Lock, Condition, Thread
+from threading import Lock, Condition, Thread, Timer
 from ipaddress import ip_address
 
 PERF_STATS_VERSION = 1
@@ -41,6 +42,7 @@ MDS_PERF_QUERY_COUNTERS = [] # type: List[str]
 MDS_GLOBAL_PERF_QUERY_COUNTERS = ['cap_hit', 'read_latency', 'write_latency', 'metadata_latency', 'dentry_lease', 'opened_files', 'pinned_icaps', 'opened_inodes', 'read_io_sizes', 'write_io_sizes'] # type: List[str]
 
 QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
+REREGISTER_TIMER_INTERVAL = 1
 
 CLIENT_METADATA_KEY = "client_metadata"
 CLIENT_METADATA_SUBKEYS = ["hostname", "root"]
@@ -48,6 +50,8 @@ CLIENT_METADATA_SUBKEYS_OPTIONAL = ["mount_point"]
 
 NON_EXISTENT_KEY_STR = "N/A"
 
+logger = logging.getLogger(__name__)
+
 class FilterSpec(object):
     """
     query filters encapsulated and used as key for query map
@@ -118,6 +122,7 @@ class FSPerfStats(object):
     user_queries = {} # type: Dict[str, Dict]
 
     meta_lock = Lock()
+    rqtimer = None
     client_metadata = {
         'metadata' : {},
         'to_purge' : set(),
@@ -127,6 +132,7 @@ class FSPerfStats(object):
     def __init__(self, module):
         self.module = module
         self.log = module.log
+        self.prev_rank0_gid = None
         # report processor thread
         self.report_processor = Thread(target=self.run)
         self.report_processor.start()
@@ -136,7 +142,7 @@ class FSPerfStats(object):
         if not key in result or not result[key] == meta:
             result[key] = meta
 
-    def notify(self, cmdtag):
+    def notify_cmd(self, cmdtag):
         self.log.debug("cmdtag={0}".format(cmdtag))
         with self.meta_lock:
             try:
@@ -177,6 +183,46 @@ class FSPerfStats(object):
             self.log.debug("client_metadata={0}, to_purge={1}".format(
                 self.client_metadata['metadata'], self.client_metadata['to_purge']))
 
+    def notify_fsmap(self):
+        #Reregister the user queries when there is a new rank0 mds
+        with self.lock:
+            gid_state = FSPerfStats.get_rank0_mds_gid_state(self.module.get('fs_map'))
+            if not gid_state:
+                return
+            rank0_gid, state = gid_state
+            if (rank0_gid and rank0_gid != self.prev_rank0_gid and state == 'up:active'):
+                #the new rank0 MDS is up:active
+                ua_last_updated = time.monotonic()
+                if (self.rqtimer and self.rqtimer.is_alive()):
+                    self.rqtimer.cancel()
+                self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
+                                     self.re_register_queries, args=(rank0_gid, ua_last_updated,))
+                self.rqtimer.start()
+
+    def re_register_queries(self, rank0_gid, ua_last_updated):
+        #reregister queries if the metrics are the latest. Otherwise reschedule the timer and
+        #wait for the empty metrics
+        with self.lock:
+            if self.mx_last_updated >= ua_last_updated:
+                self.log.debug("reregistering queries...")
+                self.module.reregister_mds_perf_queries()
+                self.prev_rank0_gid = rank0_gid
+            else:
+                #reschedule the timer
+                self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
+                                     self.re_register_queries, args=(rank0_gid, ua_last_updated,))
+                self.rqtimer.start()
+
+    @staticmethod
+    def get_rank0_mds_gid_state(fsmap):
+        for fs in fsmap['filesystems']:
+            mds_map = fs['mdsmap']
+            if mds_map is not None:
+                for mds_id, mds_status in mds_map['info'].items():
+                    if mds_status['rank'] == 0:
+                        return mds_status['gid'], mds_status['state']
+        logger.warn("No rank0 mds in the fsmap")
+
     def update_client_meta(self, rank_set):
         new_updates = {}
         pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()]
@@ -277,6 +323,9 @@ class FSPerfStats(object):
             # what's received from MDS
             incoming_metrics = result['metrics'][1]
 
+            # metrics updated (monotonic) time
+            self.mx_last_updated = result['metrics'][2][0]
+
             # cull missing MDSs and clients
             self.cull_missing_entries(raw_perf_counters, incoming_metrics)
 
src/pybind/mgr/stats/module.py
index d942c19e87bba5bf6fc887e292b7e569d2c652d2..fcc1bce97b5cf617b3d58b4e27904e64b766b3b6 100644 (file)
@@ -21,7 +21,7 @@ class Module(MgrModule):
         },
     ]
     MODULE_OPTIONS: List[Option] = []
-    NOTIFY_TYPES = [NotifyType.command]
+    NOTIFY_TYPES = [NotifyType.command, NotifyType.fs_map]
 
     def __init__(self, *args, **kwargs):
         super(Module, self).__init__(*args, **kwargs)
@@ -29,7 +29,9 @@ class Module(MgrModule):
 
     def notify(self, notify_type: NotifyType, notify_id):
         if notify_type == NotifyType.command:
-            self.fs_perf_stats.notify(notify_id)
+            self.fs_perf_stats.notify_cmd(notify_id)
+        elif notify_type == NotifyType.fs_map:
+            self.fs_perf_stats.notify_fsmap()
 
     def handle_command(self, inbuf, cmd):
         prefix = cmd['prefix']