]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: refactor Heartbeat::Peer
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 17 Jun 2020 03:39:03 +0000 (11:39 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 17 Jun 2020 03:39:03 +0000 (11:39 +0800)
* encapsulate con_front and con_back as Heartbeat::Connection;
* encapsulate connectivity tracker as Heartbeat::Connector;
* encapsulate the session-alike part of Heartbeat::Peer as
  Heartbeat::Session;

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h

index b5a40fb8977824cf306231ee632b05485506a306..a5f06ba8a4303449d26d93fba711d8d351377b3d 100644 (file)
@@ -351,69 +351,129 @@ void Heartbeat::print(std::ostream& out) const
   out << "heartbeat";
 }
 
-Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer)
-  : heartbeat(heartbeat), peer(peer)
+Heartbeat::Connection::~Connection()
 {
-  logger().info("Heartbeat::Peer: osd.{} added", peer);
-  connect_front();
-  connect_back();
+  if (conn) {
+    conn->mark_down();
+  }
 }
 
-Heartbeat::Peer::~Peer()
+bool Heartbeat::Connection::match(crimson::net::Connection* _conn) const
 {
-  logger().info("Heartbeat::Peer: osd.{} removed", peer);
-  if (con_front) {
-    con_front->mark_down();
+  if (conn && conn.get() == _conn) {
+    return true;
+  } else {
+    return false;
   }
-  if (con_back) {
-    con_back->mark_down();
+}
+
+void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn)
+{
+  if (!conn) {
+    if (accepted_conn->get_peer_addr() == connector.get_peer_addr(type)) {
+      logger().info("Heartbeat::Connection::accepted(): "
+                    "{} racing resolved", *this);
+      conn = accepted_conn;
+      set_connected();
+    }
+  } else if (conn == accepted_conn) {
+    set_connected();
   }
 }
 
-bool Heartbeat::Peer::pinged() const
+void Heartbeat::Connection::replaced()
 {
-  if (clock::is_zero(first_tx)) {
-    // i can never receive a pong without sending any ping message first.
-    assert(clock::is_zero(last_rx_front) &&
-          clock::is_zero(last_rx_back));
-    return false;
+  assert(!is_connected);
+  auto replaced_conn = conn;
+  // set the racing connection, will be handled by handle_accept()
+  conn = msgr.connect(replaced_conn->get_peer_addr(),
+                      replaced_conn->get_peer_name());
+  racing_detected = true;
+  logger().warn("Heartbeat::Connection::replaced(): {} racing", *this);
+  assert(conn != replaced_conn);
+  assert(!conn->is_connected());
+}
+
+void Heartbeat::Connection::reset()
+{
+  conn = nullptr;
+  if (is_connected) {
+    is_connected = false;
+    connector.decrease_connected();
+  }
+  if (!racing_detected || is_winner_side) {
+    connect();
   } else {
-    return true;
+    logger().info("Heartbeat::Connection::reset(): "
+                  "{} racing detected and lose, "
+                  "waiting for peer connect me", *this);
   }
 }
 
-Heartbeat::Peer::health_state
-Heartbeat::Peer::do_health_screen(clock::time_point now) const
+seastar::future<> Heartbeat::Connection::send(MessageRef msg)
 {
-  if (!pinged()) {
-    // we are not healty nor unhealty because we haven't sent anything yet
-    return health_state::UNKNOWN;
-  } else if (!ping_history.empty() && ping_history.begin()->second.deadline < now) {
-    return health_state::UNHEALTHY;
-  } else if (!clock::is_zero(last_rx_front) &&
-             !clock::is_zero(last_rx_back)) {
-    // only declare to be healthy until we have received the first
-    // replies from both front/back connections
-    return health_state::HEALTHY;
-  } else {
-    return health_state::UNKNOWN;
+  assert(is_connected);
+  return conn->send(msg);
+}
+
+void Heartbeat::Connection::validate()
+{
+  assert(is_connected);
+  auto peer_addr = connector.get_peer_addr(type);
+  if (conn->get_peer_addr() != peer_addr) {
+    logger().info("Heartbeat::Connection::validate(): "
+                  "{} has new address {} over {}, reset",
+                  *this, peer_addr, conn->get_peer_addr());
+    conn->mark_down();
+    racing_detected = false;
+    reset();
+  }
+}
+
+void Heartbeat::Connection::retry()
+{
+  racing_detected = false;
+  if (!is_connected) {
+    if (conn) {
+      conn->mark_down();
+      reset();
+    } else {
+      connect();
+    }
+  }
+}
+
+void Heartbeat::Connection::set_connected()
+{
+  assert(!is_connected);
+  is_connected = true;
+  connector.increase_connected();
+}
+
+void Heartbeat::Connection::connect()
+{
+  assert(!conn);
+  auto addr = connector.get_peer_addr(type);
+  conn = msgr.connect(addr, entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
+  if (conn->is_connected()) {
+    set_connected();
   }
 }
 
 Heartbeat::clock::time_point
-Heartbeat::Peer::failed_since(clock::time_point now) const
+Heartbeat::Session::failed_since(Heartbeat::clock::time_point now) const
 {
   if (do_health_screen(now) == health_state::UNHEALTHY) {
     auto oldest_deadline = ping_history.begin()->second.deadline;
     auto failed_since = std::min(last_rx_back, last_rx_front);
     if (clock::is_zero(failed_since)) {
-      logger().error("Heartbeat::Peer::failed_since(): no reply from osd.{} "
+      logger().error("Heartbeat::Session::failed_since(): no reply from osd.{} "
                      "ever on either front or back, first ping sent {} "
                      "(oldest deadline {})",
                      peer, first_tx, oldest_deadline);
       failed_since = first_tx;
     } else {
-      logger().error("Heartbeat::Peer::failed_since(): no reply from osd.{} "
+      logger().error("Heartbeat::Session::failed_since(): no reply from osd.{} "
                      "since back {} front {} (oldest deadline {})",
                      peer, last_rx_back, last_rx_front, oldest_deadline);
     }
@@ -423,107 +483,58 @@ Heartbeat::Peer::failed_since(clock::time_point now) const
   }
 }
 
-void Heartbeat::Peer::do_send_heartbeat(
-    clock::time_point now,
-    ceph::signedspan mnow,
-    std::vector<seastar::future<>>* futures)
+void Heartbeat::Session::set_inactive_history(clock::time_point now)
 {
-  assert(session_started);
-  const utime_t sent_stamp{now};
-  const auto deadline =
-    now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
-  [[maybe_unused]] auto [reply, added] =
-    ping_history.emplace(sent_stamp, reply_t{deadline, 0});
-  for (auto& con : {con_front, con_back}) {
-    auto min_message = static_cast<uint32_t>(
-      local_conf()->osd_heartbeat_min_size);
-    auto ping = make_message<MOSDPing>(
-      heartbeat.monc.get_fsid(),
-      heartbeat.service.get_osdmap_service().get_map()->get_epoch(),
-      MOSDPing::PING,
-      sent_stamp,
-      mnow,
-      mnow,
-      heartbeat.service.get_osdmap_service().get_up_epoch(),
-      min_message);
-    reply->second.unacknowledged++;
-    if (futures) {
-      futures->push_back(con->send(std::move(ping)));
-    }
+  assert(!started);
+  if (ping_history.empty()) {
+    const utime_t sent_stamp{now};
+    const auto deadline =
+      now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+    [[maybe_unused]] auto [reply, added] =
+      ping_history.emplace(sent_stamp, reply_t{deadline, 0});
+  } else { // the entry is already added
+    assert(ping_history.size() == 1);
   }
 }
 
+Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer)
+  : Connector(2), heartbeat{heartbeat}, peer{peer}, session{peer},
+  con_front(peer, heartbeat.whoami > peer, Connection::type_t::front,
+            *heartbeat.front_msgr, *this),
+  con_back(peer, heartbeat.whoami > peer, Connection::type_t::back,
+           *heartbeat.back_msgr, *this)
+{
+  logger().info("Heartbeat::Peer: osd.{} added", peer);
+}
+
+Heartbeat::Peer::~Peer()
+{
+  logger().info("Heartbeat::Peer: osd.{} removed", peer);
+}
+
 void Heartbeat::Peer::send_heartbeat(
-    clock::time_point now,
-    ceph::signedspan mnow,
+    clock::time_point now, ceph::signedspan mnow,
     std::vector<seastar::future<>>& futures)
 {
-  if (!pinged()) {
-    first_tx = now;
-  }
-  last_tx = now;
-
-  if (session_started) {
+  session.set_tx(now);
+  if (session.is_started()) {
     do_send_heartbeat(now, mnow, &futures);
-
-    // validate connection addresses
-    const auto osdmap = heartbeat.service.get_osdmap_service().get_map();
-    const auto front_addr = osdmap->get_hb_front_addrs(peer).front();
-    if (con_front->get_peer_addr() != front_addr) {
-      logger().info("Heartbeat::Peer::send_heartbeat(): "
-                    "peer osd.{} con_front has new address {} over {}, reset",
-                    peer, front_addr, con_front->get_peer_addr());
-      con_front->mark_down();
-      has_racing = false;
-      handle_reset(con_front, false);
-    }
-    const auto back_addr = osdmap->get_hb_back_addrs(peer).front();
-    if (con_back->get_peer_addr() != back_addr) {
-      logger().info("Heartbeat::Peer::send_heartbeat(): "
-                    "peer osd.{} con_back has new address {} over {}, reset",
-                    peer, back_addr, con_back->get_peer_addr());
-      con_back->mark_down();
-      has_racing = false;
-      handle_reset(con_back, false);
-    }
+    for_each_conn([] (auto& conn) {
+      conn.validate();
+    });
   } else {
     // we should send MOSDPing but still cannot at this moment
     if (pending_send) {
       // we have already pending for a entire heartbeat interval
       logger().warn("Heartbeat::Peer::send_heartbeat(): "
-                    "heartbeat to {} is still pending...", peer);
-      has_racing = false;
-      // retry con_front if still pending
-      if (!front_ready) {
-        if (con_front) {
-          con_front->mark_down();
-          handle_reset(con_front, false);
-        } else {
-          connect_front();
-        }
-      }
-      // retry con_back if still pending
-      if (!back_ready) {
-        if (con_back) {
-          con_back->mark_down();
-          handle_reset(con_back, false);
-        } else {
-          connect_back();
-        }
-      }
+                    "heartbeat to osd.{} is still pending...", peer);
+      for_each_conn([] (auto& conn) {
+        conn.retry();
+      });
     } else {
       logger().info("Heartbeat::Peer::send_heartbeat(): "
-                    "heartbeat to {} is pending send...", peer);
-      // maintain an entry in ping_history for unhealthy check
-      if (ping_history.empty()) {
-        const utime_t sent_stamp{now};
-        const auto deadline =
-          now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
-        [[maybe_unused]] auto [reply, added] =
-          ping_history.emplace(sent_stamp, reply_t{deadline, 0});
-      } else { // the entry is already added
-        assert(ping_history.size() == 1);
-      }
+                    "heartbeat to osd.{} is pending send...", peer);
+      session.set_inactive_history(now);
       pending_send = true;
     }
   }
@@ -532,215 +543,79 @@ void Heartbeat::Peer::send_heartbeat(
 seastar::future<> Heartbeat::Peer::handle_reply(
     crimson::net::Connection* conn, Ref<MOSDPing> m)
 {
-  if (!session_started) {
+  if (!session.is_started()) {
     // we haven't sent any ping yet
     return seastar::now();
   }
-  auto ping = ping_history.find(m->ping_stamp);
-  if (ping == ping_history.end()) {
-    // old replies, deprecated by newly sent pings.
+  type_t type;
+  if (con_front.match(conn)) {
+    type = type_t::front;
+  } else if (con_back.match(conn)) {
+    type = type_t::back;
+  } else {
     return seastar::now();
   }
   const auto now = clock::now();
-  auto& unacked = ping->second.unacknowledged;
-  assert(unacked);
-  if (conn == con_back.get()) {
-    last_rx_back = now;
-    unacked--;
-  } else if (conn == con_front.get()) {
-    last_rx_front = now;
-    unacked--;
-  }
-  if (unacked == 0) {
-    ping_history.erase(ping_history.begin(), ++ping);
-  }
-  if (do_health_screen(now) == health_state::HEALTHY) {
-    return heartbeat.failing_peers.cancel_one(peer);
-  }
-  return seastar::now();
-}
-
-void Heartbeat::Peer::handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
-{
-  if (con_front == conn) {
-    con_front = nullptr;
-    if (is_replace) {
-      assert(!front_ready);
-      assert(!session_started);
-      // set the racing connection, will be handled by handle_accept()
-      con_front = heartbeat.front_msgr->connect(
-          conn->get_peer_addr(), entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
-      has_racing = true;
-      logger().warn("Heartbeat::Peer::handle_reset(): "
-                    "con_front racing with osd.{}, updated by {}",
-                    peer, con_front);
-    } else {
-      if (front_ready) {
-        front_ready = false;
-      }
-      if (session_started) {
-        reset_session();
-      }
-      assert(heartbeat.whoami != peer);
-      if (heartbeat.whoami > peer || !has_racing) {
-        connect_front();
-      } else { // whoami < peer && has_racing
-        logger().info("Heartbeat::Peer::handle_reset(): "
-                      "con_front racing detected and lose, "
-                      "waiting for osd.{} connect me", peer);
-      }
+  if (session.handle_reply(m->ping_stamp, type, now)) {
+    if (session.do_health_screen(now) == Session::health_state::HEALTHY) {
+      return heartbeat.failing_peers.cancel_one(peer);
     }
-  } else if (con_back == conn) {
-    con_back = nullptr;
-    if (is_replace) {
-      assert(!back_ready);
-      assert(!session_started);
-      // set the racing connection, will be handled by handle_accept()
-      con_back = heartbeat.back_msgr->connect(
-          conn->get_peer_addr(), entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
-      has_racing = true;
-      logger().warn("Heartbeat::Peer::handle_reset(): "
-                    "con_back racing with osd.{}, updated by {}",
-                    peer, con_back);
-    } else {
-      if (back_ready) {
-        back_ready = false;
-      }
-      if (session_started) {
-        reset_session();
-      }
-      if (heartbeat.whoami == peer) {
-        logger().error("Heartbeat::Peer::handle_reset(): "
-                       "peer is myself ({})", peer);
-      } else if (heartbeat.whoami > peer || !has_racing) {
-        connect_back();
-      } else { // whoami < peer && has_racing
-        logger().info("Heartbeat::Peer::handle_reset(): "
-                      "con_back racing detected and lose, "
-                      "waiting for osd.{} connect me", peer);
-      }
-    }
-  } else {
-    // ignore the unrelated conn
-  }
-}
-
-void Heartbeat::Peer::handle_connect(crimson::net::ConnectionRef conn)
-{
-  if (con_front == conn) {
-    assert(!front_ready);
-    assert(!session_started);
-    notify_front_ready();
-  } else if (con_back == conn) {
-    assert(!back_ready);
-    assert(!session_started);
-    notify_back_ready();
-  } else {
-    // ignore the unrelated connection
   }
+  return seastar::now();
 }
 
-void Heartbeat::Peer::handle_accept(crimson::net::ConnectionRef conn)
+entity_addr_t Heartbeat::Peer::get_peer_addr(type_t type)
 {
-  handle_connect(conn);
-
-  const auto peer_addr = conn->get_peer_addr();
   const auto osdmap = heartbeat.service.get_osdmap_service().get_map();
-  const auto front_addr = osdmap->get_hb_front_addrs(peer).front();
-  if (!con_front && front_addr == peer_addr) {
-    logger().info("Heartbeat::Peer::handle_accept(): "
-                  "con_front racing resolved for osd.{}", peer);
-    con_front = conn;
-    notify_front_ready();
-  }
-  const auto back_addr = osdmap->get_hb_back_addrs(peer).front();
-  if (!con_back && back_addr == peer_addr) {
-    logger().info("Heartbeat::Peer::handle_accept(): "
-                  "con_back racing resolved for osd.{}", peer);
-    con_back = conn;
-    notify_back_ready();
+  if (type == type_t::front) {
+    return osdmap->get_hb_front_addrs(peer).front();
+  } else {
+    return osdmap->get_hb_back_addrs(peer).front();
   }
 }
 
-void Heartbeat::Peer::start_session()
+void Heartbeat::Peer::all_connected()
 {
   logger().info("Heartbeat::Peer: osd.{} started (send={})",
                 peer, pending_send);
-  assert(!session_started);
-  session_started = true;
-  ping_history.clear();
+  session.start();
   if (pending_send) {
     pending_send = false;
     do_send_heartbeat(clock::now(), heartbeat.service.get_mnow(), nullptr);
   }
 }
 
-void Heartbeat::Peer::reset_session()
+void Heartbeat::Peer::connection_lost()
 {
   logger().info("Heartbeat::Peer: osd.{} reset", peer);
-  assert(session_started);
-  if (!ping_history.empty()) {
-    // we lost our ping_history of the last session, but still need to keep
-    // the oldest deadline for unhealthy check.
-    auto oldest = ping_history.begin();
-    auto sent_stamp = oldest->first;
-    auto deadline = oldest->second.deadline;
-    ping_history.clear();
-    ping_history.emplace(sent_stamp, reply_t{deadline, 0});
-  }
-  session_started = false;
-}
-
-void Heartbeat::Peer::notify_front_ready()
-{
-  assert(con_front);
-  assert(!front_ready);
-  assert(!session_started);
-  front_ready = true;
-  if (front_ready && back_ready) {
-    start_session();
-  }
-}
-
-void Heartbeat::Peer::connect_front()
-{
-  assert(!con_front);
-  assert(!front_ready);
-  assert(!session_started);
-  auto osdmap = heartbeat.service.get_osdmap_service().get_map();
-  // TODO: use addrs
-  con_front = heartbeat.front_msgr->connect(
-      osdmap->get_hb_front_addrs(peer).front(),
-      entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
-  if (con_front->is_connected()) {
-    notify_front_ready();
-  }
-}
-
-void Heartbeat::Peer::notify_back_ready()
-{
-  assert(con_back);
-  assert(!back_ready);
-  assert(!session_started);
-  back_ready = true;
-  if (front_ready && back_ready) {
-    start_session();
-  }
+  session.lost();
 }
 
-void Heartbeat::Peer::connect_back()
+void Heartbeat::Peer::do_send_heartbeat(
+    Heartbeat::clock::time_point now,
+    ceph::signedspan mnow,
+    std::vector<seastar::future<>>* futures)
 {
-  assert(!con_back);
-  assert(!back_ready);
-  assert(!session_started);
-  auto osdmap = heartbeat.service.get_osdmap_service().get_map();
-  // TODO: use addrs
-  con_back = heartbeat.back_msgr->connect(
-      osdmap->get_hb_back_addrs(peer).front(),
-      entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
-  if (con_back->is_connected()) {
-    notify_back_ready();
-  }
+  const utime_t sent_stamp{now};
+  const auto deadline =
+    now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+  session.emplace_history(sent_stamp, deadline);
+  for_each_conn([&, this] (auto& conn) {
+    auto min_message = static_cast<uint32_t>(
+      local_conf()->osd_heartbeat_min_size);
+    auto ping = make_message<MOSDPing>(
+      heartbeat.monc.get_fsid(),
+      heartbeat.service.get_osdmap_service().get_map()->get_epoch(),
+      MOSDPing::PING,
+      sent_stamp,
+      mnow,
+      mnow,
+      heartbeat.service.get_osdmap_service().get_up_epoch(),
+      min_message);
+    if (futures) {
+      futures->push_back(conn.send(std::move(ping)));
+    }
+  });
 }
 
 bool Heartbeat::FailingPeers::add_pending(
index dd1ab949a7d9922769b563e8d3b76aa7a3aa5fc9..f9f54228ebcc8815347351ca524c218882b96c13 100644 (file)
@@ -83,6 +83,9 @@ private:
   // use real_clock so it can be converted to utime_t
   using clock = ceph::coarse_real_clock;
 
+  class Connector;
+  class Connection;
+  class Session;
   class Peer;
   using peers_map_t = std::map<osd_id_t, Peer>;
   peers_map_t peers;
@@ -127,56 +130,190 @@ inline std::ostream& operator<<(std::ostream& out, const Heartbeat& hb) {
   return out;
 }
 
-class Heartbeat::Peer {
+class Heartbeat::Connector {
  public:
-  Peer(Heartbeat&, osd_id_t);
-  ~Peer();
-  Peer(Peer&&) = delete;
-  Peer(const Peer&) = delete;
-  Peer& operator=(const Peer&) = delete;
+  Connector(size_t connections) : connections{connections} {}
+
+  void increase_connected() {
+    assert(connected < connections);
+    ++connected;
+    if (connected == connections) {
+      all_connected();
+    }
+  }
+  void decrease_connected() {
+    assert(connected > 0);
+    if (connected == connections) {
+      connection_lost();
+    }
+    --connected;
+  }
+  enum class type_t { front, back };
+  virtual entity_addr_t get_peer_addr(type_t) = 0;
+
+ protected:
+  virtual void all_connected() = 0;
+  virtual void connection_lost() = 0;
 
-  void set_epoch(epoch_t epoch_) { epoch = epoch_; }
-  epoch_t get_epoch() const { return epoch; }
+ private:
+  const size_t connections;
+  size_t connected = 0;
+};
 
-  // if failure, return time_point since last active
-  // else, return clock::zero()
-  clock::time_point failed_since(clock::time_point now) const;
-  void send_heartbeat(clock::time_point now,
-                      ceph::signedspan mnow,
-                      std::vector<seastar::future<>>&);
-  seastar::future<> handle_reply(crimson::net::Connection*, Ref<MOSDPing>);
-  void handle_reset(crimson::net::ConnectionRef, bool is_replace);
-  void handle_connect(crimson::net::ConnectionRef);
-  void handle_accept(crimson::net::ConnectionRef);
+class Heartbeat::Connection {
+ public:
+  using type_t = Connector::type_t;
+  Connection(osd_id_t peer, bool is_winner_side, type_t type,
+             crimson::net::Messenger& msgr, Connector& connector)
+    : peer{peer}, is_winner_side{is_winner_side}, type{type},
+      msgr{msgr}, connector{connector} {
+    connect();
+  }
+  ~Connection();
+
+  bool match(crimson::net::Connection* _conn) const;
+  bool match(crimson::net::ConnectionRef conn) const {
+    return match(conn.get());
+  }
+  void connected() {
+    set_connected();
+  }
+  void accepted(crimson::net::ConnectionRef);
+  void replaced();
+  void reset();
+  seastar::future<> send(MessageRef msg);
+  void validate();
+  // retry connection if still pending
+  void retry();
 
  private:
-  bool pinged() const;
+  void set_connected();
+  void connect();
+
+  const osd_id_t peer;
+  const bool is_winner_side;
+  const type_t type;
+  crimson::net::Messenger& msgr;
+  Connector& connector;
+
+  crimson::net::ConnectionRef conn;
+  bool is_connected = false;
+  bool racing_detected = false;
+
+ friend std::ostream& operator<<(std::ostream& os, const Connection c) {
+   if (c.type == type_t::front) {
+     return os << "con_front(osd." << c.peer << ")";
+   } else {
+     return os << "con_back(osd." << c.peer << ")";
+   }
+ }
+};
+
+class Heartbeat::Session {
+ public:
+  Session(osd_id_t peer) : peer{peer} {}
+
+  void set_epoch(epoch_t epoch_) { epoch = epoch_; }
+  epoch_t get_epoch() const { return epoch; }
+  bool is_started() const { return started; }
+  bool pinged() const {
+    if (clock::is_zero(first_tx)) {
+      // i can never receive a pong without sending any ping message first.
+      assert(clock::is_zero(last_rx_front) &&
+             clock::is_zero(last_rx_back));
+      return false;
+    } else {
+      return true;
+    }
+  }
+
   enum class health_state {
     UNKNOWN,
     UNHEALTHY,
     HEALTHY,
   };
-  health_state do_health_screen(clock::time_point now) const;
-
-  // a session starts when con_front and con_back are both connected
-  void start_session();
-  // a session resets when either con_front or con_back is reset
-  void reset_session();
-  // notify when con_front becomes ready, possibly start session
-  void notify_front_ready();
-  void connect_front();
-  // notify when con_back becomes ready, possibly start session
-  void notify_back_ready();
-  void connect_back();
-
-  void do_send_heartbeat(clock::time_point now,
-                         ceph::signedspan mnow,
-                         std::vector<seastar::future<>>*);
+  health_state do_health_screen(clock::time_point now) const {
+    if (!pinged()) {
+      // we are not healty nor unhealty because we haven't sent anything yet
+      return health_state::UNKNOWN;
+    } else if (!ping_history.empty() && ping_history.begin()->second.deadline < now) {
+      return health_state::UNHEALTHY;
+    } else if (!clock::is_zero(last_rx_front) &&
+               !clock::is_zero(last_rx_back)) {
+      // only declare to be healthy until we have received the first
+      // replies from both front/back connections
+      return health_state::HEALTHY;
+    } else {
+      return health_state::UNKNOWN;
+    }
+  }
+
+  clock::time_point failed_since(clock::time_point now) const;
+
+  void set_tx(clock::time_point now) {
+    if (!pinged()) {
+      first_tx = now;
+    }
+    last_tx = now;
+  }
+
+  void start() {
+    assert(!started);
+    started = true;
+    ping_history.clear();
+  }
+
+  void emplace_history(const utime_t& sent_stamp,
+                       const clock::time_point& deadline) {
+    assert(started);
+    [[maybe_unused]] auto [reply, added] =
+      ping_history.emplace(sent_stamp, reply_t{deadline, 2});
+  }
+
+  bool handle_reply(const utime_t& ping_stamp,
+                    Connection::type_t type,
+                    clock::time_point now) {
+    assert(started);
+    auto ping = ping_history.find(ping_stamp);
+    if (ping == ping_history.end()) {
+      // old replies, deprecated by newly sent pings.
+      return false;
+    }
+    auto& unacked = ping->second.unacknowledged;
+    assert(unacked);
+    if (type == Connection::type_t::front) {
+      last_rx_front = now;
+      unacked--;
+    } else {
+      last_rx_back = now;
+      unacked--;
+    }
+    if (unacked == 0) {
+      ping_history.erase(ping_history.begin(), ++ping);
+    }
+    return true;
+  }
+
+  void lost() {
+    assert(started);
+    started = false;
+    if (!ping_history.empty()) {
+      // we lost our ping_history of the last session, but still need to keep
+      // the oldest deadline for unhealthy check.
+      auto oldest = ping_history.begin();
+      auto sent_stamp = oldest->first;
+      auto deadline = oldest->second.deadline;
+      ping_history.clear();
+      ping_history.emplace(sent_stamp, reply_t{deadline, 0});
+    }
+  }
+
+  // maintain an entry in ping_history for unhealthy check
+  void set_inactive_history(clock::time_point);
 
  private:
-  Heartbeat& heartbeat;
   const osd_id_t peer;
-
+  bool started = false;
   // time we sent our first ping request
   clock::time_point first_tx;
   // last time we sent a ping request
@@ -188,20 +325,6 @@ class Heartbeat::Peer {
   // most recent epoch we wanted this peer
   epoch_t epoch;
 
-  // if racing happened
-  bool has_racing = false;
-  // peer connection (front)
-  crimson::net::ConnectionRef con_front;
-  bool front_ready = false;
-  // peer connection (back)
-  crimson::net::ConnectionRef con_back;
-  bool back_ready = false;
-
-  // start to send pings and track ping_history
-  bool session_started = false;
-  // if need to send heartbeat when session started
-  bool pending_send = false;
-
   struct reply_t {
     clock::time_point deadline;
     // one sent over front conn, another sent over back conn
@@ -210,3 +333,68 @@ class Heartbeat::Peer {
   // history of inflight pings, arranging by timestamp we sent
   std::map<utime_t, reply_t> ping_history;
 };
+
+class Heartbeat::Peer final : private Heartbeat::Connector {
+ public:
+  Peer(Heartbeat&, osd_id_t);
+  ~Peer();
+  Peer(Peer&&) = delete;
+  Peer(const Peer&) = delete;
+  Peer& operator=(const Peer&) = delete;
+
+  void set_epoch(epoch_t epoch) { session.set_epoch(epoch); }
+  epoch_t get_epoch() const { return session.get_epoch(); }
+
+  // if failure, return time_point since last active
+  // else, return clock::zero()
+  clock::time_point failed_since(clock::time_point now) const {
+    return session.failed_since(now);
+  }
+  void send_heartbeat(
+      clock::time_point, ceph::signedspan, std::vector<seastar::future<>>&);
+  seastar::future<> handle_reply(crimson::net::Connection*, Ref<MOSDPing>);
+  void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
+    for_each_conn([&] (auto& _conn) {
+      if (_conn.match(conn)) {
+        if (is_replace) {
+          _conn.replaced();
+        } else {
+          _conn.reset();
+        }
+      }
+    });
+  }
+  void handle_connect(crimson::net::ConnectionRef conn) {
+    for_each_conn([&] (auto& _conn) {
+      if (_conn.match(conn)) {
+        _conn.connected();
+      }
+    });
+  }
+  void handle_accept(crimson::net::ConnectionRef conn) {
+    for_each_conn([&] (auto& _conn) {
+      _conn.accepted(conn);
+    });
+  }
+
+ private:
+  entity_addr_t get_peer_addr(type_t type) override;
+  void all_connected() override;
+  void connection_lost() override;
+  void do_send_heartbeat(
+      clock::time_point, ceph::signedspan, std::vector<seastar::future<>>*);
+
+  template <typename Func>
+  void for_each_conn(Func&& f) {
+    f(con_front);
+    f(con_back);
+  }
+
+  Heartbeat& heartbeat;
+  const osd_id_t peer;
+  Session session;
+  // if need to send heartbeat when session started
+  bool pending_send = false;
+  Connection con_front;
+  Connection con_back;
+};