]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async/ProtocolV2: switch to FrameAssembler
authorIlya Dryomov <idryomov@gmail.com>
Sat, 25 Apr 2020 09:44:47 +0000 (11:44 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Sun, 14 Jun 2020 11:56:09 +0000 (11:56 +0000)
Factor out the disassembly code from ProtocolV2 and switch
ProtocolV2 to FrameAssembler.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h
src/msg/async/frames_v2.cc
src/msg/async/frames_v2.h

index 9b9d6f122eea8a1f03c42dc10a99041290238cfa..00dd0ac5d5d61bd4b5e540d4f03785dd3f6ae48d 100644 (file)
@@ -93,6 +93,7 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection)
       can_write(false),
       bannerExchangeCallback(nullptr),
       tx_frame_asm(&session_stream_handlers),
+      rx_frame_asm(&session_stream_handlers),
       next_tag(static_cast<Tag>(0)),
       keepalive(false) {
 }
@@ -264,11 +265,11 @@ void ProtocolV2::reset_recv_state() {
 }
 
 size_t ProtocolV2::get_current_msg_size() const {
-  ceph_assert(!rx_segments_desc.empty());
+  ceph_assert(rx_frame_asm.get_num_segments() > 0);
   size_t sum = 0;
   // we don't include SegmentIndex::Msg::HEADER.
-  for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
-    sum += rx_segments_desc[idx].length;
+  for (size_t i = 1; i < rx_frame_asm.get_num_segments(); i++) {
+    sum += rx_frame_asm.get_segment_logical_len(i);
   }
   return sum;
 }
@@ -736,26 +737,6 @@ bool ProtocolV2::is_queued() {
   return !out_queue.empty() || connection->is_queued();
 }
 
-uint32_t ProtocolV2::get_onwire_size(const uint32_t logical_size) const {
-  if (session_stream_handlers.rx) {
-    return segment_onwire_size(logical_size);
-  } else {
-    return logical_size;
-  }
-}
-
-uint32_t ProtocolV2::get_epilogue_size() const {
-  // In secure mode size of epilogue is flexible and depends on particular
-  // cipher implementation. See the comment for epilogue_secure_block_t or
-  // epilogue_plain_block_t.
-  if (session_stream_handlers.rx) {
-    return FRAME_SECURE_EPILOGUE_SIZE + \
-        session_stream_handlers.rx->get_extra_size_at_final();
-  } else {
-    return FRAME_PLAIN_EPILOGUE_SIZE;
-  }
-}
-
 CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
                        rx_buffer_t &&buffer) {
   const auto len = buffer->length();
@@ -1071,7 +1052,12 @@ CtPtr ProtocolV2::read_frame() {
   }
 
   ldout(cct, 20) << __func__ << dendl;
-  return READ(FRAME_PREAMBLE_SIZE, handle_read_frame_preamble_main);
+  rx_preamble.clear();
+  rx_epilogue.clear();
+  rx_segments_data.clear();
+
+  return READ(rx_frame_asm.get_preamble_onwire_len(),
+              handle_read_frame_preamble_main);
 }
 
 CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
@@ -1083,73 +1069,31 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
     return _fault();
   }
 
-  ceph::bufferlist preamble;
-  preamble.push_back(std::move(buffer));
+  rx_preamble.push_back(std::move(buffer));
 
   ldout(cct, 30) << __func__ << " preamble\n";
-  preamble.hexdump(*_dout);
+  rx_preamble.hexdump(*_dout);
   *_dout << dendl;
 
