]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
test/crimson: test reconnect racing cases for v2
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 6 Sep 2019 03:01:16 +0000 (11:01 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 18 Sep 2019 04:30:14 +0000 (12:30 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/test/crimson/test_messenger.cc

index 1bba72861ce47312b75afb50dfb73b5c41d880fa..285a144cff145cefc54f3d0fcce654f35e6e26f0 100644 (file)
@@ -2180,16 +2180,145 @@ test_v2_peer_connected_fault_reaccept(FailoverTest& test) {
   });
 }
 
+seastar::future<bool>
+peer_wins(FailoverTest& test) {
+  return seastar::do_with(bool(), [&test] (auto& ret) {
+    return test.run_suite("peer_wins",
+                          TestInterceptor(),
+                          policy_t::lossy_client,
+                          policy_t::stateless_server,
+                          [&test, &ret] (FailoverSuite& suite) {
+      return suite.connect_peer().then([&suite] {
+        return suite.wait_results(1);
+      }).then([&ret] (ConnResults& results) {
+        results[0].assert_state_at(conn_state_t::established);
+        ret = results[0].conn->peer_wins();
+        logger().info("peer_wins: {}", ret);
+      });
+    }).then([&ret] {
+      return ret;
+    });
+  });
+}
+
+seastar::future<>
+test_v2_racing_reconnect_win(FailoverTest& test) {
+  return seastar::do_with(std::vector<std::pair<unsigned, Breakpoint>>{
+      {2, {custom_bp_t::BANNER_WRITE}},
+      {2, {custom_bp_t::BANNER_READ}},
+      {2, {custom_bp_t::BANNER_PAYLOAD_READ}},
+      {2, {Tag::HELLO, bp_type_t::WRITE}},
+      {2, {Tag::HELLO, bp_type_t::READ}},
+      {2, {Tag::AUTH_REQUEST, bp_type_t::READ}},
+      {2, {Tag::AUTH_DONE, bp_type_t::WRITE}},
+      {2, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}},
+      {2, {Tag::AUTH_SIGNATURE, bp_type_t::READ}},
+      {1, {Tag::SESSION_RECONNECT, bp_type_t::READ}},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+      TestInterceptor interceptor;
+      interceptor.make_fault({Tag::MESSAGE, bp_type_t::READ});
+      interceptor.make_block(bp.second, bp.first);
+      return test.run_suite(
+          fmt::format("test_v2_racing_reconnect_win -- {}({})",
+                      bp.second, bp.first),
+          interceptor,
+          policy_t::lossless_peer,
+          policy_t::lossless_peer,
+          [&test] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&test] {
+          return test.peer_send_me();
+        }).then([&test] {
+          return test.peer_connect_me();
+        }).then([&suite] {
+          return suite.wait_blocked();
+        }).then([&suite] {
+          return suite.send_peer();
+        }).then([&suite] {
+          return suite.wait_established();
+        }).then([&suite] {
+          suite.unblock();
+          return suite.wait_results(2);
+        }).then([] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::established);
+          results[0].assert_connect(1, 0, 1, 1);
+          results[0].assert_accept(1, 1, 0, 1);
+          results[0].assert_reset(0, 0);
+          results[1].assert_state_at(conn_state_t::closed);
+          results[1].assert_connect(0, 0, 0, 0);
+          ceph_assert(results[1].accept_attempts == 1);
+          ceph_assert(results[1].cnt_accept_dispatched == 0);
+          results[1].assert_reset(0, 0);
+        });
+      });
+    });
+  });
+}
+
 seastar::future<>
-test_v2_failover(entity_addr_t test_addr = entity_addr_t(),
+test_v2_racing_reconnect_lose(FailoverTest& test) {
+  return seastar::do_with(std::vector<std::pair<unsigned, Breakpoint>>{
+      {2, {custom_bp_t::BANNER_WRITE}},
+      {2, {custom_bp_t::BANNER_READ}},
+      {2, {custom_bp_t::BANNER_PAYLOAD_READ}},
+      {2, {Tag::HELLO, bp_type_t::WRITE}},
+      {2, {Tag::HELLO, bp_type_t::READ}},
+      {2, {Tag::AUTH_REQUEST, bp_type_t::WRITE}},
+      {2, {Tag::AUTH_DONE, bp_type_t::READ}},
+      {2, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}},
+      {2, {Tag::AUTH_SIGNATURE, bp_type_t::READ}},
+      {1, {Tag::SESSION_RECONNECT, bp_type_t::WRITE}},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+      TestInterceptor interceptor;
+      interceptor.make_fault({Tag::MESSAGE, bp_type_t::WRITE});
+      interceptor.make_block(bp.second, bp.first);
+      return test.run_suite(
+          fmt::format("test_v2_racing_reconnect_lose -- {}({})",
+                      bp.second, bp.first),
+          interceptor,
+          policy_t::lossless_peer,
+          policy_t::lossless_peer,
+          [&test] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&suite] {
+          return suite.send_peer();
+        }).then([&suite] {
+          return suite.connect_peer();
+        }).then([&suite] {
+          return suite.wait_blocked();
+        }).then([&test] {
+          return test.peer_send_me();
+        }).then([&suite] {
+          return suite.wait_replaced(1);
+        }).then([&suite] {
+          suite.unblock();
+          return suite.wait_results(2);
+        }).then([] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::established);
+          ceph_assert(results[0].connect_attempts == 2);
+          ceph_assert(results[0].cnt_connect_dispatched == 1);
+          results[0].assert_accept(0, 0, 0, 0);
+          results[0].assert_reset(0, 0);
+          results[1].assert_state_at(conn_state_t::replaced);
+          results[1].assert_connect(0, 0, 0, 0);
+          results[1].assert_accept(1, 0, 1, 0);
+          results[1].assert_reset(0, 0);
+        });
+      });
+    });
+  });
+}
+
+seastar::future<>
+test_v2_protocol(entity_addr_t test_addr = entity_addr_t(),
                  entity_addr_t cmd_peer_addr = entity_addr_t()) {
   if (test_addr == entity_addr_t() || cmd_peer_addr == entity_addr_t()) {
     // initiate crimson test peer locally
-    logger().info("test_v2_failover: start local TestPeer...");
+    logger().info("test_v2_protocol: start local TestPeer...");
     return FailoverTestPeer::create().then([] (auto peer) {
       entity_addr_t test_addr_;
       test_addr_.parse("v2:127.0.0.1:9010");
-      return test_v2_failover(test_addr_, peer->get_addr()
+      return test_v2_protocol(test_addr_, peer->get_addr()
       ).finally([peer = std::move(peer)] () mutable {
         return peer->wait().then([peer = std::move(peer)] {});
       });
@@ -2233,6 +2362,14 @@ test_v2_failover(entity_addr_t test_addr = entity_addr_t(),
       return test_v2_peer_connected_fault_reconnect(*test);
     }).then([test] {
       return test_v2_peer_connected_fault_reaccept(*test);
+    }).then([test] {
+      return peer_wins(*test);
+    }).then([test] (bool peer_wins) {
+      if (peer_wins) {
+        return test_v2_racing_reconnect_lose(*test);
+      } else {
+        return test_v2_racing_reconnect_win(*test);
+      }
     }).finally([test] {
       return test->shutdown().then([test] {});
     });
@@ -2271,7 +2408,7 @@ int main(int argc, char** argv)
     }).then([] {
       return test_preemptive_shutdown(true);
     }).then([] {
-      return test_v2_failover();
+      return test_v2_protocol();
     }).then([] {
       std::cout << "All tests succeeded" << std::endl;
     }).handle_exception([] (auto eptr) {