]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: record metrics from all MDSs in MDS rank 0
authorVenky Shankar <vshankar@redhat.com>
Mon, 5 Aug 2019 09:32:28 +0000 (05:32 -0400)
committerVenky Shankar <vshankar@redhat.com>
Mon, 1 Jun 2020 08:05:39 +0000 (04:05 -0400)
`MetricAggregator` class aggregates metrics from all active ranks
and places metrics appropriately as defined by user queries.

This is implemented as a separate dispatcher since metric update
messages from active MDSs are frequent so as to avoid messages
getting stuck in MDSRank queue (suggested by Patrick).

Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/common/options.cc
src/mds/CMakeLists.txt
src/mds/MDSRank.cc
src/mds/MDSRank.h
src/mds/MetricAggregator.cc [new file with mode: 0644]
src/mds/MetricAggregator.h [new file with mode: 0644]

index 7e563c21a84ab2155a8fdbb1bf922f483f651fbd..0ca77ed254211da6e1097030f1d90939a9109d43 100644 (file)
@@ -8317,7 +8317,13 @@ std::vector<Option> get_mds_options() {
      .set_default(15)
      .set_flag(Option::FLAG_RUNTIME)
      .set_description("timeout after which an MDS is considered laggy by rank 0 MDS.")
-     .set_long_description("timeout for replying to a ping message sent by rank 0 after which an active MDS considered laggy (delayed metrics) by rank 0.")
+     .set_long_description("timeout for replying to a ping message sent by rank 0 after which an active MDS considered laggy (delayed metrics) by rank 0."),
+
+    Option("mds_ping_interval", Option::TYPE_SECS, Option::LEVEL_ADVANCED)
+     .set_default(5)
+     .set_flag(Option::FLAG_RUNTIME)
+     .set_description("interval in seconds for sending ping messages to active MDSs.")
+     .set_long_description("interval in seconds for rank 0 to send ping messages to all active MDSs.") 
   });
 }
 
index 2f63caea57af084f572f037d31752a08e29239f2..8527667de636c82b5a806e0e4a6910125b4a26cc 100644 (file)
@@ -41,9 +41,11 @@ set(mds_srcs
   Anchor.cc
   OpenFileTable.cc
   MDSPinger.cc
+  MetricAggregator.cc
   ${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
   ${CMAKE_SOURCE_DIR}/src/common/MemoryModel.cc
-  ${CMAKE_SOURCE_DIR}/src/osdc/Journaler.cc)
+  ${CMAKE_SOURCE_DIR}/src/osdc/Journaler.cc
+  ${CMAKE_SOURCE_DIR}/src/mgr/MDSPerfMetricTypes.cc)
 add_library(mds STATIC ${mds_srcs})
 target_link_libraries(mds PRIVATE
   heap_profiler cpu_profiler osdc liblua)
index e49b515417cd2ec25b21792a56f13c0a6eb340e7..0e1e8dd56ac26c59ca350ea54052ece812dc10e3 100644 (file)
@@ -26,6 +26,7 @@
 
 #include "MDSDaemon.h"
 #include "MDSMap.h"
+#include "MetricAggregator.h"
 #include "SnapClient.h"
 #include "SnapServer.h"
 #include "MDBalancer.h"
@@ -821,6 +822,11 @@ void MDSRankDispatcher::shutdown()
 
   purge_queue.shutdown();
 
+  // shutdown metric aggergator
+  if (metric_aggregator != nullptr) {
+    metric_aggregator->shutdown();
+  }
+
   mds_lock.unlock();
   finisher->stop(); // no flushing
   mds_lock.lock();
@@ -2014,6 +2020,15 @@ void MDSRank::active_start()
     mdcache->open_root();
   }
 