-  if (session_stream_handlers.rx) {
-    ceph_assert(session_stream_handlers.rx);
-
-    session_stream_handlers.rx->reset_rx_handler();
-    session_stream_handlers.rx->authenticated_decrypt_update(preamble);
+  try {
+    next_tag = rx_frame_asm.disassemble_preamble(rx_preamble);
+  } catch (FrameError& e) {
+    ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+    return _fault();
+  } catch (ceph::crypto::onwire::MsgAuthError&) {
+    ldout(cct, 1) << __func__ << "bad auth tag" << dendl;
+    return _fault();
+  }
 
-    ldout(cct, 10) << __func__ << " got encrypted preamble."
-                   << " after decrypt premable.length()=" << preamble.length()
-                   << dendl;
+  ldout(cct, 25) << __func__ << " disassembled preamble " << rx_frame_asm
+                 << dendl;
 
+  if (session_stream_handlers.rx) {
     ldout(cct, 30) << __func__ << " preamble after decrypt\n";
-    preamble.hexdump(*_dout);
+    rx_preamble.hexdump(*_dout);
     *_dout << dendl;
   }
 
-  {
-    // I expect ceph_le32 will make the endian conversion for me. Passing
-    // everything through ::Decode is unnecessary.
-    const auto& main_preamble = \
-      reinterpret_cast<preamble_block_t&>(*preamble.c_str());
-
-    // verify preamble's CRC before any further processing
-    const auto rx_crc = ceph_crc32c(0,
-      reinterpret_cast<const unsigned char*>(&main_preamble),
-      sizeof(main_preamble) - sizeof(main_preamble.crc));
-    if (rx_crc != main_preamble.crc) {
-      ldout(cct, 10) << __func__ << " crc mismatch for main preamble"
-                    << " rx_crc=" << rx_crc
-                    << " tx_crc=" << main_preamble.crc << dendl;
-      return _fault();
-    }
-
-    // currently we do support between 1 and MAX_NUM_SEGMENTS segments
-    if (main_preamble.num_segments < 1 ||
-        main_preamble.num_segments > MAX_NUM_SEGMENTS) {
-      ldout(cct, 10) << __func__ << " unsupported num_segments="
-                    << " tx_crc=" << main_preamble.num_segments << dendl;
-      return _fault();
-    }
-
-    next_tag = static_cast<Tag>(main_preamble.tag);
-
-    rx_segments_desc.clear();
-    rx_segments_data.clear();
-
-    if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
-      ldout(cct, 30) << __func__
-                    << " num_segments=" << main_preamble.num_segments
-                    << " is too much" << dendl;
-      return _fault();
-    }
-    for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
-      ldout(cct, 10) << __func__ << " got new segment:"
-                    << " len=" << main_preamble.segments[idx].length
-                    << " align=" << main_preamble.segments[idx].alignment
-                    << dendl;
-      rx_segments_desc.emplace_back(main_preamble.segments[idx]);
-    }
-  }
-
   // does it need throttle?
   if (next_tag == Tag::MESSAGE) {
     if (state != READY) {
@@ -1202,21 +1146,23 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() {
 }
 
 CtPtr ProtocolV2::read_frame_segment() {
-  ldout(cct, 20) << __func__ << dendl;
-  ceph_assert(!rx_segments_desc.empty());
+  size_t seg_idx = rx_segments_data.size();
+  ldout(cct, 20) << __func__ << " seg_idx=" << seg_idx << dendl;
+  rx_segments_data.emplace_back();
+
+  uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
+  uint16_t align = rx_frame_asm.get_segment_align(seg_idx);
 
-  // description of current segment to read
-  const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
   rx_buffer_t rx_buffer;
   try {
     rx_buffer = ceph::buffer::ptr_node::create(ceph::buffer::create_aligned(
-      get_onwire_size(cur_rx_desc.length), cur_rx_desc.alignment));
+        onwire_len, align));
   } catch (std::bad_alloc&) {
     // Catching because of potential issues with satisfying alignment.
     ldout(cct, 1) << __func__ << " can't allocate aligned rx_buffer"
-                  << " len=" << get_onwire_size(cur_rx_desc.length)
-                  << " align=" << cur_rx_desc.alignment
-                  << dendl;
+                  << " len=" << onwire_len
+                  << " align=" << align
+                  << dendl;
     return _fault();
   }
 
@@ -1232,36 +1178,15 @@ CtPtr ProtocolV2::handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r) {
     return _fault();
   }
 
-  rx_segments_data.emplace_back();
   rx_segments_data.back().push_back(std::move(rx_buffer));
 
