]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: misc cleanups to io handler implementations
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 1 Jun 2023 10:37:41 +0000 (18:37 +0800)
committerMatan Breizman <mbreizma@redhat.com>
Wed, 11 Oct 2023 11:38:32 +0000 (11:38 +0000)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
(cherry picked from commit 8d0f5943aaa91c220011c67fc43a8aa51c1df52b)

src/crimson/net/SocketConnection.cc
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h

index ad96c7b6688a5a97798804db3b30df7de6395a44..908977da36b519d4e2d23cea658628e863568da6 100644 (file)
@@ -70,21 +70,15 @@ bool SocketConnection::peer_wins() const
 
 seastar::future<> SocketConnection::send(MessageURef _msg)
 {
+  // may be invoked from any core
   MessageFRef msg = seastar::make_foreign(std::move(_msg));
-  return seastar::smp::submit_to(
-    io_handler->get_shard_id(),
-    [this, msg=std::move(msg)]() mutable {
-      return io_handler->send(std::move(msg));
-    });
+  return io_handler->send(std::move(msg));
 }
 
 seastar::future<> SocketConnection::send_keepalive()
 {
-  return seastar::smp::submit_to(
-    io_handler->get_shard_id(),
-    [this] {
-      return io_handler->send_keepalive();
-    });
+  // may be invoked from any core
+  return io_handler->send_keepalive();
 }
 
 SocketConnection::clock_t::time_point
index cbd16013e0708078f1d86fc9929995c3a11606f3..22dfa538b19d9a81004936a1e1104f129fb05f8b 100644 (file)
@@ -55,8 +55,10 @@ IOHandler::IOHandler(ChainedDispatchers &dispatchers,
 
 IOHandler::~IOHandler()
 {
+  // close_io() must be finished
   ceph_assert(gate.is_closed());
   assert(!out_exit_dispatching);
+  assert(!conn_ref);
 }
 
 ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
@@ -124,7 +126,19 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
 
 seastar::future<> IOHandler::send(MessageFRef msg)
 {
-  ceph_assert_always(seastar::this_shard_id() == sid);
+  if (seastar::this_shard_id() == sid) {
+    return do_send(std::move(msg));
+  } else {
+    return seastar::smp::submit_to(
+        sid, [this, msg=std::move(msg)]() mutable {
+      return do_send(std::move(msg));
+    });
+  }
+}
+
+seastar::future<> IOHandler::do_send(MessageFRef msg)
+{
+  assert(seastar::this_shard_id() == sid);
   if (io_state != io_state_t::drop) {
     out_pending_msgs.push_back(std::move(msg));
     notify_out_dispatch();
@@ -134,7 +148,19 @@ seastar::future<> IOHandler::send(MessageFRef msg)
 
 seastar::future<> IOHandler::send_keepalive()
 {
-  ceph_assert_always(seastar::this_shard_id() == sid);
+  if (seastar::this_shard_id() == sid) {
+    return do_send_keepalive();
+  } else {
+    return seastar::smp::submit_to(
+        sid, [this] {
+      return do_send_keepalive();
+    });
+  }
+}
+
+seastar::future<> IOHandler::do_send_keepalive()
+{
+  assert(seastar::this_shard_id() == sid);
   if (!need_keepalive) {
     need_keepalive = true;
     notify_out_dispatch();
@@ -176,10 +202,11 @@ void IOHandler::set_io_state(
     FrameAssemblerV2Ref fa,
     bool set_notify_out)
 {
+  auto prv_state = io_state;
   ceph_assert_always(!(
-    (new_state == io_state_t::none && io_state != io_state_t::none) ||
-    (new_state == io_state_t::open && io_state == io_state_t::open) ||
-    (new_state != io_state_t::drop && io_state == io_state_t::drop)
+    (new_state == io_state_t::none && prv_state != io_state_t::none) ||
+    (new_state == io_state_t::open && prv_state == io_state_t::open) ||
+    (new_state != io_state_t::drop && prv_state == io_state_t::drop)
   ));
 
   bool dispatch_in = false;
@@ -198,7 +225,7 @@ void IOHandler::set_io_state(
           conn.get_local_shared_foreign_from_this());
     }
 #endif
-  } else if (io_state == io_state_t::open) {
+  } else if (prv_state == io_state_t::open) {
     // from open
     ceph_assert_always(protocol_is_connected == true);
     protocol_is_connected = false;
@@ -223,7 +250,8 @@ void IOHandler::set_io_state(
     need_notify_out = false;
   }
 
-  if (io_state != new_state) {
+  // FIXME: simplify and drop the prv_state == new_state case
+  if (prv_state != new_state) {
     io_state = new_state;
     io_state_changed.set_value();
     io_state_changed = seastar::promise<>();
@@ -260,6 +288,8 @@ IOHandler::wait_io_exit_dispatching()
       }
     }()
   ).discard_result().then([this] {
+    ceph_assert_always(frame_assembler != nullptr);
+    ceph_assert_always(!frame_assembler->is_socket_valid());
     return exit_dispatching_ret{
       std::move(frame_assembler),
       get_states()};
@@ -359,6 +389,7 @@ void IOHandler::dispatch_accept()
   // protocol_is_connected can be from true to true here if the replacing is
   // happening to a connected connection.
   protocol_is_connected = true;
+  ceph_assert_always(conn_ref);
   dispatchers.ms_handle_accept(conn_ref);
 }
 
@@ -369,6 +400,7 @@ void IOHandler::dispatch_connect()
   }
   ceph_assert_always(protocol_is_connected == false);
   protocol_is_connected = true;
+  ceph_assert_always(conn_ref);
   dispatchers.ms_handle_connect(conn_ref);
 }
 
@@ -379,6 +411,7 @@ void IOHandler::dispatch_reset(bool is_replace)
     return;
   }
   need_dispatch_reset = false;
+  ceph_assert_always(conn_ref);
   dispatchers.ms_handle_reset(conn_ref, is_replace);
 }
 
@@ -387,6 +420,7 @@ void IOHandler::dispatch_remote_reset()
   if (io_state == io_state_t::drop) {
     return;
   }
+  ceph_assert_always(conn_ref);
   dispatchers.ms_handle_remote_reset(conn_ref);
 }
 
