]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/frames_v2: introduce FrameAssembler
authorIlya Dryomov <idryomov@gmail.com>
Fri, 24 Apr 2020 16:01:46 +0000 (18:01 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Sun, 14 Jun 2020 11:56:06 +0000 (11:56 +0000)
Start separating frame assembly and disassembly code from
frame sending, receiving and handling code, so that assembly
and disassembly pieces can be unit tested and hopefully also
shared between different messengers (e.g. crimson).

This commit factors out the assembly code from Frame.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
src/crimson/CMakeLists.txt
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/msg/CMakeLists.txt
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h
src/msg/async/frames_v2.cc [new file with mode: 0644]
src/msg/async/frames_v2.h

index 61a45391cd3594111167d31323639808cf467a7c..424378ba62cf81a209ec868b98f9979dd9e9c503 100644 (file)
@@ -157,6 +157,7 @@ set(crimson_mon_srcs
   mon/MonClient.cc
   ${PROJECT_SOURCE_DIR}/src/mon/MonSub.cc)
 set(crimson_net_srcs
+  ${PROJECT_SOURCE_DIR}/src/msg/async/frames_v2.cc
   net/Errors.cc
   net/Messenger.cc
   net/SocketConnection.cc
index 3221d648b360b7f3c6432b65c71ed7d24f9ddd79..3e48417e9cc526c59efdddd9dd9387cc9e23a0a9 100644 (file)
@@ -148,7 +148,8 @@ ProtocolV2::ProtocolV2(Dispatcher& dispatcher,
                        SocketMessenger& messenger)
   : Protocol(proto_t::v2, dispatcher, conn),
     messenger{messenger},
-    protocol_timer{conn}
+    protocol_timer{conn},
+    tx_frame_asm(&session_stream_handlers)
 {}
 
 ProtocolV2::~ProtocolV2() {}
@@ -386,7 +387,7 @@ seastar::future<> ProtocolV2::read_frame_payload()
 template <class F>
 seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
 {
-  auto bl = frame.get_buffer(session_stream_handlers);
+  auto bl = frame.get_buffer(tx_frame_asm);
   const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
   logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
                  conn, bl.length(), (int)main_preamble->tag,
@@ -1850,19 +1851,19 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
 
   if (unlikely(require_keepalive)) {
     auto keepalive_frame = KeepAliveFrame::Encode();
-    bl.append(keepalive_frame.get_buffer(session_stream_handlers));
+    bl.append(keepalive_frame.get_buffer(tx_frame_asm));
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
   }
 
   if (unlikely(_keepalive_ack.has_value())) {
     auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
-    bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
+    bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm));
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
   }
 
   if (require_ack && !num_msgs) {
     auto ack_frame = AckFrame::Encode(conn.in_seq);
-    bl.append(ack_frame.get_buffer(session_stream_handlers));
+    bl.append(ack_frame.get_buffer(tx_frame_asm));
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
   }
 
@@ -1891,7 +1892,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
         msg->get_payload(), msg->get_middle(), msg->get_data());
     logger().debug("{} --> #{} === {} ({})",
                   conn, msg->get_seq(), *msg, msg->get_type());
-    bl.append(message.get_buffer(session_stream_handlers));
+    bl.append(message.get_buffer(tx_frame_asm));
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
   });
 
