git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: move IO members into Protocol class
author: Yingxin Cheng <yingxin.cheng@intel.com>
Tue, 1 Nov 2022 08:43:15 +0000 (16:43 +0800)
committer: Yingxin Cheng <yingxin.cheng@intel.com>
Wed, 8 Feb 2023 06:07:41 +0000 (14:07 +0800)
The IO members are moved into the Protocol class in order to introduce the cross-core IOHandler class.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index 734af8eda5382239df3ddbcc6b44e48b2007cc10..1adfe895455ffe5357b3391c3362ee019ec5dab3 100644 (file)
@@ -92,24 +92,24 @@ ceph::bufferlist Protocol::sweep_messages_and_move_to_sent(
       std::optional<utime_t> keepalive_ack,
       bool require_ack)
 {
-  ceph::bufferlist bl = do_sweep_messages(conn.out_q, 
-                                          num_msgs, 
-                                          require_keepalive, 
-                                          keepalive_ack, 
+  ceph::bufferlist bl = do_sweep_messages(out_q,
+                                          num_msgs,
+                                          require_keepalive,
+                                          keepalive_ack,
                                           require_ack);
   if (!conn.policy.lossy) {
-    conn.sent.insert(conn.sent.end(),
-                     std::make_move_iterator(conn.out_q.begin()),
-                     std::make_move_iterator(conn.out_q.end()));
+    sent.insert(sent.end(),
+                std::make_move_iterator(out_q.begin()),
+                std::make_move_iterator(out_q.end()));
   }
-  conn.out_q.clear();
+  out_q.clear();
   return bl;
 }
 
 seastar::future<> Protocol::send(MessageURef msg)
 {
   if (write_state != write_state_t::drop) {
-    conn.out_q.push_back(std::move(msg));
+    out_q.push_back(std::move(msg));
     write_event();
   }
   return seastar::now();
@@ -142,41 +142,41 @@ void Protocol::notify_ack()
 void Protocol::requeue_sent()
 {
   assert(write_state != write_state_t::open);
-  if (conn.sent.empty()) {
+  if (sent.empty()) {
     return;
   }
 
-  conn.out_seq -= conn.sent.size();
+  out_seq -= sent.size();
   logger().debug("{} requeue {} items, revert out_seq to {}",
-                 conn, conn.sent.size(), conn.out_seq);
-  for (MessageURef& msg : conn.sent) {
+                 conn, sent.size(), out_seq);
+  for (MessageURef& msg : sent) {
     msg->clear_payload();
     msg->set_seq(0);
   }
-  conn.out_q.insert(conn.out_q.begin(),
-                    std::make_move_iterator(conn.sent.begin()),
-                    std::make_move_iterator(conn.sent.end()));
-  conn.sent.clear();
+  out_q.insert(out_q.begin(),
+               std::make_move_iterator(sent.begin()),
+               std::make_move_iterator(sent.end()));
+  sent.clear();
   write_event();
 }
 
 void Protocol::requeue_up_to(seq_num_t seq)
 {
   assert(write_state != write_state_t::open);
-  if (conn.sent.empty() && conn.out_q.empty()) {
+  if (sent.empty() && out_q.empty()) {
     logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
-                   conn, conn.out_seq, seq);
-    conn.out_seq = seq;
+                   conn, out_seq, seq);
+    out_seq = seq;
     return;
   }
   logger().debug("{} discarding sent items by seq {} (sent_len={}, out_seq={})",
-                 conn, seq, conn.sent.size(), conn.out_seq);
-  while (!conn.sent.empty()) {
-    auto cur_seq = conn.sent.front()->get_seq();
+                 conn, seq, sent.size(), out_seq);
+  while (!sent.empty()) {
+    auto cur_seq = sent.front()->get_seq();
     if (cur_seq == 0 || cur_seq > seq) {
       break;
     } else {
-      conn.sent.pop_front();
+      sent.pop_front();
     }
   }
   requeue_sent();
@@ -185,9 +185,9 @@ void Protocol::requeue_up_to(seq_num_t seq)
 void Protocol::reset_write()
 {
   assert(write_state != write_state_t::open);
-  conn.out_seq = 0;
-  conn.out_q.clear();
-  conn.sent.clear();
+  out_seq = 0;
+  out_q.clear();
+  sent.clear();
   need_keepalive = false;
   keepalive_ack = std::nullopt;
   ack_left = 0;
@@ -198,10 +198,10 @@ void Protocol::ack_writes(seq_num_t seq)
   if (conn.policy.lossy) {  // lossy connections don't keep sent messages
     return;
   }
-  while (!conn.sent.empty() && conn.sent.front()->get_seq() <= seq) {
+  while (!sent.empty() && sent.front()->get_seq() <= seq) {
     logger().trace("{} got ack seq {} >= {}, pop {}",
-                   conn, seq, conn.sent.front()->get_seq(), *conn.sent.front());
-    conn.sent.pop_front();
+                   conn, seq, sent.front()->get_seq(), *sent.front());
+    sent.pop_front();
   }
 }
 
@@ -233,13 +233,13 @@ seastar::future<> Protocol::do_write_dispatch_sweep()
   return seastar::repeat([this] {
     switch (write_state) {
      case write_state_t::open: {
-      size_t num_msgs = conn.out_q.size();
+      size_t num_msgs = out_q.size();
       bool still_queued = is_queued();
       if (unlikely(!still_queued)) {
         return try_exit_sweep();
       }
       auto acked = ack_left;
-      assert(acked == 0 || conn.in_seq > 0);
+      assert(acked == 0 || in_seq > 0);
       // sweep all pending writes with the concrete Protocol
       return conn.socket->write(sweep_messages_and_move_to_sent(
           num_msgs, need_keepalive, keepalive_ack, acked > 0)
index f819c4692083a6b2ca6f0a98e8c8e7eebc94194b..268ffc996820434f6c1443f72794503f86c30db3 100644 (file)
@@ -50,7 +50,8 @@ class Protocol {
   virtual void start_accept(SocketRef&& socket,
                             const entity_addr_t& peer_addr) = 0;
 
-  virtual void print(std::ostream&) const = 0;
+  virtual void print_conn(std::ostream&) const = 0;
+
  protected:
   Protocol(ChainedDispatchers& dispatchers,
            SocketConnection& conn);
@@ -86,9 +87,39 @@ class Protocol {
 
 // the write state-machine
  public:
+  using clock_t = seastar::lowres_system_clock;
+
   seastar::future<> send(MessageURef msg);
+
   seastar::future<> keepalive();
 
+  clock_t::time_point get_last_keepalive() const {
+    return last_keepalive;
+  }
+
+  clock_t::time_point get_last_keepalive_ack() const {
+    return last_keepalive_ack;
+  }
+
+  void set_last_keepalive_ack(clock_t::time_point when) {
+    last_keepalive_ack = when;
+  }
+
+  struct io_stat_printer {
+    const Protocol &protocol;
+  };
+  void print_io_stat(std::ostream &out) const {
+    out << "io_stat("
+        << "in_seq=" << in_seq
+        << ", out_seq=" << out_seq
+        << ", out_q_size=" << out_q.size()
+        << ", sent_size=" << sent.size()
+        << ", need_ack=" << (ack_left > 0)
+        << ", need_keepalive=" << need_keepalive
+        << ", need_keepalive_ack=" << bool(keepalive_ack)
+        << ")";
+  }
+
 // TODO: encapsulate a SessionedSender class
  protected:
   // write_state is changed with state atomically, indicating the write
@@ -129,21 +160,56 @@ class Protocol {
 
   void reset_write();
 
+  void reset_read() {
+    in_seq = 0;
+  }
+
   bool is_queued() const {
-    return (!conn.out_q.empty() ||
+    return (!out_q.empty() ||
             ack_left > 0 ||
             need_keepalive ||
             keepalive_ack.has_value());
   }
 
+  bool is_queued_or_sent() const {
+    return is_queued() || !sent.empty();
+  }
+
   void ack_writes(seq_num_t seq);
+
+  void set_last_keepalive(clock_t::time_point when) {
+    last_keepalive = when;
+  }
+
+  seq_num_t get_in_seq() const {
+    return in_seq;
+  }
+
+  void set_in_seq(seq_num_t _in_seq) {
+    in_seq = _in_seq;
+  }
+
+  seq_num_t increment_out() {
+    return ++out_seq;
+  }
+
   crimson::common::Gated gate;
 
  private:
   write_state_t write_state = write_state_t::none;
+
   // wait until current state changed
   seastar::shared_promise<> state_changed;
 
+  /// the seq num of the last transmitted message
+  seq_num_t out_seq = 0;
+
+  // messages to be resent after connection gets reset
+  std::deque<MessageURef> out_q;
+
+  // messages sent, but not yet acked by peer
+  std::deque<MessageURef> sent;
+
   bool need_keepalive = false;
   std::optional<utime_t> keepalive_ack = std::nullopt;
   uint64_t ack_left = 0;
@@ -153,16 +219,28 @@ class Protocol {
   // it needs to wait for exit_open until writing is stopped or failed.
   std::optional<seastar::shared_promise<>> exit_open;
 
+  /// the seq num of the last received message
+  seq_num_t in_seq = 0;
+
+  clock_t::time_point last_keepalive;
+
+  clock_t::time_point last_keepalive_ack;
+
   seastar::future<stop_t> try_exit_sweep();
   seastar::future<> do_write_dispatch_sweep();
   void write_event();
 };
 
 inline std::ostream& operator<<(std::ostream& out, const Protocol& proto) {
-  proto.print(out);
+  proto.print_conn(out);
   return out;
 }
 
+inline std::ostream& operator<<(
+    std::ostream& out, Protocol::io_stat_printer stat) {
+  stat.protocol.print_io_stat(out);
+  return out;
+}
 
 } // namespace crimson::net
 
index 11fdbbcd5c5942238284be8837454dc3e57c8a8a..0e74ce827a5942c213e7e998f708618fb42ab32c 100644 (file)
@@ -379,8 +379,7 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e
                   conn, func_name, get_state_name(state), eptr);
     close(true);
   } else if (conn.policy.server ||
-             (conn.policy.standby &&
-              (!is_queued() && conn.sent.empty()))) {
+             (conn.policy.standby && !is_queued_or_sent())) {
     logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
                   conn, func_name, get_state_name(state), eptr);
     execute_standby();
@@ -399,7 +398,7 @@ void ProtocolV2::reset_session(bool full)
 {
   server_cookie = 0;
   connect_seq = 0;
-  conn.in_seq = 0;
+  reset_read();
   if (full) {
     client_cookie = generate_client_cookie();
     peer_global_seq = 0;
@@ -755,12 +754,12 @@ ProtocolV2::client_reconnect()
                                           server_cookie,
                                           global_seq,
                                           connect_seq,
-                                          conn.in_seq);
+                                          get_in_seq());
   logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
-                 " server_cookie={}, gs={}, cs={}, msg_seq={}",
+                 " server_cookie={}, gs={}, cs={}, in_seq={}",
                  conn, messenger.get_myaddrs(),
                  client_cookie, server_cookie,
-                 global_seq, connect_seq, conn.in_seq);
+                 global_seq, connect_seq, get_in_seq());
   return write_frame(reconnect).then([this] {
     return read_main_preamble();
   }).then([this] (Tag tag) {
@@ -899,12 +898,11 @@ void ProtocolV2::execute_connecting()
           }
           switch (next) {
            case next_step_t::ready: {
-            logger().info("{} connected:"
-                          " gs={}, pgs={}, cs={}, client_cookie={},"
-                          " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
+            logger().info("{} connected: gs={}, pgs={}, cs={}, "
+                          "client_cookie={}, server_cookie={}, {}",
                           conn, global_seq, peer_global_seq, connect_seq,
-                          client_cookie, server_cookie, conn.in_seq,
-                          conn.out_seq, conn.out_q.size());
+                          client_cookie, server_cookie,
+                          io_stat_printer{*this});
             execute_ready(true);
             break;
            }
@@ -927,8 +925,7 @@ void ProtocolV2::execute_connecting()
           }
 
           if (conn.policy.server ||
-              (conn.policy.standby &&
-               (!is_queued() && conn.sent.empty()))) {
+              (conn.policy.standby && !is_queued_or_sent())) {
             logger().info("{} execute_connecting(): fault at {} with nothing to send,"
                           " going to STANDBY -- {}",
                           conn, get_state_name(state), eptr);
@@ -1607,11 +1604,11 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
                        conn, get_state_name(state));
         abort_protocol();
       }
-      logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={},"
-                    " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
+      logger().info("{} established: gs={}, pgs={}, cs={}, "
+                    "client_cookie={}, server_cookie={}, {}",
                     conn, global_seq, peer_global_seq, connect_seq,
-                    client_cookie, server_cookie, conn.in_seq,
-                    conn.out_seq, conn.out_q.size());
+                    client_cookie, server_cookie,
+                    io_stat_printer{*this});
       execute_ready(false);
     }).handle_exception([this] (std::exception_ptr eptr) {
       if (state != state_t::ESTABLISHING) {
@@ -1639,7 +1636,7 @@ ProtocolV2::send_server_ident()
 
   // this is required for the case when this connection is being replaced
   requeue_up_to(0);
-  conn.in_seq = 0;
+  reset_read();
 
   if (!conn.policy.lossy) {
     server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
@@ -1739,8 +1736,8 @@ void ProtocolV2::trigger_replacing(bool reconnect,
         connect_seq = new_connect_seq;
         // send_reconnect_ok() logic
         requeue_up_to(new_msg_seq);
-        auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq);
-        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq);
+        auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq());
+        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq());
         return write_frame(reconnect_ok);
       } else {
         client_cookie = new_client_cookie;
@@ -1761,12 +1758,12 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                        conn, get_state_name(state));
         abort_protocol();
       }
-      logger().info("{} replaced ({}):"
-                    " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={},"
-                    " in_seq={}, out_seq={}, out_q={}",
+      logger().info("{} replaced ({}): gs={}, pgs={}, cs={}, "
+                    "client_cookie={}, server_cookie={}, {}",
                     conn, reconnect ? "reconnected" : "connected",
-                    global_seq, peer_global_seq, connect_seq, client_cookie,
-                    server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size());
+                    global_seq, peer_global_seq, connect_seq,
+                    client_cookie, server_cookie,
+                    io_stat_printer{*this});
       execute_ready(false);
     }).handle_exception([this] (std::exception_ptr eptr) {
       if (state != state_t::REPLACING) {
@@ -1804,7 +1801,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
   }
 
   if (require_ack && num_msgs == 0u) {
-    auto ack_frame = AckFrame::Encode(conn.in_seq);
+    auto ack_frame = AckFrame::Encode(get_in_seq());
     bl.append(ack_frame.get_buffer(tx_frame_asm));
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
   }
@@ -1817,7 +1814,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
     msg->encode(conn.features, 0);
 
     ceph_assert(!msg->get_seq() && "message already has seq");
-    msg->set_seq(++conn.out_seq);
+    msg->set_seq(increment_out());
 
     ceph_msg_header &header = msg->get_header();
     ceph_msg_footer &footer = msg->get_footer();
@@ -1826,7 +1823,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
                              header.type,       header.priority,
                              header.version,
                              ceph_le32(0),      header.data_off,
-                             ceph_le64(conn.in_seq),
+                             ceph_le64(get_in_seq()),
                              footer.flags,      header.compat_version,
                              header.reserved};
 
@@ -1897,7 +1894,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
     // client side queueing because messages can't be renumbered, but the (kernel)
     // client will occasionally pull a message out of the sent queue to send
     // elsewhere.  in that case it doesn't matter if we "got" it or not.
-    uint64_t cur_seq = conn.in_seq;
+    uint64_t cur_seq = get_in_seq();
     if (message->get_seq() <= cur_seq) {
       logger().error("{} got old message {} <= {} {}, discarding",
                      conn, message->get_seq(), cur_seq, *message);
@@ -1915,7 +1912,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
     }
 
     // note last received message.
-    conn.in_seq = message->get_seq();
+    set_in_seq(message->get_seq());
     logger().debug("{} <== #{} === {} ({})",
                   conn, message->get_seq(), *message, message->get_type());
     notify_ack();
@@ -1990,16 +1987,17 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
               logger().debug("{} GOT KeepAliveFrame: timestamp={}",
                              conn, keepalive_frame.timestamp());
               notify_keepalive_ack(keepalive_frame.timestamp());
-              conn.set_last_keepalive(seastar::lowres_system_clock::now());
+              set_last_keepalive(seastar::lowres_system_clock::now());
             });
           case Tag::KEEPALIVE2_ACK:
             return read_frame_payload().then([this] {
               // handle_keepalive2_ack() logic
               auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
-              conn.set_last_keepalive_ack(
-                seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()});
+              auto _last_keepalive_ack =
+                seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
+              set_last_keepalive_ack(_last_keepalive_ack);
               logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
-                             conn, conn.last_keepalive_ack);
+                             conn, _last_keepalive_ack);
             });
           default: {
             unexpected_tag(tag, conn, "execute_ready");
@@ -2122,7 +2120,7 @@ void ProtocolV2::on_closed()
        conn.shared_from_this()));
 }
 