-  // decrypt incoming data
-  // FIXME: if (auth_meta->is_mode_secure()) {
-  if (session_stream_handlers.rx) {
-    ceph_assert(session_stream_handlers.rx);
-
-    auto& new_seg = rx_segments_data.back();
-    if (new_seg.length()) {
-      session_stream_handlers.rx->authenticated_decrypt_update(new_seg);
-      const auto idx = rx_segments_data.size() - 1;
-      if (new_seg.length() > rx_segments_desc[idx].length) {
-        new_seg.splice(rx_segments_desc[idx].length,
-                       new_seg.length() - rx_segments_desc[idx].length);
-      }
-
-      ldout(cct, 20) << __func__
-                     << " unpadded new_seg.length()=" << new_seg.length()
-                     << dendl;
-    }
-  }
-
-  if (rx_segments_desc.size() == rx_segments_data.size()) {
+  if (rx_segments_data.size() == rx_frame_asm.get_num_segments()) {
     // OK, all segments planned to read are read. Can go with epilogue.
-    return READ(get_epilogue_size(), handle_read_frame_epilogue_main);
-  } else {
-    // TODO: for makeshift only. This will be more generic and throttled
-    return read_frame_segment();
+    return READ(rx_frame_asm.get_epilogue_onwire_len(),
+                handle_read_frame_epilogue_main);
   }
+  // TODO: for makeshift only. This will be more generic and throttled
+  return read_frame_segment();
 }
 
 CtPtr ProtocolV2::handle_frame_payload() {
@@ -1361,61 +1286,29 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
     return _fault();
   }
 
-  __u8 late_flags;
+  rx_epilogue.push_back(std::move(buffer));
 
-  // FIXME: if (auth_meta->is_mode_secure()) {
-  if (session_stream_handlers.rx) {
-    ldout(cct, 1) << __func__ << " read frame epilogue bytes="
-                  << get_epilogue_size() << dendl;
-
-    // decrypt epilogue and authenticate entire frame.
-    ceph::bufferlist epilogue_bl;
-    {
-      epilogue_bl.push_back(std::move(buffer));
-      try {
-        session_stream_handlers.rx->authenticated_decrypt_update_final(
-            epilogue_bl);
-      } catch (ceph::crypto::onwire::MsgAuthError &e) {
-        ldout(cct, 5) << __func__ << " message authentication failed: "
-                      << e.what() << dendl;
-        return _fault();
-      }
-    }
-    auto& epilogue =
-        reinterpret_cast<epilogue_plain_block_t&>(*epilogue_bl.c_str());
-    late_flags = epilogue.late_flags;
-  } else {
-    auto& epilogue = reinterpret_cast<epilogue_plain_block_t&>(*buffer->c_str());
-
-    for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
-      const __u32 expected_crc = epilogue.crc_values[idx];
-      const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
-      if (expected_crc != calculated_crc) {
-       ldout(cct, 5) << __func__ << " message integrity check failed: "
-                     << " expected_crc=" << expected_crc
-                     << " calculated_crc=" << calculated_crc
-                     << dendl;
-       return _fault();
-      } else {
-       ldout(cct, 20) << __func__ << " message integrity check success: "
-                      << " expected_crc=" << expected_crc
-                      << " calculated_crc=" << calculated_crc
-                      << dendl;
-      }
-    }
-    late_flags = epilogue.late_flags;
+  bool aborted;
+  try {
+    aborted = !rx_frame_asm.disassemble_segments(rx_segments_data.data(),
+                                                 rx_epilogue);
+  } catch (FrameError& e) {
+    ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+    return _fault();
+  } catch (ceph::crypto::onwire::MsgAuthError&) {
+    ldout(cct, 1) << __func__ << "bad auth tag" << dendl;
+    return _fault();
   }
 
   // we do have a mechanism that allows transmitter to start sending message
   // and abort after putting entire data field on wire. This will be used by
   // the kernel client to avoid unnecessary buffering.
