]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd/heartbeat: relax the order of replacement reset and accept 51916/head
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 29 Jun 2023 05:25:28 +0000 (13:25 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Thu, 29 Jun 2023 06:27:58 +0000 (14:27 +0800)
With the new implementation in messenger, the order of replacement reset
and accept events cannot be determined because they are from different
connections.

Modify the heatbeat logic to tolerate the both cases.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Dispatcher.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/chained_dispatchers.cc
src/crimson/net/chained_dispatchers.h
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h
src/test/crimson/test_messenger.cc
src/test/crimson/test_messenger_thrash.cc

index c563b5e266f2e607a043e440456ab4a297821a74..11908349e7cd1ce19ca8f42706bf1cfa4b44597d 100644 (file)
@@ -32,13 +32,18 @@ class Dispatcher {
 
   // The connection is accepted or recoverred(lossless), all the followup
   // events and messages will be dispatched to the new_shard.
-  virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id new_shard) {}
+  //
+  // is_replace=true means the accepted connection has replaced
+  // another connecting connection with the same peer_addr, which currently only
+  // happens under lossy policy when both sides wish to connect to each other.
+  virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id new_shard, bool is_replace) {}
 
   // The connection is (re)connected, all the followup events and messages will
   // be dispatched to the new_shard.
   virtual void ms_handle_connect(ConnectionRef conn, seastar::shard_id new_shard) {}
 
   // a reset event is dispatched when the connection is closed unexpectedly.
+  //
   // is_replace=true means the reset connection is going to be replaced by
   // another accepting connection with the same peer_addr, which currently only
   // happens under lossy policy when both sides wish to connect to each other.
index f6a235d4af3fe77039f487c72af77e13255722f4..869c11cf5a553d6696f11fdb14f8e65c7265d018 100644 (file)
@@ -1760,6 +1760,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
 
   ceph_assert_always(is_socket_valid);
   trigger_state(state_t::ESTABLISHING, io_state_t::delay);
+  bool is_replace;
   if (existing_conn) {
     logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
                   "client_cookie={}, server_cookie={}, {}, new_sid={}, "
@@ -1768,6 +1769,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
                   client_cookie, server_cookie,
                   io_states, frame_assembler->get_socket_shard_id(),
                   *existing_conn);
+    is_replace = true;
     ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
         existing_conn->protocol.get());
     existing_proto->do_close(
@@ -1786,10 +1788,11 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
                   conn, global_seq, peer_global_seq, connect_seq,
                   client_cookie, server_cookie, io_states,
                   frame_assembler->get_socket_shard_id());
+    is_replace = false;
     accept_me();
   }
 
