]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/crimson/test_messenger: relax the ordering checks to tracked_conn
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 26 Jul 2023 04:52:57 +0000 (12:52 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 9 Aug 2023 07:30:53 +0000 (15:30 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/test/crimson/test_messenger.cc

index 4dbb90bceb970752a3abade28008fa58dfe0c44e..8b1ca07edb9206a57e32a7376a77cb5f4f84ac26 100644 (file)
@@ -848,24 +848,24 @@ class FailoverSuite : public Dispatcher {
   TestInterceptor interceptor;
 
   unsigned tracked_index = 0;
-  Connection *tracked_conn;
+  Connection *tracked_conn = nullptr;
   unsigned pending_send = 0;
   unsigned pending_peer_receive = 0;
   unsigned pending_receive = 0;
 
-  std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
-    auto result = interceptor.find_result(&*c);
+  std::optional<seastar::future<>> ms_dispatch(ConnectionRef conn, MessageRef m) override {
+    auto result = interceptor.find_result(&*conn);
     if (result == nullptr) {
-      logger().error("Untracked ms dispatched connection: {}", *c);
+      logger().error("Untracked ms dispatched connection: {}", *conn);
       ceph_abort();
     }
 
-    if (tracked_conn != &*c) {
-      logger().error("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
-                     result->index, *c, tracked_index, *tracked_conn);
-      ceph_abort();
+    if (tracked_conn != &*conn) {
+      logger().warn("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
+                    result->index, *conn, tracked_index, *tracked_conn);
+    } else {
+      ceph_assert(result->index == tracked_index);
     }
-    ceph_assert(result->index == tracked_index);
 
     ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
     ceph_assert(pending_receive > 0);
@@ -874,7 +874,7 @@ class FailoverSuite : public Dispatcher {
       interceptor.notify();
     }
     logger().info("[Test] got op, left {} ops -- [{}] {}",
-                  pending_receive, result->index, *c);
+                  pending_receive, result->index, *conn);
     return {seastar::now()};
   }
 
@@ -891,14 +891,14 @@ class FailoverSuite : public Dispatcher {
 
     if (tracked_conn &&
         !tracked_conn->is_protocol_closed() &&
-        tracked_conn != conn) {
-      logger().error("[{}] {} got accepted, but there's already traced_conn [{}] {}",
+        tracked_conn != &*conn) {
+      logger().error("[{}] {} got accepted, but there's already a valid traced_conn [{}] {}",
                      result->index, *conn, tracked_index, *tracked_conn);
       ceph_abort();
     }
 
     tracked_index = result->index;
-    tracked_conn = conn;
+    tracked_conn = &*conn;
     ++result->cnt_accept_dispatched;
     logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}",
                   result->cnt_accept_dispatched, result->index, *conn);
@@ -915,12 +915,17 @@ class FailoverSuite : public Dispatcher {
       ceph_abort();
     }
 
-    if (tracked_conn != conn) {
-      logger().error("[{}] {} got connected, but doesn't match tracked_conn [{}] {}",
+    if (tracked_conn &&
+        !tracked_conn->is_protocol_closed() &&
+        tracked_conn != &*conn) {
+      logger().error("[{}] {} got connected, but there's already a avlid tracked_conn [{}] {}",
                      result->index, *conn, tracked_index, *tracked_conn);
       ceph_abort();
     }
-    ceph_assert(result->index == tracked_index);
+
+    if (tracked_conn == &*conn) {
+      ceph_assert(result->index == tracked_index);
+    }
 
     ++result->cnt_connect_dispatched;
     logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}",
@@ -934,15 +939,15 @@ class FailoverSuite : public Dispatcher {
       ceph_abort();
     }
 
-    if (tracked_conn != conn) {
-      logger().error("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
-                     result->index, *conn, tracked_index, *tracked_conn);
-      ceph_abort();
+    if (tracked_conn != &*conn) {
+      logger().warn("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
+                    result->index, *conn, tracked_index, *tracked_conn);
+    } else {
+      ceph_assert(result->index == tracked_index);
+      tracked_index = 0;
+      tracked_conn = nullptr;
     }
-    ceph_assert(result->index == tracked_index);
 
-    tracked_index = 0;
-    tracked_conn = nullptr;
     ++result->cnt_reset_dispatched;
     logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}",
                   result->cnt_reset_dispatched, result->index, *conn);
@@ -955,12 +960,12 @@ class FailoverSuite : public Dispatcher {
       ceph_abort();
     }
 
-    if (tracked_conn != conn) {
-      logger().error("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
-                     result->index, *conn, tracked_index, *tracked_conn);
-      ceph_abort();
+    if (tracked_conn != &*conn) {
+      logger().warn("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
+                    result->index, *conn, tracked_index, *tracked_conn);
+    } else {
+      ceph_assert(result->index == tracked_index);
     }
-    ceph_assert(result->index == tracked_index);
 
     ++result->cnt_remote_reset_dispatched;
     logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}",
@@ -984,6 +989,7 @@ class FailoverSuite : public Dispatcher {
 
   seastar::future<> send_op(bool expect_reply=true) {
     ceph_assert(tracked_conn);
+    ceph_assert(!tracked_conn->is_protocol_closed());
     if (expect_reply) {
       ++pending_peer_receive;
     }
@@ -1000,6 +1006,7 @@ class FailoverSuite : public Dispatcher {
       logger().info("[Test] flush sending {} ops", pending_send);
     }
     ceph_assert(tracked_conn);
+    ceph_assert(!tracked_conn->is_protocol_closed());
     return seastar::do_until(
         [this] { return pending_send == 0; },
         [this] {
@@ -1020,12 +1027,6 @@ class FailoverSuite : public Dispatcher {
           ++replaced_conns;
         }
       } else if (result.conn->is_protocol_ready()) {
-        if (tracked_conn != result.conn || tracked_index != result.index) {
-          throw std::runtime_error(fmt::format(
-                "The connected connection [{}] {} doesn't"
-                " match the tracked connection [{}] {}",
-                result.index, *result.conn, tracked_index, *tracked_conn));
-        }
         if (pending_send == 0 && pending_peer_receive == 0 && pending_receive == 0) {
           result.state = conn_state_t::established;
         } else {
@@ -1174,18 +1175,19 @@ class FailoverSuite : public Dispatcher {
 
     if (tracked_conn) {
       if (tracked_conn->is_protocol_closed()) {
-        ceph_assert(tracked_conn != conn);
-        logger().info("[Test] this is a new session replacing an closed one");
+        logger().info("[Test] this is a new session"
+                      " replacing an closed one");
+        ceph_assert(tracked_conn != &*conn);
       } else {
-        ceph_assert(tracked_index == result->index);
-        ceph_assert(tracked_conn == conn);
         logger().info("[Test] this is not a new session");
+        ceph_assert(tracked_index == result->index);
+        ceph_assert(tracked_conn == &*conn);
       }
     } else {
       logger().info("[Test] this is a new session");
     }
     tracked_index = result->index;
-    tracked_conn = conn;
+    tracked_conn = &*conn;
 
     return flush_pending_send();
   }
@@ -1193,6 +1195,7 @@ class FailoverSuite : public Dispatcher {
   seastar::future<> send_peer() {
     if (tracked_conn) {
       logger().info("[Test] send_peer()");
+      ceph_assert(!tracked_conn->is_protocol_closed());
       ceph_assert(!pending_send);
       return send_op();
     } else {
@@ -1205,12 +1208,14 @@ class FailoverSuite : public Dispatcher {
   seastar::future<> keepalive_peer() {
     logger().info("[Test] keepalive_peer()");
     ceph_assert(tracked_conn);
+    ceph_assert(!tracked_conn->is_protocol_closed());
     return tracked_conn->send_keepalive();
   }
 
   seastar::future<> try_send_peer() {
     logger().info("[Test] try_send_peer()");
     ceph_assert(tracked_conn);
+    ceph_assert(!tracked_conn->is_protocol_closed());
     return send_op(false);
   }
 
@@ -1473,10 +1478,9 @@ class FailoverSuitePeer : public Dispatcher {
   ConnectionRef tracked_conn;
   unsigned pending_send = 0;
 
-  std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+  std::optional<seastar::future<>> ms_dispatch(ConnectionRef conn, MessageRef m) override {
     logger().info("[TestPeer] got op from Test");
     ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
-    ceph_assert(tracked_conn == c);
     std::ignore = op_callback();
     return {seastar::now()};
   }
@@ -1487,17 +1491,19 @@ class FailoverSuitePeer : public Dispatcher {
       bool is_replace) override {
     assert(prv_shard == seastar::this_shard_id());
     logger().info("[TestPeer] got accept from Test");
-    ceph_assert(!tracked_conn ||
-                tracked_conn->is_protocol_closed() ||
-                tracked_conn == conn);
+
+    if (tracked_conn &&
+        !tracked_conn->is_protocol_closed() &&
+        tracked_conn != conn) {
+      logger().error("[TestPeer] {} got accepted, but there's already a valid traced_conn {}",
+                     *conn, *tracked_conn);
+    }
     tracked_conn = conn;
     std::ignore = flush_pending_send();
   }
 
   void ms_handle_reset(ConnectionRef conn, bool is_replace) override {
     logger().info("[TestPeer] got reset from Test");
-    ceph_assert(tracked_conn == conn);
-    tracked_conn = nullptr;
   }
 
  private:
@@ -1516,6 +1522,11 @@ class FailoverSuitePeer : public Dispatcher {
 
   seastar::future<> send_op() {
     ceph_assert(tracked_conn);
+    if (tracked_conn->is_protocol_closed()) {
+      logger().error("[TestPeer] send op but the connection is closed -- {}",
+                     *tracked_conn);
+    }
+
     pg_t pgid;
     object_locator_t oloc;
     hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
@@ -1539,7 +1550,8 @@ class FailoverSuitePeer : public Dispatcher {
 
  public:
   FailoverSuitePeer(MessengerRef peer_msgr, cb_t op_callback)
-    : peer_msgr(peer_msgr), op_callback(op_callback) { }
+    : peer_msgr(peer_msgr),
+      op_callback(op_callback) { }
 
   seastar::future<> shutdown() {
     peer_msgr->stop();
@@ -1548,26 +1560,29 @@ class FailoverSuitePeer : public Dispatcher {
 
   seastar::future<> connect_peer(entity_addr_t test_addr_decoded) {
     logger().info("[TestPeer] connect_peer({})", test_addr_decoded);
-    auto new_tracked_conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
+    auto conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
+
     if (tracked_conn) {
       if (tracked_conn->is_protocol_closed()) {
-        ceph_assert(tracked_conn != new_tracked_conn);
         logger().info("[TestPeer] this is a new session"
                       " replacing an closed one");
+        ceph_assert(tracked_conn != conn);
       } else {
-        ceph_assert(tracked_conn == new_tracked_conn);
         logger().info("[TestPeer] this is not a new session");
+        ceph_assert(tracked_conn == conn);
       }
     } else {
       logger().info("[TestPeer] this is a new session");
     }
-    tracked_conn = new_tracked_conn;
+    tracked_conn = conn;
+
     return flush_pending_send();
   }
 
   seastar::future<> send_peer() {
     if (tracked_conn) {
       logger().info("[TestPeer] send_peer()");
+      ceph_assert(!pending_send);
       return send_op();
     } else {
       ++pending_send;