]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: improve readability of Heartbeat::Peer classes
authorYingxin Cheng <yingxin.cheng@intel.com>
Sun, 28 Jun 2020 02:59:15 +0000 (10:59 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 28 Jun 2020 16:16:05 +0000 (00:16 +0800)
* Add rationales to introduce Heartbeat::Peer class series.
* Find better names for internal interfaces.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h

index a5f06ba8a4303449d26d93fba711d8d351377b3d..e3b8b3c6a2a50cf642b1d8a17362029127568f29 100644 (file)
@@ -358,19 +358,15 @@ Heartbeat::Connection::~Connection()
   }
 }
 
-bool Heartbeat::Connection::match(crimson::net::Connection* _conn) const
+bool Heartbeat::Connection::matches(crimson::net::Connection* _conn) const
 {
-  if (conn && conn.get() == _conn) {
-    return true;
-  } else {
-    return false;
-  }
+  return (conn && conn.get() == _conn);
 }
 
 void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn)
 {
   if (!conn) {
-    if (accepted_conn->get_peer_addr() == connector.get_peer_addr(type)) {
+    if (accepted_conn->get_peer_addr() == listener.get_peer_addr(type)) {
       logger().info("Heartbeat::Connection::accepted(): "
                     "{} racing resolved", *this);
       conn = accepted_conn;
@@ -399,7 +395,7 @@ void Heartbeat::Connection::reset()
   conn = nullptr;
   if (is_connected) {
     is_connected = false;
-    connector.decrease_connected();
+    listener.decrease_connected();
   }
   if (!racing_detected || is_winner_side) {
     connect();
@@ -419,7 +415,7 @@ seastar::future<> Heartbeat::Connection::send(MessageRef msg)
 void Heartbeat::Connection::validate()
 {
   assert(is_connected);
-  auto peer_addr = connector.get_peer_addr(type);
+  auto peer_addr = listener.get_peer_addr(type);
   if (conn->get_peer_addr() != peer_addr) {
     logger().info("Heartbeat::Connection::validate(): "
                   "{} has new address {} over {}, reset",
@@ -447,13 +443,13 @@ void Heartbeat::Connection::set_connected()
 {
   assert(!is_connected);
   is_connected = true;
-  connector.increase_connected();
+  listener.increase_connected();
 }
 
 void Heartbeat::Connection::connect()
 {
   assert(!conn);
-  auto addr = connector.get_peer_addr(type);
+  auto addr = listener.get_peer_addr(type);
   conn = msgr.connect(addr, entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
   if (conn->is_connected()) {
     set_connected();
@@ -485,20 +481,19 @@ Heartbeat::Session::failed_since(Heartbeat::clock::time_point now) const
 
 void Heartbeat::Session::set_inactive_history(clock::time_point now)
 {
-  assert(!started);
+  assert(!connected);
   if (ping_history.empty()) {
     const utime_t sent_stamp{now};
     const auto deadline =
       now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
-    [[maybe_unused]] auto [reply, added] =
-      ping_history.emplace(sent_stamp, reply_t{deadline, 0});
+    ping_history.emplace(sent_stamp, reply_t{deadline, 0});
   } else { // the entry is already added
     assert(ping_history.size() == 1);
   }
 }
 
 Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer)
-  : Connector(2), heartbeat{heartbeat}, peer{peer}, session{peer},
+  : ConnectionListener(2), heartbeat{heartbeat}, peer{peer}, session{peer},
   con_front(peer, heartbeat.whoami > peer, Connection::type_t::front,
             *heartbeat.front_msgr, *this),
   con_back(peer, heartbeat.whoami > peer, Connection::type_t::back,
@@ -548,15 +543,15 @@ seastar::future<> Heartbeat::Peer::handle_reply(
     return seastar::now();
   }
   type_t type;
-  if (con_front.match(conn)) {
+  if (con_front.matches(conn)) {
     type = type_t::front;
-  } else if (con_back.match(conn)) {
+  } else if (con_back.matches(conn)) {
     type = type_t::back;
   } else {
     return seastar::now();
   }
   const auto now = clock::now();
-  if (session.handle_reply(m->ping_stamp, type, now)) {
+  if (session.on_pong(m->ping_stamp, type, now)) {
     if (session.do_health_screen(now) == Session::health_state::HEALTHY) {
       return heartbeat.failing_peers.cancel_one(peer);
     }
@@ -574,21 +569,21 @@ entity_addr_t Heartbeat::Peer::get_peer_addr(type_t type)
   }
 }
 
-void Heartbeat::Peer::all_connected()
+void Heartbeat::Peer::on_connected()
 {
-  logger().info("Heartbeat::Peer: osd.{} started (send={})",
+  logger().info("Heartbeat::Peer: osd.{} connected (send={})",
                 peer, pending_send);
-  session.start();
+  session.on_connected();
   if (pending_send) {
     pending_send = false;
     do_send_heartbeat(clock::now(), heartbeat.service.get_mnow(), nullptr);
   }
 }
 
-void Heartbeat::Peer::connection_lost()
+void Heartbeat::Peer::on_disconnected()
 {
-  logger().info("Heartbeat::Peer: osd.{} reset", peer);
-  session.lost();
+  logger().info("Heartbeat::Peer: osd.{} disconnected", peer);
+  session.on_disconnected();
 }
 
 void Heartbeat::Peer::do_send_heartbeat(
@@ -599,7 +594,7 @@ void Heartbeat::Peer::do_send_heartbeat(
   const utime_t sent_stamp{now};
   const auto deadline =
     now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
-  session.emplace_history(sent_stamp, deadline);
+  session.on_ping(sent_stamp, deadline);
   for_each_conn([&, this] (auto& conn) {
     auto min_message = static_cast<uint32_t>(
       local_conf()->osd_heartbeat_min_size);
index f9f54228ebcc8815347351ca524c218882b96c13..55f779a3a8b29d7e205cc98d123d2c481049dcb6 100644 (file)
@@ -83,7 +83,7 @@ private:
   // use real_clock so it can be converted to utime_t
   using clock = ceph::coarse_real_clock;
 
-  class Connector;
+  class ConnectionListener;
   class Connection;
   class Session;
   class Peer;
@@ -130,21 +130,25 @@ inline std::ostream& operator<<(std::ostream& out, const Heartbeat& hb) {
   return out;
 }
 
-class Heartbeat::Connector {
+/*
+ * Event driven interface for Heartbeat::Peer to be notified when both hb_front
+ * and hb_back are connected, or connection is lost.
+ */
+class Heartbeat::ConnectionListener {
  public:
-  Connector(size_t connections) : connections{connections} {}
+  ConnectionListener(size_t connections) : connections{connections} {}
 
   void increase_connected() {
     assert(connected < connections);
     ++connected;
     if (connected == connections) {
-      all_connected();
+      on_connected();
     }
   }
   void decrease_connected() {
     assert(connected > 0);
     if (connected == connections) {
-      connection_lost();
+      on_disconnected();
     }
     --connected;
   }
@@ -152,8 +156,8 @@ class Heartbeat::Connector {
   virtual entity_addr_t get_peer_addr(type_t) = 0;
 
  protected:
-  virtual void all_connected() = 0;
-  virtual void connection_lost() = 0;
+  virtual void on_connected() = 0;
+  virtual void on_disconnected() = 0;
 
  private:
   const size_t connections;
@@ -162,18 +166,20 @@ class Heartbeat::Connector {
 
 class Heartbeat::Connection {
  public:
-  using type_t = Connector::type_t;
+  using type_t = ConnectionListener::type_t;
   Connection(osd_id_t peer, bool is_winner_side, type_t type,
-             crimson::net::Messenger& msgr, Connector& connector)
-    : peer{peer}, is_winner_side{is_winner_side}, type{type},
-      msgr{msgr}, connector{connector} {
+             crimson::net::Messenger& msgr,
+             ConnectionListener& listener)
+    : peer{peer}, type{type},
+      msgr{msgr}, listener{listener},
+      is_winner_side{is_winner_side} {
     connect();
   }
   ~Connection();
 
-  bool match(crimson::net::Connection* _conn) const;
-  bool match(crimson::net::ConnectionRef conn) const {
-    return match(conn.get());
+  bool matches(crimson::net::Connection* _conn) const;
+  bool matches(crimson::net::ConnectionRef conn) const {
+    return matches(conn.get());
   }
   void connected() {
     set_connected();
@@ -191,14 +197,42 @@ class Heartbeat::Connection {
   void connect();
 
   const osd_id_t peer;
-  const bool is_winner_side;
   const type_t type;
   crimson::net::Messenger& msgr;
-  Connector& connector;
+  ConnectionListener& listener;
+
+/*
+ * Resolve the following racing when both me and peer are trying to connect
+ * each other symmetrically, under SocketPolicy::lossy_client:
+ *
+ * OSD.A               OSD.B
+ * -                       -
+ * |-[1]---->       <----[2]-|
+ *           \     /
+ *             \ /
+ *    delay..   X   delay..
+ *             / \
+ * |-[1]x>   /     \   <x[2]-|
+ * |<-[2]---         ---[1]->|
+ * |(reset#1)       (reset#2)|
+ * |(reconnectB) (reconnectA)|
+ * |-[2]--->         <---[1]-|
+ *  delay..           delay..
+ *   (remote close populated)
+ * |-[2]x>             <x[1]-|
+ * |(reset#2)       (reset#1)|
+ * | ...                 ... |
+ *         (dead loop!)
+ *
+ * Our solution is to remember if such racing was happened recently, and
+ * establish connection asymmetrically only from the winner side whose osd-id
+ * is larger.
+ */
+  const bool is_winner_side;
+  bool racing_detected = false;
 
   crimson::net::ConnectionRef conn;
   bool is_connected = false;
-  bool racing_detected = false;
 
  friend std::ostream& operator<<(std::ostream& os, const Connection c) {
    if (c.type == type_t::front) {
@@ -209,13 +243,31 @@ class Heartbeat::Connection {
  }
 };
 
+/*
+ * Track the ping history and ping reply (the pong) from the same session, clean up
+ * history once hb_front or hb_back loses connection and restart the session once
+ * both connections are connected again.
+ *
+ * We cannot simply remove the entire Heartbeat::Peer once hb_front or hb_back
+ * loses connection, because we would end up with the following deadloop:
+ *
+ * OSD.A                                   OSD.B
+ * -                                           -
+ * hb_front reset <--(network)--- hb_front close
+ *       |                             ^
+ *       |                             |
+ *  remove Peer B  (dead loop!)   remove Peer A
+ *       |                             |
+ *       V                             |
+ * hb_back close ----(network)---> hb_back reset
+ */
 class Heartbeat::Session {
  public:
   Session(osd_id_t peer) : peer{peer} {}
 
   void set_epoch(epoch_t epoch_) { epoch = epoch_; }
   epoch_t get_epoch() const { return epoch; }
-  bool is_started() const { return started; }
+  bool is_started() const { return connected; }
   bool pinged() const {
     if (clock::is_zero(first_tx)) {
       // i can never receive a pong without sending any ping message first.
@@ -257,23 +309,23 @@ class Heartbeat::Session {
     last_tx = now;
   }
 
-  void start() {
-    assert(!started);
-    started = true;
+  void on_connected() {
+    assert(!connected);
+    connected = true;
     ping_history.clear();
   }
 
-  void emplace_history(const utime_t& sent_stamp,
-                       const clock::time_point& deadline) {
-    assert(started);
+  void on_ping(const utime_t& sent_stamp,
+               const clock::time_point& deadline) {
+    assert(connected);
     [[maybe_unused]] auto [reply, added] =
       ping_history.emplace(sent_stamp, reply_t{deadline, 2});
   }
 
-  bool handle_reply(const utime_t& ping_stamp,
-                    Connection::type_t type,
-                    clock::time_point now) {
-    assert(started);
+  bool on_pong(const utime_t& ping_stamp,
+               Connection::type_t type,
+               clock::time_point now) {
+    assert(connected);
     auto ping = ping_history.find(ping_stamp);
     if (ping == ping_history.end()) {
       // old replies, deprecated by newly sent pings.
@@ -294,9 +346,9 @@ class Heartbeat::Session {
     return true;
   }
 
-  void lost() {
-    assert(started);
-    started = false;
+  void on_disconnected() {
+    assert(connected);
+    connected = false;
     if (!ping_history.empty()) {
       // we lost our ping_history of the last session, but still need to keep
       // the oldest deadline for unhealthy check.
@@ -313,7 +365,7 @@ class Heartbeat::Session {
 
  private:
   const osd_id_t peer;
-  bool started = false;
+  bool connected = false;
   // time we sent our first ping request
   clock::time_point first_tx;
   // last time we sent a ping request
@@ -334,7 +386,7 @@ class Heartbeat::Session {
   std::map<utime_t, reply_t> ping_history;
 };
 
-class Heartbeat::Peer final : private Heartbeat::Connector {
+class Heartbeat::Peer final : private Heartbeat::ConnectionListener {
  public:
   Peer(Heartbeat&, osd_id_t);
   ~Peer();
@@ -355,7 +407,7 @@ class Heartbeat::Peer final : private Heartbeat::Connector {
   seastar::future<> handle_reply(crimson::net::Connection*, Ref<MOSDPing>);
   void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
     for_each_conn([&] (auto& _conn) {
-      if (_conn.match(conn)) {
+      if (_conn.matches(conn)) {
         if (is_replace) {
           _conn.replaced();
         } else {
@@ -366,7 +418,7 @@ class Heartbeat::Peer final : private Heartbeat::Connector {
   }
   void handle_connect(crimson::net::ConnectionRef conn) {
     for_each_conn([&] (auto& _conn) {
-      if (_conn.match(conn)) {
+      if (_conn.matches(conn)) {
         _conn.connected();
       }
     });
@@ -379,8 +431,8 @@ class Heartbeat::Peer final : private Heartbeat::Connector {
 
  private:
   entity_addr_t get_peer_addr(type_t type) override;
-  void all_connected() override;
-  void connection_lost() override;
+  void on_connected() override;
+  void on_disconnected() override;
   void do_send_heartbeat(
       clock::time_point, ceph::signedspan, std::vector<seastar::future<>>*);
 
@@ -393,7 +445,7 @@ class Heartbeat::Peer final : private Heartbeat::Connector {
   Heartbeat& heartbeat;
   const osd_id_t peer;
   Session session;
-  // if need to send heartbeat when session started
+  // if need to send heartbeat when session connected
   bool pending_send = false;
   Connection con_front;
   Connection con_back;