From 28bbab69aabfc18a27d614f052339ebce8ce8134 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 2 Jun 2023 15:45:07 +0800 Subject: [PATCH] crimson/net: factor out IOHandler::shard_states_t which will be switchable atomically Signed-off-by: Yingxin Cheng (cherry picked from commit 37f1456027abf38516ddf11f9c3d8f210ca0d91f) --- src/crimson/net/io_handler.cc | 208 ++++++++++++++++------------------ src/crimson/net/io_handler.h | 157 +++++++++++++++++++++---- 2 files changed, 238 insertions(+), 127 deletions(-) diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index 22dfa538b19d9..b6c3cf694c3f9 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -47,7 +47,8 @@ namespace crimson::net { IOHandler::IOHandler(ChainedDispatchers &dispatchers, SocketConnection &conn) - : sid(seastar::this_shard_id()), + : shard_states(shard_states_t::create( + seastar::this_shard_id(), io_state_t::none)), dispatchers(dispatchers), conn(conn), conn_ref(conn.get_local_shared_foreign_from_this()) @@ -56,8 +57,7 @@ IOHandler::IOHandler(ChainedDispatchers &dispatchers, IOHandler::~IOHandler() { // close_io() must be finished - ceph_assert(gate.is_closed()); - assert(!out_exit_dispatching); + ceph_assert_always(shard_states->assert_closed_and_exit()); assert(!conn_ref); } @@ -126,11 +126,11 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent( seastar::future<> IOHandler::send(MessageFRef msg) { - if (seastar::this_shard_id() == sid) { + if (seastar::this_shard_id() == get_shard_id()) { return do_send(std::move(msg)); } else { return seastar::smp::submit_to( - sid, [this, msg=std::move(msg)]() mutable { + get_shard_id(), [this, msg=std::move(msg)]() mutable { return do_send(std::move(msg)); }); } @@ -138,8 +138,8 @@ seastar::future<> IOHandler::send(MessageFRef msg) seastar::future<> IOHandler::do_send(MessageFRef msg) { - assert(seastar::this_shard_id() == sid); - if (io_state != io_state_t::drop) { + assert(seastar::this_shard_id() == get_shard_id()); + if (get_io_state() != io_state_t::drop) { out_pending_msgs.push_back(std::move(msg)); notify_out_dispatch(); } @@ -148,11 +148,11 @@ seastar::future<> IOHandler::do_send(MessageFRef msg) seastar::future<> IOHandler::send_keepalive() { - if (seastar::this_shard_id() == sid) { + if (seastar::this_shard_id() == get_shard_id()) { return do_send_keepalive(); } else { return seastar::smp::submit_to( - sid, [this] { + get_shard_id(), [this] { return do_send_keepalive(); }); } @@ -160,7 +160,7 @@ seastar::future<> IOHandler::send_keepalive() seastar::future<> IOHandler::do_send_keepalive() { - assert(seastar::this_shard_id() == sid); + assert(seastar::this_shard_id() == get_shard_id()); if (!need_keepalive) { need_keepalive = true; notify_out_dispatch(); @@ -170,10 +170,10 @@ seastar::future<> IOHandler::do_send_keepalive() void IOHandler::mark_down() { - ceph_assert_always(seastar::this_shard_id() == sid); - ceph_assert_always(io_state != io_state_t::none); + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + ceph_assert_always(get_io_state() != io_state_t::none); need_dispatch_reset = false; - if (io_state == io_state_t::drop) { + if (get_io_state() == io_state_t::drop) { return; } @@ -185,8 +185,9 @@ void IOHandler::mark_down() void IOHandler::print_io_stat(std::ostream &out) const { + assert(seastar::this_shard_id() == get_shard_id()); out << "io_stat(" - << "io_state=" << fmt::format("{}", io_state) + << "io_state=" << fmt::format("{}", get_io_state()) << ", in_seq=" << in_seq << ", out_seq=" << out_seq << ", out_pending_msgs_size=" << out_pending_msgs.size() @@ -202,7 +203,8 @@ void IOHandler::set_io_state( FrameAssemblerV2Ref fa, bool set_notify_out) { - auto prv_state = io_state; + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + auto prv_state = get_io_state(); ceph_assert_always(!( (new_state == io_state_t::none && prv_state != io_state_t::none) || (new_state == io_state_t::open && prv_state == io_state_t::open) || @@ -232,10 +234,6 @@ void IOHandler::set_io_state( assert(fa == nullptr); ceph_assert_always(frame_assembler->is_socket_valid()); frame_assembler->shutdown_socket(nullptr); - if (out_dispatching) { - ceph_assert_always(!out_exit_dispatching.has_value()); - out_exit_dispatching = seastar::promise<>(); - } } else { assert(fa == nullptr); } @@ -252,9 +250,7 @@ void IOHandler::set_io_state( // FIXME: simplify and drop the prv_state == new_state case if (prv_state != new_state) { - io_state = new_state; - io_state_changed.set_value(); - io_state_changed = seastar::promise<>(); + shard_states->set_io_state(new_state); } /* @@ -269,25 +265,11 @@ void IOHandler::set_io_state( seastar::future IOHandler::wait_io_exit_dispatching() { - ceph_assert_always(io_state != io_state_t::open); + ceph_assert_always(get_io_state() != io_state_t::open); ceph_assert_always(frame_assembler != nullptr); ceph_assert_always(!frame_assembler->is_socket_valid()); - return seastar::when_all( - [this] { - if (out_exit_dispatching) { - return out_exit_dispatching->get_future(); - } else { - return seastar::now(); - } - }(), - [this] { - if (in_exit_dispatching) { - return in_exit_dispatching->get_future(); - } else { - return seastar::now(); - } - }() - ).discard_result().then([this] { + return shard_states->wait_io_exit_dispatching( + ).then([this] { ceph_assert_always(frame_assembler != nullptr); ceph_assert_always(!frame_assembler->is_socket_valid()); return exit_dispatching_ret{ @@ -298,7 +280,7 @@ IOHandler::wait_io_exit_dispatching() void IOHandler::reset_session(bool full) { - assert(io_state != io_state_t::open); + assert(get_io_state() != io_state_t::open); reset_in(); if (full) { reset_out(); @@ -308,7 +290,7 @@ void IOHandler::reset_session(bool full) void IOHandler::reset_peer_state() { - assert(io_state != io_state_t::open); + assert(get_io_state() != io_state_t::open); reset_in(); requeue_out_sent_up_to(0); discard_out_sent(); @@ -316,7 +298,7 @@ void IOHandler::reset_peer_state() void IOHandler::requeue_out_sent() { - assert(io_state != io_state_t::open); + assert(get_io_state() != io_state_t::open); if (out_sent_msgs.empty()) { return; } @@ -338,7 +320,7 @@ void IOHandler::requeue_out_sent() void IOHandler::requeue_out_sent_up_to(seq_num_t seq) { - assert(io_state != io_state_t::open); + assert(get_io_state() != io_state_t::open); if (out_sent_msgs.empty() && out_pending_msgs.empty()) { logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}", conn, out_seq, seq); @@ -360,13 +342,13 @@ void IOHandler::requeue_out_sent_up_to(seq_num_t seq) void IOHandler::reset_in() { - assert(io_state != io_state_t::open); + assert(get_io_state() != io_state_t::open); in_seq = 0; } void IOHandler::reset_out() { - assert(io_state != io_state_t::open); + assert(get_io_state() != io_state_t::open); discard_out_sent(); out_pending_msgs.clear(); need_keepalive = false; @@ -376,14 +358,14 @@ void IOHandler::reset_out() void IOHandler::discard_out_sent() { - assert(io_state != io_state_t::open); + assert(get_io_state() != io_state_t::open); out_seq = 0; out_sent_msgs.clear(); } void IOHandler::dispatch_accept() { - if (io_state == io_state_t::drop) { + if (get_io_state() == io_state_t::drop) { return; } // protocol_is_connected can be from true to true here if the replacing is @@ -395,7 +377,7 @@ void IOHandler::dispatch_accept() void IOHandler::dispatch_connect() { - if (io_state == io_state_t::drop) { + if (get_io_state() == io_state_t::drop) { return; } ceph_assert_always(protocol_is_connected == false); @@ -406,7 +388,7 @@ void IOHandler::dispatch_connect() void IOHandler::dispatch_reset(bool is_replace) { - ceph_assert_always(io_state == io_state_t::drop); + ceph_assert_always(get_io_state() == io_state_t::drop); if (!need_dispatch_reset) { return; } @@ -417,7 +399,7 @@ void IOHandler::dispatch_reset(bool is_replace) void IOHandler::dispatch_remote_reset() { - if (io_state == io_state_t::drop) { + if (get_io_state() == io_state_t::drop) { return; } ceph_assert_always(conn_ref); @@ -441,26 +423,18 @@ void IOHandler::ack_out_sent(seq_num_t seq) seastar::future<> IOHandler::do_out_dispatch() { return seastar::repeat([this] { - switch (io_state) { + switch (get_io_state()) { case io_state_t::open: { if (unlikely(!is_out_queued())) { // try exit open dispatching return frame_assembler->flush( ).then([this] { - if (io_state != io_state_t::open || is_out_queued()) { + if (get_io_state() != io_state_t::open || is_out_queued()) { return seastar::make_ready_future(stop_t::no); } // still nothing pending to send after flush, // open dispatching can ONLY stop now - ceph_assert(out_dispatching); - out_dispatching = false; - if (unlikely(out_exit_dispatching.has_value())) { - out_exit_dispatching->set_value(); - out_exit_dispatching = std::nullopt; - logger().info("{} do_out_dispatch: nothing queued at {}," - " set out_exit_dispatching", - conn, io_state); - } + shard_states->exit_out_dispatching("exit-open", conn); return seastar::make_ready_future(stop_t::yes); }); } @@ -471,7 +445,7 @@ seastar::future<> IOHandler::do_out_dispatch() sweep_out_pending_msgs_to_sent( need_keepalive, next_keepalive_ack, to_ack > 0) ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] { - if (io_state != io_state_t::open) { + if (get_io_state() != io_state_t::open) { return frame_assembler->flush( ).then([] { return seastar::make_ready_future(stop_t::no); @@ -492,30 +466,17 @@ seastar::future<> IOHandler::do_out_dispatch() } case io_state_t::delay: // delay out dispatching until open - if (out_exit_dispatching) { - out_exit_dispatching->set_value(); - out_exit_dispatching = std::nullopt; - logger().info("{} do_out_dispatch: delay and set out_exit_dispatching ...", conn); - } else { - logger().info("{} do_out_dispatch: delay ...", conn); - } - return io_state_changed.get_future( + shard_states->notify_out_dispatching_stopped("delay...", conn); + return shard_states->wait_state_change( ).then([] { return stop_t::no; }); case io_state_t::drop: - ceph_assert(out_dispatching); - out_dispatching = false; - if (out_exit_dispatching) { - out_exit_dispatching->set_value(); - out_exit_dispatching = std::nullopt; - logger().info("{} do_out_dispatch: dropped and set out_exit_dispatching", conn); - } else { - logger().info("{} do_out_dispatch: dropped", conn); - } + shard_states->exit_out_dispatching("dropped", conn); return seastar::make_ready_future(stop_t::yes); default: ceph_abort("impossible"); } }).handle_exception_type([this](const std::system_error& e) { + auto io_state = get_io_state(); if (e.code() != std::errc::broken_pipe && e.code() != std::errc::connection_reset && e.code() != error::negotiation_failure) { @@ -559,26 +520,11 @@ void IOHandler::notify_out_dispatch() if (need_notify_out) { handshake_listener->notify_out(); } - if (out_dispatching) { - // already dispatching - return; - } - - switch (io_state) { - case io_state_t::open: - [[fallthrough]]; - case io_state_t::delay: - out_dispatching = true; - assert(!gate.is_closed()); - gate.dispatch_in_background("do_out_dispatch", conn, [this] { + if (shard_states->try_enter_out_dispatching()) { + shard_states->dispatch_in_background( + "do_out_dispatch", conn, [this] { return do_out_dispatch(); }); - return; - case io_state_t::drop: - // do not dispatch out - return; - default: - ceph_abort("impossible"); } } @@ -589,9 +535,9 @@ IOHandler::read_message( { return frame_assembler->read_frame_payload( ).then([this, throttle_stamp, msg_size](auto payload) { - if (unlikely(io_state != io_state_t::open)) { + if (unlikely(get_io_state() != io_state_t::open)) { logger().debug("{} triggered {} during read_message()", - conn, io_state); + conn, get_io_state()); abort_protocol(); } @@ -693,7 +639,7 @@ IOHandler::read_message( // TODO: change MessageRef with seastar::shared_ptr auto msg_ref = MessageRef{message, false}; - assert(io_state == io_state_t::open); + assert(get_io_state() == io_state_t::open); ceph_assert_always(conn_ref); // throttle the reading process by the returned future return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); @@ -702,9 +648,8 @@ IOHandler::read_message( void IOHandler::do_in_dispatch() { - ceph_assert_always(!in_exit_dispatching.has_value()); - in_exit_dispatching = seastar::promise<>(); - gate.dispatch_in_background( + shard_states->enter_in_dispatching(); + shard_states->dispatch_in_background( "do_in_dispatch", conn, [this] { return seastar::keep_doing([this] { return frame_assembler->read_main_preamble( @@ -786,6 +731,7 @@ void IOHandler::do_in_dispatch() e_what = e.what(); } + auto io_state = get_io_state(); if (io_state == io_state_t::open) { logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}", conn, io_state, e_what); @@ -798,9 +744,7 @@ void IOHandler::do_in_dispatch() conn, io_state, e_what); } }).finally([this] { - ceph_assert_always(in_exit_dispatching.has_value()); - in_exit_dispatching->set_value(); - in_exit_dispatching = std::nullopt; + shard_states->exit_in_dispatching(); }); }); } @@ -808,7 +752,7 @@ void IOHandler::do_in_dispatch() seastar::future<> IOHandler::close_io(bool is_dispatch_reset, bool is_replace) { - ceph_assert_always(io_state == io_state_t::drop); + ceph_assert_always(get_io_state() == io_state_t::drop); if (is_dispatch_reset) { dispatch_reset(is_replace); @@ -817,8 +761,56 @@ IOHandler::close_io(bool is_dispatch_reset, bool is_replace) ceph_assert_always(conn_ref); conn_ref.reset(); + return shard_states->close( + ).then([this] { + assert(shard_states->assert_closed_and_exit()); + }); +} + +/* + * IOHandler::shard_states_t + */ + +void +IOHandler::shard_states_t::notify_out_dispatching_stopped( + const char *what, SocketConnection &conn) +{ + assert(seastar::this_shard_id() == sid); + if (unlikely(out_exit_dispatching.has_value())) { + out_exit_dispatching->set_value(); + out_exit_dispatching = std::nullopt; + logger().info("{} do_out_dispatch: stop({}) at {}, set out_exit_dispatching", + conn, what, io_state); + } else { + if (unlikely(io_state != io_state_t::open)) { + logger().info("{} do_out_dispatch: stop({}) at {}, no out_exit_dispatching", + conn, what, io_state); + } + } +} + +seastar::future<> +IOHandler::shard_states_t::wait_io_exit_dispatching() +{ + assert(seastar::this_shard_id() == sid); + assert(io_state != io_state_t::open); assert(!gate.is_closed()); - return gate.close(); + return seastar::when_all( + [this] { + if (out_exit_dispatching) { + return out_exit_dispatching->get_future(); + } else { + return seastar::now(); + } + }(), + [this] { + if (in_exit_dispatching) { + return in_exit_dispatching->get_future(); + } else { + return seastar::now(); + } + }() + ).discard_result(); } } // namespace crimson::net diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h index f3220d994ba9d..76e0cc010bc87 100644 --- a/src/crimson/net/io_handler.h +++ b/src/crimson/net/io_handler.h @@ -110,11 +110,11 @@ public: */ private: seastar::shard_id get_shard_id() const final { - return sid; + return shard_states->get_shard_id(); } bool is_connected() const final { - ceph_assert_always(seastar::this_shard_id() == sid); + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); return protocol_is_connected; } @@ -123,17 +123,17 @@ private: seastar::future<> send_keepalive() final; clock_t::time_point get_last_keepalive() const final { - ceph_assert_always(seastar::this_shard_id() == sid); + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); return last_keepalive; } clock_t::time_point get_last_keepalive_ack() const final { - ceph_assert_always(seastar::this_shard_id() == sid); + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); return last_keepalive_ack; } void set_last_keepalive_ack(clock_t::time_point when) final { - ceph_assert_always(seastar::this_shard_id() == sid); + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); last_keepalive_ack = when; } @@ -199,6 +199,138 @@ public: void dispatch_connect(); private: + class shard_states_t; + using shard_states_ref_t = std::unique_ptr; + + class shard_states_t { + public: + shard_states_t(seastar::shard_id _sid, io_state_t state) + : sid{_sid}, io_state{state} {} + + seastar::shard_id get_shard_id() const { + return sid; + } + + io_state_t get_io_state() const { + assert(seastar::this_shard_id() == sid); + return io_state; + } + + void set_io_state(io_state_t new_state) { + assert(seastar::this_shard_id() == sid); + assert(io_state != new_state); + pr_io_state_changed.set_value(); + pr_io_state_changed = seastar::promise<>(); + if (io_state == io_state_t::open) { + // from open + if (out_dispatching) { + ceph_assert_always(!out_exit_dispatching.has_value()); + out_exit_dispatching = seastar::promise<>(); + } + } + io_state = new_state; + } + + seastar::future<> wait_state_change() { + assert(seastar::this_shard_id() == sid); + return pr_io_state_changed.get_future(); + } + + template + void dispatch_in_background( + const char *what, SocketConnection &who, Func &&func) { + assert(seastar::this_shard_id() == sid); + ceph_assert_always(!gate.is_closed()); + gate.dispatch_in_background(what, who, std::move(func)); + } + + void enter_in_dispatching() { + assert(seastar::this_shard_id() == sid); + assert(io_state == io_state_t::open); + ceph_assert_always(!in_exit_dispatching.has_value()); + in_exit_dispatching = seastar::promise<>(); + } + + void exit_in_dispatching() { + assert(seastar::this_shard_id() == sid); + assert(io_state != io_state_t::open); + ceph_assert_always(in_exit_dispatching.has_value()); + in_exit_dispatching->set_value(); + in_exit_dispatching = std::nullopt; + } + + bool try_enter_out_dispatching() { + assert(seastar::this_shard_id() == sid); + if (out_dispatching) { + // already dispatching out + return false; + } + switch (io_state) { + case io_state_t::open: + [[fallthrough]]; + case io_state_t::delay: + out_dispatching = true; + return true; + case io_state_t::drop: + // do not dispatch out + return false; + default: + ceph_abort("impossible"); + } + } + + void notify_out_dispatching_stopped( + const char *what, SocketConnection &conn); + + void exit_out_dispatching( + const char *what, SocketConnection &conn) { + assert(seastar::this_shard_id() == sid); + ceph_assert_always(out_dispatching); + out_dispatching = false; + notify_out_dispatching_stopped(what, conn); + } + + seastar::future<> wait_io_exit_dispatching(); + + seastar::future<> close() { + assert(seastar::this_shard_id() == sid); + assert(!gate.is_closed()); + return gate.close(); + } + + bool assert_closed_and_exit() const { + assert(seastar::this_shard_id() == sid); + if (gate.is_closed()) { + ceph_assert_always(io_state == io_state_t::drop); + ceph_assert_always(!out_dispatching); + ceph_assert_always(!out_exit_dispatching); + ceph_assert_always(!in_exit_dispatching); + return true; + } else { + return false; + } + } + + static shard_states_ref_t create( + seastar::shard_id sid, io_state_t state) { + return std::make_unique(sid, state); + } + + private: + const seastar::shard_id sid; + io_state_t io_state; + + crimson::common::Gated gate; + seastar::promise<> pr_io_state_changed; + bool out_dispatching = false; + std::optional> out_exit_dispatching; + std::optional> in_exit_dispatching; + }; + + io_state_t get_io_state() const { + return shard_states->get_io_state(); + } + seastar::future<> do_send(MessageFRef msg); seastar::future<> do_send_keepalive(); @@ -244,7 +376,7 @@ public: void do_in_dispatch(); private: - seastar::shard_id sid; + shard_states_ref_t shard_states; ChainedDispatchers &dispatchers; @@ -255,27 +387,16 @@ private: HandshakeListener *handshake_listener = nullptr; - crimson::common::Gated gate; - FrameAssemblerV2Ref frame_assembler; bool protocol_is_connected = false; bool need_dispatch_reset = true; - io_state_t io_state = io_state_t::none; - - // wait until current io_state changed - seastar::promise<> io_state_changed; - /* * out states for writing */ - bool out_dispatching = false; - - std::optional> out_exit_dispatching; - /// the seq num of the last transmitted message seq_num_t out_seq = 0; @@ -297,8 +418,6 @@ private: * in states for reading */ - std::optional> in_exit_dispatching; - /// the seq num of the last received message seq_num_t in_seq = 0; -- 2.39.5