]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async: msgr2: extract frame classes into its own header
authorRicardo Dias <rdias@suse.com>
Mon, 25 Feb 2019 16:23:04 +0000 (16:23 +0000)
committerRicardo Dias <rdias@suse.com>
Mon, 25 Feb 2019 16:43:38 +0000 (16:43 +0000)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h
src/msg/async/frames_v2.h [new file with mode: 0644]

index 1829336f1bbe984d79e6df34053807f44761eba5..d4e9ed7fb9b19c8caad45f1e1950e2dac4f601d0 100644 (file)
@@ -29,22 +29,7 @@ ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
                 << ").";
 }
 
-// We require these features from any peer, period, in order to encode
-// a entity_addrvec_t.
-const uint64_t msgr2_required = CEPH_FEATUREMASK_MSG_ADDR2;
-
-// We additionally assume the peer has the below features *purely for
-// the purpose of encoding the frames themselves*.  The only complex
-// types in the frames are entity_addr_t and entity_addrvec_t, and we
-// specifically want the peer to understand the (new in nautilus)
-// TYPE_ANY.  We treat narrow this assumption to frames because we
-// expect there may be future clients (the kernel) that understand
-// msgr v2 and understand this encoding but don't necessarily have
-// everything else that SERVER_NAUTILUS implies.  Yes, a fresh feature
-// bit would be a cleaner approach, but those are scarce these days.
-const uint64_t msgr2_frame_assumed =
-                  msgr2_required |
-                  CEPH_FEATUREMASK_SERVER_NAUTILUS;
+using namespace ceph::msgr::v2;
 
 using CtPtr = Ct<ProtocolV2> *;
 
@@ -104,460 +89,6 @@ static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off)
   data.push_back(std::move(ptr));
 }
 
