]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/frames_v2: introduce FrameAssembler
authorIlya Dryomov <idryomov@gmail.com>
Fri, 24 Apr 2020 16:01:46 +0000 (18:01 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Mon, 13 Jul 2020 12:45:06 +0000 (14:45 +0200)
Start separating frame assembly and disassembly code from
frame sending, receiving and handling code, so that assembly
and disassembly pieces can be unit tested and hopefully also
shared between different messengers (e.g. crimson).

This commit factors out the assembly code from Frame.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
(cherry picked from commit 872b125a5b817def84ed5d70d204fec5a4fa7c1e)

Conflicts:
src/crimson/CMakeLists.txt [ crimson doesn't support msgr2
  in nautilus
src/crimson/net/ProtocolV2.cc [ ditto ]
src/crimson/net/ProtocolV2.h [ ditto ]
src/msg/async/frames_v2.h [ commits f1cf408412c8
  ("msg/async/frames_v2.h: fix warning"), c70f779d12a2
  ("headers: Make ceph_le member private"), 9908f0e652b9
  ("msg: Add optimizing move") and 1a975fb3a801 ("msg/async:
  fix unnecessary 4 kB allocation in secure mode.") not in
  nautilus; fmt include adjusted as in rgw_lc.cc ]

src/msg/CMakeLists.txt
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h
src/msg/async/frames_v2.cc [new file with mode: 0644]
src/msg/async/frames_v2.h

index 647bbb80e8361408c53334e95cfc0c1a99bba4a6..1ad346153655f7d665126cd9e873835b889e9607 100644 (file)
@@ -29,6 +29,7 @@ list(APPEND msg_srcs
   async/PosixStack.cc
   async/Stack.cc
   async/crypto_onwire.cc
+  async/frames_v2.cc
   async/net_handler.cc)
 
 if(LINUX)
index 378147b23ff852a29abc6f2662fd19834cb34453..5cb73678459b23746df39441a39db580859b83c5 100644 (file)
@@ -92,6 +92,7 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection)
       replacing(false),
       can_write(false),
       bannerExchangeCallback(nullptr),
+      tx_frame_asm(&session_stream_handlers),
       next_tag(static_cast<Tag>(0)),
       keepalive(false) {
 }
@@ -573,11 +574,14 @@ template <class F>
 bool ProtocolV2::append_frame(F& frame) {
   ceph::bufferlist bl;
   try {
-    bl = frame.get_buffer(session_stream_handlers);
+    bl = frame.get_buffer(tx_frame_asm);
   } catch (ceph::crypto::onwire::TxHandlerError &e) {
     ldout(cct, 1) << __func__ << " " << e.what() << dendl;
     return false;
   }
+
+  ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
+                 << " bytes " << tx_frame_asm << dendl;
   connection->outgoing_bl.append(bl);
   return true;
 }
@@ -778,11 +782,14 @@ CtPtr ProtocolV2::write(const std::string &desc,
                         F &frame) {
   ceph::bufferlist bl;
   try {
-    bl = frame.get_buffer(session_stream_handlers);
+    bl = frame.get_buffer(tx_frame_asm);
   } catch (ceph::crypto::onwire::TxHandlerError &e) {
     ldout(cct, 1) << __func__ << " " << e.what() << dendl;
     return _fault();
   }
+
+  ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
+                 << " bytes " << tx_frame_asm << dendl;
   return write(desc, next, bl);
 }
 
index a4c2b1f5b8fe368bcd66fff17602bcf6ffe7f8e9..3cd2577a89d08f960db7f961916a4cfa1ba16443 100644 (file)
@@ -98,6 +98,7 @@ private:
 
   boost::container::static_vector<ceph::msgr::v2::segment_t,
                                  ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_desc;
+  ceph::msgr::v2::FrameAssembler tx_frame_asm;
   ceph::msgr::v2::segment_bls_t rx_segments_data;
   ceph::msgr::v2::Tag next_tag;
   utime_t backoff;  // backoff time
diff --git a/src/msg/async/frames_v2.cc b/src/msg/async/frames_v2.cc
new file mode 100644 (file)
index 0000000..99b79e8
--- /dev/null
@@ -0,0 +1,169 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "frames_v2.h"
+
+#include <ostream>
+
+#undef FMT_HEADER_ONLY
+#define FMT_HEADER_ONLY 1
+#include "seastar/fmt/include/fmt/format.h"
+
+namespace ceph::msgr::v2 {
+
+// Discards trailing empty segments, unless there is just one segment.
+// A frame always has at least one (possibly empty) segment.
+static size_t calc_num_segments(const bufferlist segment_bls[],
+                                size_t segment_count) {
+  ceph_assert(segment_count > 0 && segment_count <= MAX_NUM_SEGMENTS);
+  for (size_t i = segment_count; i-- > 0; ) {
+    if (segment_bls[i].length() > 0) {
+      return i + 1;
+    }
+  }
+  return 1;
+}
+
+void FrameAssembler::fill_preamble(Tag tag,
+                                   preamble_block_t& preamble) const {
+  // FIPS zeroization audit 20191115: this memset is not security related.
+  ::memset(&preamble, 0, sizeof(preamble));
+
+  preamble.tag = static_cast<__u8>(tag);
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    preamble.segments[i].length = m_descs[i].logical_len;
+    preamble.segments[i].alignment = m_descs[i].align;
+  }
+  preamble.num_segments = m_descs.size();
+  preamble.crc = ceph_crc32c(
+      0, reinterpret_cast<const unsigned char*>(&preamble),
+      sizeof(preamble) - sizeof(preamble.crc));
+}
+
+uint64_t FrameAssembler::get_frame_logical_len() const {
+  ceph_assert(!m_descs.empty());
+  uint64_t logical_len = 0;
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    logical_len += m_descs[i].logical_len;
+  }
+  return logical_len;
+}
+
+uint64_t FrameAssembler::get_frame_onwire_len() const {
+  ceph_assert(!m_descs.empty());
+  uint64_t onwire_len = get_preamble_onwire_len();
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    onwire_len += get_segment_onwire_len(i);
+  }
+  onwire_len += get_epilogue_onwire_len();
+  return onwire_len;
+}
+
+bufferlist FrameAssembler::asm_crc_rev0(const preamble_block_t& preamble,
+                                        bufferlist segment_bls[]) const {
+  epilogue_plain_block_t epilogue;
+  // FIPS zeroization audit 20191115: this memset is not security related.
+  ::memset(&epilogue, 0, sizeof(epilogue));
+
+  bufferlist frame_bl(sizeof(preamble) + sizeof(epilogue));
+  frame_bl.append(reinterpret_cast<const char*>(&preamble), sizeof(preamble));
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
+    epilogue.crc_values[i] = segment_bls[i].crc32c(-1);
+    if (segment_bls[i].length() > 0) {
+      frame_bl.claim_append(segment_bls[i]);
+    }
+  }
+  frame_bl.append(reinterpret_cast<const char*>(&epilogue), sizeof(epilogue));
+  return frame_bl;
+}
+
+bufferlist FrameAssembler::asm_secure_rev0(const preamble_block_t& preamble,
+                                           bufferlist segment_bls[]) const {
+  bufferlist preamble_bl(sizeof(preamble));
+  preamble_bl.append(reinterpret_cast<const char*>(&preamble),
+                     sizeof(preamble));
+
+  epilogue_secure_block_t epilogue;
+  // FIPS zeroization audit 20191115: this memset is not security related.
+  ::memset(&epilogue, 0, sizeof(epilogue));
+  bufferlist epilogue_bl(sizeof(epilogue));
+  epilogue_bl.append(reinterpret_cast<const char*>(&epilogue),
+                     sizeof(epilogue));
+
+  // preamble + MAX_NUM_SEGMENTS + epilogue
+  uint32_t onwire_lens[MAX_NUM_SEGMENTS + 2];
+  onwire_lens[0] = preamble_bl.length();
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    onwire_lens[i + 1] = segment_bls[i].length();  // already padded
+  }
+  onwire_lens[m_descs.size() + 1] = epilogue_bl.length();
+  m_crypto->tx->reset_tx_handler(onwire_lens,
+                                 onwire_lens + m_descs.size() + 2);
+  m_crypto->tx->authenticated_encrypt_update(preamble_bl);
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    if (segment_bls[i].length() > 0) {
+      m_crypto->tx->authenticated_encrypt_update(segment_bls[i]);
+    }
+  }
+  m_crypto->tx->authenticated_encrypt_update(epilogue_bl);
+  return m_crypto->tx->authenticated_encrypt_final();
+}
+
+bufferlist FrameAssembler::assemble_frame(Tag tag, bufferlist segment_bls[],
+                                          const uint16_t segment_aligns[],
+                                          size_t segment_count) {
+  m_descs.resize(calc_num_segments(segment_bls, segment_count));
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    m_descs[i].logical_len = segment_bls[i].length();
+    m_descs[i].align = segment_aligns[i];
+  }
+
+  preamble_block_t preamble;
+  fill_preamble(tag, preamble);
+
+  if (m_crypto->rx) {
+    for (size_t i = 0; i < m_descs.size(); i++) {
+      ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
+      // We're padding segments to biggest cipher's block size. Although
+      // AES-GCM can live without that as it's a stream cipher, we don't
+      // want to be fixed to stream ciphers only.
+      uint32_t padded_len = get_segment_padded_len(i);
+      if (padded_len > segment_bls[i].length()) {
+        uint32_t pad_len = padded_len - segment_bls[i].length();
+        segment_bls[i].reserve(pad_len);
+        segment_bls[i].append_zero(pad_len);
+      }
+    }
+    return asm_secure_rev0(preamble, segment_bls);
+  }
+  return asm_crc_rev0(preamble, segment_bls);
+}
+
+std::ostream& operator<<(std::ostream& os, const FrameAssembler& frame_asm) {
+  if (!frame_asm.m_descs.empty()) {
+    os << frame_asm.get_preamble_onwire_len();
+    for (size_t i = 0; i < frame_asm.m_descs.size(); i++) {
+      os << " + " << frame_asm.get_segment_onwire_len(i)
+         << " (logical " << frame_asm.m_descs[i].logical_len
+         << "/" << frame_asm.m_descs[i].align << ")";
+    }
+    os << " + " << frame_asm.get_epilogue_onwire_len() << " ";
+  }
+  os << "rx=" << frame_asm.m_crypto->rx.get()
+     << " tx=" << frame_asm.m_crypto->tx.get();
+  return os;
+}
+
+}  // namespace ceph::msgr::v2
index dd544a2a0a14ea21b2e58ac296e9274abe69f334..d509e46366b11e23d9691d69362525b2a66b7691 100644 (file)
@@ -5,6 +5,7 @@
 #include "common/Clock.h"
 #include "crypto_onwire.h"
 #include <array>
