]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: support frame read/write for ProtocolV2
authorYingxin Cheng <yingxincheng@gmail.com>
Wed, 6 Mar 2019 14:58:48 +0000 (22:58 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 5 Apr 2019 03:21:18 +0000 (11:21 +0800)
Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index c0b81c833f51e0e70646d3071f8556cfece321b0..21415e16ab50db5895e7962eff6751880af7e89f 100644 (file)
@@ -86,6 +86,205 @@ void ProtocolV2::start_accept(SocketFRef&& sock,
   execute_accepting();
 }
 
+// TODO: Frame related implementations, probably to a separate class.
+
+void ProtocolV2::enable_recording()
+{
+  ceph_assert(!rxbuf.length());
+  ceph_assert(!txbuf.length());
+  ceph_assert(!record_io);
+  record_io = true;
+}
+
+seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes)
+{
+  if (unlikely(record_io)) {
+    return socket->read_exactly(bytes)
+    .then([this] (auto bl) {
+      rxbuf.append(buffer::create(bl.share()));
+      return std::move(bl);
+    });
+  } else {
+    return socket->read_exactly(bytes);
+  };
+}
+
+seastar::future<bufferlist> ProtocolV2::read(size_t bytes)
+{
+  if (unlikely(record_io)) {
+    return socket->read(bytes)
+    .then([this] (auto buf) {
+      rxbuf.append(buf);
+      return std::move(buf);
+    });
+  } else {
+    return socket->read(bytes);
+  }
+}
+
+seastar::future<> ProtocolV2::write(bufferlist&& buf)
+{
+  if (unlikely(record_io)) {
+    txbuf.append(buf);
+  }
+  return socket->write(std::move(buf));
+}
+
+seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
+{
+  if (unlikely(record_io)) {
+    txbuf.append(buf);
+  }
+  return socket->write_flush(std::move(buf));
+}
+
+size_t ProtocolV2::get_current_msg_size() const
+{
+  ceph_assert(!rx_segments_desc.empty());
+  size_t sum = 0;
+  // we don't include SegmentIndex::Msg::HEADER.
+  for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
+    sum += rx_segments_desc[idx].length;
+  }
+  return sum;
+}
+
+seastar::future<Tag> ProtocolV2::read_main_preamble()
+{
+  return read_exactly(FRAME_PREAMBLE_SIZE)
+    .then([this] (auto bl) {
+      if (session_stream_handlers.rx) {
+        session_stream_handlers.rx->reset_rx_handler();
+        /*
+        bl = session_stream_handlers.rx->authenticated_decrypt_update(
+            std::move(bl), segment_t::DEFAULT_ALIGNMENT);
+        */
+      }
+
+      // I expect ceph_le32 will make the endian conversion for me. Passing
+      // everything through ::Decode is unnecessary.
+      const auto& main_preamble = \
+        *reinterpret_cast<const preamble_block_t*>(bl.get());
+
+      // verify preamble's CRC before any further processing
+      const auto rx_crc = ceph_crc32c(0,
+        reinterpret_cast<const unsigned char*>(&main_preamble),
+        sizeof(main_preamble) - sizeof(main_preamble.crc));
+      if (rx_crc != main_preamble.crc) {
+        logger().error("{} crc mismatch for main preamble rx_crc={} tx_crc={}",
+                       conn, rx_crc, main_preamble.crc);
+        abort_in_fault();
+      }
+      logger().debug("{} read main preamble: tag={}, len={}", conn, (int)main_preamble.tag, bl.size());
+
+      // currently we do support between 1 and MAX_NUM_SEGMENTS segments
+      if (main_preamble.num_segments < 1 ||
+          main_preamble.num_segments > MAX_NUM_SEGMENTS) {
+        logger().error("{} unsupported num_segments={}",
+                       conn, main_preamble.num_segments);
+        abort_in_fault();
+      }
+      if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
+        logger().error("{} num_segments too much: {}",
+                       conn, main_preamble.num_segments);
+        abort_in_fault();
+      }
+
+      rx_segments_desc.clear();
+      rx_segments_data.clear();
+
+      for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
+        logger().debug("{} got new segment: len={} align={}",
+                       conn, main_preamble.segments[idx].length,
+                       main_preamble.segments[idx].alignment);
+        rx_segments_desc.emplace_back(main_preamble.segments[idx]);
+      }
+
+      return static_cast<Tag>(main_preamble.tag);
+    });
+}
+
+seastar::future<> ProtocolV2::read_frame_payload()
+{
+  ceph_assert(!rx_segments_desc.empty());
+  ceph_assert(rx_segments_data.empty());
+
+  return seastar::do_until(
+    [this] { return rx_segments_desc.size() == rx_segments_data.size(); },
+    [this] {
+      // description of current segment to read
+      const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
+      // TODO: create aligned and contiguous buffer from socket
+      if (cur_rx_desc.alignment != segment_t::DEFAULT_ALIGNMENT) {
+        logger().debug("{} cannot allocate {} aligned buffer at segment desc index {}",
+                       conn, cur_rx_desc.alignment, rx_segments_data.size());
+      }
+      // TODO: create aligned and contiguous buffer from socket
+      return read_exactly(cur_rx_desc.length)
+      .then([this] (auto tmp_bl) {
+        bufferlist data;
+        data.append(buffer::create(std::move(tmp_bl)));
+        logger().debug("{} read frame segment[{}], length={}",
+                       conn, rx_segments_data.size(), data.length());
+        if (session_stream_handlers.rx) {
+          // TODO
+          ceph_assert(false);
+        }
+        rx_segments_data.emplace_back(std::move(data));
+      });
+    }
+  ).then([this] {
+    // TODO: get_epilogue_size()
+    ceph_assert(!session_stream_handlers.rx);
+    return read_exactly(FRAME_PLAIN_EPILOGUE_SIZE);
+  }).then([this] (auto bl) {
+    logger().debug("{} read frame epilogue length={}", conn, bl.size());
+
+    __u8 late_flags;
+    if (session_stream_handlers.rx) {
+      // TODO
+      ceph_assert(false);
+    } else {
+      auto& epilogue = *reinterpret_cast<const epilogue_plain_block_t*>(bl.get());
+      for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
+        const __u32 expected_crc = epilogue.crc_values[idx];
+        const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
+        if (expected_crc != calculated_crc) {
+          logger().error("{} message integrity check failed at index {}:"
+                         " expected_crc={} calculated_crc={}",
+                         conn, (unsigned int)idx, expected_crc, calculated_crc);
+          abort_in_fault();
+        } else {
+          logger().debug("{} message integrity check success at index {}: crc={}",
+                         conn, (unsigned int)idx, expected_crc);
+        }
+      }
+      late_flags = epilogue.late_flags;
+    }
+
+    // we do have a mechanism that allows transmitter to start sending message
+    // and abort after putting entire data field on wire. This will be used by
+    // the kernel client to avoid unnecessary buffering.
+    if (late_flags & FRAME_FLAGS_LATEABRT) {
+      // TODO
+      ceph_assert(false);
+    }
+  });
+}
+
+template <class F>
+seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
+{
+  auto bl = frame.get_buffer(session_stream_handlers);
+  logger().debug("{} write frame: tag={}, len={}", conn,
+                 static_cast<uint32_t>(frame.tag), bl.length());
+  if (flush) {
+    return write_flush(std::move(bl));
+  } else {
+    return write(std::move(bl));
+  }
+}
+
 void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant)
 {
   if (!reentrant && _state == state) {
index 3c761c9fa3cb566b1fceeabe84f4cac749af3ffd..a57d2668706006995ce962cfb0a9553d70074d55 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "Protocol.h"
 #include "msg/async/frames_v2.h"
+#include "msg/async/crypto_onwire.h"
 
 namespace ceph::net {
 
@@ -63,6 +64,30 @@ class ProtocolV2 final : public Protocol {
 
   uint64_t global_seq = 0;
 
+ // TODO: Frame related implementations, probably to a separate class.
+ private:
+  bool record_io = false;
+  ceph::bufferlist rxbuf;
+  ceph::bufferlist txbuf;
+
+  void enable_recording();
+  seastar::future<Socket::tmp_buf> read_exactly(size_t bytes);
+  seastar::future<bufferlist> read(size_t bytes);
+  seastar::future<> write(bufferlist&& buf);
+  seastar::future<> write_flush(bufferlist&& buf);
+
+  ceph::crypto::onwire::rxtx_t session_stream_handlers;
+  boost::container::static_vector<ceph::msgr::v2::segment_t,
+                                 ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_desc;
+  boost::container::static_vector<ceph::bufferlist,
+                                 ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_data;
+
+  size_t get_current_msg_size() const;
+  seastar::future<ceph::msgr::v2::Tag> read_main_preamble();
+  seastar::future<> read_frame_payload();
+  template <class F>
+  seastar::future<> write_frame(F &frame, bool flush=true);
+
  private:
   seastar::future<> fault();
   seastar::future<> banner_exchange();