index bd9f1a425bc8681d48b7e7b58777cc4fbdd04491..e007c9b40994f4db8fed5ec13277a60c3e651eb8 100644 (file)
@@ -123,6 +123,7 @@ class ProtocolV2 final : public Protocol {
   ceph::crypto::onwire::rxtx_t session_stream_handlers;
   boost::container::static_vector<ceph::msgr::v2::segment_t,
                                  ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_desc;
+  ceph::msgr::v2::FrameAssembler tx_frame_asm;
   ceph::msgr::v2::segment_bls_t rx_segments_data;
 
   size_t get_current_msg_size() const;
index 0c8359657958497ba8b88362e06788242406a25a..fada39b457f4f80a0ca4cca4ab0c3f4eff98a4de 100644 (file)
@@ -17,6 +17,7 @@ list(APPEND msg_srcs
   async/PosixStack.cc
   async/Stack.cc
   async/crypto_onwire.cc
+  async/frames_v2.cc
   async/net_handler.cc)
 
 if(LINUX)
index cdb8b7205571a855b58568eb9f28b0aebdc11095..9b9d6f122eea8a1f03c42dc10a99041290238cfa 100644 (file)
@@ -92,6 +92,7 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection)
       replacing(false),
       can_write(false),
       bannerExchangeCallback(nullptr),
+      tx_frame_asm(&session_stream_handlers),
       next_tag(static_cast<Tag>(0)),
       keepalive(false) {
 }
@@ -573,11 +574,14 @@ template <class F>
 bool ProtocolV2::append_frame(F& frame) {
   ceph::bufferlist bl;
   try {
-    bl = frame.get_buffer(session_stream_handlers);
+    bl = frame.get_buffer(tx_frame_asm);
   } catch (ceph::crypto::onwire::TxHandlerError &e) {
     ldout(cct, 1) << __func__ << " " << e.what() << dendl;
     return false;
   }
+
+  ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
+                 << " bytes " << tx_frame_asm << dendl;
   connection->outgoing_bl.append(bl);
   return true;
 }
@@ -787,11 +791,14 @@ CtPtr ProtocolV2::write(const std::string &desc,
                         F &frame) {
   ceph::bufferlist bl;
   try {
-    bl = frame.get_buffer(session_stream_handlers);
+    bl = frame.get_buffer(tx_frame_asm);
   } catch (ceph::crypto::onwire::TxHandlerError &e) {
     ldout(cct, 1) << __func__ << " " << e.what() << dendl;
     return _fault();
   }
+
+  ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
+                 << " bytes " << tx_frame_asm << dendl;
   return write(desc, next, bl);
 }
 
index 29ad191cbb1a2b5edac1f2b17ae0f053b9936a22..78224772604ee79519581204815bce989cc28eb0 100644 (file)
@@ -98,6 +98,7 @@ private:
 
   boost::container::static_vector<ceph::msgr::v2::segment_t,
                                  ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_desc;
+  ceph::msgr::v2::FrameAssembler tx_frame_asm;
   ceph::msgr::v2::segment_bls_t rx_segments_data;
   ceph::msgr::v2::Tag next_tag;
   utime_t backoff;  // backoff time
