crimson/net: message frame exchanges (lossy part)
author: Yingxin Cheng <yingxincheng@gmail.com>
Tue, 19 Mar 2019 14:13:33 +0000 (22:13 +0800)
committer: Kefu Chai <kchai@redhat.com>
Fri, 5 Apr 2019 03:21:19 +0000 (11:21 +0800)
Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index 7a12511b1fd85ab22a79d7f3b37289d39e607de5..115f0618006af92f75c0d26189221c64b57cbdb9 100644 (file)
@@ -9,6 +9,7 @@
 #include "crimson/auth/AuthClient.h"
 #include "crimson/auth/AuthServer.h"
 
+#include "Config.h"
 #include "Dispatcher.h"
 #include "Errors.h"
 #include "Socket.h"
@@ -1336,34 +1337,223 @@ seastar::future<> ProtocolV2::send_reconnect_ok()
 
 seastar::future<> ProtocolV2::write_message(MessageRef msg)
 {
-  // TODO not implemented
-  // <scheduled by parent, to send out the message on the wire>
-  ceph_assert(false);
+  // TODO: move to common code
+  // set priority
+  msg->get_header().src = messenger.get_myname();
+
+  msg->encode(conn.features, 0);
+
+  msg->set_seq(++conn.out_seq);
+  uint64_t ack_seq = conn.in_seq;
+  // ack_left = 0;
+
+  ceph_msg_header &header = msg->get_header();
+  ceph_msg_footer &footer = msg->get_footer();
+
+  ceph_msg_header2 header2{header.seq,        header.tid,
+                           header.type,       header.priority,
+                           header.version,
+                           0,                 header.data_off,
+                           ack_seq,
+                           footer.flags,      header.compat_version,
+                           header.reserved};
+
+  auto message = MessageFrame::Encode(header2,
+      msg->get_payload(), msg->get_middle(), msg->get_data());
+  logger().debug("{} write msg type={} off={} seq={}",
+                 conn, header2.type, header2.data_off, header2.seq);
+  return write_frame(message, false);
 }
 
 seastar::future<> ProtocolV2::do_keepalive()
 {
-  // TODO not implemented
-  // <scheduled by parent, to send out KeepAliveFrame on the wire>
-  ceph_assert(false);
+  auto keepalive_frame = KeepAliveFrame::Encode();
+  return write_frame(keepalive_frame, false);
 }
 
 seastar::future<> ProtocolV2::do_keepalive_ack()
 {
-  // TODO not implemented
-  // <scheduled by parent, to send out KeepAliveAckFrame on the wire>
-  ceph_assert(false);
+  auto keepalive_ack_frame = KeepAliveFrameAck::Encode(last_keepalive_ack_to_send);
+  return write_frame(keepalive_ack_frame, false);
+}
+
+void ProtocolV2::handle_message_ack(seq_num_t seq) {
+  if (conn.policy.lossy) {  // lossy connections don't keep sent messages
+    return;
+  }
+
+  // TODO: lossless policy
+}
+
+seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
+{
+  return read_frame_payload()
+  .then([this, throttle_stamp] {
+    utime_t recv_stamp = ceph_clock_now();
+
+    // we need to get the size before std::moving segments data
+    const size_t cur_msg_size = get_current_msg_size();
+    auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
+    // XXX: paranoid copy just to avoid oops
+    ceph_msg_header2 current_header = msg_frame.header();
+
+    logger().debug("{} got {} + {} + {} byte message,"
+                   " envelope type={} src={} off={} seq={}",
+                   conn, msg_frame.front_len(), msg_frame.middle_len(),
+                   msg_frame.data_len(), current_header.type, peer_name,
+                   current_header.data_off, current_header.seq);
+
+    ceph_msg_header header{current_header.seq,
+                           current_header.tid,
+                           current_header.type,
+                           current_header.priority,
+                           current_header.version,
+                           msg_frame.front_len(),
+                           msg_frame.middle_len(),
+                           msg_frame.data_len(),
+                           current_header.data_off,
+                           peer_name,
+                           current_header.compat_version,
+                           current_header.reserved,
+                           0};
+    ceph_msg_footer footer{0, 0, 0, 0, current_header.flags};
+
+    Message *message = decode_message(nullptr, 0, header, footer,
+        msg_frame.front(), msg_frame.middle(), msg_frame.data(), nullptr);
+    if (!message) {
+      logger().warn("{} decode message failed", conn);
+      abort_in_fault();
+    }
+
+    // store reservation size in message, so we don't get confused
+    // by messages entering the dispatch queue through other paths.
+    message->set_dispatch_throttle_size(cur_msg_size);
+
+    message->set_throttle_stamp(throttle_stamp);
+    message->set_recv_stamp(recv_stamp);
+    message->set_recv_complete_stamp(ceph_clock_now());
+
+    // check received seq#.  if it is old, drop the message.
+    // note that incoming messages may skip ahead.  this is convenient for the
+    // client side queueing because messages can't be renumbered, but the (kernel)
+    // client will occasionally pull a message out of the sent queue to send
+    // elsewhere.  in that case it doesn't matter if we "got" it or not.
+    uint64_t cur_seq = conn.in_seq;
+    if (message->get_seq() <= cur_seq) {
+      logger().error("{} got old message {} <= {} {} {}, discarding",
+                     conn, message->get_seq(), cur_seq, message, *message);
+      if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
+          conf.ms_die_on_old_message) {
+        ceph_assert(0 == "old msgs despite reconnect_seq feature");
+      }
+      return;
+    } else if (message->get_seq() > cur_seq + 1) {
+      logger().error("{} missed message? skipped from seq {} to {}",
+                     conn, cur_seq, message->get_seq());
+      if (conf.ms_die_on_skipped_message) {
+        ceph_assert(0 == "skipped incoming seq");
+      }
+    }
+
+    // note last received message.
+    conn.in_seq = message->get_seq();
+    logger().debug("{} received message m={} seq={} from={} type={} {}",
+                   conn, message, message->get_seq(), message->get_source(),
+                   header.type, *message);
+
+    if (!conn.policy.lossy) {
+      // ++ack_left;
+    }
+    handle_message_ack(current_header.ack_seq);
+
+    // TODO: change MessageRef with seastar::shared_ptr
+    auto msg_ref = MessageRef{message, false};
+    seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
+      return dispatcher.ms_dispatch(
+          seastar::static_pointer_cast<SocketConnection>(
+            conn.shared_from_this()),
+          std::move(msg))
+      .handle_exception([this] (std::exception_ptr eptr) {
+        logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
+        ceph_assert(false);
+      });
+    });
+  });
 }
 
 void ProtocolV2::execute_ready()
 {
-  // <schedule sending messages, AckFrame, KeepAliveFrame, KeepAliveAckFrame,
-  //  i.e. trigger READY state with write_state_t::open>
-  //   trigger_state(state_t::READY, write_state_t::open, false);
-  // TODO: schedule reading messages, AckFrame, KeepAliveFrame, KeepAliveAckFrame
-  state = state_t::READY;
-  logger().info("{} reaches READY state successfully.", conn);
-  close();
+  trigger_state(state_t::READY, write_state_t::open, false);
+  seastar::with_gate(pending_dispatch, [this] {
+    return seastar::keep_doing([this] {
+      return read_main_preamble()
+      .then([this] (Tag tag) {
+        switch (tag) {
+          case Tag::MESSAGE: {
+            return seastar::futurize_apply([this] {
+              // throttle_message() logic
+              if (!conn.policy.throttler_messages) {
+                return seastar::now();
+              }
+              // TODO: message throttler
+              ceph_assert(false);
+              return seastar::now();
+            }).then([this] {
+              // throttle_bytes() logic
+              if (!conn.policy.throttler_bytes) {
+                return seastar::now();
+              }
+              size_t cur_msg_size = get_current_msg_size();
+              if (!cur_msg_size) {
+                return seastar::now();
+              }
+              logger().debug("{} wants {} bytes from policy throttler {}/{}",
+                             conn, cur_msg_size,
+                             conn.policy.throttler_bytes->get_current(),
+                             conn.policy.throttler_bytes->get_max());
+              return conn.policy.throttler_bytes->get(cur_msg_size);
+            }).then([this] {
+              // TODO: throttle_dispatch_queue() logic
+              utime_t throttle_stamp = ceph_clock_now();
+              return read_message(throttle_stamp);
+            });
+          }
+          case Tag::ACK:
+            return read_frame_payload()
+            .then([this] {
+              // handle_message_ack() logic
+              auto ack = AckFrame::Decode(rx_segments_data.back());
+              handle_message_ack(ack.seq());
+            });
+          case Tag::KEEPALIVE2:
+            return read_frame_payload()
+            .then([this] {
+              // handle_keepalive2() logic
+              auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
+              last_keepalive_ack_to_send = keepalive_frame.timestamp();
+              logger().debug("{} got KEEPALIVE2 {}",
+                             conn, last_keepalive_ack_to_send);
+              conn.last_keepalive = ceph_clock_now();
+              notify_keepalive_ack();
+            });
+          case Tag::KEEPALIVE2_ACK:
+            return read_frame_payload()
+            .then([this] {
+              // handle_keepalive2_ack() logic
+              auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
+              conn.last_keepalive_ack = keepalive_ack_frame.timestamp();
+              logger().debug("{} got KEEPALIVE_ACK {}",
+                             conn, conn.last_keepalive_ack);
+            });
+          default:
+            return unexpected_tag(tag, conn, "execute_ready");
+        }
+      });
+    }).handle_exception([this] (std::exception_ptr eptr) {
+      // TODO: handle fault in READY state
+      return fault();
+    });
+  });
 }
 
 // STANDBY state
index 08f935470fd788eda6d0ec1f78ab76c02db99ff9..e176dc343fafbb45c4cea28981121bfce5197569 100644 (file)
@@ -72,6 +72,8 @@ class ProtocolV2 final : public Protocol {
   uint64_t peer_global_seq = 0;
   uint64_t connect_seq = 0;
 
+  utime_t last_keepalive_ack_to_send;
+
  // TODO: Frame related implementations, probably to a separate class.
  private:
   bool record_io = false;
@@ -143,6 +145,8 @@ class ProtocolV2 final : public Protocol {
   seastar::future<> send_reconnect_ok();
 
   // READY
+  seastar::future<> read_message(utime_t throttle_stamp);
+  void handle_message_ack(seq_num_t seq);
   void execute_ready();
 
   // STANDBY