git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
test/crimson: improve messenger test coverage
author: Yingxin Cheng <yingxin.cheng@intel.com>
Mon, 13 Feb 2023 05:37:25 +0000 (13:37 +0800)
committer: Yingxin Cheng <yingxin.cheng@intel.com>
Mon, 13 Feb 2023 06:39:46 +0000 (14:39 +0800)
* add blockings at custom_bp_t::SOCKET_CONNECTING
* adjust peer_wins to true

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/ProtocolV2.cc
src/crimson/net/SocketConnection.h
src/test/crimson/test_messenger.cc
src/test/crimson/test_messenger.h
src/test/crimson/test_messenger_peer.cc

index 1a64662112d979778df14d8e0d49f23c207f47fc..10bd054d1bd8a4bcdca3294895e8440e0e4f9ed9 100644 (file)
@@ -1166,6 +1166,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
     // each other at the same time.
     if (existing_proto->client_cookie != client_cookie) {
       if (existing_conn->peer_wins()) {
+        // acceptor (this connection, the peer) wins
         logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
                       " and win, reusing existing {} {}",
                       conn,
@@ -1175,6 +1176,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
                       *existing_conn);
         return reuse_connection(existing_proto);
       } else {
+        // acceptor (this connection, the peer) loses
         logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
                       " and lose to existing {}, ask client to wait",
                       conn, client_cookie, existing_proto->client_cookie, *existing_conn);
