git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/frames_v2: implement msgr2.1 wire format
author Ilya Dryomov <idryomov@gmail.com>
Thu, 7 May 2020 09:32:52 +0000 (11:32 +0200)
committer Ilya Dryomov <idryomov@gmail.com>
Wed, 17 Jun 2020 19:56:41 +0000 (21:56 +0200)
Implement msgr2.1-crc and msgr2.1-secure modes.

Issues with existing msgr2.0-crc and msgr2.0-secure modes and
their resolution will be described in doc/dev/msgr2.rst.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
src/crimson/net/ProtocolV2.cc
src/msg/async/ProtocolV2.cc
src/msg/async/frames_v2.cc
src/msg/async/frames_v2.h

index 26c2f6c40993bbb050bcf7f20f54b02dd15b9e36..6cf0d69238bb650c698e82c3fa1b20bb09044d12 100644 (file)
@@ -149,7 +149,7 @@ ProtocolV2::ProtocolV2(Dispatcher& dispatcher,
   : Protocol(proto_t::v2, dispatcher, conn),
     messenger{messenger},
     protocol_timer{conn},
-    tx_frame_asm(&session_stream_handlers)
+    tx_frame_asm(&session_stream_handlers, false)
 {}
 
 ProtocolV2::~ProtocolV2() {}
index 8713add919724ae545ab494ddcd9e426a0460806..c4e66d7bcd0736d78e99253dd2cb185365984518 100644 (file)
@@ -92,8 +92,8 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection)
       replacing(false),
       can_write(false),
       bannerExchangeCallback(nullptr),
-      tx_frame_asm(&session_stream_handlers),
-      rx_frame_asm(&session_stream_handlers),
+      tx_frame_asm(&session_stream_handlers, false),
+      rx_frame_asm(&session_stream_handlers, false),
       next_tag(static_cast<Tag>(0)),
       keepalive(false) {
 }
@@ -1290,8 +1290,9 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
 
   bool aborted;
   try {
-    aborted = !rx_frame_asm.disassemble_segments(rx_segments_data.data(),
-                                                 rx_epilogue);
+    rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]);
+    aborted = !rx_frame_asm.disassemble_remaining_segments(
+        rx_segments_data.data(), rx_epilogue);
   } catch (FrameError& e) {
     ldout(cct, 1) << __func__ << " " << e.what() << dendl;
     return _fault();
index 7711cc8d292c3aa3c9b57ad7d9985ab3b1950e06..8fdded42d87714cdc33c8cc6811574167b271849 100644 (file)
@@ -50,6 +50,17 @@ static void check_segment_crc(const bufferlist& segment_bl,
   }
 }
 
