From: Yingxin Cheng Date: Thu, 1 Jun 2023 10:37:41 +0000 (+0800) Subject: crimson/net: misc cleanups to io handler implementations X-Git-Tag: v18.2.1~166^2~15 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=edfb0bdc0120ef06b136a70c38d06062f84ed71c;p=ceph.git crimson/net: misc cleanups to io handler implementations Signed-off-by: Yingxin Cheng (cherry picked from commit 8d0f5943aaa91c220011c67fc43a8aa51c1df52b) --- diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index ad96c7b6688a5..908977da36b51 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -70,21 +70,15 @@ bool SocketConnection::peer_wins() const seastar::future<> SocketConnection::send(MessageURef _msg) { + // may be invoked from any core MessageFRef msg = seastar::make_foreign(std::move(_msg)); - return seastar::smp::submit_to( - io_handler->get_shard_id(), - [this, msg=std::move(msg)]() mutable { - return io_handler->send(std::move(msg)); - }); + return io_handler->send(std::move(msg)); } seastar::future<> SocketConnection::send_keepalive() { - return seastar::smp::submit_to( - io_handler->get_shard_id(), - [this] { - return io_handler->send_keepalive(); - }); + // may be invoked from any core + return io_handler->send_keepalive(); } SocketConnection::clock_t::time_point diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index cbd16013e0708..22dfa538b19d9 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -55,8 +55,10 @@ IOHandler::IOHandler(ChainedDispatchers &dispatchers, IOHandler::~IOHandler() { + // close_io() must be finished ceph_assert(gate.is_closed()); assert(!out_exit_dispatching); + assert(!conn_ref); } ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent( @@ -124,7 +126,19 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent( seastar::future<> IOHandler::send(MessageFRef msg) { - ceph_assert_always(seastar::this_shard_id() == sid); + if (seastar::this_shard_id() == sid) { + return do_send(std::move(msg)); + } else { + return seastar::smp::submit_to( + sid, [this, msg=std::move(msg)]() mutable { + return do_send(std::move(msg)); + }); + } +} + +seastar::future<> IOHandler::do_send(MessageFRef msg) +{ + assert(seastar::this_shard_id() == sid); if (io_state != io_state_t::drop) { out_pending_msgs.push_back(std::move(msg)); notify_out_dispatch(); @@ -134,7 +148,19 @@ seastar::future<> IOHandler::send(MessageFRef msg) seastar::future<> IOHandler::send_keepalive() { - ceph_assert_always(seastar::this_shard_id() == sid); + if (seastar::this_shard_id() == sid) { + return do_send_keepalive(); + } else { + return seastar::smp::submit_to( + sid, [this] { + return do_send_keepalive(); + }); + } +} + +seastar::future<> IOHandler::do_send_keepalive() +{ + assert(seastar::this_shard_id() == sid); if (!need_keepalive) { need_keepalive = true; notify_out_dispatch(); @@ -176,10 +202,11 @@ void IOHandler::set_io_state( FrameAssemblerV2Ref fa, bool set_notify_out) { + auto prv_state = io_state; ceph_assert_always(!( - (new_state == io_state_t::none && io_state != io_state_t::none) || - (new_state == io_state_t::open && io_state == io_state_t::open) || - (new_state != io_state_t::drop && io_state == io_state_t::drop) + (new_state == io_state_t::none && prv_state != io_state_t::none) || + (new_state == io_state_t::open && prv_state == io_state_t::open) || + (new_state != io_state_t::drop && prv_state == io_state_t::drop) )); bool dispatch_in = false; @@ -198,7 +225,7 @@ void IOHandler::set_io_state( conn.get_local_shared_foreign_from_this()); } #endif - } else if (io_state == io_state_t::open) { + } else if (prv_state == io_state_t::open) { // from open ceph_assert_always(protocol_is_connected == true); protocol_is_connected = false; @@ -223,7 +250,8 @@ void IOHandler::set_io_state( need_notify_out = false; } - if (io_state != new_state) { + // FIXME: simplify and drop the prv_state == new_state case + if (prv_state != new_state) { io_state = new_state; io_state_changed.set_value(); io_state_changed = seastar::promise<>(); @@ -260,6 +288,8 @@ IOHandler::wait_io_exit_dispatching() } }() ).discard_result().then([this] { + ceph_assert_always(frame_assembler != nullptr); + ceph_assert_always(!frame_assembler->is_socket_valid()); return exit_dispatching_ret{ std::move(frame_assembler), get_states()}; @@ -359,6 +389,7 @@ void IOHandler::dispatch_accept() // protocol_is_connected can be from true to true here if the replacing is // happening to a connected connection. protocol_is_connected = true; + ceph_assert_always(conn_ref); dispatchers.ms_handle_accept(conn_ref); } @@ -369,6 +400,7 @@ void IOHandler::dispatch_connect() } ceph_assert_always(protocol_is_connected == false); protocol_is_connected = true; + ceph_assert_always(conn_ref); dispatchers.ms_handle_connect(conn_ref); } @@ -379,6 +411,7 @@ void IOHandler::dispatch_reset(bool is_replace) return; } need_dispatch_reset = false; + ceph_assert_always(conn_ref); dispatchers.ms_handle_reset(conn_ref, is_replace); } @@ -387,6 +420,7 @@ void IOHandler::dispatch_remote_reset() if (io_state == io_state_t::drop) { return; } + ceph_assert_always(conn_ref); dispatchers.ms_handle_remote_reset(conn_ref); } @@ -404,57 +438,56 @@ void IOHandler::ack_out_sent(seq_num_t seq) } } -seastar::future IOHandler::try_exit_out_dispatch() { - assert(!is_out_queued()); - return frame_assembler->flush( - ).then([this] { - if (!is_out_queued()) { - // still nothing pending to send after flush, - // the dispatching can ONLY stop now - ceph_assert(out_dispatching); - out_dispatching = false; - if (unlikely(out_exit_dispatching.has_value())) { - out_exit_dispatching->set_value(); - out_exit_dispatching = std::nullopt; - logger().info("{} do_out_dispatch: nothing queued at {}," - " set out_exit_dispatching", - conn, io_state); - } - return seastar::make_ready_future(stop_t::yes); - } else { - // something is pending to send during flushing - return seastar::make_ready_future(stop_t::no); - } - }); -} - seastar::future<> IOHandler::do_out_dispatch() { return seastar::repeat([this] { switch (io_state) { case io_state_t::open: { - bool still_queued = is_out_queued(); - if (unlikely(!still_queued)) { - return try_exit_out_dispatch(); + if (unlikely(!is_out_queued())) { + // try exit open dispatching + return frame_assembler->flush( + ).then([this] { + if (io_state != io_state_t::open || is_out_queued()) { + return seastar::make_ready_future(stop_t::no); + } + // still nothing pending to send after flush, + // open dispatching can ONLY stop now + ceph_assert(out_dispatching); + out_dispatching = false; + if (unlikely(out_exit_dispatching.has_value())) { + out_exit_dispatching->set_value(); + out_exit_dispatching = std::nullopt; + logger().info("{} do_out_dispatch: nothing queued at {}," + " set out_exit_dispatching", + conn, io_state); + } + return seastar::make_ready_future(stop_t::yes); + }); } + auto to_ack = ack_left; assert(to_ack == 0 || in_seq > 0); return frame_assembler->write( sweep_out_pending_msgs_to_sent( need_keepalive, next_keepalive_ack, to_ack > 0) ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] { + if (io_state != io_state_t::open) { + return frame_assembler->flush( + ).then([] { + return seastar::make_ready_future(stop_t::no); + }); + } + need_keepalive = false; if (next_keepalive_ack == prv_keepalive_ack) { next_keepalive_ack = std::nullopt; } assert(ack_left >= to_ack); ack_left -= to_ack; - if (!is_out_queued()) { - return try_exit_out_dispatch(); - } else { - // messages were enqueued during socket write - return seastar::make_ready_future(stop_t::no); - } + + // FIXME: may leak a flush if state is changed after return and before + // the next repeat body. + return seastar::make_ready_future(stop_t::no); }); } case io_state_t::delay: @@ -480,9 +513,9 @@ seastar::future<> IOHandler::do_out_dispatch() } return seastar::make_ready_future(stop_t::yes); default: - ceph_assert(false); + ceph_abort("impossible"); } - }).handle_exception_type([this] (const std::system_error& e) { + }).handle_exception_type([this](const std::system_error& e) { if (e.code() != std::errc::broken_pipe && e.code() != std::errc::connection_reset && e.code() != error::negotiation_failure) { @@ -522,6 +555,7 @@ void IOHandler::maybe_notify_out_dispatch() void IOHandler::notify_out_dispatch() { + assert(is_out_queued()); if (need_notify_out) { handshake_listener->notify_out(); } @@ -529,26 +563,29 @@ void IOHandler::notify_out_dispatch() // already dispatching return; } - out_dispatching = true; + switch (io_state) { - case io_state_t::open: - [[fallthrough]]; - case io_state_t::delay: + case io_state_t::open: + [[fallthrough]]; + case io_state_t::delay: + out_dispatching = true; assert(!gate.is_closed()); gate.dispatch_in_background("do_out_dispatch", conn, [this] { return do_out_dispatch(); }); return; - case io_state_t::drop: - out_dispatching = false; + case io_state_t::drop: + // do not dispatch out return; - default: - ceph_assert(false); + default: + ceph_abort("impossible"); } } seastar::future<> -IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size) +IOHandler::read_message( + utime_t throttle_stamp, + std::size_t msg_size) { return frame_assembler->read_frame_payload( ).then([this, throttle_stamp, msg_size](auto payload) { @@ -657,6 +694,7 @@ IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size) // TODO: change MessageRef with seastar::shared_ptr auto msg_ref = MessageRef{message, false}; assert(io_state == io_state_t::open); + ceph_assert_always(conn_ref); // throttle the reading process by the returned future return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); }); @@ -666,7 +704,8 @@ void IOHandler::do_in_dispatch() { ceph_assert_always(!in_exit_dispatching.has_value()); in_exit_dispatching = seastar::promise<>(); - gate.dispatch_in_background("do_in_dispatch", conn, [this] { + gate.dispatch_in_background( + "do_in_dispatch", conn, [this] { return seastar::keep_doing([this] { return frame_assembler->read_main_preamble( ).then([this](auto ret) { @@ -679,7 +718,7 @@ void IOHandler::do_in_dispatch() return seastar::now(); } // TODO: message throttler - ceph_assert(false); + ceph_abort("TODO"); return seastar::now(); }).then([this, msg_size] { // throttle_bytes() logic @@ -766,4 +805,20 @@ void IOHandler::do_in_dispatch() }); } +seastar::future<> +IOHandler::close_io(bool is_dispatch_reset, bool is_replace) +{ + ceph_assert_always(io_state == io_state_t::drop); + + if (is_dispatch_reset) { + dispatch_reset(is_replace); + } + + ceph_assert_always(conn_ref); + conn_ref.reset(); + + assert(!gate.is_closed()); + return gate.close(); +} + } // namespace crimson::net diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h index db82de5160ec8..f3220d994ba9d 100644 --- a/src/crimson/net/io_handler.h +++ b/src/crimson/net/io_handler.h @@ -159,21 +159,7 @@ public: }; void print_io_stat(std::ostream &out) const; - seastar::future<> close_io( - bool is_dispatch_reset, - bool is_replace) { - ceph_assert_always(io_state == io_state_t::drop); - - if (is_dispatch_reset) { - dispatch_reset(is_replace); - } - - ceph_assert_always(conn_ref); - conn_ref.reset(); - - assert(!gate.is_closed()); - return gate.close(); - } + seastar::future<> close_io(bool is_dispatch_reset, bool is_replace); /** * io_state_t @@ -213,6 +199,10 @@ public: void dispatch_connect(); private: + seastar::future<> do_send(MessageFRef msg); + + seastar::future<> do_send_keepalive(); + void dispatch_reset(bool is_replace); void dispatch_remote_reset(); @@ -224,14 +214,16 @@ public: next_keepalive_ack.has_value()); } + bool has_out_sent() const { + return !out_sent_msgs.empty(); + } + void reset_in(); void reset_out(); void discard_out_sent(); - seastar::future try_exit_out_dispatch(); - seastar::future<> do_out_dispatch(); ceph::bufferlist sweep_out_pending_msgs_to_sent( @@ -245,7 +237,9 @@ public: void ack_out_sent(seq_num_t seq); - seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size); + seastar::future<> read_message( + utime_t throttle_stamp, + std::size_t msg_size); void do_in_dispatch();