]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mds: track per session client metrics
authorVenky Shankar <vshankar@redhat.com>
Mon, 4 Feb 2019 13:19:58 +0000 (18:49 +0530)
committerVenky Shankar <vshankar@redhat.com>
Mon, 1 Jun 2020 08:05:39 +0000 (04:05 -0400)
Every MDS maintains a view of metrics that are forwarded to
it by clients. This is updated when clients forward metrics
via MClientMetrics message type. Periodically, each MDS
forwards its collected metrics to MDS rank 0 (which maintains
an aggregated view of metrics of all clients on all ranks).

Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/common/options.cc
src/mds/CMakeLists.txt
src/mds/MDSRank.cc
src/mds/MDSRank.h
src/mds/MetricsHandler.cc [new file with mode: 0644]
src/mds/MetricsHandler.h [new file with mode: 0644]
src/mds/Server.cc
src/mds/Server.h

index 0ca77ed254211da6e1097030f1d90939a9109d43..f57ce579251c6f2bc2e4668bccb06746e5483325 100644 (file)
@@ -8323,7 +8323,13 @@ std::vector<Option> get_mds_options() {
      .set_default(5)
      .set_flag(Option::FLAG_RUNTIME)
      .set_description("interval in seconds for sending ping messages to active MDSs.")
-     .set_long_description("interval in seconds for rank 0 to send ping messages to all active MDSs.") 
+     .set_long_description("interval in seconds for rank 0 to send ping messages to all active MDSs."),
+
+    Option("mds_metrics_update_interval", Option::TYPE_SECS, Option::LEVEL_ADVANCED)
+     .set_default(2)
+     .set_flag(Option::FLAG_RUNTIME)
+     .set_description("interval in seconds for metrics data update.")
+     .set_long_description("interval in seconds after which active MDSs send client metrics data to rank 0.")
   });
 }
 
index 8527667de636c82b5a806e0e4a6910125b4a26cc..38ea8adf2dda813abae59542c09719f9f2d24b5b 100644 (file)
@@ -42,6 +42,7 @@ set(mds_srcs
   OpenFileTable.cc
   MDSPinger.cc
   MetricAggregator.cc
+  MetricsHandler.cc
   ${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
   ${CMAKE_SOURCE_DIR}/src/common/MemoryModel.cc
   ${CMAKE_SOURCE_DIR}/src/osdc/Journaler.cc
index 0e1e8dd56ac26c59ca350ea54052ece812dc10e3..64376a5b48c86e6442978b6594a17562d0483d87 100644 (file)
@@ -21,6 +21,7 @@
 #include "messages/MClientRequestForward.h"
 #include "messages/MMDSLoadTargets.h"
 #include "messages/MMDSTableRequest.h"
+#include "messages/MMDSMetrics.h"
 
 #include "mgr/MgrClient.h"
 
@@ -503,6 +504,7 @@ MDSRank::MDSRank(
        }
       )
     ),
+    metrics_handler(cct, this),
     beacon(beacon_),
     messenger(msgr), monc(monc_), mgrc(mgrc),
     respawn_hook(respawn_hook_),
