#ifdef UNIT_TESTS_BUILT
virtual bool is_closed() const = 0;
+ virtual bool is_closed_clean() const = 0;
+
virtual bool peer_wins() const = 0;
#endif
#include "crimson/common/log.h"
#include "crimson/net/Errors.h"
+#include "crimson/net/Dispatcher.h"
#include "crimson/net/Socket.h"
#include "crimson/net/SocketConnection.h"
#include "msg/Message.h"
return write_state == write_state_t::open;
}
-seastar::future<> Protocol::close()
+void Protocol::close(bool dispatch_reset)
{
if (closed) {
// already closing
assert(close_ready.valid());
- return close_ready.get_future();
+ return;
}
// unregister_conn() drops a reference, so hold another until completion
auto cleanup = [conn_ref = conn.shared_from_this(), this] {
logger().debug("{} closed!", conn);
+#ifdef UNIT_TESTS_BUILT
+ is_closed_clean = true;
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_closed(conn);
+ }
+#endif
};
- trigger_close();
-
// close_ready become valid only after state is state_t::closing
assert(!close_ready.valid());
+ // atomic operations: the state transitions below run synchronously
+ // (presumably with no preemption point) before the async teardown starts
+ trigger_close();
if (socket) {
socket->shutdown();
- close_ready = pending_dispatch.close().finally([this] {
- return socket->close();
- }).finally(std::move(cleanup));
- } else {
- close_ready = pending_dispatch.close().finally(std::move(cleanup));
}
-
closed = true;
set_write_state(write_state_t::drop);
+ auto gate_closed = pending_dispatch.close();
- return close_ready.get_future();
+ // asynchronous operations: drain the dispatch gate, close the socket,
+ // optionally dispatch ms_handle_reset(), then run cleanup
+ close_ready = seastar::when_all_succeed(
+ std::move(gate_closed).finally([this] {
+ if (socket) {
+ return socket->close();
+ }
+ return seastar::now();
+ }),
+ [this, dispatch_reset] {
+ if (dispatch_reset) {
+ // force ms_handle_reset() to be an asynchronous task to prevent
+ // internal state contamination.
+ return seastar::sleep(0s).then([this] {
+ return dispatcher.ms_handle_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ logger().error("{} ms_handle_reset caught exception: {}", conn, eptr);
+ ceph_abort("unexpected exception from ms_handle_reset()");
+ });
+ }
+ return seastar::now();
+ }
+ ).finally(std::move(cleanup));
}
seastar::future<> Protocol::send(MessageRef msg)
bool is_connected() const;
+#ifdef UNIT_TESTS_BUILT
+ bool is_closed_clean = false;
bool is_closed() const { return closed; }
+#endif
// Reentrant closing
- seastar::future<> close();
+ void close(bool dispatch_reset);
+ seastar::future<> close_clean(bool dispatch_reset) {
+ close(dispatch_reset);
+ return close_ready.get_future();
+ }
virtual void start_connect(const entity_addr_t& peer_addr,
const entity_type_t& peer_type) = 0;
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the connecting state
logger().warn("{} connecting fault: {}", conn, eptr);
- (void) close();
+ close(false);
});
});
}
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the accepting state
logger().warn("{} accepting fault: {}", conn, eptr);
- (void) close();
+ close(false);
});
});
}
logger().warn("{} open fault: {}", conn, e);
if (e.code() == error::protocol_aborted ||
e.code() == std::errc::connection_reset) {
- return dispatcher.ms_handle_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
- .then([this] {
- (void) close();
- });
+ close(true);
+ return seastar::now();
} else if (e.code() == error::read_eof) {
return dispatcher.ms_handle_remote_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
.then([this] {
- (void) close();
+ close(false);
});
} else {
throw e;
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the open state
logger().warn("{} open fault: {}", conn, eptr);
- (void) close();
+ close(false);
});
});
}
throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
}
-void abort_in_close(crimson::net::ProtocolV2& proto) {
- (void) proto.close();
+void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) {
+ proto.close(dispatch_reset);
abort_protocol();
}
if (conn.policy.lossy) {
logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
conn, func_name, get_state_name(state), eptr);
- dispatch_reset();
- (void) close();
+ close(true);
} else if (conn.policy.server ||
(conn.policy.standby &&
(!is_queued() && conn.sent.empty()))) {
}
}
-void ProtocolV2::dispatch_reset()
-{
- (void) seastar::with_gate(pending_dispatch, [this] {
- return dispatcher.ms_handle_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_handle_reset caught exception: {}", conn, eptr);
- ceph_abort("unexpected exception from ms_handle_reset()");
- });
-}
-
void ProtocolV2::reset_session(bool full)
{
server_cookie = 0;
logger().error("{} peer does not support all required features"
" required={} peer_supported={}",
conn, required_features, peer_supported_features);
- abort_in_close(*this);
+ abort_in_close(*this, false);
}
if ((supported_features & peer_required_features) != peer_required_features) {
logger().error("{} we do not support all peer required features"
" peer_required={} supported={}",
conn, peer_required_features, supported_features);
- abort_in_close(*this);
+ abort_in_close(*this, false);
}
this->peer_required_features = peer_required_features;
if (this->peer_required_features == 0) {
});
} catch (const crimson::auth::error& e) {
logger().error("{} get_initial_auth_request returned {}", conn, e);
- dispatch_reset();
- abort_in_close(*this);
+ abort_in_close(*this, true);
return seastar::now();
}
}
logger().warn("{} connection peer type does not match what peer advertises {} != {}",
conn, ceph_entity_type_name(conn.get_peer_type()),
ceph_entity_type_name(_peer_type));
- dispatch_reset();
- abort_in_close(*this);
+ abort_in_close(*this, true);
}
conn.set_ephemeral_port(_my_addr_from_peer.get_port(),
SocketConnection::side_t::connector);
// close this connection because all the necessary information is delivered
// to the exisiting connection, and jump to error handling code to abort the
// current state.
- abort_in_close(*this);
+ abort_in_close(*this, false);
return seastar::make_ready_future<next_step_t>(next_step_t::none);
}
logger().warn("{} server_connect:"
" existing connection {} is a lossy channel. Close existing in favor of"
" this connection", conn, *existing_conn);
- existing_proto->dispatch_reset();
- (void) existing_proto->close();
+ existing_proto->close(true);
if (unlikely(state != state_t::ACCEPTING)) {
logger().debug("{} triggered {} in execute_accepting()",
}).handle_exception([this] (std::exception_ptr eptr) {
logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
conn, get_state_name(state), eptr);
- (void) close();
+ close(false);
});
});
}
}).handle_exception([this] (std::exception_ptr eptr) {
logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
conn, get_state_name(state), eptr);
- (void) close();
+ close(false);
});
});
}
protocol_timer.cancel();
trigger_state(state_t::CLOSING, write_state_t::drop, false);
-#ifdef UNIT_TESTS_BUILT
- if (conn.interceptor) {
- conn.interceptor->register_conn_closed(conn);
- }
-#endif
}
} // namespace crimson::net
private:
void fault(bool backoff, const char* func_name, std::exception_ptr eptr);
- void dispatch_reset();
void reset_session(bool full);
seastar::future<entity_type_t, entity_addr_t> banner_exchange();
return protocol->is_closed();
}
+bool SocketConnection::is_closed_clean() const
+{
+ assert(seastar::engine().cpu_id() == shard_id());
+ return protocol->is_closed_clean;
+}
+
#endif
bool SocketConnection::peer_wins() const
{
seastar::future<> SocketConnection::close()
{
assert(seastar::engine().cpu_id() == shard_id());
- return protocol->close();
+ return protocol->close_clean(false);
}
bool SocketConnection::update_rx_seq(seq_num_t seq)
bool is_connected() const override;
#ifdef UNIT_TESTS_BUILT
+ bool is_closed_clean() const override;
+
bool is_closed() const override;
bool peer_wins() const override;
unsigned pending_establish = 0;
unsigned replaced_conns = 0;
for (auto& result : interceptor.results) {
- if (result.conn->is_closed()) {
+ if (result.conn->is_closed_clean()) {
if (result.state == conn_state_t::replaced) {
++replaced_conns;
}