-void ProtocolV2::print(std::ostream& out) const
+void ProtocolV2::print_conn(std::ostream& out) const
 {
   out << conn;
 }
index d86d4b5721bccba5593a1901eb835ac4b8315b16..b580110834f9d2ea78945ca941ecd193fbb8bd8c 100644 (file)
@@ -20,7 +20,8 @@ class ProtocolV2 final : public Protocol {
              SocketConnection& conn,
              SocketMessenger& messenger);
   ~ProtocolV2() override;
-  void print(std::ostream&) const final;
+  void print_conn(std::ostream&) const final;
+
  private:
   void on_closed() override;
   bool is_connected() const override;
@@ -144,7 +145,7 @@ class ProtocolV2 final : public Protocol {
 
  private:
   void fault(bool backoff, const char* func_name, std::exception_ptr eptr);
-  void reset_session(bool full);
+  void reset_session(bool is_full);
   seastar::future<std::tuple<entity_type_t, entity_addr_t>>
   banner_exchange(bool is_connect);
 
index 5ba2ea5c5666fe521892b41a3ae3de94e144c6c2..e5419125ae971bc18e912a6a75eed9586bb30b78 100644 (file)
@@ -84,29 +84,27 @@ seastar::future<> SocketConnection::keepalive()
     });
 }
 
