using namespace ceph::msgr::v2;
using crimson::common::local_conf;
-using io_state_t = crimson::net::IOHandler::io_state_t;
-using io_stat_printer = crimson::net::IOHandler::io_stat_printer;
namespace {
frame_assembler{FrameAssemblerV2::create(conn)},
auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()},
protocol_timer{conn}
-{}
+{
+ io_states = io_handler.get_states();
+}
ProtocolV2::~ProtocolV2() {}
ceph_assert_always(!exit_io.has_value());
exit_io = seastar::shared_promise<>();
}
+
+ bool need_notify_out;
+ if (new_state == state_t::STANDBY && !conn.policy.server) {
+ need_notify_out = true;
+ } else {
+ need_notify_out = false;
+ }
+
state = new_state;
if (new_state == state_t::READY) {
// I'm not responsible to shutdown the socket at READY
is_socket_valid = false;
- io_handler.set_io_state(new_io_state, std::move(frame_assembler));
+ io_handler.set_io_state(new_io_state, std::move(frame_assembler), need_notify_out);
} else {
- io_handler.set_io_state(new_io_state, nullptr);
+ io_handler.set_io_state(new_io_state, nullptr, need_notify_out);
}
/*
if (pre_state == state_t::READY) {
gate.dispatch_in_background("exit_io", conn, [this] {
return io_handler.wait_io_exit_dispatching(
- ).then([this](FrameAssemblerV2Ref fa) {
- frame_assembler = std::move(fa);
+ ).then([this](auto ret) {
+ frame_assembler = std::move(ret.frame_assembler);
ceph_assert_always(!frame_assembler->is_socket_valid());
+ io_states = ret.io_states;
exit_io->set_value();
exit_io = std::nullopt;
});
}
if (conn.policy.server ||
- (conn.policy.standby && !io_handler.is_out_queued_or_sent())) {
+ (conn.policy.standby && !io_states.is_out_queued_or_sent())) {
if (conn.policy.server) {
logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
conn,
get_state_name(state),
where,
- io_stat_printer{io_handler},
+ io_states,
e_what);
} else {
logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
conn,
get_state_name(state),
where,
- io_stat_printer{io_handler},
+ io_states,
e_what);
}
execute_standby();
conn,
get_state_name(state),
where,
- io_stat_printer{io_handler},
+ io_states,
e_what);
execute_wait(false);
} else {
conn,
get_state_name(state),
where,
- io_stat_printer{io_handler},
+ io_states,
e_what);
execute_connecting();
}
client_cookie = generate_client_cookie();
peer_global_seq = 0;
}
+ io_states.reset_session(full);
io_handler.reset_session(full);
}
return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_server_ident() logic
+ io_states.requeue_out_sent();
io_handler.requeue_out_sent();
auto server_ident = ServerIdentFrame::Decode(payload->back());
logger().debug("{} GOT ServerIdentFrame:"
server_cookie,
global_seq,
connect_seq,
- io_handler.get_in_seq());
+ io_states.in_seq);
logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
" server_cookie={}, gs={}, cs={}, in_seq={}",
conn, messenger.get_myaddrs(),
client_cookie, server_cookie,
- global_seq, connect_seq, io_handler.get_in_seq());
+ global_seq, connect_seq, io_states.in_seq);
return frame_assembler->write_flush_frame(reconnect).then([this] {
return frame_assembler->read_main_preamble();
}).then([this](auto ret) {
auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
conn, reconnect_ok.msg_seq());
+ io_states.requeue_out_sent_up_to();
io_handler.requeue_out_sent_up_to(reconnect_ok.msg_seq());
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
});
logger().info("{} connected: gs={}, pgs={}, cs={}, "
"client_cookie={}, server_cookie={}, {}",
conn, global_seq, peer_global_seq, connect_seq,
- client_cookie, server_cookie,
- io_stat_printer{io_handler});
+ client_cookie, server_cookie, io_states);
io_handler.dispatch_connect();
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} after ms_handle_connect(), abort",
logger().info("{} established: gs={}, pgs={}, cs={}, "
"client_cookie={}, server_cookie={}, {}",
conn, global_seq, peer_global_seq, connect_seq,
- client_cookie, server_cookie,
- io_stat_printer{io_handler});
+ client_cookie, server_cookie, io_states);
execute_ready();
}).handle_exception([this](std::exception_ptr eptr) {
fault(state_t::ESTABLISHING, "execute_establishing", eptr);
logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
// this is required for the case when this connection is being replaced
+ io_states.reset_peer_state();
io_handler.reset_peer_state();
if (!conn.policy.lossy) {
if (reconnect) {
connect_seq = new_connect_seq;
// send_reconnect_ok() logic
+ io_states.requeue_out_sent_up_to();
io_handler.requeue_out_sent_up_to(new_msg_seq);
- auto reconnect_ok = ReconnectOkFrame::Encode(io_handler.get_in_seq());
- logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_handler.get_in_seq());
+ auto reconnect_ok = ReconnectOkFrame::Encode(io_states.in_seq);
+ logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_states.in_seq);
return frame_assembler->write_flush_frame(reconnect_ok);
} else {
client_cookie = new_client_cookie;
"client_cookie={}, server_cookie={}, {}",
conn, reconnect ? "reconnected" : "connected",
global_seq, peer_global_seq, connect_seq,
- client_cookie, server_cookie,
- io_stat_printer{io_handler});
+ client_cookie, server_cookie, io_states);
execute_ready();
}).handle_exception([this](std::exception_ptr eptr) {
fault(state_t::REPLACING, "trigger_replacing", eptr);
// READY state
-void ProtocolV2::notify_out_fault(const char *where, std::exception_ptr eptr)
+void ProtocolV2::notify_out_fault(
+ const char *where,
+ std::exception_ptr eptr,
+ io_handler_state _io_states)
{
+ io_states = _io_states;
fault(state_t::READY, where, eptr);
}
void ProtocolV2::notify_out()
{
+ io_states.is_out_queued = true;
if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
logger().info("{} notify_out(): at {}, going to CONNECTING",
conn, get_state_name(state));
}
if (require_ack && num_msgs == 0u) {
- auto ack_frame = AckFrame::Encode(get_in_seq());
+ auto ack_frame = AckFrame::Encode(in_seq);
bl.append(frame_assembler->get_buffer(ack_frame));
}
header.type, header.priority,
header.version,
ceph_le32(0), header.data_off,
- ceph_le64(get_in_seq()),
+ ceph_le64(in_seq),
footer.flags, header.compat_version,
header.reserved};
}
void IOHandler::set_io_state(
- const IOHandler::io_state_t &new_state,
- FrameAssemblerV2Ref fa)
+ io_state_t new_state,
+ FrameAssemblerV2Ref fa,
+ bool set_notify_out)
{
ceph_assert_always(!(
(new_state == io_state_t::none && io_state != io_state_t::none) ||
assert(fa == nullptr);
}
+ if (new_state == io_state_t::delay) {
+ need_notify_out = set_notify_out;
+ if (need_notify_out) {
+ maybe_notify_out_dispatch();
+ }
+ } else {
+ assert(set_notify_out == false);
+ need_notify_out = false;
+ }
+
if (io_state != new_state) {
io_state = new_state;
io_state_changed.set_value();
}
}
-seastar::future<FrameAssemblerV2Ref> IOHandler::wait_io_exit_dispatching()
+seastar::future<IOHandler::exit_dispatching_ret>
+IOHandler::wait_io_exit_dispatching()
{
ceph_assert_always(io_state != io_state_t::open);
ceph_assert_always(frame_assembler != nullptr);
}
}()
).discard_result().then([this] {
- return std::move(frame_assembler);
+ return exit_dispatching_ret{
+ std::move(frame_assembler),
+ get_states()};
});
}
std::make_move_iterator(out_sent_msgs.begin()),
std::make_move_iterator(out_sent_msgs.end()));
out_sent_msgs.clear();
- notify_out_dispatch();
+ maybe_notify_out_dispatch();
}
void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
eptr = std::current_exception();
}
set_io_state(io_state_t::delay);
- handshake_listener->notify_out_fault("do_out_dispatch", eptr);
+ auto states = get_states();
+ handshake_listener->notify_out_fault(
+ "do_out_dispatch", eptr, states);
} else {
logger().info("{} do_out_dispatch(): fault at {} -- {}",
conn, io_state, e.what());
});
}
+void IOHandler::maybe_notify_out_dispatch()
+{
+ if (is_out_queued()) {
+ notify_out_dispatch();
+ }
+}
+
void IOHandler::notify_out_dispatch()
{
- handshake_listener->notify_out();
+ if (need_notify_out) {
+ handshake_listener->notify_out();
+ }
if (out_dispatching) {
// already dispatching
return;
// client side queueing because messages can't be renumbered, but the (kernel)
// client will occasionally pull a message out of the sent queue to send
// elsewhere. in that case it doesn't matter if we "got" it or not.
- uint64_t cur_seq = get_in_seq();
+ uint64_t cur_seq = in_seq;
if (message->get_seq() <= cur_seq) {
logger().error("{} got old message {} <= {} {}, discarding",
conn, message->get_seq(), cur_seq, *message);
logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
conn, io_state, e_what);
set_io_state(io_state_t::delay);
- handshake_listener->notify_out_fault("do_in_dispatch", eptr);
+ auto states = get_states();
+ handshake_listener->notify_out_fault(
+ "do_in_dispatch", eptr, states);
} else {
logger().info("{} do_in_dispatch(): fault at {} -- {}",
conn, io_state, e_what);
namespace crimson::net {
+/**
+ * io_handler_state
+ *
+ * It is required to populate the states from IOHandler to ProtocolV2
+ * asynchronously.
+ */
+struct io_handler_state {
+ seq_num_t in_seq;
+ bool is_out_queued;
+ bool has_out_sent;
+
+ bool is_out_queued_or_sent() const {
+ return is_out_queued || has_out_sent;
+ }
+
+ /*
+ * should be consistent with the accroding interfaces in IOHandler
+ */
+
+ void reset_session(bool full) {
+ in_seq = 0;
+ if (full) {
+ is_out_queued = false;
+ has_out_sent = false;
+ }
+ }
+
+ void reset_peer_state() {
+ in_seq = 0;
+ is_out_queued = is_out_queued_or_sent();
+ has_out_sent = false;
+ }
+
+ void requeue_out_sent_up_to() {
+ // noop since the information is insufficient
+ }
+
+ void requeue_out_sent() {
+ if (has_out_sent) {
+ has_out_sent = false;
+ is_out_queued = true;
+ }
+ }
+};
+
/**
* HandshakeListener
*
- * The interface class for IOHandler to notify the ProtocolV2 for handshake.
+ * The interface class for IOHandler to notify the ProtocolV2.
*
* The notifications may be cross-core and asynchronous.
*/
virtual void notify_out() = 0;
- virtual void notify_out_fault(const char *where, std::exception_ptr) = 0;
+ virtual void notify_out_fault(
+ const char *where,
+ std::exception_ptr,
+ io_handler_state) = 0;
virtual void notify_mark_down() = 0;
handshake_listener = &hl;
}
+ io_handler_state get_states() const {
+ return {in_seq, is_out_queued(), has_out_sent()};
+ }
+
struct io_stat_printer {
const IOHandler &io_handler;
};
};
friend class fmt::formatter<io_state_t>;
- void set_io_state(const io_state_t &new_state, FrameAssemblerV2Ref fa=nullptr);
+ void set_io_state(
+ io_state_t new_state,
+ FrameAssemblerV2Ref fa = nullptr,
+ bool set_notify_out = false);
- seastar::future<FrameAssemblerV2Ref> wait_io_exit_dispatching();
+ struct exit_dispatching_ret {
+ FrameAssemblerV2Ref frame_assembler;
+ io_handler_state io_states;
+ };
+ seastar::future<exit_dispatching_ret> wait_io_exit_dispatching();
void reset_session(bool full);
void requeue_out_sent();
- bool is_out_queued_or_sent() const {
- return is_out_queued() || !out_sent_msgs.empty();
- }
-
- seq_num_t get_in_seq() const {
- return in_seq;
- }
-
void dispatch_accept();
void dispatch_connect();
std::optional<utime_t> maybe_keepalive_ack,
bool require_ack);
+ void maybe_notify_out_dispatch();
+
void notify_out_dispatch();
void ack_out_sent(seq_num_t seq);
uint64_t ack_left = 0;
+ bool need_notify_out = false;
+
/*
* in states for reading
*/
} // namespace crimson::net
+template <>
+struct fmt::formatter<crimson::net::io_handler_state> {
+ constexpr auto parse(format_parse_context& ctx) {
+ return ctx.begin();
+ }
+
+ template <typename FormatContext>
+ auto format(crimson::net::io_handler_state state, FormatContext& ctx) {
+ return fmt::format_to(
+ ctx.out(),
+ "io(in_seq={}, is_out_queued={}, has_out_sent={})",
+ state.in_seq,
+ state.is_out_queued,
+ state.has_out_sent);
+ }
+};
+
template <>
struct fmt::formatter<crimson::net::IOHandler::io_state_t>
: fmt::formatter<std::string_view> {