]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/crimson/test_messenger: move all connection state checks to the protocol level
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 21 Jul 2023 03:17:32 +0000 (11:17 +0800)
committerMatan Breizman <mbreizma@redhat.com>
Thu, 19 Oct 2023 07:15:14 +0000 (07:15 +0000)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
(cherry picked from commit 933745cd815f7cc81438a89b3ba9ac5fc2ef61b7)

src/crimson/net/Connection.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/io_handler.cc
src/test/crimson/test_messenger.cc

index 4339e41f0c2dbd8f77ebe8efe3a1fffeb87de26d..7141e20f476df7a1461e854d0744142d4a24cb28 100644 (file)
@@ -121,9 +121,13 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
   virtual void print(std::ostream& out) const = 0;
 
 #ifdef UNIT_TESTS_BUILT
-  virtual bool is_closed() const = 0;
+  virtual bool is_protocol_ready() const = 0;
 
-  virtual bool is_closed_clean() const = 0;
+  virtual bool is_protocol_standby() const = 0;
+
+  virtual bool is_protocol_closed() const = 0;
+
+  virtual bool is_protocol_closed_clean() const = 0;
 
   virtual bool peer_wins() const = 0;
 #endif
index 045022b353cd8fcf15ec5252f909cc641266b641..8bb9f7b6821831a0028d7f3f4e4322b1c7508389 100644 (file)
@@ -2112,6 +2112,13 @@ void ProtocolV2::execute_ready()
   // I'm not responsible to shutdown the socket at READY
   is_socket_valid = false;
   trigger_state(state_t::READY, io_state_t::open);
+#ifdef UNIT_TESTS_BUILT
+  if (conn.interceptor) {
+    // FIXME: doesn't support cross-core
+    conn.interceptor->register_conn_ready(
+        conn.get_local_shared_foreign_from_this());
+  }
+#endif
 }
 
 // STANDBY state
index a9aa4ecdf76a708590931aad1b1e5add1d1530c2..dd7a1e7039b519e0c06dbd2a9feae13605de5393 100644 (file)
@@ -53,6 +53,14 @@ public:
   seastar::future<> close_clean_yielded();
 
 #ifdef UNIT_TESTS_BUILT
+  bool is_ready() const {
+    return state == state_t::READY;
+  }
+
+  bool is_standby() const {
+    return state == state_t::STANDBY;
+  }
+
   bool is_closed_clean() const {
     return closed_clean;
   }
index 908977da36b519d4e2d23cea658628e863568da6..57e5c12c1aed433e89df4c3d4b7c15348b226e08 100644 (file)
@@ -49,13 +49,24 @@ bool SocketConnection::is_connected() const
 }
 
 #ifdef UNIT_TESTS_BUILT
-bool SocketConnection::is_closed() const
+bool SocketConnection::is_protocol_ready() const
+{
+  assert(seastar::this_shard_id() == msgr_sid);
+  return protocol->is_ready();
+}
+
+bool SocketConnection::is_protocol_standby() const {
+  assert(seastar::this_shard_id() == msgr_sid);
+  return protocol->is_standby();
+}
+
+bool SocketConnection::is_protocol_closed() const
 {
   assert(seastar::this_shard_id() == msgr_sid);
   return protocol->is_closed();
 }
 