@@ -1446,6 +1448,7 @@ ProtocolV2::server_reconnect()
     } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
       // reconnect race: both peers are sending reconnect messages
       if (existing_conn->peer_wins()) {
+        // acceptor (this connection, the peer) wins
         logger().warn("{} server_reconnect: reconnect race detected (cs={})"
                       " and win, reusing existing {} {}",
                       conn,
@@ -1456,6 +1459,7 @@ ProtocolV2::server_reconnect()
             existing_proto, false,
             true, reconnect.connect_seq(), reconnect.msg_seq());
       } else {
+        // acceptor (this connection, the peer) loses
         logger().warn("{} server_reconnect: reconnect race detected (cs={})"
                       " and lose to existing {}, ask client to wait",
                       conn, reconnect.connect_seq(), *existing_conn);
index 863968cac58d72cebd726e9e01c79aa8e5724125..f6ef6f49753315b26323e6e8c63bb71e84c039bd 100644 (file)
@@ -209,10 +209,12 @@ private:
 
   bool is_closed() const override;
 
+  // peer wins if myaddr > peeraddr
   bool peer_wins() const override;
 
   Interceptor *interceptor = nullptr;
 #else
+  // peer wins if myaddr > peeraddr
   bool peer_wins() const;
 #endif
 
index dd43496c50e6e6c3dc4fb45215f1a23d72c54aa7..8f14bafb6522573b03358dd083d9ddd024987b06 100644 (file)
@@ -1671,10 +1671,10 @@ class FailoverTestPeer : public Dispatcher {
 seastar::future<>
 test_v2_lossy_early_connect_fault(FailoverTest& test) {
   return seastar::do_with(std::vector<Breakpoint>{
+      {custom_bp_t::SOCKET_CONNECTING},
       {custom_bp_t::BANNER_WRITE},
       {custom_bp_t::BANNER_READ},
       {custom_bp_t::BANNER_PAYLOAD_READ},
-      {custom_bp_t::SOCKET_CONNECTING},
       {Tag::HELLO, bp_type_t::WRITE},
       {Tag::HELLO, bp_type_t::READ},
       {Tag::AUTH_REQUEST, bp_type_t::WRITE},
@@ -2360,9 +2360,9 @@ test_v2_peer_connected_fault_reaccept(FailoverTest& test) {
 }
 
 seastar::future<bool>
-peer_wins(FailoverTest& test) {
+check_peer_wins(FailoverTest& test) {
   return seastar::do_with(bool(), [&test] (auto& ret) {
-    return test.run_suite("peer_wins",
+    return test.run_suite("check_peer_wins",
                           TestInterceptor(),
                           policy_t::lossy_client,
                           policy_t::stateless_server,
@@ -2372,7 +2372,7 @@ peer_wins(FailoverTest& test) {
       }).then([&ret] (ConnResults& results) {
         results[0].assert_state_at(conn_state_t::established);
         ret = results[0].conn->peer_wins();
-        logger().info("peer_wins: {}", ret);
+        logger().info("check_peer_wins: {}", ret);
       });
     }).then([&ret] {
       return ret;
@@ -2381,8 +2381,9 @@ peer_wins(FailoverTest& test) {
 }
 
 seastar::future<>
-test_v2_racing_reconnect_win(FailoverTest& test) {
+test_v2_racing_reconnect_acceptor_lose(FailoverTest& test) {
   return seastar::do_with(std::vector<std::pair<unsigned, Breakpoint>>{
+      {1, {Tag::SESSION_RECONNECT, bp_type_t::READ}},
       {2, {custom_bp_t::BANNER_WRITE}},
       {2, {custom_bp_t::BANNER_READ}},
       {2, {custom_bp_t::BANNER_PAYLOAD_READ}},
@@ -2392,14 +2393,15 @@ test_v2_racing_reconnect_win(FailoverTest& test) {
       {2, {Tag::AUTH_DONE, bp_type_t::WRITE}},
       {2, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}},
       {2, {Tag::AUTH_SIGNATURE, bp_type_t::READ}},
-      {1, {Tag::SESSION_RECONNECT, bp_type_t::READ}},
   }, [&test] (auto& failure_cases) {
     return seastar::do_for_each(failure_cases, [&test] (auto bp) {
       TestInterceptor interceptor;
+      // fault acceptor
       interceptor.make_fault({Tag::MESSAGE, bp_type_t::READ});
+      // block acceptor
       interceptor.make_block(bp.second, bp.first);
       return test.run_suite(
-          fmt::format("test_v2_racing_reconnect_win -- {}({})",
+          fmt::format("test_v2_racing_reconnect_acceptor_lose -- {}({})",
                       bp.second, bp.first),
           interceptor,
           policy_t::lossless_peer,
@@ -2434,8 +2436,10 @@ test_v2_racing_reconnect_win(FailoverTest& test) {
 }
 
 seastar::future<>
-test_v2_racing_reconnect_lose(FailoverTest& test) {
+test_v2_racing_reconnect_acceptor_win(FailoverTest& test) {
   return seastar::do_with(std::vector<std::pair<unsigned, Breakpoint>>{
+      {1, {Tag::SESSION_RECONNECT, bp_type_t::WRITE}},
+      {2, {custom_bp_t::SOCKET_CONNECTING}},
       {2, {custom_bp_t::BANNER_WRITE}},
       {2, {custom_bp_t::BANNER_READ}},
       {2, {custom_bp_t::BANNER_PAYLOAD_READ}},
@@ -2445,14 +2449,15 @@ test_v2_racing_reconnect_lose(FailoverTest& test) {
       {2, {Tag::AUTH_DONE, bp_type_t::READ}},
       {2, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}},
       {2, {Tag::AUTH_SIGNATURE, bp_type_t::READ}},
-      {1, {Tag::SESSION_RECONNECT, bp_type_t::WRITE}},
   }, [&test] (auto& failure_cases) {
     return seastar::do_for_each(failure_cases, [&test] (auto bp) {
       TestInterceptor interceptor;
+      // fault connector
       interceptor.make_fault({Tag::MESSAGE, bp_type_t::WRITE});
+      // block connector
       interceptor.make_block(bp.second, bp.first);
       return test.run_suite(
-          fmt::format("test_v2_racing_reconnect_lose -- {}({})",
+          fmt::format("test_v2_racing_reconnect_acceptor_win -- {}({})",
                       bp.second, bp.first),
           interceptor,
           policy_t::lossless_peer,
@@ -2487,7 +2492,7 @@ test_v2_racing_reconnect_lose(FailoverTest& test) {
 }
 
 seastar::future<>
-test_v2_racing_connect_win(FailoverTest& test) {
+test_v2_racing_connect_acceptor_lose(FailoverTest& test) {
   return seastar::do_with(std::vector<Breakpoint>{
       {custom_bp_t::BANNER_WRITE},
       {custom_bp_t::BANNER_READ},
@@ -2502,9 +2507,10 @@ test_v2_racing_connect_win(FailoverTest& test) {
   }, [&test] (auto& failure_cases) {
     return seastar::do_for_each(failure_cases, [&test] (auto bp) {
       TestInterceptor interceptor;
+      // block acceptor
       interceptor.make_block(bp);
       return test.run_suite(
-          fmt::format("test_v2_racing_connect_win -- {}", bp),
+          fmt::format("test_v2_racing_connect_acceptor_lose -- {}", bp),
           interceptor,
           policy_t::lossless_peer,
           policy_t::lossless_peer,
@@ -2540,8 +2546,9 @@ test_v2_racing_connect_win(FailoverTest& test) {
 }
 
 seastar::future<>
-test_v2_racing_connect_lose(FailoverTest& test) {
+test_v2_racing_connect_acceptor_win(FailoverTest& test) {
   return seastar::do_with(std::vector<Breakpoint>{
+      {custom_bp_t::SOCKET_CONNECTING},
       {custom_bp_t::BANNER_WRITE},
       {custom_bp_t::BANNER_READ},
       {custom_bp_t::BANNER_PAYLOAD_READ},
@@ -2555,9 +2562,10 @@ test_v2_racing_connect_lose(FailoverTest& test) {
   }, [&test] (auto& failure_cases) {
     return seastar::do_for_each(failure_cases, [&test] (auto bp) {
       TestInterceptor interceptor;
+      // block connector
       interceptor.make_block(bp);
       return test.run_suite(
-          fmt::format("test_v2_racing_connect_lose -- {}", bp),
+          fmt::format("test_v2_racing_connect_acceptor_win -- {}", bp),
           interceptor,
           policy_t::lossless_peer,
           policy_t::lossless_peer,
@@ -3465,7 +3473,8 @@ seastar::future<>
 test_v2_protocol(entity_addr_t test_addr,
                  entity_addr_t cmd_peer_addr,
                  entity_addr_t test_peer_addr,
-                 bool test_peer_islocal) {
+                 bool test_peer_islocal,
+                 bool peer_wins) {
   ceph_assert_always(test_addr.is_msgr2());
   ceph_assert_always(cmd_peer_addr.is_msgr2());
   ceph_assert_always(test_peer_addr.is_msgr2());
@@ -3474,8 +3483,13 @@ test_v2_protocol(entity_addr_t test_addr,
     // initiate crimson test peer locally
     logger().info("test_v2_protocol: start local TestPeer at {}...", cmd_peer_addr);
     return FailoverTestPeer::create(cmd_peer_addr, test_peer_addr
-    ).then([test_addr, cmd_peer_addr, test_peer_addr](auto peer) {
-      return test_v2_protocol(test_addr, cmd_peer_addr, test_peer_addr, false
+    ).then([test_addr, cmd_peer_addr, test_peer_addr, peer_wins](auto peer) {
+      return test_v2_protocol(
+        test_addr,
+        cmd_peer_addr,
+        test_peer_addr,
+        false,
+        peer_wins
       ).then([peer = std::move(peer)] () mutable {
         return peer->wait().then([peer = std::move(peer)] {});
       });
@@ -3486,7 +3500,7 @@ test_v2_protocol(entity_addr_t test_addr,
   }
 
   return FailoverTest::create(test_addr, cmd_peer_addr, test_peer_addr
-  ).then([](auto test) {
+  ).then([peer_wins](auto test) {
     return seastar::futurize_invoke([test] {
       return test_v2_lossy_early_connect_fault(*test);
     }).then([test] {
@@ -3528,19 +3542,20 @@ test_v2_protocol(entity_addr_t test_addr,
     }).then([test] {
       return test_v2_peer_connected_fault_reaccept(*test);
     }).then([test] {
-      return peer_wins(*test);
-    }).then([test] (bool peer_wins) {
-      if (peer_wins) {
+      return check_peer_wins(*test);
+    }).then([test, peer_wins](bool ret_peer_wins) {
+      ceph_assert(peer_wins == ret_peer_wins);
+      if (ret_peer_wins) {
         return seastar::futurize_invoke([test] {
-          return test_v2_racing_connect_lose(*test);
+          return test_v2_racing_connect_acceptor_win(*test);
         }).then([test] {
-          return test_v2_racing_reconnect_lose(*test);
+          return test_v2_racing_reconnect_acceptor_win(*test);
         });
       } else {
         return seastar::futurize_invoke([test] {
-          return test_v2_racing_connect_win(*test);
+          return test_v2_racing_connect_acceptor_lose(*test);
         }).then([test] {
-          return test_v2_racing_reconnect_win(*test);
+          return test_v2_racing_reconnect_acceptor_lose(*test);
         });
       }
     }).then([test] {
@@ -3614,20 +3629,26 @@ seastar::future<int> do_test(seastar::app_template& app)
     cmd_peer_addr.set_nonce(CMD_SRV_NONCE);
 
     entity_addr_t test_peer_addr = get_test_peer_addr(cmd_peer_addr);
+    bool peer_wins = (test_addr > test_peer_addr);
 
     logger().info("test configuration: verbose={}, rounds={}, keepalive_ratio={}, "
                   "test_addr={}, cmd_peer_addr={}, test_peer_addr={}, "
-                  "testpeer_islocal={}",
+                  "testpeer_islocal={}, peer_wins={}",
                   verbose, rounds, keepalive_ratio,
                   test_addr, cmd_peer_addr, test_peer_addr,
-                  testpeer_islocal);
-    return test_echo(rounds, keepalive_ratio)
-    .then([] {
+                  testpeer_islocal, peer_wins);
+    return test_echo(rounds, keepalive_ratio
+    ).then([] {
       return test_concurrent_dispatch();
     }).then([] {
       return test_preemptive_shutdown();
-    }).then([test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal] {
-      return test_v2_protocol(test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal);
+    }).then([test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal, peer_wins] {
+      return test_v2_protocol(
+          test_addr,
+          cmd_peer_addr,
+          test_peer_addr,
+          testpeer_islocal,
+          peer_wins);
     }).then([] {
       logger().info("All tests succeeded");
       // Seastar has bugs to have events undispatched during shutdown,
@@ -3654,9 +3675,9 @@ int main(int argc, char** argv)
      "number of pingpong rounds")
     ("keepalive-ratio", bpo::value<double>()->default_value(0.1),
      "ratio of keepalive in ping messages")
-    ("test-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9012"),
+    ("test-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9014"),
      "address of v2 failover tests")
-    ("testpeer-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9013"),
+    ("testpeer-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9012"),
      "addresses of v2 failover testpeer"
      " (This is CmdSrv address, and TestPeer address is at port+=1)")
     ("testpeer-islocal", bpo::value<bool>()->default_value(true),
index 75d060f6f39c3b1f2df367e90b9a923ea899e928..635f7fae3d65d82955a6155a89eb6ce66be08ba1 100644 (file)
@@ -13,7 +13,7 @@ constexpr uint64_t TEST_NONCE = 2;
 constexpr int64_t TEST_OSD = 2;
 constexpr uint64_t CMD_SRV_NONCE = 3;
 constexpr int64_t CMD_SRV_OSD = 3;
-constexpr uint64_t TEST_PEER_NONCE = 4;
+constexpr uint64_t TEST_PEER_NONCE = 2;
 constexpr int64_t TEST_PEER_OSD = 4;
 
 inline entity_addr_t get_test_peer_addr(
index 34d7a7413fcac888fae93fd12d55f1e79b96da37..28d8a3d384363466507cf43945556c6bcf0dd809 100644 (file)
@@ -411,7 +411,7 @@ int main(int argc, char** argv)
   po::options_description desc{"Allowed options"};
   desc.add_options()
     ("help,h", "show help message")
-    ("addr", po::value<std::string>()->default_value("v2:127.0.0.1:9013"),
+    ("addr", po::value<std::string>()->default_value("v2:127.0.0.1:9012"),
      "This is CmdSrv address, and TestPeer address is at port+=1")
     ("nonstop", po::value<bool>()->default_value(false),
      "Do not shutdown TestPeer when all tests are successful");