]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: inter-mds ping-pong message and type
authorVenky Shankar <vshankar@redhat.com>
Sun, 6 Oct 2019 15:25:25 +0000 (11:25 -0400)
committerVenky Shankar <vshankar@redhat.com>
Mon, 1 Jun 2020 08:05:39 +0000 (04:05 -0400)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/common/options.cc
src/mds/CMakeLists.txt
src/mds/MDSPinger.cc [new file with mode: 0644]
src/mds/MDSPinger.h [new file with mode: 0644]
src/messages/MMDSPing.h [new file with mode: 0644]
src/msg/Message.cc
src/msg/Message.h

index c6cd45109f2734e0c67e3aa16fafbc1db05b7eaa..7e563c21a84ab2155a8fdbb1bf922f483f651fbd 100644 (file)
@@ -8311,7 +8311,13 @@ std::vector<Option> get_mds_options() {
     .set_default(2)
     .set_min(1)
     .set_description("Size of thread pool for ASIO completions")
-    .add_tag("mds")
+    .add_tag("mds"),
+
+    Option("mds_ping_grace", Option::TYPE_SECS, Option::LEVEL_ADVANCED)
+     .set_default(15)
+     .set_flag(Option::FLAG_RUNTIME)
+     .set_description("timeout after which an MDS is considered laggy by rank 0 MDS.")
+     .set_long_description("timeout for replying to a ping message sent by rank 0 after which an active MDS considered laggy (delayed metrics) by rank 0.")
   });
 }
 