-bool SocketConnection::is_closed_clean() const
+bool SocketConnection::is_protocol_closed_clean() const
 {
   assert(seastar::this_shard_id() == msgr_sid);
   return protocol->is_closed_clean();
index 102e179583127a0a1224db14409a24a828081d2b..823d6c574dad7b8f7962ffe68cbe500c0b9c443c 100644 (file)
@@ -171,9 +171,13 @@ private:
   void set_socket(Socket *s);
 
 #ifdef UNIT_TESTS_BUILT
-  bool is_closed_clean() const override;
+  bool is_protocol_ready() const override;
 
-  bool is_closed() const override;
+  bool is_protocol_standby() const override;
+
+  bool is_protocol_closed_clean() const override;
+
+  bool is_protocol_closed() const override;
 
   // peer wins if myaddr > peeraddr
   bool peer_wins() const override;
index 8b774e678b7912291b4dc678cfaa2c59b6444c02..15d5509dc160af8e8325b1c8897e507479388718 100644 (file)
@@ -293,13 +293,6 @@ void IOHandler::do_set_io_state(
     ceph_assert_always(protocol_is_connected == true);
     assign_frame_assembler(std::move(fa));
     dispatch_in = true;
-#ifdef UNIT_TESTS_BUILT
-    if (conn.interceptor) {
-      // FIXME: doesn't support cross-core
-      conn.interceptor->register_conn_ready(
-          conn.get_local_shared_foreign_from_this());
-    }
-#endif
   } else if (prv_state == io_state_t::open) {
     // from open
     ceph_assert_always(protocol_is_connected == true);
index 8f55412c9af445a0acf5c4cca1ea8e5ebd0e5a66..4dbb90bceb970752a3abade28008fa58dfe0c44e 100644 (file)
@@ -668,7 +668,7 @@ using ConnResults = std::vector<ConnResult>;
 struct TestInterceptor : public Interceptor {
   std::map<Breakpoint, std::map<unsigned, bp_action_t>> breakpoints;
   std::map<Breakpoint, counter_t> breakpoints_counter;
-  std::map<ConnectionRef, unsigned> conns;
+  std::map<Connection*, unsigned> conns;
   ConnResults results;
   std::optional<seastar::abort_source> signal;
 
@@ -697,7 +697,7 @@ struct TestInterceptor : public Interceptor {
     breakpoints[bp][round] = bp_action_t::STALL;
   }
 
-  ConnResult* find_result(ConnectionRef conn) {
+  ConnResult* find_result(Connection *conn) {
     auto it = conns.find(conn);
     if (it == conns.end()) {
       return nullptr;
@@ -725,7 +725,7 @@ struct TestInterceptor : public Interceptor {
 
  private:
   void register_conn(ConnectionRef conn) override {
-    auto result = find_result(conn);
+    auto result = find_result(&*conn);
     if (result != nullptr) {
       logger().error("The connection [{}] {} already exists when register {}",
                      result->index, *result->conn, *conn);
@@ -733,13 +733,13 @@ struct TestInterceptor : public Interceptor {
     }
     unsigned index = results.size();
     results.emplace_back(conn, index);
-    conns[conn] = index;
+    conns[&*conn] = index;
     notify();
     logger().info("[{}] {} new connection registered", index, *conn);
   }
 
   void register_conn_closed(ConnectionRef conn) override {
-    auto result = find_result(conn);
+    auto result = find_result(&*conn);
     if (result == nullptr) {
       logger().error("Untracked closed connection: {}", *conn);
       ceph_abort();
@@ -753,19 +753,19 @@ struct TestInterceptor : public Interceptor {
   }
 
   void register_conn_ready(ConnectionRef conn) override {
-    auto result = find_result(conn);
+    auto result = find_result(&*conn);
     if (result == nullptr) {
       logger().error("Untracked ready connection: {}", *conn);
       ceph_abort();
     }
 
-    ceph_assert(conn->is_connected());
+    ceph_assert(conn->is_protocol_ready());
     notify();
     logger().info("[{}] {} ready", result->index, *conn);
   }
 
   void register_conn_replaced(ConnectionRef conn) override {
-    auto result = find_result(conn);
+    auto result = find_result(&*conn);
     if (result == nullptr) {
       logger().error("Untracked replaced connection: {}", *conn);
       ceph_abort();
@@ -778,7 +778,7 @@ struct TestInterceptor : public Interceptor {
   bp_action_t intercept(ConnectionRef conn, Breakpoint bp) override {
     ++breakpoints_counter[bp].counter;
 
-    auto result = find_result(conn);
+    auto result = find_result(&*conn);
     if (result == nullptr) {
       logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
                      *conn, bp, breakpoints_counter[bp].counter);
@@ -848,19 +848,19 @@ class FailoverSuite : public Dispatcher {
   TestInterceptor interceptor;
 
   unsigned tracked_index = 0;
-  ConnectionRef tracked_conn;
+  Connection *tracked_conn;
   unsigned pending_send = 0;
   unsigned pending_peer_receive = 0;
   unsigned pending_receive = 0;
 
   std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
-    auto result = interceptor.find_result(c);
+    auto result = interceptor.find_result(&*c);
     if (result == nullptr) {
       logger().error("Untracked ms dispatched connection: {}", *c);
       ceph_abort();
     }
 
-    if (tracked_conn != c) {
+    if (tracked_conn != &*c) {
       logger().error("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
                      result->index, *c, tracked_index, *tracked_conn);
       ceph_abort();
@@ -883,14 +883,14 @@ class FailoverSuite : public Dispatcher {
       seastar::shard_id prv_shard,
       bool is_replace) override {
     assert(prv_shard == seastar::this_shard_id());
-    auto result = interceptor.find_result(conn);
+    auto result = interceptor.find_result(&*conn);
     if (result == nullptr) {
       logger().error("Untracked accepted connection: {}", *conn);
       ceph_abort();
     }
 
     if (tracked_conn &&
-        !tracked_conn->is_closed() &&
+        !tracked_conn->is_protocol_closed() &&
         tracked_conn != conn) {
       logger().error("[{}] {} got accepted, but there's already traced_conn [{}] {}",
                      result->index, *conn, tracked_index, *tracked_conn);
@@ -909,7 +909,7 @@ class FailoverSuite : public Dispatcher {
       ConnectionRef conn,
       seastar::shard_id prv_shard) override {
     assert(prv_shard == seastar::this_shard_id());
-    auto result = interceptor.find_result(conn);
+    auto result = interceptor.find_result(&*conn);
     if (result == nullptr) {
       logger().error("Untracked connected connection: {}", *conn);
       ceph_abort();
@@ -928,7 +928,7 @@ class FailoverSuite : public Dispatcher {
   }
 
   void ms_handle_reset(ConnectionRef conn, bool is_replace) override {
-    auto result = interceptor.find_result(conn);
+    auto result = interceptor.find_result(&*conn);
     if (result == nullptr) {
       logger().error("Untracked reset connection: {}", *conn);
       ceph_abort();
@@ -949,7 +949,7 @@ class FailoverSuite : public Dispatcher {
   }
 
   void ms_handle_remote_reset(ConnectionRef conn) override {
-    auto result = interceptor.find_result(conn);
+    auto result = interceptor.find_result(&*conn);
     if (result == nullptr) {
       logger().error("Untracked remotely reset connection: {}", *conn);
       ceph_abort();
@@ -1015,11 +1015,11 @@ class FailoverSuite : public Dispatcher {
     unsigned pending_establish = 0;
     unsigned replaced_conns = 0;
     for (auto& result : interceptor.results) {
-      if (result.conn->is_closed_clean()) {
+      if (result.conn->is_protocol_closed_clean()) {
         if (result.state == conn_state_t::replaced) {
           ++replaced_conns;
         }
-      } else if (result.conn->is_connected()) {
+      } else if (result.conn->is_protocol_ready()) {
         if (tracked_conn != result.conn || tracked_index != result.index) {
           throw std::runtime_error(fmt::format(
                 "The connected connection [{}] {} doesn't"
@@ -1049,15 +1049,22 @@ class FailoverSuite : public Dispatcher {
         do_wait = true;
       }
     }
-    if (wait_received &&
-        (pending_send || pending_peer_receive || pending_receive)) {
-      if (pending_conns || pending_establish) {
-        logger().info("[Test] wait_ready(): wait for pending_send={},"
-                      " pending_peer_receive={}, pending_receive={},"
-                      " pending {}/{} ready/establish connections ...",
-                      pending_send, pending_peer_receive, pending_receive,
-                      pending_conns, pending_establish);
-        do_wait = true;
+    if (wait_received) {
+      if (pending_send || pending_peer_receive || pending_receive) {
+        if (pending_conns || pending_establish) {
+          logger().info("[Test] wait_ready(): wait for pending_send={},"
+                        " pending_peer_receive={}, pending_receive={},"
+                        " pending {}/{} ready/establish connections ...",
+                        pending_send, pending_peer_receive, pending_receive,
+                        pending_conns, pending_establish);
+          do_wait = true;
+        } else {
+          // If there are pending messages, stop waiting if there are
+          // no longer pending connections.
+        }
+      } else {
+         // Stop waiting if there are no pending messages. Pending connections
+         // should not be important.
       }
     }
     if (num_replaced > 0) {
@@ -1162,11 +1169,11 @@ class FailoverSuite : public Dispatcher {
   seastar::future<> connect_peer() {
     logger().info("[Test] connect_peer({})", test_peer_addr);
     auto conn = test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD);
-    auto result = interceptor.find_result(conn);
+    auto result = interceptor.find_result(&*conn);
     ceph_assert(result != nullptr);
 
     if (tracked_conn) {
-      if (tracked_conn->is_closed()) {
+      if (tracked_conn->is_protocol_closed()) {
         ceph_assert(tracked_conn != conn);
         logger().info("[Test] this is a new session replacing an closed one");
       } else {
@@ -1247,7 +1254,7 @@ class FailoverSuite : public Dispatcher {
 
   bool is_standby() {
     ceph_assert(tracked_conn);
-    return !(tracked_conn->is_connected() || tracked_conn->is_closed());
+    return tracked_conn->is_protocol_standby();
   }
 };
 
@@ -1481,7 +1488,7 @@ class FailoverSuitePeer : public Dispatcher {
     assert(prv_shard == seastar::this_shard_id());
     logger().info("[TestPeer] got accept from Test");
     ceph_assert(!tracked_conn ||
-                tracked_conn->is_closed() ||
+                tracked_conn->is_protocol_closed() ||
                 tracked_conn == conn);
     tracked_conn = conn;
     std::ignore = flush_pending_send();
@@ -1543,7 +1550,7 @@ class FailoverSuitePeer : public Dispatcher {
     logger().info("[TestPeer] connect_peer({})", test_addr_decoded);
     auto new_tracked_conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
     if (tracked_conn) {
-      if (tracked_conn->is_closed()) {
+      if (tracked_conn->is_protocol_closed()) {
         ceph_assert(tracked_conn != new_tracked_conn);
         logger().info("[TestPeer] this is a new session"
                       " replacing an closed one");