void ProtocolV2::execute_connecting()
{
trigger_state(state_t::CONNECTING, write_state_t::delay, true);
+ if (socket) {
+ socket->shutdown();
+ }
seastar::with_gate(pending_dispatch, [this] {
// we don't know my socket_port yet
conn.set_ephemeral_port(0, SocketConnection::side_t::none);
logger().debug("{} UPDATE: gs={}, cc={} for connect",
conn, global_seq, client_cookie);
}
+
+ return wait_write_exit();
+ }).then([this] {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} before Socket::connect()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ if (socket) {
+ with_gate(pending_dispatch, [this, sock = std::move(socket)] () mutable {
+ return sock->close().then([sock = std::move(sock)] {});
+ });
+ }
return Socket::connect(conn.peer_addr);
}).then([this](SocketFRef sock) {
logger().debug("{} socket connected", conn);
- socket = std::move(sock);
- if (state == state_t::CLOSING) {
- return socket->close().then([this] {
- logger().warn("{} is closed during Socket::connect()", conn);
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} during Socket::connect()",
+ conn, get_state_name(state));
+ return sock->close().then([this, sock = std::move(sock)] {
abort_protocol();
});
}
+ socket = std::move(sock);
return seastar::now();
}).then([this] {
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();