-/**
- * Protocol V2 Frame Structures
- **/
-
-static constexpr uint8_t CRYPTO_BLOCK_SIZE { 16 };
-
-using segment_t = ProtocolV2::segment_t;
-
-// V2 preamble consists of one or more preamble blocks depending on
-// the number of segments a particular frame needs. Each block holds
-// up to MAX_NUM_SEGMENTS segments and has its own CRC.
-//
-// XXX: currently the multi-segment facility is NOT implemented.
-struct preamble_block_t {
-  static constexpr std::size_t MAX_NUM_SEGMENTS = 4;
-
-  // ProtocolV2::Tag. For multi-segmented frames the value is the same
-  // between subsequent preamble blocks.
-  __u8 tag;
-
-  // Number of segments to go in entire frame. First preable block has
-  // set this to just #segments, second #segments - MAX_NUM_SEGMENTS,
-  // third to #segments - MAX_NUM_SEGMENTS and so on.
-  __u8 num_segments;
-
-  std::array<ProtocolV2::segment_t, MAX_NUM_SEGMENTS> segments;
-  __u8 _reserved[2];
-
-  // CRC32 for this single preamble block.
-  __le32 crc;
-} __attribute__((packed));
-static_assert(sizeof(preamble_block_t) % CRYPTO_BLOCK_SIZE == 0);
-static_assert(std::is_standard_layout<preamble_block_t>::value);
-
-
-static constexpr uint32_t FRAME_PREAMBLE_SIZE = sizeof(preamble_block_t);
-
-template <class T>
-struct Frame {
-protected:
-  ceph::bufferlist payload;
-  ceph::bufferlist::contiguous_filler preamble_filler;
-
-  void fill_preamble(
-    const std::initializer_list<segment_t> main_segments)
-  {
-    ceph_assert(
-      std::size(main_segments) <= preamble_block_t::MAX_NUM_SEGMENTS);
-
-    // Craft the main preamble. It's always present regardless of the number
-    // of segments message is composed from. This doesn't apply to extra one
-    // as it's optional -- if there is up to 2 segments, we'll never transmit
-    // preamble_extra_t;
-    {
-      preamble_block_t main_preamble;
-      // TODO: we might fill/pad with pseudo-random data.
-      ::memset(&main_preamble, 0, sizeof(main_preamble));
-
-      main_preamble.num_segments = std::size(main_segments);
-      main_preamble.tag = static_cast<__u8>(T::tag);
-      ceph_assert(main_preamble.tag != 0);
-
-      std::copy(std::cbegin(main_segments), std::cend(main_segments),
-               std::begin(main_preamble.segments));
-
-      main_preamble.crc = ceph_crc32c(0,
-       reinterpret_cast<unsigned char*>(&main_preamble),
-       sizeof(main_preamble) - sizeof(main_preamble.crc));
-
-      preamble_filler.copy_in(sizeof(main_preamble),
-                             reinterpret_cast<const char*>(&main_preamble));
-    }
-  }
-
-public:
-  Frame() : preamble_filler(payload.append_hole(FRAME_PREAMBLE_SIZE)) {}
-
-  ceph::bufferlist &get_buffer() {
-    fill_preamble({
-      segment_t{ payload.length() - FRAME_PREAMBLE_SIZE,
-                segment_t::DEFAULT_ALIGNMENT }
-    });
-    return payload;
-  }
-
-  void decode_frame(const ceph::bufferlist& bl) {
-    auto ti = bl.cbegin();
-    static_cast<T *>(this)->decode_payload(ti);
-  }
-
-  void decode_payload(bufferlist::const_iterator &ti) {}
-};
-
-// TODO, FIXME: fix this altogether with the Frame hierarchy rework
-struct do_not_encode_tag_t {};
-struct dummy_ctor_conflict_helper {};
-
-template <class C, typename... Args>
-struct PayloadFrame : public Frame<C> {
-protected:
-  // this tuple is only used when decoding values from a payload buffer
-  std::tuple<Args...> _values;
-
-  // FIXME: for now, we assume specific features for the purpoess of encoding
-  // the frames themselves (*not* messages in message frames!).
-  uint64_t features = msgr2_frame_assumed;
-
-  template <typename T>
-  inline void _encode_payload_each(T &t) {
-    if constexpr (std::is_same<T, bufferlist const>()) {
-      this->payload.claim_append((bufferlist &)t);
-    } else if constexpr (std::is_same<T, std::vector<uint32_t> const>()) {
-      encode((uint32_t)t.size(), this->payload, features);
-      for (const auto &elem : t) {
-        encode(elem, this->payload, features);
-      }
-    } else if constexpr (std::is_same<T, ceph_msg_header2 const>()) {
-      this->payload.append((char *)&t, sizeof(t));
-    } else if constexpr (std::is_same<T, dummy_ctor_conflict_helper const>()) {
-      /* NOP, only to discriminate ctors for decode/encode. FIXME. */
-    } else {
-      encode(t, this->payload, features);
-    }
-  }
-
-  template <typename T>
-  inline void _decode_payload_each(T &t, bufferlist::const_iterator &ti) const {
-    if constexpr (std::is_same<T, bufferlist>()) {
-      if (ti.get_remaining()) {
-        t.append(ti.get_current_ptr());
-      }
-    } else if constexpr (std::is_same<T, std::vector<uint32_t>>()) {
-      uint32_t size;
-      decode(size, ti);
-      t.resize(size);
-      for (uint32_t i = 0; i < size; ++i) {
-        decode(t[i], ti);
-      }
-    } else if constexpr (std::is_same<T, ceph_msg_header2>()) {
-      auto ptr = ti.get_current_ptr();
-      ti.advance(sizeof(T));
-      t = *(T *)ptr.raw_c_str();
-    } else if constexpr (std::is_same<T, dummy_ctor_conflict_helper>()) {
-      /* NOP, only to discriminate ctors for decode/encode. FIXME. */
-    } else {
-      decode(t, ti);
-    }
-  }
-
-  template <std::size_t... Is>
-  inline void _decode_payload(bufferlist::const_iterator &ti,
-                              std::index_sequence<Is...>) const {
-    (_decode_payload_each((Args &)std::get<Is>(_values), ti), ...);
-  }
-
-  template <std::size_t N>
-  inline decltype(auto) get_val() {
-    return std::get<N>(_values);
-  }
-
-public:
-  PayloadFrame(const Args &... args) {
-    (_encode_payload_each(args), ...);
-  }
-
-  PayloadFrame(do_not_encode_tag_t) {}
-
-  PayloadFrame(const ceph::bufferlist &payload) {
-    this->decode_frame(payload);
-  }
-
-  void decode_payload(bufferlist::const_iterator &ti) {
-    _decode_payload(ti, std::index_sequence_for<Args...>());
-  }
-};
-
-struct HelloFrame : public PayloadFrame<HelloFrame,
-                                        uint8_t,          // entity type
-                                        entity_addr_t> {  // peer_addr
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::HELLO;
-  using PayloadFrame::PayloadFrame;
-
-  inline uint8_t &entity_type() { return get_val<0>(); }
-  inline entity_addr_t &peer_addr() { return get_val<1>(); }
-};
-
-struct AuthRequestFrame
-  : public PayloadFrame<AuthRequestFrame,
-                       uint32_t, vector<uint32_t>, bufferlist> {
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_REQUEST;
-  using PayloadFrame::PayloadFrame;
-
-  inline uint32_t &method() { return get_val<0>(); }
-  inline vector<uint32_t> &preferred_modes() { return get_val<1>(); }
-  inline bufferlist &auth_payload() { return get_val<2>(); }
-};
-
-struct AuthBadMethodFrame
-  : public PayloadFrame<AuthBadMethodFrame,
-                       uint32_t, // method
-                       int32_t,  // result
-                       std::vector<uint32_t>,   // allowed_methods
-                       std::vector<uint32_t>> { // allowed_modes
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_BAD_METHOD;
-  using PayloadFrame::PayloadFrame;
-
-  inline uint32_t &method() { return get_val<0>(); }
-  inline int32_t &result() { return get_val<1>(); }
-  inline std::vector<uint32_t> &allowed_methods() { return get_val<2>(); }
-  inline std::vector<uint32_t> &allowed_modes() { return get_val<3>(); }
-};
-
-struct AuthReplyMoreFrame
-    : public PayloadFrame<AuthReplyMoreFrame,
-                         dummy_ctor_conflict_helper, bufferlist> {
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_REPLY_MORE;
-  using PayloadFrame::PayloadFrame;
-
-  inline bufferlist &auth_payload() { return get_val<1>(); }
-};
-
-struct AuthRequestMoreFrame
-    : public PayloadFrame<AuthRequestMoreFrame,
-                         dummy_ctor_conflict_helper, bufferlist> {
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_REQUEST_MORE;
-  using PayloadFrame::PayloadFrame;
-
-  inline bufferlist &auth_payload() { return get_val<1>(); }
-};
-
-struct AuthDoneFrame
-  : public PayloadFrame<AuthDoneFrame,
-                       uint64_t, // global_id
-                       uint32_t, // con_mode
-                       bufferlist> { // auth method payload
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_DONE;
-  using PayloadFrame::PayloadFrame;
-
-  inline uint64_t &global_id() { return get_val<0>(); }
-  inline uint32_t &con_mode() { return get_val<1>(); }
-  inline bufferlist &auth_payload() { return get_val<2>(); }
-};
-
-template <class T, typename... Args>
-struct SignedEncryptedFrame : public PayloadFrame<T, Args...> {
-  ceph::bufferlist &get_buffer() {
-    // In contrast to Frame::get_buffer() we don't fill preamble here.
-    return this->payload;
-  }
-
-  SignedEncryptedFrame(ProtocolV2 &protocol, const Args &... args)
-      : PayloadFrame<T, Args...>(args...)
-  {
-    // FIXME: plainsize -> ciphersize; for AES-GCM they are equall apart
-    // from auth tag size
-    this->fill_preamble({
-      segment_t{ this->payload.length() - FRAME_PREAMBLE_SIZE,
-                segment_t::DEFAULT_ALIGNMENT }
-    });
-
-    if (protocol.session_stream_handlers.tx) {
-      ceph_assert(protocol.session_stream_handlers.tx);
-      protocol.session_stream_handlers.tx->reset_tx_handler({
-       this->payload.length()
-      });
-
-      protocol.session_stream_handlers.tx->authenticated_encrypt_update(
-       std::move(this->payload));
-      this->payload = \
-       protocol.session_stream_handlers.tx->authenticated_encrypt_final();
-    }
-  }
-
-  SignedEncryptedFrame(ProtocolV2 &protocol, ceph::bufferlist& bl)
-      : PayloadFrame<T, Args...>(do_not_encode_tag_t{})
-  {
-    if (!protocol.session_stream_handlers.rx) {
-      this->decode_frame(bl);
-      return;
-    }
-
-    const auto length = bl.length();
-    ceph::bufferlist plain_bl = \
-      protocol.session_stream_handlers.rx->authenticated_decrypt_update_final(
-        std::move(bl), ProtocolV2::segment_t::DEFAULT_ALIGNMENT);
-    ceph_assert(plain_bl.length() == length - \
-       protocol.session_stream_handlers.rx->get_extra_size_at_final());
-    this->decode_frame(plain_bl);
-  }
-};
-
-struct ClientIdentFrame
-    : public SignedEncryptedFrame<ClientIdentFrame, 
-                                  entity_addrvec_t,  // my addresses
-                                  entity_addr_t,  // target address
-                                  int64_t,  // global_id
-                                  uint64_t,  // global seq
-                                  uint64_t,  // supported features
-                                  uint64_t,  // required features
-                                  uint64_t,  // flags
-                                  uint64_t> {  // client cookie
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::CLIENT_IDENT;
-  using SignedEncryptedFrame::SignedEncryptedFrame;
-
-  inline entity_addrvec_t &addrs() { return get_val<0>(); }
-  inline entity_addr_t &target_addr() { return get_val<1>(); }
-  inline int64_t &gid() { return get_val<2>(); }
-  inline uint64_t &global_seq() { return get_val<3>(); }
-  inline uint64_t &supported_features() { return get_val<4>(); }
-  inline uint64_t &required_features() { return get_val<5>(); }
-  inline uint64_t &flags() { return get_val<6>(); }
-  inline uint64_t &cookie() { return get_val<7>(); }
-};
-
-struct ServerIdentFrame
-    : public SignedEncryptedFrame<ServerIdentFrame, entity_addrvec_t,
-                                  int64_t, uint64_t, uint64_t,
-                                  uint64_t, uint64_t, uint64_t> {
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::SERVER_IDENT;
-  using SignedEncryptedFrame::SignedEncryptedFrame;
-
-  inline entity_addrvec_t &addrs() { return get_val<0>(); }
-  inline int64_t &gid() { return get_val<1>(); }
-  inline uint64_t &global_seq() { return get_val<2>(); }
-  inline uint64_t &supported_features() { return get_val<3>(); }
-  inline uint64_t &required_features() { return get_val<4>(); }
-  inline uint64_t &flags() { return get_val<5>(); }
-  inline uint64_t &cookie() { return get_val<6>(); }
-};
-
-struct ReconnectFrame
-    : public SignedEncryptedFrame<ReconnectFrame, 
-                                  entity_addrvec_t,  // my addresses
-                                  uint64_t,  // client cookie
-                                  uint64_t,  // server cookie
-                                  uint64_t,  // global sequence
-                                  uint64_t,  // connect sequence
-                                  uint64_t> { // message sequence
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RECONNECT;
-  using SignedEncryptedFrame::SignedEncryptedFrame;
-
-  inline entity_addrvec_t &addrs() { return get_val<0>(); }
-  inline uint64_t &client_cookie() { return get_val<1>(); }
-  inline uint64_t &server_cookie() { return get_val<2>(); }
-  inline uint64_t &global_seq() { return get_val<3>(); }
-  inline uint64_t &connect_seq() { return get_val<4>(); }
-  inline uint64_t &msg_seq() { return get_val<5>(); }
-};
-
-struct ResetFrame : public SignedEncryptedFrame<ResetFrame, bool> {
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RESET;
-  using SignedEncryptedFrame::SignedEncryptedFrame;
-
-  inline bool &full() { return get_val<0>(); }
-};
-
-struct RetryFrame : public SignedEncryptedFrame<RetryFrame, uint64_t> {
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RETRY;
-  using SignedEncryptedFrame::SignedEncryptedFrame;
-
-  inline uint64_t &connect_seq() { return get_val<0>(); }
-};
-
-struct RetryGlobalFrame
-    : public SignedEncryptedFrame<RetryGlobalFrame, uint64_t> {
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RETRY_GLOBAL;
-  using SignedEncryptedFrame::SignedEncryptedFrame;
-
-  inline uint64_t &global_seq() { return get_val<0>(); }
-};
-
-struct WaitFrame : public SignedEncryptedFrame<WaitFrame> {
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::WAIT;
-  using SignedEncryptedFrame::SignedEncryptedFrame;
-};
-
-struct ReconnectOkFrame
-    : public SignedEncryptedFrame<ReconnectOkFrame, uint64_t> {
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RECONNECT_OK;
-  using SignedEncryptedFrame::SignedEncryptedFrame;
-
-  inline uint64_t &msg_seq() { return get_val<0>(); }
-};
-
-struct IdentMissingFeaturesFrame
-    : public SignedEncryptedFrame<IdentMissingFeaturesFrame, uint64_t> {
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT_MISSING_FEATURES;
-  using SignedEncryptedFrame::SignedEncryptedFrame;
-
-  inline uint64_t &features() { return get_val<0>(); }
-};
-
-struct KeepAliveFrame : public SignedEncryptedFrame<KeepAliveFrame, utime_t> {
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::KEEPALIVE2;
-  using SignedEncryptedFrame::SignedEncryptedFrame;
-
-  KeepAliveFrame(ProtocolV2 &protocol)
-      : KeepAliveFrame(protocol, ceph_clock_now()) {}
-
-  inline utime_t &timestamp() { return get_val<0>(); }
-};
-
-struct KeepAliveFrameAck
-    : public SignedEncryptedFrame<KeepAliveFrameAck, utime_t> {
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::KEEPALIVE2_ACK;
-  using SignedEncryptedFrame::SignedEncryptedFrame;
-
-  inline utime_t &timestamp() { return get_val<0>(); }
-};
-
-struct AckFrame : public SignedEncryptedFrame<AckFrame, uint64_t> {
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::ACK;
-  using SignedEncryptedFrame::SignedEncryptedFrame;
-
-  inline uint64_t &seq() { return get_val<0>(); }
-};
-
-// This class is used for encoding/decoding header of the message frame.
-// Body is processed almost independently with the sole junction point
-// being the `extra_payload_len` passed to get_buffer().
-struct MessageHeaderFrame
-    : public PayloadFrame<MessageHeaderFrame, ceph_msg_header2> {
-  static const ProtocolV2::Tag tag = ProtocolV2::Tag::MESSAGE;
-
-  ceph::bufferlist &get_buffer() {
-    // In contrast to Frame::get_buffer() we don't fill preamble here.
-    return this->payload;
-  }
-
-  MessageHeaderFrame(const ceph_msg_header2 &msghdr,
-                    const uint32_t front_len,
-                    const uint32_t middle_len,
-                    const uint32_t data_len)
-      : PayloadFrame<MessageHeaderFrame, ceph_msg_header2>(msghdr)
-  {
-    // FIXME: plainsize -> ciphersize; for AES-GCM they are equall apart from auth tag size
-    fill_preamble({
-      segment_t{ this->payload.length() - FRAME_PREAMBLE_SIZE,
-                segment_t::DEFAULT_ALIGNMENT },
-      segment_t{ front_len, segment_t::DEFAULT_ALIGNMENT },
-      segment_t{ middle_len, segment_t::DEFAULT_ALIGNMENT },
-      segment_t{ data_len, segment_t::DEFERRED_ALLOCATION },
-    });
-  }
-
-  MessageHeaderFrame(ceph::bufferlist&& text)
-      : PayloadFrame<MessageHeaderFrame, ceph_msg_header2>(do_not_encode_tag_t{})
-  {
-    this->decode_frame(text);
-  }
-
-  inline ceph_msg_header2 &header() { return get_val<0>(); }
-};
-
 ProtocolV2::ProtocolV2(AsyncConnection *connection)
     : Protocol(2, connection),
       temp_buffer(nullptr),
