]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: add heartbeat support
authorKefu Chai <kchai@redhat.com>
Mon, 14 Jan 2019 08:59:09 +0000 (16:59 +0800)
committerKefu Chai <kchai@redhat.com>
Sat, 2 Feb 2019 05:20:00 +0000 (13:20 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/CMakeLists.txt
src/crimson/osd/heartbeat.cc [new file with mode: 0644]
src/crimson/osd/heartbeat.h [new file with mode: 0644]
src/crimson/osd/osd.cc
src/crimson/osd/osd.h

index 624414701538f8153e1ab0945b2862803a341834..e86a11b5e605c495dd857d15ab9d6c9240f7b3fc 100644 (file)
@@ -1,5 +1,6 @@
 add_executable(crimson-osd
   chained_dispatchers.cc
+  heartbeat.cc
   main.cc
   osd.cc
   osd_meta.cc
diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc
new file mode 100644 (file)
index 0000000..c19ef80
--- /dev/null
@@ -0,0 +1,300 @@
+#include "heartbeat.h"
+
+#include <boost/range/join.hpp>
+
+#include "messages/MOSDPing.h"
+#include "messages/MOSDFailure.h"
+
+#include "crimson/common/config_proxy.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/SocketMessenger.h"
+#include "crimson/osd/osdmap_service.h"
+#include "crimson/mon/MonClient.h"
+
+#include "osd/OSDMap.h"
+
+using ceph::common::local_conf;
+
+namespace {
+  // all logging from this translation unit goes to the "osd" subsystem
+  seastar::logger& logger() {
+    return ceph::get_logger(ceph_subsys_osd);
+  }
+
+  // wrap a freshly allocated message in an intrusive_ptr; the `false`
+  // tag adopts the initial reference instead of adding a second one
+  template<typename Message, typename... Args>
+  Ref<Message> make_message(Args&&... args)
+  {
+    return {new Message{std::forward<Args>(args)...}, false};
+  }
+}
+
+// Each OSD runs two dedicated heartbeat messengers: "hb_front" on the
+// public network and "hb_back" on the cluster network, both sharing
+// the OSD's id and nonce.
+Heartbeat::Heartbeat(int whoami,
+                     uint32_t nonce,
+                     const OSDMapService& service,
+                     ceph::mon::Client& monc)
+  : front_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
+                                              "hb_front", nonce}},
+    back_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
+                                             "hb_back", nonce}},
+    service{service},
+    monc{monc},
+    timer{[this] {send_heartbeats();}}  // armed periodically in start()
+{}
+
+seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
+                                   entity_addrvec_t back_addrs)
+{
+  logger().info("heartbeat: start");
+  // i only care about the address, so any unused port would work
+  for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) {
+    addr.set_port(0);
+  }
+  front_msgr->try_bind(front_addrs,
+                       local_conf()->ms_bind_port_min,
+                       local_conf()->ms_bind_port_max);
+  back_msgr->try_bind(front_addrs,
+                      local_conf()->ms_bind_port_min,
+                      local_conf()->ms_bind_port_max);
+  return seastar::when_all_succeed(front_msgr->start(this),
+                                   back_msgr->start(this)).then([this] {
+    timer.arm_periodic(
+      std::chrono::seconds(local_conf()->osd_heartbeat_interval));
+  });
+}
+
+// shut down both heartbeat messengers in parallel
+seastar::future<> Heartbeat::stop()
+{
+  return seastar::when_all_succeed(front_msgr->shutdown(),
+                                   back_msgr->shutdown());
+}
+
+// addresses the front (public-network) messenger actually bound to
+const entity_addrvec_t& Heartbeat::get_front_addrs() const
+{
+  return front_msgr->get_myaddrs();
+}
+
+// addresses the back (cluster-network) messenger actually bound to
+const entity_addrvec_t& Heartbeat::get_back_addrs() const
+{
+  return back_msgr->get_myaddrs();
+}
+
+// Start heartbeating with `peer`: open one front and one back
+// connection using the hb addresses published in the current osdmap.
+// No-op if the peer is already tracked.
+void Heartbeat::add_peer(osd_id_t peer)
+{
+  auto found = peers.find(peer);
+  if (found == peers.end()) {
+    logger().info("add_peer({})", peer);
+    PeerInfo info;
+    auto osdmap = service.get_map();
+    // TODO: msgr v2
+    info.con_front =
+      front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
+                          CEPH_ENTITY_TYPE_OSD);
+    info.con_back =
+      back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
+                         CEPH_ENTITY_TYPE_OSD);
+    peers.emplace(peer, std::move(info));
+  }
+}
+
+// Close both connections to `peer` in parallel, then drop its state.
+// Precondition: the peer must currently be tracked (asserted).
+seastar::future<> Heartbeat::remove_peer(osd_id_t peer)
+{
+  auto found = peers.find(peer);
+  assert(found != peers.end());
+  logger().info("remove_peer({})", peer);
+  return seastar::when_all_succeed(found->second.con_front->close(),
+                                   found->second.con_back->close()).then(
+    [this, peer] {
+      // only erase after both closes completed
+      peers.erase(peer);
+      return seastar::now();
+    });
+}
+
+// Dispatcher entry point shared by both heartbeat messengers; only
+// MOSDPing (CEPH_MSG_PING) is handled, anything else is ignored.
+seastar::future<> Heartbeat::ms_dispatch(ceph::net::ConnectionRef conn,
+                                         MessageRef m)
+{
+  logger().info("heartbeat: ms_dispatch {}", *m);
+  switch (m->get_type()) {
+  case CEPH_MSG_PING:
+    return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m));
+  default:
+    return seastar::now();
+  }
+}
+
+// fan out on the MOSDPing sub-operation; unknown ops are ignored
+seastar::future<> Heartbeat::handle_osd_ping(ceph::net::ConnectionRef conn,
+                                             Ref<MOSDPing> m)
+{
+  switch (m->op) {
+  case MOSDPing::PING:
+    return handle_ping(conn, m);
+  case MOSDPing::PING_REPLY:
+    return handle_reply(conn, m);
+  case MOSDPing::YOU_DIED:
+    return handle_you_died();
+  default:
+    return seastar::now();
+  }
+}
+
+// A peer pinged us: answer with a PING_REPLY echoing the sender's
+// stamp so it can match the reply against its ping_history.
+// min_message is passed through from osd_heartbeat_min_size --
+// presumably to pad the message to a minimum size; confirm against
+// MOSDPing.
+seastar::future<> Heartbeat::handle_ping(ceph::net::ConnectionRef conn,
+                                         Ref<MOSDPing> m)
+{
+  auto min_message = static_cast<uint32_t>(
+    local_conf()->osd_heartbeat_min_size);
+  auto reply =
+    make_message<MOSDPing>(m->fsid,
+                           service.get_map()->get_epoch(),
+                           MOSDPing::PING_REPLY,
+                           m->stamp,
+                           min_message);
+  return conn->send(reply);
+}
+
+// A peer acknowledged one of our pings. Record the receive time for
+// whichever connection carried the reply, retire fully-acknowledged
+// pings from the history, and -- if the peer now looks healthy --
+// cancel any queued or already-sent failure report for it.
+seastar::future<> Heartbeat::handle_reply(ceph::net::ConnectionRef conn,
+                                          Ref<MOSDPing> m)
+{
+  const osd_id_t from = m->get_source().num();
+  auto found = peers.find(from);
+  if (found == peers.end()) {
+    // stale reply from a peer we no longer track
+    return seastar::now();
+  }
+  auto& peer = found->second;
+  // the echoed stamp is the key we stored when the ping was sent
+  auto ping = peer.ping_history.find(m->stamp);
+  if (ping == peer.ping_history.end()) {
+    // old replies, deprecated by newly sent pings.
+    return seastar::now();
+  }
+  const auto now = clock::now();
+  auto& unacked = ping->second.unacknowledged;
+  if (conn == peer.con_back) {
+    peer.last_rx_back = now;
+    unacked--;
+  } else if (conn == peer.con_front) {
+    peer.last_rx_front = now;
+    unacked--;
+  }
+  if (unacked == 0) {
+    // this ping and every older one are fully acked: drop them so
+    // they no longer count toward failure detection
+    peer.ping_history.erase(peer.ping_history.begin(), ++ping);
+  }
+  if (peer.is_healthy(now)) {
+    // cancel false reports
+    failure_queue.erase(from);
+    if (auto pending = failure_pending.find(from);
+        pending != failure_pending.end()) {
+      // we already told the monitor this osd failed; retract it
+      return send_still_alive(from, pending->second.addrs);
+    }
+  }
+  return seastar::now();
+}
+
+// A peer replied YOU_DIED -- it presumably has an osdmap in which we
+// are marked down; confirm against MOSDPing semantics.
+seastar::future<> Heartbeat::handle_you_died()
+{
+  // TODO: ask for newer osdmap
+  return seastar::now();
+}
+
+// Timer callback: send a PING over every live connection (front and
+// back) of every tracked peer, recording each send in ping_history so
+// handle_reply()/is_unhealthy() can match acks against the deadline
+// (now + osd_heartbeat_grace).
+seastar::future<> Heartbeat::send_heartbeats()
+{
+  using peers_item_t = typename peers_map_t::value_type;
+  return seastar::parallel_for_each(peers,
+    [this](peers_item_t& item) {
+      const auto now = clock::now();
+      const auto deadline =
+        now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+      auto& [peer, info] = item;
+      info.last_tx = now;
+      if (clock::is_zero(info.first_tx)) {
+        info.first_tx = now;
+      }
+      // sent_stamp doubles as the ping_history key; the peer echoes it
+      const utime_t sent_stamp{now};
+      auto [reply, added] = info.ping_history.emplace(sent_stamp,
+                                                      reply_t{deadline, 0});
+      std::vector<ceph::net::ConnectionRef> conns{info.con_front,
+                                                  info.con_back};
+      return seastar::parallel_for_each(std::move(conns),
+        [=] (auto con) {
+          if (con) {
+            auto min_message = static_cast<uint32_t>(
+              local_conf()->osd_heartbeat_min_size);
+            auto ping = make_message<MOSDPing>(monc.get_fsid(),
+                                               service.get_map()->get_epoch(),
+                                               MOSDPing::PING,
+                                               sent_stamp,
+                                               min_message);
+            // NOTE(review): `reply` is a structured binding captured by
+            // reference across an async send -- C++17 formally forbids
+            // capturing structured bindings, and the map entry could be
+            // erased (handle_reply/remove_peer) before this continuation
+            // runs; worth confirming.
+            return con->send(ping).then([&reply] {
+              reply->second.unacknowledged++;
+              return seastar::now();
+            });
+          } else {
+            return seastar::now();
+          }
+        });
+    });
+}
+
+seastar::future<> Heartbeat::send_failures()
+{
+  using failure_item_t = typename failure_queue_t::value_type;
+  return seastar::parallel_for_each(failure_queue,
+    [this](failure_item_t& failure_item) {
+      auto [osd, failed_since] = failure_item;
+      if (failure_pending.count(osd)) {
+        return seastar::now();
+      }
+      auto failed_for = chrono::duration_cast<chrono::seconds>(
+        clock::now() - failed_since).count();
+      auto osdmap = service.get_map();
+      auto failure_report =
+        make_message<MOSDFailure>(monc.get_fsid(),
+                                  osd,
+                                  osdmap->get_addrs(osd),
+                                  static_cast<int>(failed_for),
+                                  osdmap->get_epoch());
+      failure_pending.emplace(osd, failure_info_t{failed_since,
+                                                  osdmap->get_addrs(osd)});
+      return monc.send_message(failure_report);
+    }).then([this] {
+      failure_queue.clear();
+      return seastar::now();
+    });
+}
+
+// Retract an earlier failure report: tell the monitor the osd is
+// alive (MOSDFailure with FLAG_ALIVE), then forget the pending entry.
+seastar::future<> Heartbeat::send_still_alive(osd_id_t osd,
+                                              const entity_addrvec_t& addrs)
+{
+  auto still_alive = make_message<MOSDFailure>(monc.get_fsid(),
+                                               osd,
+                                               addrs,
+                                               0,
+                                               service.get_map()->get_epoch(),
+                                               MOSDFailure::FLAG_ALIVE);
+  return monc.send_message(still_alive).then([=] {
+    // only drop the pending entry once the retraction was sent
+    failure_pending.erase(osd);
+    return seastar::now();
+  });
+}
+
+// a peer is unhealthy when its oldest still-unacknowledged ping has
+// passed its deadline (ping_history is keyed by send time, so
+// begin() is the oldest outstanding ping)
+bool Heartbeat::PeerInfo::is_unhealthy(clock::time_point now) const
+{
+  if (ping_history.empty()) {
+    // we haven't sent a ping yet or we have got all replies,
+    // in either way we are safe and healthy for now
+    return false;
+  } else {
+    auto oldest_ping = ping_history.begin();
+    return now > oldest_ping->second.deadline;
+  }
+}
+
+bool Heartbeat::PeerInfo::is_healthy(clock::time_point now) const
+{
+  // never declare a peer healthy before the first reply has arrived
+  // on each connection we actually have (front and/or back)
+  if (con_front && clock::is_zero(last_rx_front)) {
+    return false;
+  }
+  if (con_back && clock::is_zero(last_rx_back)) {
+    return false;
+  }
+  // ...and the oldest in-flight ping must not be past its deadline
+  return !is_unhealthy(now);
+}
diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h
new file mode 100644 (file)
index 0000000..f59bbab
--- /dev/null
@@ -0,0 +1,107 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <cstdint>
+#include <seastar/core/future.hh>
+#include "common/ceph_time.h"
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Fwd.h"
+
+class MOSDPing;
+class OSDMapService;
+
+namespace ceph::mon {
+  class Client;
+}
+
+template<typename Message> using Ref = boost::intrusive_ptr<Message>;
+
+/// OSD-to-OSD heartbeat manager: pings every tracked peer over a
+/// dedicated front (public) and back (cluster) messenger, tracks
+/// outstanding pings per peer, and reports unresponsive peers to the
+/// monitor.
+class Heartbeat : public ceph::net::Dispatcher {
+public:
+  using osd_id_t = int;
+
+  Heartbeat(int whoami,
+           uint32_t nonce,
+           const OSDMapService& service,
+           ceph::mon::Client& monc);
+
+  seastar::future<> start(entity_addrvec_t front,
+                         entity_addrvec_t back);
+  seastar::future<> stop();
+
+  void add_peer(osd_id_t peer);
+  seastar::future<> remove_peer(osd_id_t peer);
+
+  seastar::future<> send_heartbeats();
+  seastar::future<> send_failures();
+
+  const entity_addrvec_t& get_front_addrs() const;
+  const entity_addrvec_t& get_back_addrs() const;
+
+  // Dispatcher methods
+  seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn,
+                               MessageRef m) override;
+
+private:
+  seastar::future<> handle_osd_ping(ceph::net::ConnectionRef conn,
+                                   Ref<MOSDPing> m);
+  seastar::future<> handle_ping(ceph::net::ConnectionRef conn,
+                               Ref<MOSDPing> m);
+  seastar::future<> handle_reply(ceph::net::ConnectionRef conn,
+                                Ref<MOSDPing> m);
+  seastar::future<> handle_you_died();
+
+  seastar::future<> send_still_alive(osd_id_t, const entity_addrvec_t&);
+
+private:
+  std::unique_ptr<ceph::net::Messenger> front_msgr;
+  std::unique_ptr<ceph::net::Messenger> back_msgr;
+  const OSDMapService& service;
+  ceph::mon::Client& monc;
+
+  /// drives send_heartbeats() periodically (armed in start())
+  seastar::timer<seastar::lowres_clock> timer;
+  // use real_clock so it can be converted to utime_t
+  using clock = ceph::coarse_real_clock;
+
+  /// bookkeeping for one sent ping (keyed by its send stamp)
+  struct reply_t {
+    clock::time_point deadline;
+    // one sent over front conn, another sent over back conn
+    uint8_t unacknowledged = 0;
+  };
+  struct PeerInfo {
+    /// peer connection (front)
+    ceph::net::ConnectionRef con_front;
+    /// peer connection (back)
+    ceph::net::ConnectionRef con_back;
+    /// time we sent our first ping request
+    clock::time_point first_tx;
+    /// last time we sent a ping request
+    clock::time_point last_tx;
+    /// last time we got a ping reply on the front side
+    clock::time_point last_rx_front;
+    /// last time we got a ping reply on the back side
+    clock::time_point last_rx_back;
+    /// history of inflight pings, arranging by timestamp we sent
+    std::map<utime_t, reply_t> ping_history;
+
+    bool is_unhealthy(clock::time_point now) const;
+    bool is_healthy(clock::time_point now) const;
+  };
+  using peers_map_t = std::map<osd_id_t, PeerInfo>;
+  peers_map_t peers;
+
+  // osds which are considered failed
+  // osd_id => when was the last time that both front and back pings were acked
+  //           use for calculating how long the OSD has been unresponsive
+  using failure_queue_t = std::map<osd_id_t, clock::time_point>;
+  failure_queue_t failure_queue;
+  struct failure_info_t {
+    clock::time_point failed_since;
+    entity_addrvec_t addrs;
+  };
+  // osds we've reported to monitor as failed ones, but they are not marked down
+  // yet
+  std::map<osd_id_t, failure_info_t> failure_pending;
+};
index 2650e2cfc39cdee747d572301ead320208c636c6..7b465c336828de5bc0315286fd592d5d69b01f1d 100644 (file)
@@ -12,6 +12,7 @@
 #include "crimson/os/cyan_object.h"
 #include "crimson/os/cyan_store.h"
 #include "crimson/os/Transaction.h"
