]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: use rx_frame_asm for handling data read from wire
authorKefu Chai <kchai@redhat.com>
Fri, 24 Jul 2020 08:33:22 +0000 (16:33 +0800)
committerKefu Chai <kchai@redhat.com>
Mon, 27 Jul 2020 03:29:10 +0000 (11:29 +0800)
by leveraging FrameAssembler, it's much simpler. and it also pave the
road to a better messenger v2.0 and v2.1 protocol support.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index 448e53407683afe0183d9efd24b8758b55303488..9397d70a4b241674cee9b04e614285c36b052e32 100644 (file)
@@ -150,8 +150,7 @@ ProtocolV2::ProtocolV2(ChainedDispatchersRef& dispatcher,
                        SocketMessenger& messenger)
   : Protocol(proto_t::v2, dispatcher, conn),
     messenger{messenger},
-    protocol_timer{conn},
-    tx_frame_asm(&session_stream_handlers, false)
+    protocol_timer{conn}
 {}
 
 ProtocolV2::~ProtocolV2() {}
@@ -249,11 +248,11 @@ seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
 
 size_t ProtocolV2::get_current_msg_size() const
 {
-  ceph_assert(!rx_segments_desc.empty());
+  ceph_assert(rx_frame_asm.get_num_segments() > 0);
   size_t sum = 0;
   // we don't include SegmentIndex::Msg::HEADER.
-  for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
-    sum += rx_segments_desc[idx].length;
+  for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
+    sum += rx_frame_asm.get_segment_logical_len(idx);
   }
   return sum;
 }
@@ -262,78 +261,37 @@ seastar::future<Tag> ProtocolV2::read_main_preamble()
 {
   return read_exactly(sizeof(preamble_block_t))
     .then([this] (auto bl) {
-      if (session_stream_handlers.rx) {
-        session_stream_handlers.rx->reset_rx_handler();
-        /*
-        bl = session_stream_handlers.rx->authenticated_decrypt_update(
-            std::move(bl), segment_t::DEFAULT_ALIGNMENT);
-        */
-      }
-
-      // I expect ceph_le32 will make the endian conversion for me. Passing
-      // everything through ::Decode is unnecessary.
-      const auto& main_preamble = \
-        *reinterpret_cast<const preamble_block_t*>(bl.get());
-      logger().trace("{} RECV({}) main preamble: tag={}, num_segments={}, crc={}",
-                     conn, bl.size(), (int)main_preamble.tag,
-                     (int)main_preamble.num_segments, main_preamble.crc);
-
-      // verify preamble's CRC before any further processing
-      const auto rx_crc = ceph_crc32c(0,
-        reinterpret_cast<const unsigned char*>(&main_preamble),
-        sizeof(main_preamble) - sizeof(main_preamble.crc));
-      if (rx_crc != main_preamble.crc) {
-        logger().warn("{} crc mismatch for main preamble rx_crc={} tx_crc={}",
-                      conn, rx_crc, main_preamble.crc);
-        abort_in_fault();
-      }
-
-      // currently we do support between 1 and MAX_NUM_SEGMENTS segments
-      if (main_preamble.num_segments < 1 ||
-          main_preamble.num_segments > MAX_NUM_SEGMENTS) {
-        logger().warn("{} unsupported num_segments={}",
-                      conn, main_preamble.num_segments);
-        abort_in_fault();
-      }
-      if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
-        logger().warn("{} num_segments too much: {}",
-                      conn, main_preamble.num_segments);
-        abort_in_fault();
-      }
-
-      rx_segments_desc.clear();
       rx_segments_data.clear();
-
-      for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
-        logger().trace("{} GOT frame segment: len={} align={}",
-                       conn, main_preamble.segments[idx].length,
-                       main_preamble.segments[idx].alignment);
-        rx_segments_desc.emplace_back(main_preamble.segments[idx]);
+      try {
+        bufferlist preamble;
+        preamble.append(buffer::create(std::move(bl)));
+        const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
+        INTERCEPT_FRAME(tag, bp_type_t::READ);
+        return tag;
+      } catch (FrameError& e) {
+        logger().warn("{} read_main_preamble: {}", conn, e.what());
+        abort_in_fault();
       }
-
-      INTERCEPT_FRAME(main_preamble.tag, bp_type_t::READ);
-      return static_cast<Tag>(main_preamble.tag);
     });
 }
 
 seastar::future<> ProtocolV2::read_frame_payload()
 {
-  ceph_assert(!rx_segments_desc.empty());
   ceph_assert(rx_segments_data.empty());
 
   return seastar::do_until(
-    [this] { return rx_segments_desc.size() == rx_segments_data.size(); },
+    [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); },
     [this] {
-      // description of current segment to read
-      const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
       // TODO: create aligned and contiguous buffer from socket
-      if (cur_rx_desc.alignment != segment_t::DEFAULT_ALIGNMENT) {
+      const size_t seg_idx = rx_segments_data.size();
+      if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx);
+         alignment != segment_t::DEFAULT_ALIGNMENT) {
         logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
-                       conn, cur_rx_desc.alignment, rx_segments_data.size());
+                       conn, alignment, rx_segments_data.size());
       }
