]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: allow io-handler to dispatch in/out independently based on ctx
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 2 Jun 2023 08:06:33 +0000 (16:06 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h

index 49b13fe4484e0308841323261e759a29f0dbda23..2b7dee0c2d19f424dd98a339ad587f05a4e9702a 100644 (file)
@@ -456,21 +456,22 @@ void IOHandler::ack_out_sent(seq_num_t seq)
   }
 }
 
-seastar::future<> IOHandler::do_out_dispatch()
+seastar::future<>
+IOHandler::do_out_dispatch(shard_states_t &ctx)
 {
-  return seastar::repeat([this] {
-    switch (get_io_state()) {
+  return seastar::repeat([this, &ctx] {
+    switch (ctx.get_io_state()) {
      case io_state_t::open: {
       if (unlikely(!is_out_queued())) {
         // try exit open dispatching
         return frame_assembler->flush<false>(
-        ).then([this] {
-          if (get_io_state() != io_state_t::open || is_out_queued()) {
+        ).then([this, &ctx] {
+          if (ctx.get_io_state() != io_state_t::open || is_out_queued()) {
             return seastar::make_ready_future<stop_t>(stop_t::no);
           }
           // still nothing pending to send after flush,
           // open dispatching can ONLY stop now
-          shard_states->exit_out_dispatching("exit-open", conn);
+          ctx.exit_out_dispatching("exit-open", conn);
           return seastar::make_ready_future<stop_t>(stop_t::yes);
         });
       }
@@ -480,8 +481,8 @@ seastar::future<> IOHandler::do_out_dispatch()
       return frame_assembler->write<false>(
         sweep_out_pending_msgs_to_sent(
           need_keepalive, next_keepalive_ack, to_ack > 0)
-      ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
-        if (get_io_state() != io_state_t::open) {
+      ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack, &ctx] {
+        if (ctx.get_io_state() != io_state_t::open) {
           return frame_assembler->flush<false>(
           ).then([] {
             return seastar::make_ready_future<stop_t>(stop_t::no);
@@ -502,17 +503,17 @@ seastar::future<> IOHandler::do_out_dispatch()
      }
      case io_state_t::delay:
       // delay out dispatching until open
-      shard_states->notify_out_dispatching_stopped("delay...", conn);
-      return shard_states->wait_state_change(
+      ctx.notify_out_dispatching_stopped("delay...", conn);
+      return ctx.wait_state_change(
       ).then([] { return stop_t::no; });
      case io_state_t::drop:
-      shard_states->exit_out_dispatching("dropped", conn);
+      ctx.exit_out_dispatching("dropped", conn);
       return seastar::make_ready_future<stop_t>(stop_t::yes);
      default:
       ceph_abort("impossible");
     }
-  }).handle_exception_type([this](const std::system_error& e) {
-    auto io_state = get_io_state();
+  }).handle_exception_type([this, &ctx](const std::system_error& e) {
+    auto io_state = ctx.get_io_state();
     if (e.code() != std::errc::broken_pipe &&
         e.code() != std::errc::connection_reset &&
         e.code() != error::negotiation_failure) {
@@ -539,7 +540,7 @@ seastar::future<> IOHandler::do_out_dispatch()
                     conn, io_state, e.what());
     }
 
-    return do_out_dispatch();
+    return do_out_dispatch(ctx);
   });
 }
 
@@ -559,21 +560,22 @@ void IOHandler::notify_out_dispatch()
   if (shard_states->try_enter_out_dispatching()) {
     shard_states->dispatch_in_background(
         "do_out_dispatch", conn, [this] {
-      return do_out_dispatch();
+      return do_out_dispatch(*shard_states);
     });
   }
 }
 
 seastar::future<>
 IOHandler::read_message(
+    shard_states_t &ctx,
     utime_t throttle_stamp,
     std::size_t msg_size)
 {
   return frame_assembler->read_frame_payload<false>(
-  ).then([this, throttle_stamp, msg_size](auto payload) {
-    if (unlikely(get_io_state() != io_state_t::open)) {
+  ).then([this, throttle_stamp, msg_size, &ctx](auto payload) {
+    if (unlikely(ctx.get_io_state() != io_state_t::open)) {
       logger().debug("{} triggered {} during read_message()",
-                     conn, get_io_state());
+                     conn, ctx.get_io_state());
       abort_protocol();
     }
 
@@ -675,6 +677,7 @@ IOHandler::read_message(
 
     // TODO: change MessageRef with seastar::shared_ptr
     auto msg_ref = MessageRef{message, false};
+    assert(ctx.get_io_state() == io_state_t::open);
     assert(get_io_state() == io_state_t::open);
     ceph_assert_always(conn_ref);
     // throttle the reading process by the returned future
@@ -686,10 +689,10 @@ void IOHandler::do_in_dispatch()
 {
   shard_states->enter_in_dispatching();
   shard_states->dispatch_in_background(
-      "do_in_dispatch", conn, [this] {
-    return seastar::keep_doing([this] {
+      "do_in_dispatch", conn, [this, &ctx=*shard_states] {
+    return seastar::keep_doing([this, &ctx] {
       return frame_assembler->read_main_preamble<false>(
-      ).then([this](auto ret) {
+      ).then([this, &ctx](auto ret) {
         switch (ret.tag) {
           case Tag::MESSAGE: {
             size_t msg_size = get_msg_size(*ret.rx_frame_asm);
@@ -714,10 +717,10 @@ void IOHandler::do_in_dispatch()
                              conn.policy.throttler_bytes->get_current(),
                              conn.policy.throttler_bytes->get_max());
               return conn.policy.throttler_bytes->get(msg_size);
-            }).then([this, msg_size] {
+            }).then([this, msg_size, &ctx] {
               // TODO: throttle_dispatch_queue() logic
               utime_t throttle_stamp{seastar::lowres_system_clock::now()};
-              return read_message(throttle_stamp, msg_size);
+              return read_message(ctx, throttle_stamp, msg_size);
             });
           }
           case Tag::ACK:
@@ -737,7 +740,9 @@ void IOHandler::do_in_dispatch()
                              conn, keepalive_frame.timestamp());
               // notify keepalive ack
               next_keepalive_ack = keepalive_frame.timestamp();
-              notify_out_dispatch();
+              if (seastar::this_shard_id() == get_shard_id()) {
+                notify_out_dispatch();
+              }
 
               last_keepalive = seastar::lowres_system_clock::now();
             });
@@ -759,7 +764,7 @@ void IOHandler::do_in_dispatch()
           }
         }
       });
-    }).handle_exception([this](std::exception_ptr eptr) {
+    }).handle_exception([this, &ctx](std::exception_ptr eptr) {
       const char *e_what;
       try {
         std::rethrow_exception(eptr);
@@ -767,7 +772,7 @@ void IOHandler::do_in_dispatch()
         e_what = e.what();
       }
 
-      auto io_state = get_io_state();
+      auto io_state = ctx.get_io_state();
       if (io_state == io_state_t::open) {
         logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
                       conn, io_state, e_what);
@@ -779,8 +784,8 @@ void IOHandler::do_in_dispatch()
         logger().info("{} do_in_dispatch(): fault at {} -- {}",
                       conn, io_state, e_what);
       }
-    }).finally([this] {
-      shard_states->exit_in_dispatching();
+    }).finally([&ctx] {
+      ctx.exit_in_dispatching();
     });
   });
 }
index 122e6281829f61647eab2afd836b51d6ad31e21c..cd85604dafdc1476afd6d04ac9262f6dfa2f460a 100644 (file)
@@ -360,7 +360,7 @@ public:
 
   void discard_out_sent();
 
-  seastar::future<> do_out_dispatch();
+  seastar::future<> do_out_dispatch(shard_states_t &ctx);
 
   ceph::bufferlist sweep_out_pending_msgs_to_sent(
       bool require_keepalive,
@@ -374,6 +374,7 @@ public:
   void ack_out_sent(seq_num_t seq);
 
   seastar::future<> read_message(
+      shard_states_t &ctx,
       utime_t throttle_stamp,
       std::size_t msg_size);