index ddb07af3761d5944ef583553e374660701c22474..2f63caea57af084f572f037d31752a08e29239f2 100644 (file)
@@ -40,6 +40,7 @@ set(mds_srcs
   Mantle.cc
   Anchor.cc
   OpenFileTable.cc
+  MDSPinger.cc
   ${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
   ${CMAKE_SOURCE_DIR}/src/common/MemoryModel.cc
   ${CMAKE_SOURCE_DIR}/src/osdc/Journaler.cc)
diff --git a/src/mds/MDSPinger.cc b/src/mds/MDSPinger.cc
new file mode 100644 (file)
index 0000000..bc63a22
--- /dev/null
@@ -0,0 +1,103 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/dout.h"
+
+#include "mds/MDSRank.h"
+#include "mds/MDSPinger.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mds
+#undef dout_prefix
+#define dout_prefix *_dout << "mds.pinger " << __func__
+
+MDSPinger::MDSPinger(MDSRank *mds)
+  : mds(mds) {
+}
+
+void MDSPinger::send_ping(mds_rank_t rank, const entity_addrvec_t &addr) {
+  dout(10) << ": rank=" << rank << dendl;
+
+  std::scoped_lock locker(lock);
+  auto [it, inserted] = ping_state_by_rank.emplace(rank, PingState());
+  if (inserted) {
+    dout(20) << ": init ping pong state for rank=" << rank << dendl;
+  }
+
+  auto &ping_state = it->second;
+  auto last_seq = ping_state.last_seq++;
+
+  ping_state.seq_time_map.emplace(last_seq, clock::now());
+
+  dout(10) << ": sending ping with sequence=" << last_seq << " to rank="
+           << rank << dendl;
+  mds->send_message_mds(make_message<MMDSPing>(last_seq), addr);
+}
+
+bool MDSPinger::pong_received(mds_rank_t rank, version_t seq) {
+  dout(10) << ": rank=" << rank << ", sequence=" << seq << dendl;
+
+  std::scoped_lock locker(lock);
+  auto it1 = ping_state_by_rank.find(rank);
+  if (it1 == ping_state_by_rank.end()) {
+    // this *might* just happen on mds failover when a non-rank-0 mds
+    // acks backs a ping message from an earlier rank 0 mds to a newly
+    // appointed rank 0 mds (possible?).
+    // or when non rank 0 active MDSs begin sending metric updates before
+    // rank 0 can start pinging it (although, that should resolve out soon).
+    dout(10) << ": received pong from rank=" << rank << " to which ping was never"
+             << " sent (ignoring...)." << dendl;
+    return false;
+  }
+
+  auto &ping_state = it1->second;
+  // find incoming seq timestamp for updation
+  auto it2 = ping_state.seq_time_map.find(seq);
+  if (it2 == ping_state.seq_time_map.end()) {
+    // rank still bootstrapping
+    dout(10) << ": pong received for unknown ping sequence " << seq
+             << ", rank " << rank << " should catch up soon." << dendl;
+    return false;
+  }
+
+  ping_state.last_acked_time = it2->second;
+  ping_state.seq_time_map.erase(ping_state.seq_time_map.begin(), it2);
+
+  return true;
+}
+
+void MDSPinger::reset_ping(mds_rank_t rank) {
+  dout(10) << ": rank=" << rank << dendl;
+
+  std::scoped_lock locker(lock);
+  auto it = ping_state_by_rank.find(rank);
+  if (it == ping_state_by_rank.end()) {
+    dout(10) << ": rank=" << rank << " was never sent ping request." << dendl;
+    return;
+  }
+
+  // remove the rank from ping state, send_ping() will init it
+  // later when invoked.
+  ping_state_by_rank.erase(it);
+}
+
+bool MDSPinger::is_rank_lagging(mds_rank_t rank) {
+  dout(10) << ": rank=" << rank << dendl;
+
+  std::scoped_lock locker(lock);
+  auto it = ping_state_by_rank.find(rank);
+  if (it == ping_state_by_rank.end()) {
+    derr << ": rank=" << rank << " was never sent ping request." << dendl;
+    return false;
+  }
+
+  auto now = clock::now();
+  auto since = std::chrono::duration<double>(now - it->second.last_acked_time).count();
+  if (since > g_conf().get_val<std::chrono::seconds>("mds_ping_grace").count()) {
+    dout(5) << ": rank=" << rank << " is lagging a pong response (last ack time is "
+            <<  it->second.last_acked_time << ")" << dendl;
+    return true;
+  }
+
+  return false;
+}
diff --git a/src/mds/MDSPinger.h b/src/mds/MDSPinger.h
new file mode 100644 (file)
index 0000000..51c3ebe
--- /dev/null
@@ -0,0 +1,61 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MDS_PINGER_H
+#define CEPH_MDS_PINGER_H
+
+#include <map>
+
+#include "include/types.h"
+
+#include "msg/msg_types.h"
+#include "common/ceph_mutex.h"
+#include "common/ceph_time.h"
+#include "messages/MMDSPing.h"
+
+#include "mdstypes.h"
+
+class MDSRank;
+
+class MDSPinger {
+public:
+  MDSPinger(MDSRank *mds);
+
+  // send a ping message to an mds rank. initialize ping state if
+  // required.
+  void send_ping(mds_rank_t rank, const entity_addrvec_t &addr);
+
+  // check if a pong response is valid. a pong reponse from an
+  // mds is valid if at least one ping message was sent to the
+  // mds and the sequence number in the pong is outstanding.
+  bool pong_received(mds_rank_t rank, version_t seq);
+
+  // reset the ping state for a given rank
+  void reset_ping(mds_rank_t rank);
+
+  // check if a rank is lagging (based on pong response) responding
+  // to a ping message.
+  bool is_rank_lagging(mds_rank_t rank);
+
+private:
+  using clock = ceph::coarse_mono_clock;
+  using time = ceph::coarse_mono_time;
+
+  // Initial Sequence Number (ISN) of the first ping message sent
+  // by rank 0 to other active ranks (incuding itself).
+  static constexpr uint64_t MDS_PINGER_ISN = 1;
+
+  struct PingState {
+    version_t last_seq = MDS_PINGER_ISN;
+    std::map<version_t, time> seq_time_map;
+    time last_acked_time = clock::now();
+  };
+
+  MDSRank *mds;
+  // drop this lock when calling ->send_message_mds() else mds might
+  // deadlock
+  ceph::mutex lock = ceph::make_mutex("MDSPinger::lock");
+  std::map<mds_rank_t, PingState> ping_state_by_rank;
+};
+
+#endif // CEPH_MDS_PINGER_H
diff --git a/src/messages/MMDSPing.h b/src/messages/MMDSPing.h
new file mode 100644 (file)
index 0000000..8746550
--- /dev/null
@@ -0,0 +1,50 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MESSAGES_MMDSPING_H
+#define CEPH_MESSAGES_MMDSPING_H
+
+#include "include/types.h"
+#include "msg/Message.h"
+
+class MMDSPing : public SafeMessage {
+private:
+  static constexpr int HEAD_VERSION = 1;
+  static constexpr int COMPAT_VERSION = 1;
+public:
+  version_t seq;
+
+protected:
+  MMDSPing() : SafeMessage(MSG_MDS_PING, HEAD_VERSION, COMPAT_VERSION) {
+  }
+  MMDSPing(version_t seq)
+    : SafeMessage(MSG_MDS_PING, HEAD_VERSION, COMPAT_VERSION), seq(seq) {
+  }
+  ~MMDSPing() { }
+
+public:
+  std::string_view get_type_name() const override {
+    return "mdsping";
+  }
+
+  void print(ostream &out) const override {
+    out << "mdsping";
+  }
+
+  void encode_payload(uint64_t features) override {
+    using ceph::encode;
+    encode(seq, payload);
+  }
+
+  void decode_payload() override {
+    using ceph::decode;
+    auto iter = payload.cbegin();
+    decode(seq, iter);
+  }
+
+private:
+  template<class T, typename... Args>
+  friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
+};
+
+#endif // CEPH_MESSAGES_MMDSPING_H
index 134d8ceab67db1ef7e9c56ec005b67eb8435b8ef..00eab04bb18d5f538f9fe9d888f6511c5cde060e 100644 (file)
 
 #include "messages/MMDSTableRequest.h"
 #include "messages/MMDSMetrics.h"
+#include "messages/MMDSPing.h"
 
 //#include "messages/MInodeUpdate.h"
 #include "messages/MCacheExpire.h"
@@ -838,6 +839,10 @@ Message *decode_message(CephContext *cct,
     m = make_message<MMDSMetrics>();
     break;
 
+  case MSG_MDS_PING:
+    m = make_message<MMDSPing>();
+    break;
+
   case MSG_MGR_BEACON:
     m = make_message<MMgrBeacon>();
     break;
index 60b0eac02d59c446e9c41679639afe904856772f..5b7e6eabd85c231cf669596a32b6f31453876790 100644 (file)
 #define MSG_MDS_LOCK               0x300
 #define MSG_MDS_INODEFILECAPS      0x301
 #define MSG_MDS_METRICS            0x302
+#define MSG_MDS_PING               0x303
 
 #define MSG_MDS_EXPORTDIRDISCOVER     0x449
 #define MSG_MDS_EXPORTDIRDISCOVERACK  0x450