+  // metric aggregation is solely done by rank 0
+  if (is_rank0()) {
+    dout(10) << __func__ << ": initializing metric aggregator" << dendl;
+    ceph_assert(metric_aggregator == nullptr);
+    metric_aggregator = std::make_unique<MetricAggregator>(cct, this, mgrc);
+    metric_aggregator->init();
+    messenger->add_dispatcher_tail(metric_aggregator.get());
+  }
+
   mdcache->clean_open_file_lists();
   mdcache->export_remaining_imported_caps();
   finish_contexts(g_ceph_context, waiting_for_replay);  // kick waiters
@@ -2440,6 +2455,9 @@ void MDSRankDispatcher::handle_mds_map(
     }
   }
   mdcache->handle_mdsmap(*mdsmap);
+  if (metric_aggregator != nullptr) {
+    metric_aggregator->notify_mdsmap(*mdsmap);
+  }
 }
 
 void MDSRank::handle_mds_recovery(mds_rank_t who)
index 45cf67ce0cbbf2b9ed1e8b7bbb2e0f2eef37897d..1a6a04b16504ed6ce6f291e80ac8166787e7302e 100644 (file)
@@ -120,6 +120,7 @@ class SnapClient;
 class MDSTableServer;
 class MDSTableClient;
 class Messenger;
+class MetricAggregator;
 class Objecter;
 class MonClient;
 class MgrClient;
@@ -535,6 +536,8 @@ class MDSRank {
     // because its init/shutdown happens at the top level.
     PurgeQueue purge_queue;
 
+    std::unique_ptr<MetricAggregator> metric_aggregator;
+
     list<cref_t<Message>> waiting_for_nolaggy;
     MDSContext::que finished_queue;
     // Dispatch, retry, queues
@@ -583,6 +586,10 @@ private:
     void schedule_update_timer_task();
     void send_task_status();
 
+    bool is_rank0() const {
+      return whoami == (mds_rank_t)0;
+    }
+
     mono_time starttime = mono_clock::zero();
     boost::asio::io_context& ioc;
 };