+#include "crimson/osd/heartbeat.h"
 #include "crimson/osd/osd_meta.h"
 #include "crimson/osd/pg.h"
 #include "crimson/osd/pg_meta.h"
@@ -26,6 +27,7 @@ namespace {
   {
     return {new Message{std::forward<Args>(args)...}, false};
   }
+  static constexpr int TICK_INTERVAL = 1;
 }
 
 using ceph::common::local_conf;
@@ -38,6 +40,8 @@ OSD::OSD(int id, uint32_t nonce)
     public_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
                                                "client", nonce}},
     monc{*public_msgr},
+    heartbeat{new Heartbeat{whoami, nonce, *this, monc}},
+    heartbeat_timer{[this] { update_heartbeat_peers(); }}
 {
   for (auto msgr : {cluster_msgr.get(), public_msgr.get()}) {
     if (local_conf()->ms_crc_data) {
@@ -155,6 +159,9 @@ seastar::future<> OSD::start()
     monc.sub_want("mgrmap", 0, 0);
     monc.sub_want("osdmap", 0, 0);
     return monc.renew_subs();
+  }).then([this] {
+    return heartbeat->start(public_msgr->get_myaddrs(),
+                            cluster_msgr->get_myaddrs());
   }).then([this] {
     return start_boot();
   });
@@ -205,15 +212,14 @@ seastar::future<> OSD::_send_boot()
 {
   state.set_booting();
 
-  entity_addrvec_t hb_back_addrs;
-  entity_addrvec_t hb_front_addrs;
-
+  logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs());
+  logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs());
   logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr());
   auto m = make_message<MOSDBoot>(superblock,
                                   osdmap->get_epoch(),
                                   osdmap->get_epoch(),
-                                  hb_back_addrs,
-                                  hb_front_addrs,
+                                  heartbeat->get_back_addrs(),
+                                  heartbeat->get_front_addrs(),
                                   cluster_msgr->get_myaddrs(),
                                   CEPH_FEATURES_ALL);
   return monc.send_message(m);
