]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: introduce and integrate FrameAssemblerV2
author Yingxin Cheng <yingxin.cheng@intel.com>
Thu, 17 Nov 2022 09:06:17 +0000 (17:06 +0800)
committer Yingxin Cheng <yingxin.cheng@intel.com>
Wed, 8 Feb 2023 06:07:41 +0000 (14:07 +0800)
FrameAssemblerV2 encapsulates the low-level frame processing for
protocol v2, and manages the socket instance.

FrameAssemblerV2 is supposed to run on the socket core for
performance reasons, and that core will be changeable.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/CMakeLists.txt
src/crimson/net/FrameAssemblerV2.cc [new file with mode: 0644]
src/crimson/net/FrameAssemblerV2.h [new file with mode: 0644]
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/SocketConnection.h

index 44c2ee9306584e7bdc153950a9af3b47d1d09ecc..cd5d1fb0cd8a3d6ec7014760d3219537499277d9 100644 (file)
@@ -175,6 +175,7 @@ set(crimson_net_srcs
   ${PROJECT_SOURCE_DIR}/src/msg/async/compression_onwire.cc
   ${PROJECT_SOURCE_DIR}/src/msg/async/frames_v2.cc
   net/Errors.cc
+  net/FrameAssemblerV2.cc
   net/Messenger.cc
   net/SocketConnection.cc
   net/SocketMessenger.cc
diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc
new file mode 100644 (file)
index 0000000..cf41989
--- /dev/null
@@ -0,0 +1,264 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "FrameAssemblerV2.h"
+
+#include "Errors.h"
+#include "SocketConnection.h"
+
+using ceph::msgr::v2::FrameAssembler;
+using ceph::msgr::v2::FrameError;
+using ceph::msgr::v2::preamble_block_t;
+using ceph::msgr::v2::segment_t;
+using ceph::msgr::v2::Tag;
+
+namespace {
+
+seastar::logger& logger() {
+  return crimson::get_logger(ceph_subsys_ms);
+}
+
+} // namespace anonymous
+
+namespace crimson::net {
+
+FrameAssemblerV2::FrameAssemblerV2(SocketConnection &_conn)
+  : conn{_conn}
+{}
+
+void FrameAssemblerV2::set_is_rev1(bool _is_rev1)
+{
+  is_rev1 = _is_rev1;
+  tx_frame_asm.set_is_rev1(_is_rev1);
+  rx_frame_asm.set_is_rev1(_is_rev1);
+}
+
+void FrameAssemblerV2::create_session_stream_handlers(
+  const AuthConnectionMeta &auth_meta,
+  bool crossed)
+{
+  session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+      nullptr, auth_meta, is_rev1, crossed);
+}
+
+void FrameAssemblerV2::reset_handlers()
+{
+  session_stream_handlers = { nullptr, nullptr };
+  session_comp_handlers = { nullptr, nullptr };
+}
+
+FrameAssemblerV2::mover_t
+FrameAssemblerV2::to_replace()
+{
+  assert(has_socket());
+  socket = nullptr;
+  return mover_t{
+      std::move(conn.socket),
+      std::move(session_stream_handlers),
+      std::move(session_comp_handlers)};
+}
+
+void FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t &&mover)
+{
+  set_socket(std::move(mover.socket));
+  record_io = false;
+  rxbuf.clear();
+  txbuf.clear();
+  session_stream_handlers = std::move(mover.session_stream_handlers);
+  session_comp_handlers = std::move(mover.session_comp_handlers);
+}
+
+void FrameAssemblerV2::start_recording()
+{
+  record_io = true;
+  rxbuf.clear();
+  txbuf.clear();
+}
+
+FrameAssemblerV2::record_bufs_t
+FrameAssemblerV2::stop_recording()
+{
+  ceph_assert_always(record_io == true);
+  record_io = false;
+  return record_bufs_t{std::move(rxbuf), std::move(txbuf)};
+}
+
+bool FrameAssemblerV2::has_socket() const
+{
+  assert((socket && conn.socket) || (!socket && !conn.socket));
+  return socket != nullptr;
+}
+
+void FrameAssemblerV2::set_socket(SocketRef &&_socket)
+{
+  assert(!has_socket());
+  ceph_assert_always(!conn.socket);
+  socket = _socket.get();
+  conn.socket = std::move(_socket);
+  assert(has_socket());
+}
+
+void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port)
+{
+  assert(has_socket());
+  socket->learn_ephemeral_port_as_connector(port);
+}
+
+void FrameAssemblerV2::shutdown_socket()
+{
+  if (has_socket()) {
+    socket->shutdown();
+  }
+}
+
+seastar::future<> FrameAssemblerV2::reset_and_close_socket(bool do_reset)
+{
+  if (!has_socket()) {
+    return seastar::now();
+  }
+  if (do_reset) {
+    socket = nullptr;
+    return conn.socket->close(
+    ).then([sock = std::move(conn.socket)] {});
+  } else {
+    return socket->close();
+  }
+}
+
+seastar::future<Socket::tmp_buf>
+FrameAssemblerV2::read_exactly(std::size_t bytes)
+{
+  assert(has_socket());
+  if (unlikely(record_io)) {
+    return socket->read_exactly(bytes
+    ).then([this](auto bl) {
+      rxbuf.append(buffer::create(bl.share()));
+      return bl;
+    });
+  } else {
+    return socket->read_exactly(bytes);
+  };
+}
+
+seastar::future<ceph::bufferlist>
+FrameAssemblerV2::read(std::size_t bytes)
+{
+  assert(has_socket());
+  if (unlikely(record_io)) {
+    return socket->read(bytes
+    ).then([this](auto buf) {
+      rxbuf.append(buf);
+      return buf;
+    });
+  } else {
+    return socket->read(bytes);
+  }
+}
+
+seastar::future<>
+FrameAssemblerV2::write(ceph::bufferlist &&buf)
+{
+  assert(has_socket());
+  if (unlikely(record_io)) {
+    txbuf.append(buf);
+  }
+  return socket->write(std::move(buf));
+}
+
+seastar::future<>
+FrameAssemblerV2::flush()
+{
+  assert(has_socket());
+  return socket->flush();
+}
+
+seastar::future<>
+FrameAssemblerV2::write_flush(ceph::bufferlist &&buf)
+{
+  assert(has_socket());
+  if (unlikely(record_io)) {
+    txbuf.append(buf);
+  }
+  return socket->write_flush(std::move(buf));
+}
+
+seastar::future<FrameAssemblerV2::read_main_t>
+FrameAssemblerV2::read_main_preamble()
+{
+  rx_preamble.clear();
+  return read_exactly(rx_frame_asm.get_preamble_onwire_len()
+  ).then([this](auto bl) {
+    try {
+      rx_preamble.append(buffer::create(std::move(bl)));
+      const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
+      return read_main_t{tag, &rx_frame_asm};
+    } catch (FrameError& e) {
+      logger().warn("{} read_main_preamble: {}", conn, e.what());
+      throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+    }
+  });
+}
+
+seastar::future<FrameAssemblerV2::read_payload_t*>
+FrameAssemblerV2::read_frame_payload()
+{
+  rx_segments_data.clear();
+  return seastar::do_until(
+    [this] {
+      return rx_frame_asm.get_num_segments() == rx_segments_data.size();
+    },
+    [this] {
+      // TODO: create aligned and contiguous buffer from socket
+      const size_t seg_idx = rx_segments_data.size();
+      if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx);
+          alignment != segment_t::DEFAULT_ALIGNMENT) {
+        logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
+                       conn, alignment, rx_segments_data.size());
+      }
+      uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
+      // TODO: create aligned and contiguous buffer from socket
+      return read_exactly(onwire_len
+      ).then([this](auto tmp_bl) {
+        logger().trace("{} RECV({}) frame segment[{}]",
+                       conn, tmp_bl.size(), rx_segments_data.size());
+        bufferlist segment;
+        segment.append(buffer::create(std::move(tmp_bl)));
+        rx_segments_data.emplace_back(std::move(segment));
+      });
+    }
+  ).then([this] {
+    return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
+  }).then([this](auto bl) {
+    logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
+    bool ok = false;
+    try {
+      bufferlist rx_epilogue;
+      rx_epilogue.append(buffer::create(std::move(bl)));
+      ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
+    } catch (FrameError& e) {
+      logger().error("read_frame_payload: {} {}", conn, e.what());
+      throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+    } catch (ceph::crypto::onwire::MsgAuthError&) {
+      logger().error("read_frame_payload: {} bad auth tag", conn);
+      throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+    }
+    // we do have a mechanism that allows transmitter to start sending message
+    // and abort after putting entire data field on wire. This will be used by
+    // the kernel client to avoid unnecessary buffering.
+    if (!ok) {
+      ceph_abort("TODO");
+    }
+    return &rx_segments_data;
+  });
+}
+
+void FrameAssemblerV2::log_main_preamble(const ceph::bufferlist &bl)
+{
+  const auto main_preamble =
+    reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
+  logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
+                 conn, bl.length(), (int)main_preamble->tag,
+                 (int)main_preamble->num_segments, main_preamble->crc);
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h
new file mode 100644 (file)
index 0000000..f72eeee
--- /dev/null
@@ -0,0 +1,158 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "msg/async/frames_v2.h"
+#include "msg/async/crypto_onwire.h"
+#include "msg/async/compression_onwire.h"
+
+#include "crimson/net/Socket.h"
+
+namespace crimson::net {
+
+class SocketConnection;
+
+class FrameAssemblerV2 {
+public:
+  FrameAssemblerV2(SocketConnection &conn);
+
+  ~FrameAssemblerV2() = default;
+
+  FrameAssemblerV2(const FrameAssemblerV2 &) = delete;
+
+  FrameAssemblerV2(FrameAssemblerV2 &&) = delete;
+
+  void set_is_rev1(bool is_rev1);
+
+  void create_session_stream_handlers(
+      const AuthConnectionMeta &auth_meta,
+      bool crossed);
+
+  void reset_handlers();
+
+  /*
+   * replacing
+   */
+
+  struct mover_t {
+    SocketRef socket;
+    ceph::crypto::onwire::rxtx_t session_stream_handlers;
+    ceph::compression::onwire::rxtx_t session_comp_handlers;
+  };
+
+  mover_t to_replace();
+
+  void replace_by(mover_t &&);
+
+  /*
+   * auth signature interfaces
+   */
+
+  void start_recording();
+
+  struct record_bufs_t {
+    ceph::bufferlist rxbuf;
+    ceph::bufferlist txbuf;
+  };
+  record_bufs_t stop_recording();
+
+  /*
+   * socket maintenance interfaces
+   */
+
+  bool has_socket() const;
+
+  void set_socket(SocketRef &&);
+
+  void learn_socket_ephemeral_port_as_connector(uint16_t port);
+
+  void shutdown_socket();
+
+  seastar::future<> reset_and_close_socket(bool do_reset=true);
+
+  /*
+   * socket read and write interfaces
+   */
+
+  seastar::future<Socket::tmp_buf> read_exactly(std::size_t bytes);
+
+  seastar::future<ceph::bufferlist> read(std::size_t bytes);
+
+  seastar::future<> write(ceph::bufferlist &&);
+
+  seastar::future<> flush();
+
+  seastar::future<> write_flush(ceph::bufferlist &&);
+
+  /*
+   * frame read and write interfaces
+   */
+
+  /// may throw negotiation_failure as fault
+  struct read_main_t {
+    ceph::msgr::v2::Tag tag;
+    const ceph::msgr::v2::FrameAssembler *rx_frame_asm;
+  };
+  seastar::future<read_main_t> read_main_preamble();
+
+  /// may throw negotiation_failure as fault
+  using read_payload_t = ceph::msgr::v2::segment_bls_t;
+  // FIXME: read_payload_t cannot be no-throw move constructible
+  seastar::future<read_payload_t*> read_frame_payload();
+
+  template <class F>
+  ceph::bufferlist get_buffer(F &tx_frame) {
+    auto bl = tx_frame.get_buffer(tx_frame_asm);
+    log_main_preamble(bl);
+    return bl;
+  }
+
+  template <class F>
+  seastar::future<> write_flush_frame(F &tx_frame) {
+    auto bl = get_buffer(tx_frame);
+    return write_flush(std::move(bl));
+  }
+
+private:
+  void log_main_preamble(const ceph::bufferlist &bl);
+
+  SocketConnection &conn;
+
+  Socket *socket = nullptr;
+
+  /*
+   * auth signature
+   */
+
+  bool record_io = false;
+
+  ceph::bufferlist rxbuf;
+
+  ceph::bufferlist txbuf;
+
+  /*
+   * frame data and handlers
+   */
+
+  ceph::crypto::onwire::rxtx_t session_stream_handlers = { nullptr, nullptr };
+
+  // TODO
+  ceph::compression::onwire::rxtx_t session_comp_handlers = { nullptr, nullptr };
+
+  bool is_rev1 = false;
+
+  ceph::msgr::v2::FrameAssembler tx_frame_asm{
+    &session_stream_handlers, is_rev1, common::local_conf()->ms_crc_data,
+    &session_comp_handlers};
+
+  ceph::msgr::v2::FrameAssembler rx_frame_asm{
+    &session_stream_handlers, is_rev1, common::local_conf()->ms_crc_data,
+    &session_comp_handlers};
+
+  ceph::bufferlist rx_preamble;
+
+  read_payload_t rx_segments_data;
+};
+
+} // namespace crimson::net
index e38590a1aec86508ff503c3c7b69d057d6f9202b..f8bb3b7e932ae1295f31271c3208515c44e5c67c 100644 (file)
@@ -22,7 +22,8 @@ namespace crimson::net {
 Protocol::Protocol(ChainedDispatchers& dispatchers,
                    SocketConnection& conn)
   : dispatchers(dispatchers),
-    conn(conn)
+    conn(conn),
+    frame_assembler(conn)
 {}
 
 Protocol::~Protocol()
@@ -156,7 +157,7 @@ void Protocol::ack_out_sent(seq_num_t seq)
 
 seastar::future<stop_t> Protocol::try_exit_out_dispatch() {
   assert(!is_out_queued());
-  return conn.socket->flush().then([this] {
+  return frame_assembler.flush().then([this] {
     if (!is_out_queued()) {
       // still nothing pending to send after flush,
       // the dispatching can ONLY stop now
@@ -190,7 +191,7 @@ seastar::future<> Protocol::do_out_dispatch()
       auto to_ack = ack_left;
       assert(to_ack == 0 || in_seq > 0);
       // sweep all pending out with the concrete Protocol
-      return conn.socket->write(
+      return frame_assembler.write(
         sweep_out_pending_msgs_to_sent(
           num_msgs, need_keepalive, next_keepalive_ack, to_ack > 0)
       ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
@@ -241,7 +242,8 @@ seastar::future<> Protocol::do_out_dispatch()
                      conn, out_state, e);
       ceph_abort();
     }
-    conn.socket->shutdown();
+    ceph_assert_always(frame_assembler.has_socket());
+    frame_assembler.shutdown_socket();
     if (out_state == out_state_t::open) {
       logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
                     conn, out_state, e);
index 260634a2828468f32c5d0a58409940f75d14e65b..c352683dcb79c03246016950cf51d61bb7d43296 100644 (file)
@@ -11,6 +11,7 @@
 #include "crimson/common/log.h"
 #include "Fwd.h"
 #include "SocketConnection.h"
+#include "FrameAssemblerV2.h"
 
 namespace crimson::net {
 
@@ -141,13 +142,6 @@ class Protocol {
     in_seq = 0;
   }
 
-  bool is_out_queued() const {
-    return (!out_pending_msgs.empty() ||
-            ack_left > 0 ||
-            need_keepalive ||
-            next_keepalive_ack.has_value());
-  }
-
   bool is_out_queued_or_sent() const {
     return is_out_queued() || !out_sent_msgs.empty();
   }
@@ -174,7 +168,16 @@ class Protocol {
 
   SocketConnection &conn;
 
+  FrameAssemblerV2 frame_assembler;
+
  private:
+  bool is_out_queued() const {
+    return (!out_pending_msgs.empty() ||
+            ack_left > 0 ||
+            need_keepalive ||
+            next_keepalive_ack.has_value());
+  }
+
   seastar::future<stop_t> try_exit_out_dispatch();
 
   seastar::future<> do_out_dispatch();
index 3b16818cc62acfd9ea3db898267ae01b64a14539..4f6e113bc809695a1031aeda1a24a130b4a8dc89 100644 (file)
@@ -99,6 +99,17 @@ inline uint64_t generate_client_cookie() {
       1, std::numeric_limits<uint64_t>::max());
 }
 
+std::size_t get_msg_size(const FrameAssembler &rx_frame_asm)
+{
+  ceph_assert(rx_frame_asm.get_num_segments() > 0);
+  size_t sum = 0;
+  // we don't include SegmentIndex::Msg::HEADER.
+  for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
+    sum += rx_frame_asm.get_segment_logical_len(idx);
+  }
+  return sum;
+}
+
 } // namespace anonymous
 
 namespace fmt {
@@ -182,7 +193,7 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
                                const entity_name_t& _peer_name)
 {
   ceph_assert(state == state_t::NONE);
-  ceph_assert(!conn.socket);
+  ceph_assert(!frame_assembler.has_socket());
   ceph_assert(!gate.is_closed());
   conn.peer_addr = _peer_addr;
   conn.target_addr = _peer_addr;
@@ -203,10 +214,10 @@ void ProtocolV2::start_accept(SocketRef&& sock,
                               const entity_addr_t& _peer_addr)
 {
   ceph_assert(state == state_t::NONE);
-  ceph_assert(!conn.socket);
+  ceph_assert(!frame_assembler.has_socket());
   // until we know better
   conn.target_addr = _peer_addr;
-  conn.socket = std::move(sock);
+  frame_assembler.set_socket(std::move(sock));
   logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
   messenger.accept_conn(
     seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
@@ -215,148 +226,32 @@ void ProtocolV2::start_accept(SocketRef&& sock,
 
 // TODO: Frame related implementations, probably to a separate class.
 
-void ProtocolV2::enable_recording()
-{
-  rxbuf.clear();
-  txbuf.clear();
-  record_io = true;
-}
-
-seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes)
-{
-  if (unlikely(record_io)) {
-    return conn.socket->read_exactly(bytes
-    ).then([this] (auto bl) {
-      rxbuf.append(buffer::create(bl.share()));
-      return bl;
-    });
-  } else {
-    return conn.socket->read_exactly(bytes);
-  };
-}
-
-seastar::future<bufferlist> ProtocolV2::read(size_t bytes)
-{
-  if (unlikely(record_io)) {
-    return conn.socket->read(bytes
-    ).then([this] (auto buf) {
-      rxbuf.append(buf);
-      return buf;
-    });
-  } else {
-    return conn.socket->read(bytes);
-  }
-}
-
-seastar::future<> ProtocolV2::write(bufferlist&& buf)
-{
-  if (unlikely(record_io)) {
-    txbuf.append(buf);
-  }
-  return conn.socket->write(std::move(buf));
-}
-
-seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
-{
-  if (unlikely(record_io)) {
-    txbuf.append(buf);
-  }
-  return conn.socket->write_flush(std::move(buf));
-}
-
-size_t ProtocolV2::get_current_msg_size() const
+seastar::future<FrameAssemblerV2::read_main_t>
+ProtocolV2::read_main_preamble()
 {
-  ceph_assert(rx_frame_asm.get_num_segments() > 0);
-  size_t sum = 0;
-  // we don't include SegmentIndex::Msg::HEADER.
-  for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
-    sum += rx_frame_asm.get_segment_logical_len(idx);
-  }
-  return sum;
-}
-
-seastar::future<Tag> ProtocolV2::read_main_preamble()
-{
-  rx_preamble.clear();
-  return read_exactly(rx_frame_asm.get_preamble_onwire_len())
-    .then([this] (auto bl) {
-      rx_segments_data.clear();
-      try {
-        rx_preamble.append(buffer::create(std::move(bl)));
-        const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
-        INTERCEPT_FRAME(tag, bp_type_t::READ);
-        return tag;
-      } catch (FrameError& e) {
-        logger().warn("{} read_main_preamble: {}", conn, e.what());
-        abort_in_fault();
-      }
-    });
+  return frame_assembler.read_main_preamble(
+#ifdef UNIT_TESTS_BUILT
+  ).then([this](auto ret) {
+    INTERCEPT_FRAME(ret.tag, bp_type_t::READ);
+    return ret;
+  });
+#else
+  );
+#endif
 }
 
-seastar::future<> ProtocolV2::read_frame_payload()
+template <class F>
+ceph::bufferlist ProtocolV2::get_buffer(F &tx_frame)
 {
-  ceph_assert(rx_segments_data.empty());
-
-  return seastar::do_until(
-    [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); },
-    [this] {
-      // TODO: create aligned and contiguous buffer from socket
-      const size_t seg_idx = rx_segments_data.size();
-      if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx);
-         alignment != segment_t::DEFAULT_ALIGNMENT) {
-        logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
-                       conn, alignment, rx_segments_data.size());
-      }
-      uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
-      // TODO: create aligned and contiguous buffer from socket
-      return read_exactly(onwire_len).then([this] (auto tmp_bl) {
-        logger().trace("{} RECV({}) frame segment[{}]",
-                       conn, tmp_bl.size(), rx_segments_data.size());
-        bufferlist segment;
-        segment.append(buffer::create(std::move(tmp_bl)));
-        rx_segments_data.emplace_back(std::move(segment));
-      });
-    }
-  ).then([this] {
-    return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
-  }).then([this] (auto bl) {
-    logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
-    bool ok = false;
-    try {
-      bufferlist rx_epilogue;
-      rx_epilogue.append(buffer::create(std::move(bl)));
-      ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
-    } catch (FrameError& e) {
-      logger().error("read_frame_payload: {} {}", conn, e.what());
-      abort_in_fault();
-    } catch (ceph::crypto::onwire::MsgAuthError&) {
-      logger().error("read_frame_payload: {} bad auth tag", conn);
-      abort_in_fault();
-    }
-    // we do have a mechanism that allows transmitter to start sending message
-    // and abort after putting entire data field on wire. This will be used by
-    // the kernel client to avoid unnecessary buffering.
-    if (!ok) {
-      // TODO
-      ceph_assert(false);
-    }
-  });
+  INTERCEPT_FRAME(F::tag, bp_type_t::WRITE);
+  return frame_assembler.get_buffer(tx_frame);
 }
 
 template <class F>
-seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
+seastar::future<> ProtocolV2::write_flush_frame(F &tx_frame)
 {
-  auto bl = frame.get_buffer(tx_frame_asm);
-  const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
-  logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
-                 conn, bl.length(), (int)main_preamble->tag,
-                 (int)main_preamble->num_segments, main_preamble->crc);
-  INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE);
-  if (flush) {
-    return write_flush(std::move(bl));
-  } else {
-    return write(std::move(bl));
-  }
+  INTERCEPT_FRAME(F::tag, bp_type_t::WRITE);
+  return frame_assembler.write_flush_frame(tx_frame);
 }
 
 void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reentrant)