-  if (late_flags & FRAME_FLAGS_LATEABRT) {
+  if (aborted) {
     reset_throttle();
     state = READY;
     return CONTINUE(read_frame);
-  } else {
-    return handle_read_frame_dispatch();
   }
+  return handle_read_frame_dispatch();
 }
 
 CtPtr ProtocolV2::handle_message() {
index 78224772604ee79519581204815bce989cc28eb0..bb3508523b4f6e029d349b4281b026b406709efb 100644 (file)
@@ -4,8 +4,6 @@
 #ifndef _MSG_ASYNC_PROTOCOL_V2_
 #define _MSG_ASYNC_PROTOCOL_V2_
 
-#include <boost/container/static_vector.hpp>
-
 #include "Protocol.h"
 #include "crypto_onwire.h"
 #include "frames_v2.h"
@@ -96,9 +94,11 @@ private:
   using ProtFuncPtr = void (ProtocolV2::*)();
   Ct<ProtocolV2> *bannerExchangeCallback;
 
-  boost::container::static_vector<ceph::msgr::v2::segment_t,
-                                 ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_desc;
   ceph::msgr::v2::FrameAssembler tx_frame_asm;
+  ceph::msgr::v2::FrameAssembler rx_frame_asm;
+
+  ceph::bufferlist rx_preamble;
+  ceph::bufferlist rx_epilogue;
   ceph::msgr::v2::segment_bls_t rx_segments_data;
   ceph::msgr::v2::Tag next_tag;
   utime_t backoff;  // backoff time
@@ -251,8 +251,6 @@ private:
   Ct<ProtocolV2> *send_reconnect_ok();
   Ct<ProtocolV2> *server_ready();
 
-  uint32_t get_onwire_size(uint32_t logical_size) const;
-  uint32_t get_epilogue_size() const;
   size_t get_current_msg_size() const;
 };
 
