.set_default(2)
.set_min(1)
.set_description("Size of thread pool for ASIO completions")
- .add_tag("mds")
+ .add_tag("mds"),
+
+ Option("mds_ping_grace", Option::TYPE_SECS, Option::LEVEL_ADVANCED)
+ .set_default(15) // seconds; MDSPinger::is_rank_lagging() compares this to the time since the last acked ping
+ .set_flag(Option::FLAG_RUNTIME)
+ .set_description("timeout after which an MDS is considered laggy by rank 0 MDS.")
+ .set_long_description("timeout for replying to a ping message sent by rank 0 after which an active MDS considered laggy (delayed metrics) by rank 0.")
});
}
Mantle.cc
Anchor.cc
OpenFileTable.cc
+ MDSPinger.cc
${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
${CMAKE_SOURCE_DIR}/src/common/MemoryModel.cc
${CMAKE_SOURCE_DIR}/src/osdc/Journaler.cc)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/dout.h"
+
+#include "mds/MDSRank.h"
+#include "mds/MDSPinger.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mds
+#undef dout_prefix
+#define dout_prefix *_dout << "mds.pinger " << __func__
+
+// Remember the MDS rank object through which ping messages are sent.
+MDSPinger::MDSPinger(MDSRank *mds) : mds(mds) {}
+
+// Send a ping carrying the next sequence number to `rank` at `addr`.
+// Ping state for the rank is created on first use, and the send time is
+// recorded per sequence so a later pong can be matched in pong_received().
+void MDSPinger::send_ping(mds_rank_t rank, const entity_addrvec_t &addr) {
+  dout(10) << ": rank=" << rank << dendl;
+
+  version_t seq = 0;
+  {
+    std::scoped_lock locker(lock);
+    // try_emplace only constructs a PingState when the rank is new
+    auto [it, inserted] = ping_state_by_rank.try_emplace(rank);
+    if (inserted) {
+      dout(20) << ": init ping pong state for rank=" << rank << dendl;
+    }
+
+    auto &ping_state = it->second;
+    // last_seq holds the sequence the next ping carries; take it and advance
+    seq = ping_state.last_seq++;
+    ping_state.seq_time_map.emplace(seq, clock::now());
+  }
+
+  // the lock is released before handing the message to the MDS: its
+  // declaration notes that holding it across send_message_mds() might
+  // deadlock the mds.
+  dout(10) << ": sending ping with sequence=" << seq << " to rank="
+           << rank << dendl;
+  mds->send_message_mds(make_message<MMDSPing>(seq), addr);
+}
+
+// Validate a pong from `rank` that acknowledges ping `seq`.
+// Returns true when the rank has ping state and `seq` is still tracked; in
+// that case the ping's send time becomes the rank's last acked time and all
+// older tracked sequences are dropped. Returns false otherwise.
+bool MDSPinger::pong_received(mds_rank_t rank, version_t seq) {
+  dout(10) << ": rank=" << rank << ", sequence=" << seq << dendl;
+
+  std::scoped_lock locker(lock);
+  const auto state_it = ping_state_by_rank.find(rank);
+  if (state_it == ping_state_by_rank.end()) {
+    // may happen on mds failover, when a non-rank-0 mds acks a ping from
+    // an earlier rank 0 mds to the newly appointed rank 0 mds, or when a
+    // non-rank-0 active mds starts sending metric updates before rank 0
+    // has begun pinging it (which should resolve itself soon).
+    dout(10) << ": received pong from rank=" << rank << " to which ping was never"
+             << " sent (ignoring...)." << dendl;
+    return false;
+  }
+
+  auto &state = state_it->second;
+  auto &sent_times = state.seq_time_map;
+
+  // look up when the acknowledged ping was sent
+  const auto sent_it = sent_times.find(seq);
+  if (sent_it == sent_times.end()) {
+    // rank still bootstrapping
+    dout(10) << ": pong received for unknown ping sequence " << seq
+             << ", rank " << rank << " should catch up soon." << dendl;
+    return false;
+  }
+
+  state.last_acked_time = sent_it->second;
+  // drop every sequence older than the acknowledged one; the acknowledged
+  // entry itself stays in the map.
+  sent_times.erase(sent_times.begin(), sent_it);
+
+  return true;
+}
+
+// Forget all ping state for `rank`. send_ping() recreates the state the
+// next time it is invoked for this rank.
+void MDSPinger::reset_ping(mds_rank_t rank) {
+  dout(10) << ": rank=" << rank << dendl;
+
+  std::scoped_lock locker(lock);
+  // erase-by-key returns the number of removed entries; zero means the
+  // rank had no ping state to begin with.
+  if (ping_state_by_rank.erase(rank) == 0) {
+    dout(10) << ": rank=" << rank << " was never sent ping request." << dendl;
+  }
+}
+
+// Report whether `rank` is lagging: true when more than "mds_ping_grace"
+// seconds have elapsed since the rank's last acked time. A rank without
+// ping state is reported as not lagging.
+bool MDSPinger::is_rank_lagging(mds_rank_t rank) {
+  dout(10) << ": rank=" << rank << dendl;
+
+  std::scoped_lock locker(lock);
+  const auto state_it = ping_state_by_rank.find(rank);
+  if (state_it == ping_state_by_rank.end()) {
+    derr << ": rank=" << rank << " was never sent ping request." << dendl;
+    return false;
+  }
+
+  const auto &last_ack = state_it->second.last_acked_time;
+  // elapsed time in (fractional) seconds since the last acked ping
+  const std::chrono::duration<double> elapsed = clock::now() - last_ack;
+  const auto grace = g_conf().get_val<std::chrono::seconds>("mds_ping_grace");
+
+  const bool lagging = elapsed.count() > grace.count();
+  if (lagging) {
+    dout(5) << ": rank=" << rank << " is lagging a pong response (last ack time is "
+            << last_ack << ")" << dendl;
+  }
+
+  return lagging;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MDS_PINGER_H
+#define CEPH_MDS_PINGER_H
+
+#include <map>
+
+#include "include/types.h"
+
+#include "msg/msg_types.h"
+#include "common/ceph_mutex.h"
+#include "common/ceph_time.h"
+#include "messages/MMDSPing.h"
+
+#include "mdstypes.h"
+
+class MDSRank;
+
+class MDSPinger { // tracks per-rank ping/pong state and reports ranks whose pongs are overdue
+public:
+ MDSPinger(MDSRank *mds); // mds: rank object used to send the ping messages
+
+ // send a ping message to an mds rank. initialize ping state if
+ // required.
+ void send_ping(mds_rank_t rank, const entity_addrvec_t &addr);
+
+ // check if a pong response is valid. a pong response from an
+ // mds is valid if at least one ping message was sent to the
+ // mds and the sequence number in the pong is outstanding.
+ bool pong_received(mds_rank_t rank, version_t seq);
+
+ // reset the ping state for a given rank
+ void reset_ping(mds_rank_t rank);
+
+ // check if a rank is lagging (based on pong response) responding
+ // to a ping message.
+ bool is_rank_lagging(mds_rank_t rank);
+
+private:
+ using clock = ceph::coarse_mono_clock;
+ using time = ceph::coarse_mono_time;
+
+ // Initial Sequence Number (ISN) of the first ping message sent
+ // by rank 0 to other active ranks (including itself).
+ static constexpr uint64_t MDS_PINGER_ISN = 1;
+
+ struct PingState {
+ version_t last_seq = MDS_PINGER_ISN; // sequence number the next ping will carry (send_ping post-increments it)
+ std::map<version_t, time> seq_time_map; // ping sequence -> time it was sent; pruned when a pong is accepted
+ time last_acked_time = clock::now(); // send time of the latest acked ping (creation time until the first pong)
+ };
+
+ MDSRank *mds;
+ // guards ping_state_by_rank. drop this lock when calling
+ // ->send_message_mds() else mds might deadlock
+ ceph::mutex lock = ceph::make_mutex("MDSPinger::lock");
+ std::map<mds_rank_t, PingState> ping_state_by_rank;
+};
+
+#endif // CEPH_MDS_PINGER_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MESSAGES_MMDSPING_H
+#define CEPH_MESSAGES_MMDSPING_H
+
+#include "include/types.h"
+#include "msg/Message.h"
+
+// Ping message (type MSG_MDS_PING) whose payload is a single sequence
+// number. Instances are created through ceph::make_message, which is why
+// the constructors and destructor are protected.
+class MMDSPing : public SafeMessage {
+private:
+  static constexpr int HEAD_VERSION = 1;
+  static constexpr int COMPAT_VERSION = 1;
+public:
+  // sequence number carried by this ping. zero-initialized so that a
+  // default-constructed message never holds an indeterminate value
+  // before decode_payload() fills it in.
+  version_t seq = 0;
+
+protected:
+  // used when a message is created for decoding; seq is set by
+  // decode_payload().
+  MMDSPing() : SafeMessage(MSG_MDS_PING, HEAD_VERSION, COMPAT_VERSION) {
+  }
+  // build a ping carrying the given sequence number.
+  MMDSPing(version_t seq)
+    : SafeMessage(MSG_MDS_PING, HEAD_VERSION, COMPAT_VERSION), seq(seq) {
+  }
+  ~MMDSPing() { }
+
+public:
+  std::string_view get_type_name() const override {
+    return "mdsping";
+  }
+
+  void print(ostream &out) const override {
+    out << "mdsping";
+  }
+
+  // serialize the sequence number into the payload; `features` is unused.
+  void encode_payload(uint64_t features) override {
+    using ceph::encode;
+    encode(seq, payload);
+  }
+
+  // read the sequence number back from the start of the payload.
+  void decode_payload() override {
+    using ceph::decode;
+    auto iter = payload.cbegin();
+    decode(seq, iter);
+  }
+
+private:
+  // lets ceph::make_message reach the protected constructors.
+  template<class T, typename... Args>
+  friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
+};
+
+#endif // CEPH_MESSAGES_MMDSPING_H
#include "messages/MMDSTableRequest.h"
#include "messages/MMDSMetrics.h"
+#include "messages/MMDSPing.h"
//#include "messages/MInodeUpdate.h"
#include "messages/MCacheExpire.h"
m = make_message<MMDSMetrics>();
break;
+ case MSG_MDS_PING:
+ m = make_message<MMDSPing>();
+ break;
+
case MSG_MGR_BEACON:
m = make_message<MMgrBeacon>();
break;
#define MSG_MDS_LOCK 0x300
#define MSG_MDS_INODEFILECAPS 0x301
#define MSG_MDS_METRICS 0x302
+#define MSG_MDS_PING 0x303
#define MSG_MDS_EXPORTDIRDISCOVER 0x449
#define MSG_MDS_EXPORTDIRDISCOVERACK 0x450