} else {
assert(new_io_state != io_state_t::open);
}
- io_handler.set_io_state(new_io_state, std::move(fa), need_notify_out);
+ logger().debug("{} IOHandler::set_io_state(): new_state={}, new_io_state={}, "
+ "fa={}, set_notify_out={}",
+ conn, get_state_name(new_state), new_io_state,
+ fa ? fmt::format("(sid={})", fa->get_shard_id()) : "N/A",
+ need_notify_out);
+ gate.dispatch_in_background(
+ "set_io_state", conn,
+ [this, new_io_state, fa=std::move(fa)]() mutable {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, new_io_state, fa=std::move(fa), set_notify_out=need_notify_out]() mutable {
+ io_handler.set_io_state(new_io_state, std::move(fa), set_notify_out);
+ });
+ });
if (need_exit_io) {
// from READY
client_cookie = generate_client_cookie();
peer_global_seq = 0;
}
+
+ logger().debug("{} IOHandler::reset_session({})", conn, full);
io_states.reset_session(full);
- io_handler.reset_session(full);
+ gate.dispatch_in_background(
+ "reset_session", conn, [this, full] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, full] {
+ io_handler.reset_session(full);
+ });
+ });
+ // user can make changes
}
seastar::future<std::tuple<entity_type_t, entity_addr_t>>
case Tag::SERVER_IDENT:
return frame_assembler->read_frame_payload(
).then([this](auto payload) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} at receiving SERVER_IDENT",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
// handle_server_ident() logic
+ logger().debug("{} IOHandler::requeue_out_sent()", conn);
io_states.requeue_out_sent();
- io_handler.requeue_out_sent();
+ gate.dispatch_in_background(
+ "requeue_out_sent", conn, [this] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this] {
+ io_handler.requeue_out_sent();
+ });
+ });
+
auto server_ident = ServerIdentFrame::Decode(payload->back());
logger().debug("{} GOT ServerIdentFrame:"
" addrs={}, gid={}, gs={},"
case Tag::SESSION_RECONNECT_OK:
return frame_assembler->read_frame_payload(
).then([this](auto payload) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} at receiving RECONNECT_OK",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
// handle_reconnect_ok() logic
auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
- logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
+ logger().debug("{} GOT ReconnectOkFrame: msg_seq={}, "
+ "IOHandler::requeue_out_sent_up_to()",
conn, reconnect_ok.msg_seq());
+
io_states.requeue_out_sent_up_to();
- io_handler.requeue_out_sent_up_to(reconnect_ok.msg_seq());
+ auto msg_seq = reconnect_ok.msg_seq();
+ gate.dispatch_in_background(
+ "requeue_out_reconnecting", conn, [this, msg_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, msg_seq] {
+ io_handler.requeue_out_sent_up_to(msg_seq);
+ });
+ });
+
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
});
default: {
// refered to async-conn v2: not assign gs to global_seq
global_seq = messenger.get_global_seq();
- logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
+ logger().debug("{} UPDATE: gs={} for server ident, "
+ "IOHandler::reset_peer_state()",
+ conn, global_seq);
// this is required for the case when this connection is being replaced
io_states.reset_peer_state();
- io_handler.reset_peer_state();
+ gate.dispatch_in_background(
+ "reset_peer_state", conn, [this] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this] {
+ io_handler.reset_peer_state();
+ });
+ });
if (!conn.policy.lossy) {
server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
if (reconnect) {
connect_seq = new_connect_seq;
// send_reconnect_ok() logic
+
+ logger().debug("{} IOHandler::requeue_out_sent_up_to({})", conn, new_msg_seq);
io_states.requeue_out_sent_up_to();
- io_handler.requeue_out_sent_up_to(new_msg_seq);
+ gate.dispatch_in_background(
+ "requeue_out_replacing", conn, [this, new_msg_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, new_msg_seq] {
+ io_handler.requeue_out_sent_up_to(new_msg_seq);
+ });
+ });
+
auto reconnect_ok = ReconnectOkFrame::Encode(io_states.in_seq);
logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_states.in_seq);
return frame_assembler->write_flush_frame(reconnect_ok);
return;
}
- logger().info("{} mark_down() with {}",
+ logger().info("{} mark_down() at {}, send notify_mark_down()",
conn, io_stat_printer{*this});
set_io_state(io_state_t::drop);
- handshake_listener->notify_mark_down();
+ shard_states->dispatch_in_background(
+ "notify_mark_down", conn, [this] {
+ return seastar::smp::submit_to(
+ conn.get_messenger_shard_id(), [this] {
+ handshake_listener->notify_mark_down();
+ });
+ });
}
void IOHandler::print_io_stat(std::ostream &out) const
}
if (io_state == io_state_t::open) {
- logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
- conn, io_state, e.what());
+ logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, "
+ "send notify_out_fault()",
+ conn, io_state, io_stat_printer{*this}, e.what());
std::exception_ptr eptr;
try {
throw e;
eptr = std::current_exception();
}
set_io_state(io_state_t::delay);
- auto states = get_states();
- handshake_listener->notify_out_fault(
- "do_out_dispatch", eptr, states);
+ shard_states->dispatch_in_background(
+ "notify_out_fault(out)", conn, [this, eptr] {
+ auto states = get_states();
+ return seastar::smp::submit_to(
+ conn.get_messenger_shard_id(), [this, eptr, states] {
+ handshake_listener->notify_out_fault(
+ "do_out_dispatch", eptr, states);
+ });
+ });
} else {
if (io_state != io_state_t::switched) {
logger().info("{} do_out_dispatch(): fault at {}, {} -- {}",
{
assert(is_out_queued());
if (need_notify_out) {
- handshake_listener->notify_out();
+ logger().debug("{} send notify_out()", conn);
+ shard_states->dispatch_in_background(
+ "notify_out", conn, [this] {
+ return seastar::smp::submit_to(
+ conn.get_messenger_shard_id(), [this] {
+ handshake_listener->notify_out();
+ });
+ });
}
if (shard_states->try_enter_out_dispatching()) {
shard_states->dispatch_in_background(
auto io_state = ctx.get_io_state();
if (io_state == io_state_t::open) {
- logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
- conn, io_state, e_what);
+ logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, "
+ "send notify_out_fault()",
+ conn, io_state, io_stat_printer{*this}, e_what);
set_io_state(io_state_t::delay);
- auto states = get_states();
- handshake_listener->notify_out_fault(
- "do_in_dispatch", eptr, states);
+ shard_states->dispatch_in_background(
+ "notify_out_fault(in)", conn, [this, eptr] {
+ auto states = get_states();
+ return seastar::smp::submit_to(
+ conn.get_messenger_shard_id(), [this, eptr, states] {
+ handshake_listener->notify_out_fault(
+ "do_in_dispatch", eptr, states);
+ });
+ });
} else {
if (io_state != io_state_t::switched) {
logger().info("{} do_in_dispatch(): fault at {}, {} -- {}",