return has_socket() && !is_socket_shutdown;
}
+seastar::shard_id // returns the shard id reported by the underlying socket
+FrameAssemblerV2::get_socket_shard_id() const
+{
+ assert(seastar::this_shard_id() == sid); // must run on the shard recorded in sid
+ assert(is_socket_valid()); // i.e. has_socket() && !is_socket_shutdown
+ return socket->get_shard_id();
+}
+
SocketFRef FrameAssemblerV2::move_socket()
{
assert(has_socket());
FrameAssemblerV2(FrameAssemblerV2 &&) = delete;
+ void set_shard_id(seastar::shard_id _sid) { // re-assigns this assembler to shard _sid
+ assert(seastar::this_shard_id() == sid); // only the currently recorded shard may hand it off
+ clear(); // clear() is defined outside this view - presumably drops state tied to the old shard; TODO confirm
+ sid = _sid;
+ }
+
+ seastar::shard_id get_shard_id() const { // shard id currently recorded in sid
+ return sid;
+ }
+
void set_is_rev1(bool is_rev1);
void create_session_stream_handlers(
// the socket exists and is not shut down
bool is_socket_valid() const;
+ seastar::shard_id get_socket_shard_id() const;
+
void set_socket(SocketFRef &&);
void learn_socket_ephemeral_port_as_connector(uint16_t port);
FrameAssemblerV2Ref fa;
if (new_state == state_t::READY) {
assert(new_io_state == io_state_t::open);
+ assert(io_handler.get_shard_id() ==
+ frame_assembler->get_socket_shard_id());
+ frame_assembler->set_shard_id(io_handler.get_shard_id());
fa = std::move(frame_assembler);
} else {
assert(new_io_state != io_state_t::open);
io_handler.set_io_state(new_io_state, std::move(fa), need_notify_out);
if (pre_state == state_t::READY) {
+ logger().debug("{} IOHandler::wait_io_exit_dispatching() start ...", conn);
assert(new_io_state != io_state_t::open);
gate.dispatch_in_background("exit_io", conn, [this] {
- return io_handler.wait_io_exit_dispatching(
- ).then([this](auto ret) {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this] {
+ return io_handler.wait_io_exit_dispatching();
+ }).then([this](auto ret) {
+ logger().debug("{} IOHandler::wait_io_exit_dispatching() finish, {}",
+ conn, ret.io_states);
frame_assembler = std::move(ret.frame_assembler);
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ ceph_assert_always(
+ seastar::this_shard_id() == frame_assembler->get_shard_id());
ceph_assert_always(!frame_assembler->is_socket_valid());
io_states = ret.io_states;
pr_exit_io->set_value();
}
switch (next) {
case next_step_t::ready: {
- logger().info("{} connected: gs={}, pgs={}, cs={}, "
- "client_cookie={}, server_cookie={}, {}",
- conn, global_seq, peer_global_seq, connect_seq,
- client_cookie, server_cookie, io_states);
- io_handler.dispatch_connect();
if (unlikely(state != state_t::CONNECTING)) {
- logger().debug("{} triggered {} after ms_handle_connect(), abort",
+ logger().debug("{} triggered {} before dispatch_connect(), abort",
conn, get_state_name(state));
abort_protocol();
}
- execute_ready();
- break;
+
+ logger().info("{} connected: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+ "IOHandler::dispatch_connect()",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, io_states,
+ frame_assembler->get_socket_shard_id());
+
+ // set io_handler to a new shard
+ auto new_io_shard = frame_assembler->get_socket_shard_id();
+ ConnectionFRef conn_fref = seastar::make_foreign(
+ conn.shared_from_this());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+ pr_switch_io_shard = seastar::shared_promise<>();
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable {
+ return io_handler.dispatch_connect(
+ new_io_shard, std::move(conn_fref));
+ }).then([this, new_io_shard] {
+ ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+ pr_switch_io_shard->set_value();
+ pr_switch_io_shard = std::nullopt;
+ // user can make changes
+
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} after dispatch_connect(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ execute_ready();
+ });
}
case next_step_t::wait: {
logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
frame_assembler->shutdown_socket<true>(&gate);
is_socket_valid = false;
execute_wait(true);
- break;
+ return seastar::now();
}
default: {
ceph_abort("impossible next step");
ceph_assert_always(is_socket_valid);
trigger_state(state_t::ESTABLISHING, io_state_t::delay);
if (existing_conn) {
+ logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+ "close existing {}",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie,
+ io_states, frame_assembler->get_socket_shard_id(),
+ *existing_conn);
ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
existing_conn->protocol.get());
existing_proto->do_close(
abort_protocol();
}
} else {
+ logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+ "no existing",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, io_states,
+ frame_assembler->get_socket_shard_id());
accept_me();
}
- io_handler.dispatch_accept();
- if (unlikely(state != state_t::ESTABLISHING)) {
- logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()",
- conn, get_state_name(state));
- abort_protocol();
- }
-
gated_execute("execute_establishing", conn, [this] {
ceph_assert_always(state == state_t::ESTABLISHING);
- return seastar::futurize_invoke([this] {
+
+ // set io_handler to a new shard
+ auto new_io_shard = frame_assembler->get_socket_shard_id();
+ logger().debug("{} IOHandler::dispatch_accept({})", conn, new_io_shard);
+ ConnectionFRef conn_fref = seastar::make_foreign(
+ conn.shared_from_this());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+ pr_switch_io_shard = seastar::shared_promise<>();
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable {
+ return io_handler.dispatch_accept(
+ new_io_shard, std::move(conn_fref));
+ }).then([this, new_io_shard] {
+ ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+ pr_switch_io_shard->set_value();
+ pr_switch_io_shard = std::nullopt;
+ // user can make changes
+
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().debug("{} triggered {} after dispatch_accept() during execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
return send_server_ident();
}).then([this] {
if (unlikely(state != state_t::ESTABLISHING)) {
conn, get_state_name(state));
abort_protocol();
}
- logger().info("{} established: gs={}, pgs={}, cs={}, "
- "client_cookie={}, server_cookie={}, {}",
- conn, global_seq, peer_global_seq, connect_seq,
- client_cookie, server_cookie, io_states);
+ logger().info("{} established, going to ready", conn);
execute_ready();
}).handle_exception([this](std::exception_ptr eptr) {
fault(state_t::ESTABLISHING, "execute_establishing", eptr);
ceph_assert_always(state <= state_t::WAIT);
ceph_assert_always(has_socket || state == state_t::CONNECTING);
ceph_assert_always(!mover.socket->is_shutdown());
+
+ logger().info("{} start replacing ({}): pgs was {}, cs was {}, "
+ "client_cookie was {}, {}, new_sid={}",
+ conn, reconnect ? "reconnected" : "connected",
+ peer_global_seq, connect_seq, client_cookie,
+ io_states, mover.socket->get_shard_id());
trigger_state(state_t::REPLACING, io_state_t::delay);
if (is_socket_valid) {
frame_assembler->shutdown_socket<true>(&gate);
new_peer_global_seq,
new_connect_seq, new_msg_seq] () mutable {
ceph_assert_always(state == state_t::REPLACING);
- io_handler.dispatch_accept();
- // state may become CLOSING, close mover.socket and abort later
+ auto new_io_shard = mover.socket->get_shard_id();
+ // state may become CLOSING below, but we cannot abort the chain until
+ // mover.socket is correctly handled (closed or replaced).
+
return wait_exit_io(
).then([this] {
+ if (unlikely(state != state_t::REPLACING)) {
+ ceph_assert_always(state == state_t::CLOSING);
+ return seastar::now();
+ }
+
ceph_assert_always(frame_assembler);
protocol_timer.cancel();
auto done = std::move(execution_done);
execution_done = seastar::now();
return done;
+ }).then([this, new_io_shard] {
+ if (unlikely(state != state_t::REPLACING)) {
+ ceph_assert_always(state == state_t::CLOSING);
+ return seastar::now();
+ }
+
+ // set io_handler to a new shard
+ // we should prevent parallel core-switching attempts
+ logger().debug("{} IOHandler::dispatch_accept({})", conn, new_io_shard);
+ ConnectionFRef conn_fref = seastar::make_foreign(
+ conn.shared_from_this());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+ pr_switch_io_shard = seastar::shared_promise<>();
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable {
+ return io_handler.dispatch_accept(
+ new_io_shard, std::move(conn_fref));
+ }).then([this, new_io_shard] {
+ ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+ pr_switch_io_shard->set_value();
+ pr_switch_io_shard = std::nullopt;
+ // user can make changes
+ });
}).then([this,
reconnect,
do_reset,
FrameAssemblerV2Ref frame_assembler;
+ std::optional<seastar::shared_promise<>> pr_switch_io_shard;
+
std::optional<seastar::shared_promise<>> pr_exit_io;
AuthConnectionMetaRef auth_meta;
IOHandler::~IOHandler() // verifies teardown invariants
{
// close_io() must be finished
- ceph_assert_always(shard_states->assert_closed_and_exit());
+ ceph_assert_always(maybe_prv_shard_states == nullptr); // previous-shard states must already be released (cleanup_prv_shard() moves them out)
+ // should be true in the corresponding shard
+ // ceph_assert_always(shard_states->assert_closed_and_exit());
assert(!conn_ref); // conn_ref is reset by close_io()
}
<< ")";
}
+void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa) // takes ownership of fa and checks it is bound to this handler's shard
+{
+ assert(fa != nullptr);
+ ceph_assert_always(frame_assembler == nullptr); // must not already hold an assembler
+ frame_assembler = std::move(fa);
+ ceph_assert_always(
+ frame_assembler->get_shard_id() == get_shard_id()); // assembler must be recorded on the handler's shard
+ // should have been set through dispatch_accept/connect()
+ ceph_assert_always(
+ frame_assembler->get_socket_shard_id() == get_shard_id());
+ ceph_assert_always(frame_assembler->is_socket_valid()); // socket must exist and not be shut down
+}
+
void IOHandler::set_io_state(
io_state_t new_state,
FrameAssemblerV2Ref fa,
{
ceph_assert_always(seastar::this_shard_id() == get_shard_id());
auto prv_state = get_io_state();
+ logger().debug("{} got set_io_state(): prv_state={}, new_state={}, "
+ "fa={}, set_notify_out={}, at {}",
+ conn, prv_state, new_state,
+ fa ? "present" : "N/A", set_notify_out,
+ io_stat_printer{*this});
ceph_assert_always(!(
(new_state == io_state_t::none && prv_state != io_state_t::none) ||
- (new_state == io_state_t::open && prv_state == io_state_t::open) ||
- (new_state != io_state_t::drop && prv_state == io_state_t::drop)
+ (new_state == io_state_t::open && prv_state == io_state_t::open)
));
+ if (prv_state == io_state_t::drop) {
+ // only possible due to a racing mark_down() from user
+ if (new_state == io_state_t::open) {
+ assign_frame_assembler(std::move(fa));
+ frame_assembler->shutdown_socket<false>(nullptr);
+ } else {
+ assert(fa == nullptr);
+ }
+ return;
+ }
+
bool dispatch_in = false;
if (new_state == io_state_t::open) {
// to open
ceph_assert_always(protocol_is_connected == true);
- assert(fa != nullptr);
- ceph_assert_always(frame_assembler == nullptr);
- frame_assembler = std::move(fa);
- ceph_assert_always(frame_assembler->is_socket_valid());
+ assign_frame_assembler(std::move(fa));
dispatch_in = true;
#ifdef UNIT_TESTS_BUILT
if (conn.interceptor) {
seastar::future<IOHandler::exit_dispatching_ret>
IOHandler::wait_io_exit_dispatching()
{
+ assert(seastar::this_shard_id() == get_shard_id());
+ logger().debug("{} got wait_io_exit_dispatching()", conn);
ceph_assert_always(get_io_state() != io_state_t::open);
ceph_assert_always(frame_assembler != nullptr);
ceph_assert_always(!frame_assembler->is_socket_valid());
- return shard_states->wait_io_exit_dispatching(
- ).then([this] {
+ return seastar::futurize_invoke([this] {
+ // cannot be running in parallel with to_new_sid()
+ if (maybe_dropped_sid.has_value()) {
+ ceph_assert_always(get_io_state() == io_state_t::drop);
+ assert(shard_states->assert_closed_and_exit());
+ auto prv_sid = *maybe_dropped_sid;
+ return seastar::smp::submit_to(prv_sid, [this] {
+ logger().debug("{} got wait_io_exit_dispatching from prv_sid", conn);
+ assert(maybe_prv_shard_states != nullptr);
+ return maybe_prv_shard_states->wait_io_exit_dispatching();
+ });
+ } else {
+ return shard_states->wait_io_exit_dispatching();
+ }
+ }).then([this] {
+ logger().debug("{} finish wait_io_exit_dispatching at {}",
+ conn, io_stat_printer{*this});
ceph_assert_always(frame_assembler != nullptr);
ceph_assert_always(!frame_assembler->is_socket_valid());
+ frame_assembler->set_shard_id(conn.get_messenger_shard_id());
return exit_dispatching_ret{
std::move(frame_assembler),
get_states()};
out_sent_msgs.clear();
}
-void IOHandler::dispatch_accept()
+seastar::future<> // the returned future is the one produced by to_new_sid()
+IOHandler::dispatch_accept(
+ seastar::shard_id new_sid,
+ ConnectionFRef conn_fref)
{
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id()); // must be invoked on the handler's current shard
+ logger().debug("{} got dispatch_accept({}) at {}",
+ conn, new_sid, io_stat_printer{*this});
if (get_io_state() == io_state_t::drop) {
- return;
+ assert(!protocol_is_connected);
+ // it is possible that both io_handler and protocolv2 are
+ // trying to close each other from different cores simultaneously.
+ return to_new_sid(new_sid, std::move(conn_fref)); // still switches shards, but ms_handle_accept() is skipped
}
// protocol_is_connected can be from true to true here if the replacing is
// happening to a connected connection.
protocol_is_connected = true;
ceph_assert_always(conn_ref);
- dispatchers.ms_handle_accept(conn_ref, get_shard_id());
+ auto _conn_ref = conn_ref; // copy first: to_new_sid() resets conn_ref
+ auto fut = to_new_sid(new_sid, std::move(conn_fref));
+ dispatchers.ms_handle_accept(_conn_ref, new_sid); // dispatchers are given the new shard id
+ return fut;
}
-void IOHandler::dispatch_connect()
+seastar::future<> // the returned future is the one produced by to_new_sid()
+IOHandler::dispatch_connect(
+ seastar::shard_id new_sid,
+ ConnectionFRef conn_fref)
{
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id()); // must be invoked on the handler's current shard
+ logger().debug("{} got dispatch_connect({}) at {}",
+ conn, new_sid, io_stat_printer{*this});
if (get_io_state() == io_state_t::drop) {
- return;
+ assert(!protocol_is_connected);
+ // it is possible that both io_handler and protocolv2 are
+ // trying to close each other from different cores simultaneously.
+ return to_new_sid(new_sid, std::move(conn_fref)); // still switches shards, but ms_handle_connect() is skipped
}
ceph_assert_always(protocol_is_connected == false); // unlike dispatch_accept(), a connect must not already be connected
protocol_is_connected = true;
ceph_assert_always(conn_ref);
- dispatchers.ms_handle_connect(conn_ref, get_shard_id());
+ auto _conn_ref = conn_ref; // copy first: to_new_sid() resets conn_ref
+ auto fut = to_new_sid(new_sid, std::move(conn_fref));
+ dispatchers.ms_handle_connect(_conn_ref, new_sid); // dispatchers are given the new shard id
+ return fut;
+}
+
+seastar::future<> // closes and releases the previous shard's states on prv_sid
+IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid)
+{
+ assert(seastar::this_shard_id() == get_shard_id()); // called from the handler's current shard
+ return seastar::smp::submit_to(prv_sid, [this] { // the lambda body runs on prv_sid
+ logger().debug("{} got cleanup_prv_shard()", conn);
+ assert(maybe_prv_shard_states != nullptr);
+ auto ref_prv_states = std::move(maybe_prv_shard_states); // take the states out of the member
+ auto &prv_states = *ref_prv_states; // bind before ref_prv_states is moved into the continuation
+ return prv_states.close(
+ ).then([ref_prv_states=std::move(ref_prv_states)] { // the capture keeps the states alive until close() resolves
+ ceph_assert_always(ref_prv_states->assert_closed_and_exit());
+ });
+ }).then([this] {
+ ceph_assert_always(maybe_prv_shard_states == nullptr); // the member must have been emptied above
+ });
+}
+
+seastar::future<>
+IOHandler::to_new_sid(
+ seastar::shard_id new_sid,
+ ConnectionFRef conn_fref)
+{
+ /*
+ * Switches this handler from its current shard to new_sid: the current
+ * shard_states are parked in maybe_prv_shard_states, fresh ones are created
+ * for new_sid, and conn_ref is re-established on new_sid from conn_fref.
+ *
+ * Note:
+ * - It must be called before user is aware of the new core (through dispatching);
+ * - Messenger must wait for the returned future before further operations to prevent racing;
+ * - In general, the below submitted continuation should be the first one from the prv sid
+ * to the new sid;
+ */
+
+ assert(seastar::this_shard_id() == get_shard_id());
+ bool is_dropped = false; // snapshot of "already dropped" taken on the previous shard
+ if (get_io_state() == io_state_t::drop) {
+ is_dropped = true;
+ }
+ ceph_assert_always(get_io_state() != io_state_t::open);
+
+ // apply the switching atomically
+ ceph_assert_always(conn_ref);
+ conn_ref.reset();
+ auto prv_sid = get_shard_id(); // read before shard_states is replaced below
+ ceph_assert_always(maybe_prv_shard_states == nullptr);
+ maybe_prv_shard_states = std::move(shard_states);
+ shard_states = shard_states_t::create_from_previous(
+ *maybe_prv_shard_states, new_sid);
+ assert(new_sid == get_shard_id()); // get_shard_id() now reflects the new shard_states
+
+ return seastar::smp::submit_to(new_sid,
+ [this, is_dropped, prv_sid, conn_fref=std::move(conn_fref)]() mutable {
+ logger().debug("{} see new_sid in io_handler(new_sid) from {}, is_dropped={}",
+ conn, prv_sid, is_dropped);
+
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ ceph_assert_always(get_io_state() != io_state_t::open);
+ ceph_assert_always(!maybe_dropped_sid.has_value());
+
+ ceph_assert_always(!conn_ref);
+ conn_ref = make_local_shared_foreign(std::move(conn_fref));
+
+ if (is_dropped) {
+ // the follow up cleanups will be done in the prv_sid
+ ceph_assert_always(shard_states->assert_closed_and_exit());
+ maybe_dropped_sid = prv_sid; // consulted later by close_io() and wait_io_exit_dispatching()
+ } else {
+ // may be at io_state_t::drop
+ // cleanup the previous shard
+ shard_states->dispatch_in_background(
+ "cleanup_prv_sid", conn, [this, prv_sid] {
+ return cleanup_prv_shard(prv_sid);
+ });
+ maybe_notify_out_dispatch();
+ }
+ });
}
void IOHandler::dispatch_reset(bool is_replace)
seastar::future<> // resolves after the relevant shard states have been closed
IOHandler::close_io(bool is_dispatch_reset, bool is_replace)
{
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
ceph_assert_always(get_io_state() == io_state_t::drop); // only valid once the io state is drop
+ logger().debug("{} got close_io(reset={}, replace={})",
+ conn, is_dispatch_reset, is_replace);
+
if (is_dispatch_reset) {
dispatch_reset(is_replace);
}
ceph_assert_always(conn_ref);
conn_ref.reset();
- return shard_states->close(
- ).then([this] {
+ // cannot be running in parallel with to_new_sid()
+ if (maybe_dropped_sid.has_value()) { // set by to_new_sid() when the drop happened on the previous shard
assert(shard_states->assert_closed_and_exit());
- });
+ auto prv_sid = *maybe_dropped_sid;
+ return cleanup_prv_shard(prv_sid); // the states still to be closed live on prv_sid
+ } else {
+ return shard_states->close(
+ ).then([this] {
+ assert(shard_states->assert_closed_and_exit());
+ });
+ }
}
/*
/*
* as ConnectionHandler
*/
-private:
+public:
seastar::shard_id get_shard_id() const final { // shard id reported by the current shard_states
return shard_states->get_shard_id();
}
void requeue_out_sent();
- void dispatch_accept();
+ seastar::future<> dispatch_accept(
+ seastar::shard_id new_sid, ConnectionFRef);
- void dispatch_connect();
+ seastar::future<> dispatch_connect(
+ seastar::shard_id new_sid, ConnectionFRef);
private:
class shard_states_t;
return shard_states->get_io_state();
}
+ void assign_frame_assembler(FrameAssemblerV2Ref);
+
seastar::future<> send_redirected(MessageFRef msg);
seastar::future<> do_send(MessageFRef msg);
seastar::future<> do_send_keepalive();
+ seastar::future<> to_new_sid(
+ seastar::shard_id new_sid, ConnectionFRef);
+
void dispatch_reset(bool is_replace);
void dispatch_remote_reset();
void do_in_dispatch();
+ seastar::future<> cleanup_prv_shard(seastar::shard_id prv_sid);
+
private:
shard_states_ref_t shard_states;
+ // drop was happening in the previous sid
+ std::optional<seastar::shard_id> maybe_dropped_sid;
+
+ // the remaining states in the previous sid for cleanup, see to_new_sid()
+ shard_states_ref_t maybe_prv_shard_states;
+
ChainedDispatchers &dispatchers;
SocketConnection &conn;