+// Returns true if the frame is ready for dispatching, or false if
+// it was aborted by the sender and must be dropped.
+static bool check_epilogue_late_status(__u8 late_status) {
+  __u8 aborted = late_status & FRAME_LATE_STATUS_ABORTED_MASK;
+  if (aborted != FRAME_LATE_STATUS_ABORTED &&
+      aborted != FRAME_LATE_STATUS_COMPLETE) {
+    throw FrameError(fmt::format("bad late_status"));
+  }
+  return aborted == FRAME_LATE_STATUS_COMPLETE;
+}
+
 void FrameAssembler::fill_preamble(Tag tag,
                                    preamble_block_t& preamble) const {
   // FIPS zeroization audit 20191115: this memset is not security related.
@@ -136,6 +147,98 @@ bufferlist FrameAssembler::asm_secure_rev0(const preamble_block_t& preamble,
   return m_crypto->tx->authenticated_encrypt_final();
 }
 
+bufferlist FrameAssembler::asm_crc_rev1(const preamble_block_t& preamble,
+                                        bufferlist segment_bls[]) const {
+  epilogue_crc_rev1_block_t epilogue;
+  // FIPS zeroization audit 20191115: this memset is not security related.
+  ::memset(&epilogue, 0, sizeof(epilogue));
+  epilogue.late_status |= FRAME_LATE_STATUS_COMPLETE;
+
+  bufferlist frame_bl(sizeof(preamble) + FRAME_CRC_SIZE + sizeof(epilogue));
+  frame_bl.append(reinterpret_cast<const char*>(&preamble), sizeof(preamble));
+
+  ceph_assert(segment_bls[0].length() == m_descs[0].logical_len);
+  if (segment_bls[0].length() > 0) {
+    uint32_t crc = segment_bls[0].crc32c(-1);
+    frame_bl.claim_append(segment_bls[0]);
+    encode(crc, frame_bl);
+  }
+  if (m_descs.size() == 1) {
+    return frame_bl;  // no epilogue if only one segment
+  }
+
+  for (size_t i = 1; i < m_descs.size(); i++) {
+    ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
+    epilogue.crc_values[i - 1] = segment_bls[i].crc32c(-1);
+    if (segment_bls[i].length() > 0) {
+      frame_bl.claim_append(segment_bls[i]);
+    }
+  }
+  frame_bl.append(reinterpret_cast<const char*>(&epilogue), sizeof(epilogue));
+  return frame_bl;
+}
+
+bufferlist FrameAssembler::asm_secure_rev1(const preamble_block_t& preamble,
+                                           bufferlist segment_bls[]) const {
+  bufferlist preamble_bl;
+  if (segment_bls[0].length() > FRAME_PREAMBLE_INLINE_SIZE) {
+    // first segment is partially inlined, inline buffer is full
+    preamble_bl.reserve(sizeof(preamble));
+    preamble_bl.append(reinterpret_cast<const char*>(&preamble),
+                       sizeof(preamble));
+    segment_bls[0].splice(0, FRAME_PREAMBLE_INLINE_SIZE, &preamble_bl);
+  } else {
+    // first segment is fully inlined, inline buffer may need padding
+    uint32_t pad_len = FRAME_PREAMBLE_INLINE_SIZE - segment_bls[0].length();
+    preamble_bl.reserve(sizeof(preamble) + pad_len);
+    preamble_bl.append(reinterpret_cast<const char*>(&preamble),
+                       sizeof(preamble));
+    preamble_bl.claim_append(segment_bls[0]);
+    if (pad_len > 0) {
+      preamble_bl.append_zero(pad_len);
+    }
+  }
+
+  m_crypto->tx->reset_tx_handler({preamble_bl.length()});
+  m_crypto->tx->authenticated_encrypt_update(preamble_bl);
+  auto frame_bl = m_crypto->tx->authenticated_encrypt_final();
+
+  if (segment_bls[0].length() > 0) {
+    m_crypto->tx->reset_tx_handler({segment_bls[0].length()});
+    m_crypto->tx->authenticated_encrypt_update(segment_bls[0]);
+    auto tmp = m_crypto->tx->authenticated_encrypt_final();
+    frame_bl.claim_append(tmp);
+  }
+  if (m_descs.size() == 1) {
+    return frame_bl;  // no epilogue if only one segment
+  }
+
+  epilogue_secure_rev1_block_t epilogue;
+  // FIPS zeroization audit 20191115: this memset is not security related.
+  ::memset(&epilogue, 0, sizeof(epilogue));
+  epilogue.late_status |= FRAME_LATE_STATUS_COMPLETE;
+  bufferlist epilogue_bl(sizeof(epilogue));
+  epilogue_bl.append(reinterpret_cast<const char*>(&epilogue),
+                     sizeof(epilogue));
+
+  // MAX_NUM_SEGMENTS - 1 + epilogue
+  uint32_t onwire_lens[MAX_NUM_SEGMENTS];
+  for (size_t i = 1; i < m_descs.size(); i++) {
+    onwire_lens[i - 1] = segment_bls[i].length();  // already padded
+  }
+  onwire_lens[m_descs.size() - 1] = epilogue_bl.length();
+  m_crypto->tx->reset_tx_handler(onwire_lens, onwire_lens + m_descs.size());
+  for (size_t i = 1; i < m_descs.size(); i++) {
+    if (segment_bls[i].length() > 0) {
+      m_crypto->tx->authenticated_encrypt_update(segment_bls[i]);
+    }
+  }
+  m_crypto->tx->authenticated_encrypt_update(epilogue_bl);
+  auto tmp = m_crypto->tx->authenticated_encrypt_final();
+  frame_bl.claim_append(tmp);
+  return frame_bl;
+}
+
 bufferlist FrameAssembler::assemble_frame(Tag tag, bufferlist segment_bls[],
                                           const uint16_t segment_aligns[],
                                           size_t segment_count) {
@@ -161,16 +264,30 @@ bufferlist FrameAssembler::assemble_frame(Tag tag, bufferlist segment_bls[],
         segment_bls[i].append_zero(pad_len);
       }
     }
+    if (m_is_rev1) {
+      return asm_secure_rev1(preamble, segment_bls);
+    }
     return asm_secure_rev0(preamble, segment_bls);
   }
