});
}
- seastar::future<> wait_ready(unsigned num_conns) {
- assert(num_conns > 0);
- if (interceptor.results.size() > num_conns) {
- throw std::runtime_error(fmt::format(
- "{} connections, more than expected: {}",
- interceptor.results.size(), num_conns));
- }
-
+ seastar::future<> wait_ready(unsigned num_ready_conns,
+ unsigned num_replaced,
+ bool wait_received) {
+ unsigned pending_conns = 0;
+ unsigned pending_establish = 0;
+ unsigned replaced_conns = 0;
for (auto& result : interceptor.results) {
if (result.conn->is_closed()) {
- continue;
- }
-
- if (result.conn->is_connected()) {
+ if (result.state == conn_state_t::replaced) {
+ ++replaced_conns;
+ }
+ } else if (result.conn->is_connected()) {
if (tracked_conn != result.conn || tracked_index != result.index) {
throw std::runtime_error(fmt::format(
"The connected connection [{}] {} doesn't"
" match the tracked connection [{}] {}",
result.index, *result.conn, tracked_index, tracked_conn));
}
-
- if (pending_send || pending_peer_receive || pending_receive) {
- logger().info("Waiting for pending_send={} pending_peer_receive={}"
- " pending_receive={} from [{}] {}",
- pending_send, pending_peer_receive, pending_receive,
- result.index, *result.conn);
- return interceptor.wait().then([this, num_conns] {
- return wait_ready(num_conns);
- });
- } else {
+ if (pending_send == 0 && pending_peer_receive == 0 && pending_receive == 0) {
result.state = conn_state_t::established;
+ } else {
+ ++pending_establish;
}
} else {
- logger().info("Waiting for connection [{}] {} connected/closed",
- result.index, *result.conn);
- return interceptor.wait().then([this, num_conns] {
- return wait_ready(num_conns);
- });
+ ++pending_conns;
}
}
- if (interceptor.results.size() < num_conns) {
- logger().info("Waiting for incoming connection, currently {}, expected {}",
- interceptor.results.size(), num_conns);
- return interceptor.wait().then([this, num_conns] {
- return wait_ready(num_conns);
- });
+ bool do_wait = false;
+ if (num_ready_conns > 0) {
+ if (interceptor.results.size() > num_ready_conns) {
+ throw std::runtime_error(fmt::format(
+ "{} connections,more than expected: {}",
+ interceptor.results.size(), num_ready_conns));
+ } else if (interceptor.results.size() < num_ready_conns || pending_conns > 0) {
+ logger().info("Waiting for connections, currently {} out of {},"
+ " pending {} ready ...",
+ interceptor.results.size(), num_ready_conns, pending_conns);
+ do_wait = true;
+ }
+ }
+ if (wait_received &&
+ (pending_send || pending_peer_receive || pending_receive)) {
+ if (pending_conns || pending_establish) {
+ logger().info("Waiting for pending_send={}, pending_peer_receive={},"
+ " pending_receive={}, with {} pending connections ...",
+ pending_send, pending_peer_receive, pending_receive,
+ pending_conns + pending_establish);
+ do_wait = true;
+ }
+ }
+ if (num_replaced > 0) {
+ if (replaced_conns > num_replaced) {
+ throw std::runtime_error(fmt::format(
+ "{} replaced connections, more than expected: {}",
+ replaced_conns, num_replaced));
+ }
+ if (replaced_conns < num_replaced) {
+ logger().info("Waiting for {} replaced connections, currently {} ...",
+ num_replaced, replaced_conns);
+ do_wait = true;
+ }
}
- logger().debug("Wait done!");
- return seastar::now();
+ if (do_wait) {
+ return interceptor.wait(
+ ).then([this, num_ready_conns, num_replaced, wait_received] {
+ return wait_ready(num_ready_conns, num_replaced, wait_received);
+ });
+ } else {
+ logger().info("Wait done!");
+ return seastar::now();
+ }
}
// called by FailoverTest
}
}
+ seastar::future<> wait_replaced(unsigned count) {
+ return wait_ready(0, count, false);
+ }
+
+ seastar::future<> wait_established() {
+ return wait_ready(0, 0, true);
+ }
+
seastar::future<std::reference_wrapper<ConnResults>>
- wait_results(unsigned num_conns) {
- return wait_ready(num_conns).then([this] {
+ wait_results(unsigned count) {
+ return wait_ready(count, 0, true).then([this] {
return std::reference_wrapper<ConnResults>(interceptor.results);
});
}