From 2b83e185920908d67f64a4ea4e30e7a6964fe2e2 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 6 Sep 2019 10:46:47 +0800 Subject: [PATCH] test/crimson: implement different ways to wait in tests wait_replaced(count): wait for connection replaced; wait_established(): wait for all pending messages sent and received; Signed-off-by: Yingxin Cheng --- src/test/crimson/test_messenger.cc | 104 +++++++++++++++++++---------- 1 file changed, 67 insertions(+), 37 deletions(-) diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 158cd13ed74..5a6dbae9ded 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -1037,57 +1037,79 @@ class FailoverSuite : public Dispatcher { }); } - seastar::future<> wait_ready(unsigned num_conns) { - assert(num_conns > 0); - if (interceptor.results.size() > num_conns) { - throw std::runtime_error(fmt::format( - "{} connections, more than expected: {}", - interceptor.results.size(), num_conns)); - } - + seastar::future<> wait_ready(unsigned num_ready_conns, + unsigned num_replaced, + bool wait_received) { + unsigned pending_conns = 0; + unsigned pending_establish = 0; + unsigned replaced_conns = 0; for (auto& result : interceptor.results) { if (result.conn->is_closed()) { - continue; - } - - if (result.conn->is_connected()) { + if (result.state == conn_state_t::replaced) { + ++replaced_conns; + } + } else if (result.conn->is_connected()) { if (tracked_conn != result.conn || tracked_index != result.index) { throw std::runtime_error(fmt::format( "The connected connection [{}] {} doesn't" " match the tracked connection [{}] {}", result.index, *result.conn, tracked_index, tracked_conn)); } - - if (pending_send || pending_peer_receive || pending_receive) { - logger().info("Waiting for pending_send={} pending_peer_receive={}" - " pending_receive={} from [{}] {}", - pending_send, pending_peer_receive, pending_receive, - result.index, *result.conn); - return interceptor.wait().then([this, num_conns] { - return wait_ready(num_conns); - }); - } else { + if (pending_send == 0 && pending_peer_receive == 0 && pending_receive == 0) { result.state = conn_state_t::established; + } else { + ++pending_establish; } } else { - logger().info("Waiting for connection [{}] {} connected/closed", - result.index, *result.conn); - return interceptor.wait().then([this, num_conns] { - return wait_ready(num_conns); - }); + ++pending_conns; } } - if (interceptor.results.size() < num_conns) { - logger().info("Waiting for incoming connection, currently {}, expected {}", - interceptor.results.size(), num_conns); - return interceptor.wait().then([this, num_conns] { - return wait_ready(num_conns); - }); + bool do_wait = false; + if (num_ready_conns > 0) { + if (interceptor.results.size() > num_ready_conns) { + throw std::runtime_error(fmt::format( + "{} connections,more than expected: {}", + interceptor.results.size(), num_ready_conns)); + } else if (interceptor.results.size() < num_ready_conns || pending_conns > 0) { + logger().info("Waiting for connections, currently {} out of {}," + " pending {} ready ...", + interceptor.results.size(), num_ready_conns, pending_conns); + do_wait = true; + } + } + if (wait_received && + (pending_send || pending_peer_receive || pending_receive)) { + if (pending_conns || pending_establish) { + logger().info("Waiting for pending_send={}, pending_peer_receive={}," + " pending_receive={}, with {} pending connections ...", + pending_send, pending_peer_receive, pending_receive, + pending_conns + pending_establish); + do_wait = true; + } + } + if (num_replaced > 0) { + if (replaced_conns > num_replaced) { + throw std::runtime_error(fmt::format( + "{} replaced connections, more than expected: {}", + replaced_conns, num_replaced)); + } + if (replaced_conns < num_replaced) { + logger().info("Waiting for {} replaced connections, currently {} ...", + num_replaced, replaced_conns); + do_wait = true; + } } - logger().debug("Wait done!"); - return seastar::now(); + if (do_wait) { + return interceptor.wait( + ).then([this, num_ready_conns, num_replaced, wait_received] { + return wait_ready(num_ready_conns, num_replaced, wait_received); + }); + } else { + logger().info("Wait done!"); + return seastar::now(); + } } // called by FailoverTest @@ -1180,9 +1202,17 @@ class FailoverSuite : public Dispatcher { } } + seastar::future<> wait_replaced(unsigned count) { + return wait_ready(0, count, false); + } + + seastar::future<> wait_established() { + return wait_ready(0, 0, true); + } + seastar::future> - wait_results(unsigned num_conns) { - return wait_ready(num_conns).then([this] { + wait_results(unsigned count) { + return wait_ready(count, 0, true).then([this] { return std::reference_wrapper(interceptor.results); }); } -- 2.39.5