+  if (m_is_rev1) {
+    return asm_crc_rev1(preamble, segment_bls);
+  }
   return asm_crc_rev0(preamble, segment_bls);
 }
 
 Tag FrameAssembler::disassemble_preamble(bufferlist& preamble_bl) {
-  ceph_assert(preamble_bl.length() == sizeof(preamble_block_t));
   if (m_crypto->rx) {
     m_crypto->rx->reset_rx_handler();
-    m_crypto->rx->authenticated_decrypt_update(preamble_bl);
+    if (m_is_rev1) {
+      ceph_assert(preamble_bl.length() == FRAME_PREAMBLE_WITH_INLINE_SIZE +
+                                          get_auth_tag_len());
+      m_crypto->rx->authenticated_decrypt_update_final(preamble_bl);
+    } else {
+      ceph_assert(preamble_bl.length() == sizeof(preamble_block_t));
+      m_crypto->rx->authenticated_decrypt_update(preamble_bl);
+    }
+  } else {
+    ceph_assert(preamble_bl.length() == sizeof(preamble_block_t));
   }
 
   // I expect ceph_le32 will make the endian conversion for me. Passing
@@ -236,9 +353,106 @@ bool FrameAssembler::disasm_all_secure_rev0(bufferlist segment_bls[],
   return !(epilogue->late_flags & FRAME_LATE_FLAG_ABORTED);
 }
 
-bool FrameAssembler::disassemble_segments(bufferlist segment_bls[],
-                                          bufferlist& epilogue_bl) const {
+void FrameAssembler::disasm_first_crc_rev1(bufferlist& preamble_bl,
+                                           bufferlist& segment_bl) const {
+  ceph_assert(preamble_bl.length() == sizeof(preamble_block_t));
+  if (m_descs[0].logical_len > 0) {
+    ceph_assert(segment_bl.length() == m_descs[0].logical_len +
+                                       FRAME_CRC_SIZE);
+    bufferlist::const_iterator it(&segment_bl, m_descs[0].logical_len);
+    uint32_t expected_crc;
+    decode(expected_crc, it);
+    segment_bl.splice(m_descs[0].logical_len, FRAME_CRC_SIZE);
+    check_segment_crc(segment_bl, expected_crc);
+  } else {
+    ceph_assert(segment_bl.length() == 0);
+  }
+}
+
+bool FrameAssembler::disasm_remaining_crc_rev1(bufferlist segment_bls[],
+                                               bufferlist& epilogue_bl) const {
+  ceph_assert(epilogue_bl.length() == sizeof(epilogue_crc_rev1_block_t));
+  auto epilogue = reinterpret_cast<const epilogue_crc_rev1_block_t*>(
+      epilogue_bl.c_str());
+
+  for (size_t i = 1; i < m_descs.size(); i++) {
+    ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
+    check_segment_crc(segment_bls[i], epilogue->crc_values[i - 1]);
+  }
+  return check_epilogue_late_status(epilogue->late_status);
+}
+
+void FrameAssembler::disasm_first_secure_rev1(bufferlist& preamble_bl,
+                                              bufferlist& segment_bl) const {
+  ceph_assert(preamble_bl.length() == FRAME_PREAMBLE_WITH_INLINE_SIZE);
+  uint32_t padded_len = get_segment_padded_len(0);
+  if (padded_len > FRAME_PREAMBLE_INLINE_SIZE) {
+    ceph_assert(segment_bl.length() == padded_len + get_auth_tag_len() -
+                                       FRAME_PREAMBLE_INLINE_SIZE);
+    m_crypto->rx->reset_rx_handler();
+    m_crypto->rx->authenticated_decrypt_update_final(segment_bl);
+    // prepend the inline buffer (already decrypted) to segment_bl
+    bufferlist tmp;
+    segment_bl.swap(tmp);
+    preamble_bl.splice(sizeof(preamble_block_t), FRAME_PREAMBLE_INLINE_SIZE,
+                       &segment_bl);
+    segment_bl.claim_append(tmp);
+  } else {
+    ceph_assert(segment_bl.length() == 0);
+    preamble_bl.splice(sizeof(preamble_block_t), FRAME_PREAMBLE_INLINE_SIZE,
+                       &segment_bl);
+  }
+  unpad_zero(segment_bl, m_descs[0].logical_len);
+  ceph_assert(segment_bl.length() == m_descs[0].logical_len);
+}
+
+bool FrameAssembler::disasm_remaining_secure_rev1(
+    bufferlist segment_bls[], bufferlist& epilogue_bl) const {
+  m_crypto->rx->reset_rx_handler();
+  for (size_t i = 1; i < m_descs.size(); i++) {
+    ceph_assert(segment_bls[i].length() == get_segment_padded_len(i));
+    if (segment_bls[i].length() > 0) {
+      m_crypto->rx->authenticated_decrypt_update(segment_bls[i]);
+      unpad_zero(segment_bls[i], m_descs[i].logical_len);
+    }
+  }
+
+  ceph_assert(epilogue_bl.length() == sizeof(epilogue_secure_rev1_block_t) +
+                                      get_auth_tag_len());
+  m_crypto->rx->authenticated_decrypt_update_final(epilogue_bl);
+  auto epilogue = reinterpret_cast<const epilogue_secure_rev1_block_t*>(
+      epilogue_bl.c_str());
+  return check_epilogue_late_status(epilogue->late_status);
+}
+
+void FrameAssembler::disassemble_first_segment(bufferlist& preamble_bl,
+                                               bufferlist& segment_bl) const {
+  ceph_assert(!m_descs.empty());
+  if (m_is_rev1) {
+    if (m_crypto->rx) {
+      disasm_first_secure_rev1(preamble_bl, segment_bl);
+    } else {
+      disasm_first_crc_rev1(preamble_bl, segment_bl);
+    }
+  } else {
+    // noop, everything is handled in disassemble_remaining_segments()
+  }
+}
+
+bool FrameAssembler::disassemble_remaining_segments(
+    bufferlist segment_bls[], bufferlist& epilogue_bl) const {
   ceph_assert(!m_descs.empty());
+  if (m_is_rev1) {
+    if (m_descs.size() == 1) {
+      // no epilogue if only one segment
+      ceph_assert(epilogue_bl.length() == 0);
+      return true;
+    }
+    if (m_crypto->rx) {
+      return disasm_remaining_secure_rev1(segment_bls, epilogue_bl);
+    }
+    return disasm_remaining_crc_rev1(segment_bls, epilogue_bl);
+  }
   if (m_crypto->rx) {
     return disasm_all_secure_rev0(segment_bls, epilogue_bl);
   }
@@ -255,7 +469,8 @@ std::ostream& operator<<(std::ostream& os, const FrameAssembler& frame_asm) {
     }
     os << " + " << frame_asm.get_epilogue_onwire_len() << " ";
   }
-  os << "rx=" << frame_asm.m_crypto->rx.get()
+  os << "rev1=" << frame_asm.m_is_rev1
+     << " rx=" << frame_asm.m_crypto->rx.get()
      << " tx=" << frame_asm.m_crypto->tx.get();
   return os;
 }
index f7eea559a2a6ae8005be00cfed3b029162068a33..0003b273096ea2a441ff3f647d9ab54ae2244130 100644 (file)
@@ -110,45 +110,65 @@ struct preamble_block_t {
 static_assert(sizeof(preamble_block_t) % CRYPTO_BLOCK_SIZE == 0);
 static_assert(std::is_standard_layout<preamble_block_t>::value);
 
-// Each Frame has an epilogue for integrity or authenticity validation.
-// For plain mode it's quite straightforward - the structure stores up
-// to MAX_NUM_SEGMENTS crc32 checksums, one per each segment.
-// For secure mode things become very different. The fundamental thing
-// is that epilogue format is **an implementation detail of particular
-// cipher**. ProtocolV2 only knows:
-//   * where the data is placed (always at the end of ciphertext),
-//   * how long it is. RxHandler provides get_extra_size_at_final() but
-//     ProtocolV2 has NO WAY to alter this.
-//
-// The intention behind the contract is to provide flexibility of cipher
-// selection. Currently AES in GCM mode is used and epilogue conveys its
-// *auth tag* (following OpenSSL's terminology). However, it would be OK
-// to switch to e.g. AES128-CBC + HMAC-SHA512 without affecting protocol
-// (expect the cipher negotiation, of course).
-//
-// In addition to integrity/authenticity data each variant of epilogue
-// conveys late_flags. The initial user of this field will be the late
-// frame abortion facility.
 struct epilogue_crc_rev0_block_t {
   __u8 late_flags;  // FRAME_LATE_FLAG_ABORTED
   ceph_le32 crc_values[MAX_NUM_SEGMENTS];
 } __attribute__((packed));
 static_assert(std::is_standard_layout_v<epilogue_crc_rev0_block_t>);
 
+struct epilogue_crc_rev1_block_t {
+  __u8 late_status;  // FRAME_LATE_STATUS_*
+  ceph_le32 crc_values[MAX_NUM_SEGMENTS - 1];
+} __attribute__((packed));
+static_assert(std::is_standard_layout_v<epilogue_crc_rev1_block_t>);
+
 struct epilogue_secure_rev0_block_t {
   __u8 late_flags;  // FRAME_LATE_FLAG_ABORTED
   __u8 padding[CRYPTO_BLOCK_SIZE - sizeof(late_flags)];
-
-  __u8 ciphers_private_data[];
 } __attribute__((packed));
 static_assert(sizeof(epilogue_secure_rev0_block_t) % CRYPTO_BLOCK_SIZE == 0);
 static_assert(std::is_standard_layout_v<epilogue_secure_rev0_block_t>);
 
+// epilogue_secure_rev0_block_t with late_flags changed to late_status
+struct epilogue_secure_rev1_block_t {
+  __u8 late_status;  // FRAME_LATE_STATUS_*
+  __u8 padding[CRYPTO_BLOCK_SIZE - sizeof(late_status)];
+} __attribute__((packed));
+static_assert(sizeof(epilogue_secure_rev1_block_t) % CRYPTO_BLOCK_SIZE == 0);
+static_assert(std::is_standard_layout_v<epilogue_secure_rev1_block_t>);
+
+static constexpr uint32_t FRAME_CRC_SIZE = 4;
+static constexpr uint32_t FRAME_PREAMBLE_INLINE_SIZE = 48;
+static_assert(FRAME_PREAMBLE_INLINE_SIZE % CRYPTO_BLOCK_SIZE == 0);
+// just for performance, nothing should break otherwise
+static_assert(sizeof(ceph_msg_header2) <= FRAME_PREAMBLE_INLINE_SIZE);
+static constexpr uint32_t FRAME_PREAMBLE_WITH_INLINE_SIZE =
+    sizeof(preamble_block_t) + FRAME_PREAMBLE_INLINE_SIZE;
+
 // A frame can be aborted by the sender after transmitting the
 // preamble and the first segment.  The remainder of the frame
 // is filled with zeros, up until the epilogue.
+//
+// This flag is for msgr2.0.  Note that in crc mode, late_flags
+// is not covered by any crc -- a single bit flip can result in
+// a completed frame being dropped or in an aborted frame with
+// garbage segment payloads being dispatched.
 #define FRAME_LATE_FLAG_ABORTED           (1<<0)
 
+// For msgr2.1, FRAME_LATE_STATUS_ABORTED has the same meaning
+// as FRAME_LATE_FLAG_ABORTED and late_status replaces late_flags.
+// Bit error detection in crc mode is achieved by using a 4-bit
+// nibble per flag with two code words that are far apart in terms
+// of Hamming Distance (HD=4, same as provided by CRC32-C for
+// input lengths over ~5K).
+#define FRAME_LATE_STATUS_ABORTED         0x1
+#define FRAME_LATE_STATUS_COMPLETE        0xe
+#define FRAME_LATE_STATUS_ABORTED_MASK    0xf
+
+#define FRAME_LATE_STATUS_RESERVED_TRUE   0x10
+#define FRAME_LATE_STATUS_RESERVED_FALSE  0xe0
+#define FRAME_LATE_STATUS_RESERVED_MASK   0xf0
+
 struct FrameError : std::runtime_error {
   using runtime_error::runtime_error;
 };
@@ -156,8 +176,8 @@ struct FrameError : std::runtime_error {
 class FrameAssembler {
 public:
   // crypto must be non-null
-  FrameAssembler(const ceph::crypto::onwire::rxtx_t* crypto)
-      : m_crypto(crypto) {}
+  FrameAssembler(const ceph::crypto::onwire::rxtx_t* crypto, bool is_rev1)
+      : m_crypto(crypto), m_is_rev1(is_rev1) {}
 
   size_t get_num_segments() const {
     ceph_assert(!m_descs.empty());
@@ -174,24 +194,91 @@ public:
     return m_descs[seg_idx].align;
   }
 
+  // Preamble:
+  //
+  //   preamble_block_t
+  //   [preamble inline buffer + auth tag -- only in msgr2.1 secure mode]
+  //
+  // The preamble is generated unconditionally.
+  //
+  // In msgr2.1 secure mode, the first segment is inlined into the
+  // preamble inline buffer, either fully or partially.
   uint32_t get_preamble_onwire_len() const {
+    if (m_is_rev1 && m_crypto->rx) {
+      return FRAME_PREAMBLE_WITH_INLINE_SIZE + get_auth_tag_len();
+    }
     return sizeof(preamble_block_t);
   }
 
+  // Segment:
+  //
+  //   segment payload
+  //   [zero padding -- only in secure mode]
+  //   [crc or auth tag -- only in msgr2.1, only for the first segment]
+  //
+  // For an empty segment, nothing is generated.  In msgr2.1 secure
+  // mode, if the first segment gets fully inlined into the preamble
+  // inline buffer, it is considered empty.
   uint32_t get_segment_onwire_len(size_t seg_idx) const {
     ceph_assert(seg_idx < m_descs.size());
     if (m_crypto->rx) {
-      return get_segment_padded_len(seg_idx);
+      uint32_t padded_len = get_segment_padded_len(seg_idx);
+      if (m_is_rev1 && seg_idx == 0) {
+        if (padded_len > FRAME_PREAMBLE_INLINE_SIZE) {
+          return padded_len + get_auth_tag_len() - FRAME_PREAMBLE_INLINE_SIZE;
+        }
+        return 0;
+      }
+      return padded_len;
+    }
+    if (m_is_rev1 && seg_idx == 0 && m_descs[0].logical_len > 0) {
+      return m_descs[0].logical_len + FRAME_CRC_SIZE;
     }
     return m_descs[seg_idx].logical_len;
   }
 
+  // Epilogue:
+  //
+  //   epilogue_*_block_t
+  //   [auth tag -- only in secure mode]
+  //
+  // For msgr2.0, the epilogue is generated unconditionally.  In
+  // crc mode, it stores crcs for all segments; the preamble is
+  // covered by its own crc.  In secure mode, the epilogue auth tag
+  // covers the whole frame.
+  //
+  // For msgr2.1, the epilogue is generated only if the frame has
+  // more than one segment (i.e. at least one of second to fourth
+  // segments is not empty).  In crc mode, it stores crcs for
+  // second to fourth segments; the preamble and the first segment
+  // are covered by their own crcs.  In secure mode, the epilogue
+  // auth tag covers second to fourth segments; the preamble and the
+  // first segment (if not fully inlined into the preamble inline
+  // buffer) are covered by their own auth tags.
+  //
+  // Note that the auth tag format is an implementation detail of a
+  // particular cipher.  FrameAssembler is concerned only with where
+  // the auth tag is placed (at the end of the ciphertext) and how
+  // long it is (RxHandler::get_extra_size_at_final()).  This is to
+  // provide room for other encryption algorithms: currently we use
+  // AES-128-GCM with 16-byte tags, but it is possible to switch to
+  // e.g. AES-128-CBC + HMAC-SHA512 without affecting the protocol
+  // (except for the cipher negotiation, of course).
+  //
+  // Additionally, each variant of the epilogue contains either
+  // late_flags or late_status field that directs handling of frames
+  // with more than one segment.
   uint32_t get_epilogue_onwire_len() const {
     ceph_assert(!m_descs.empty());
+    if (m_is_rev1 && m_descs.size() == 1) {
+      return 0;
+    }
     if (m_crypto->rx) {
-      return sizeof(epilogue_secure_rev0_block_t) + get_auth_tag_len();
+      return (m_is_rev1 ? sizeof(epilogue_secure_rev1_block_t) :
+                  sizeof(epilogue_secure_rev0_block_t)) + get_auth_tag_len();
     }
-    return sizeof(epilogue_crc_rev0_block_t);
+    return m_is_rev1 ? sizeof(epilogue_crc_rev1_block_t) :
+                       sizeof(epilogue_crc_rev0_block_t);
   }
 
   uint64_t get_frame_logical_len() const;
@@ -203,8 +290,36 @@ public:
 
   Tag disassemble_preamble(bufferlist& preamble_bl);
 
-  bool disassemble_segments(bufferlist segment_bls[],
-                            bufferlist& epilogue_bl) const;
+  // Like msgr1, and unlike msgr2.0, msgr2.1 allows interpreting the
+  // first segment before reading in the rest of the frame.
+  //
+  // For msgr2.1 (is_rev1 == true), you may:
+  //
+  // - read in the first segment
+  // - call disassemble_first_segment()
+  // - use the contents of the first segment, for example to
+  //   look up user-provided buffers based on ceph_msg_header2::tid
+  // - read in the remaining segments, possibly directly into
+  //   user-provided buffers
+  // - read in epilogue
+  // - call disassemble_remaining_segments()
+  //
+  // For msgr2.0 (is_rev1 == false), disassemble_first_segment() is
+  // a noop.  To accommodate, disassemble_remaining_segments() always
+  // takes all segments and skips over the first segment in msgr2.1
+  // case.  You must:
+  //
+  // - read in all segments
+  // - read in epilogue
+  // - call disassemble_remaining_segments()
+  //
+  // disassemble_remaining_segments() returns true if the frame is
+  // ready for dispatching, or false if it was aborted by the sender
+  // and must be dropped.
+  void disassemble_first_segment(bufferlist& preamble_bl,
+                                 bufferlist& segment_bl) const;
+  bool disassemble_remaining_segments(bufferlist segment_bls[],
+                                      bufferlist& epilogue_bl) const;
 
 private:
   struct segment_desc_t {
@@ -225,11 +340,23 @@ private:
                           bufferlist segment_bls[]) const;
   bufferlist asm_secure_rev0(const preamble_block_t& preamble,
                              bufferlist segment_bls[]) const;
+  bufferlist asm_crc_rev1(const preamble_block_t& preamble,
+                          bufferlist segment_bls[]) const;
+  bufferlist asm_secure_rev1(const preamble_block_t& preamble,
+                             bufferlist segment_bls[]) const;
 
   bool disasm_all_crc_rev0(bufferlist segment_bls[],
                            bufferlist& epilogue_bl) const;
   bool disasm_all_secure_rev0(bufferlist segment_bls[],
                               bufferlist& epilogue_bl) const;
+  void disasm_first_crc_rev1(bufferlist& preamble_bl,
+                             bufferlist& segment_bl) const;
+  bool disasm_remaining_crc_rev1(bufferlist segment_bls[],
+                                 bufferlist& epilogue_bl) const;
+  void disasm_first_secure_rev1(bufferlist& preamble_bl,
+                                bufferlist& segment_bl) const;
+  bool disasm_remaining_secure_rev1(bufferlist segment_bls[],
+                                    bufferlist& epilogue_bl) const;
 
   void fill_preamble(Tag tag, preamble_block_t& preamble) const;
   friend std::ostream& operator<<(std::ostream& os,
@@ -237,6 +364,7 @@ private:
 
   boost::container::static_vector<segment_desc_t, MAX_NUM_SEGMENTS> m_descs;
   const ceph::crypto::onwire::rxtx_t* m_crypto;
+  bool m_is_rev1;  // msgr2.1?
 };
 
 template <class T, uint16_t... SegmentAlignmentVs>