+#include <iosfwd>
 #include <utility>
 
 #include <boost/container/static_vector.hpp>
@@ -157,156 +158,106 @@ static uint32_t segment_onwire_size(const uint32_t logical_size)
   return p2roundup<uint32_t>(logical_size, CRYPTO_BLOCK_SIZE);
 }
 
-static ceph::bufferlist segment_onwire_bufferlist(ceph::bufferlist&& bl)
-{
-  const auto padding_size = segment_onwire_size(bl.length()) - bl.length();
-  if (padding_size) {
-    bl.append_zero(padding_size);
-  }
-  return bl;
-}
+struct FrameError : std::runtime_error {
+  using runtime_error::runtime_error;
+};
 
-template <class T, uint16_t... SegmentAlignmentVs>
-struct Frame {
-  static constexpr size_t SegmentsNumV = sizeof...(SegmentAlignmentVs);
-  static_assert(SegmentsNumV > 0 && SegmentsNumV <= MAX_NUM_SEGMENTS);
-protected:
-  std::array<ceph::bufferlist, SegmentsNumV> segments;
+class FrameAssembler {
+public:
+  // crypto must be non-null
+  FrameAssembler(const ceph::crypto::onwire::rxtx_t* crypto)
+      : m_crypto(crypto) {}
 
-private:
-  static constexpr std::array<uint16_t, SegmentsNumV> alignments {
-    SegmentAlignmentVs...
-  };
-  ceph::bufferlist::contiguous_filler preamble_filler;
-
-  __u8 calc_num_segments(
-    const std::array<segment_t, MAX_NUM_SEGMENTS>& segments)
-  {
-    for (__u8 num = SegmentsNumV; num > 0; num--) {
-      if (segments[num-1].length) {
-        return num;
-      }
-    }
-    // frame always has at least one segment.
-    return 1;
+  size_t get_num_segments() const {
+    ceph_assert(!m_descs.empty());
+    return m_descs.size();
   }
 
-  // craft the main preamble. It's always present regardless of the number
-  // of segments message is composed from.
-  void fill_preamble() {
-    ceph_assert(std::size(segments) <= MAX_NUM_SEGMENTS);
-
-    preamble_block_t main_preamble;
-    // FIPS zeroization audit 20191115: this memset is not security related.
-    ::memset(&main_preamble, 0, sizeof(main_preamble));
-
-    main_preamble.tag = static_cast<__u8>(T::tag);
-    ceph_assert(main_preamble.tag != 0);
-
-    // implementation detail: the first bufferlist of Frame::segments carries
-    // space for preamble. This glueing isn't a part of the onwire format but
-    // just our private detail.
-    main_preamble.segments.front().length =
-        segments.front().length() - FRAME_PREAMBLE_SIZE;
-    main_preamble.segments.front().alignment = alignments.front();
-
-    // there is no business in issuing frame without at least one segment
-    // filled.
-    if constexpr(SegmentsNumV > 1) {
-      for (__u8 idx = 1; idx < SegmentsNumV; idx++) {
-        main_preamble.segments[idx].length = segments[idx].length();
-        main_preamble.segments[idx].alignment = alignments[idx];
-      }
-    }
-    // calculate the number of non-empty segments.
-    // TODO: reorder segments to get DATA first
-    main_preamble.num_segments = calc_num_segments(main_preamble.segments);
+  uint32_t get_segment_logical_len(size_t seg_idx) const {
+    ceph_assert(seg_idx < m_descs.size());
+    return m_descs[seg_idx].logical_len;
+  }
 
-    main_preamble.crc =
-        ceph_crc32c(0, reinterpret_cast<unsigned char *>(&main_preamble),
-                    sizeof(main_preamble) - sizeof(main_preamble.crc));
+  uint16_t get_segment_align(size_t seg_idx) const {
+    ceph_assert(seg_idx < m_descs.size());
+    return m_descs[seg_idx].align;
+  }
 
-    preamble_filler.copy_in(sizeof(main_preamble),
-                            reinterpret_cast<const char *>(&main_preamble));
+  uint32_t get_preamble_onwire_len() const {
+    return sizeof(preamble_block_t);
   }
 
-  template <size_t... Is>
-  void reset_tx_handler(
-    ceph::crypto::onwire::rxtx_t &session_stream_handlers,
-    std::index_sequence<Is...>)
-  {
-    session_stream_handlers.tx->reset_tx_handler({ segments[Is].length()... });
+  uint32_t get_segment_onwire_len(size_t seg_idx) const {
+    ceph_assert(seg_idx < m_descs.size());
+    if (m_crypto->rx) {
+      return get_segment_padded_len(seg_idx);
+    }
+    return m_descs[seg_idx].logical_len;
   }
 
-public:
-  ceph::bufferlist get_buffer(
-    ceph::crypto::onwire::rxtx_t &session_stream_handlers)
-  {
-    fill_preamble();
-    if (session_stream_handlers.tx) {
-      // we're padding segments to biggest cipher's block size. Although
-      // AES-GCM can live without that as it's a stream cipher, we don't
-      // to be fixed to stream ciphers only.
-      for (auto& segment : segments) {
-        segment = segment_onwire_bufferlist(std::move(segment));
-      }
+  uint32_t get_epilogue_onwire_len() const {
+    ceph_assert(!m_descs.empty());
+    if (m_crypto->rx) {
+      return sizeof(epilogue_secure_block_t) + get_auth_tag_len();
+    }
+    return sizeof(epilogue_plain_block_t);
+  }
 
-      // let's cipher allocate one huge buffer for entire ciphertext.
-      reset_tx_handler(
-          session_stream_handlers, std::make_index_sequence<SegmentsNumV>());
+  uint64_t get_frame_logical_len() const;
+  uint64_t get_frame_onwire_len() const;
 
-      for (auto& segment : segments) {
-        if (segment.length()) {
-          session_stream_handlers.tx->authenticated_encrypt_update(
-            std::move(segment));
-        }
-      }
+  bufferlist assemble_frame(Tag tag, bufferlist segment_bls[],
+                            const uint16_t segment_aligns[],
+                            size_t segment_count);
 
-      // in secure mode we craft only the late_flags. Signature (for AES-GCM
-      // called auth tag) will be added by the cipher.
-      {
-        epilogue_secure_block_t epilogue;
-        // FIPS zeroization audit 20191115: this memset is not security
-        // related.
-        ::memset(&epilogue, 0, sizeof(epilogue));
-        ceph::bufferlist epilogue_bl;
-        epilogue_bl.append(reinterpret_cast<const char*>(&epilogue),
-                           sizeof(epilogue));
-        session_stream_handlers.tx->authenticated_encrypt_update(epilogue_bl);
-      }
-      return session_stream_handlers.tx->authenticated_encrypt_final();
-    } else {
-      // plain mode
-      epilogue_plain_block_t epilogue;
-      // FIPS zeroization audit 20191115: this memset is not security related.
-      ::memset(&epilogue, 0, sizeof(epilogue));
-
-      ceph::bufferlist::const_iterator hdriter(&segments.front(),
-                                               FRAME_PREAMBLE_SIZE);
-      epilogue.crc_values[SegmentIndex::Control::PAYLOAD] =
-          hdriter.crc32c(hdriter.get_remaining(), -1);
-      if constexpr(SegmentsNumV > 1) {
-        for (__u8 idx = 1; idx < SegmentsNumV; idx++) {
-          epilogue.crc_values[idx] = segments[idx].crc32c(-1);
-        }
-      }
+private:
+  struct segment_desc_t {
+    uint32_t logical_len;
+    uint16_t align;
+  };
 
-      ceph::bufferlist ret;
-      for (auto& segment : segments) {
-        ret.claim_append(segment);
-      }
-      ret.append(reinterpret_cast<const char*>(&epilogue), sizeof(epilogue));
-      return ret;
-    }
+  uint32_t get_segment_padded_len(size_t seg_idx) const {
+    return p2roundup<uint32_t>(m_descs[seg_idx].logical_len,
+                               CRYPTO_BLOCK_SIZE);
   }
 
-  Frame()
-    : preamble_filler(segments.front().append_hole(FRAME_PREAMBLE_SIZE)) {
+  uint32_t get_auth_tag_len() const {
+    return m_crypto->rx->get_extra_size_at_final();
   }
 
-public:
+  bufferlist asm_crc_rev0(const preamble_block_t& preamble,
+                          bufferlist segment_bls[]) const;
+  bufferlist asm_secure_rev0(const preamble_block_t& preamble,
+                             bufferlist segment_bls[]) const;
+
+  void fill_preamble(Tag tag, preamble_block_t& preamble) const;
+  friend std::ostream& operator<<(std::ostream& os,
+                                  const FrameAssembler& frame_asm);
+
+  boost::container::static_vector<segment_desc_t, MAX_NUM_SEGMENTS> m_descs;
+  const ceph::crypto::onwire::rxtx_t* m_crypto;
 };
 
+template <class T, uint16_t... SegmentAlignmentVs>
+struct Frame {
+  static constexpr size_t SegmentsNumV = sizeof...(SegmentAlignmentVs);
+  static_assert(SegmentsNumV > 0 && SegmentsNumV <= MAX_NUM_SEGMENTS);
+protected:
+  std::array<ceph::bufferlist, SegmentsNumV> segments;
+
+private:
+  static constexpr std::array<uint16_t, SegmentsNumV> alignments {
+    SegmentAlignmentVs...
+  };
+
+public:
+  ceph::bufferlist get_buffer(FrameAssembler& tx_frame_asm) {
+    auto bl = tx_frame_asm.assemble_frame(T::tag, segments.data(),
+                                          alignments.data(), SegmentsNumV);
+    ceph_assert(bl.length() == tx_frame_asm.get_frame_onwire_len());
+    return bl;
+  }
+};
 
 // ControlFrames are used to manage transceiver state (like connections) and
 // orchestrate transfers of MessageFrames. They use only single segment with