-void SocketConnection::mark_down()
+SocketConnection::clock_t::time_point
+SocketConnection::get_last_keepalive() const
 {
-  assert(seastar::this_shard_id() == shard_id());
-  protocol->close(false);
+  return protocol->get_last_keepalive();
 }
 
-bool SocketConnection::update_rx_seq(seq_num_t seq)
+SocketConnection::clock_t::time_point
+SocketConnection::get_last_keepalive_ack() const
 {
-  if (seq <= in_seq) {
-    if (HAVE_FEATURE(features, RECONNECT_SEQ) &&
-        local_conf()->ms_die_on_old_message) {
-      ceph_abort_msg("old msgs despite reconnect_seq feature");
-    }
-    return false;
-  } else if (seq > in_seq + 1) {
-    if (local_conf()->ms_die_on_skipped_message) {
-      ceph_abort_msg("skipped incoming seq");
-    }
-    return false;
-  } else {
-    in_seq = seq;
-    return true;
-  }
+  return protocol->get_last_keepalive_ack();
+}
+
+void SocketConnection::set_last_keepalive_ack(clock_t::time_point when)
+{
+  protocol->set_last_keepalive_ack(when);
+}
+
+void SocketConnection::mark_down()
+{
+  assert(seastar::this_shard_id() == shard_id());
+  protocol->close(false);
 }
 
 void