@@ -404,57 +438,56 @@ void IOHandler::ack_out_sent(seq_num_t seq)
   }
 }
 
-seastar::future<stop_t> IOHandler::try_exit_out_dispatch() {
-  assert(!is_out_queued());
-  return frame_assembler->flush<false>(
-  ).then([this] {
-    if (!is_out_queued()) {
-      // still nothing pending to send after flush,
-      // the dispatching can ONLY stop now
-      ceph_assert(out_dispatching);
-      out_dispatching = false;
-      if (unlikely(out_exit_dispatching.has_value())) {
-        out_exit_dispatching->set_value();
-        out_exit_dispatching = std::nullopt;
-        logger().info("{} do_out_dispatch: nothing queued at {},"
-                      " set out_exit_dispatching",
-                      conn, io_state);
-      }
-      return seastar::make_ready_future<stop_t>(stop_t::yes);
-    } else {
-      // something is pending to send during flushing
-      return seastar::make_ready_future<stop_t>(stop_t::no);
-    }
-  });
-}
-
 seastar::future<> IOHandler::do_out_dispatch()
 {
   return seastar::repeat([this] {
     switch (io_state) {
      case io_state_t::open: {
-      bool still_queued = is_out_queued();
-      if (unlikely(!still_queued)) {
-        return try_exit_out_dispatch();
+      if (unlikely(!is_out_queued())) {
+        // try exit open dispatching
+        return frame_assembler->flush<false>(
+        ).then([this] {
+          if (io_state != io_state_t::open || is_out_queued()) {
+            return seastar::make_ready_future<stop_t>(stop_t::no);
+          }
+          // still nothing pending to send after flush,
+          // open dispatching can ONLY stop now
+          ceph_assert(out_dispatching);
+          out_dispatching = false;
+          if (unlikely(out_exit_dispatching.has_value())) {
+            out_exit_dispatching->set_value();
+            out_exit_dispatching = std::nullopt;
+            logger().info("{} do_out_dispatch: nothing queued at {},"
+                          " set out_exit_dispatching",
+                          conn, io_state);
+          }
+          return seastar::make_ready_future<stop_t>(stop_t::yes);
+        });
       }
+
       auto to_ack = ack_left;
       assert(to_ack == 0 || in_seq > 0);
       return frame_assembler->write<false>(
         sweep_out_pending_msgs_to_sent(
           need_keepalive, next_keepalive_ack, to_ack > 0)
       ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
+        if (io_state != io_state_t::open) {
+          return frame_assembler->flush<false>(
+          ).then([] {
+            return seastar::make_ready_future<stop_t>(stop_t::no);
+          });
+        }
+
         need_keepalive = false;
         if (next_keepalive_ack == prv_keepalive_ack) {
           next_keepalive_ack = std::nullopt;
         }
         assert(ack_left >= to_ack);
         ack_left -= to_ack;
-        if (!is_out_queued()) {
-          return try_exit_out_dispatch();
-        } else {
-          // messages were enqueued during socket write
-          return seastar::make_ready_future<stop_t>(stop_t::no);
-        }
+
+        // FIXME: may leak a flush if state is changed after return and before
+        // the next repeat body.
+        return seastar::make_ready_future<stop_t>(stop_t::no);
       });
      }
      case io_state_t::delay:
@@ -480,9 +513,9 @@ seastar::future<> IOHandler::do_out_dispatch()
       }
       return seastar::make_ready_future<stop_t>(stop_t::yes);
      default:
-      ceph_assert(false);
+      ceph_abort("impossible");
     }
-  }).handle_exception_type([this] (const std::system_error& e) {
+  }).handle_exception_type([this](const std::system_error& e) {
     if (e.code() != std::errc::broken_pipe &&
         e.code() != std::errc::connection_reset &&
         e.code() != error::negotiation_failure) {
@@ -522,6 +555,7 @@ void IOHandler::maybe_notify_out_dispatch()
 
 void IOHandler::notify_out_dispatch()
 {
+  assert(is_out_queued());
   if (need_notify_out) {
     handshake_listener->notify_out();
   }
@@ -529,26 +563,29 @@ void IOHandler::notify_out_dispatch()
     // already dispatching
     return;
   }
-  out_dispatching = true;
+
   switch (io_state) {
-   case io_state_t::open:
-     [[fallthrough]];
-   case io_state_t::delay:
+  case io_state_t::open:
+    [[fallthrough]];
+  case io_state_t::delay:
+    out_dispatching = true;
     assert(!gate.is_closed());
     gate.dispatch_in_background("do_out_dispatch", conn, [this] {
       return do_out_dispatch();
     });
     return;
-   case io_state_t::drop:
-    out_dispatching = false;
+  case io_state_t::drop:
+    // do not dispatch out
     return;
-   default:
-    ceph_assert(false);
+  default:
+    ceph_abort("impossible");
   }
 }
 
 seastar::future<>
-IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size)
+IOHandler::read_message(
+    utime_t throttle_stamp,
+    std::size_t msg_size)
 {
   return frame_assembler->read_frame_payload<false>(
   ).then([this, throttle_stamp, msg_size](auto payload) {
@@ -657,6 +694,7 @@ IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size)
     // TODO: change MessageRef with seastar::shared_ptr
     auto msg_ref = MessageRef{message, false};
     assert(io_state == io_state_t::open);
+    ceph_assert_always(conn_ref);
     // throttle the reading process by the returned future
     return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
   });
@@ -666,7 +704,8 @@ void IOHandler::do_in_dispatch()
 {
   ceph_assert_always(!in_exit_dispatching.has_value());
   in_exit_dispatching = seastar::promise<>();
-  gate.dispatch_in_background("do_in_dispatch", conn, [this] {
+  gate.dispatch_in_background(
+      "do_in_dispatch", conn, [this] {
     return seastar::keep_doing([this] {
       return frame_assembler->read_main_preamble<false>(
       ).then([this](auto ret) {
@@ -679,7 +718,7 @@ void IOHandler::do_in_dispatch()
                 return seastar::now();
               }
               // TODO: message throttler
-              ceph_assert(false);
+              ceph_abort("TODO");
               return seastar::now();
             }).then([this, msg_size] {
               // throttle_bytes() logic
@@ -766,4 +805,20 @@ void IOHandler::do_in_dispatch()
   });
 }
 
+seastar::future<>
+IOHandler::close_io(bool is_dispatch_reset, bool is_replace)
+{
+  ceph_assert_always(io_state == io_state_t::drop);
+
+  if (is_dispatch_reset) {
+    dispatch_reset(is_replace);
+  }
+
+  ceph_assert_always(conn_ref);
+  conn_ref.reset();
+
+  assert(!gate.is_closed());
+  return gate.close();
+}
+
 } // namespace crimson::net
index db82de5160ec89ab60d28d1c6ab2eb2c30e9f9f5..f3220d994ba9de412581fb3abb1375f63034c4ff 100644 (file)
@@ -159,21 +159,7 @@ public:
   };
   void print_io_stat(std::ostream &out) const;
 
-  seastar::future<> close_io(
-      bool is_dispatch_reset,
-      bool is_replace) {
-    ceph_assert_always(io_state == io_state_t::drop);
-
-    if (is_dispatch_reset) {
-      dispatch_reset(is_replace);
-    }
-
-    ceph_assert_always(conn_ref);
-    conn_ref.reset();
-
-    assert(!gate.is_closed());
-    return gate.close();
-  }
+  seastar::future<> close_io(bool is_dispatch_reset, bool is_replace);
 
   /**
    * io_state_t
@@ -213,6 +199,10 @@ public:
   void dispatch_connect();
 
  private:
+  seastar::future<> do_send(MessageFRef msg);
+
+  seastar::future<> do_send_keepalive();
+
   void dispatch_reset(bool is_replace);
 
   void dispatch_remote_reset();
@@ -224,14 +214,16 @@ public:
             next_keepalive_ack.has_value());
   }
 
+  bool has_out_sent() const {
+    return !out_sent_msgs.empty();
+  }
+
   void reset_in();
 
   void reset_out();
 
   void discard_out_sent();
 
-  seastar::future<stop_t> try_exit_out_dispatch();
-
   seastar::future<> do_out_dispatch();
 
   ceph::bufferlist sweep_out_pending_msgs_to_sent(
@@ -245,7 +237,9 @@ public:
 
   void ack_out_sent(seq_num_t seq);
 
-  seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size);
+  seastar::future<> read_message(
+      utime_t throttle_stamp,
+      std::size_t msg_size);
 
   void do_in_dispatch();