} else {
assert(new_io_state != io_state_t::open);
}
- logger().debug("{} IOHandler::set_io_state(): new_state={}, new_io_state={}, "
+
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::set_io_state(): new_state={}, new_io_state={}, "
"fa={}, set_notify_out={}",
- conn, get_state_name(new_state), new_io_state,
+ conn, cc_seq, get_state_name(new_state), new_io_state,
fa ? fmt::format("(sid={})", fa->get_shard_id()) : "N/A",
need_notify_out);
gate.dispatch_in_background(
"set_io_state", conn,
- [this, new_io_state, fa=std::move(fa)]() mutable {
+ [this, cc_seq, new_io_state, fa=std::move(fa)]() mutable {
return seastar::smp::submit_to(
io_handler.get_shard_id(),
- [this, new_io_state, fa=std::move(fa), set_notify_out=need_notify_out]() mutable {
- io_handler.set_io_state(new_io_state, std::move(fa), set_notify_out);
+ [this, cc_seq, new_io_state,
+ fa=std::move(fa), set_notify_out=need_notify_out]() mutable {
+ return io_handler.set_io_state(
+ cc_seq, new_io_state, std::move(fa), set_notify_out);
});
});
if (need_exit_io) {
// from READY
- logger().debug("{} IOHandler::wait_io_exit_dispatching() start ...", conn);
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::wait_io_exit_dispatching() ...",
+ conn, cc_seq);
assert(pr_exit_io.has_value());
assert(new_io_state != io_state_t::open);
need_exit_io = false;
- gate.dispatch_in_background("exit_io", conn, [this] {
+ gate.dispatch_in_background("exit_io", conn, [this, cc_seq] {
return seastar::smp::submit_to(
- io_handler.get_shard_id(), [this] {
- return io_handler.wait_io_exit_dispatching();
- }).then([this](auto ret) {
- logger().debug("{} IOHandler::wait_io_exit_dispatching() finish, {}",
- conn, ret.io_states);
+ io_handler.get_shard_id(), [this, cc_seq] {
+ return io_handler.wait_io_exit_dispatching(cc_seq);
+ }).then([this, cc_seq](auto ret) {
+ logger().debug("{} finish {} IOHandler::wait_io_exit_dispatching(), {}",
+ conn, cc_seq, ret.io_states);
frame_assembler = std::move(ret.frame_assembler);
assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
ceph_assert_always(
peer_global_seq = 0;
}
- logger().debug("{} IOHandler::reset_session({})", conn, full);
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::reset_session({})",
+ conn, cc_seq, full);
io_states.reset_session(full);
gate.dispatch_in_background(
- "reset_session", conn, [this, full] {
+ "reset_session", conn, [this, cc_seq, full] {
return seastar::smp::submit_to(
- io_handler.get_shard_id(), [this, full] {
- io_handler.reset_session(full);
+ io_handler.get_shard_id(), [this, cc_seq, full] {
+ return io_handler.reset_session(cc_seq, full);
});
});
// user can make changes
}
// handle_server_ident() logic
- logger().debug("{} IOHandler::requeue_out_sent()", conn);
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::requeue_out_sent()",
+ conn, cc_seq);
io_states.requeue_out_sent();
gate.dispatch_in_background(
- "requeue_out_sent", conn, [this] {
+ "requeue_out_sent", conn, [this, cc_seq] {
return seastar::smp::submit_to(
- io_handler.get_shard_id(), [this] {
- io_handler.requeue_out_sent();
+ io_handler.get_shard_id(), [this, cc_seq] {
+ return io_handler.requeue_out_sent(cc_seq);
});
});
// handle_reconnect_ok() logic
auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
+ auto cc_seq = crosscore.prepare_submit();
logger().debug("{} GOT ReconnectOkFrame: msg_seq={}, "
- "IOHandler::requeue_out_sent_up_to()",
- conn, reconnect_ok.msg_seq());
+ "send {} IOHandler::requeue_out_sent_up_to()",
+ conn, reconnect_ok.msg_seq(), cc_seq);
io_states.requeue_out_sent_up_to();
auto msg_seq = reconnect_ok.msg_seq();
gate.dispatch_in_background(
- "requeue_out_reconnecting", conn, [this, msg_seq] {
+ "requeue_out_reconnecting", conn, [this, cc_seq, msg_seq] {
return seastar::smp::submit_to(
- io_handler.get_shard_id(), [this, msg_seq] {
- io_handler.requeue_out_sent_up_to(msg_seq);
+ io_handler.get_shard_id(), [this, cc_seq, msg_seq] {
+ return io_handler.requeue_out_sent_up_to(cc_seq, msg_seq);
});
});
abort_protocol();
}
+ auto cc_seq = crosscore.prepare_submit();
logger().info("{} connected: gs={}, pgs={}, cs={}, "
"client_cookie={}, server_cookie={}, {}, new_sid={}, "
- "IOHandler::dispatch_connect()",
+ "send {} IOHandler::dispatch_connect()",
conn, global_seq, peer_global_seq, connect_seq,
client_cookie, server_cookie, io_states,
- frame_assembler->get_socket_shard_id());
+ frame_assembler->get_socket_shard_id(), cc_seq);
// set io_handler to a new shard
auto new_io_shard = frame_assembler->get_socket_shard_id();
pr_switch_io_shard = seastar::shared_promise<>();
return seastar::smp::submit_to(
io_handler.get_shard_id(),
- [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable {
+ [this, cc_seq, new_io_shard,
+ conn_fref=std::move(conn_fref)]() mutable {
return io_handler.dispatch_connect(
- new_io_shard, std::move(conn_fref));
+ cc_seq, new_io_shard, std::move(conn_fref));
}).then([this, new_io_shard] {
ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
pr_switch_io_shard->set_value();
ceph_assert_always(state == state_t::ESTABLISHING);
// set io_handler to a new shard
+ auto cc_seq = crosscore.prepare_submit();
auto new_io_shard = frame_assembler->get_socket_shard_id();
- logger().debug("{} IOHandler::dispatch_accept({})", conn, new_io_shard);
+ logger().debug("{} send {} IOHandler::dispatch_accept({})",
+ conn, cc_seq, new_io_shard);
ConnectionFRef conn_fref = seastar::make_foreign(
conn.shared_from_this());
ceph_assert_always(!pr_switch_io_shard.has_value());
pr_switch_io_shard = seastar::shared_promise<>();
return seastar::smp::submit_to(
io_handler.get_shard_id(),
- [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable {
+ [this, cc_seq, new_io_shard,
+ conn_fref=std::move(conn_fref)]() mutable {
return io_handler.dispatch_accept(
- new_io_shard, std::move(conn_fref));
+ cc_seq, new_io_shard, std::move(conn_fref));
}).then([this, new_io_shard] {
ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
pr_switch_io_shard->set_value();
// refered to async-conn v2: not assign gs to global_seq
global_seq = messenger.get_global_seq();
+ auto cc_seq = crosscore.prepare_submit();
logger().debug("{} UPDATE: gs={} for server ident, "
- "IOHandler::reset_peer_state()",
- conn, global_seq);
+ "send {} IOHandler::reset_peer_state()",
+ conn, global_seq, cc_seq);
// this is required for the case when this connection is being replaced
io_states.reset_peer_state();
gate.dispatch_in_background(
- "reset_peer_state", conn, [this] {
+ "reset_peer_state", conn, [this, cc_seq] {
return seastar::smp::submit_to(
- io_handler.get_shard_id(), [this] {
- io_handler.reset_peer_state();
+ io_handler.get_shard_id(), [this, cc_seq] {
+ return io_handler.reset_peer_state(cc_seq);
});
});
// set io_handler to a new shard
// we should prevent parallel switching core attemps
- logger().debug("{} IOHandler::dispatch_accept({})", conn, new_io_shard);
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::dispatch_accept({})",
+ conn, cc_seq, new_io_shard);
ConnectionFRef conn_fref = seastar::make_foreign(
conn.shared_from_this());
ceph_assert_always(!pr_switch_io_shard.has_value());
pr_switch_io_shard = seastar::shared_promise<>();
return seastar::smp::submit_to(
io_handler.get_shard_id(),
- [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable {
+ [this, cc_seq, new_io_shard,
+ conn_fref=std::move(conn_fref)]() mutable {
return io_handler.dispatch_accept(
- new_io_shard, std::move(conn_fref));
+ cc_seq, new_io_shard, std::move(conn_fref));
}).then([this, new_io_shard] {
ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
pr_switch_io_shard->set_value();
connect_seq = new_connect_seq;
// send_reconnect_ok() logic
- logger().debug("{} IOHandler::requeue_out_sent_up_to({})", conn, new_msg_seq);
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::requeue_out_sent_up_to({})",
+ conn, cc_seq, new_msg_seq);
io_states.requeue_out_sent_up_to();
gate.dispatch_in_background(
- "requeue_out_replacing", conn, [this, new_msg_seq] {
+ "requeue_out_replacing", conn, [this, cc_seq, new_msg_seq] {
return seastar::smp::submit_to(
- io_handler.get_shard_id(), [this, new_msg_seq] {
- io_handler.requeue_out_sent_up_to(new_msg_seq);
+ io_handler.get_shard_id(), [this, cc_seq, new_msg_seq] {
+ return io_handler.requeue_out_sent_up_to(cc_seq, new_msg_seq);
});
});
// READY state
-void ProtocolV2::notify_out_fault(
+// HandshakeListener callback submitted from the io_handler's core; runs on
+// the messenger shard (asserted below) and drives the READY-state fault path.
+// Now returns a future and takes a crosscore sequence so that events
+// submitted across cores are applied in submission order.
+seastar::future<> ProtocolV2::notify_out_fault(
+    crosscore_t::seq_t cc_seq,
  const char *where,
  std::exception_ptr eptr,
  io_handler_state _io_states)
{
  assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+  // Out-of-order arrival: wait until all earlier cross-core events have been
+  // processed, then retry this event with the same sequence number.
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} notify_out_fault(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, where, eptr, _io_states] {
+      return notify_out_fault(cc_seq, where, eptr, _io_states);
+    });
+  }
+
  io_states = _io_states;
-  logger().debug("{} got notify_out_fault(): io_states={}", conn, io_states);
+  logger().debug("{} got {} notify_out_fault(): io_states={}",
+                 conn, cc_seq, io_states);
  fault(state_t::READY, where, eptr);
+  return seastar::now();
}
void ProtocolV2::execute_ready()
trigger_state(state_t::STANDBY, io_state_t::delay);
}
-void ProtocolV2::notify_out()
+// HandshakeListener callback: the io_handler reports queued output. Runs on
+// the messenger shard; ordered across cores via the crosscore sequence, and
+// may kick STANDBY client connections back into CONNECTING.
+seastar::future<> ProtocolV2::notify_out(
+    crosscore_t::seq_t cc_seq)
{
  assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
-  logger().debug("{} got notify_out(): at {}", conn, get_state_name(state));
+  // Out-of-order arrival: defer until preceding cross-core events land.
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} notify_out(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return notify_out(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} notify_out(): at {}",
+                 conn, cc_seq, get_state_name(state));
  io_states.is_out_queued = true;
  if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
    logger().info("{} notify_out(): at {}, going to CONNECTING",
                  conn, get_state_name(state));
    execute_connecting();
  }
+  return seastar::now();
}
// WAIT state
// CLOSING state
-void ProtocolV2::notify_mark_down()
+// HandshakeListener callback: the io_handler requests a non-reset close
+// (mark_down). Runs on the messenger shard, ordered via crosscore.
+seastar::future<> ProtocolV2::notify_mark_down(
+    crosscore_t::seq_t cc_seq)
{
  assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
-  logger().debug("{} got notify_mark_down()", conn);
+  // Out-of-order arrival: defer until preceding cross-core events land.
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} notify_mark_down(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return notify_mark_down(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} notify_mark_down()",
+                 conn, cc_seq);
  do_close(false);
+  return seastar::now();
}
seastar::future<> ProtocolV2::close_clean_yielded()
return wait_switch_io_shard(
).then([this, is_dispatch_reset, is_replace] {
trigger_state_phase2(state_t::CLOSING, io_state_t::drop);
- logger().debug("{} IOHandler::close_io(reset={}, replace={})",
- conn, is_dispatch_reset, is_replace);
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::close_io(reset={}, replace={})",
+ conn, cc_seq, is_dispatch_reset, is_replace);
std::ignore = gate.close(
).then([this] {
return seastar::smp::submit_to(
io_handler.get_shard_id(),
- [this, is_dispatch_reset, is_replace] {
- return io_handler.close_io(is_dispatch_reset, is_replace);
+ [this, cc_seq, is_dispatch_reset, is_replace] {
+ return io_handler.close_io(cc_seq, is_dispatch_reset, is_replace);
});
// user can make changes
});
* as HandshakeListener
*/
private:
- void notify_out() final;
+ seastar::future<> notify_out(
+ crosscore_t::seq_t cc_seq) final;
- void notify_out_fault(const char *where, std::exception_ptr, io_handler_state) final;
+ seastar::future<> notify_out_fault(
+ crosscore_t::seq_t cc_seq,
+ const char *where,
+ std::exception_ptr,
+ io_handler_state) final;
- void notify_mark_down() final;
+ seastar::future<> notify_mark_down(
+ crosscore_t::seq_t cc_seq) final;
/*
* as ProtocolV2 to be called by SocketConnection
// asynchronously populated from io_handler
io_handler_state io_states;
+ crosscore_t crosscore;
+
bool has_socket = false;
// the socket exists and it is not shutdown
return;
}
- logger().info("{} mark_down() at {}, send notify_mark_down()",
- conn, io_stat_printer{*this});
- set_io_state(io_state_t::drop);
+ auto cc_seq = crosscore.prepare_submit();
+ logger().info("{} mark_down() at {}, send {} notify_mark_down()",
+ conn, io_stat_printer{*this}, cc_seq);
+ do_set_io_state(io_state_t::drop);
shard_states->dispatch_in_background(
- "notify_mark_down", conn, [this] {
+ "notify_mark_down", conn, [this, cc_seq] {
return seastar::smp::submit_to(
- conn.get_messenger_shard_id(), [this] {
- handshake_listener->notify_mark_down();
+ conn.get_messenger_shard_id(), [this, cc_seq] {
+ return handshake_listener->notify_mark_down(cc_seq);
});
});
}
ceph_assert_always(frame_assembler->is_socket_valid());
}
-void IOHandler::set_io_state(
+void IOHandler::do_set_io_state(
io_state_t new_state,
+ std::optional<crosscore_t::seq_t> cc_seq,
FrameAssemblerV2Ref fa,
bool set_notify_out)
{
ceph_assert_always(seastar::this_shard_id() == get_shard_id());
auto prv_state = get_io_state();
- logger().debug("{} got set_io_state(): prv_state={}, new_state={}, "
+ logger().debug("{} got {}do_set_io_state(): prv_state={}, new_state={}, "
"fa={}, set_notify_out={}, at {}",
- conn, prv_state, new_state,
+ conn,
+ cc_seq.has_value() ? fmt::format("{} ", *cc_seq) : "",
+ prv_state, new_state,
fa ? "present" : "N/A", set_notify_out,
io_stat_printer{*this});
ceph_assert_always(!(
}
}
+// Cross-core entry point called from the ProtocolV2 (messenger) shard.
+// Enforces submission order via cc_seq, then delegates the actual state
+// change to the core-local do_set_io_state().
+seastar::future<> IOHandler::set_io_state(
+    crosscore_t::seq_t cc_seq,
+    io_state_t new_state,
+    FrameAssemblerV2Ref fa,
+    bool set_notify_out)
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} set_io_state(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    // NOTE: fa is moved into the continuation so the frame assembler (if any)
+    // survives the wait and is handed to the retried call.
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, new_state,
+            fa=std::move(fa), set_notify_out]() mutable {
+      return set_io_state(cc_seq, new_state, std::move(fa), set_notify_out);
+    });
+  }
+
+  do_set_io_state(new_state, cc_seq, std::move(fa), set_notify_out);
+  return seastar::now();
+}
+
seastar::future<IOHandler::exit_dispatching_ret>
-IOHandler::wait_io_exit_dispatching()
+IOHandler::wait_io_exit_dispatching(
+ crosscore_t::seq_t cc_seq)
{
assert(seastar::this_shard_id() == get_shard_id());
- logger().debug("{} got wait_io_exit_dispatching()", conn);
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} wait_io_exit_dispatching(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq] {
+ return wait_io_exit_dispatching(cc_seq);
+ });
+ }
+
+ logger().debug("{} got {} wait_io_exit_dispatching()",
+ conn, cc_seq);
ceph_assert_always(get_io_state() != io_state_t::open);
ceph_assert_always(frame_assembler != nullptr);
ceph_assert_always(!frame_assembler->is_socket_valid());
});
}
-void IOHandler::reset_session(bool full)
+// Cross-core entry point: reset the in-side session state, and on a full
+// reset also drop the out queue and dispatch a remote-reset event.
+// Ordered across cores via cc_seq; must not run while io_state is open.
+seastar::future<> IOHandler::reset_session(
+    crosscore_t::seq_t cc_seq,
+    bool full)
{
  assert(seastar::this_shard_id() == get_shard_id());
-  logger().debug("{} got reset_session({})", conn, full);
+  // Out-of-order arrival: defer until preceding cross-core events land.
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} reset_session(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, full] {
+      return reset_session(cc_seq, full);
+    });
+  }
+
+  logger().debug("{} got {} reset_session({})",
+                 conn, cc_seq, full);
  assert(get_io_state() != io_state_t::open);
  reset_in();
  if (full) {
    reset_out();
    dispatch_remote_reset();
  }
+  return seastar::now();
}
-void IOHandler::reset_peer_state()
+// Cross-core entry point: reset peer-related state (in-side state, requeue
+// all sent messages from seq 0, discard the sent list). Used when this
+// connection is being replaced. Ordered across cores via cc_seq.
+seastar::future<> IOHandler::reset_peer_state(
+    crosscore_t::seq_t cc_seq)
{
  assert(seastar::this_shard_id() == get_shard_id());
-  logger().debug("{} got reset_peer_state()", conn);
+  // Out-of-order arrival: defer until preceding cross-core events land.
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} reset_peer_state(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return reset_peer_state(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} reset_peer_state()",
+                 conn, cc_seq);
  assert(get_io_state() != io_state_t::open);
  reset_in();
-  requeue_out_sent_up_to(0);
+  // Use the core-local helper directly; the sequenced public entry point is
+  // only for cross-core callers.
+  do_requeue_out_sent_up_to(0);
  discard_out_sent();
+  return seastar::now();
}
-void IOHandler::requeue_out_sent()
+// Cross-core entry point: requeue all sent-but-unacked messages. The actual
+// work moved to the core-local do_requeue_out_sent(); this wrapper only
+// enforces cross-core ordering via cc_seq.
+seastar::future<> IOHandler::requeue_out_sent(
+    crosscore_t::seq_t cc_seq)
{
  assert(seastar::this_shard_id() == get_shard_id());
+  // Out-of-order arrival: defer until preceding cross-core events land.
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} requeue_out_sent(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return requeue_out_sent(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} requeue_out_sent()",
+                 conn, cc_seq);
+  do_requeue_out_sent();
+  return seastar::now();
+}
+
+void IOHandler::do_requeue_out_sent()
+{
assert(get_io_state() != io_state_t::open);
if (out_sent_msgs.empty()) {
return;
maybe_notify_out_dispatch();
}
-void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
+// Cross-core entry point: acknowledge sent messages up to msg_seq and
+// requeue the rest. Delegates to the core-local do_requeue_out_sent_up_to();
+// this wrapper only enforces cross-core ordering via cc_seq.
+seastar::future<> IOHandler::requeue_out_sent_up_to(
+    crosscore_t::seq_t cc_seq,
+    seq_num_t msg_seq)
{
  assert(seastar::this_shard_id() == get_shard_id());
+  // Out-of-order arrival: defer until preceding cross-core events land.
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} requeue_out_sent_up_to(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, msg_seq] {
+      return requeue_out_sent_up_to(cc_seq, msg_seq);
+    });
+  }
+
+  logger().debug("{} got {} requeue_out_sent_up_to({})",
+                 conn, cc_seq, msg_seq);
+  do_requeue_out_sent_up_to(msg_seq);
+  return seastar::now();
+}
+
+void IOHandler::do_requeue_out_sent_up_to(seq_num_t seq)
+{
assert(get_io_state() != io_state_t::open);
if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
out_sent_msgs.pop_front();
}
}
- requeue_out_sent();
+ do_requeue_out_sent();
}
void IOHandler::reset_in()
seastar::future<>
IOHandler::dispatch_accept(
+ crosscore_t::seq_t cc_seq,
seastar::shard_id new_sid,
ConnectionFRef conn_fref)
{
ceph_assert_always(seastar::this_shard_id() == get_shard_id());
- logger().debug("{} got dispatch_accept({}) at {}",
- conn, new_sid, io_stat_printer{*this});
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} dispatch_accept(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, new_sid,
+ conn_fref=std::move(conn_fref)]() mutable {
+ return dispatch_accept(cc_seq, new_sid, std::move(conn_fref));
+ });
+ }
+
+ logger().debug("{} got {} dispatch_accept({}) at {}",
+ conn, cc_seq, new_sid, io_stat_printer{*this});
if (get_io_state() == io_state_t::drop) {
assert(!protocol_is_connected);
// it is possible that both io_handler and protocolv2 are
seastar::future<>
IOHandler::dispatch_connect(
+ crosscore_t::seq_t cc_seq,
seastar::shard_id new_sid,
ConnectionFRef conn_fref)
{
ceph_assert_always(seastar::this_shard_id() == get_shard_id());
- logger().debug("{} got dispatch_connect({}) at {}",
- conn, new_sid, io_stat_printer{*this});
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} dispatch_connect(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, new_sid,
+ conn_fref=std::move(conn_fref)]() mutable {
+ return dispatch_connect(cc_seq, new_sid, std::move(conn_fref));
+ });
+ }
+
+ logger().debug("{} got {} dispatch_connect({}) at {}",
+ conn, cc_seq, new_sid, io_stat_printer{*this});
if (get_io_state() == io_state_t::drop) {
assert(!protocol_is_connected);
// it is possible that both io_handler and protocolv2 are
}
if (io_state == io_state_t::open) {
+ auto cc_seq = crosscore.prepare_submit();
logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, "
- "send notify_out_fault()",
- conn, io_state, io_stat_printer{*this}, e.what());
+ "send {} notify_out_fault()",
+ conn, io_state, io_stat_printer{*this}, e.what(), cc_seq);
std::exception_ptr eptr;
try {
throw e;
} catch(...) {
eptr = std::current_exception();
}
- set_io_state(io_state_t::delay);
+ do_set_io_state(io_state_t::delay);
shard_states->dispatch_in_background(
- "notify_out_fault(out)", conn, [this, eptr] {
+ "notify_out_fault(out)", conn, [this, cc_seq, eptr] {
auto states = get_states();
return seastar::smp::submit_to(
- conn.get_messenger_shard_id(), [this, eptr, states] {
- handshake_listener->notify_out_fault(
- "do_out_dispatch", eptr, states);
+ conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
+ return handshake_listener->notify_out_fault(
+ cc_seq, "do_out_dispatch", eptr, states);
});
});
} else {
ceph_assert_always(seastar::this_shard_id() == get_shard_id());
assert(is_out_queued());
if (need_notify_out) {
- logger().debug("{} send notify_out()", conn);
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} notify_out()",
+ conn, cc_seq);
shard_states->dispatch_in_background(
- "notify_out", conn, [this] {
+ "notify_out", conn, [this, cc_seq] {
return seastar::smp::submit_to(
- conn.get_messenger_shard_id(), [this] {
- handshake_listener->notify_out();
+ conn.get_messenger_shard_id(), [this, cc_seq] {
+ return handshake_listener->notify_out(cc_seq);
});
});
}
auto io_state = ctx.get_io_state();
if (io_state == io_state_t::open) {
+ auto cc_seq = crosscore.prepare_submit();
logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, "
- "send notify_out_fault()",
- conn, io_state, io_stat_printer{*this}, e_what);
- set_io_state(io_state_t::delay);
+ "send {} notify_out_fault()",
+ conn, io_state, io_stat_printer{*this}, e_what, cc_seq);
+ do_set_io_state(io_state_t::delay);
shard_states->dispatch_in_background(
- "notify_out_fault(in)", conn, [this, eptr] {
+ "notify_out_fault(in)", conn, [this, cc_seq, eptr] {
auto states = get_states();
return seastar::smp::submit_to(
- conn.get_messenger_shard_id(), [this, eptr, states] {
- handshake_listener->notify_out_fault(
- "do_in_dispatch", eptr, states);
+ conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
+ return handshake_listener->notify_out_fault(
+ cc_seq, "do_in_dispatch", eptr, states);
});
});
} else {
}
seastar::future<>
-IOHandler::close_io(bool is_dispatch_reset, bool is_replace)
+IOHandler::close_io(
+ crosscore_t::seq_t cc_seq,
+ bool is_dispatch_reset,
+ bool is_replace)
{
ceph_assert_always(seastar::this_shard_id() == get_shard_id());
- ceph_assert_always(get_io_state() == io_state_t::drop);
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} close_io(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, is_dispatch_reset, is_replace] {
+ return close_io(cc_seq, is_dispatch_reset, is_replace);
+ });
+ }
- logger().debug("{} got close_io(reset={}, replace={})",
- conn, is_dispatch_reset, is_replace);
+ logger().debug("{} got {} close_io(reset={}, replace={})",
+ conn, cc_seq, is_dispatch_reset, is_replace);
+ ceph_assert_always(get_io_state() == io_state_t::drop);
if (is_dispatch_reset) {
dispatch_reset(is_replace);
#pragma once
+#include <seastar/core/shared_future.hh>
#include <seastar/util/later.hh>
#include "crimson/common/gated.h"
namespace crimson::net {
+/**
+ * crosscore_t
+ *
+ * To preserve the event order across cores.
+ */
+class crosscore_t {
+public:
+  using seq_t = uint64_t;
+
+  crosscore_t() = default;
+  ~crosscore_t() = default;
+
+  // The sequence number of the last event accepted on the receiving side.
+  seq_t get_in_seq() const {
+    return in_seq;
+  }
+
+  // Sender side: allocate the next sequence number for an event about to be
+  // submitted to the peer core. Sequences start at 1.
+  seq_t prepare_submit() {
+    ++out_seq;
+    return out_seq;
+  }
+
+  // Receiver side: returns true iff seq is the next expected event, in which
+  // case it is accepted and any parked waiters are woken so they can re-check
+  // whether it is now their turn. Returns false for a not-yet-due seq; the
+  // caller is then expected to call wait(seq) and retry afterwards.
+  bool proceed_or_wait(seq_t seq) {
+    if (seq == in_seq + 1) {
+      ++in_seq;
+      if (unlikely(in_pr_wait.has_value())) {
+        in_pr_wait->set_value();
+        in_pr_wait = std::nullopt;
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  // Park until in_seq advances; all out-of-order waiters share one
+  // shared_promise and each re-checks with proceed_or_wait() after wakeup.
+  // Precondition: proceed_or_wait(seq) just returned false, i.e. seq is not
+  // the next expected event (asserted below).
+  seastar::future<> wait(seq_t seq) {
+    assert(seq != in_seq + 1);
+    if (!in_pr_wait.has_value()) {
+      in_pr_wait = seastar::shared_promise<>();
+    }
+    return in_pr_wait->get_shared_future();
+  }
+
+private:
+  seq_t out_seq = 0;  // last sequence handed out by prepare_submit()
+  seq_t in_seq = 0;   // last sequence accepted by proceed_or_wait()
+  std::optional<seastar::shared_promise<>> in_pr_wait;  // shared by all waiters
+};
+
/**
* io_handler_state
*
HandshakeListener &operator=(const HandshakeListener &) = delete;
HandshakeListener &operator=(HandshakeListener &&) = delete;
- virtual void notify_out() = 0;
+ virtual seastar::future<> notify_out(
+ crosscore_t::seq_t cc_seq) = 0;
- virtual void notify_out_fault(
+ virtual seastar::future<> notify_out_fault(
+ crosscore_t::seq_t cc_seq,
const char *where,
std::exception_ptr,
io_handler_state) = 0;
- virtual void notify_mark_down() = 0;
+ virtual seastar::future<> notify_mark_down(
+ crosscore_t::seq_t cc_seq) = 0;
protected:
HandshakeListener() = default;
}
io_handler_state get_states() const {
- assert(seastar::this_shard_id() == get_shard_id());
+ // might be called from prv_sid during wait_io_exit_dispatching()
return {in_seq, is_out_queued(), has_out_sent()};
}
* may be called cross-core
*/
- seastar::future<> close_io(bool is_dispatch_reset, bool is_replace);
+ seastar::future<> close_io(
+ crosscore_t::seq_t cc_seq,
+ bool is_dispatch_reset,
+ bool is_replace);
/**
* io_state_t
};
friend class fmt::formatter<io_state_t>;
- void set_io_state(
+ seastar::future<> set_io_state(
+ crosscore_t::seq_t cc_seq,
io_state_t new_state,
- FrameAssemblerV2Ref fa = nullptr,
- bool set_notify_out = false);
+ FrameAssemblerV2Ref fa,
+ bool set_notify_out);
struct exit_dispatching_ret {
FrameAssemblerV2Ref frame_assembler;
io_handler_state io_states;
};
- seastar::future<exit_dispatching_ret> wait_io_exit_dispatching();
+ seastar::future<exit_dispatching_ret>
+ wait_io_exit_dispatching(
+ crosscore_t::seq_t cc_seq);
- void reset_session(bool full);
+ seastar::future<> reset_session(
+ crosscore_t::seq_t cc_seq,
+ bool full);
- void reset_peer_state();
+ seastar::future<> reset_peer_state(
+ crosscore_t::seq_t cc_seq);
- void requeue_out_sent_up_to(seq_num_t seq);
+ seastar::future<> requeue_out_sent_up_to(
+ crosscore_t::seq_t cc_seq,
+ seq_num_t msg_seq);
- void requeue_out_sent();
+ seastar::future<> requeue_out_sent(
+ crosscore_t::seq_t cc_seq);
seastar::future<> dispatch_accept(
- seastar::shard_id new_sid, ConnectionFRef);
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef);
seastar::future<> dispatch_connect(
- seastar::shard_id new_sid, ConnectionFRef);
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef);
private:
class shard_states_t;
std::optional<seastar::promise<>> in_exit_dispatching;
};
+ void do_set_io_state(
+ io_state_t new_state,
+ std::optional<crosscore_t::seq_t> cc_seq = std::nullopt,
+ FrameAssemblerV2Ref fa = nullptr,
+ bool set_notify_out = false);
+
io_state_t get_io_state() const {
return shard_states->get_io_state();
}
+ void do_requeue_out_sent();
+
+ void do_requeue_out_sent_up_to(seq_num_t seq);
+
void assign_frame_assembler(FrameAssemblerV2Ref);
seastar::future<> send_redirected(MessageFRef msg);
private:
shard_states_ref_t shard_states;
+ crosscore_t crosscore;
+
// drop was happening in the previous sid
std::optional<seastar::shard_id> maybe_dropped_sid;