index ead90ae9081a1d317f130a9c6173eeb951d58b30..bcbebdbd2d22923718ca8a3c358e7e23798d223a 100644 (file)
 
 namespace ceph::msgr::v2 {
 
+// Unpads bufferlist to unpadded_len.
+static void unpad_zero(bufferlist& bl, uint32_t unpadded_len) {
+  ceph_assert(bl.length() >= unpadded_len);
+  if (bl.length() > unpadded_len) {
+    bl.splice(unpadded_len, bl.length() - unpadded_len);
+  }
+}
+
 // Discards trailing empty segments, unless there is just one segment.
 // A frame always has at least one (possibly empty) segment.
 static size_t calc_num_segments(const bufferlist segment_bls[],
@@ -33,6 +41,15 @@ static size_t calc_num_segments(const bufferlist segment_bls[],
   return 1;
 }
 
+static void check_segment_crc(const bufferlist& segment_bl,
+                              uint32_t expected_crc) {
+  uint32_t crc = segment_bl.crc32c(-1);
+  if (crc != expected_crc) {
+    throw FrameError(fmt::format(
+        "bad segment crc calculated={} expected={}", crc, expected_crc));
+  }
+}
+
 void FrameAssembler::fill_preamble(Tag tag,
                                    preamble_block_t& preamble) const {
   // FIPS zeroization audit 20191115: this memset is not security related.
@@ -149,6 +166,85 @@ bufferlist FrameAssembler::assemble_frame(Tag tag, bufferlist segment_bls[],
   return asm_crc_rev0(preamble, segment_bls);
 }
 
+Tag FrameAssembler::disassemble_preamble(bufferlist& preamble_bl) {
+  ceph_assert(preamble_bl.length() == sizeof(preamble_block_t));
+  if (m_crypto->rx) {
+    m_crypto->rx->reset_rx_handler();
+    m_crypto->rx->authenticated_decrypt_update(preamble_bl);
+  }
+
+  // I expect ceph_le32 will make the endian conversion for me. Passing
+  // everything through ::Decode is unnecessary.
+  auto preamble = reinterpret_cast<const preamble_block_t*>(
+      preamble_bl.c_str());
+  // check preamble crc before any further processing
+  uint32_t crc = ceph_crc32c(
+      0, reinterpret_cast<const unsigned char*>(preamble),
+      sizeof(*preamble) - sizeof(preamble->crc));
+  if (crc != preamble->crc) {
+    throw FrameError(fmt::format(
+        "bad preamble crc calculated={} expected={}", crc, preamble->crc));
+  }
+
+  // see calc_num_segments()
+  if (preamble->num_segments < 1 ||
+      preamble->num_segments > MAX_NUM_SEGMENTS) {
+    throw FrameError(fmt::format(
+        "bad number of segments num_segments={}", preamble->num_segments));
+  }
+  if (preamble->num_segments > 1 &&
+      preamble->segments[preamble->num_segments - 1].length == 0) {
+    throw FrameError("last segment empty");
+  }
+
+  m_descs.resize(preamble->num_segments);
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    m_descs[i].logical_len = preamble->segments[i].length;
+    m_descs[i].align = preamble->segments[i].alignment;
+  }
+  return static_cast<Tag>(preamble->tag);
+}
+
+bool FrameAssembler::disasm_all_crc_rev0(bufferlist segment_bls[],
+                                         bufferlist& epilogue_bl) const {
+  ceph_assert(epilogue_bl.length() == sizeof(epilogue_plain_block_t));
+  auto epilogue = reinterpret_cast<const epilogue_plain_block_t*>(
+      epilogue_bl.c_str());
+
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
+    check_segment_crc(segment_bls[i], epilogue->crc_values[i]);
+  }
+  return !(epilogue->late_flags & FRAME_FLAGS_LATEABRT);
+}
+
+bool FrameAssembler::disasm_all_secure_rev0(bufferlist segment_bls[],
+                                            bufferlist& epilogue_bl) const {
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    ceph_assert(segment_bls[i].length() == get_segment_padded_len(i));
+    if (segment_bls[i].length() > 0) {
+      m_crypto->rx->authenticated_decrypt_update(segment_bls[i]);
+      unpad_zero(segment_bls[i], m_descs[i].logical_len);
+    }
+  }
+
+  ceph_assert(epilogue_bl.length() == sizeof(epilogue_secure_block_t) +
+                                      get_auth_tag_len());
+  m_crypto->rx->authenticated_decrypt_update_final(epilogue_bl);
+  auto epilogue = reinterpret_cast<const epilogue_secure_block_t*>(
+      epilogue_bl.c_str());
+  return !(epilogue->late_flags & FRAME_FLAGS_LATEABRT);
+}
+
+bool FrameAssembler::disassemble_segments(bufferlist segment_bls[],
+                                          bufferlist& epilogue_bl) const {
+  ceph_assert(!m_descs.empty());
+  if (m_crypto->rx) {
+    return disasm_all_secure_rev0(segment_bls, epilogue_bl);
+  }
+  return disasm_all_crc_rev0(segment_bls, epilogue_bl);
+}
+
 std::ostream& operator<<(std::ostream& os, const FrameAssembler& frame_asm) {
   if (!frame_asm.m_descs.empty()) {
     os << frame_asm.get_preamble_onwire_len();
index 035549b8a064a822bafce5035c1f4cd1e679e10b..bfb86fba947fb7843999dc5cd68d24fcaee89094 100644 (file)
@@ -153,11 +153,6 @@ static constexpr uint32_t FRAME_SECURE_EPILOGUE_SIZE =
 
 #define FRAME_FLAGS_LATEABRT      (1<<0)   /* frame was aborted after txing data */
 
-static uint32_t segment_onwire_size(const uint32_t logical_size)
-{
-  return p2roundup<uint32_t>(logical_size, CRYPTO_BLOCK_SIZE);
-}
-
 struct FrameError : std::runtime_error {
   using runtime_error::runtime_error;
 };
@@ -210,6 +205,11 @@ public:
                             const uint16_t segment_aligns[],
                             size_t segment_count);
 
+  Tag disassemble_preamble(bufferlist& preamble_bl);
+
+  bool disassemble_segments(bufferlist segment_bls[],
+                            bufferlist& epilogue_bl) const;
+
 private:
   struct segment_desc_t {
     uint32_t logical_len;
@@ -230,6 +230,11 @@ private:
   bufferlist asm_secure_rev0(const preamble_block_t& preamble,
                              bufferlist segment_bls[]) const;
 
+  bool disasm_all_crc_rev0(bufferlist segment_bls[],
+                           bufferlist& epilogue_bl) const;
+  bool disasm_all_secure_rev0(bufferlist segment_bls[],
+                              bufferlist& epilogue_bl) const;
+
   void fill_preamble(Tag tag, preamble_block_t& preamble) const;
   friend std::ostream& operator<<(std::ostream& os,
                                   const FrameAssembler& frame_asm);