diff --git a/src/mds/MetricAggregator.cc b/src/mds/MetricAggregator.cc
new file mode 100644 (file)
index 0000000..cf90de6
--- /dev/null
@@ -0,0 +1,294 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <boost/range/adaptor/map.hpp>
+#include <boost/range/algorithm/copy.hpp>
+
+#include "MDSRank.h"
+#include "MetricAggregator.h"
+#include "mgr/MgrClient.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mds
+#undef dout_prefix
+#define dout_prefix *_dout << "mds.metric.aggregator" << " " << __func__
+
+MetricAggregator::MetricAggregator(CephContext *cct, MDSRank *mds, MgrClient *mgrc)
+  : Dispatcher(cct),
+    mds(mds),
+    mgrc(mgrc),
+    mds_pinger(mds) {
+}
+
+void MetricAggregator::ping_all_active_ranks() {
+  dout(10) << ": pinging " << active_rank_addrs.size() << " active mds(s)" << dendl;
+
+  for (const auto &[rank, addr] : active_rank_addrs) {
+    dout(20) << ": pinging rank=" << rank << " addr=" << addr << dendl;
+    mds_pinger.send_ping(rank, addr);
+  }
+}
+
+int MetricAggregator::init() {
+  dout(10) << dendl;
+
+  pinger = std::thread([this]() {
+      std::unique_lock locker(lock);
+      while (!stopping) {
+        ping_all_active_ranks();
+        locker.unlock();
+        double timo = g_conf().get_val<std::chrono::seconds>("mds_ping_interval").count();
+        sleep(timo);
+        locker.lock();
+      }
+    });
+
+  return 0;
+}
+
+void MetricAggregator::shutdown() {
+  dout(10) << dendl;
+
+  {
+    std::scoped_lock locker(lock);
+    ceph_assert(!stopping);
+    stopping = true;
+  }
+
+  if (pinger.joinable()) {
+    pinger.join();
+  }
+}
+
+bool MetricAggregator::ms_can_fast_dispatch2(const cref_t<Message> &m) const {
+  return m->get_type() == MSG_MDS_METRICS;
+}
+
+void MetricAggregator::ms_fast_dispatch2(const ref_t<Message> &m) {
+  bool handled = ms_dispatch2(m);
+  ceph_assert(handled);
+}
+
+bool MetricAggregator::ms_dispatch2(const ref_t<Message> &m) {
+  if (m->get_type() == MSG_MDS_METRICS) {
+    if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MDS) {
+      handle_mds_metrics(ref_cast<MMDSMetrics>(m));
+    }
+    return true;
+  }
+  return false;
+}
+
+void MetricAggregator::refresh_metrics_for_rank(const entity_inst_t &client,
+                                                mds_rank_t rank, const Metrics &metrics) {
+  dout(20) << ": client=" << client << ", rank=" << rank << ", metrics="
+           << metrics << dendl;
+
+  auto &p = clients_by_rank.at(rank);
+  bool ins = p.insert(client).second;
+  if (ins) {
+    dout(20) << ": rank=" << rank << " has " << p.size() << " connected"
+             << " client(s)" << dendl;
+  }
+
+  auto update_counter_func = [&metrics](const MDSPerformanceCounterDescriptor &d,
+                                        PerformanceCounter *c) {
+    ceph_assert(d.is_supported());
+
+    dout(20) << ": performance_counter_descriptor=" << d << dendl;
+
+    switch (d.type) {
+    case MDSPerformanceCounterType::CAP_HIT_METRIC:
+      c->first = metrics.cap_hit_metric.hits;
+      c->second = metrics.cap_hit_metric.misses;
+      break;
+    case MDSPerformanceCounterType::READ_LATENCY_METRIC:
+      c->first = metrics.read_latency_metric.lat.tv.tv_sec;
+      c->second = metrics.read_latency_metric.lat.tv.tv_nsec;
+      break;
+    case MDSPerformanceCounterType::WRITE_LATENCY_METRIC:
+      c->first = metrics.write_latency_metric.lat.tv.tv_sec;
+      c->second = metrics.write_latency_metric.lat.tv.tv_nsec;
+      break;
+    case MDSPerformanceCounterType::METADATA_LATENCY_METRIC:
+      c->first = metrics.metadata_latency_metric.lat.tv.tv_sec;
+      c->second = metrics.metadata_latency_metric.lat.tv.tv_nsec;
+      break;
+    default:
+      ceph_abort_msg("unknown counter type");
+    }
+  };
+
+  auto sub_key_func = [client, rank](const MDSPerfMetricSubKeyDescriptor &d,
+                                     MDSPerfMetricSubKey *sub_key) {
+    ceph_assert(d.is_supported());
+
+    dout(20) << ": sub_key_descriptor=" << d << dendl;
+
+    std::string match_string;
+    switch (d.type) {
+    case MDSPerfMetricSubKeyType::MDS_RANK:
+      match_string = stringify(rank);
+      break;
+    case MDSPerfMetricSubKeyType::CLIENT_ID:
+      match_string = stringify(client);
+      break;
+    default:
+      ceph_abort_msg("unknown counter type");
+    }
+
+    dout(20) << ": match_string=" << match_string << dendl;
+
+    std::smatch match;
+    if (!std::regex_search(match_string, match, d.regex)) {
+      return false;
+    }
+    if (match.size() <= 1) {
+      return false;
+    }
+    for (size_t i = 1; i < match.size(); ++i) {
+      sub_key->push_back(match[i].str());
+    }
+    return true;
+  };
+
+  for (auto& [query, perf_key_map] : query_metrics_map) {
+    MDSPerfMetricKey key;
+    if (query.get_key(sub_key_func, &key)) {
+      query.update_counters(update_counter_func, &perf_key_map[key]);
+    }
+  }
+}
+
+void MetricAggregator::remove_metrics_for_rank(const entity_inst_t &client,
+                                               mds_rank_t rank, bool remove) {
+  dout(20) << ": client=" << client << ", rank=" << rank << dendl;
+
+  if (remove) {
+    auto &p = clients_by_rank.at(rank);
+    bool rm = p.erase(client) != 0;
+    ceph_assert(rm);
+    dout(20) << ": rank=" << rank << " has " << p.size() << " connected"
+             << " client(s)" << dendl;
+  }
+
+  auto sub_key_func = [client, rank](const MDSPerfMetricSubKeyDescriptor &d,
+                                     MDSPerfMetricSubKey *sub_key) {
+    ceph_assert(d.is_supported());
+    dout(20) << ": sub_key_descriptor=" << d << dendl;
+
+    std::string match_string;
+    switch (d.type) {
+    case MDSPerfMetricSubKeyType::MDS_RANK:
+      match_string = stringify(rank);
+      break;
+    case MDSPerfMetricSubKeyType::CLIENT_ID:
+      match_string = stringify(client);
+      break;
+    default:
+      ceph_abort_msg("unknown counter type");
+    }
+
+    dout(20) << ": match_string=" << match_string << dendl;
+
+    std::smatch match;
+    if (!std::regex_search(match_string, match, d.regex)) {
+      return false;
+    }
+    if (match.size() <= 1) {
+      return false;
+    }
+    for (size_t i = 1; i < match.size(); ++i) {
+      sub_key->push_back(match[i].str());
+    }
+    return true;
+  };
+
+  for (auto& [query, perf_key_map] : query_metrics_map) {
+    MDSPerfMetricKey key;
+    if (query.get_key(sub_key_func, &key)) {
+      if (perf_key_map.erase(key)) {
+        dout(10) << ": removed metric for key=" << key << dendl;
+      }
+    }
+  }
+}
+
+void MetricAggregator::handle_mds_metrics(const cref_t<MMDSMetrics> &m) {
+  const metrics_message_t &metrics_message = m->metrics_message;
+
+  auto seq = metrics_message.seq;
+  auto rank = metrics_message.rank;
+  auto &client_metrics_map = metrics_message.client_metrics_map;
+
+  dout(20) << ": applying " << client_metrics_map.size() << " updates for rank="
+           << rank << " with sequence number " << seq << dendl;
+
+  std::scoped_lock locker(lock);
+  if (!mds_pinger.pong_received(rank, seq)) {
+    return;
+  }
+
+  for (auto& [client, metrics] : client_metrics_map) {
+    switch (metrics.update_type) {
+    case UpdateType::UPDATE_TYPE_REFRESH:
+      refresh_metrics_for_rank(client, rank, metrics);
+      break;
+    case UpdateType::UPDATE_TYPE_REMOVE:
+      remove_metrics_for_rank(client, rank, true);
+      break;
+    default:
+      ceph_abort();
+    }
+  }
+}
+
+void MetricAggregator::cull_metrics_for_rank(mds_rank_t rank) {
+  dout(20) << ": rank=" << rank << dendl;
+
+  auto &p = clients_by_rank.at(rank);
+  for (auto &client : p) {
+    remove_metrics_for_rank(client, rank, false);
+  }
+
+  dout(10) << ": culled " << p.size() << " clients" << dendl;
+  clients_by_rank.erase(rank);
+}
+
+void MetricAggregator::notify_mdsmap(const MDSMap &mdsmap) {
+  dout(10) << dendl;
+
+  std::scoped_lock locker(lock);
+  std::set<mds_rank_t> current_active;
+  mdsmap.get_active_mds_set(current_active);
+
+  std::set<mds_rank_t> active_set;
+  boost::copy(active_rank_addrs | boost::adaptors::map_keys,
+              std::inserter(active_set, active_set.begin()));
+
+  std::set<mds_rank_t> diff;
+  std::set_difference(active_set.begin(), active_set.end(),
+                      current_active.begin(), current_active.end(),
+                      std::inserter(diff, diff.end()));
+
+  for (auto &rank : diff) {
+    dout(10) << ": mds rank=" << rank << " removed from mdsmap" << dendl;
+    active_rank_addrs.erase(rank);
+    cull_metrics_for_rank(rank);
+    mds_pinger.reset_ping(rank);
+  }
+
+  diff.clear();
+  std::set_difference(current_active.begin(), current_active.end(),
+                      active_set.begin(), active_set.end(),
+                      std::inserter(diff, diff.end()));
+
+  for (auto &rank : diff) {
+    auto rank_addr = mdsmap.get_addrs(rank);
+    dout(10) << ": active rank=" << rank << " has addr=" << rank_addr << dendl;
+    active_rank_addrs.emplace(rank, rank_addr);
+    clients_by_rank.emplace(rank, std::unordered_set<entity_inst_t>{});
+  }
+
+  dout(10) << ": active set=["  << active_rank_addrs << "]" << dendl;
+}
diff --git a/src/mds/MetricAggregator.h b/src/mds/MetricAggregator.h
new file mode 100644 (file)
index 0000000..2da8f86
--- /dev/null
@@ -0,0 +1,85 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MDS_METRIC_AGGREGATOR_H
+#define CEPH_MDS_METRIC_AGGREGATOR_H
+
+#include <map>
+#include <set>
+#include <thread>
+
+#include "msg/msg_types.h"
+#include "msg/Dispatcher.h"
+#include "common/ceph_mutex.h"
+#include "messages/MMDSMetrics.h"
+
+#include "mgr/MDSPerfMetricTypes.h"
+
+#include "mdstypes.h"
+#include "MDSMap.h"
+#include "MDSPinger.h"
+
+class MDSRank;
+class MgrClient;
+class CephContext;
+
+class MetricAggregator : public Dispatcher {
+public:
+  MetricAggregator(CephContext *cct, MDSRank *mds, MgrClient *mgrc);
+
+  int init();
+  void shutdown();
+
+  void notify_mdsmap(const MDSMap &mdsmap);
+
+  bool ms_can_fast_dispatch_any() const override {
+    return true;
+  }
+  bool ms_can_fast_dispatch2(const cref_t<Message> &m) const override;
+  void ms_fast_dispatch2(const ref_t<Message> &m) override;
+  bool ms_dispatch2(const ref_t<Message> &m) override;
+
+  void ms_handle_connect(Connection *c) override {
+  }
+  bool ms_handle_reset(Connection *c) override {
+    return false;
+  }
+  void ms_handle_remote_reset(Connection *c) override {
+  }
+  bool ms_handle_refused(Connection *c) override {
+    return false;
+  }
+
+private:
+  // drop this lock when calling ->send_message_mds() else mds might
+  // deadlock
+  ceph::mutex lock = ceph::make_mutex("MetricAggregator::lock");
+  MDSRank *mds;
+  MgrClient *mgrc;
+
+  // maintain a map of rank to list of clients so that when a rank
+  // goes away we cull metrics of clients connected to that rank.
+  std::map<mds_rank_t, std::unordered_set<entity_inst_t>> clients_by_rank;
+
+  // user query to metrics map
+  std::map<MDSPerfMetricQuery, std::map<MDSPerfMetricKey, PerformanceCounters>> query_metrics_map;
+
+  MDSPinger mds_pinger;
+  std::thread pinger;
+
+  std::map<mds_rank_t, entity_addrvec_t> active_rank_addrs;
+
+  bool stopping = false;
+
+  void handle_mds_metrics(const cref_t<MMDSMetrics> &m);
+
+  void refresh_metrics_for_rank(const entity_inst_t &client, mds_rank_t rank,
+                                const Metrics &metrics);
+  void remove_metrics_for_rank(const entity_inst_t &client, mds_rank_t rank, bool remove);
+
+  void cull_metrics_for_rank(mds_rank_t rank);
+
+  void ping_all_active_ranks();
+};
+
+#endif // CEPH_MDS_METRIC_AGGREGATOR_H