]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: rename in/out related members and methods
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 3 Nov 2022 02:48:56 +0000 (10:48 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 8 Feb 2023 06:07:41 +0000 (14:07 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/mon/MonClient.cc
src/crimson/net/Connection.h
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/test/crimson/test_messenger.cc

index 760a97579291d7b0d9ae804298b9b1b8726df698..7be09915a94618fe3b3cc02daa38ebdec9edd480 100644 (file)
@@ -465,7 +465,7 @@ void Client::tick()
   gate.dispatch_in_background(__func__, *this, [this] {
     if (active_con) {
       return seastar::when_all_succeed(wait_for_send_log(),
-                                       active_con->get_conn()->keepalive(),
+                                       active_con->get_conn()->send_keepalive(),
                                        active_con->renew_tickets(),
                                        active_con->renew_rotating_keyring()).discard_result();
     } else {
index 7cb78438dc3f875cbc606586e188655e56882fce..4c90f6e6852c2f0deca5f17595a59bf00da44b91 100644 (file)
@@ -76,14 +76,14 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
   virtual seastar::future<> send(MessageURef msg) = 0;
 
   /**
-   * keepalive
+   * send_keepalive
    *
    * Send a keepalive message over a connection that has completed its
    * handshake.
    *
    * May be invoked from any core.
    */
-  virtual seastar::future<> keepalive() = 0;
+  virtual seastar::future<> send_keepalive() = 0;
 
   virtual clock_t::time_point get_last_keepalive() const = 0;
 
index 1adfe895455ffe5357b3391c3362ee019ec5dab3..27fd98f54aeb17ba2ccb943194463d1eac89e327 100644 (file)
@@ -28,7 +28,7 @@ Protocol::Protocol(ChainedDispatchers& dispatchers,
 Protocol::~Protocol()
 {
   ceph_assert(gate.is_closed());
-  assert(!exit_open);
+  assert(!out_exit_dispatching);
 }
 
 void Protocol::close(bool dispatch_reset,
@@ -53,7 +53,7 @@ void Protocol::close(bool dispatch_reset,
   if (conn.socket) {
     conn.socket->shutdown();
   }
-  set_write_state(write_state_t::drop);
+  set_out_state(out_state_t::drop);
   assert(!gate.is_closed());
   auto gate_closed = gate.close();
 
@@ -86,139 +86,143 @@ void Protocol::close(bool dispatch_reset,
   });
 }
 
-ceph::bufferlist Protocol::sweep_messages_and_move_to_sent(
+ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent(
       size_t num_msgs,
       bool require_keepalive,
-      std::optional<utime_t> keepalive_ack,
+      std::optional<utime_t> maybe_keepalive_ack,
       bool require_ack)
 {
-  ceph::bufferlist bl = do_sweep_messages(out_q,
+  ceph::bufferlist bl = do_sweep_messages(out_pending_msgs,
                                           num_msgs,
                                           require_keepalive,
-                                          keepalive_ack,
+                                          maybe_keepalive_ack,
                                           require_ack);
   if (!conn.policy.lossy) {
-    sent.insert(sent.end(),
-                std::make_move_iterator(out_q.begin()),
-                std::make_move_iterator(out_q.end()));
+    out_sent_msgs.insert(
+        out_sent_msgs.end(),
+        std::make_move_iterator(out_pending_msgs.begin()),
+        std::make_move_iterator(out_pending_msgs.end()));
   }
-  out_q.clear();
+  out_pending_msgs.clear();
   return bl;
 }
 
 seastar::future<> Protocol::send(MessageURef msg)
 {
-  if (write_state != write_state_t::drop) {
-    out_q.push_back(std::move(msg));
-    write_event();
+  if (out_state != out_state_t::drop) {
+    out_pending_msgs.push_back(std::move(msg));
+    notify_out_dispatch();
   }
   return seastar::now();
 }
 
-seastar::future<> Protocol::keepalive()
+seastar::future<> Protocol::send_keepalive()
 {
   if (!need_keepalive) {
     need_keepalive = true;
-    write_event();
+    notify_out_dispatch();
   }
   return seastar::now();
 }
 
-void Protocol::notify_keepalive_ack(utime_t _keepalive_ack)
+void Protocol::notify_keepalive_ack(utime_t keepalive_ack)
 {
-  logger().trace("{} got keepalive ack {}", conn, _keepalive_ack);
-  keepalive_ack = _keepalive_ack;
-  write_event();
+  logger().trace("{} got keepalive ack {}", conn, keepalive_ack);
+  next_keepalive_ack = keepalive_ack;
+  notify_out_dispatch();
 }
 
 void Protocol::notify_ack()
 {
   if (!conn.policy.lossy) {
     ++ack_left;
-    write_event();
+    notify_out_dispatch();
   }
 }
 
-void Protocol::requeue_sent()
+void Protocol::requeue_out_sent()
 {
-  assert(write_state != write_state_t::open);
-  if (sent.empty()) {
+  assert(out_state != out_state_t::open);
+  if (out_sent_msgs.empty()) {
     return;
   }
 
-  out_seq -= sent.size();
+  out_seq -= out_sent_msgs.size();
   logger().debug("{} requeue {} items, revert out_seq to {}",
-                 conn, sent.size(), out_seq);
-  for (MessageURef& msg : sent) {
+                 conn, out_sent_msgs.size(), out_seq);
+  for (MessageURef& msg : out_sent_msgs) {
     msg->clear_payload();
     msg->set_seq(0);
   }
-  out_q.insert(out_q.begin(),
-               std::make_move_iterator(sent.begin()),
-               std::make_move_iterator(sent.end()));
-  sent.clear();
-  write_event();
+  out_pending_msgs.insert(
+      out_pending_msgs.begin(),
+      std::make_move_iterator(out_sent_msgs.begin()),
+      std::make_move_iterator(out_sent_msgs.end()));
+  out_sent_msgs.clear();
+  notify_out_dispatch();
 }
 
-void Protocol::requeue_up_to(seq_num_t seq)
+void Protocol::requeue_out_sent_up_to(seq_num_t seq)
 {
-  assert(write_state != write_state_t::open);
-  if (sent.empty() && out_q.empty()) {
+  assert(out_state != out_state_t::open);
+  if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
     logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
                    conn, out_seq, seq);
     out_seq = seq;
     return;
   }
-  logger().debug("{} discarding sent items by seq {} (sent_len={}, out_seq={})",
-                 conn, seq, sent.size(), out_seq);
-  while (!sent.empty()) {
-    auto cur_seq = sent.front()->get_seq();
+  logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})",
+                 conn, seq, out_sent_msgs.size(), out_seq);
+  while (!out_sent_msgs.empty()) {
+    auto cur_seq = out_sent_msgs.front()->get_seq();
     if (cur_seq == 0 || cur_seq > seq) {
       break;
     } else {
-      sent.pop_front();
+      out_sent_msgs.pop_front();
     }
   }
-  requeue_sent();
+  requeue_out_sent();
 }
 
-void Protocol::reset_write()
+void Protocol::reset_out()
 {
-  assert(write_state != write_state_t::open);
+  assert(out_state != out_state_t::open);
   out_seq = 0;
-  out_q.clear();
-  sent.clear();
+  out_pending_msgs.clear();
+  out_sent_msgs.clear();
   need_keepalive = false;
-  keepalive_ack = std::nullopt;
+  next_keepalive_ack = std::nullopt;
   ack_left = 0;
 }
 
-void Protocol::ack_writes(seq_num_t seq)
+void Protocol::ack_out_sent(seq_num_t seq)
 {
   if (conn.policy.lossy) {  // lossy connections don't keep sent messages
     return;
   }
-  while (!sent.empty() && sent.front()->get_seq() <= seq) {
+  while (!out_sent_msgs.empty() &&
+         out_sent_msgs.front()->get_seq() <= seq) {
     logger().trace("{} got ack seq {} >= {}, pop {}",
-                   conn, seq, sent.front()->get_seq(), *sent.front());
-    sent.pop_front();
+                   conn, seq, out_sent_msgs.front()->get_seq(),
+                   *out_sent_msgs.front());
+    out_sent_msgs.pop_front();
   }
 }
 
-seastar::future<stop_t> Protocol::try_exit_sweep() {
-  assert(!is_queued());
+seastar::future<stop_t> Protocol::try_exit_out_dispatch() {
+  assert(!is_out_queued());
   return conn.socket->flush().then([this] {
-    if (!is_queued()) {
+    if (!is_out_queued()) {
       // still nothing pending to send after flush,
       // the dispatching can ONLY stop now
-      ceph_assert(write_dispatching);
-      write_dispatching = false;
-      if (unlikely(exit_open.has_value())) {
-        exit_open->set_value();
-        exit_open = std::nullopt;
-        logger().info("{} write_event: nothing queued at {},"
-                      " set exit_open",
-                      conn, write_state);
+      ceph_assert(out_dispatching);
+      out_dispatching = false;
+      if (unlikely(out_exit_dispatching.has_value())) {
+        out_exit_dispatching->set_value();
+        out_exit_dispatching = std::nullopt;
+        logger().info("{} do_out_dispatch: nothing queued at {},"
+                      " set out_exit_dispatching",
+                      conn, out_state);
       }
       return seastar::make_ready_future<stop_t>(stop_t::yes);
     } else {
@@ -228,56 +232,57 @@ seastar::future<stop_t> Protocol::try_exit_sweep() {
   });
 }
 
-seastar::future<> Protocol::do_write_dispatch_sweep()
+seastar::future<> Protocol::do_out_dispatch()
 {
   return seastar::repeat([this] {
-    switch (write_state) {
-     case write_state_t::open: {
-      size_t num_msgs = out_q.size();
-      bool still_queued = is_queued();
+    switch (out_state) {
+     case out_state_t::open: {
+      size_t num_msgs = out_pending_msgs.size();
+      bool still_queued = is_out_queued();
       if (unlikely(!still_queued)) {
-        return try_exit_sweep();
+        return try_exit_out_dispatch();
       }
-      auto acked = ack_left;
-      assert(acked == 0 || in_seq > 0);
-      // sweep all pending writes with the concrete Protocol
-      return conn.socket->write(sweep_messages_and_move_to_sent(
-          num_msgs, need_keepalive, keepalive_ack, acked > 0)
-      ).then([this, prv_keepalive_ack=keepalive_ack, acked] {
+      auto to_ack = ack_left;
+      assert(to_ack == 0 || in_seq > 0);
+      // sweep all pending out with the concrete Protocol
+      return conn.socket->write(
+        sweep_out_pending_msgs_to_sent(
+          num_msgs, need_keepalive, next_keepalive_ack, to_ack > 0)
+      ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
         need_keepalive = false;
-        if (keepalive_ack == prv_keepalive_ack) {
-          keepalive_ack = std::nullopt;
+        if (next_keepalive_ack == prv_keepalive_ack) {
+          next_keepalive_ack = std::nullopt;
         }
-        assert(ack_left >= acked);
-        ack_left -= acked;
-        if (!is_queued()) {
-          return try_exit_sweep();
+        assert(ack_left >= to_ack);
+        ack_left -= to_ack;
+        if (!is_out_queued()) {
+          return try_exit_out_dispatch();
         } else {
           // messages were enqueued during socket write
           return seastar::make_ready_future<stop_t>(stop_t::no);
         }
       });
      }
-     case write_state_t::delay:
-      // delay dispatching writes until open
-      if (exit_open) {
-        exit_open->set_value();
-        exit_open = std::nullopt;
-        logger().info("{} write_event: delay and set exit_open ...", conn);
+     case out_state_t::delay:
+      // delay out dispatching until open
+      if (out_exit_dispatching) {
+        out_exit_dispatching->set_value();
+        out_exit_dispatching = std::nullopt;
+        logger().info("{} do_out_dispatch: delay and set out_exit_dispatching ...", conn);
       } else {
-        logger().info("{} write_event: delay ...", conn);
+        logger().info("{} do_out_dispatch: delay ...", conn);
       }
-      return state_changed.get_shared_future()
-      .then([] { return stop_t::no; });
-     case write_state_t::drop:
-      ceph_assert(write_dispatching);
-      write_dispatching = false;
-      if (exit_open) {
-        exit_open->set_value();
-        exit_open = std::nullopt;
-        logger().info("{} write_event: dropped and set exit_open", conn);
+      return out_state_changed.get_shared_future(
+      ).then([] { return stop_t::no; });
+     case out_state_t::drop:
+      ceph_assert(out_dispatching);
+      out_dispatching = false;
+      if (out_exit_dispatching) {
+        out_exit_dispatching->set_value();
+        out_exit_dispatching = std::nullopt;
+        logger().info("{} do_out_dispatch: dropped and set out_exit_dispatching", conn);
       } else {
-        logger().info("{} write_event: dropped", conn);
+        logger().info("{} do_out_dispatch: dropped", conn);
       }
       return seastar::make_ready_future<stop_t>(stop_t::yes);
      default:
@@ -287,42 +292,42 @@ seastar::future<> Protocol::do_write_dispatch_sweep()
     if (e.code() != std::errc::broken_pipe &&
         e.code() != std::errc::connection_reset &&
         e.code() != error::negotiation_failure) {
-      logger().error("{} write_event(): unexpected error at {} -- {}",
-                     conn, write_state, e);
+      logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
+                     conn, out_state, e);
       ceph_abort();
     }
     conn.socket->shutdown();
-    if (write_state == write_state_t::open) {
-      logger().info("{} write_event(): fault at {}, going to delay -- {}",
-                    conn, write_state, e);
-      write_state = write_state_t::delay;
+    if (out_state == out_state_t::open) {
+      logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
+                    conn, out_state, e);
+      out_state = out_state_t::delay;
     } else {
-      logger().info("{} write_event(): fault at {} -- {}",
-                    conn, write_state, e);
+      logger().info("{} do_out_dispatch(): fault at {} -- {}",
+                    conn, out_state, e);
     }
-    return do_write_dispatch_sweep();
+    return do_out_dispatch();
   });
 }
 
-void Protocol::write_event()
+void Protocol::notify_out_dispatch()
 {
-  notify_write();
-  if (write_dispatching) {
+  notify_out();
+  if (out_dispatching) {
     // already dispatching
     return;
   }
-  write_dispatching = true;
-  switch (write_state) {
-   case write_state_t::open:
+  out_dispatching = true;
+  switch (out_state) {
+   case out_state_t::open:
      [[fallthrough]];
-   case write_state_t::delay:
+   case out_state_t::delay:
     assert(!gate.is_closed());
-    gate.dispatch_in_background("do_write_dispatch_sweep", *this, [this] {
-      return do_write_dispatch_sweep();
+    gate.dispatch_in_background("do_out_dispatch", *this, [this] {
+      return do_out_dispatch();
     });
     return;
-   case write_state_t::drop:
-    write_dispatching = false;
+   case out_state_t::drop:
+    out_dispatching = false;
     return;
    default:
     ceph_assert(false);
index 268ffc996820434f6c1443f72794503f86c30db3..c71b37f07c4d1a75a1e44ced2c1a2029bccb34ac 100644 (file)
@@ -62,23 +62,12 @@ class Protocol {
       const std::deque<MessageURef>& msgs,
       size_t num_msgs,
       bool require_keepalive,
-      std::optional<utime_t> keepalive_ack,
+      std::optional<utime_t> maybe_keepalive_ack,
       bool require_ack) = 0;
 
-  virtual void notify_write() {};
+  virtual void notify_out() = 0;
 
-  virtual void on_closed() {}
- private:
-  ceph::bufferlist sweep_messages_and_move_to_sent(
-      size_t num_msgs,
-      bool require_keepalive,
-      std::optional<utime_t> keepalive_ack,
-      bool require_ack); 
-
- protected:
-  ChainedDispatchers& dispatchers;
-  SocketConnection &conn;
+  virtual void on_closed() = 0;
 
  private:
   bool closed = false;
@@ -91,7 +80,7 @@ class Protocol {
 
   seastar::future<> send(MessageURef msg);
 
-  seastar::future<> keepalive();
+  seastar::future<> send_keepalive();
 
   clock_t::time_point get_last_keepalive() const {
     return last_keepalive;
@@ -112,40 +101,44 @@ class Protocol {
     out << "io_stat("
         << "in_seq=" << in_seq
         << ", out_seq=" << out_seq
-        << ", out_q_size=" << out_q.size()
-        << ", sent_size=" << sent.size()
+        << ", out_pending_msgs_size=" << out_pending_msgs.size()
+        << ", out_sent_msgs_size=" << out_sent_msgs.size()
         << ", need_ack=" << (ack_left > 0)
         << ", need_keepalive=" << need_keepalive
-        << ", need_keepalive_ack=" << bool(keepalive_ack)
+        << ", need_keepalive_ack=" << bool(next_keepalive_ack)
         << ")";
   }
 
 // TODO: encapsulate a SessionedSender class
  protected:
-  // write_state is changed with state atomically, indicating the write
-  // behavior of the according state.
-  enum class write_state_t : uint8_t {
+  /**
+   * out_state_t
+   *
+   * The out_state is changed with protocol state atomically, indicating the
+   * out behavior of the according protocol state.
+   */
+  enum class out_state_t : uint8_t {
     none,
     delay,
     open,
     drop
   };
 
-  friend class fmt::formatter<write_state_t>;
-  void set_write_state(const write_state_t& state) {
-    if (write_state == write_state_t::open &&
-        state != write_state_t::open &&
-        write_dispatching) {
-      exit_open = seastar::shared_promise<>();
+  friend class fmt::formatter<out_state_t>;
+  void set_out_state(const out_state_t& state) {
+    if (out_state == out_state_t::open &&
+        state != out_state_t::open &&
+        out_dispatching) {
+      out_exit_dispatching = seastar::shared_promise<>();
     }
-    write_state = state;
-    state_changed.set_value();
-    state_changed = seastar::shared_promise<>();
+    out_state = state;
+    out_state_changed.set_value();
+    out_state_changed = seastar::shared_promise<>();
   }
 
-  seastar::future<> wait_write_exit() {
-    if (exit_open) {
-      return exit_open->get_shared_future();
+  seastar::future<> wait_out_exit_dispatching() {
+    if (out_exit_dispatching) {
+      return out_exit_dispatching->get_shared_future();
     }
     return seastar::now();
   }
@@ -154,28 +147,28 @@ class Protocol {
 
   void notify_ack();
 
-  void requeue_up_to(seq_num_t seq);
+  void requeue_out_sent_up_to(seq_num_t seq);
 
-  void requeue_sent();
+  void requeue_out_sent();
 
-  void reset_write();
+  void reset_out();
 
-  void reset_read() {
+  void reset_in() {
     in_seq = 0;
   }
 
-  bool is_queued() const {
-    return (!out_q.empty() ||
+  bool is_out_queued() const {
+    return (!out_pending_msgs.empty() ||
             ack_left > 0 ||
             need_keepalive ||
-            keepalive_ack.has_value());
+            next_keepalive_ack.has_value());
   }
 
-  bool is_queued_or_sent() const {
-    return is_queued() || !sent.empty();
+  bool is_out_queued_or_sent() const {
+    return is_out_queued() || !out_sent_msgs.empty();
   }
 
-  void ack_writes(seq_num_t seq);
+  void ack_out_sent(seq_num_t seq);
 
   void set_last_keepalive(clock_t::time_point when) {
     last_keepalive = when;
@@ -189,35 +182,63 @@ class Protocol {
     in_seq = _in_seq;
   }
 
-  seq_num_t increment_out() {
+  seq_num_t increment_out_seq() {
     return ++out_seq;
   }
 
   crimson::common::Gated gate;
 
+  ChainedDispatchers& dispatchers;
+
+  SocketConnection &conn;
+
  private:
-  write_state_t write_state = write_state_t::none;
+  seastar::future<stop_t> try_exit_out_dispatch();
+
+  seastar::future<> do_out_dispatch();
+
+  ceph::bufferlist sweep_out_pending_msgs_to_sent(
+      size_t num_msgs,
+      bool require_keepalive,
+      std::optional<utime_t> maybe_keepalive_ack,
+      bool require_ack);
+
+  void notify_out_dispatch();
+
+  /*
+   * out states for writing
+   */
 
-  // wait until current state changed
-  seastar::shared_promise<> state_changed;
+  out_state_t out_state = out_state_t::none;
+
+  // wait until current out_state changed
+  seastar::shared_promise<> out_state_changed;
+
+  bool out_dispatching = false;
+
+  // If another continuation is trying to close or replace socket when
+  // out_dispatching is true and out_state is open, it needs to wait for
+  // out_exit_dispatching until writing is stopped or failed.
+  std::optional<seastar::shared_promise<>> out_exit_dispatching;
 
   /// the seq num of the last transmitted message
   seq_num_t out_seq = 0;
 
   // messages to be resent after connection gets reset
-  std::deque<MessageURef> out_q;
+  std::deque<MessageURef> out_pending_msgs;
 
   // messages sent, but not yet acked by peer
-  std::deque<MessageURef> sent;
+  std::deque<MessageURef> out_sent_msgs;
 
   bool need_keepalive = false;
-  std::optional<utime_t> keepalive_ack = std::nullopt;
+
+  std::optional<utime_t> next_keepalive_ack = std::nullopt;
+
   uint64_t ack_left = 0;
-  bool write_dispatching = false;
-  // If another continuation is trying to close or replace socket when
-  // write_dispatching is true and write_state is open,
-  // it needs to wait for exit_open until writing is stopped or failed.
-  std::optional<seastar::shared_promise<>> exit_open;
+
+  /*
+   * in states for reading
+   */
 
   /// the seq num of the last received message
   seq_num_t in_seq = 0;
@@ -225,10 +246,6 @@ class Protocol {
   clock_t::time_point last_keepalive;
 
   clock_t::time_point last_keepalive_ack;
-
-  seastar::future<stop_t> try_exit_sweep();
-  seastar::future<> do_write_dispatch_sweep();
-  void write_event();
 };
 
 inline std::ostream& operator<<(std::ostream& out, const Protocol& proto) {
@@ -245,11 +262,11 @@ inline std::ostream& operator<<(
 } // namespace crimson::net
 
 template <>
-struct fmt::formatter<crimson::net::Protocol::write_state_t>
+struct fmt::formatter<crimson::net::Protocol::out_state_t>
   : fmt::formatter<std::string_view> {
   template <typename FormatContext>
-  auto format(crimson::net::Protocol::write_state_t state, FormatContext& ctx) {
-    using enum crimson::net::Protocol::write_state_t;
+  auto format(crimson::net::Protocol::out_state_t state, FormatContext& ctx) {
+    using enum crimson::net::Protocol::out_state_t;
     std::string_view name;
     switch (state) {
     case none:
index 0e74ce827a5942c213e7e998f708618fb42ab32c..0c911afdc2729a05653c59bac4c888deef843564 100644 (file)
@@ -359,7 +359,7 @@ seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
   }
 }
 
-void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant)
+void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reentrant)
 {
   if (!reentrant && _state == state) {
     logger().error("{} is not allowed to re-trigger state {}",
@@ -369,7 +369,7 @@ void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool
   logger().debug("{} TRIGGER {}, was {}",
                  conn, get_state_name(_state), get_state_name(state));
   state = _state;
-  set_write_state(_write_state);
+  set_out_state(_out_state);
 }
 
 void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr)
@@ -379,7 +379,7 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e
                   conn, func_name, get_state_name(state), eptr);
     close(true);
   } else if (conn.policy.server ||
-             (conn.policy.standby && !is_queued_or_sent())) {
+             (conn.policy.standby && !is_out_queued_or_sent())) {
     logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
                   conn, func_name, get_state_name(state), eptr);
     execute_standby();
@@ -398,11 +398,11 @@ void ProtocolV2::reset_session(bool full)
 {
   server_cookie = 0;
   connect_seq = 0;
-  reset_read();
+  reset_in();
   if (full) {
     client_cookie = generate_client_cookie();
     peer_global_seq = 0;
-    reset_write();
+    reset_out();
     dispatchers.ms_handle_remote_reset(
        seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
   }
@@ -678,7 +678,7 @@ ProtocolV2::client_connect()
       case Tag::SERVER_IDENT:
         return read_frame_payload().then([this] {
           // handle_server_ident() logic
-          requeue_sent();
+          requeue_out_sent();
           auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
           logger().debug("{} GOT ServerIdentFrame:"
                          " addrs={}, gid={}, gs={},"
@@ -800,7 +800,7 @@ ProtocolV2::client_reconnect()
           auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
           logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
                          conn, reconnect_ok.msg_seq());
-          requeue_up_to(reconnect_ok.msg_seq());
+          requeue_out_sent_up_to(reconnect_ok.msg_seq());
           return seastar::make_ready_future<next_step_t>(next_step_t::ready);
         });
       default: {
@@ -813,7 +813,7 @@ ProtocolV2::client_reconnect()
 
 void ProtocolV2::execute_connecting()
 {
-  trigger_state(state_t::CONNECTING, write_state_t::delay, false);
+  trigger_state(state_t::CONNECTING, out_state_t::delay, false);
   if (conn.socket) {
     conn.socket->shutdown();
   }
@@ -829,7 +829,7 @@ void ProtocolV2::execute_connecting()
         assert(server_cookie == 0);
         logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
       }
-      return wait_write_exit().then([this] {
+      return wait_out_exit_dispatching().then([this] {
           if (unlikely(state != state_t::CONNECTING)) {
             logger().debug("{} triggered {} before Socket::connect()",
                            conn, get_state_name(state));
@@ -925,7 +925,7 @@ void ProtocolV2::execute_connecting()
           }
 
           if (conn.policy.server ||
-              (conn.policy.standby && !is_queued_or_sent())) {
+              (conn.policy.standby && !is_out_queued_or_sent())) {
             logger().info("{} execute_connecting(): fault at {} with nothing to send,"
                           " going to STANDBY -- {}",
                           conn, get_state_name(state), eptr);
@@ -1178,7 +1178,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
         logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
                       " and lose to existing {}, ask client to wait",
                       conn, client_cookie, existing_proto->client_cookie, *existing_conn);
-        return existing_conn->keepalive().then([this] {
+        return existing_conn->send_keepalive().then([this] {
           return send_wait();
         });
       }
@@ -1457,7 +1457,7 @@ ProtocolV2::server_reconnect()
 
 void ProtocolV2::execute_accepting()
 {
-  trigger_state(state_t::ACCEPTING, write_state_t::none, false);
+  trigger_state(state_t::ACCEPTING, out_state_t::none, false);
   gate.dispatch_in_background("execute_accepting", *this, [this] {
       return seastar::futurize_invoke([this] {
           INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
@@ -1578,7 +1578,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
         conn.shared_from_this()));
   };
 
-  trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
+  trigger_state(state_t::ESTABLISHING, out_state_t::delay, false);
   if (existing_conn) {
     existing_conn->protocol->close(
         true /* dispatch_reset */, std::move(accept_me));
@@ -1635,8 +1635,8 @@ ProtocolV2::send_server_ident()
   logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
 
   // this is required for the case when this connection is being replaced
-  requeue_up_to(0);
-  reset_read();
+  requeue_out_sent_up_to(0);
+  reset_in();
 
   if (!conn.policy.lossy) {
     server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
@@ -1682,7 +1682,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                                    uint64_t new_connect_seq,
                                    uint64_t new_msg_seq)
 {
-  trigger_state(state_t::REPLACING, write_state_t::delay, false);
+  trigger_state(state_t::REPLACING, out_state_t::delay, false);
   if (conn.socket) {
     conn.socket->shutdown();
   }
@@ -1699,7 +1699,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                   new_conn_features, new_peer_supported_features,
                   new_peer_global_seq,
                   new_connect_seq, new_msg_seq] () mutable {
-    return wait_write_exit().then([this, do_reset] {
+    return wait_out_exit_dispatching().then([this, do_reset] {
       if (do_reset) {
         reset_session(true);
       }
@@ -1735,7 +1735,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
       if (reconnect) {
         connect_seq = new_connect_seq;
         // send_reconnect_ok() logic
-        requeue_up_to(new_msg_seq);
+        requeue_out_sent_up_to(new_msg_seq);
         auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq());
         logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq());
         return write_frame(reconnect_ok);
@@ -1783,7 +1783,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
     const std::deque<MessageURef>& msgs,
     size_t num_msgs,
     bool require_keepalive,
-    std::optional<utime_t> _keepalive_ack,
+    std::optional<utime_t> maybe_keepalive_ack,
     bool require_ack)
 {
   ceph::bufferlist bl;
@@ -1794,8 +1794,8 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
   }
 
-  if (unlikely(_keepalive_ack.has_value())) {
-    auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
+  if (unlikely(maybe_keepalive_ack.has_value())) {
+    auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
     bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm));
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
   }
@@ -1814,7 +1814,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
     msg->encode(conn.features, 0);
 
     ceph_assert(!msg->get_seq() && "message already has seq");
-    msg->set_seq(increment_out());
+    msg->set_seq(increment_out_seq());
 
     ceph_msg_header &header = msg->get_header();
     ceph_msg_footer &footer = msg->get_footer();
@@ -1916,7 +1916,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
     logger().debug("{} <== #{} === {} ({})",
                   conn, message->get_seq(), *message, message->get_type());
     notify_ack();
-    ack_writes(current_header.ack_seq);
+    ack_out_sent(current_header.ack_seq);
 
     // TODO: change MessageRef with seastar::shared_ptr
     auto msg_ref = MessageRef{message, false};
@@ -1928,7 +1928,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
 void ProtocolV2::execute_ready(bool dispatch_connect)
 {
   assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
-  trigger_state(state_t::READY, write_state_t::open, false);
+  trigger_state(state_t::READY, out_state_t::open, false);
   if (dispatch_connect) {
     dispatchers.ms_handle_connect(
        seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
@@ -1978,7 +1978,7 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
               // handle_message_ack() logic
               auto ack = AckFrame::Decode(rx_segments_data.back());
               logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
-              ack_writes(ack.seq());
+              ack_out_sent(ack.seq());
             });
           case Tag::KEEPALIVE2:
             return read_frame_payload().then([this] {
@@ -2022,16 +2022,16 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
 
 void ProtocolV2::execute_standby()
 {
-  trigger_state(state_t::STANDBY, write_state_t::delay, false);
+  trigger_state(state_t::STANDBY, out_state_t::delay, false);
   if (conn.socket) {
     conn.socket->shutdown();
   }
 }
 
-void ProtocolV2::notify_write()
+void ProtocolV2::notify_out()
 {
   if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
-    logger().info("{} notify_write(): at {}, going to CONNECTING",
+    logger().info("{} notify_out(): at {}, going to CONNECTING",
                   conn, get_state_name(state));
     execute_connecting();
   }
@@ -2041,7 +2041,7 @@ void ProtocolV2::notify_write()
 
 void ProtocolV2::execute_wait(bool max_backoff)
 {
-  trigger_state(state_t::WAIT, write_state_t::delay, false);
+  trigger_state(state_t::WAIT, out_state_t::delay, false);
   if (conn.socket) {
     conn.socket->shutdown();
   }
@@ -2075,7 +2075,7 @@ void ProtocolV2::execute_wait(bool max_backoff)
 
 void ProtocolV2::execute_server_wait()
 {
-  trigger_state(state_t::SERVER_WAIT, write_state_t::none, false);
+  trigger_state(state_t::SERVER_WAIT, out_state_t::none, false);
   gated_execute("execute_server_wait", [this] {
     return read_exactly(1).then([this] (auto bl) {
       logger().warn("{} SERVER_WAIT got read, abort", conn);
@@ -2110,7 +2110,7 @@ void ProtocolV2::trigger_close()
   }
 
   protocol_timer.cancel();
-  trigger_state(state_t::CLOSING, write_state_t::drop, false);
+  trigger_state(state_t::CLOSING, out_state_t::drop, false);
 }
 
 void ProtocolV2::on_closed()
index b580110834f9d2ea78945ca941ecd193fbb8bd8c..cfd5781ff0435ac75f246df82cc7811029232fcf 100644 (file)
@@ -41,7 +41,7 @@ class ProtocolV2 final : public Protocol {
       std::optional<utime_t> keepalive_ack,
       bool require_ack) override;
 
-  void notify_write() override;
+  void notify_out() override;
 
  private:
   SocketMessenger &messenger;
@@ -76,7 +76,7 @@ class ProtocolV2 final : public Protocol {
     return statenames[static_cast<int>(state)];
   }
 
-  void trigger_state(state_t state, write_state_t write_state, bool reentrant);
+  void trigger_state(state_t state, out_state_t out_state, bool reentrant);
 
   uint64_t peer_supported_features = 0;
 
index e5419125ae971bc18e912a6a75eed9586bb30b78..bd7259c6e7c94babe8b95a9e1f74aec37285a466 100644 (file)
@@ -75,12 +75,12 @@ seastar::future<> SocketConnection::send(MessageURef msg)
     });
 }
 
-seastar::future<> SocketConnection::keepalive()
+seastar::future<> SocketConnection::send_keepalive()
 {
   return seastar::smp::submit_to(
     shard_id(),
     [this] {
-      return protocol->keepalive();
+      return protocol->send_keepalive();
     });
 }
 
index 5e928de79db5994d1cfe226df477a1ce6d670fc2..ea18407e4591eb65574d3d2b8c355728a3ea38cd 100644 (file)
@@ -82,7 +82,7 @@ class SocketConnection : public Connection {
 
   seastar::future<> send(MessageURef msg) override;
 
-  seastar::future<> keepalive() override;
+  seastar::future<> send_keepalive() override;
 
   clock_t::time_point get_last_keepalive() const override;
 
index a0b982645e8cfe890df2b9fa31f4c9809abacd86..e5215a0ac55bc45fb2430501cb6342a535e070ee 100644 (file)
@@ -197,7 +197,7 @@ static seastar::future<> test_echo(unsigned rounds,
             [this, conn, &count_ping, &count_keepalive] {
               return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
                   if (keepalive_dist(rng)) {
-                    return conn->keepalive()
+                    return conn->send_keepalive()
                       .then([&count_keepalive] {
                         count_keepalive += 1;
                         return seastar::make_ready_future<seastar::stop_iteration>(
@@ -1155,7 +1155,7 @@ class FailoverSuite : public Dispatcher {
   seastar::future<> keepalive_peer() {
     logger().info("[Test] keepalive_peer()");
     ceph_assert(tracked_conn);
-    return tracked_conn->keepalive();
+    return tracked_conn->send_keepalive();
   }
 
   seastar::future<> try_send_peer() {
@@ -1524,7 +1524,7 @@ class FailoverSuitePeer : public Dispatcher {
   seastar::future<> keepalive_peer() {
     logger().info("[TestPeer] keepalive_peer()");
     ceph_assert(tracked_conn);
-    return tracked_conn->keepalive();
+    return tracked_conn->send_keepalive();
   }
 
   seastar::future<> markdown() {