From ca30d02ccb40c70c8a8608af5ef5f339d343cb79 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 2 Jun 2023 16:06:33 +0800 Subject: [PATCH] crimson/net: allow io-handler to dispatch in/out independently based on ctx Signed-off-by: Yingxin Cheng --- src/crimson/net/io_handler.cc | 61 +++++++++++++++++++---------------- src/crimson/net/io_handler.h | 3 +- 2 files changed, 35 insertions(+), 29 deletions(-) diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index 49b13fe4484..2b7dee0c2d1 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -456,21 +456,22 @@ void IOHandler::ack_out_sent(seq_num_t seq) } } -seastar::future<> IOHandler::do_out_dispatch() +seastar::future<> +IOHandler::do_out_dispatch(shard_states_t &ctx) { - return seastar::repeat([this] { - switch (get_io_state()) { + return seastar::repeat([this, &ctx] { + switch (ctx.get_io_state()) { case io_state_t::open: { if (unlikely(!is_out_queued())) { // try exit open dispatching return frame_assembler->flush( - ).then([this] { - if (get_io_state() != io_state_t::open || is_out_queued()) { + ).then([this, &ctx] { + if (ctx.get_io_state() != io_state_t::open || is_out_queued()) { return seastar::make_ready_future(stop_t::no); } // still nothing pending to send after flush, // open dispatching can ONLY stop now - shard_states->exit_out_dispatching("exit-open", conn); + ctx.exit_out_dispatching("exit-open", conn); return seastar::make_ready_future(stop_t::yes); }); } @@ -480,8 +481,8 @@ seastar::future<> IOHandler::do_out_dispatch() return frame_assembler->write( sweep_out_pending_msgs_to_sent( need_keepalive, next_keepalive_ack, to_ack > 0) - ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] { - if (get_io_state() != io_state_t::open) { + ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack, &ctx] { + if (ctx.get_io_state() != io_state_t::open) { return frame_assembler->flush( ).then([] { return seastar::make_ready_future(stop_t::no); @@ -502,17 +503,17 @@ seastar::future<> IOHandler::do_out_dispatch() } case io_state_t::delay: // delay out dispatching until open - shard_states->notify_out_dispatching_stopped("delay...", conn); - return shard_states->wait_state_change( + ctx.notify_out_dispatching_stopped("delay...", conn); + return ctx.wait_state_change( ).then([] { return stop_t::no; }); case io_state_t::drop: - shard_states->exit_out_dispatching("dropped", conn); + ctx.exit_out_dispatching("dropped", conn); return seastar::make_ready_future(stop_t::yes); default: ceph_abort("impossible"); } - }).handle_exception_type([this](const std::system_error& e) { - auto io_state = get_io_state(); + }).handle_exception_type([this, &ctx](const std::system_error& e) { + auto io_state = ctx.get_io_state(); if (e.code() != std::errc::broken_pipe && e.code() != std::errc::connection_reset && e.code() != error::negotiation_failure) { @@ -539,7 +540,7 @@ seastar::future<> IOHandler::do_out_dispatch() conn, io_state, e.what()); } - return do_out_dispatch(); + return do_out_dispatch(ctx); }); } @@ -559,21 +560,22 @@ void IOHandler::notify_out_dispatch() if (shard_states->try_enter_out_dispatching()) { shard_states->dispatch_in_background( "do_out_dispatch", conn, [this] { - return do_out_dispatch(); + return do_out_dispatch(*shard_states); }); } } seastar::future<> IOHandler::read_message( + shard_states_t &ctx, utime_t throttle_stamp, std::size_t msg_size) { return frame_assembler->read_frame_payload( - ).then([this, throttle_stamp, msg_size](auto payload) { - if (unlikely(get_io_state() != io_state_t::open)) { + ).then([this, throttle_stamp, msg_size, &ctx](auto payload) { + if (unlikely(ctx.get_io_state() != io_state_t::open)) { logger().debug("{} triggered {} during read_message()", - conn, get_io_state()); + conn, ctx.get_io_state()); abort_protocol(); } @@ -675,6 +677,7 @@ IOHandler::read_message( // TODO: change MessageRef with seastar::shared_ptr auto msg_ref = MessageRef{message, false}; + assert(ctx.get_io_state() == io_state_t::open); assert(get_io_state() == io_state_t::open); ceph_assert_always(conn_ref); // throttle the reading process by the returned future @@ -686,10 +689,10 @@ void IOHandler::do_in_dispatch() { shard_states->enter_in_dispatching(); shard_states->dispatch_in_background( - "do_in_dispatch", conn, [this] { - return seastar::keep_doing([this] { + "do_in_dispatch", conn, [this, &ctx=*shard_states] { + return seastar::keep_doing([this, &ctx] { return frame_assembler->read_main_preamble( - ).then([this](auto ret) { + ).then([this, &ctx](auto ret) { switch (ret.tag) { case Tag::MESSAGE: { size_t msg_size = get_msg_size(*ret.rx_frame_asm); @@ -714,10 +717,10 @@ void IOHandler::do_in_dispatch() conn.policy.throttler_bytes->get_current(), conn.policy.throttler_bytes->get_max()); return conn.policy.throttler_bytes->get(msg_size); - }).then([this, msg_size] { + }).then([this, msg_size, &ctx] { // TODO: throttle_dispatch_queue() logic utime_t throttle_stamp{seastar::lowres_system_clock::now()}; - return read_message(throttle_stamp, msg_size); + return read_message(ctx, throttle_stamp, msg_size); }); } case Tag::ACK: @@ -737,7 +740,9 @@ void IOHandler::do_in_dispatch() conn, keepalive_frame.timestamp()); // notify keepalive ack next_keepalive_ack = keepalive_frame.timestamp(); - notify_out_dispatch(); + if (seastar::this_shard_id() == get_shard_id()) { + notify_out_dispatch(); + } last_keepalive = seastar::lowres_system_clock::now(); }); @@ -759,7 +764,7 @@ void IOHandler::do_in_dispatch() } } }); - }).handle_exception([this](std::exception_ptr eptr) { + }).handle_exception([this, &ctx](std::exception_ptr eptr) { const char *e_what; try { std::rethrow_exception(eptr); @@ -767,7 +772,7 @@ void IOHandler::do_in_dispatch() e_what = e.what(); } - auto io_state = get_io_state(); + auto io_state = ctx.get_io_state(); if (io_state == io_state_t::open) { logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}", conn, io_state, e_what); @@ -779,8 +784,8 @@ void IOHandler::do_in_dispatch() logger().info("{} do_in_dispatch(): fault at {} -- {}", conn, io_state, e_what); } - }).finally([this] { - shard_states->exit_in_dispatching(); + }).finally([&ctx] { + ctx.exit_in_dispatching(); }); }); } diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h index 122e6281829..cd85604dafd 100644 --- a/src/crimson/net/io_handler.h +++ b/src/crimson/net/io_handler.h @@ -360,7 +360,7 @@ public: void discard_out_sent(); - seastar::future<> do_out_dispatch(); + seastar::future<> do_out_dispatch(shard_states_t &ctx); ceph::bufferlist sweep_out_pending_msgs_to_sent( bool require_keepalive, @@ -374,6 +374,7 @@ public: void ack_out_sent(seq_num_t seq); seastar::future<> read_message( + shard_states_t &ctx, utime_t throttle_stamp, std::size_t msg_size); -- 2.39.5