@@ -428,11 +323,11 @@ ProtocolV2::banner_exchange(bool is_connect)
                  CEPH_MSGR2_REQUIRED_FEATURES,
                  CEPH_BANNER_V2_PREFIX);
   INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
-  return write_flush(std::move(bl)).then([this] {
+  return frame_assembler.write_flush(std::move(bl)).then([this] {
       // 2. read peer banner
       unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
       INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
-      return read_exactly(banner_len); // or read exactly?
+      return frame_assembler.read_exactly(banner_len); // or read exactly?
     }).then([this] (auto bl) {
       // 3. process peer banner and read banner_payload
       unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
@@ -462,7 +357,7 @@ ProtocolV2::banner_exchange(bool is_connect)
       }
       logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
       INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
-      return read(payload_len);
+      return frame_assembler.read(payload_len);
     }).then([this, is_connect] (bufferlist bl) {
       // 4. process peer banner_payload and send HelloFrame
       auto p = bl.cbegin();
@@ -496,24 +391,23 @@ ProtocolV2::banner_exchange(bool is_connect)
       }
       peer_supported_features = _peer_supported_features;
       bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
-      tx_frame_asm.set_is_rev1(is_rev1);
-      rx_frame_asm.set_is_rev1(is_rev1);
+      frame_assembler.set_is_rev1(is_rev1);
 
       auto hello = HelloFrame::Encode(messenger.get_mytype(),
                                       conn.target_addr);
       logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
                      conn, ceph_entity_type_name(messenger.get_mytype()),
                      conn.target_addr);
-      return write_frame(hello);
+      return write_flush_frame(hello);
     }).then([this] {
       //5. read peer HelloFrame
       return read_main_preamble();
-    }).then([this] (Tag tag) {
-      expect_tag(Tag::HELLO, tag, conn, __func__);
-      return read_frame_payload();
-    }).then([this] {
+    }).then([this](auto ret) {
+      expect_tag(Tag::HELLO, ret.tag, conn, __func__);
+      return frame_assembler.read_frame_payload();
+    }).then([this](auto payload) {
       // 6. process peer HelloFrame
-      auto hello = HelloFrame::Decode(rx_segments_data.back());
+      auto hello = HelloFrame::Decode(payload->back());
       logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
                      conn, ceph_entity_type_name(hello.entity_type()),
                      hello.peer_addr());
@@ -526,13 +420,14 @@ ProtocolV2::banner_exchange(bool is_connect)
 
 seastar::future<> ProtocolV2::handle_auth_reply()
 {
-  return read_main_preamble()
-  .then([this] (Tag tag) {
-    switch (tag) {
+  return read_main_preamble(
+  ).then([this](auto ret) {
+    switch (ret.tag) {
       case Tag::AUTH_BAD_METHOD:
-        return read_frame_payload().then([this] {
+        return frame_assembler.read_frame_payload(
+        ).then([this](auto payload) {
           // handle_auth_bad_method() logic
-          auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back());
+          auto bad_method = AuthBadMethodFrame::Decode(payload->back());
           logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
                         "allowed_methods={}, allowed_modes={}",
                         conn, bad_method.method(), cpp_strerror(bad_method.result()),
@@ -550,9 +445,10 @@ seastar::future<> ProtocolV2::handle_auth_reply()
           return client_auth(bad_method.allowed_methods());
         });
       case Tag::AUTH_REPLY_MORE:
-        return read_frame_payload().then([this] {
+        return frame_assembler.read_frame_payload(
+        ).then([this](auto payload) {
           // handle_auth_reply_more() logic
-          auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back());
+          auto auth_more = AuthReplyMoreFrame::Decode(payload->back());
           logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
                          conn, auth_more.auth_payload().length());
           ceph_assert(messenger.get_auth_client());
@@ -562,14 +458,15 @@ seastar::future<> ProtocolV2::handle_auth_reply()
           auto more_reply = AuthRequestMoreFrame::Encode(reply);
           logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
                          conn, reply.length());
-          return write_frame(more_reply);
+          return write_flush_frame(more_reply);
         }).then([this] {
           return handle_auth_reply();
         });
       case Tag::AUTH_DONE:
-        return read_frame_payload().then([this] {
+        return frame_assembler.read_frame_payload(
+        ).then([this](auto payload) {
           // handle_auth_done() logic
-          auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back());
+          auto auth_done = AuthDoneFrame::Decode(payload->back());
           logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
                          conn, auth_done.global_id(),
                          ceph_con_mode_name(auth_done.con_mode()),
@@ -586,13 +483,11 @@ seastar::future<> ProtocolV2::handle_auth_reply()
             abort_in_fault();
           }
           auth_meta->con_mode = auth_done.con_mode();
-          bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
-          session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
-              nullptr, *auth_meta, is_rev1, false);
+          frame_assembler.create_session_stream_handlers(*auth_meta, false);
           return finish_auth();
         });
       default: {
-        unexpected_tag(tag, conn, __func__);
+        unexpected_tag(ret.tag, conn, __func__);
         return seastar::now();
       }
     }
@@ -612,7 +507,7 @@ seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods
     logger().debug("{} WRITE AuthRequestFrame: method={},"
                    " preferred_modes={}, payload_len={}",
                    conn, auth_method, preferred_modes, bl.length());
-    return write_frame(frame).then([this] {
+    return write_flush_frame(frame).then([this] {
       return handle_auth_reply();
     });
   } catch (const crimson::auth::error& e) {
@@ -625,10 +520,11 @@ seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::process_wait()
 {
-  return read_frame_payload().then([this] {
+  return frame_assembler.read_frame_payload(
+  ).then([this](auto payload) {
     // handle_wait() logic
     logger().debug("{} GOT WaitFrame", conn);
-    WaitFrame::Decode(rx_segments_data.back());
+    WaitFrame::Decode(payload->back());
     return next_step_t::wait;
   });
 }
@@ -659,14 +555,15 @@ ProtocolV2::client_connect()
                  conn.policy.features_supported,
                  conn.policy.features_required | msgr2_required,
                  flags, client_cookie);
-  return write_frame(client_ident).then([this] {
+  return write_flush_frame(client_ident).then([this] {
     return read_main_preamble();
-  }).then([this] (Tag tag) {
-    switch (tag) {
+  }).then([this](auto ret) {
+    switch (ret.tag) {
       case Tag::IDENT_MISSING_FEATURES:
-        return read_frame_payload().then([this] {
+        return frame_assembler.read_frame_payload(
+        ).then([this](auto payload) {
           // handle_ident_missing_features() logic
-          auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back());
+          auto ident_missing = IdentMissingFeaturesFrame::Decode(payload->back());
           logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
                         " (client does not support all server features)",
                         conn, ident_missing.features());
@@ -676,10 +573,11 @@ ProtocolV2::client_connect()
       case Tag::WAIT:
         return process_wait();
       case Tag::SERVER_IDENT:
-        return read_frame_payload().then([this] {
+        return frame_assembler.read_frame_payload(
+        ).then([this](auto payload) {
           // handle_server_ident() logic
           requeue_out_sent();
-          auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
+          auto server_ident = ServerIdentFrame::Decode(payload->back());
           logger().debug("{} GOT ServerIdentFrame:"
                          " addrs={}, gid={}, gs={},"
                          " features_supported={}, features_required={},"
@@ -738,7 +636,7 @@ ProtocolV2::client_connect()
           return seastar::make_ready_future<next_step_t>(next_step_t::ready);
         });
       default: {
-        unexpected_tag(tag, conn, "post_client_connect");
+        unexpected_tag(ret.tag, conn, "post_client_connect");
         return seastar::make_ready_future<next_step_t>(next_step_t::none);
       }
     }
@@ -760,14 +658,15 @@ ProtocolV2::client_reconnect()
                  conn, messenger.get_myaddrs(),
                  client_cookie, server_cookie,
                  global_seq, connect_seq, get_in_seq());
-  return write_frame(reconnect).then([this] {
+  return write_flush_frame(reconnect).then([this] {
     return read_main_preamble();
-  }).then([this] (Tag tag) {
-    switch (tag) {
+  }).then([this](auto ret) {
+    switch (ret.tag) {
       case Tag::SESSION_RETRY_GLOBAL:
-        return read_frame_payload().then([this] {
+        return frame_assembler.read_frame_payload(
+        ).then([this](auto payload) {
           // handle_session_retry_global() logic
-          auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
+          auto retry = RetryGlobalFrame::Decode(payload->back());
           logger().warn("{} GOT RetryGlobalFrame: gs={}",
                         conn, retry.global_seq());
           global_seq = messenger.get_global_seq(retry.global_seq());
@@ -775,9 +674,10 @@ ProtocolV2::client_reconnect()
           return client_reconnect();
         });
       case Tag::SESSION_RETRY:
-        return read_frame_payload().then([this] {
+        return frame_assembler.read_frame_payload(
+        ).then([this](auto payload) {
           // handle_session_retry() logic
-          auto retry = RetryFrame::Decode(rx_segments_data.back());
+          auto retry = RetryFrame::Decode(payload->back());
           logger().warn("{} GOT RetryFrame: cs={}",
                         conn, retry.connect_seq());
           connect_seq = retry.connect_seq() + 1;
@@ -785,9 +685,10 @@ ProtocolV2::client_reconnect()
           return client_reconnect();
         });
       case Tag::SESSION_RESET:
-        return read_frame_payload().then([this] {
+        return frame_assembler.read_frame_payload(
+        ).then([this](auto payload) {
           // handle_session_reset() logic
-          auto reset = ResetFrame::Decode(rx_segments_data.back());
+          auto reset = ResetFrame::Decode(payload->back());
           logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
           reset_session(reset.full());
           return client_connect();
@@ -795,16 +696,17 @@ ProtocolV2::client_reconnect()
       case Tag::WAIT:
         return process_wait();
       case Tag::SESSION_RECONNECT_OK:
-        return read_frame_payload().then([this] {
+        return frame_assembler.read_frame_payload(
+        ).then([this](auto payload) {
           // handle_reconnect_ok() logic
-          auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
+          auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
           logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
                          conn, reconnect_ok.msg_seq());
           requeue_out_sent_up_to(reconnect_ok.msg_seq());
           return seastar::make_ready_future<next_step_t>(next_step_t::ready);
         });
       default: {
-        unexpected_tag(tag, conn, "post_client_reconnect");
+        unexpected_tag(ret.tag, conn, "post_client_reconnect");
         return seastar::make_ready_future<next_step_t>(next_step_t::none);
       }
     }
@@ -814,9 +716,7 @@ ProtocolV2::client_reconnect()
 void ProtocolV2::execute_connecting()
 {
   trigger_state(state_t::CONNECTING, out_state_t::delay, false);
-  if (conn.socket) {
-    conn.socket->shutdown();
-  }
+  frame_assembler.shutdown_socket();
   gated_execute("execute_connecting", [this] {
       global_seq = messenger.get_global_seq();
       assert(client_cookie != 0);
@@ -835,12 +735,10 @@ void ProtocolV2::execute_connecting()
                            conn, get_state_name(state));
             abort_protocol();
           }
-          if (conn.socket) {
-            gate.dispatch_in_background("close_sockect_connecting", *this,
-                           [sock = std::move(conn.socket)] () mutable {
-              return sock->close().then([sock = std::move(sock)] {});
-            });
-          }
+          gate.dispatch_in_background(
+              "reset_close_socket_connecting",
+              *this,
+              [this] { return frame_assembler.reset_and_close_socket(); });
           INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING);
           return Socket::connect(conn.peer_addr);
         }).then([this](SocketRef sock) {
@@ -852,12 +750,12 @@ void ProtocolV2::execute_connecting()
               abort_protocol();
             });
           }
-          conn.socket = std::move(sock);
+          frame_assembler.set_socket(std::move(sock));
           return seastar::now();
         }).then([this] {
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
-          session_stream_handlers = { nullptr, nullptr };
-          enable_recording();
+          frame_assembler.reset_handlers();
+          frame_assembler.start_recording();
           return banner_exchange(true);
         }).then([this] (auto&& ret) {
           auto [_peer_type, _my_addr_from_peer] = std::move(ret);
@@ -872,7 +770,8 @@ void ProtocolV2::execute_connecting()
                            conn, get_state_name(state));
             abort_protocol();
           }
-          conn.socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
+          frame_assembler.learn_socket_ephemeral_port_as_connector(
+              _my_addr_from_peer.get_port());
           if (unlikely(_my_addr_from_peer.is_legacy())) {
             logger().warn("{} peer sent a legacy address for me: {}",
                           conn, _my_addr_from_peer);
@@ -953,7 +852,7 @@ seastar::future<> ProtocolV2::_auth_bad_method(int r)
                 "allowed_methods={}, allowed_modes={})",
                 conn, auth_meta->auth_method, cpp_strerror(r),
                 allowed_methods, allowed_modes);
-  return write_frame(bad_method).then([this] {
+  return write_flush_frame(bad_method).then([this] {
     return server_auth();
   });
 }
@@ -979,11 +878,9 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
     logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
                    conn, conn.peer_global_id,
                    ceph_con_mode_name(auth_meta->con_mode), reply.length());
-    return write_frame(auth_done).then([this] {
+    return write_flush_frame(auth_done).then([this] {
       ceph_assert(auth_meta);
-      bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
-      session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
-          nullptr, *auth_meta, is_rev1, true);
+      frame_assembler.create_session_stream_handlers(*auth_meta, true);
       return finish_auth();
     });
    }
@@ -992,13 +889,13 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
     auto more = AuthReplyMoreFrame::Encode(reply);
     logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
                    conn, reply.length());
-    return write_frame(more).then([this] {
+    return write_flush_frame(more).then([this] {
       return read_main_preamble();
-    }).then([this] (Tag tag) {
-      expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__);
-      return read_frame_payload();
-    }).then([this] {
-      auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back());
+    }).then([this](auto ret) {
+      expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, __func__);
+      return frame_assembler.read_frame_payload();
+    }).then([this](auto payload) {
+      auto auth_more = AuthRequestMoreFrame::Decode(payload->back());
       logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
                      conn, auth_more.auth_payload().length());
       return _handle_auth_request(auth_more.auth_payload(), true);
@@ -1018,13 +915,13 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
 
 seastar::future<> ProtocolV2::server_auth()
 {
-  return read_main_preamble()
-  .then([this] (Tag tag) {
-    expect_tag(Tag::AUTH_REQUEST, tag, conn, __func__);
-    return read_frame_payload();
-  }).then([this] {
+  return read_main_preamble(
+  ).then([this](auto ret) {
+    expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, __func__);
+    return frame_assembler.read_frame_payload();
+  }).then([this](auto payload) {
     // handle_auth_request() logic
-    auto request = AuthRequestFrame::Decode(rx_segments_data.back());
+    auto request = AuthRequestFrame::Decode(payload->back());
     logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
                    " payload_len={}",
                    conn, request.method(), request.preferred_modes(),
@@ -1060,7 +957,7 @@ ProtocolV2::send_wait()
 {
   auto wait = WaitFrame::Encode();
   logger().debug("{} WRITE WaitFrame", conn);
-  return write_frame(wait).then([] {
+  return write_flush_frame(wait).then([] {
     return next_step_t::wait;
   });
 }
@@ -1072,9 +969,8 @@ ProtocolV2::reuse_connection(
 {
   existing_proto->trigger_replacing(reconnect,
                                     do_reset,
-                                    std::move(conn.socket),
+                                    frame_assembler.to_replace(),
                                     std::move(auth_meta),
-                                    std::move(session_stream_handlers),
                                     peer_global_seq,
                                     client_cookie,
                                     conn.get_peer_name(),
@@ -1194,9 +1090,10 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::server_connect()
 {
-  return read_frame_payload().then([this] {
+  return frame_assembler.read_frame_payload(
+  ).then([this](auto payload) {
     // handle_client_ident() logic
-    auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back());
+    auto client_ident = ClientIdentFrame::Decode(payload->back());
     logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
                    " gid={}, gs={}, features_supported={},"
                    " features_required={}, flags={}, cookie={}",
@@ -1245,7 +1142,7 @@ ProtocolV2::server_connect()
       auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
       logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
                     conn, feat_missing);
-      return write_frame(ident_missing_features).then([] {
+      return write_flush_frame(ident_missing_features).then([] {
         return next_step_t::wait;
       });
     }
@@ -1278,9 +1175,9 @@ ProtocolV2::server_connect()
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::read_reconnect()
 {
-  return read_main_preamble()
-  .then([this] (Tag tag) {
-    expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect");
+  return read_main_preamble(
+  ).then([this](auto ret) {
+    expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_reconnect");
     return server_reconnect();
   });
 }
@@ -1290,7 +1187,7 @@ ProtocolV2::send_retry(uint64_t connect_seq)
 {
   auto retry = RetryFrame::Encode(connect_seq);
   logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
-  return write_frame(retry).then([this] {
+  return write_flush_frame(retry).then([this] {
     return read_reconnect();
   });
 }
@@ -1300,7 +1197,7 @@ ProtocolV2::send_retry_global(uint64_t global_seq)
 {
   auto retry = RetryGlobalFrame::Encode(global_seq);
   logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
-  return write_frame(retry).then([this] {
+  return write_flush_frame(retry).then([this] {
     return read_reconnect();
   });
 }
@@ -1310,10 +1207,10 @@ ProtocolV2::send_reset(bool full)
 {
   auto reset = ResetFrame::Encode(full);
   logger().warn("{} WRITE ResetFrame: full={}", conn, full);
-  return write_frame(reset).then([this] {
+  return write_flush_frame(reset).then([this] {
     return read_main_preamble();
-  }).then([this] (Tag tag) {
-    expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
+  }).then([this](auto ret) {
+    expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset");
     return server_connect();
   });
 }
@@ -1321,9 +1218,10 @@ ProtocolV2::send_reset(bool full)
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::server_reconnect()
 {
-  return read_frame_payload().then([this] {
+  return frame_assembler.read_frame_payload(
+  ).then([this](auto payload) {
     // handle_reconnect() logic
-    auto reconnect = ReconnectFrame::Decode(rx_segments_data.back());
+    auto reconnect = ReconnectFrame::Decode(payload->back());
 
     logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
                    " server_cookie={}, gs={}, cs={}, msg_seq={}",
@@ -1462,9 +1360,8 @@ void ProtocolV2::execute_accepting()
       return seastar::futurize_invoke([this] {
           INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
-          session_stream_handlers = { nullptr, nullptr };
-          session_comp_handlers = { nullptr, nullptr };
-          enable_recording();
+          frame_assembler.reset_handlers();
+          frame_assembler.start_recording();
           return banner_exchange(false);
         }).then([this] (auto&& ret) {
           auto [_peer_type, _my_addr_from_peer] = std::move(ret);
@@ -1489,14 +1386,14 @@ void ProtocolV2::execute_accepting()
           return server_auth();
         }).then([this] {
           return read_main_preamble();
-        }).then([this] (Tag tag) {
-          switch (tag) {
+        }).then([this](auto ret) {
+          switch (ret.tag) {
             case Tag::CLIENT_IDENT:
               return server_connect();
             case Tag::SESSION_RECONNECT:
               return server_reconnect();
             default: {
-              unexpected_tag(tag, conn, "post_server_auth");
+              unexpected_tag(ret.tag, conn, "post_server_auth");
               return seastar::make_ready_future<next_step_t>(next_step_t::none);
             }
           }
@@ -1531,21 +1428,19 @@ seastar::future<> ProtocolV2::finish_auth()
 {
   ceph_assert(auth_meta);
 
+  auto records = frame_assembler.stop_recording();
   const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
-    auth_meta->session_key.hmac_sha256(nullptr, rxbuf);
+    auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf);
   auto sig_frame = AuthSignatureFrame::Encode(sig);
-  ceph_assert(record_io);
-  record_io = false;
-  rxbuf.clear();
   logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
-  return write_frame(sig_frame).then([this] {
+  return write_flush_frame(sig_frame).then([this] {
     return read_main_preamble();
-  }).then([this] (Tag tag) {
-    expect_tag(Tag::AUTH_SIGNATURE, tag, conn, "post_finish_auth");
-    return read_frame_payload();
-  }).then([this] {
+  }).then([this](auto ret) {
+    expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth");
+    return frame_assembler.read_frame_payload();
+  }).then([this, txbuf=std::move(records.txbuf)](auto payload) {
     // handle_auth_signature() logic
-    auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back());
+    auto sig_frame = AuthSignatureFrame::Decode(payload->back());
     logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
 
     const auto actual_tx_sig = auth_meta->session_key.empty() ?
@@ -1556,7 +1451,6 @@ seastar::future<> ProtocolV2::finish_auth()
                     conn, actual_tx_sig, sig_frame.signature());
       abort_in_fault();
     }
-    txbuf.clear();
   });
 }
 
@@ -1664,16 +1558,15 @@ ProtocolV2::send_server_ident()
                  conn.policy.features_required | msgr2_required,
                  flags, server_cookie);
 
-  return write_frame(server_ident);
+  return write_flush_frame(server_ident);
 }
 
 // REPLACING state
 
 void ProtocolV2::trigger_replacing(bool reconnect,
                                    bool do_reset,
-                                   SocketRef&& new_socket,
+                                   FrameAssemblerV2::mover_t &&mover,
                                    AuthConnectionMetaRef&& new_auth_meta,
-                                   ceph::crypto::onwire::rxtx_t new_rxtx,
                                    uint64_t new_peer_global_seq,
                                    uint64_t new_client_cookie,
                                    entity_name_t new_peer_name,
@@ -1683,18 +1576,15 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                                    uint64_t new_msg_seq)
 {
   trigger_state(state_t::REPLACING, out_state_t::delay, false);
-  if (conn.socket) {
-    conn.socket->shutdown();
-  }
+  frame_assembler.shutdown_socket();
   dispatchers.ms_handle_accept(
       seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
   gate.dispatch_in_background("trigger_replacing", *this,
                  [this,
                   reconnect,
                   do_reset,
-                  new_socket = std::move(new_socket),
+                  mover = std::move(mover),
                   new_auth_meta = std::move(new_auth_meta),
-                  new_rxtx = std::move(new_rxtx),
                   new_client_cookie, new_peer_name,
                   new_conn_features, new_peer_supported_features,
                   new_peer_global_seq,
@@ -1707,30 +1597,26 @@ void ProtocolV2::trigger_replacing(bool reconnect,
       return execution_done.get_future();
     }).then([this,
              reconnect,
-             new_socket = std::move(new_socket),
+             mover = std::move(mover),
              new_auth_meta = std::move(new_auth_meta),
-             new_rxtx = std::move(new_rxtx),
              new_client_cookie, new_peer_name,
              new_conn_features, new_peer_supported_features,
              new_peer_global_seq,
              new_connect_seq, new_msg_seq] () mutable {
       if (unlikely(state != state_t::REPLACING)) {
-        return new_socket->close().then([sock = std::move(new_socket)] {
+        return mover.socket->close(
+        ).then([sock = std::move(mover.socket)] {
           abort_protocol();
         });
       }
 
-      if (conn.socket) {
-        gate.dispatch_in_background("close_socket_replacing", *this,
-                       [sock = std::move(conn.socket)] () mutable {
-          return sock->close().then([sock = std::move(sock)] {});
-        });
-      }
-      conn.socket = std::move(new_socket);
+      gate.dispatch_in_background(
+          "reset_close_socket_replacing",
+          *this,
+          [this] { return frame_assembler.reset_and_close_socket(); });
       auth_meta = std::move(new_auth_meta);
-      session_stream_handlers = std::move(new_rxtx);
-      record_io = false;
       peer_global_seq = new_peer_global_seq;
+      frame_assembler.replace_by(std::move(mover));
 
       if (reconnect) {
         connect_seq = new_connect_seq;
@@ -1738,7 +1624,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
         requeue_out_sent_up_to(new_msg_seq);
         auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq());
         logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq());
-        return write_frame(reconnect_ok);
+        return write_flush_frame(reconnect_ok);
       } else {
         client_cookie = new_client_cookie;
         assert(conn.get_peer_type() == new_peer_name.type());
@@ -1748,8 +1634,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
         conn.set_features(new_conn_features);
         peer_supported_features = new_peer_supported_features;
         bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
-        tx_frame_asm.set_is_rev1(is_rev1);
-        rx_frame_asm.set_is_rev1(is_rev1);
+        frame_assembler.set_is_rev1(is_rev1);
         return send_server_ident();
       }
     }).then([this, reconnect] {
@@ -1790,20 +1675,17 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
 
   if (unlikely(require_keepalive)) {
     auto keepalive_frame = KeepAliveFrame::Encode();
-    bl.append(keepalive_frame.get_buffer(tx_frame_asm));
-    INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
+    bl.append(get_buffer(keepalive_frame));
   }
 
   if (unlikely(maybe_keepalive_ack.has_value())) {
     auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
-    bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm));
-    INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
+    bl.append(get_buffer(keepalive_ack_frame));
   }
 
   if (require_ack && num_msgs == 0u) {
     auto ack_frame = AckFrame::Encode(get_in_seq());
-    bl.append(ack_frame.get_buffer(tx_frame_asm));
-    INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
+    bl.append(get_buffer(ack_frame));
   }
 
   std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) {
@@ -1831,22 +1713,20 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
         msg->get_payload(), msg->get_middle(), msg->get_data());
     logger().debug("{} --> #{} === {} ({})",
                   conn, msg->get_seq(), *msg, msg->get_type());
-    bl.append(message.get_buffer(tx_frame_asm));
-    INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
+    bl.append(get_buffer(message));
   });
 
   return bl;
 }
 
-seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
+seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp, std::size_t msg_size)
 {
-  return read_frame_payload()
-  .then([this, throttle_stamp] {
+  return frame_assembler.read_frame_payload(
+  ).then([this, throttle_stamp, msg_size](auto payload) {
     utime_t recv_stamp{seastar::lowres_system_clock::now()};
 
     // we need to get the size before std::moving segments data
-    const size_t cur_msg_size = get_current_msg_size();
-    auto msg_frame = MessageFrame::Decode(rx_segments_data);
+    auto msg_frame = MessageFrame::Decode(*payload);
     // XXX: paranoid copy just to avoid oops
     ceph_msg_header2 current_header = msg_frame.header();
 
@@ -1883,7 +1763,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
 
     // store reservation size in message, so we don't get confused
     // by messages entering the dispatch queue through other paths.
-    message->set_dispatch_throttle_size(cur_msg_size);
+    message->set_dispatch_throttle_size(msg_size);
 
     message->set_throttle_stamp(throttle_stamp);
     message->set_recv_stamp(recv_stamp);
@@ -1941,10 +1821,11 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
   gated_execute("execute_ready", [this] {
     protocol_timer.cancel();
     return seastar::keep_doing([this] {
-      return read_main_preamble()
-      .then([this] (Tag tag) {
-        switch (tag) {
+      return read_main_preamble(
+      ).then([this](auto ret) {
+        switch (ret.tag) {
           case Tag::MESSAGE: {
+            size_t msg_size = get_msg_size(*ret.rx_frame_asm);
             return seastar::futurize_invoke([this] {
               // throttle_message() logic
               if (!conn.policy.throttler_messages) {
@@ -1953,46 +1834,48 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
               // TODO: message throttler
               ceph_assert(false);
               return seastar::now();
-            }).then([this] {
+            }).then([this, msg_size] {
               // throttle_bytes() logic
               if (!conn.policy.throttler_bytes) {
                 return seastar::now();
               }
-              size_t cur_msg_size = get_current_msg_size();
-              if (!cur_msg_size) {
+              if (!msg_size) {
                 return seastar::now();
               }
               logger().trace("{} wants {} bytes from policy throttler {}/{}",
-                             conn, cur_msg_size,
+                             conn, msg_size,
                              conn.policy.throttler_bytes->get_current(),
                              conn.policy.throttler_bytes->get_max());
-              return conn.policy.throttler_bytes->get(cur_msg_size);
-            }).then([this] {
+              return conn.policy.throttler_bytes->get(msg_size);
+            }).then([this, msg_size] {
               // TODO: throttle_dispatch_queue() logic
               utime_t throttle_stamp{seastar::lowres_system_clock::now()};
-              return read_message(throttle_stamp);
+              return read_message(throttle_stamp, msg_size);
             });
           }
           case Tag::ACK:
-            return read_frame_payload().then([this] {
+            return frame_assembler.read_frame_payload(
+            ).then([this](auto payload) {
               // handle_message_ack() logic
-              auto ack = AckFrame::Decode(rx_segments_data.back());
+              auto ack = AckFrame::Decode(payload->back());
               logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
               ack_out_sent(ack.seq());
             });
           case Tag::KEEPALIVE2:
-            return read_frame_payload().then([this] {
+            return frame_assembler.read_frame_payload(
+            ).then([this](auto payload) {
               // handle_keepalive2() logic
-              auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
+              auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
               logger().debug("{} GOT KeepAliveFrame: timestamp={}",
                              conn, keepalive_frame.timestamp());
               notify_keepalive_ack(keepalive_frame.timestamp());
               set_last_keepalive(seastar::lowres_system_clock::now());
             });
           case Tag::KEEPALIVE2_ACK:
-            return read_frame_payload().then([this] {
+            return frame_assembler.read_frame_payload(
+            ).then([this](auto payload) {
               // handle_keepalive2_ack() logic
-              auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
+              auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
               auto _last_keepalive_ack =
                 seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
               set_last_keepalive_ack(_last_keepalive_ack);
@@ -2000,7 +1883,7 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
                              conn, _last_keepalive_ack);
             });
           default: {
-            unexpected_tag(tag, conn, "execute_ready");
+            unexpected_tag(ret.tag, conn, "execute_ready");
             return seastar::now();
           }
         }
@@ -2023,9 +1906,7 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
 void ProtocolV2::execute_standby()
 {
   trigger_state(state_t::STANDBY, out_state_t::delay, false);
-  if (conn.socket) {
-    conn.socket->shutdown();
-  }
+  frame_assembler.shutdown_socket();
 }
 
 void ProtocolV2::notify_out()
@@ -2042,9 +1923,7 @@ void ProtocolV2::notify_out()
 void ProtocolV2::execute_wait(bool max_backoff)
 {
   trigger_state(state_t::WAIT, out_state_t::delay, false);
-  if (conn.socket) {
-    conn.socket->shutdown();
-  }
+  frame_assembler.shutdown_socket();
   gated_execute("execute_wait", [this, max_backoff] {
     double backoff = protocol_timer.last_dur();
     if (max_backoff) {
@@ -2077,7 +1956,7 @@ void ProtocolV2::execute_server_wait()
 {
   trigger_state(state_t::SERVER_WAIT, out_state_t::none, false);
   gated_execute("execute_server_wait", [this] {
-    return read_exactly(1).then([this] (auto bl) {
+    return frame_assembler.read_exactly(1).then([this] (auto bl) {
       logger().warn("{} SERVER_WAIT got read, abort", conn);
       abort_in_fault();
     }).handle_exception([this] (std::exception_ptr eptr) {
@@ -2154,9 +2033,7 @@ void ProtocolV2::do_close(
   if (f_accept_new) {
     (*f_accept_new)();
   }
-  if (conn.socket) {
-    conn.socket->shutdown();
-  }
+  frame_assembler.shutdown_socket();
   assert(!gate.is_closed());
   auto gate_closed = gate.close();
   auto out_closed = close_out();
@@ -2172,11 +2049,7 @@ void ProtocolV2::do_close(
   closed_clean_fut = seastar::when_all(
       std::move(gate_closed), std::move(out_closed)
   ).discard_result().then([this] {
-    if (conn.socket) {
-      return conn.socket->close();
-    } else {
-      return seastar::now();
-    }
+    return frame_assembler.reset_and_close_socket(false);
   }).then([this] {
     logger().debug("{} closed!", conn);
     messenger.closed_conn(
index 47584e702a8f6e44675b4815f50787ef783fa4fa..3c2b5e67434a94762be35147e2b94674d75c2395 100644 (file)
@@ -6,9 +6,6 @@
 #include <seastar/core/sleep.hh>
 
 #include "Protocol.h"
-#include "msg/async/frames_v2.h"
-#include "msg/async/crypto_onwire.h"
-#include "msg/async/compression_onwire.h"
 
 namespace crimson::net {
 
@@ -139,37 +136,17 @@ class ProtocolV2 final : public Protocol {
   };
   Timer protocol_timer;
 
- // TODO: Frame related implementations, probably to a separate class.
  private:
-  bool record_io = false;
-  ceph::bufferlist rxbuf;
-  ceph::bufferlist txbuf;
-
-  void enable_recording();
-  seastar::future<Socket::tmp_buf> read_exactly(size_t bytes);
-  seastar::future<bufferlist> read(size_t bytes);
-  seastar::future<> write(bufferlist&& buf);
-  seastar::future<> write_flush(bufferlist&& buf);
-
-  ceph::crypto::onwire::rxtx_t session_stream_handlers;
-  ceph::compression::onwire::rxtx_t session_comp_handlers;
-  ceph::msgr::v2::FrameAssembler tx_frame_asm{
-    &session_stream_handlers, false, common::local_conf()->ms_crc_data,
-    &session_comp_handlers};
-  ceph::msgr::v2::FrameAssembler rx_frame_asm{
-    &session_stream_handlers, false, common::local_conf()->ms_crc_data,
-    &session_comp_handlers};
-  ceph::bufferlist rx_preamble;
-  ceph::msgr::v2::segment_bls_t rx_segments_data;
-
-  size_t get_current_msg_size() const;
-  seastar::future<ceph::msgr::v2::Tag> read_main_preamble();
-  seastar::future<> read_frame_payload();
+  seastar::future<FrameAssemblerV2::read_main_t> read_main_preamble();
+
   template <class F>
-  seastar::future<> write_frame(F &frame, bool flush=true);
+  ceph::bufferlist get_buffer(F &tx_frame);
+
+  template <class F>
+  seastar::future<> write_flush_frame(F &tx_frame);
 
- private:
   void fault(bool backoff, const char* func_name, std::exception_ptr eptr);
+
   void reset_session(bool is_full);
   seastar::future<std::tuple<entity_type_t, entity_addr_t>>
   banner_exchange(bool is_connect);
@@ -229,9 +206,8 @@ class ProtocolV2 final : public Protocol {
   // REPLACING (server)
   void trigger_replacing(bool reconnect,
                          bool do_reset,
-                         SocketRef&& new_socket,
+                         FrameAssemblerV2::mover_t &&mover,
                          AuthConnectionMetaRef&& new_auth_meta,
-                         ceph::crypto::onwire::rxtx_t new_rxtx,
                          uint64_t new_peer_global_seq,
                          // !reconnect
                          uint64_t new_client_cookie,
@@ -243,7 +219,7 @@ class ProtocolV2 final : public Protocol {
                          uint64_t new_msg_seq);
 
   // READY
-  seastar::future<> read_message(utime_t throttle_stamp);
+  seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size);
   void execute_ready(bool dispatch_connect);
 
   // STANDBY
index 76703c76337c4e90abdc5170e7094576cd006ab5..94d98302bdf6e2ba1c2e4a4e52a797624fa97255 100644 (file)
@@ -180,6 +180,7 @@ private:
 
   friend class Protocol;
   friend class ProtocolV2;
+  friend class FrameAssemblerV2;
 };
 
 } // namespace crimson::net