-  gated_execute("execute_establishing", conn, [this] {
+  gated_execute("execute_establishing", conn, [this, is_replace] {
     ceph_assert_always(state == state_t::ESTABLISHING);
 
     // set io_handler to a new shard
@@ -1803,10 +1806,10 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
     pr_switch_io_shard = seastar::shared_promise<>();
     return seastar::smp::submit_to(
         io_handler.get_shard_id(),
-        [this, cc_seq, new_io_shard,
+        [this, cc_seq, new_io_shard, is_replace,
          conn_fref=std::move(conn_fref)]() mutable {
       return io_handler.dispatch_accept(
-          cc_seq, new_io_shard, std::move(conn_fref));
+          cc_seq, new_io_shard, std::move(conn_fref), is_replace);
     }).then([this, new_io_shard] {
       ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
       pr_switch_io_shard->set_value();
@@ -1976,7 +1979,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
           [this, cc_seq, new_io_shard,
            conn_fref=std::move(conn_fref)]() mutable {
         return io_handler.dispatch_accept(
-            cc_seq, new_io_shard, std::move(conn_fref));
+            cc_seq, new_io_shard, std::move(conn_fref), false);
       }).then([this, new_io_shard] {
         ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
         pr_switch_io_shard->set_value();
index 2656c0e57492bc207747a2ae0b1c15069ef8a9d0..dfff6d916fa6dacea20daab6ed28694743bbf6a1 100644 (file)
@@ -41,10 +41,11 @@ ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn,
 void
 ChainedDispatchers::ms_handle_accept(
     crimson::net::ConnectionRef conn,
-    seastar::shard_id new_shard) {
+    seastar::shard_id new_shard,
+    bool is_replace) {
   try {
     for (auto& dispatcher : dispatchers) {
-      dispatcher->ms_handle_accept(conn, new_shard);
+      dispatcher->ms_handle_accept(conn, new_shard, is_replace);
     }
   } catch (...) {
     logger().error("{} got unexpected exception in ms_handle_accept() {}",
index 40356e9d473678be1ce0e46b88b8930704f6a424..5835205119d8d24a0fa20efccb9a335fa883e3fa 100644 (file)
@@ -26,7 +26,7 @@ public:
     return dispatchers.empty();
   }
   seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef);
-  void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id);
+  void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id, bool is_replace);
   void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id);
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace);
   void ms_handle_remote_reset(crimson::net::ConnectionRef conn);
index b138ed0f26e0efd9b2da0aa30d1cb2805a1bb8fb..abb7f5e467346b7c509e26677ae16bfc241df58c 100644 (file)
@@ -558,21 +558,22 @@ seastar::future<>
 IOHandler::dispatch_accept(
     crosscore_t::seq_t cc_seq,
     seastar::shard_id new_sid,
-    ConnectionFRef conn_fref)
+    ConnectionFRef conn_fref,
+    bool is_replace)
 {
   ceph_assert_always(seastar::this_shard_id() == get_shard_id());
   if (!crosscore.proceed_or_wait(cc_seq)) {
     logger().debug("{} got {} dispatch_accept(), wait at {}",
                    conn, cc_seq, crosscore.get_in_seq());
     return crosscore.wait(cc_seq
-    ).then([this, cc_seq, new_sid,
+    ).then([this, cc_seq, new_sid, is_replace,
             conn_fref=std::move(conn_fref)]() mutable {
-      return dispatch_accept(cc_seq, new_sid, std::move(conn_fref));
+      return dispatch_accept(cc_seq, new_sid, std::move(conn_fref), is_replace);
     });
   }
 
-  logger().debug("{} got {} dispatch_accept({}) at {}",
-                 conn, cc_seq, new_sid, io_stat_printer{*this});
+  logger().debug("{} got {} dispatch_accept(new_sid={}, replace={}) at {}",
+                 conn, cc_seq, new_sid, is_replace, io_stat_printer{*this});
   if (get_io_state() == io_state_t::drop) {
     assert(!protocol_is_connected);
     // it is possible that both io_handler and protocolv2 are
@@ -586,7 +587,7 @@ IOHandler::dispatch_accept(
   auto _conn_ref = conn_ref;
   auto fut = to_new_sid(new_sid, std::move(conn_fref));
 
-  dispatchers.ms_handle_accept(_conn_ref, new_sid);
+  dispatchers.ms_handle_accept(_conn_ref, new_sid, is_replace);
   // user can make changes
 
   return fut;
index 0175d2c522c98f22526cc4f903083e65c0701898..edb69b3407afa46e9ab2506d3d8920dd5759a59c 100644 (file)
@@ -279,7 +279,8 @@ public:
   seastar::future<> dispatch_accept(
       crosscore_t::seq_t cc_seq,
       seastar::shard_id new_sid,
-      ConnectionFRef);
+      ConnectionFRef,
+      bool is_replace);
 
   seastar::future<> dispatch_connect(
       crosscore_t::seq_t cc_seq,
index 30de528291ab710ddf649bba6d60802eaec559d0..cdee52731e644a3e211554404aff1a7f497e086a 100644 (file)
@@ -254,7 +254,8 @@ void Heartbeat::ms_handle_connect(
 
 void Heartbeat::ms_handle_accept(
     crimson::net::ConnectionRef conn,
-    seastar::shard_id new_shard)
+    seastar::shard_id new_shard,
+    bool is_replace)
 {
   ceph_assert_always(seastar::this_shard_id() == new_shard);
   auto peer = conn->get_peer_id();
@@ -264,7 +265,7 @@ void Heartbeat::ms_handle_accept(
   }
   if (auto found = peers.find(peer);
       found != peers.end()) {
-    found->second.handle_accept(conn);
+    found->second.handle_accept(conn, is_replace);
   }
 }
 
@@ -433,42 +434,57 @@ bool Heartbeat::Connection::matches(crimson::net::ConnectionRef _conn) const
   return (conn && conn == _conn);
 }
 
-void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn)
+bool Heartbeat::Connection::accepted(
+    crimson::net::ConnectionRef accepted_conn,
+    bool is_replace)
 {
-  if (!conn) {
-    if (accepted_conn->get_peer_addr() == listener.get_peer_addr(type)) {
-      logger().info("Heartbeat::Connection::accepted(): "
-                    "{} racing resolved", *this);
-      conn = accepted_conn;
-      set_connected();
+  ceph_assert(accepted_conn);
+  ceph_assert(accepted_conn != conn);
+  if (accepted_conn->get_peer_addr() != listener.get_peer_addr(type)) {
+    return false;
+  }
+
+  if (is_replace) {
+    logger().info("Heartbeat::Connection::accepted(): "
+                  "{} racing", *this);
+    racing_detected = true;
+  }
+  if (conn) {
+    // there is no assumption about the ordering of the reset and accept
+    // events for the 2 racing connections.
+    if (is_connected) {
+      logger().warn("Heartbeat::Connection::accepted(): "
+                    "{} is accepted while connected, is_replace={}",
+                    *this, is_replace);
+      conn->mark_down();
+      set_unconnected();
     }
-  } else if (conn == accepted_conn) {
-    set_connected();
   }
+  conn = accepted_conn;
+  set_connected();
+  return true;
 }
 
-void Heartbeat::Connection::replaced()
+void Heartbeat::Connection::reset(bool is_replace)
 {
-  assert(!is_connected);
-  auto replaced_conn = conn;
-  // set the racing connection, will be handled by handle_accept()
-  conn = msgr.connect(replaced_conn->get_peer_addr(),
-                      replaced_conn->get_peer_name());
-  racing_detected = true;
-  logger().warn("Heartbeat::Connection::replaced(): {} racing", *this);
-  assert(conn != replaced_conn);
-}
+  if (is_replace) {
+    logger().info("Heartbeat::Connection::reset(): "
+                  "{} racing, waiting for the replacing accept",
+                  *this);
+    racing_detected = true;
+  }
 
-void Heartbeat::Connection::reset()
-{
-  conn = nullptr;
   if (is_connected) {
-    is_connected = false;
-    listener.decrease_connected();
+    set_unconnected();
+  } else {
+    conn = nullptr;
   }
-  if (!racing_detected || is_winner_side) {
+
+  if (is_replace) {
+    // waiting for the replacing accept event
+  } else if (!racing_detected || is_winner_side) {
     connect();
-  } else {
+  } else { // racing_detected && !is_winner_side
     logger().info("Heartbeat::Connection::reset(): "
                   "{} racing detected and lose, "
                   "waiting for peer connect me", *this);
@@ -510,11 +526,22 @@ void Heartbeat::Connection::retry()
 
 void Heartbeat::Connection::set_connected()
 {
+  assert(conn);
   assert(!is_connected);
+  ceph_assert(conn->is_connected());
   is_connected = true;
   listener.increase_connected();
 }
 
+void Heartbeat::Connection::set_unconnected()
+{
+  assert(conn);
+  assert(is_connected);
+  conn = nullptr;
+  is_connected = false;
+  listener.decrease_connected();
+}
+
 void Heartbeat::Connection::connect()
 {
   assert(!conn);
@@ -604,6 +631,64 @@ void Heartbeat::Peer::send_heartbeat(
   }
 }
 
+void Heartbeat::Peer::handle_reset(
+    crimson::net::ConnectionRef conn, bool is_replace)
+{
+  int cnt = 0;
+  for_each_conn([&] (auto& _conn) {
+    if (_conn.matches(conn)) {
+      ++cnt;
+      _conn.reset(is_replace);
+    }
+  });
+
+  if (cnt == 0) {
+    logger().info("Heartbeat::Peer::handle_reset(): {} ignores conn, is_replace={} -- {}",
+                  *this, is_replace, *conn);
+  } else if (cnt > 1) {
+    logger().error("Heartbeat::Peer::handle_reset(): {} handles conn {} times -- {}",
+                  *this, cnt, *conn);
+  }
+}
+
+void Heartbeat::Peer::handle_connect(crimson::net::ConnectionRef conn)
+{
+  int cnt = 0;
+  for_each_conn([&] (auto& _conn) {
+    if (_conn.matches(conn)) {
+      ++cnt;
+      _conn.connected();
+    }
+  });
+
+  if (cnt == 0) {
+    logger().error("Heartbeat::Peer::handle_connect(): {} ignores conn -- {}",
+                   *this, *conn);
+    conn->mark_down();
+  } else if (cnt > 1) {
+    logger().error("Heartbeat::Peer::handle_connect(): {} handles conn {} times -- {}",
+                  *this, cnt, *conn);
+  }
+}
+
+void Heartbeat::Peer::handle_accept(crimson::net::ConnectionRef conn, bool is_replace)
+{
+  int cnt = 0;
+  for_each_conn([&] (auto& _conn) {
+    if (_conn.accepted(conn, is_replace)) {
+      ++cnt;
+    }
+  });
+
+  if (cnt == 0) {
+    logger().warn("Heartbeat::Peer::handle_accept(): {} ignores conn -- {}",
+                  *this, *conn);
+  } else if (cnt > 1) {
+    logger().error("Heartbeat::Peer::handle_accept(): {} handles conn {} times -- {}",
+                  *this, cnt, *conn);
+  }
+}
+
 seastar::future<> Heartbeat::Peer::handle_reply(
     crimson::net::ConnectionRef conn, Ref<MOSDPing> m)
 {
index c5bf8f0ded2322df8682b755a7aa07542475857b..f5da451181e91b7aa579376cae24aa77b243db15 100644 (file)
@@ -53,7 +53,7 @@ public:
       crimson::net::ConnectionRef conn, MessageRef m) override;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
   void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id) override;
-  void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id) override;
+  void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id, bool is_replace) override;
 
   void print(std::ostream&) const;
 private:
@@ -189,9 +189,8 @@ class Heartbeat::Connection {
   void connected() {
     set_connected();
   }
-  void accepted(crimson::net::ConnectionRef);
-  void replaced();
-  void reset();
+  bool accepted(crimson::net::ConnectionRef, bool is_replace);
+  void reset(bool is_replace=false);
   seastar::future<> send(MessageURef msg);
   void validate();
   // retry connection if still pending
@@ -199,6 +198,7 @@ class Heartbeat::Connection {
 
  private:
   void set_connected();
+  void set_unconnected();
   void connect();
 
   const osd_id_t peer;
@@ -239,19 +239,15 @@ class Heartbeat::Connection {
   crimson::net::ConnectionRef conn;
   bool is_connected = false;
 
- friend std::ostream& operator<<(std::ostream& os, const Connection& c) {
-   if (c.type == type_t::front) {
-     return os << "con_front(osd." << c.peer << ")";
-   } else {
-     return os << "con_back(osd." << c.peer << ")";
-   }
- }
 friend std::ostream& operator<<(std::ostream& os, const Connection& c) {
+    if (c.type == type_t::front) {
+      return os << "con_front(osd." << c.peer << ")";
+    } else {
+      return os << "con_back(osd." << c.peer << ")";
+    }
 }
 };
 
-#if FMT_VERSION >= 90000
-template <> struct fmt::formatter<Heartbeat::Connection> : fmt::ostream_formatter {};
-#endif
-
 /*
  * Track the ping history and ping reply (the pong) from the same session, clean up
  * history once hb_front or hb_back loses connection and restart the session once
@@ -425,29 +421,12 @@ class Heartbeat::Peer final : private Heartbeat::ConnectionListener {
   void send_heartbeat(
       clock::time_point, ceph::signedspan, std::vector<seastar::future<>>&);
   seastar::future<> handle_reply(crimson::net::ConnectionRef, Ref<MOSDPing>);
-  void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
-    for_each_conn([&] (auto& _conn) {
-      if (_conn.matches(conn)) {
-        if (is_replace) {
-          _conn.replaced();
-        } else {
-          _conn.reset();
-        }
-      }
-    });
-  }
-  void handle_connect(crimson::net::ConnectionRef conn) {
-    for_each_conn([&] (auto& _conn) {
-      if (_conn.matches(conn)) {
-        _conn.connected();
-      }
-    });
-  }
-  void handle_accept(crimson::net::ConnectionRef conn) {
-    for_each_conn([&] (auto& _conn) {
-      _conn.accepted(conn);
-    });
-  }
+
+  void handle_reset(crimson::net::ConnectionRef conn, bool is_replace);
+
+  void handle_connect(crimson::net::ConnectionRef conn);
+
+  void handle_accept(crimson::net::ConnectionRef conn, bool is_replace);
 
  private:
   entity_addr_t get_peer_addr(type_t type) override;
@@ -469,8 +448,14 @@ class Heartbeat::Peer final : private Heartbeat::ConnectionListener {
   bool pending_send = false;
   Connection con_front;
   Connection con_back;
+
+  friend std::ostream& operator<<(std::ostream& os, const Peer& p) {
+    return os << "peer(osd." << p.peer << ")";
+  }
 };
 
 #if FMT_VERSION >= 90000
 template <> struct fmt::formatter<Heartbeat> : fmt::ostream_formatter {};
+template <> struct fmt::formatter<Heartbeat::Connection> : fmt::ostream_formatter {};
+template <> struct fmt::formatter<Heartbeat::Peer> : fmt::ostream_formatter {};
 #endif
index 60b5439b228a89465bc4c8d2d08dbb42cca4939b..bba62cc7974d436c5a485c7419635cbfe7f7e3d7 100644 (file)
@@ -862,7 +862,8 @@ class FailoverSuite : public Dispatcher {
 
   void ms_handle_accept(
       ConnectionRef conn,
-      seastar::shard_id new_shard) override {
+      seastar::shard_id new_shard,
+      bool is_replace) override {
     assert(new_shard == seastar::this_shard_id());
     auto result = interceptor.find_result(conn);
     if (result == nullptr) {
@@ -1457,7 +1458,8 @@ class FailoverSuitePeer : public Dispatcher {
 
   void ms_handle_accept(
       ConnectionRef conn,
-      seastar::shard_id new_shard) override {
+      seastar::shard_id new_shard,
+      bool is_replace) override {
     assert(new_shard == seastar::this_shard_id());
     logger().info("[TestPeer] got accept from Test");
     ceph_assert(!tracked_conn ||
@@ -1616,7 +1618,8 @@ class FailoverTestPeer : public Dispatcher {
 
   void ms_handle_accept(
       ConnectionRef conn,
-      seastar::shard_id new_shard) override {
+      seastar::shard_id new_shard,
+      bool is_replace) override {
     assert(new_shard == seastar::this_shard_id());
     cmd_conn = conn;
   }
index 3b41fe16e7ded18ebf4a59dfe87b6779b373bcba..7c26d6ffdec672dfc6b31ecf0cb2203cc683db99 100644 (file)
@@ -105,7 +105,7 @@ class SyntheticDispatcher final
   }
 
   std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef con,
-                                               MessageRef m) {
+                                               MessageRef m) final {
     if (verbose) {
       logger().warn("{}: con = {}", __func__, *con);
     }
@@ -136,21 +136,22 @@ class SyntheticDispatcher final
 
   void ms_handle_accept(
       crimson::net::ConnectionRef conn,
-      seastar::shard_id new_shard) {
+      seastar::shard_id new_shard,
+      bool is_replace) final {
     logger().info("{} - Connection:{}", __func__, *conn);
     assert(new_shard == seastar::this_shard_id());
   }
 
   void ms_handle_connect(
       crimson::net::ConnectionRef conn,
-      seastar::shard_id new_shard) {
+      seastar::shard_id new_shard) final {
     logger().info("{} - Connection:{}", __func__, *conn);
     assert(new_shard == seastar::this_shard_id());
   }
 
-  void ms_handle_reset(crimson::net::ConnectionRef con, bool is_replace);
+  void ms_handle_reset(crimson::net::ConnectionRef con, bool is_replace) final;
 
-  void ms_handle_remote_reset(crimson::net::ConnectionRef con) {
+  void ms_handle_remote_reset(crimson::net::ConnectionRef con) final {
     clear_pending(con);
   }