#endif
}
-seastar::future<> IOHandler::send(MessageFRef msg)
+seastar::future<> IOHandler::send(MessageURef _msg)
{
+ // may be invoked from any core
+ MessageFRef msg = seastar::make_foreign(std::move(_msg));
+ auto cc_seq = io_crosscore.prepare_submit();
+ auto source_core = seastar::this_shard_id();
// sid may be changed on-the-fly during the submission
- if (seastar::this_shard_id() == get_shard_id()) {
- return do_send(std::move(msg));
+ if (source_core == get_shard_id()) {
+ return do_send(cc_seq, source_core, std::move(msg));
} else {
- logger().trace("{} send() is directed to {} -- {}",
- conn, get_shard_id(), *msg);
+ logger().trace("{} send() {} is directed to core {} -- {}",
+ conn, cc_seq, get_shard_id(), *msg);
return seastar::smp::submit_to(
- get_shard_id(), [this, msg=std::move(msg)]() mutable {
- return send_redirected(std::move(msg));
+ get_shard_id(),
+ [this, cc_seq, source_core, msg=std::move(msg)]() mutable {
+ return send_recheck_shard(cc_seq, source_core, std::move(msg));
});
}
}
-seastar::future<> IOHandler::send_redirected(MessageFRef msg)
+seastar::future<> IOHandler::send_recheck_shard(
+ cc_seq_t cc_seq,
+ core_id_t source_core,
+ MessageFRef msg)
{
// sid may be changed on-the-fly during the submission
if (seastar::this_shard_id() == get_shard_id()) {
- return do_send(std::move(msg));
+ return do_send(cc_seq, source_core, std::move(msg));
} else {
- logger().debug("{} send() is redirected to {} -- {}",
- conn, get_shard_id(), *msg);
+ logger().debug("{} send_recheck_shard() {} "
+ "is redirected from core {} to {} -- {}",
+ conn, cc_seq, source_core, get_shard_id(), *msg);
return seastar::smp::submit_to(
- get_shard_id(), [this, msg=std::move(msg)]() mutable {
- return send_redirected(std::move(msg));
+ get_shard_id(),
+ [this, cc_seq, source_core, msg=std::move(msg)]() mutable {
+ return send_recheck_shard(cc_seq, source_core, std::move(msg));
});
}
}
-seastar::future<> IOHandler::do_send(MessageFRef msg)
+seastar::future<> IOHandler::do_send(
+ cc_seq_t cc_seq,
+ core_id_t source_core,
+ MessageFRef msg)
{
assert(seastar::this_shard_id() == get_shard_id());
- logger().trace("{} do_send() got message -- {}", conn, *msg);
- if (get_io_state() != io_state_t::drop) {
- out_pending_msgs.push_back(std::move(msg));
- notify_out_dispatch();
+ if (io_crosscore.proceed_or_wait(cc_seq, source_core)) {
+ logger().trace("{} do_send() got {} from core {}: send message -- {}",
+ conn, cc_seq, source_core, *msg);
+ if (get_io_state() != io_state_t::drop) {
+ out_pending_msgs.push_back(std::move(msg));
+ notify_out_dispatch();
+ }
+ return seastar::now();
+ } else {
+ logger().debug("{} do_send() got {} from core {}, wait at {} -- {}",
+ conn, cc_seq, source_core,
+ io_crosscore.get_in_seq(source_core),
+ *msg);
+ return io_crosscore.wait(cc_seq, source_core
+ ).then([this, cc_seq, source_core, msg=std::move(msg)]() mutable {
+ return send_recheck_shard(cc_seq, source_core, std::move(msg));
+ });
}
- return seastar::now();
}
seastar::future<> IOHandler::send_keepalive()
{
+ // may be invoked from any core
+ auto cc_seq = io_crosscore.prepare_submit();
+ auto source_core = seastar::this_shard_id();
// sid may be changed on-the-fly during the submission
- if (seastar::this_shard_id() == get_shard_id()) {
- return do_send_keepalive();
+ if (source_core == get_shard_id()) {
+ return do_send_keepalive(cc_seq, source_core);
} else {
- logger().trace("{} send_keepalive() is directed to {}", conn, get_shard_id());
+ logger().trace("{} send_keepalive() {} is directed to core {}",
+ conn, cc_seq, get_shard_id());
return seastar::smp::submit_to(
- get_shard_id(), [this] {
- return send_keepalive_redirected();
+ get_shard_id(),
+ [this, cc_seq, source_core] {
+ return send_keepalive_recheck_shard(cc_seq, source_core);
});
}
}
-seastar::future<> IOHandler::send_keepalive_redirected()
+seastar::future<> IOHandler::send_keepalive_recheck_shard(
+ cc_seq_t cc_seq,
+ core_id_t source_core)
{
// sid may be changed on-the-fly during the submission
if (seastar::this_shard_id() == get_shard_id()) {
- return do_send_keepalive();
+ return do_send_keepalive(cc_seq, source_core);
} else {
- logger().debug("{} send_keepalive() is redirected to {}", conn, get_shard_id());
+ logger().debug("{} send_keepalive_recheck_shard() {} "
+ "is redirected from core {} to {}",
+ conn, cc_seq, source_core, get_shard_id());
return seastar::smp::submit_to(
- get_shard_id(), [this] {
- return send_keepalive_redirected();
+ get_shard_id(),
+ [this, cc_seq, source_core] {
+ return send_keepalive_recheck_shard(cc_seq, source_core);
});
}
}
-seastar::future<> IOHandler::do_send_keepalive()
+seastar::future<> IOHandler::do_send_keepalive(
+ cc_seq_t cc_seq,
+ core_id_t source_core)
{
assert(seastar::this_shard_id() == get_shard_id());
- logger().trace("{} do_send_keeplive(): need_keepalive={}", conn, need_keepalive);
- if (!need_keepalive) {
- need_keepalive = true;
- notify_out_dispatch();
+ if (io_crosscore.proceed_or_wait(cc_seq, source_core)) {
+ logger().trace("{} do_send_keeplive() got {} from core {}: need_keepalive={}",
+ conn, cc_seq, source_core, need_keepalive);
+ if (!need_keepalive) {
+ need_keepalive = true;
+ notify_out_dispatch();
+ }
+ return seastar::now();
+ } else {
+ logger().debug("{} do_send_keepalive() got {} from core {}, wait at {}",
+ conn, cc_seq, source_core,
+ io_crosscore.get_in_seq(source_core));
+ return io_crosscore.wait(cc_seq, source_core
+ ).then([this, cc_seq, source_core] {
+ return send_keepalive_recheck_shard(cc_seq, source_core);
+ });
}
- return seastar::now();
}
void IOHandler::mark_down()
return;
}
- auto cc_seq = crosscore.prepare_submit();
+ auto cc_seq = proto_crosscore.prepare_submit();
logger().info("{} mark_down() at {}, send {} notify_mark_down()",
conn, io_stat_printer{*this}, cc_seq);
do_set_io_state(io_state_t::drop);
bool set_notify_out)
{
assert(seastar::this_shard_id() == get_shard_id());
- if (!crosscore.proceed_or_wait(cc_seq)) {
+ if (!proto_crosscore.proceed_or_wait(cc_seq)) {
logger().debug("{} got {} set_io_state(), wait at {}",
- conn, cc_seq, crosscore.get_in_seq());
- return crosscore.wait(cc_seq
+ conn, cc_seq, proto_crosscore.get_in_seq());
+ return proto_crosscore.wait(cc_seq
).then([this, cc_seq, new_state,
fa=std::move(fa), set_notify_out]() mutable {
return set_io_state(cc_seq, new_state, std::move(fa), set_notify_out);
cc_seq_t cc_seq)
{
assert(seastar::this_shard_id() == get_shard_id());
- if (!crosscore.proceed_or_wait(cc_seq)) {
+ if (!proto_crosscore.proceed_or_wait(cc_seq)) {
logger().debug("{} got {} wait_io_exit_dispatching(), wait at {}",
- conn, cc_seq, crosscore.get_in_seq());
- return crosscore.wait(cc_seq
+ conn, cc_seq, proto_crosscore.get_in_seq());
+ return proto_crosscore.wait(cc_seq
).then([this, cc_seq] {
return wait_io_exit_dispatching(cc_seq);
});
bool full)
{
assert(seastar::this_shard_id() == get_shard_id());
- if (!crosscore.proceed_or_wait(cc_seq)) {
+ if (!proto_crosscore.proceed_or_wait(cc_seq)) {
logger().debug("{} got {} reset_session(), wait at {}",
- conn, cc_seq, crosscore.get_in_seq());
- return crosscore.wait(cc_seq
+ conn, cc_seq, proto_crosscore.get_in_seq());
+ return proto_crosscore.wait(cc_seq
).then([this, cc_seq, full] {
return reset_session(cc_seq, full);
});
cc_seq_t cc_seq)
{
assert(seastar::this_shard_id() == get_shard_id());
- if (!crosscore.proceed_or_wait(cc_seq)) {
+ if (!proto_crosscore.proceed_or_wait(cc_seq)) {
logger().debug("{} got {} reset_peer_state(), wait at {}",
- conn, cc_seq, crosscore.get_in_seq());
- return crosscore.wait(cc_seq
+ conn, cc_seq, proto_crosscore.get_in_seq());
+ return proto_crosscore.wait(cc_seq
).then([this, cc_seq] {
return reset_peer_state(cc_seq);
});
cc_seq_t cc_seq)
{
assert(seastar::this_shard_id() == get_shard_id());
- if (!crosscore.proceed_or_wait(cc_seq)) {
+ if (!proto_crosscore.proceed_or_wait(cc_seq)) {
logger().debug("{} got {} requeue_out_sent(), wait at {}",
- conn, cc_seq, crosscore.get_in_seq());
- return crosscore.wait(cc_seq
+ conn, cc_seq, proto_crosscore.get_in_seq());
+ return proto_crosscore.wait(cc_seq
).then([this, cc_seq] {
return requeue_out_sent(cc_seq);
});
seq_num_t msg_seq)
{
assert(seastar::this_shard_id() == get_shard_id());
- if (!crosscore.proceed_or_wait(cc_seq)) {
+ if (!proto_crosscore.proceed_or_wait(cc_seq)) {
logger().debug("{} got {} requeue_out_sent_up_to(), wait at {}",
- conn, cc_seq, crosscore.get_in_seq());
- return crosscore.wait(cc_seq
+ conn, cc_seq, proto_crosscore.get_in_seq());
+ return proto_crosscore.wait(cc_seq
).then([this, cc_seq, msg_seq] {
return requeue_out_sent_up_to(cc_seq, msg_seq);
});
std::optional<bool> is_replace)
{
ceph_assert_always(seastar::this_shard_id() == get_shard_id());
- if (!crosscore.proceed_or_wait(cc_seq)) {
+ if (!proto_crosscore.proceed_or_wait(cc_seq)) {
logger().debug("{} got {} to_new_sid(), wait at {}",
- conn, cc_seq, crosscore.get_in_seq());
- return crosscore.wait(cc_seq
+ conn, cc_seq, proto_crosscore.get_in_seq());
+ return proto_crosscore.wait(cc_seq
).then([this, cc_seq, new_sid, is_replace,
conn_fref=std::move(conn_fref)]() mutable {
return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
shard_states = shard_states_t::create_from_previous(
*maybe_prv_shard_states, new_sid);
assert(new_sid == get_shard_id());
+ // broadcast shard change to all the io waiters, atomically.
+ io_crosscore.reset_wait();
return seastar::smp::submit_to(new_sid,
[this, next_cc_seq, is_dropped, prv_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable {
ceph_assert_always(seastar::this_shard_id() == get_shard_id());
ceph_assert_always(get_io_state() != io_state_t::open);
ceph_assert_always(!maybe_dropped_sid.has_value());
- ceph_assert_always(crosscore.proceed_or_wait(next_cc_seq));
+ ceph_assert_always(proto_crosscore.proceed_or_wait(next_cc_seq));
if (is_dropped) {
ceph_assert_always(get_io_state() == io_state_t::drop);
return seastar::smp::submit_to(sid,
[this, cc_seq, conn_fref=std::move(conn_fref)]() mutable {
// must be the first to proceed
- ceph_assert_always(crosscore.proceed_or_wait(cc_seq));
+ ceph_assert_always(proto_crosscore.proceed_or_wait(cc_seq));
logger().debug("{} set accepted sid", conn);
ceph_assert_always(seastar::this_shard_id() == get_shard_id());
}
if (io_state == io_state_t::open) {
- auto cc_seq = crosscore.prepare_submit();
+ auto cc_seq = proto_crosscore.prepare_submit();
logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, "
"send {} notify_out_fault()",
conn, io_state, io_stat_printer{*this}, e.what(), cc_seq);
ceph_assert_always(seastar::this_shard_id() == get_shard_id());
assert(is_out_queued());
if (need_notify_out) {
- auto cc_seq = crosscore.prepare_submit();
+ auto cc_seq = proto_crosscore.prepare_submit();
logger().debug("{} send {} notify_out()",
conn, cc_seq);
shard_states->dispatch_in_background(
auto io_state = ctx.get_io_state();
if (io_state == io_state_t::open) {
- auto cc_seq = crosscore.prepare_submit();
+ auto cc_seq = proto_crosscore.prepare_submit();
logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, "
"send {} notify_out_fault()",
conn, io_state, io_stat_printer{*this}, e_what, cc_seq);
bool is_replace)
{
ceph_assert_always(seastar::this_shard_id() == get_shard_id());
- if (!crosscore.proceed_or_wait(cc_seq)) {
+ if (!proto_crosscore.proceed_or_wait(cc_seq)) {
logger().debug("{} got {} close_io(), wait at {}",
- conn, cc_seq, crosscore.get_in_seq());
- return crosscore.wait(cc_seq
+ conn, cc_seq, proto_crosscore.get_in_seq());
+ return proto_crosscore.wait(cc_seq
).then([this, cc_seq, is_dispatch_reset, is_replace] {
return close_io(cc_seq, is_dispatch_reset, is_replace);
});