index 5a698919f28f73fdb976c1634df3794bfc569809..5e928de79db5994d1cfe226df477a1ce6d670fc2 100644 (file)
@@ -47,24 +47,10 @@ class SocketConnection : public Connection {
   // or should reconnect to (as peer)
   entity_addr_t target_addr;
 
-  clock_t::time_point last_keepalive;
-
-  clock_t::time_point last_keepalive_ack;
-
   uint64_t features = 0;
 
   ceph::net::Policy<crimson::common::Throttle> policy;
 
-  /// the seq num of the last transmitted message
-  seq_num_t out_seq = 0;
-  /// the seq num of the last received message
-  seq_num_t in_seq = 0;
-
-  // messages to be resent after connection gets reset
-  std::deque<MessageURef> out_q;
-  // messages sent, but not yet acked by peer
-  std::deque<MessageURef> sent;
-
   uint64_t peer_global_id = 0;
 
   std::unique_ptr<user_private_t> user_private;
@@ -98,17 +84,11 @@ class SocketConnection : public Connection {
 
   seastar::future<> keepalive() override;
 
-  clock_t::time_point get_last_keepalive() const override {
-    return last_keepalive;
-  }
+  clock_t::time_point get_last_keepalive() const override;
 
-  clock_t::time_point get_last_keepalive_ack() const override {
-    return last_keepalive_ack;
-  }
+  clock_t::time_point get_last_keepalive_ack() const override;
 
-  void set_last_keepalive_ack(clock_t::time_point when) override {
-    last_keepalive_ack = when;
-  }
+  void set_last_keepalive_ack(clock_t::time_point when) override;
 
   void mark_down() override;
 
@@ -151,11 +131,6 @@ class SocketConnection : public Connection {
 private:
   seastar::shard_id shard_id() const;
 
-  /// update the seq num of last received message
-  /// @returns true if the @c seq is valid, and @c in_seq is updated,
-  ///          false otherwise.
-  bool update_rx_seq(seq_num_t seq);
-
   void set_peer_type(entity_type_t peer_type) {
     // it is not allowed to assign an unknown value when the current
     // value is known
@@ -187,10 +162,6 @@ private:
     set_peer_id(name.num());
   }
 
-  void set_last_keepalive(clock_t::time_point when) {
-    last_keepalive = when;
-  }
-
   void set_features(uint64_t f) {
     features = f;
   }