diff --git a/src/msg/async/frames_v2.cc b/src/msg/async/frames_v2.cc
new file mode 100644 (file)
index 0000000..ead90ae
--- /dev/null
@@ -0,0 +1,167 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "frames_v2.h"
+
+#include <ostream>
+
+#include <fmt/format.h>
+
+namespace ceph::msgr::v2 {
+
+// Discards trailing empty segments, unless there is just one segment.
+// A frame always has at least one (possibly empty) segment.
+static size_t calc_num_segments(const bufferlist segment_bls[],
+                                size_t segment_count) {
+  ceph_assert(segment_count > 0 && segment_count <= MAX_NUM_SEGMENTS);
+  for (size_t i = segment_count; i-- > 0; ) {
+    if (segment_bls[i].length() > 0) {
+      return i + 1;
+    }
+  }
+  return 1;
+}
+
+void FrameAssembler::fill_preamble(Tag tag,
+                                   preamble_block_t& preamble) const {
+  // FIPS zeroization audit 20191115: this memset is not security related.
+  ::memset(&preamble, 0, sizeof(preamble));
+
+  preamble.tag = static_cast<__u8>(tag);
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    preamble.segments[i].length = m_descs[i].logical_len;
+    preamble.segments[i].alignment = m_descs[i].align;
+  }
+  preamble.num_segments = m_descs.size();
+  preamble.crc = ceph_crc32c(
+      0, reinterpret_cast<const unsigned char*>(&preamble),
+      sizeof(preamble) - sizeof(preamble.crc));
+}
+
+uint64_t FrameAssembler::get_frame_logical_len() const {
+  ceph_assert(!m_descs.empty());
+  uint64_t logical_len = 0;
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    logical_len += m_descs[i].logical_len;
+  }
+  return logical_len;
+}
+
+uint64_t FrameAssembler::get_frame_onwire_len() const {
+  ceph_assert(!m_descs.empty());
+  uint64_t onwire_len = get_preamble_onwire_len();
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    onwire_len += get_segment_onwire_len(i);
+  }
+  onwire_len += get_epilogue_onwire_len();
+  return onwire_len;
+}
+
+bufferlist FrameAssembler::asm_crc_rev0(const preamble_block_t& preamble,
+                                        bufferlist segment_bls[]) const {
+  epilogue_plain_block_t epilogue;
+  // FIPS zeroization audit 20191115: this memset is not security related.
+  ::memset(&epilogue, 0, sizeof(epilogue));
+
+  bufferlist frame_bl(sizeof(preamble) + sizeof(epilogue));
+  frame_bl.append(reinterpret_cast<const char*>(&preamble), sizeof(preamble));
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
+    epilogue.crc_values[i] = segment_bls[i].crc32c(-1);
+    if (segment_bls[i].length() > 0) {
+      frame_bl.claim_append(segment_bls[i]);
+    }
+  }
+  frame_bl.append(reinterpret_cast<const char*>(&epilogue), sizeof(epilogue));
+  return frame_bl;
+}
+
+bufferlist FrameAssembler::asm_secure_rev0(const preamble_block_t& preamble,
+                                           bufferlist segment_bls[]) const {
+  bufferlist preamble_bl(sizeof(preamble));
+  preamble_bl.append(reinterpret_cast<const char*>(&preamble),
+                     sizeof(preamble));
+
+  epilogue_secure_block_t epilogue;
+  // FIPS zeroization audit 20191115: this memset is not security related.
+  ::memset(&epilogue, 0, sizeof(epilogue));
+  bufferlist epilogue_bl(sizeof(epilogue));
+  epilogue_bl.append(reinterpret_cast<const char*>(&epilogue),
+                     sizeof(epilogue));
+
+  // preamble + MAX_NUM_SEGMENTS + epilogue
+  uint32_t onwire_lens[MAX_NUM_SEGMENTS + 2];
+  onwire_lens[0] = preamble_bl.length();
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    onwire_lens[i + 1] = segment_bls[i].length();  // already padded
+  }
+  onwire_lens[m_descs.size() + 1] = epilogue_bl.length();
+  m_crypto->tx->reset_tx_handler(onwire_lens,
+                                 onwire_lens + m_descs.size() + 2);
+  m_crypto->tx->authenticated_encrypt_update(preamble_bl);
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    if (segment_bls[i].length() > 0) {
+      m_crypto->tx->authenticated_encrypt_update(segment_bls[i]);
+    }
+  }
+  m_crypto->tx->authenticated_encrypt_update(epilogue_bl);
+  return m_crypto->tx->authenticated_encrypt_final();
+}
+
+bufferlist FrameAssembler::assemble_frame(Tag tag, bufferlist segment_bls[],
+                                          const uint16_t segment_aligns[],
+                                          size_t segment_count) {
+  m_descs.resize(calc_num_segments(segment_bls, segment_count));
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    m_descs[i].logical_len = segment_bls[i].length();
+    m_descs[i].align = segment_aligns[i];
+  }
+
+  preamble_block_t preamble;
+  fill_preamble(tag, preamble);
+
+  if (m_crypto->rx) {
+    for (size_t i = 0; i < m_descs.size(); i++) {
+      ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
+      // We're padding segments to biggest cipher's block size. Although
+      // AES-GCM can live without that as it's a stream cipher, we don't
+      // want to be fixed to stream ciphers only.
+      uint32_t padded_len = get_segment_padded_len(i);
+      if (padded_len > segment_bls[i].length()) {
+        uint32_t pad_len = padded_len - segment_bls[i].length();
+        segment_bls[i].reserve(pad_len);
+        segment_bls[i].append_zero(pad_len);
+      }
+    }
+    return asm_secure_rev0(preamble, segment_bls);
+  }
+  return asm_crc_rev0(preamble, segment_bls);
+}
+
+std::ostream& operator<<(std::ostream& os, const FrameAssembler& frame_asm) {
+  if (!frame_asm.m_descs.empty()) {
+    os << frame_asm.get_preamble_onwire_len();
+    for (size_t i = 0; i < frame_asm.m_descs.size(); i++) {
+      os << " + " << frame_asm.get_segment_onwire_len(i)
+         << " (logical " << frame_asm.m_descs[i].logical_len
+         << "/" << frame_asm.m_descs[i].align << ")";
+    }
+    os << " + " << frame_asm.get_epilogue_onwire_len() << " ";
+  }
+  os << "rx=" << frame_asm.m_crypto->rx.get()
+     << " tx=" << frame_asm.m_crypto->tx.get();
+  return os;
+}
+
+}  // namespace ceph::msgr::v2
index 0576ce9cf92e0f55fcfabb95d57810b8f894970d..035549b8a064a822bafce5035c1f4cd1e679e10b 100644 (file)
@@ -5,6 +5,7 @@
 #include "common/Clock.h"
 #include "crypto_onwire.h"
 #include <array>