@@ -528,7 +530,7 @@ MDSRank::MDSRank(
   snapserver = new SnapServer(this, monc);
   snapclient = new SnapClient(this);
 
-  server = new Server(this);
+  server = new Server(this, &metrics_handler);
   locker = new Locker(this, mdcache);
 
   op_tracker.set_complaint_and_threshold(cct->_conf->mds_op_complaint_time,
@@ -822,6 +824,10 @@ void MDSRankDispatcher::shutdown()
 
   purge_queue.shutdown();
 
+  // shutdown metrics handler/updater -- this is ok even if it was not
+  // inited.
+  metrics_handler.shutdown();
+
   // shutdown metric aggergator
   if (metric_aggregator != nullptr) {
     metric_aggregator->shutdown();
@@ -2020,6 +2026,10 @@ void MDSRank::active_start()
     mdcache->open_root();
   }
 
+  dout(10) << __func__ << ": initializing metrics handler" << dendl;
+  metrics_handler.init();
+  messenger->add_dispatcher_tail(&metrics_handler);
+
   // metric aggregation is solely done by rank 0
   if (is_rank0()) {
     dout(10) << __func__ << ": initializing metric aggregator" << dendl;
@@ -2458,6 +2468,7 @@ void MDSRankDispatcher::handle_mds_map(
   if (metric_aggregator != nullptr) {
     metric_aggregator->notify_mdsmap(*mdsmap);
   }
+  metrics_handler.notify_mdsmap(*mdsmap);
 }
 
 void MDSRank::handle_mds_recovery(mds_rank_t who)
index 1a6a04b16504ed6ce6f291e80ac8166787e7302e..51a87a93597ff116ffd32cf3c09d272d84c8dae6 100644 (file)
@@ -39,6 +39,7 @@
 #include "MDSContext.h"
 #include "PurgeQueue.h"
 #include "Server.h"
+#include "MetricsHandler.h"
 #include "osdc/Journaler.h"
 
 // Full .h import instead of forward declaration for PerfCounter, for the
@@ -536,6 +537,7 @@ class MDSRank {
     // because its init/shutdown happens at the top level.
     PurgeQueue purge_queue;
 
+    MetricsHandler metrics_handler;
     std::unique_ptr<MetricAggregator> metric_aggregator;
 
     list<cref_t<Message>> waiting_for_nolaggy;
diff --git a/src/mds/MetricsHandler.cc b/src/mds/MetricsHandler.cc
new file mode 100644 (file)
index 0000000..18c7b02
--- /dev/null
@@ -0,0 +1,272 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/debug.h"
+#include "common/errno.h"
+
+#include "messages/MMDSMetrics.h"
+
+#include "MDSRank.h"
+#include "SessionMap.h"
+#include "MetricsHandler.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mds
+#undef dout_prefix
+#define dout_prefix *_dout << __func__ << ": mds.metrics"
+
+MetricsHandler::MetricsHandler(CephContext *cct, MDSRank *mds)
+  : Dispatcher(cct),
+    mds(mds) {
+}
+
+bool MetricsHandler::ms_can_fast_dispatch2(const cref_t<Message> &m) const {
+  return m->get_type() == CEPH_MSG_CLIENT_METRICS || m->get_type() == MSG_MDS_PING;
+}
+
+void MetricsHandler::ms_fast_dispatch2(const ref_t<Message> &m) {
+  bool handled = ms_dispatch2(m);
+  ceph_assert(handled);
+}
+
+bool MetricsHandler::ms_dispatch2(const ref_t<Message> &m) {
+  if (m->get_type() == CEPH_MSG_CLIENT_METRICS &&
+      m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_CLIENT) {
+    handle_client_metrics(ref_cast<MClientMetrics>(m));
+    return true;
+  }
+  if (m->get_type() == MSG_MDS_PING &&
+      m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MDS) {
+    handle_mds_ping(ref_cast<MMDSPing>(m));
+    return true;
+  }
+
+  return false;
+}
+
+void MetricsHandler::init() {
+  dout(10) << dendl;
+
+  updater = std::thread([this]() {
+      std::unique_lock locker(lock);
+      while (!stopping) {
+        double after = g_conf().get_val<std::chrono::seconds>("mds_metrics_update_interval").count();
+        locker.unlock();
+        sleep(after);
+        locker.lock();
+        update_rank0();
+      }
+    });
+}
+
+void MetricsHandler::shutdown() {
+  dout(10) << dendl;
+
+  {
+    std::scoped_lock locker(lock);
+    ceph_assert(!stopping);
+    stopping = true;
+  }
+
+  if (updater.joinable()) {
+    updater.join();
+  }
+}
+
+
+void MetricsHandler::add_session(Session *session) {
+  ceph_assert(session != nullptr);
+
+  auto &client = session->info.inst;
+  dout(10) << ": session=" << session << ", client=" << client << dendl;
+
+  std::scoped_lock locker(lock);
+
+  auto p = client_metrics_map.emplace(client, std::pair(last_updated_seq, Metrics())).first;
+  auto &metrics = p->second.second;
+  metrics.update_type = UPDATE_TYPE_REFRESH;
+  dout(20) << ": metrics=" << metrics << dendl;
+}
+
+void MetricsHandler::remove_session(Session *session) {
+  ceph_assert(session != nullptr);
+
+  auto &client = session->info.inst;
+  dout(10) << ": session=" << session << ", client=" << client << dendl;
+
+  std::scoped_lock locker(lock);
+
+  auto it = client_metrics_map.find(client);
+  if (it == client_metrics_map.end()) {
+    return;
+  }
+
+  // if a session got removed before rank 0 saw at least one refresh
+  // update from us or if we will send a remove type update as the
+  // the first "real" update (with an incoming sequence number), then
+  // cut short the update as rank 0 has not witnessed this client session
+  // update this rank.
+  auto lus = it->second.first;
+  if (lus == last_updated_seq) {
+    dout(10) << ": metric lus=" << lus << ", last_updated_seq=" << last_updated_seq
+             << dendl;
+    client_metrics_map.erase(it);
+    return;
+  }
+
+  // zero out all metrics
+  auto &metrics = it->second.second;
+  metrics.cap_hit_metric = { };
+  metrics.read_latency_metric = { };
+  metrics.write_latency_metric = { };
+  metrics.metadata_latency_metric = { };
+  metrics.update_type = UPDATE_TYPE_REMOVE;
+}
+
+void MetricsHandler::set_next_seq(version_t seq) {
+  dout(20) << ": current sequence number " << next_seq << ", setting next sequence number "
+           << seq << dendl;
+  next_seq = seq;
+}
+
+void MetricsHandler::handle_payload(Session *session, const CapInfoPayload &payload) {
+  dout(20) << ": session=" << session << ", hits=" << payload.cap_hits << ", misses="
+           << payload.cap_misses << dendl;
+
+  auto it = client_metrics_map.find(session->info.inst);
+  if (it == client_metrics_map.end()) {
+    return;
+  }
+
+  auto &metrics = it->second.second;
+  metrics.update_type = UPDATE_TYPE_REFRESH;
+  metrics.cap_hit_metric.hits = payload.cap_hits;
+  metrics.cap_hit_metric.misses = payload.cap_misses;
+}
+
+void MetricsHandler::handle_payload(Session *session, const ReadLatencyPayload &payload) {
+  dout(20) << ": session=" << session << ", latency=" << payload.lat << dendl;
+
+  auto it = client_metrics_map.find(session->info.inst);
+  if (it == client_metrics_map.end()) {
+    return;
+  }
+
+  auto &metrics = it->second.second;
+  metrics.update_type = UPDATE_TYPE_REFRESH;
+  metrics.read_latency_metric.lat = payload.lat;
+}
+
+void MetricsHandler::handle_payload(Session *session, const WriteLatencyPayload &payload) {
+  dout(20) << ": session=" << session << ", latency=" << payload.lat << dendl;
+
+  auto it = client_metrics_map.find(session->info.inst);
+  if (it == client_metrics_map.end()) {
+    return;
+  }
+
+  auto &metrics = it->second.second;
+  metrics.update_type = UPDATE_TYPE_REFRESH;
+  metrics.write_latency_metric.lat = payload.lat;
+}
+
+void MetricsHandler::handle_payload(Session *session, const MetadataLatencyPayload &payload) {
+  dout(20) << ": session=" << session << ", latency=" << payload.lat << dendl;
+
+  auto it = client_metrics_map.find(session->info.inst);
+  if (it == client_metrics_map.end()) {
+    return;
+  }
+
+  auto &metrics = it->second.second;
+  metrics.update_type = UPDATE_TYPE_REFRESH;
+  metrics.metadata_latency_metric.lat = payload.lat;
+}
+
+void MetricsHandler::handle_payload(Session *session, const UnknownPayload &payload) {
+  dout(5) << ": session=" << session << ", ignoring unknown payload" << dendl;
+}
+
+void MetricsHandler::handle_client_metrics(const cref_t<MClientMetrics> &m) {
+  std::scoped_lock locker(lock);
+
+  Session *session = mds->get_session(m);
+  dout(20) << ": session=" << session << dendl;
+
+  if (session == nullptr) {
+    dout(10) << ": ignoring session less message" << dendl;
+    return;
+  }
+
+  for (auto &metric : m->updates) {
+    boost::apply_visitor(HandlePayloadVisitor(this, session), metric.payload);
+  }
+}
+
+void MetricsHandler::handle_mds_ping(const cref_t<MMDSPing> &m) {
+  std::scoped_lock locker(lock);
+  set_next_seq(m->seq);
+}
+
+void MetricsHandler::notify_mdsmap(const MDSMap &mdsmap) {
+  dout(10) << dendl;
+
+  std::set<mds_rank_t> active_set;
+
+  std::scoped_lock locker(lock);
+  // reset the sequence number so that last_updated_seq starts
+  // updating when the new rank0 mds pings us.
+  set_next_seq(0);
+
+  // update new rank0 address
+  mdsmap.get_active_mds_set(active_set);
+  if (!active_set.count((mds_rank_t)0)) {
+    dout(10) << ": rank0 is unavailable" << dendl;
+    addr_rank0 = boost::none;
+    return;
+  }
+
+  auto new_rank0_addr = mdsmap.get_addrs((mds_rank_t)0);
+  if (addr_rank0 != new_rank0_addr) {
+    dout(10) << ": rank0 addr is now " << new_rank0_addr << dendl;
+    addr_rank0 = new_rank0_addr;
+  }
+}
+
+void MetricsHandler::update_rank0() {
+  dout(20) << dendl;
+
+  if (!addr_rank0) {
+    dout(20) << ": not yet notified with rank0 address, ignoring" << dendl;
+    return;
+  }
+
+  metrics_message_t metrics_message;
+  auto &update_client_metrics_map = metrics_message.client_metrics_map;
+
+  metrics_message.seq = next_seq;
+  metrics_message.rank = mds->get_nodeid();
+
+  for (auto p = client_metrics_map.begin(); p != client_metrics_map.end();) {
+    // copy metrics and update local metrics map as required
+    auto &metrics = p->second.second;
+    update_client_metrics_map.emplace(p->first, metrics);
+    if (metrics.update_type == UPDATE_TYPE_REFRESH) {
+      metrics = {};
+      ++p;
+    } else {
+      p = client_metrics_map.erase(p);
+    }
+  }
+
+  // only start incrementing when its kicked via set_next_seq()
+  if (next_seq != 0) {
+    ++last_updated_seq;
+  }
+
+  dout(20) << ": sending metric updates for " << update_client_metrics_map.size()
+           << " clients to rank 0 (address: " << *addr_rank0 << ") with sequence number "
+           << next_seq << ", last updated sequence number " << last_updated_seq << dendl;
+
+  mds->send_message_mds(make_message<MMDSMetrics>(std::move(metrics_message)), *addr_rank0);
+}
diff --git a/src/mds/MetricsHandler.h b/src/mds/MetricsHandler.h
new file mode 100644 (file)
index 0000000..73a18a0
--- /dev/null
@@ -0,0 +1,107 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MDS_METRICS_HANDLER_H
+#define CEPH_MDS_METRICS_HANDLER_H
+
+#include <thread>
+#include <utility>
+#include <boost/variant.hpp>
+
+#include "msg/Dispatcher.h"
+#include "common/ceph_mutex.h"
+#include "include/cephfs/metrics/Types.h"
+
+#include "messages/MMDSPing.h"
+#include "messages/MClientMetrics.h"
+
+#include "MDSPerfMetricTypes.h"
+
+class MDSRank;
+class Session;
+class CephContext;
+
+class MetricsHandler : public Dispatcher {
+public:
+  MetricsHandler(CephContext *cct, MDSRank *mds);
+
+  bool ms_can_fast_dispatch_any() const override {
+    return true;
+  }
+  bool ms_can_fast_dispatch2(const cref_t<Message> &m) const override;
+  void ms_fast_dispatch2(const ref_t<Message> &m) override;
+  bool ms_dispatch2(const ref_t<Message> &m) override;
+
+  void ms_handle_connect(Connection *c) override {
+  }
+  bool ms_handle_reset(Connection *c) override {
+    return false;
+  }
+  void ms_handle_remote_reset(Connection *c) override {
+  }
+  bool ms_handle_refused(Connection *c) override {
+    return false;
+  }
+
+  void add_session(Session *session);
+  void remove_session(Session *session);
+
+  void init();
+  void shutdown();
+
+  void notify_mdsmap(const MDSMap &mdsmap);
+
+private:
+  struct HandlePayloadVisitor : public boost::static_visitor<void> {
+    MetricsHandler *metrics_handler;
+    Session *session;
+
+    HandlePayloadVisitor(MetricsHandler *metrics_handler, Session *session)
+      : metrics_handler(metrics_handler), session(session) {
+    }
+
+    template <typename ClientMetricPayload>
+    inline void operator()(const ClientMetricPayload &payload) const {
+      metrics_handler->handle_payload(session, payload);
+    }
+  };
+
+  MDSRank *mds;
+  // drop this lock when calling ->send_message_mds() else mds might
+  // deadlock
+  ceph::mutex lock = ceph::make_mutex("MetricsHandler::lock");
+
+  // ISN sent by rank0 pinger is 1
+  version_t next_seq = 0;
+
+  // sequence number incremented on each update sent to rank 0.
+  // this is nowhere related to next_seq and is completely used
+  // locally to figure out if a session got added and removed
+  // within an update to rank 0.
+  version_t last_updated_seq = 0;
+
+  std::thread updater;
+  std::map<entity_inst_t, std::pair<version_t, Metrics>> client_metrics_map;
+
+  // address of rank 0 mds, so that the message can be sent without
+  // acquiring mds_lock. misdirected messages to rank 0 are taken
+  // care of by rank 0.
+  boost::optional<entity_addrvec_t> addr_rank0;
+
+  bool stopping = false;
+
+  void handle_payload(Session *session, const CapInfoPayload &payload);
+  void handle_payload(Session *session, const ReadLatencyPayload &payload);
+  void handle_payload(Session *session, const WriteLatencyPayload &payload);
+  void handle_payload(Session *session, const MetadataLatencyPayload &payload);
+  void handle_payload(Session *session, const UnknownPayload &payload);
+
+  void set_next_seq(version_t seq);
+
+  void handle_client_metrics(const cref_t<MClientMetrics> &m);
+  void handle_mds_ping(const cref_t<MMDSPing> &m);
+
+  void update_rank0();
+};
+
+#endif // CEPH_MDS_METRICS_HANDLER_H
index bd11f9384a5be31d0be12e5a931be55231c8fe0d..c107277a99830e97e8af3d1db36a7e07f41de1e3 100644 (file)
@@ -29,6 +29,7 @@
 #include "InoTable.h"
 #include "SnapClient.h"
 #include "Mutation.h"
+#include "MetricsHandler.h"
 #include "cephfs_features.h"
 
 #include "msg/Messenger.h"
@@ -223,10 +224,11 @@ void Server::create_logger()
   g_ceph_context->get_perfcounters_collection()->add(logger);
 }
 
-Server::Server(MDSRank *m) : 
+Server::Server(MDSRank *m, MetricsHandler *metrics_handler) :
   mds(m), 
   mdcache(mds->mdcache), mdlog(mds->mdlog),
-  recall_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate"))
+  recall_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate")),
+  metrics_handler(metrics_handler)
 {
   replay_unsafe_with_closed_session = g_conf().get_val<bool>("mds_replay_unsafe_with_closed_session");
   cap_revoke_eviction_timeout = g_conf().get_val<double>("mds_cap_revoke_eviction_timeout");
@@ -641,8 +643,9 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
        }
       }
 
-      if (session->is_closed())
-       mds->sessionmap.add_session(session);
+      if (session->is_closed()) {
+        mds->sessionmap.add_session(session);
+      }
 
       pv = mds->sessionmap.mark_projected(session);
       sseq = mds->sessionmap.set_state(session, Session::STATE_OPENING);
@@ -720,7 +723,6 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
   }
 }
 