@@ -1058,12 +589,12 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
 
 void ProtocolV2::append_keepalive() {
   ldout(cct, 10) << __func__ << dendl;
-  KeepAliveFrame keepalive_frame(*this);
+  KeepAliveFrame keepalive_frame(session_stream_handlers);
   connection->outcoming_bl.claim_append(keepalive_frame.get_buffer());
 }
 
 void ProtocolV2::append_keepalive_ack(utime_t &timestamp) {
-  KeepAliveFrameAck keepalive_ack_frame(*this, timestamp);
+  KeepAliveFrameAck keepalive_ack_frame(session_stream_handlers, timestamp);
   connection->outcoming_bl.claim_append(keepalive_ack_frame.get_buffer());
 }
 
@@ -1143,7 +674,7 @@ void ProtocolV2::write_event() {
       if (left) {
         ceph_le64 s;
         s = in_seq;
-        AckFrame ack(*this, in_seq);
+        AckFrame ack(session_stream_handlers, in_seq);
         connection->outcoming_bl.claim_append(ack.get_buffer());
         ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
                        << " messages" << dendl;
@@ -2145,7 +1676,7 @@ CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload)
   ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;
 
-  KeepAliveFrame keepalive_frame(*this, payload);
+  KeepAliveFrame keepalive_frame(session_stream_handlers, payload);
 
   ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
 
@@ -2169,7 +1700,7 @@ CtPtr ProtocolV2::handle_keepalive2_ack(ceph::bufferlist &payload)
   ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;
 
-  KeepAliveFrameAck keepalive_ack_frame(*this, payload);
+  KeepAliveFrameAck keepalive_ack_frame(session_stream_handlers, payload);
   connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp());
   ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
 