+#include <iosfwd>
 #include <utility>
 
 #include <boost/container/static_vector.hpp>
@@ -157,157 +158,107 @@ static uint32_t segment_onwire_size(const uint32_t logical_size)
   return p2roundup<uint32_t>(logical_size, CRYPTO_BLOCK_SIZE);
 }
 
-static inline ceph::bufferlist segment_onwire_bufferlist(ceph::bufferlist&& bl)
-{
-  const auto padding_size = segment_onwire_size(bl.length()) - bl.length();
-  if (padding_size) {
-    bl.append_zero(padding_size);
+struct FrameError : std::runtime_error {
+  using runtime_error::runtime_error;
+};
+
+class FrameAssembler {
+public:
+  // crypto must be non-null
+  FrameAssembler(const ceph::crypto::onwire::rxtx_t* crypto)
+      : m_crypto(crypto) {}
+
+  size_t get_num_segments() const {
+    ceph_assert(!m_descs.empty());
+    return m_descs.size();
   }
-  return std::move(bl);
-}
 
-template <class T, uint16_t... SegmentAlignmentVs>
-struct Frame {
-  static constexpr size_t SegmentsNumV = sizeof...(SegmentAlignmentVs);
-  static_assert(SegmentsNumV > 0 && SegmentsNumV <= MAX_NUM_SEGMENTS);
-protected:
-  std::array<ceph::bufferlist, SegmentsNumV> segments;
+  uint32_t get_segment_logical_len(size_t seg_idx) const {
+    ceph_assert(seg_idx < m_descs.size());
+    return m_descs[seg_idx].logical_len;
+  }
 
-private:
-  static constexpr std::array<uint16_t, SegmentsNumV> alignments {
-    SegmentAlignmentVs...
-  };
-  ceph::bufferlist::contiguous_filler preamble_filler;
+  uint16_t get_segment_align(size_t seg_idx) const {
+    ceph_assert(seg_idx < m_descs.size());
+    return m_descs[seg_idx].align;
+  }
 
-  __u8 calc_num_segments(const segment_t segments[])
-  {
-    for (__u8 num = SegmentsNumV; num > 0; num--) {
-      if (segments[num-1].length) {
-        return num;
-      }
+  uint32_t get_preamble_onwire_len() const {
+    return sizeof(preamble_block_t);
+  }
+
+  uint32_t get_segment_onwire_len(size_t seg_idx) const {
+    ceph_assert(seg_idx < m_descs.size());
+    if (m_crypto->rx) {
+      return get_segment_padded_len(seg_idx);
     }
-    // frame always has at least one segment.
-    return 1;
+    return m_descs[seg_idx].logical_len;
   }
 
-  // craft the main preamble. It's always present regardless of the number
-  // of segments message is composed from.
-  void fill_preamble() {
-    ceph_assert(std::size(segments) <= MAX_NUM_SEGMENTS);
-
-    preamble_block_t main_preamble;
-    // FIPS zeroization audit 20191115: this memset is not security related.
-    ::memset(&main_preamble, 0, sizeof(main_preamble));
-
-    main_preamble.tag = static_cast<__u8>(T::tag);
-    ceph_assert(main_preamble.tag != 0);
-
-    // implementation detail: the first bufferlist of Frame::segments carries
-    // space for preamble. This glueing isn't a part of the onwire format but
-    // just our private detail.
-    main_preamble.segments[0].length =
-        segments[0].length() - FRAME_PREAMBLE_SIZE;
-    main_preamble.segments[0].alignment = alignments[0];
-
-    // there is no business in issuing frame without at least one segment
-    // filled.
-    if constexpr(SegmentsNumV > 1) {
-      for (__u8 idx = 1; idx < SegmentsNumV; idx++) {
-        main_preamble.segments[idx].length = segments[idx].length();
-        main_preamble.segments[idx].alignment = alignments[idx];
-      }
+  uint32_t get_epilogue_onwire_len() const {
+    ceph_assert(!m_descs.empty());
+    if (m_crypto->rx) {
+      return sizeof(epilogue_secure_block_t) + get_auth_tag_len();
     }
-    // calculate the number of non-empty segments.
-    // TODO: reorder segments to get DATA first
-    main_preamble.num_segments = calc_num_segments(main_preamble.segments);
+    return sizeof(epilogue_plain_block_t);
+  }
 
-    main_preamble.crc =
-        ceph_crc32c(0, reinterpret_cast<unsigned char *>(&main_preamble),
-                    sizeof(main_preamble) - sizeof(main_preamble.crc));
+  uint64_t get_frame_logical_len() const;
+  uint64_t get_frame_onwire_len() const;
 
-    preamble_filler.copy_in(sizeof(main_preamble),
-                            reinterpret_cast<const char *>(&main_preamble));
-  }
+  bufferlist assemble_frame(Tag tag, bufferlist segment_bls[],
+                            const uint16_t segment_aligns[],
+                            size_t segment_count);
 
-  template <size_t... Is>
-  void reset_tx_handler(
-    ceph::crypto::onwire::rxtx_t &session_stream_handlers,
-    std::index_sequence<Is...>)
-  {
-    session_stream_handlers.tx->reset_tx_handler({ segments[Is].length()...,
-                                                   sizeof(epilogue_secure_block_t) });
+private:
+  struct segment_desc_t {
+    uint32_t logical_len;
+    uint16_t align;
+  };
+
+  uint32_t get_segment_padded_len(size_t seg_idx) const {
+    return p2roundup<uint32_t>(m_descs[seg_idx].logical_len,
+                               CRYPTO_BLOCK_SIZE);
   }
 
-public:
-  ceph::bufferlist get_buffer(
-    ceph::crypto::onwire::rxtx_t &session_stream_handlers)
-  {
-    fill_preamble();
-    if (session_stream_handlers.tx) {
-      // we're padding segments to biggest cipher's block size. Although
-      // AES-GCM can live without that as it's a stream cipher, we don't
-      // to be fixed to stream ciphers only.
-      for (auto& segment : segments) {
-        segment = segment_onwire_bufferlist(std::move(segment));
-      }
+  uint32_t get_auth_tag_len() const {
+    return m_crypto->rx->get_extra_size_at_final();
+  }
 
-      // let's cipher allocate one huge buffer for entire ciphertext.
-      reset_tx_handler(
-          session_stream_handlers, std::make_index_sequence<SegmentsNumV>());
+  bufferlist asm_crc_rev0(const preamble_block_t& preamble,
+                          bufferlist segment_bls[]) const;
+  bufferlist asm_secure_rev0(const preamble_block_t& preamble,
+                             bufferlist segment_bls[]) const;
 
-      for (auto& segment : segments) {
-        if (segment.length()) {
-          session_stream_handlers.tx->authenticated_encrypt_update(
-            std::move(segment));
-        }
-      }
+  void fill_preamble(Tag tag, preamble_block_t& preamble) const;
+  friend std::ostream& operator<<(std::ostream& os,
+                                  const FrameAssembler& frame_asm);
 
-      // in secure mode we craft only the late_flags. Signature (for AES-GCM
-      // called auth tag) will be added by the cipher.
-      {
-        epilogue_secure_block_t epilogue;
-        // FIPS zeroization audit 20191115: this memset is not security
-        // related.
-        ::memset(&epilogue, 0, sizeof(epilogue));
-        ceph::bufferlist epilogue_bl;
-        epilogue_bl.append(reinterpret_cast<const char*>(&epilogue),
-                           sizeof(epilogue));
-        session_stream_handlers.tx->authenticated_encrypt_update(epilogue_bl);
-      }
-      return session_stream_handlers.tx->authenticated_encrypt_final();
-    } else {
-      // plain mode
-      epilogue_plain_block_t epilogue;
-      // FIPS zeroization audit 20191115: this memset is not security related.
-      ::memset(&epilogue, 0, sizeof(epilogue));
-
-      ceph::bufferlist::const_iterator hdriter(&segments.front(),
-                                               FRAME_PREAMBLE_SIZE);
-      epilogue.crc_values[SegmentIndex::Control::PAYLOAD] =
-          hdriter.crc32c(hdriter.get_remaining(), -1);
-      if constexpr(SegmentsNumV > 1) {
-        for (__u8 idx = 1; idx < SegmentsNumV; idx++) {
-          epilogue.crc_values[idx] = segments[idx].crc32c(-1);
-        }
-      }
+  boost::container::static_vector<segment_desc_t, MAX_NUM_SEGMENTS> m_descs;
+  const ceph::crypto::onwire::rxtx_t* m_crypto;
+};
 
-      ceph::bufferlist ret;
-      for (auto& segment : segments) {
-        ret.claim_append(segment);
-      }
-      ret.append(reinterpret_cast<const char*>(&epilogue), sizeof(epilogue));
-      return ret;
-    }
-  }
+template <class T, uint16_t... SegmentAlignmentVs>
+struct Frame {
+  static constexpr size_t SegmentsNumV = sizeof...(SegmentAlignmentVs);
+  static_assert(SegmentsNumV > 0 && SegmentsNumV <= MAX_NUM_SEGMENTS);
+protected:
+  std::array<ceph::bufferlist, SegmentsNumV> segments;
 
-  Frame()
-    : preamble_filler(segments.front().append_hole(FRAME_PREAMBLE_SIZE)) {
-  }
+private:
+  static constexpr std::array<uint16_t, SegmentsNumV> alignments {
+    SegmentAlignmentVs...
+  };
 
 public:
+  ceph::bufferlist get_buffer(FrameAssembler& tx_frame_asm) {
+    auto bl = tx_frame_asm.assemble_frame(T::tag, segments.data(),
+                                          alignments.data(), SegmentsNumV);
+    ceph_assert(bl.length() == tx_frame_asm.get_frame_onwire_len());
+    return bl;
+  }
 };
 
-
 // ControlFrames are used to manage transceiver state (like connections) and
 // orchestrate transfers of MessageFrames. They use only single segment with
 // marshalling facilities -- derived classes specify frame structure through