bool FrameAssemblerV2::is_socket_valid() const
{
assert(seastar::this_shard_id() == sid);
- return has_socket() && !socket->is_shutdown();
+#ifndef NDEBUG
+ if (has_socket() && socket->get_shard_id() == sid) {
+ assert(socket->is_shutdown() == is_socket_shutdown);
+ }
+#endif
+ return has_socket() && !is_socket_shutdown;
}
SocketFRef FrameAssemblerV2::move_socket()
assert(new_socket);
socket = std::move(new_socket);
conn.set_socket(socket.get());
+ is_socket_shutdown = false;
assert(is_socket_valid());
}
socket->learn_ephemeral_port_as_connector(port);
}
-void FrameAssemblerV2::shutdown_socket()
+template <bool may_cross_core>
+void FrameAssemblerV2::shutdown_socket(crimson::common::Gated *gate)
{
assert(seastar::this_shard_id() == sid);
assert(is_socket_valid());
- socket->shutdown();
+ is_socket_shutdown = true;
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ assert(gate);
+ gate->dispatch_in_background("shutdown_socket", conn, [this] {
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this] {
+ socket->shutdown();
+ });
+ });
+ } else {
+ assert(socket->get_shard_id() == sid);
+ assert(!gate);
+ socket->shutdown();
+ }
}
+template void FrameAssemblerV2::shutdown_socket<true>(crimson::common::Gated *);
+template void FrameAssemblerV2::shutdown_socket<false>(crimson::common::Gated *);
seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketFRef &&new_socket)
{
assert(seastar::this_shard_id() == sid);
assert(has_socket());
- assert(socket->is_shutdown());
+ assert(!is_socket_valid());
auto old_socket = move_socket();
+ auto old_socket_shard_id = old_socket->get_shard_id();
set_socket(std::move(new_socket));
- return old_socket->close(
- ).then([sock = std::move(old_socket)] {});
+ return seastar::smp::submit_to(
+ old_socket_shard_id,
+ [old_socket = std::move(old_socket)]() mutable {
+ return old_socket->close(
+ ).then([sock = std::move(old_socket)] {});
+ });
}
seastar::future<> FrameAssemblerV2::close_shutdown_socket()
{
assert(seastar::this_shard_id() == sid);
assert(has_socket());
- assert(socket->is_shutdown());
- return socket->close();
+ assert(!is_socket_valid());
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this] {
+ return socket->close();
+ });
}
seastar::future<ceph::bufferptr>
#include "msg/async/crypto_onwire.h"
#include "msg/async/compression_onwire.h"
+#include "crimson/common/gated.h"
#include "crimson/net/Socket.h"
namespace crimson::net {
void learn_socket_ephemeral_port_as_connector(uint16_t port);
- void shutdown_socket();
+ // if may_cross_core == true, gate is required for cross-core shutdown
+ template <bool may_cross_core>
+ void shutdown_socket(crimson::common::Gated *gate);
seastar::future<> replace_shutdown_socket(SocketFRef &&);
SocketFRef socket;
+ // checking Socket::is_shutdown() synchronously is impossible when sid is
+ // different from the socket sid.
+ bool is_socket_shutdown = false;
+
seastar::shard_id sid;
/*
return io_handler.wait_io_exit_dispatching(
).then([this](FrameAssemblerV2Ref fa) {
frame_assembler = std::move(fa);
+ ceph_assert_always(!frame_assembler->is_socket_valid());
exit_io->set_value();
exit_io = std::nullopt;
});
if (likely(has_socket)) {
if (likely(is_socket_valid)) {
ceph_assert_always(state != state_t::READY);
- frame_assembler->shutdown_socket();
+ frame_assembler->shutdown_socket<true>(&gate);
is_socket_valid = false;
} else {
ceph_assert_always(state != state_t::ESTABLISHING);
case next_step_t::wait: {
logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
ceph_assert_always(is_socket_valid);
- frame_assembler->shutdown_socket();
+ frame_assembler->shutdown_socket<true>(&gate);
is_socket_valid = false;
execute_wait(true);
break;
ceph_assert_always(!mover.socket->is_shutdown());
trigger_state(state_t::REPLACING, io_state_t::delay, false);
if (is_socket_valid) {
- frame_assembler->shutdown_socket();
+ frame_assembler->shutdown_socket<true>(&gate);
is_socket_valid = false;
}
gate.dispatch_in_background(
(*f_accept_new)();
}
if (is_socket_valid) {
- frame_assembler->shutdown_socket();
+ frame_assembler->shutdown_socket<true>(&gate);
is_socket_valid = false;
}
assert(!gate.is_closed());