]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: cleanup, rename out_state to io_state
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 7 Dec 2022 01:47:53 +0000 (09:47 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 8 Feb 2023 06:07:41 +0000 (14:07 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index 3bc9e9641aa4589f6ba1fc9ac541c8cea6440938..01d2245cd2ae895b7af4524996603704eedd9443 100644 (file)
@@ -122,7 +122,7 @@ ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent(
 
 seastar::future<> Protocol::send(MessageURef msg)
 {
-  if (out_state != out_state_t::drop) {
+  if (io_state != io_state_t::drop) {
     out_pending_msgs.push_back(std::move(msg));
     notify_out_dispatch();
   }
@@ -140,22 +140,22 @@ seastar::future<> Protocol::send_keepalive()
 
 void Protocol::mark_down()
 {
-  ceph_assert_always(out_state != out_state_t::none);
+  ceph_assert_always(io_state != io_state_t::none);
   need_dispatch_reset = false;
-  if (out_state == out_state_t::drop) {
+  if (io_state == io_state_t::drop) {
     return;
   }
 
   logger().info("{} mark_down() with {}",
                 conn, io_stat_printer{*this});
-  set_out_state(out_state_t::drop);
+  set_io_state(io_state_t::drop);
   notify_mark_down();
 }
 
 void Protocol::print_io_stat(std::ostream &out) const
 {
   out << "io_stat("
-      << "out_state=" << fmt::format("{}", out_state)
+      << "io_state=" << fmt::format("{}", io_state)
       << ", in_seq=" << in_seq
       << ", out_seq=" << out_seq
       << ", out_pending_msgs_size=" << out_pending_msgs.size()
@@ -166,18 +166,18 @@ void Protocol::print_io_stat(std::ostream &out) const
       << ")";
 }
 
-void Protocol::set_out_state(
-    const Protocol::out_state_t &new_state,
+void Protocol::set_io_state(
+    const Protocol::io_state_t &new_state,
     FrameAssemblerV2Ref fa)
 {
   ceph_assert_always(!(
-    (new_state == out_state_t::none && out_state != out_state_t::none) ||
-    (new_state == out_state_t::open && out_state == out_state_t::open) ||
-    (new_state != out_state_t::drop && out_state == out_state_t::drop)
+    (new_state == io_state_t::none && io_state != io_state_t::none) ||
+    (new_state == io_state_t::open && io_state == io_state_t::open) ||
+    (new_state != io_state_t::drop && io_state == io_state_t::drop)
   ));
 
   bool dispatch_in = false;
-  if (new_state == out_state_t::open) {
+  if (new_state == io_state_t::open) {
     // to open
     ceph_assert_always(protocol_is_connected == true);
     assert(fa != nullptr);
@@ -190,7 +190,7 @@ void Protocol::set_out_state(
       conn.interceptor->register_conn_ready(conn);
     }
 #endif
-  } else if (out_state == out_state_t::open) {
+  } else if (io_state == io_state_t::open) {
     // from open
     ceph_assert_always(protocol_is_connected == true);
     protocol_is_connected = false;
@@ -205,10 +205,10 @@ void Protocol::set_out_state(
     assert(fa == nullptr);
   }
 
-  if (out_state != new_state) {
-    out_state = new_state;
-    out_state_changed.set_value();
-    out_state_changed = seastar::promise<>();
+  if (io_state != new_state) {
+    io_state = new_state;
+    io_state_changed.set_value();
+    io_state_changed = seastar::promise<>();
   }
 
   /*
@@ -222,7 +222,7 @@ void Protocol::set_out_state(
 
 seastar::future<FrameAssemblerV2Ref> Protocol::wait_io_exit_dispatching()
 {
-  ceph_assert_always(out_state != out_state_t::open);
+  ceph_assert_always(io_state != io_state_t::open);
   ceph_assert_always(frame_assembler != nullptr);
   ceph_assert_always(!frame_assembler->is_socket_valid());
   return seastar::when_all(
@@ -247,7 +247,7 @@ seastar::future<FrameAssemblerV2Ref> Protocol::wait_io_exit_dispatching()
 
 void Protocol::requeue_out_sent()
 {
-  assert(out_state != out_state_t::open);
+  assert(io_state != io_state_t::open);
   if (out_sent_msgs.empty()) {
     return;
   }
@@ -269,7 +269,7 @@ void Protocol::requeue_out_sent()
 
 void Protocol::requeue_out_sent_up_to(seq_num_t seq)
 {
-  assert(out_state != out_state_t::open);
+  assert(io_state != io_state_t::open);
   if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
     logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
                    conn, out_seq, seq);
@@ -291,7 +291,7 @@ void Protocol::requeue_out_sent_up_to(seq_num_t seq)
 
 void Protocol::reset_out()
 {
-  assert(out_state != out_state_t::open);
+  assert(io_state != io_state_t::open);
   out_seq = 0;
   out_pending_msgs.clear();
   out_sent_msgs.clear();
@@ -302,7 +302,7 @@ void Protocol::reset_out()
 
 void Protocol::dispatch_accept()
 {
-  if (out_state == out_state_t::drop) {
+  if (io_state == io_state_t::drop) {
     return;
   }
   // protocol_is_connected can be from true to true here if the replacing is
@@ -314,7 +314,7 @@ void Protocol::dispatch_accept()
 
 void Protocol::dispatch_connect()
 {
-  if (out_state == out_state_t::drop) {
+  if (io_state == io_state_t::drop) {
     return;
   }
   ceph_assert_always(protocol_is_connected == false);
@@ -325,7 +325,7 @@ void Protocol::dispatch_connect()
 
 void Protocol::dispatch_reset(bool is_replace)
 {
-  ceph_assert_always(out_state == out_state_t::drop);
+  ceph_assert_always(io_state == io_state_t::drop);
   if (!need_dispatch_reset) {
     return;
   }
@@ -337,7 +337,7 @@ void Protocol::dispatch_reset(bool is_replace)
 
 void Protocol::dispatch_remote_reset()
 {
-  if (out_state == out_state_t::drop) {
+  if (io_state == io_state_t::drop) {
     return;
   }
   dispatchers.ms_handle_remote_reset(
@@ -372,7 +372,7 @@ seastar::future<stop_t> Protocol::try_exit_out_dispatch() {
         out_exit_dispatching = std::nullopt;
         logger().info("{} do_out_dispatch: nothing queued at {},"
                       " set out_exit_dispatching",
-                      conn, out_state);
+                      conn, io_state);
       }
       return seastar::make_ready_future<stop_t>(stop_t::yes);
     } else {
@@ -385,8 +385,8 @@ seastar::future<stop_t> Protocol::try_exit_out_dispatch() {
 seastar::future<> Protocol::do_out_dispatch()
 {
   return seastar::repeat([this] {
-    switch (out_state) {
-     case out_state_t::open: {
+    switch (io_state) {
+     case io_state_t::open: {
       bool still_queued = is_out_queued();
       if (unlikely(!still_queued)) {
         return try_exit_out_dispatch();
@@ -412,7 +412,7 @@ seastar::future<> Protocol::do_out_dispatch()
         }
       });
      }
-     case out_state_t::delay:
+     case io_state_t::delay:
       // delay out dispatching until open
       if (out_exit_dispatching) {
         out_exit_dispatching->set_value();
@@ -421,9 +421,9 @@ seastar::future<> Protocol::do_out_dispatch()
       } else {
         logger().info("{} do_out_dispatch: delay ...", conn);
       }
-      return out_state_changed.get_future(
+      return io_state_changed.get_future(
       ).then([] { return stop_t::no; });
-     case out_state_t::drop:
+     case io_state_t::drop:
       ceph_assert(out_dispatching);
       out_dispatching = false;
       if (out_exit_dispatching) {
@@ -442,24 +442,24 @@ seastar::future<> Protocol::do_out_dispatch()
         e.code() != std::errc::connection_reset &&
         e.code() != error::negotiation_failure) {
       logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
-                     conn, out_state, e);
+                     conn, io_state, e);
       ceph_abort();
     }
 
-    if (out_state == out_state_t::open) {
+    if (io_state == io_state_t::open) {
       logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
-                    conn, out_state, e);
+                    conn, io_state, e);
       std::exception_ptr eptr;
       try {
         throw e;
       } catch(...) {
         eptr = std::current_exception();
       }
-      set_out_state(out_state_t::delay);
+      set_io_state(io_state_t::delay);
       notify_out_fault("do_out_dispatch", eptr);
     } else {
       logger().info("{} do_out_dispatch(): fault at {} -- {}",
-                    conn, out_state, e);
+                    conn, io_state, e);
     }
 
     return do_out_dispatch();
@@ -474,16 +474,16 @@ void Protocol::notify_out_dispatch()
     return;
   }
   out_dispatching = true;
-  switch (out_state) {
-   case out_state_t::open:
+  switch (io_state) {
+   case io_state_t::open:
      [[fallthrough]];
-   case out_state_t::delay:
+   case io_state_t::delay:
     assert(!gate.is_closed());
     gate.dispatch_in_background("do_out_dispatch", conn, [this] {
       return do_out_dispatch();
     });
     return;
-   case out_state_t::drop:
+   case io_state_t::drop:
     out_dispatching = false;
     return;
    default:
@@ -496,9 +496,9 @@ Protocol::read_message(utime_t throttle_stamp, std::size_t msg_size)
 {
   return frame_assembler->read_frame_payload(
   ).then([this, throttle_stamp, msg_size](auto payload) {
-    if (unlikely(out_state != out_state_t::open)) {
+    if (unlikely(io_state != io_state_t::open)) {
       logger().debug("{} triggered {} during read_message()",
-                     conn, out_state);
+                     conn, io_state);
       abort_protocol();
     }
 
@@ -597,7 +597,7 @@ Protocol::read_message(utime_t throttle_stamp, std::size_t msg_size)
 
     // TODO: change MessageRef with seastar::shared_ptr
     auto msg_ref = MessageRef{message, false};
-    assert(out_state == out_state_t::open);
+    assert(io_state == io_state_t::open);
     // throttle the reading process by the returned future
     return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
   });
@@ -688,14 +688,14 @@ void Protocol::do_in_dispatch()
         e_what = e.what();
       }
 
-      if (out_state == out_state_t::open) {
+      if (io_state == io_state_t::open) {
         logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
-                      conn, out_state, e_what);
-        set_out_state(out_state_t::delay);
+                      conn, io_state, e_what);
+        set_io_state(io_state_t::delay);
         notify_out_fault("do_in_dispatch", eptr);
       } else {
         logger().info("{} do_in_dispatch(): fault at {} -- {}",
-                      conn, out_state, e_what);
+                      conn, io_state, e_what);
       }
     }).finally([this] {
       ceph_assert_always(in_exit_dispatching.has_value());
index c9bf000c0d710a6d97038dd898c83a2218c592f2..062ffe697e2af9329b453dc6ced218baf3136779 100644 (file)
@@ -78,27 +78,27 @@ class Protocol {
 
 // TODO: encapsulate a SessionedSender class
  protected:
-  seastar::future<> close_out() {
-    ceph_assert_always(out_state == out_state_t::drop);
+  seastar::future<> close_io() {
+    ceph_assert_always(io_state == io_state_t::drop);
     assert(!gate.is_closed());
     return gate.close();
   }
 
   /**
-   * out_state_t
+   * io_state_t
    *
-   * The out_state is changed with protocol state atomically, indicating the
-   * out behavior of the according protocol state.
+   * The io_state is changed with protocol state atomically, indicating the
+   * IOHandler behavior of the according protocol state.
    */
-  enum class out_state_t : uint8_t {
+  enum class io_state_t : uint8_t {
     none,
     delay,
     open,
     drop
   };
-  friend class fmt::formatter<out_state_t>;
+  friend class fmt::formatter<io_state_t>;
 
-  void set_out_state(const out_state_t &new_state, FrameAssemblerV2Ref fa=nullptr);
+  void set_io_state(const io_state_t &new_state, FrameAssemblerV2Ref fa=nullptr);
 
   seastar::future<FrameAssemblerV2Ref> wait_io_exit_dispatching();
 
@@ -165,15 +165,15 @@ class Protocol {
 
   bool need_dispatch_reset = true;
 
+  io_state_t io_state = io_state_t::none;
+
+  // wait until current io_state changed
+  seastar::promise<> io_state_changed;
+
   /*
    * out states for writing
    */
 
-  out_state_t out_state = out_state_t::none;
-
-  // wait until current out_state changed
-  seastar::promise<> out_state_changed;
-
   bool out_dispatching = false;
 
   std::optional<seastar::promise<>> out_exit_dispatching;
@@ -216,11 +216,11 @@ inline std::ostream& operator<<(
 } // namespace crimson::net
 
 template <>
-struct fmt::formatter<crimson::net::Protocol::out_state_t>
+struct fmt::formatter<crimson::net::Protocol::io_state_t>
   : fmt::formatter<std::string_view> {
   template <typename FormatContext>
-  auto format(crimson::net::Protocol::out_state_t state, FormatContext& ctx) {
-    using enum crimson::net::Protocol::out_state_t;
+  auto format(crimson::net::Protocol::io_state_t state, FormatContext& ctx) {
+    using enum crimson::net::Protocol::io_state_t;
     std::string_view name;
     switch (state) {
     case none:
index eb9c795f6590b0a962fc5ffb72984307e255152a..900d4a5ca05d1d1e6813b3e3e924d07f31f9db1f 100644 (file)
@@ -201,7 +201,7 @@ void ProtocolV2::start_accept(SocketRef&& new_socket,
   execute_accepting();
 }
 
-void ProtocolV2::trigger_state(state_t new_state, out_state_t _out_state, bool reentrant)
+void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool reentrant)
 {
   if (!reentrant && new_state == state) {
     logger().error("{} is not allowed to re-trigger state {}",
@@ -225,9 +225,9 @@ void ProtocolV2::trigger_state(state_t new_state, out_state_t _out_state, bool r
   if (new_state == state_t::READY) {
     // I'm not responsible to shutdown the socket at READY
     is_socket_valid = false;
-    set_out_state(_out_state, std::move(frame_assembler));
+    set_io_state(new_io_state, std::move(frame_assembler));
   } else {
-    set_out_state(_out_state, nullptr);
+    set_io_state(new_io_state, nullptr);
   }
 
   /*
@@ -780,7 +780,7 @@ ProtocolV2::client_reconnect()
 void ProtocolV2::execute_connecting()
 {
   ceph_assert_always(!is_socket_valid);
-  trigger_state(state_t::CONNECTING, out_state_t::delay, false);
+  trigger_state(state_t::CONNECTING, io_state_t::delay, false);
   gated_execute("execute_connecting", conn, [this] {
       global_seq = messenger.get_global_seq();
       assert(client_cookie != 0);
@@ -1468,7 +1468,7 @@ ProtocolV2::server_reconnect()
 void ProtocolV2::execute_accepting()
 {
   assert(is_socket_valid);
-  trigger_state(state_t::ACCEPTING, out_state_t::none, false);
+  trigger_state(state_t::ACCEPTING, io_state_t::none, false);
   gate.dispatch_in_background("execute_accepting", conn, [this] {
       return seastar::futurize_invoke([this] {
           INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
@@ -1581,7 +1581,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
   };
 
   ceph_assert_always(is_socket_valid);
-  trigger_state(state_t::ESTABLISHING, out_state_t::delay, false);
+  trigger_state(state_t::ESTABLISHING, io_state_t::delay, false);
   if (existing_conn) {
     static_cast<ProtocolV2*>(existing_conn->protocol.get())->do_close(
         true /* is_dispatch_reset */, std::move(accept_me));
@@ -1681,7 +1681,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                                    uint64_t new_connect_seq,
                                    uint64_t new_msg_seq)
 {
-  trigger_state(state_t::REPLACING, out_state_t::delay, false);
+  trigger_state(state_t::REPLACING, io_state_t::delay, false);
   ceph_assert_always(has_socket);
   ceph_assert_always(!mover.socket->is_shutdown());
   if (is_socket_valid) {
@@ -1791,7 +1791,7 @@ void ProtocolV2::execute_ready()
   assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
   protocol_timer.cancel();
   ceph_assert_always(is_socket_valid);
-  trigger_state(state_t::READY, out_state_t::open, false);
+  trigger_state(state_t::READY, io_state_t::open, false);
 }
 
 // STANDBY state
@@ -1799,7 +1799,7 @@ void ProtocolV2::execute_ready()
 void ProtocolV2::execute_standby()
 {
   ceph_assert_always(!is_socket_valid);
-  trigger_state(state_t::STANDBY, out_state_t::delay, false);
+  trigger_state(state_t::STANDBY, io_state_t::delay, false);
 }
 
 void ProtocolV2::notify_out()
@@ -1816,7 +1816,7 @@ void ProtocolV2::notify_out()
 void ProtocolV2::execute_wait(bool max_backoff)
 {
   ceph_assert_always(!is_socket_valid);
-  trigger_state(state_t::WAIT, out_state_t::delay, false);
+  trigger_state(state_t::WAIT, io_state_t::delay, false);
   gated_execute("execute_wait", conn, [this, max_backoff] {
     double backoff = protocol_timer.last_dur();
     if (max_backoff) {
@@ -1848,7 +1848,7 @@ void ProtocolV2::execute_wait(bool max_backoff)
 void ProtocolV2::execute_server_wait()
 {
   ceph_assert_always(is_socket_valid);
-  trigger_state(state_t::SERVER_WAIT, out_state_t::none, false);
+  trigger_state(state_t::SERVER_WAIT, io_state_t::none, false);
   gated_execute("execute_server_wait", conn, [this] {
     return frame_assembler->read_exactly(1
     ).then([this](auto bl) {
@@ -1924,7 +1924,7 @@ void ProtocolV2::do_close(
     ceph_assert(false);
   }
   protocol_timer.cancel();
-  trigger_state(state_t::CLOSING, out_state_t::drop, false);
+  trigger_state(state_t::CLOSING, io_state_t::drop, false);
 
   if (f_accept_new) {
     (*f_accept_new)();
@@ -1934,8 +1934,8 @@ void ProtocolV2::do_close(
     is_socket_valid = false;
   }
   assert(!gate.is_closed());
-  auto gate_closed = gate.close();
-  auto out_closed = close_out();
+  auto handshake_closed = gate.close();
+  auto io_closed = close_io();
 
   if (is_dispatch_reset) {
     dispatch_reset(is_replace);
@@ -1944,7 +1944,7 @@ void ProtocolV2::do_close(
   // asynchronous operations
   assert(!closed_clean_fut.valid());
   closed_clean_fut = seastar::when_all(
-      std::move(gate_closed), std::move(out_closed)
+      std::move(handshake_closed), std::move(io_closed)
   ).discard_result().then([this] {
     ceph_assert_always(!exit_io.has_value());
     if (has_socket) {
index e193e1c84c097167471dddb66c246c87d596d5bb..cbec7923b3222bf01b4e7dfa1ced07ec2fe11c2a 100644 (file)
@@ -107,7 +107,7 @@ class ProtocolV2 final : public Protocol {
     return statenames[static_cast<int>(state)];
   }
 
-  void trigger_state(state_t state, out_state_t out_state, bool reentrant);
+  void trigger_state(state_t state, io_state_t io_state, bool reentrant);
 
   uint64_t peer_supported_features = 0;