.set_default(5)
.set_flag(Option::FLAG_RUNTIME)
.set_description("interval in seconds for sending ping messages to active MDSs.")
- .set_long_description("interval in seconds for rank 0 to send ping messages to all active MDSs.")
+ .set_long_description("interval in seconds for rank 0 to send ping messages to all active MDSs."),
+
+ Option("mds_metrics_update_interval", Option::TYPE_SECS, Option::LEVEL_ADVANCED)
+ .set_default(2)
+ .set_flag(Option::FLAG_RUNTIME)
+ .set_description("interval in seconds for metrics data update.")
+ .set_long_description("interval in seconds after which active MDSs send client metrics data to rank 0.")
});
}
OpenFileTable.cc
MDSPinger.cc
MetricAggregator.cc
+ MetricsHandler.cc
${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
${CMAKE_SOURCE_DIR}/src/common/MemoryModel.cc
${CMAKE_SOURCE_DIR}/src/osdc/Journaler.cc
#include "messages/MClientRequestForward.h"
#include "messages/MMDSLoadTargets.h"
#include "messages/MMDSTableRequest.h"
+#include "messages/MMDSMetrics.h"
#include "mgr/MgrClient.h"
}
)
),
+ metrics_handler(cct, this),
beacon(beacon_),
messenger(msgr), monc(monc_), mgrc(mgrc),
respawn_hook(respawn_hook_),
snapserver = new SnapServer(this, monc);
snapclient = new SnapClient(this);
- server = new Server(this);
+ server = new Server(this, &metrics_handler);
locker = new Locker(this, mdcache);
op_tracker.set_complaint_and_threshold(cct->_conf->mds_op_complaint_time,
purge_queue.shutdown();
+ // shutdown metrics handler/updater -- this is ok even if it was not
+ // inited.
+ metrics_handler.shutdown();
+
// shutdown metric aggergator
if (metric_aggregator != nullptr) {
metric_aggregator->shutdown();
mdcache->open_root();
}
+ dout(10) << __func__ << ": initializing metrics handler" << dendl;
+ metrics_handler.init();
+ messenger->add_dispatcher_tail(&metrics_handler);
+
// metric aggregation is solely done by rank 0
if (is_rank0()) {
dout(10) << __func__ << ": initializing metric aggregator" << dendl;
if (metric_aggregator != nullptr) {
metric_aggregator->notify_mdsmap(*mdsmap);
}
+ metrics_handler.notify_mdsmap(*mdsmap);
}
void MDSRank::handle_mds_recovery(mds_rank_t who)
#include "MDSContext.h"
#include "PurgeQueue.h"
#include "Server.h"
+#include "MetricsHandler.h"
#include "osdc/Journaler.h"
// Full .h import instead of forward declaration for PerfCounter, for the
// because its init/shutdown happens at the top level.
PurgeQueue purge_queue;
+ MetricsHandler metrics_handler;
std::unique_ptr<MetricAggregator> metric_aggregator;
list<cref_t<Message>> waiting_for_nolaggy;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/debug.h"
+#include "common/errno.h"
+
+#include "messages/MMDSMetrics.h"
+
+#include "MDSRank.h"
+#include "SessionMap.h"
+#include "MetricsHandler.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mds
+#undef dout_prefix
+#define dout_prefix *_dout << __func__ << ": mds.metrics"
+
+MetricsHandler::MetricsHandler(CephContext *cct, MDSRank *mds)
+ : Dispatcher(cct),
+ mds(mds) {
+}
+
+bool MetricsHandler::ms_can_fast_dispatch2(const cref_t<Message> &m) const {
+ return m->get_type() == CEPH_MSG_CLIENT_METRICS || m->get_type() == MSG_MDS_PING;
+}
+
+void MetricsHandler::ms_fast_dispatch2(const ref_t<Message> &m) {
+ bool handled = ms_dispatch2(m);
+ ceph_assert(handled);
+}
+
+bool MetricsHandler::ms_dispatch2(const ref_t<Message> &m) {
+ if (m->get_type() == CEPH_MSG_CLIENT_METRICS &&
+ m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_CLIENT) {
+ handle_client_metrics(ref_cast<MClientMetrics>(m));
+ return true;
+ }
+ if (m->get_type() == MSG_MDS_PING &&
+ m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MDS) {
+ handle_mds_ping(ref_cast<MMDSPing>(m));
+ return true;
+ }
+
+ return false;
+}
+
+void MetricsHandler::init() {
+ dout(10) << dendl;
+
+ updater = std::thread([this]() {
+ std::unique_lock locker(lock);
+ while (!stopping) {
+ double after = g_conf().get_val<std::chrono::seconds>("mds_metrics_update_interval").count();
+ locker.unlock();
+ sleep(after);
+ locker.lock();
+ update_rank0();
+ }
+ });
+}
+
+void MetricsHandler::shutdown() {
+ dout(10) << dendl;
+
+ {
+ std::scoped_lock locker(lock);
+ ceph_assert(!stopping);
+ stopping = true;
+ }
+
+ if (updater.joinable()) {
+ updater.join();
+ }
+}
+
+
+void MetricsHandler::add_session(Session *session) {
+ ceph_assert(session != nullptr);
+
+ auto &client = session->info.inst;
+ dout(10) << ": session=" << session << ", client=" << client << dendl;
+
+ std::scoped_lock locker(lock);
+
+ auto p = client_metrics_map.emplace(client, std::pair(last_updated_seq, Metrics())).first;
+ auto &metrics = p->second.second;
+ metrics.update_type = UPDATE_TYPE_REFRESH;
+ dout(20) << ": metrics=" << metrics << dendl;
+}
+
+void MetricsHandler::remove_session(Session *session) {
+ ceph_assert(session != nullptr);
+
+ auto &client = session->info.inst;
+ dout(10) << ": session=" << session << ", client=" << client << dendl;
+
+ std::scoped_lock locker(lock);
+
+ auto it = client_metrics_map.find(client);
+ if (it == client_metrics_map.end()) {
+ return;
+ }
+
+ // if a session got removed before rank 0 saw at least one refresh
+ // update from us or if we will send a remove type update as the
+ // the first "real" update (with an incoming sequence number), then
+ // cut short the update as rank 0 has not witnessed this client session
+ // update this rank.
+ auto lus = it->second.first;
+ if (lus == last_updated_seq) {
+ dout(10) << ": metric lus=" << lus << ", last_updated_seq=" << last_updated_seq
+ << dendl;
+ client_metrics_map.erase(it);
+ return;
+ }
+
+ // zero out all metrics
+ auto &metrics = it->second.second;
+ metrics.cap_hit_metric = { };
+ metrics.read_latency_metric = { };
+ metrics.write_latency_metric = { };
+ metrics.metadata_latency_metric = { };
+ metrics.update_type = UPDATE_TYPE_REMOVE;
+}
+
+void MetricsHandler::set_next_seq(version_t seq) {
+ dout(20) << ": current sequence number " << next_seq << ", setting next sequence number "
+ << seq << dendl;
+ next_seq = seq;
+}
+
+void MetricsHandler::handle_payload(Session *session, const CapInfoPayload &payload) {
+ dout(20) << ": session=" << session << ", hits=" << payload.cap_hits << ", misses="
+ << payload.cap_misses << dendl;
+
+ auto it = client_metrics_map.find(session->info.inst);
+ if (it == client_metrics_map.end()) {
+ return;
+ }
+
+ auto &metrics = it->second.second;
+ metrics.update_type = UPDATE_TYPE_REFRESH;
+ metrics.cap_hit_metric.hits = payload.cap_hits;
+ metrics.cap_hit_metric.misses = payload.cap_misses;
+}
+
+void MetricsHandler::handle_payload(Session *session, const ReadLatencyPayload &payload) {
+ dout(20) << ": session=" << session << ", latency=" << payload.lat << dendl;
+
+ auto it = client_metrics_map.find(session->info.inst);
+ if (it == client_metrics_map.end()) {
+ return;
+ }
+
+ auto &metrics = it->second.second;
+ metrics.update_type = UPDATE_TYPE_REFRESH;
+ metrics.read_latency_metric.lat = payload.lat;
+}
+
+void MetricsHandler::handle_payload(Session *session, const WriteLatencyPayload &payload) {
+ dout(20) << ": session=" << session << ", latency=" << payload.lat << dendl;
+
+ auto it = client_metrics_map.find(session->info.inst);
+ if (it == client_metrics_map.end()) {
+ return;
+ }
+
+ auto &metrics = it->second.second;
+ metrics.update_type = UPDATE_TYPE_REFRESH;
+ metrics.write_latency_metric.lat = payload.lat;
+}
+
+void MetricsHandler::handle_payload(Session *session, const MetadataLatencyPayload &payload) {
+ dout(20) << ": session=" << session << ", latency=" << payload.lat << dendl;
+
+ auto it = client_metrics_map.find(session->info.inst);
+ if (it == client_metrics_map.end()) {
+ return;
+ }
+
+ auto &metrics = it->second.second;
+ metrics.update_type = UPDATE_TYPE_REFRESH;
+ metrics.metadata_latency_metric.lat = payload.lat;
+}
+
+void MetricsHandler::handle_payload(Session *session, const UnknownPayload &payload) {
+ dout(5) << ": session=" << session << ", ignoring unknown payload" << dendl;
+}
+
+void MetricsHandler::handle_client_metrics(const cref_t<MClientMetrics> &m) {
+ std::scoped_lock locker(lock);
+
+ Session *session = mds->get_session(m);
+ dout(20) << ": session=" << session << dendl;
+
+ if (session == nullptr) {
+ dout(10) << ": ignoring session less message" << dendl;
+ return;
+ }
+
+ for (auto &metric : m->updates) {
+ boost::apply_visitor(HandlePayloadVisitor(this, session), metric.payload);
+ }
+}
+
+void MetricsHandler::handle_mds_ping(const cref_t<MMDSPing> &m) {
+ std::scoped_lock locker(lock);
+ set_next_seq(m->seq);
+}
+
+void MetricsHandler::notify_mdsmap(const MDSMap &mdsmap) {
+ dout(10) << dendl;
+
+ std::set<mds_rank_t> active_set;
+
+ std::scoped_lock locker(lock);
+ // reset the sequence number so that last_updated_seq starts
+ // updating when the new rank0 mds pings us.
+ set_next_seq(0);
+
+ // update new rank0 address
+ mdsmap.get_active_mds_set(active_set);
+ if (!active_set.count((mds_rank_t)0)) {
+ dout(10) << ": rank0 is unavailable" << dendl;
+ addr_rank0 = boost::none;
+ return;
+ }
+
+ auto new_rank0_addr = mdsmap.get_addrs((mds_rank_t)0);
+ if (addr_rank0 != new_rank0_addr) {
+ dout(10) << ": rank0 addr is now " << new_rank0_addr << dendl;
+ addr_rank0 = new_rank0_addr;
+ }
+}
+
+void MetricsHandler::update_rank0() {
+ dout(20) << dendl;
+
+ if (!addr_rank0) {
+ dout(20) << ": not yet notified with rank0 address, ignoring" << dendl;
+ return;
+ }
+
+ metrics_message_t metrics_message;
+ auto &update_client_metrics_map = metrics_message.client_metrics_map;
+
+ metrics_message.seq = next_seq;
+ metrics_message.rank = mds->get_nodeid();
+
+ for (auto p = client_metrics_map.begin(); p != client_metrics_map.end();) {
+ // copy metrics and update local metrics map as required
+ auto &metrics = p->second.second;
+ update_client_metrics_map.emplace(p->first, metrics);
+ if (metrics.update_type == UPDATE_TYPE_REFRESH) {
+ metrics = {};
+ ++p;
+ } else {
+ p = client_metrics_map.erase(p);
+ }
+ }
+
+ // only start incrementing when its kicked via set_next_seq()
+ if (next_seq != 0) {
+ ++last_updated_seq;
+ }
+
+ dout(20) << ": sending metric updates for " << update_client_metrics_map.size()
+ << " clients to rank 0 (address: " << *addr_rank0 << ") with sequence number "
+ << next_seq << ", last updated sequence number " << last_updated_seq << dendl;
+
+ mds->send_message_mds(make_message<MMDSMetrics>(std::move(metrics_message)), *addr_rank0);
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MDS_METRICS_HANDLER_H
+#define CEPH_MDS_METRICS_HANDLER_H
+
+#include <thread>
+#include <utility>
+#include <boost/variant.hpp>
+
+#include "msg/Dispatcher.h"
+#include "common/ceph_mutex.h"
+#include "include/cephfs/metrics/Types.h"
+
+#include "messages/MMDSPing.h"
+#include "messages/MClientMetrics.h"
+
+#include "MDSPerfMetricTypes.h"
+
+class MDSRank;
+class Session;
+class CephContext;
+
+class MetricsHandler : public Dispatcher {
+public:
+ MetricsHandler(CephContext *cct, MDSRank *mds);
+
+ bool ms_can_fast_dispatch_any() const override {
+ return true;
+ }
+ bool ms_can_fast_dispatch2(const cref_t<Message> &m) const override;
+ void ms_fast_dispatch2(const ref_t<Message> &m) override;
+ bool ms_dispatch2(const ref_t<Message> &m) override;
+
+ void ms_handle_connect(Connection *c) override {
+ }
+ bool ms_handle_reset(Connection *c) override {
+ return false;
+ }
+ void ms_handle_remote_reset(Connection *c) override {
+ }
+ bool ms_handle_refused(Connection *c) override {
+ return false;
+ }
+
+ void add_session(Session *session);
+ void remove_session(Session *session);
+
+ void init();
+ void shutdown();
+
+ void notify_mdsmap(const MDSMap &mdsmap);
+
+private:
+ struct HandlePayloadVisitor : public boost::static_visitor<void> {
+ MetricsHandler *metrics_handler;
+ Session *session;
+
+ HandlePayloadVisitor(MetricsHandler *metrics_handler, Session *session)
+ : metrics_handler(metrics_handler), session(session) {
+ }
+
+ template <typename ClientMetricPayload>
+ inline void operator()(const ClientMetricPayload &payload) const {
+ metrics_handler->handle_payload(session, payload);
+ }
+ };
+
+ MDSRank *mds;
+ // drop this lock when calling ->send_message_mds() else mds might
+ // deadlock
+ ceph::mutex lock = ceph::make_mutex("MetricsHandler::lock");
+
+ // ISN sent by rank0 pinger is 1
+ version_t next_seq = 0;
+
+ // sequence number incremented on each update sent to rank 0.
+ // this is nowhere related to next_seq and is completely used
+ // locally to figure out if a session got added and removed
+ // within an update to rank 0.
+ version_t last_updated_seq = 0;
+
+ std::thread updater;
+ std::map<entity_inst_t, std::pair<version_t, Metrics>> client_metrics_map;
+
+ // address of rank 0 mds, so that the message can be sent without
+ // acquiring mds_lock. misdirected messages to rank 0 are taken
+ // care of by rank 0.
+ boost::optional<entity_addrvec_t> addr_rank0;
+
+ bool stopping = false;
+
+ void handle_payload(Session *session, const CapInfoPayload &payload);
+ void handle_payload(Session *session, const ReadLatencyPayload &payload);
+ void handle_payload(Session *session, const WriteLatencyPayload &payload);
+ void handle_payload(Session *session, const MetadataLatencyPayload &payload);
+ void handle_payload(Session *session, const UnknownPayload &payload);
+
+ void set_next_seq(version_t seq);
+
+ void handle_client_metrics(const cref_t<MClientMetrics> &m);
+ void handle_mds_ping(const cref_t<MMDSPing> &m);
+
+ void update_rank0();
+};
+
+#endif // CEPH_MDS_METRICS_HANDLER_H
#include "InoTable.h"
#include "SnapClient.h"
#include "Mutation.h"
+#include "MetricsHandler.h"
#include "cephfs_features.h"
#include "msg/Messenger.h"
g_ceph_context->get_perfcounters_collection()->add(logger);
}
-Server::Server(MDSRank *m) :
+Server::Server(MDSRank *m, MetricsHandler *metrics_handler) :
mds(m),
mdcache(mds->mdcache), mdlog(mds->mdlog),
- recall_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate"))
+ recall_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate")),
+ metrics_handler(metrics_handler)
{
replay_unsafe_with_closed_session = g_conf().get_val<bool>("mds_replay_unsafe_with_closed_session");
cap_revoke_eviction_timeout = g_conf().get_val<double>("mds_cap_revoke_eviction_timeout");
}
}
- if (session->is_closed())
- mds->sessionmap.add_session(session);
+ if (session->is_closed()) {
+ mds->sessionmap.add_session(session);
+ }
pv = mds->sessionmap.mark_projected(session);
sseq = mds->sessionmap.set_state(session, Session::STATE_OPENING);
}
}
-
void Server::flush_session(Session *session, MDSGatherBuilder *gather) {
if (!session->is_open() ||
!session->get_connection() ||
ceph_assert(session->is_opening());
mds->sessionmap.set_state(session, Session::STATE_OPEN);
mds->sessionmap.touch_session(session);
+ metrics_handler->add_session(session);
ceph_assert(session->get_connection());
auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
mds->send_message_client(make_message<MClientSession>(CEPH_SESSION_CLOSE), session);
mds->sessionmap.set_state(session, Session::STATE_CLOSED);
session->clear();
+ metrics_handler->remove_session(session);
mds->sessionmap.remove_session(session);
} else if (session->is_killing()) {
// destroy session, close connection
mds->sessionmap.set_state(session, Session::STATE_CLOSED);
session->set_connection(nullptr);
}
+ metrics_handler->remove_session(session);
mds->sessionmap.remove_session(session);
} else {
ceph_abort();
dout(10) << "force_open_sessions opened " << session->info.inst << dendl;
mds->sessionmap.set_state(session, Session::STATE_OPEN);
mds->sessionmap.touch_session(session);
+ metrics_handler->add_session(session);
auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
}
if (!m->has_more()) {
+ metrics_handler->add_session(session);
// notify client of success with an OPEN
auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
class EUpdate;
class MDLog;
struct SnapInfo;
+class MetricsHandler;
enum {
l_mdss_first = 1000,
TRIM = (1<<2),
ENFORCE_LIVENESS = (1<<3),
};
- explicit Server(MDSRank *m);
+ explicit Server(MDSRank *m, MetricsHandler *metrics_handler);
~Server() {
g_ceph_context->get_perfcounters_collection()->remove(logger);
delete logger;
DecayCounter recall_throttle;
time last_recall_state;
+
+ MetricsHandler *metrics_handler;
};
static inline constexpr auto operator|(Server::RecallFlags a, Server::RecallFlags b) {