]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: promote Heartbeat::Peer to an inner class
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 26 Mar 2020 06:33:37 +0000 (14:33 +0800)
committerKefu Chai <kchai@redhat.com>
Mon, 15 Jun 2020 13:50:46 +0000 (21:50 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h

index a6e0c1c1039172a57f43f093130e1b885e98959a..7385f840481b79a0e52d925b4f14b184a46b2964 100644 (file)
@@ -113,31 +113,21 @@ void Heartbeat::set_require_authorizer(bool require_authorizer)
   }
 }
 
-void Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
+void Heartbeat::add_peer(osd_id_t _peer, epoch_t epoch)
 {
-  auto [peer_info, added] = peers.try_emplace(peer);
-  auto& info = peer_info->second;
-  info.epoch = epoch;
-  if (added) {
-    logger().info("add_peer({})", peer);
-    auto osdmap = service.get_osdmap_service().get_map();
-    // TODO: use addrs
-    peer_info->second.con_front = front_msgr->connect(
-        osdmap->get_hb_front_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
-    peer_info->second.con_back = back_msgr->connect(
-        osdmap->get_hb_back_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
-  }
+  auto [iter, added] = peers.try_emplace(_peer, *this, _peer);
+  auto& peer = iter->second;
+  peer.set_epoch(epoch);
 }
 
 Heartbeat::osds_t Heartbeat::remove_down_peers()
 {
   osds_t osds;
-  for (auto& peer : peers) {
-    auto osd = peer.first;
+  for (auto& [osd, peer] : peers) {
     auto osdmap = service.get_osdmap_service().get_map();
     if (!osdmap->is_up(osd)) {
       remove_peer(osd);
-    } else if (peer.epoch < osdmap->get_epoch()) {
+    } else if (peer.get_epoch() < osdmap->get_epoch()) {
       osds.push_back(osd);
     }
   }
@@ -192,11 +182,8 @@ void Heartbeat::update_peers(int whoami)
 
 void Heartbeat::remove_peer(osd_id_t peer)
 {
-  logger().info("remove_peer({})", peer);
   auto found = peers.find(peer);
   assert(found != peers.end());
-  found->second.con_front->mark_down();
-  found->second.con_back->mark_down();
   peers.erase(peer);
 }
 
@@ -215,18 +202,11 @@ seastar::future<> Heartbeat::ms_dispatch(crimson::net::Connection* conn,
 
 void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
 {
-  auto found = std::find_if(peers.begin(), peers.end(),
-                            [conn](const peers_map_t::value_type& peer) {
-                              return (peer.second.con_front == conn ||
-                                      peer.second.con_back == conn);
-                            });
-  if (found == peers.end()) {
-    return;
+  // TODO: we should already have enough information to know which peer the
+  // conn belongs, so no need to do linear search here.
+  for (auto& [osd, peer] : peers) {
+    peer.handle_reset(conn);
   }
-  const auto peer = found->first;
-  const auto epoch = found->second.epoch;
-  remove_peer(peer);
-  add_peer(peer, epoch);
 }
 
 seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn,
@@ -272,31 +252,7 @@ seastar::future<> Heartbeat::handle_reply(crimson::net::Connection* conn,
     return seastar::now();
   }
   auto& peer = found->second;
-  auto ping = peer.ping_history.find(m->ping_stamp);
-  if (ping == peer.ping_history.end()) {
-    // old replies, deprecated by newly sent pings.
-    return seastar::now();
-  }
-  const auto now = clock::now();
-  auto& unacked = ping->second.unacknowledged;
-  if (conn == peer.con_back.get()) {
-    peer.last_rx_back = now;
-    unacked--;
-  } else if (conn == peer.con_front.get()) {
-    peer.last_rx_front = now;
-    unacked--;
-  }
-  if (unacked == 0) {
-    peer.ping_history.erase(peer.ping_history.begin(), ++ping);
-  }
-  if (peer.is_healthy(now)) {
-    // cancel false reports
-    if (auto pending = failure_pending.find(from);
-        pending != failure_pending.end()) {
-      return send_still_alive(from, pending->second.addrs);
-    }
-  }
-  return seastar::now();
+  return peer.handle_reply(conn, m);
 }
 
 seastar::future<> Heartbeat::handle_you_died()
@@ -309,27 +265,9 @@ void Heartbeat::heartbeat_check()
 {
   failure_queue_t failure_queue;
   const auto now = clock::now();
-  for (const auto& [osd, peer_info]: peers) {
-    if (clock::is_zero(peer_info.first_tx)) {
-      continue;
-    }
-
-    if (peer_info.is_unhealthy(now)) {
-      auto oldest_deadline = peer_info.ping_history.begin()->second.deadline;
-      auto failed_since = std::min(peer_info.last_rx_back,
-                                   peer_info.last_rx_front);
-      if (clock::is_zero(failed_since)) {
-        logger().error("heartbeat_check: no reply from osd.{} "
-                       "ever on either front or back, first ping sent {} "
-                       "(oldest deadline {})",
-                       osd, peer_info.first_tx, oldest_deadline);
-        failed_since = peer_info.first_tx;
-      } else {
-        logger().error("heartbeat_check: no reply from osd.{} "
-                       "since back {} front {} (oldest deadline {})",
-                       osd, peer_info.last_rx_back, peer_info.last_rx_front,
-                       oldest_deadline);
-      }
+  for (const auto& [osd, peer] : peers) {
+    auto failed_since = peer.failed_since(now);
+    if (!clock::is_zero(failed_since)) {
       failure_queue.emplace(osd, failed_since);
     }
   }
@@ -355,37 +293,10 @@ seastar::future<> Heartbeat::send_heartbeats()
 {
   const auto mnow = service.get_mnow();
   const auto now = clock::now();
-  const auto deadline =
-    now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
-  const utime_t sent_stamp{now};
 
   std::vector<seastar::future<>> futures;
-  for (auto& item : peers) {
-    auto& info = item.second;
-    info.last_tx = now;
-    if (clock::is_zero(info.first_tx)) {
-      info.first_tx = now;
-    }
-    [[maybe_unused]] auto [reply, added] =
-      info.ping_history.emplace(sent_stamp, reply_t{deadline, 0});
-    crimson::net::ConnectionRef conns[] = {info.con_front, info.con_back};
-    for (auto& con : conns) {
-      if (con) {
-        auto min_message = static_cast<uint32_t>(
-          local_conf()->osd_heartbeat_min_size);
-        auto ping = make_message<MOSDPing>(
-          monc.get_fsid(),
-          service.get_osdmap_service().get_map()->get_epoch(),
-          MOSDPing::PING,
-          sent_stamp,
-          mnow,
-          mnow,
-          service.get_osdmap_service().get_up_epoch(),
-          min_message);
-        reply->second.unacknowledged++;
-        futures.push_back(con->send(std::move(ping)));
-      }
-    }
+  for (auto& [osd, peer] : peers) {
+    peer.send_heartbeat(now, mnow, futures);
   }
   return seastar::when_all_succeed(futures.begin(), futures.end());
 }
@@ -431,7 +342,37 @@ seastar::future<> Heartbeat::send_still_alive(osd_id_t osd,
   });
 }
 
-bool Heartbeat::PeerInfo::is_unhealthy(clock::time_point now) const
+void Heartbeat::print(std::ostream& out) const
+{
+  out << "heartbeat";
+}
+
+void Heartbeat::Peer::connect()
+{
+  logger().info("peer osd.{} added", peer);
+  auto osdmap = heartbeat.service.get_osdmap_service().get_map();
+  // TODO: use addrs
+  con_front = heartbeat.front_msgr->connect(
+      osdmap->get_hb_front_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
+  con_back = heartbeat.back_msgr->connect(
+      osdmap->get_hb_back_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
+}
+
+Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer)
+  : heartbeat(heartbeat), peer(peer)
+{ connect(); }
+
+void Heartbeat::Peer::disconnect()
+{
+  logger().info("peer osd.{} removed", peer);
+  con_front->mark_down();
+  con_back->mark_down();
+}
+
+Heartbeat::Peer::~Peer()
+{ disconnect(); }
+
+bool Heartbeat::Peer::is_unhealthy(clock::time_point now) const
 {
   if (ping_history.empty()) {
     // we haven't sent a ping yet or we have got all replies,
@@ -443,7 +384,7 @@ bool Heartbeat::PeerInfo::is_unhealthy(clock::time_point now) const
   }
 }
 
-bool Heartbeat::PeerInfo::is_healthy(clock::time_point now) const
+bool Heartbeat::Peer::is_healthy(clock::time_point now) const
 {
   if (con_front && clock::is_zero(last_rx_front)) {
     return false;
@@ -456,7 +397,106 @@ bool Heartbeat::PeerInfo::is_healthy(clock::time_point now) const
   return !is_unhealthy(now);
 }
 
-void Heartbeat::print(std::ostream& out) const
+Heartbeat::clock::time_point
+Heartbeat::Peer::failed_since(clock::time_point now) const
 {
-  out << "heartbeat";
+  if (clock::is_zero(first_tx)) {
+    return clock::zero();
+  }
+  if (!is_unhealthy(now)) {
+    return clock::zero();
+  }
+
+  auto oldest_deadline = ping_history.begin()->second.deadline;
+  auto failed_since = std::min(last_rx_back, last_rx_front);
+  if (clock::is_zero(failed_since)) {
+    logger().error("failed_since: no reply from osd.{} "
+                   "ever on either front or back, first ping sent {} "
+                   "(oldest deadline {})",
+                   peer, first_tx, oldest_deadline);
+    failed_since = first_tx;
+  } else {
+    logger().error("failed_since: no reply from osd.{} "
+                   "since back {} front {} (oldest deadline {})",
+                   peer, last_rx_back, last_rx_front, oldest_deadline);
+  }
+  return failed_since;
+}
+
+void Heartbeat::Peer::send_heartbeat(
+    clock::time_point now,
+    ceph::signedspan mnow,
+    std::vector<seastar::future<>>& futures)
+{
+  if (clock::is_zero(first_tx)) {
+    first_tx = now;
+  }
+  last_tx = now;
+
+  const utime_t sent_stamp{now};
+  const auto deadline =
+    now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+  [[maybe_unused]] auto [reply, added] =
+    ping_history.emplace(sent_stamp, reply_t{deadline, 0});
+  for (auto& con : {con_front, con_back}) {
+    if (con) {
+      auto min_message = static_cast<uint32_t>(
+        local_conf()->osd_heartbeat_min_size);
+      auto ping = make_message<MOSDPing>(
+        heartbeat.monc.get_fsid(),
+        heartbeat.service.get_osdmap_service().get_map()->get_epoch(),
+        MOSDPing::PING,
+        sent_stamp,
+        mnow,
+        mnow,
+        heartbeat.service.get_osdmap_service().get_up_epoch(),
+        min_message);
+      reply->second.unacknowledged++;
+      futures.push_back(con->send(std::move(ping)));
+    }
+  }
+}
+
+seastar::future<> Heartbeat::Peer::handle_reply(
+    crimson::net::Connection* conn, Ref<MOSDPing> m)
+{
+  auto ping = ping_history.find(m->ping_stamp);
+  if (ping == ping_history.end()) {
+    // old replies, deprecated by newly sent pings.
+    return seastar::now();
+  }
+  const auto now = clock::now();
+  auto& unacked = ping->second.unacknowledged;
+  if (conn == con_back.get()) {
+    last_rx_back = now;
+    unacked--;
+  } else if (conn == con_front.get()) {
+    last_rx_front = now;
+    unacked--;
+  }
+  if (unacked == 0) {
+    ping_history.erase(ping_history.begin(), ++ping);
+  }
+  if (is_healthy(now)) {
+    // cancel false reports
+    if (auto pending = heartbeat.failure_pending.find(peer);
+        pending != heartbeat.failure_pending.end()) {
+      return heartbeat.send_still_alive(peer, pending->second.addrs);
+    }
+  }
+  return seastar::now();
+}
+
+void Heartbeat::Peer::handle_reset(crimson::net::ConnectionRef conn)
+{
+  if (con_front != conn && con_back != conn) {
+    return;
+  }
+  disconnect();
+  first_tx = {};
+  last_tx = {};
+  last_rx_front = {};
+  last_rx_back = {};
+  ping_history = {};
+  connect();
 }
index 2ab4970f59e7ad91529430d57b76005d1cfbf712..63f77a2b9de081df8fbc0d000f8bd25ceb01610f 100644 (file)
@@ -81,34 +81,10 @@ private:
   // use real_clock so it can be converted to utime_t
   using clock = ceph::coarse_real_clock;
 
-  struct reply_t {
-    clock::time_point deadline;
-    // one sent over front conn, another sent over back conn
-    uint8_t unacknowledged = 0;
-  };
-  struct PeerInfo {
-    /// peer connection (front)
-    crimson::net::ConnectionRef con_front;
-    /// peer connection (back)
-    crimson::net::ConnectionRef con_back;
-    /// time we sent our first ping request
-    clock::time_point first_tx;
-    /// last time we sent a ping request
-    clock::time_point last_tx;
-    /// last time we got a ping reply on the front side
-    clock::time_point last_rx_front;
-    /// last time we got a ping reply on the back side
-    clock::time_point last_rx_back;
-    /// most recent epoch we wanted this peer
-    epoch_t epoch;
-    /// history of inflight pings, arranging by timestamp we sent
-    std::map<utime_t, reply_t> ping_history;
-
-    bool is_unhealthy(clock::time_point now) const;
-    bool is_healthy(clock::time_point now) const;
-  };
-  using peers_map_t = std::map<osd_id_t, PeerInfo>;
+  class Peer;
+  using peers_map_t = std::map<osd_id_t, Peer>;
   peers_map_t peers;
+
   // osds which are considered failed
   // osd_id => when was the last time that both front and back pings were acked
   //           or sent.
@@ -132,3 +108,58 @@ inline std::ostream& operator<<(std::ostream& out, const Heartbeat& hb) {
   hb.print(out);
   return out;
 }
+
+class Heartbeat::Peer {
+ public:
+  Peer(Heartbeat&, osd_id_t);
+  ~Peer();
+  Peer(Peer&&) = delete;
+  Peer(const Peer&) = delete;
+  Peer& operator=(const Peer&) = delete;
+
+  void set_epoch(epoch_t epoch_) { epoch = epoch_; }
+  epoch_t get_epoch() const { return epoch; }
+
+  // if failure, return time_point since last active
+  // else, return clock::zero()
+  clock::time_point failed_since(clock::time_point now) const;
+  void send_heartbeat(clock::time_point now,
+                      ceph::signedspan mnow,
+                      std::vector<seastar::future<>>&);
+  seastar::future<> handle_reply(crimson::net::Connection*, Ref<MOSDPing>);
+  void handle_reset(crimson::net::ConnectionRef);
+
+ private:
+  bool is_unhealthy(clock::time_point now) const;
+  bool is_healthy(clock::time_point now) const;
+
+  void connect();
+  void disconnect();
+
+ private:
+  Heartbeat& heartbeat;
+  const osd_id_t peer;
+
+  /// peer connection (front)
+  crimson::net::ConnectionRef con_front;
+  /// peer connection (back)
+  crimson::net::ConnectionRef con_back;
+  /// time we sent our first ping request
+  clock::time_point first_tx;
+  /// last time we sent a ping request
+  clock::time_point last_tx;
+  /// last time we got a ping reply on the front side
+  clock::time_point last_rx_front;
+  /// last time we got a ping reply on the back side
+  clock::time_point last_rx_back;
+  /// most recent epoch we wanted this peer
+  epoch_t epoch;
+
+  struct reply_t {
+    clock::time_point deadline;
+    // one sent over front conn, another sent over back conn
+    uint8_t unacknowledged = 0;
+  };
+  /// history of inflight pings, arranging by timestamp we sent
+  std::map<utime_t, reply_t> ping_history;
+};