@@ -2181,7 +1712,7 @@ CtPtr ProtocolV2::handle_message_ack(ceph::bufferlist &payload)
   ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;
 
-  AckFrame ack(*this, payload);
+  AckFrame ack(session_stream_handlers, payload);
   handle_message_ack(ack.seq());
   return CONTINUE(read_frame);
 }
@@ -2375,7 +1906,7 @@ CtPtr ProtocolV2::send_client_ident() {
     }
   }
 
-  ClientIdentFrame client_ident(*this, messenger->get_myaddrs(),
+  ClientIdentFrame client_ident(session_stream_handlers, messenger->get_myaddrs(),
                                 connection->target_addr,
                                 messenger->get_myname().num(), global_seq,
                                 connection->policy.features_supported,
@@ -2402,7 +1933,7 @@ CtPtr ProtocolV2::send_client_ident() {
 CtPtr ProtocolV2::send_reconnect() {
   ldout(cct, 20) << __func__ << dendl;
 
-  ReconnectFrame reconnect(*this, messenger->get_myaddrs(),
+  ReconnectFrame reconnect(session_stream_handlers, messenger->get_myaddrs(),
                            client_cookie,
                            server_cookie,
                            global_seq,
@@ -2425,7 +1956,7 @@ CtPtr ProtocolV2::handle_ident_missing_features(ceph::bufferlist &payload)
   ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;
 
-  IdentMissingFeaturesFrame ident_missing(*this, payload);
+  IdentMissingFeaturesFrame ident_missing(session_stream_handlers, payload);
   lderr(cct) << __func__
              << " client does not support all server features: " << std::hex
              << ident_missing.features() << std::dec << dendl;
@@ -2438,7 +1969,7 @@ CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload)
   ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;
 
-  ResetFrame reset(*this, payload);
+  ResetFrame reset(session_stream_handlers, payload);
 
   ldout(cct, 1) << __func__ << " received session reset full=" << reset.full()
                 << dendl;
@@ -2458,7 +1989,7 @@ CtPtr ProtocolV2::handle_session_retry(ceph::bufferlist &payload)
   ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;
 
-  RetryFrame retry(*this, payload);
+  RetryFrame retry(session_stream_handlers, payload);
   connect_seq = retry.connect_seq() + 1;
 
   ldout(cct, 1) << __func__
@@ -2473,7 +2004,7 @@ CtPtr ProtocolV2::handle_session_retry_global(ceph::bufferlist &payload)
   ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;
 
-  RetryGlobalFrame retry(*this, payload);
+  RetryGlobalFrame retry(session_stream_handlers, payload);
   global_seq = messenger->get_global_seq(retry.global_seq());
 
   ldout(cct, 1) << __func__ << " received session retry global global_seq="
@@ -2489,7 +2020,7 @@ CtPtr ProtocolV2::handle_wait() {
   state = WAIT;
   ceph_assert(rx_segments_data.size() == 1);
   ceph_assert(rx_segments_desc.size() == 1);
-  WaitFrame(*this, rx_segments_data[SegmentIndex::Frame::PAYLOAD]);
+  WaitFrame(session_stream_handlers, rx_segments_data[SegmentIndex::Frame::PAYLOAD]);
   return _fault();
 }
 
@@ -2498,7 +2029,7 @@ CtPtr ProtocolV2::handle_reconnect_ok(ceph::bufferlist &payload)
   ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;
 
-  ReconnectOkFrame reconnect_ok(*this, payload);
+  ReconnectOkFrame reconnect_ok(session_stream_handlers, payload);
   ldout(cct, 5) << __func__
                 << " reconnect accepted: sms=" << reconnect_ok.msg_seq()
                 << dendl;
@@ -2525,7 +2056,7 @@ CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
   ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;
 
-  ServerIdentFrame server_ident(*this, payload);
+  ServerIdentFrame server_ident(session_stream_handlers, payload);
   ldout(cct, 5) << __func__ << " received server identification:"
                 << " addrs=" << server_ident.addrs()
                 << " gid=" << server_ident.gid()
@@ -2677,7 +2208,7 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
   ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;
 
-  ClientIdentFrame client_ident(*this, payload);
+  ClientIdentFrame client_ident(session_stream_handlers, payload);
 
   ldout(cct, 5) << __func__ << " received client identification:"
                 << " addrs=" << client_ident.addrs()
@@ -2717,7 +2248,7 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
   if (feat_missing) {
     ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
                   << feat_missing << std::dec << dendl;
-    IdentMissingFeaturesFrame ident_missing_features(*this, feat_missing);
+    IdentMissingFeaturesFrame ident_missing_features(session_stream_handlers, feat_missing);
 
     bufferlist &bl = ident_missing_features.get_buffer();
     return WRITE(bl, "ident missing features", read_frame);
@@ -2768,7 +2299,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
   ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;
 
-  ReconnectFrame reconnect(*this, payload);
+  ReconnectFrame reconnect(session_stream_handlers, payload);
 
   ldout(cct, 5) << __func__
                 << " received reconnect:" 
@@ -2813,7 +2344,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
     // session
     ldout(cct, 0) << __func__
                   << " no existing connection exists, reseting client" << dendl;
-    ResetFrame reset(*this, true);
+    ResetFrame reset(session_stream_handlers, true);
     return WRITE(reset.get_buffer(), "session reset", read_frame);
   }
 
@@ -2828,7 +2359,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
   if (exproto->state == CLOSED) {
     ldout(cct, 5) << __func__ << " existing " << existing
                   << " already closed. Reseting client" << dendl;
-    ResetFrame reset(*this, true);
+    ResetFrame reset(session_stream_handlers, true);
     return WRITE(reset.get_buffer(), "session reset", read_frame);
   }
 
@@ -2836,7 +2367,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
     ldout(cct, 1) << __func__
                   << " existing racing replace happened while replacing."
                   << " existing=" << existing << dendl;
-    RetryGlobalFrame retry(*this, exproto->peer_global_seq);
+    RetryGlobalFrame retry(session_stream_handlers, exproto->peer_global_seq);
     bufferlist &bl = retry.get_buffer();
     return WRITE(bl, "session retry", read_frame);
   }
@@ -2848,7 +2379,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
                   << " rcc=" << reconnect.client_cookie()
                   << ", reseting client."
                   << dendl;
-    ResetFrame reset(*this, connection->policy.resetcheck);
+    ResetFrame reset(session_stream_handlers, connection->policy.resetcheck);
     return WRITE(reset.get_buffer(), "session reset", read_frame);
   } else if (exproto->server_cookie == 0) {
     // this happens when:
@@ -2861,7 +2392,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
     ldout(cct, 1) << __func__ << " I was a client and didn't received the"
                   << " server_ident. Asking peer to resume session"
                   << " establishment" << dendl;
-    ResetFrame reset(*this, false);
+    ResetFrame reset(session_stream_handlers, false);
     return WRITE(reset.get_buffer(), "session reset", read_frame);
   }
 
@@ -2870,7 +2401,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
                   << " stale global_seq: sgs=" << exproto->peer_global_seq
                   << " cgs=" << reconnect.global_seq()
                   << ", ask client to retry global" << dendl;
-    RetryGlobalFrame retry(*this, exproto->peer_global_seq);
+    RetryGlobalFrame retry(session_stream_handlers, exproto->peer_global_seq);
 
     INTERCEPT(18);
 
@@ -2882,7 +2413,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
                   << " stale connect_seq scs=" << exproto->connect_seq
                   << " ccs=" << reconnect.connect_seq()
                   << " , ask client to retry" << dendl;
-    RetryFrame retry(*this, exproto->connect_seq);
+    RetryFrame retry(session_stream_handlers, exproto->connect_seq);
     return WRITE(retry.get_buffer(), "session retry", read_frame);
   }
 
@@ -2897,7 +2428,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
           << " reconnect race detected, this connection loses to existing="
           << existing << dendl;
 
-      WaitFrame wait(*this);
+      WaitFrame wait(session_stream_handlers);
       return WRITE(wait.get_buffer(), "wait", read_frame);
     } else {
       // this connection wins
@@ -2940,7 +2471,7 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
     ldout(cct, 1) << __func__
                   << " existing racing replace happened while replacing."
                   << " existing=" << existing << dendl;
-    WaitFrame wait(*this);
+    WaitFrame wait(session_stream_handlers);
     return WRITE(wait.get_buffer(), "wait", read_frame);
   }
 
@@ -3017,7 +2548,7 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
                // has something to send to us.
     existing->send_keepalive();
     existing->lock.lock();
-    WaitFrame wait(*this);
+    WaitFrame wait(session_stream_handlers);
     bufferlist &bl = wait.get_buffer();
     return WRITE(bl, "wait", read_frame);
   }