+      uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
       // TODO: create aligned and contiguous buffer from socket
-      return read_exactly(cur_rx_desc.length)
-      .then([this] (auto tmp_bl) {
+      return read_exactly(onwire_len).then([this] (auto tmp_bl) {
         logger().trace("{} RECV({}) frame segment[{}]",
                        conn, tmp_bl.size(), rx_segments_data.size());
         bufferlist data;
@@ -346,40 +304,27 @@ seastar::future<> ProtocolV2::read_frame_payload()
       });
     }
   ).then([this] {
-    // TODO: get_epilogue_size()
     ceph_assert(!session_stream_handlers.rx);
-    return read_exactly(sizeof(epilogue_crc_rev0_block_t));
+    return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
   }).then([this] (auto bl) {
     logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
-
-    __u8 late_flags;
-    if (session_stream_handlers.rx) {
-      // TODO
-      ceph_assert(false);
-    } else {
-      auto& epilogue = *reinterpret_cast<const epilogue_crc_rev0_block_t*>(bl.get());
-      for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
-        const __u32 expected_crc = epilogue.crc_values[idx];
-        const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
-        if (expected_crc != calculated_crc) {
-          logger().warn("{} message integrity check failed at index {}:"
-                        " expected_crc={} calculated_crc={}",
-                        conn, (unsigned int)idx, expected_crc, calculated_crc);
-          abort_in_fault();
-        } else {
-          logger().trace("{} message integrity check success at index {}: crc={}",
-                         conn, (unsigned int)idx, expected_crc);
-        }
-      }
-      late_flags = epilogue.late_flags;
+    bool ok = false;
+    try {
+      // TODO: v2.1 rx_frame_asm.disassemble_first_segment();
+      bufferlist rx_epilogue;
+      rx_epilogue.append(buffer::create(std::move(bl)));
+      ok = rx_frame_asm.disassemble_remaining_segments(rx_segments_data.data(), rx_epilogue);
+    } catch (FrameError& e) {
+      logger().error("read_frame_payload: {} {}", conn, e.what());
+      abort_in_fault();
+    } catch (ceph::crypto::onwire::MsgAuthError&) {
+      logger().error("read_frame_payload: {} bad auth tag", conn);
+      abort_in_fault();
     }
-    logger().trace("{} GOT frame epilogue: late_flags={}",
-                   conn, (unsigned)late_flags);
-
     // we do have a mechanism that allows transmitter to start sending message
     // and abort after putting entire data field on wire. This will be used by
     // the kernel client to avoid unnecessary buffering.
-    if (late_flags & FRAME_LATE_FLAG_ABORTED) {
+    if (!ok) {
       // TODO
       ceph_assert(false);
     }
index 1630bd66f875d5964ad5b52fcee1f6dc13922db0..ea0f32611926da94fbe6ca2b08939028b3149b8e 100644 (file)
@@ -122,9 +122,8 @@ class ProtocolV2 final : public Protocol {
   seastar::future<> write_flush(bufferlist&& buf);
 
   ceph::crypto::onwire::rxtx_t session_stream_handlers;
-  boost::container::static_vector<ceph::msgr::v2::segment_t,
-                                 ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_desc;
-  ceph::msgr::v2::FrameAssembler tx_frame_asm;
+  ceph::msgr::v2::FrameAssembler tx_frame_asm{&session_stream_handlers, false};
+  ceph::msgr::v2::FrameAssembler rx_frame_asm{&session_stream_handlers, false};
   ceph::msgr::v2::segment_bls_t rx_segments_data;
 
   size_t get_current_msg_size() const;