seastar::future<> Protocol::send(MessageURef msg)
{
- if (out_state != out_state_t::drop) {
+ if (io_state != io_state_t::drop) {
out_pending_msgs.push_back(std::move(msg));
notify_out_dispatch();
}
void Protocol::mark_down()
{
- ceph_assert_always(out_state != out_state_t::none);
+ ceph_assert_always(io_state != io_state_t::none);
need_dispatch_reset = false;
- if (out_state == out_state_t::drop) {
+ if (io_state == io_state_t::drop) {
return;
}
logger().info("{} mark_down() with {}",
conn, io_stat_printer{*this});
- set_out_state(out_state_t::drop);
+ set_io_state(io_state_t::drop);
notify_mark_down();
}
void Protocol::print_io_stat(std::ostream &out) const
{
out << "io_stat("
- << "out_state=" << fmt::format("{}", out_state)
+ << "io_state=" << fmt::format("{}", io_state)
<< ", in_seq=" << in_seq
<< ", out_seq=" << out_seq
<< ", out_pending_msgs_size=" << out_pending_msgs.size()
<< ")";
}
-void Protocol::set_out_state(
- const Protocol::out_state_t &new_state,
+void Protocol::set_io_state(
+ const Protocol::io_state_t &new_state,
FrameAssemblerV2Ref fa)
{
ceph_assert_always(!(
- (new_state == out_state_t::none && out_state != out_state_t::none) ||
- (new_state == out_state_t::open && out_state == out_state_t::open) ||
- (new_state != out_state_t::drop && out_state == out_state_t::drop)
+ (new_state == io_state_t::none && io_state != io_state_t::none) ||
+ (new_state == io_state_t::open && io_state == io_state_t::open) ||
+ (new_state != io_state_t::drop && io_state == io_state_t::drop)
));
bool dispatch_in = false;
- if (new_state == out_state_t::open) {
+ if (new_state == io_state_t::open) {
// to open
ceph_assert_always(protocol_is_connected == true);
assert(fa != nullptr);
conn.interceptor->register_conn_ready(conn);
}
#endif
- } else if (out_state == out_state_t::open) {
+ } else if (io_state == io_state_t::open) {
// from open
ceph_assert_always(protocol_is_connected == true);
protocol_is_connected = false;
assert(fa == nullptr);
}
- if (out_state != new_state) {
- out_state = new_state;
- out_state_changed.set_value();
- out_state_changed = seastar::promise<>();
+ if (io_state != new_state) {
+ io_state = new_state;
+ io_state_changed.set_value();
+ io_state_changed = seastar::promise<>();
}
/*
seastar::future<FrameAssemblerV2Ref> Protocol::wait_io_exit_dispatching()
{
- ceph_assert_always(out_state != out_state_t::open);
+ ceph_assert_always(io_state != io_state_t::open);
ceph_assert_always(frame_assembler != nullptr);
ceph_assert_always(!frame_assembler->is_socket_valid());
return seastar::when_all(
void Protocol::requeue_out_sent()
{
- assert(out_state != out_state_t::open);
+ assert(io_state != io_state_t::open);
if (out_sent_msgs.empty()) {
return;
}
void Protocol::requeue_out_sent_up_to(seq_num_t seq)
{
- assert(out_state != out_state_t::open);
+ assert(io_state != io_state_t::open);
if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
conn, out_seq, seq);
void Protocol::reset_out()
{
- assert(out_state != out_state_t::open);
+ assert(io_state != io_state_t::open);
out_seq = 0;
out_pending_msgs.clear();
out_sent_msgs.clear();
void Protocol::dispatch_accept()
{
- if (out_state == out_state_t::drop) {
+ if (io_state == io_state_t::drop) {
return;
}
// protocol_is_connected can be from true to true here if the replacing is
void Protocol::dispatch_connect()
{
- if (out_state == out_state_t::drop) {
+ if (io_state == io_state_t::drop) {
return;
}
ceph_assert_always(protocol_is_connected == false);
void Protocol::dispatch_reset(bool is_replace)
{
- ceph_assert_always(out_state == out_state_t::drop);
+ ceph_assert_always(io_state == io_state_t::drop);
if (!need_dispatch_reset) {
return;
}
void Protocol::dispatch_remote_reset()
{
- if (out_state == out_state_t::drop) {
+ if (io_state == io_state_t::drop) {
return;
}
dispatchers.ms_handle_remote_reset(
out_exit_dispatching = std::nullopt;
logger().info("{} do_out_dispatch: nothing queued at {},"
" set out_exit_dispatching",
- conn, out_state);
+ conn, io_state);
}
return seastar::make_ready_future<stop_t>(stop_t::yes);
} else {
seastar::future<> Protocol::do_out_dispatch()
{
return seastar::repeat([this] {
- switch (out_state) {
- case out_state_t::open: {
+ switch (io_state) {
+ case io_state_t::open: {
bool still_queued = is_out_queued();
if (unlikely(!still_queued)) {
return try_exit_out_dispatch();
}
});
}
- case out_state_t::delay:
+ case io_state_t::delay:
// delay out dispatching until open
if (out_exit_dispatching) {
out_exit_dispatching->set_value();
} else {
logger().info("{} do_out_dispatch: delay ...", conn);
}
- return out_state_changed.get_future(
+ return io_state_changed.get_future(
).then([] { return stop_t::no; });
- case out_state_t::drop:
+ case io_state_t::drop:
ceph_assert(out_dispatching);
out_dispatching = false;
if (out_exit_dispatching) {
e.code() != std::errc::connection_reset &&
e.code() != error::negotiation_failure) {
logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
- conn, out_state, e);
+ conn, io_state, e);
ceph_abort();
}
- if (out_state == out_state_t::open) {
+ if (io_state == io_state_t::open) {
logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
- conn, out_state, e);
+ conn, io_state, e);
std::exception_ptr eptr;
try {
throw e;
} catch(...) {
eptr = std::current_exception();
}
- set_out_state(out_state_t::delay);
+ set_io_state(io_state_t::delay);
notify_out_fault("do_out_dispatch", eptr);
} else {
logger().info("{} do_out_dispatch(): fault at {} -- {}",
- conn, out_state, e);
+ conn, io_state, e);
}
return do_out_dispatch();
return;
}
out_dispatching = true;
- switch (out_state) {
- case out_state_t::open:
+ switch (io_state) {
+ case io_state_t::open:
[[fallthrough]];
- case out_state_t::delay:
+ case io_state_t::delay:
assert(!gate.is_closed());
gate.dispatch_in_background("do_out_dispatch", conn, [this] {
return do_out_dispatch();
});
return;
- case out_state_t::drop:
+ case io_state_t::drop:
out_dispatching = false;
return;
default:
{
return frame_assembler->read_frame_payload(
).then([this, throttle_stamp, msg_size](auto payload) {
- if (unlikely(out_state != out_state_t::open)) {
+ if (unlikely(io_state != io_state_t::open)) {
logger().debug("{} triggered {} during read_message()",
- conn, out_state);
+ conn, io_state);
abort_protocol();
}
// TODO: change MessageRef with seastar::shared_ptr
auto msg_ref = MessageRef{message, false};
- assert(out_state == out_state_t::open);
+ assert(io_state == io_state_t::open);
// throttle the reading process by the returned future
return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
});
e_what = e.what();
}
- if (out_state == out_state_t::open) {
+ if (io_state == io_state_t::open) {
logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
- conn, out_state, e_what);
- set_out_state(out_state_t::delay);
+ conn, io_state, e_what);
+ set_io_state(io_state_t::delay);
notify_out_fault("do_in_dispatch", eptr);
} else {
logger().info("{} do_in_dispatch(): fault at {} -- {}",
- conn, out_state, e_what);
+ conn, io_state, e_what);
}
}).finally([this] {
ceph_assert_always(in_exit_dispatching.has_value());
// TODO: encapsulate a SessionedSender class
protected:
- seastar::future<> close_out() {
- ceph_assert_always(out_state == out_state_t::drop);
+ seastar::future<> close_io() {
+ ceph_assert_always(io_state == io_state_t::drop);
assert(!gate.is_closed());
return gate.close();
}
/**
- * out_state_t
+ * io_state_t
*
- * The out_state is changed with protocol state atomically, indicating the
- * out behavior of the according protocol state.
+ * The io_state is changed with protocol state atomically, indicating the
+ * IOHandler behavior of the according protocol state.
*/
- enum class out_state_t : uint8_t {
+ enum class io_state_t : uint8_t {
none,
delay,
open,
drop
};
- friend class fmt::formatter<out_state_t>;
+ friend class fmt::formatter<io_state_t>;
- void set_out_state(const out_state_t &new_state, FrameAssemblerV2Ref fa=nullptr);
+ void set_io_state(const io_state_t &new_state, FrameAssemblerV2Ref fa=nullptr);
seastar::future<FrameAssemblerV2Ref> wait_io_exit_dispatching();
bool need_dispatch_reset = true;
+ io_state_t io_state = io_state_t::none;
+
+ // wait until current io_state changed
+ seastar::promise<> io_state_changed;
+
/*
* out states for writing
*/
- out_state_t out_state = out_state_t::none;
-
- // wait until current out_state changed
- seastar::promise<> out_state_changed;
-
bool out_dispatching = false;
std::optional<seastar::promise<>> out_exit_dispatching;
} // namespace crimson::net
template <>
-struct fmt::formatter<crimson::net::Protocol::out_state_t>
+struct fmt::formatter<crimson::net::Protocol::io_state_t>
: fmt::formatter<std::string_view> {
template <typename FormatContext>
- auto format(crimson::net::Protocol::out_state_t state, FormatContext& ctx) {
- using enum crimson::net::Protocol::out_state_t;
+ auto format(crimson::net::Protocol::io_state_t state, FormatContext& ctx) {
+ using enum crimson::net::Protocol::io_state_t;
std::string_view name;
switch (state) {
case none:
execute_accepting();
}
-void ProtocolV2::trigger_state(state_t new_state, out_state_t _out_state, bool reentrant)
+void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool reentrant)
{
if (!reentrant && new_state == state) {
logger().error("{} is not allowed to re-trigger state {}",
if (new_state == state_t::READY) {
// I'm not responsible to shutdown the socket at READY
is_socket_valid = false;
- set_out_state(_out_state, std::move(frame_assembler));
+ set_io_state(new_io_state, std::move(frame_assembler));
} else {
- set_out_state(_out_state, nullptr);
+ set_io_state(new_io_state, nullptr);
}
/*
void ProtocolV2::execute_connecting()
{
ceph_assert_always(!is_socket_valid);
- trigger_state(state_t::CONNECTING, out_state_t::delay, false);
+ trigger_state(state_t::CONNECTING, io_state_t::delay, false);
gated_execute("execute_connecting", conn, [this] {
global_seq = messenger.get_global_seq();
assert(client_cookie != 0);
void ProtocolV2::execute_accepting()
{
assert(is_socket_valid);
- trigger_state(state_t::ACCEPTING, out_state_t::none, false);
+ trigger_state(state_t::ACCEPTING, io_state_t::none, false);
gate.dispatch_in_background("execute_accepting", conn, [this] {
return seastar::futurize_invoke([this] {
INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
};
ceph_assert_always(is_socket_valid);
- trigger_state(state_t::ESTABLISHING, out_state_t::delay, false);
+ trigger_state(state_t::ESTABLISHING, io_state_t::delay, false);
if (existing_conn) {
static_cast<ProtocolV2*>(existing_conn->protocol.get())->do_close(
true /* is_dispatch_reset */, std::move(accept_me));
uint64_t new_connect_seq,
uint64_t new_msg_seq)
{
- trigger_state(state_t::REPLACING, out_state_t::delay, false);
+ trigger_state(state_t::REPLACING, io_state_t::delay, false);
ceph_assert_always(has_socket);
ceph_assert_always(!mover.socket->is_shutdown());
if (is_socket_valid) {
assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
protocol_timer.cancel();
ceph_assert_always(is_socket_valid);
- trigger_state(state_t::READY, out_state_t::open, false);
+ trigger_state(state_t::READY, io_state_t::open, false);
}
// STANDBY state
void ProtocolV2::execute_standby()
{
ceph_assert_always(!is_socket_valid);
- trigger_state(state_t::STANDBY, out_state_t::delay, false);
+ trigger_state(state_t::STANDBY, io_state_t::delay, false);
}
void ProtocolV2::notify_out()
void ProtocolV2::execute_wait(bool max_backoff)
{
ceph_assert_always(!is_socket_valid);
- trigger_state(state_t::WAIT, out_state_t::delay, false);
+ trigger_state(state_t::WAIT, io_state_t::delay, false);
gated_execute("execute_wait", conn, [this, max_backoff] {
double backoff = protocol_timer.last_dur();
if (max_backoff) {
void ProtocolV2::execute_server_wait()
{
ceph_assert_always(is_socket_valid);
- trigger_state(state_t::SERVER_WAIT, out_state_t::none, false);
+ trigger_state(state_t::SERVER_WAIT, io_state_t::none, false);
gated_execute("execute_server_wait", conn, [this] {
return frame_assembler->read_exactly(1
).then([this](auto bl) {
ceph_assert(false);
}
protocol_timer.cancel();
- trigger_state(state_t::CLOSING, out_state_t::drop, false);
+ trigger_state(state_t::CLOSING, io_state_t::drop, false);
if (f_accept_new) {
(*f_accept_new)();
is_socket_valid = false;
}
assert(!gate.is_closed());
- auto gate_closed = gate.close();
- auto out_closed = close_out();
+ auto handshake_closed = gate.close();
+ auto io_closed = close_io();
if (is_dispatch_reset) {
dispatch_reset(is_replace);
// asynchronous operations
assert(!closed_clean_fut.valid());
closed_clean_fut = seastar::when_all(
- std::move(gate_closed), std::move(out_closed)
+ std::move(handshake_closed), std::move(io_closed)
).discard_result().then([this] {
ceph_assert_always(!exit_io.has_value());
if (has_socket) {