@@ -224,6 +230,8 @@ seastar::future<> OSD::stop()
   // see also OSD::shutdown()
   state.set_stopping();
   return gate.close().then([this] {
+    return heartbeat->stop();
+  }).then([this] {
     return monc.stop();
   }).then([this] {
     return public_msgr->shutdown();
@@ -496,6 +504,8 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
         state.set_active();
         beacon_timer.arm_periodic(
           std::chrono::seconds(local_conf()->osd_beacon_report_interval));
+        heartbeat_timer.arm_periodic(
+          std::chrono::seconds(TICK_INTERVAL));
       }
     }
 
@@ -575,3 +585,22 @@ seastar::future<> OSD::send_beacon()
                                     min_last_epoch_clean);
   return monc.send_message(m);
 }
+
+// Heartbeat-timer callback: refresh the heartbeat peer set from the
+// PGs this OSD hosts -- every osd in each PG's up or acting set
+// becomes a heartbeat peer. Only runs while the OSD is active.
+void OSD::update_heartbeat_peers()
+{
+  if (!state.is_active()) {
+    return;
+  }
+  for (auto& pg : pgs) {
+    vector<int> up, acting;
+    osdmap->pg_to_up_acting_osds(pg.first.pgid,
+                                 &up, nullptr,
+                                 &acting, nullptr);
+    for (auto osd : boost::join(up, acting)) {
+      // CRUSH_ITEM_NONE marks an empty slot in up/acting
+      if (osd != CRUSH_ITEM_NONE) {
+        heartbeat->add_peer(osd);
+      }
+    }
+  }
+  // TODO: remove down OSD
+}
index 51af15256d9933a872b55706b0c68299e615401a..fc66666fba005cf663bf55bfccb1a7efce5c12fc 100644 (file)
@@ -21,6 +21,7 @@ class MOSDMap;
 class OSDMap;
 class OSDMeta;
 class PG;
+class Heartbeat;
 
 namespace ceph::net {
   class Messenger;
@@ -46,6 +47,9 @@ class OSD : public ceph::net::Dispatcher,
   ChainedDispatchers dispatchers;
   ceph::mon::Client monc;
 
+  std::unique_ptr<Heartbeat> heartbeat;
+  seastar::timer<seastar::lowres_clock> heartbeat_timer;
+
   // TODO: use LRU cache
   std::map<epoch_t, seastar::lw_shared_ptr<OSDMap>> osdmaps;
   std::map<epoch_t, bufferlist> map_bl_cache;
@@ -115,4 +119,5 @@ private:
   seastar::future<> shutdown();
 
   seastar::future<> send_beacon();
+  void update_heartbeat_peers();
 };