-
 void Server::flush_session(Session *session, MDSGatherBuilder *gather) {
   if (!session->is_open() ||
       !session->get_connection() ||
@@ -787,6 +789,7 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
     ceph_assert(session->is_opening());
     mds->sessionmap.set_state(session, Session::STATE_OPEN);
     mds->sessionmap.touch_session(session);
+    metrics_handler->add_session(session);
     ceph_assert(session->get_connection());
     auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
     if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
@@ -843,6 +846,7 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
       mds->send_message_client(make_message<MClientSession>(CEPH_SESSION_CLOSE), session);
       mds->sessionmap.set_state(session, Session::STATE_CLOSED);
       session->clear();
+      metrics_handler->remove_session(session);
       mds->sessionmap.remove_session(session);
     } else if (session->is_killing()) {
       // destroy session, close connection
@@ -851,6 +855,7 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
         mds->sessionmap.set_state(session, Session::STATE_CLOSED);
         session->set_connection(nullptr);
       }
+      metrics_handler->remove_session(session);
       mds->sessionmap.remove_session(session);
     } else {
       ceph_abort();
@@ -937,6 +942,7 @@ void Server::finish_force_open_sessions(const map<client_t,pair<Session*,uint64_
        dout(10) << "force_open_sessions opened " << session->info.inst << dendl;
        mds->sessionmap.set_state(session, Session::STATE_OPEN);
        mds->sessionmap.touch_session(session);
+        metrics_handler->add_session(session);
 
        auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
        if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
@@ -1403,6 +1409,7 @@ void Server::handle_client_reconnect(const cref_t<MClientReconnect> &m)
   }
 
   if (!m->has_more()) {
+    metrics_handler->add_session(session);
     // notify client of success with an OPEN
     auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
     if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
index 530450deabdb525dbbea32b1dacd0b95336f2fa6..52cebd80dd863c83cfff9cb9583b5a1d3c2bd4f6 100644 (file)
@@ -40,6 +40,7 @@ class EMetaBlob;
 class EUpdate;
 class MDLog;
 struct SnapInfo;
+class MetricsHandler;
 
 enum {
   l_mdss_first = 1000,
@@ -92,7 +93,7 @@ public:
     TRIM = (1<<2),
     ENFORCE_LIVENESS = (1<<3),
   };
-  explicit Server(MDSRank *m);
+  explicit Server(MDSRank *m, MetricsHandler *metrics_handler);
   ~Server() {
     g_ceph_context->get_perfcounters_collection()->remove(logger);
     delete logger;
@@ -350,6 +351,8 @@ private:
 
   DecayCounter recall_throttle;
   time last_recall_state;
+
+  MetricsHandler *metrics_handler;
 };
 
 static inline constexpr auto operator|(Server::RecallFlags a, Server::RecallFlags b) {