@@ -3156,7 +2687,7 @@ CtPtr ProtocolV2::send_server_ident() {
 
   uint64_t gs = messenger->get_global_seq();
   ServerIdentFrame server_ident(
-      *this, messenger->get_myaddrs(), messenger->get_myname().num(), gs,
+      session_stream_handlers, messenger->get_myaddrs(), messenger->get_myname().num(), gs,
       connection->policy.features_supported,
       connection->policy.features_required | msgr2_required,
       flags,
@@ -3225,7 +2756,7 @@ CtPtr ProtocolV2::send_reconnect_ok() {
   out_seq = discard_requeued_up_to(out_seq, message_seq);
 
   uint64_t ms = in_seq;
-  ReconnectOkFrame reconnect_ok(*this, ms);
+  ReconnectOkFrame reconnect_ok(session_stream_handlers, ms);
 
   ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl;
 
index 66e4e9752a7cc935303794f6bbacf89c93339067..50dbbeecf975007802a9795343bd285a5218d838 100644 (file)
@@ -8,6 +8,7 @@
 
 #include "Protocol.h"
 #include "crypto_onwire.h"
+#include "frames_v2.h"
 
 class ProtocolV2 : public Protocol {
 private:
@@ -49,28 +50,6 @@ private:
   }
 
 public:
-  enum class Tag : __u8 {
-    HELLO = 1,
-    AUTH_REQUEST,
-    AUTH_BAD_METHOD,
-    AUTH_REPLY_MORE,
-    AUTH_REQUEST_MORE,
-    AUTH_DONE,
-    CLIENT_IDENT,
-    SERVER_IDENT,
-    IDENT_MISSING_FEATURES,
-    SESSION_RECONNECT,
-    SESSION_RESET,
-    SESSION_RETRY,
-    SESSION_RETRY_GLOBAL,
-    SESSION_RECONNECT_OK,
-    WAIT,
-    MESSAGE,
-    KEEPALIVE2,
-    KEEPALIVE2_ACK,
-    ACK
-  };
-
   // TODO: move into auth_meta?
   ceph::crypto::onwire::rxtx_t session_stream_handlers;
 private:
@@ -106,16 +85,6 @@ private:
   uint32_t next_payload_len;
 
 public:
-  struct segment_t {
-    // TODO: this will be dropped with support for `allocation policies`.
-    // We need them because of the rx_buffers zero-copy optimization.
-    static constexpr __le16 DEFERRED_ALLOCATION { 0x0000 };
-
-    static constexpr __le16 DEFAULT_ALIGNMENT = sizeof(void*);
-
-    __le32 length;
-    __le16 alignment;
-  } __attribute__((packed));
 
   struct onwire_segment_t {
     // crypto-processed segment can be expanded on-wire because of:
@@ -124,11 +93,9 @@ public:
     //    See RxHandler::get_extra_size_at_final().
     __le32 onwire_length;
 
-    struct segment_t logical;
+    struct ceph::msgr::v2::segment_t logical;
   } __attribute__((packed));
 
-  static constexpr std::size_t MAX_NUM_SEGMENTS = 4;
-
   struct SegmentIndex {
     struct Msg {
       static constexpr std::size_t HEADER = 0;
@@ -143,13 +110,13 @@ public:
   };
 
   boost::container::static_vector<onwire_segment_t,
-                                 MAX_NUM_SEGMENTS> rx_segments_desc;
+                                 ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_desc;
   boost::container::static_vector<ceph::bufferlist,
-                                 MAX_NUM_SEGMENTS> rx_segments_data;
+                                 ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_data;
 
 private:
 
-  Tag next_tag;
+  ceph::msgr::v2::Tag next_tag;
   ceph_msg_header2 current_header;
   utime_t backoff;  // backoff time
   utime_t recv_stamp;
diff --git a/src/msg/async/frames_v2.h b/src/msg/async/frames_v2.h
new file mode 100644 (file)
index 0000000..5b92577
--- /dev/null
@@ -0,0 +1,511 @@
+#ifndef _MSG_ASYNC_FRAMES_V2_
+#define _MSG_ASYNC_FRAMES_V2_
+
+#include "include/types.h"
+#include "crypto_onwire.h"
+#include <array>
+
+/**
+ * Protocol V2 Frame Structures
+ * 
+ * Documentation in: doc/dev/msgr2.rst
+ **/
+
+namespace ceph::msgr::v2 {
+
+// We require these features from any peer, period, in order to encode
+// a entity_addrvec_t.
+const uint64_t msgr2_required = CEPH_FEATUREMASK_MSG_ADDR2;
+
+// We additionally assume the peer has the below features *purely for
+// the purpose of encoding the frames themselves*.  The only complex
+// types in the frames are entity_addr_t and entity_addrvec_t, and we
+// specifically want the peer to understand the (new in nautilus)
+// TYPE_ANY.  We treat narrow this assumption to frames because we
+// expect there may be future clients (the kernel) that understand
+// msgr v2 and understand this encoding but don't necessarily have
+// everything else that SERVER_NAUTILUS implies.  Yes, a fresh feature
+// bit would be a cleaner approach, but those are scarce these days.
+const uint64_t msgr2_frame_assumed =
+                  msgr2_required |
+                  CEPH_FEATUREMASK_SERVER_NAUTILUS;
+
+enum class Tag : __u8 {
+  HELLO = 1,
+  AUTH_REQUEST,
+  AUTH_BAD_METHOD,
+  AUTH_REPLY_MORE,
+  AUTH_REQUEST_MORE,
+  AUTH_DONE,
+  CLIENT_IDENT,
+  SERVER_IDENT,
+  IDENT_MISSING_FEATURES,
+  SESSION_RECONNECT,
+  SESSION_RESET,
+  SESSION_RETRY,
+  SESSION_RETRY_GLOBAL,
+  SESSION_RECONNECT_OK,
+  WAIT,
+  MESSAGE,
+  KEEPALIVE2,
+  KEEPALIVE2_ACK,
+  ACK
+};
+
+struct segment_t {
+  // TODO: this will be dropped with support for `allocation policies`.
+  // We need them because of the rx_buffers zero-copy optimization.
+  static constexpr __le16 DEFERRED_ALLOCATION{0x0000};
+
+  static constexpr __le16 DEFAULT_ALIGNMENT = sizeof(void *);
+
+  __le32 length;
+  __le16 alignment;
+} __attribute__((packed));
+
+static constexpr uint8_t CRYPTO_BLOCK_SIZE { 16 };
+
+static constexpr std::size_t MAX_NUM_SEGMENTS = 4;
+
+// V2 preamble consists of one or more preamble blocks depending on
+// the number of segments a particular frame needs. Each block holds
+// up to MAX_NUM_SEGMENTS segments and has its own CRC.
+//
+// XXX: currently the multi-segment facility is NOT implemented.
+struct preamble_block_t {  
+  // Tag. For multi-segmented frames the value is the same
+  // between subsequent preamble blocks.
+  __u8 tag;
+
+  // Number of segments to go in entire frame. First preable block has
+  // set this to just #segments, second #segments - MAX_NUM_SEGMENTS,
+  // third to #segments - MAX_NUM_SEGMENTS and so on.
+  __u8 num_segments;
+
+  std::array<segment_t, MAX_NUM_SEGMENTS> segments;
+  __u8 _reserved[2];
+
+  // CRC32 for this single preamble block.
+  __le32 crc;
+} __attribute__((packed));
+static_assert(sizeof(preamble_block_t) % CRYPTO_BLOCK_SIZE == 0);
+static_assert(std::is_standard_layout<preamble_block_t>::value);
+
+
+static constexpr uint32_t FRAME_PREAMBLE_SIZE = sizeof(preamble_block_t);
+
+template <class T>
+struct Frame {
+protected:
+  ceph::bufferlist payload;
+  ceph::bufferlist::contiguous_filler preamble_filler;
+
+  void fill_preamble(const std::initializer_list<segment_t> main_segments) {
+    ceph_assert(std::size(main_segments) <= MAX_NUM_SEGMENTS);
+
+    // Craft the main preamble. It's always present regardless of the number
+    // of segments message is composed from. This doesn't apply to extra one
+    // as it's optional -- if there is up to 2 segments, we'll never transmit
+    // preamble_extra_t;
+    {
+      preamble_block_t main_preamble;
+      // TODO: we might fill/pad with pseudo-random data.
+      ::memset(&main_preamble, 0, sizeof(main_preamble));
+
+      main_preamble.num_segments = std::size(main_segments);
+      main_preamble.tag = static_cast<__u8>(T::tag);
+      ceph_assert(main_preamble.tag != 0);
+
+      std::copy(std::cbegin(main_segments), std::cend(main_segments),
+                std::begin(main_preamble.segments));
+
+      main_preamble.crc =
+          ceph_crc32c(0, reinterpret_cast<unsigned char *>(&main_preamble),
+                      sizeof(main_preamble) - sizeof(main_preamble.crc));
+
+      preamble_filler.copy_in(sizeof(main_preamble),
+                              reinterpret_cast<const char *>(&main_preamble));
+    }
+  }
+
+public:
+  Frame() : preamble_filler(payload.append_hole(FRAME_PREAMBLE_SIZE)) {}
+
+  ceph::bufferlist &get_buffer() {
+    fill_preamble({segment_t{payload.length() - FRAME_PREAMBLE_SIZE,
+                   segment_t::DEFAULT_ALIGNMENT}});
+    return payload;
+  }
+
+  void decode_frame(const ceph::bufferlist &bl) {
+    auto ti = bl.cbegin();
+    static_cast<T *>(this)->decode_payload(ti);
+  }
+
+  void decode_payload(bufferlist::const_iterator &ti) {}
+};
+
+// TODO, FIXME: fix this altogether with the Frame hierarchy rework
+struct do_not_encode_tag_t {};
+struct dummy_ctor_conflict_helper {};
+
+template <class C, typename... Args>
+struct PayloadFrame : public Frame<C> {
+protected:
+  // this tuple is only used when decoding values from a payload buffer
+  std::tuple<Args...> _values;
+
+  // FIXME: for now, we assume specific features for the purpoess of encoding
+  // the frames themselves (*not* messages in message frames!).
+  uint64_t features = msgr2_frame_assumed;
+
+  template <typename T>
+  inline void _encode_payload_each(T &t) {
+    if constexpr (std::is_same<T, bufferlist const>()) {
+      this->payload.claim_append((bufferlist &)t);
+    } else if constexpr (std::is_same<T, std::vector<uint32_t> const>()) {
+      encode((uint32_t)t.size(), this->payload, features);
+      for (const auto &elem : t) {
+        encode(elem, this->payload, features);
+      }
+    } else if constexpr (std::is_same<T, ceph_msg_header2 const>()) {
+      this->payload.append((char *)&t, sizeof(t));
+    } else if constexpr (std::is_same<T, dummy_ctor_conflict_helper const>()) {
+      /* NOP, only to discriminate ctors for decode/encode. FIXME. */
+    } else {
+      encode(t, this->payload, features);
+    }
+  }
+
+  template <typename T>
+  inline void _decode_payload_each(T &t, bufferlist::const_iterator &ti) const {
+    if constexpr (std::is_same<T, bufferlist>()) {
+      if (ti.get_remaining()) {
+        t.append(ti.get_current_ptr());
+      }
+    } else if constexpr (std::is_same<T, std::vector<uint32_t>>()) {
+      uint32_t size;
+      decode(size, ti);
+      t.resize(size);
+      for (uint32_t i = 0; i < size; ++i) {
+        decode(t[i], ti);
+      }
+    } else if constexpr (std::is_same<T, ceph_msg_header2>()) {
+      auto ptr = ti.get_current_ptr();
+      ti.advance(sizeof(T));
+      t = *(T *)ptr.raw_c_str();
+    } else if constexpr (std::is_same<T, dummy_ctor_conflict_helper>()) {
+      /* NOP, only to discriminate ctors for decode/encode. FIXME. */
+    } else {
+      decode(t, ti);
+    }
+  }
+
+  template <std::size_t... Is>
+  inline void _decode_payload(bufferlist::const_iterator &ti,
+                              std::index_sequence<Is...>) const {
+    (_decode_payload_each((Args &)std::get<Is>(_values), ti), ...);
+  }
+
+  template <std::size_t N>
+  inline decltype(auto) get_val() {
+    return std::get<N>(_values);
+  }
+
+public:
+  PayloadFrame(const Args &... args) { (_encode_payload_each(args), ...); }
+
+  PayloadFrame(do_not_encode_tag_t) {}
+
+  PayloadFrame(const ceph::bufferlist &payload) { this->decode_frame(payload); }
+
+  void decode_payload(bufferlist::const_iterator &ti) {
+    _decode_payload(ti, std::index_sequence_for<Args...>());
+  }
+};
+
+struct HelloFrame : public PayloadFrame<HelloFrame,
+                                        uint8_t,          // entity type
+                                        entity_addr_t> {  // peer address
+  static const Tag tag = Tag::HELLO;
+  using PayloadFrame::PayloadFrame;
+
+  inline uint8_t &entity_type() { return get_val<0>(); }
+  inline entity_addr_t &peer_addr() { return get_val<1>(); }
+};
+
+struct AuthRequestFrame : public PayloadFrame<AuthRequestFrame,
+                                                               uint32_t, // auth method
+                                              vector<uint32_t>, // preferred modes
+                                              bufferlist> { // auth payload
+  static const Tag tag = Tag::AUTH_REQUEST;
+  using PayloadFrame::PayloadFrame;
+
+  inline uint32_t &method() { return get_val<0>(); }
+  inline vector<uint32_t> &preferred_modes() { return get_val<1>(); }
+  inline bufferlist &auth_payload() { return get_val<2>(); }
+};
+
+struct AuthBadMethodFrame : public PayloadFrame<AuthBadMethodFrame,
+                                                uint32_t, // method
+                                                int32_t,  // result
+                                                std::vector<uint32_t>,   // allowed methods
+                                                std::vector<uint32_t>> { // allowed modes
+  static const Tag tag = Tag::AUTH_BAD_METHOD;
+  using PayloadFrame::PayloadFrame;
+
+  inline uint32_t &method() { return get_val<0>(); }
+  inline int32_t &result() { return get_val<1>(); }
+  inline std::vector<uint32_t> &allowed_methods() { return get_val<2>(); }
+  inline std::vector<uint32_t> &allowed_modes() { return get_val<3>(); }
+};
+
+struct AuthReplyMoreFrame : public PayloadFrame<AuthReplyMoreFrame,
+                                                                 dummy_ctor_conflict_helper,
+                                                bufferlist> { // auth payload
+  static const Tag tag = Tag::AUTH_REPLY_MORE;
+  using PayloadFrame::PayloadFrame;
+
+  inline bufferlist &auth_payload() { return get_val<1>(); }
+};
+
+struct AuthRequestMoreFrame : public PayloadFrame<AuthRequestMoreFrame,
+                                                                   dummy_ctor_conflict_helper,
+                                                  bufferlist> { // auth payload
+  static const Tag tag = Tag::AUTH_REQUEST_MORE;
+  using PayloadFrame::PayloadFrame;
+
+  inline bufferlist &auth_payload() { return get_val<1>(); }
+};
+
+struct AuthDoneFrame : public PayloadFrame<AuthDoneFrame,
+                                           uint64_t, // global id
+                                           uint32_t, // connection mode
+                                           bufferlist> { // auth method payload
+  static const Tag tag = Tag::AUTH_DONE;
+  using PayloadFrame::PayloadFrame;
+
+  inline uint64_t &global_id() { return get_val<0>(); }
+  inline uint32_t &con_mode() { return get_val<1>(); }
+  inline bufferlist &auth_payload() { return get_val<2>(); }
+};
+
+template <class T, typename... Args>
+struct SignedEncryptedFrame : public PayloadFrame<T, Args...> {
+  ceph::bufferlist &get_buffer() {
+    // In contrast to Frame::get_buffer() we don't fill preamble here.
+    return this->payload;
+  }
+
+  SignedEncryptedFrame(ceph::crypto::onwire::rxtx_t &session_stream_handlers,
+                       const Args &... args)
+      : PayloadFrame<T, Args...>(args...) {
+    // FIXME: plainsize -> ciphersize; for AES-GCM they are equall apart
+    // from auth tag size
+    this->fill_preamble({segment_t{this->payload.length() - FRAME_PREAMBLE_SIZE,
+                         segment_t::DEFAULT_ALIGNMENT}});
+
+    if (session_stream_handlers.tx) {
+      ceph_assert(session_stream_handlers.tx);
+      session_stream_handlers.tx->reset_tx_handler({this->payload.length()});
+
+      session_stream_handlers.tx->authenticated_encrypt_update(
+          std::move(this->payload));
+      this->payload = session_stream_handlers.tx->authenticated_encrypt_final();
+    }
+  }
+
+  SignedEncryptedFrame(ceph::crypto::onwire::rxtx_t &session_stream_handlers,
+                       ceph::bufferlist &bl)
+      : PayloadFrame<T, Args...>(do_not_encode_tag_t{}) {
+    if (!session_stream_handlers.rx) {
+      this->decode_frame(bl);
+      return;
+    }
+
+    const auto length = bl.length();
+    ceph::bufferlist plain_bl =
+        session_stream_handlers.rx->authenticated_decrypt_update_final(
+            std::move(bl), segment_t::DEFAULT_ALIGNMENT);
+    ceph_assert(plain_bl.length() ==
+                length - session_stream_handlers.rx->get_extra_size_at_final());
+    this->decode_frame(plain_bl);
+  }
+};
+
+struct ClientIdentFrame
+    : public SignedEncryptedFrame<ClientIdentFrame, 
+                                  entity_addrvec_t,  // my addresses
+                                  entity_addr_t,  // target address
+                                  int64_t,  // global_id
+                                  uint64_t,  // global seq
+                                  uint64_t,  // supported features
+                                  uint64_t,  // required features
+                                  uint64_t,  // flags
+                                  uint64_t> {  // client cookie
+  static const Tag tag = Tag::CLIENT_IDENT;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline entity_addrvec_t &addrs() { return get_val<0>(); }
+  inline entity_addr_t &target_addr() { return get_val<1>(); }
+  inline int64_t &gid() { return get_val<2>(); }
+  inline uint64_t &global_seq() { return get_val<3>(); }
+  inline uint64_t &supported_features() { return get_val<4>(); }
+  inline uint64_t &required_features() { return get_val<5>(); }
+  inline uint64_t &flags() { return get_val<6>(); }
+  inline uint64_t &cookie() { return get_val<7>(); }
+};
+
+struct ServerIdentFrame
+    : public SignedEncryptedFrame<ServerIdentFrame,
+                                  entity_addrvec_t,  // my addresses
+                                  int64_t,  // global_id
+                                  uint64_t,  // global seq
+                                  uint64_t,  // supported features
+                                  uint64_t,  // required features
+                                  uint64_t,  // flags
+                                  uint64_t> {  // server cookie
+  static const Tag tag = Tag::SERVER_IDENT;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline entity_addrvec_t &addrs() { return get_val<0>(); }
+  inline int64_t &gid() { return get_val<1>(); }
+  inline uint64_t &global_seq() { return get_val<2>(); }
+  inline uint64_t &supported_features() { return get_val<3>(); }
+  inline uint64_t &required_features() { return get_val<4>(); }
+  inline uint64_t &flags() { return get_val<5>(); }
+  inline uint64_t &cookie() { return get_val<6>(); }
+};
+
+struct ReconnectFrame
+    : public SignedEncryptedFrame<ReconnectFrame, 
+                                  entity_addrvec_t,  // my addresses
+                                  uint64_t,  // client cookie
+                                  uint64_t,  // server cookie
+                                  uint64_t,  // global sequence
+                                  uint64_t,  // connect sequence
+                                  uint64_t> { // message sequence
+  static const Tag tag = Tag::SESSION_RECONNECT;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline entity_addrvec_t &addrs() { return get_val<0>(); }
+  inline uint64_t &client_cookie() { return get_val<1>(); }
+  inline uint64_t &server_cookie() { return get_val<2>(); }
+  inline uint64_t &global_seq() { return get_val<3>(); }
+  inline uint64_t &connect_seq() { return get_val<4>(); }
+  inline uint64_t &msg_seq() { return get_val<5>(); }
+};
+
+struct ResetFrame : public SignedEncryptedFrame<ResetFrame,
+                                                bool> {  // full reset
+  static const Tag tag = Tag::SESSION_RESET;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline bool &full() { return get_val<0>(); }
+};
+
+struct RetryFrame : public SignedEncryptedFrame<RetryFrame,
+                                                uint64_t> {  // connection seq
+  static const Tag tag = Tag::SESSION_RETRY;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline uint64_t &connect_seq() { return get_val<0>(); }
+};
+
+struct RetryGlobalFrame : public SignedEncryptedFrame<RetryGlobalFrame,
+                                                      uint64_t> { // global seq
+  static const Tag tag = Tag::SESSION_RETRY_GLOBAL;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline uint64_t &global_seq() { return get_val<0>(); }
+};
+
+struct WaitFrame : public SignedEncryptedFrame<WaitFrame> {
+  static const Tag tag = Tag::WAIT;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+};
+
+struct ReconnectOkFrame : public SignedEncryptedFrame<ReconnectOkFrame,
+                                                      uint64_t> { // message seq
+  static const Tag tag = Tag::SESSION_RECONNECT_OK;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline uint64_t &msg_seq() { return get_val<0>(); }
+};
+
+struct IdentMissingFeaturesFrame 
+    : public SignedEncryptedFrame<IdentMissingFeaturesFrame,
+                                  uint64_t> { // missing features mask
+  static const Tag tag = Tag::IDENT_MISSING_FEATURES;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline uint64_t &features() { return get_val<0>(); }
+};
+
+struct KeepAliveFrame : public SignedEncryptedFrame<KeepAliveFrame,
+                                                    utime_t> {  // timestamp
+  static const Tag tag = Tag::KEEPALIVE2;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  KeepAliveFrame(ceph::crypto::onwire::rxtx_t &session_stream_handlers)
+      : KeepAliveFrame(session_stream_handlers, ceph_clock_now()) {}
+
+  inline utime_t &timestamp() { return get_val<0>(); }
+};
+
+struct KeepAliveFrameAck : public SignedEncryptedFrame<KeepAliveFrameAck,
+                                                       utime_t> { // ack timestamp
+  static const Tag tag = Tag::KEEPALIVE2_ACK;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline utime_t &timestamp() { return get_val<0>(); }
+};
+
+struct AckFrame : public SignedEncryptedFrame<AckFrame,
+                                              uint64_t> { // message sequence
+  static const Tag tag = Tag::ACK;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline uint64_t &seq() { return get_val<0>(); }
+};
+
+// This class is used for encoding/decoding header of the message frame.
+// Body is processed almost independently with the sole junction point
+// being the `extra_payload_len` passed to get_buffer().
+struct MessageHeaderFrame
+    : public PayloadFrame<MessageHeaderFrame, ceph_msg_header2> {
+  static const Tag tag = Tag::MESSAGE;
+
+  ceph::bufferlist &get_buffer() {
+    // In contrast to Frame::get_buffer() we don't fill preamble here.
+    return this->payload;
+  }
+
+  MessageHeaderFrame(const ceph_msg_header2 &msghdr,
+                    const uint32_t front_len,
+                    const uint32_t middle_len,
+                    const uint32_t data_len)
+      : PayloadFrame<MessageHeaderFrame, ceph_msg_header2>(msghdr)
+  {
+    // FIXME: plainsize -> ciphersize; for AES-GCM they are equall apart from auth tag size
+    fill_preamble({
+      segment_t{ this->payload.length() - FRAME_PREAMBLE_SIZE,
+                segment_t::DEFAULT_ALIGNMENT },
+      segment_t{ front_len, segment_t::DEFAULT_ALIGNMENT },
+      segment_t{ middle_len, segment_t::DEFAULT_ALIGNMENT },
+      segment_t{ data_len, segment_t::DEFERRED_ALLOCATION },
+    });
+  }
+
+  MessageHeaderFrame(ceph::bufferlist&& text)
+      : PayloadFrame<MessageHeaderFrame, ceph_msg_header2>(do_not_encode_tag_t{})
+  {
+    this->decode_frame(text);
+  }
+
+  inline ceph_msg_header2 &header() { return get_val<0>(); }
+};
+
+} // namespace ceph::msgr::v2
+
+#endif // _MSG_ASYNC_FRAMES_V2_