]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async, v2: rework the class hierarchy - introduce ControlFrame.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Mon, 4 Mar 2019 21:39:18 +0000 (22:39 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Sat, 9 Mar 2019 23:44:34 +0000 (00:44 +0100)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/msg/async/frames_v2.h

index b99269702a53fad211a6108125f3010b39c25fdf..6d709bf50c512be02d4f4f4cc7735688bdd66215 100644 (file)
@@ -72,7 +72,7 @@ struct SegmentIndex {
     static constexpr std::size_t DATA = 3;
   };
 
-  struct Frame {
+  struct Control {
     static constexpr std::size_t PAYLOAD = 0;
   };
 };
@@ -172,70 +172,57 @@ static ceph::bufferlist segment_onwire_bufferlist(const ceph::bufferlist &bl)
   return ret;
 }
 
-template <class T>
+template <class T, __u8 SegmentsNumV>
 struct Frame {
+  static_assert(SegmentsNumV > 0 && SegmentsNumV <= MAX_NUM_SEGMENTS);
 protected:
-  ceph::bufferlist payload;
+  std::array<ceph::bufferlist, SegmentsNumV> segments;
   ceph::bufferlist::contiguous_filler preamble_filler;
 
+  // craft the main preamble. It's always present regardless of the number
+  // of segments message is composed from.
   void fill_preamble(const std::initializer_list<segment_t> main_segments) {
     ceph_assert(std::size(main_segments) <= MAX_NUM_SEGMENTS);
 
-    // Craft the main preamble. It's always present regardless of the number
-    // of segments message is composed from. This doesn't apply to extra one
-    // as it's optional -- if there is up to 2 segments, we'll never transmit
-    // preamble_extra_t;
-    {
-      preamble_block_t main_preamble;
-      // TODO: we might fill/pad with pseudo-random data.
-      ::memset(&main_preamble, 0, sizeof(main_preamble));
+    preamble_block_t main_preamble;
+    // TODO: we might fill/pad with pseudo-random data.
+    ::memset(&main_preamble, 0, sizeof(main_preamble));
 
-      main_preamble.num_segments = std::size(main_segments);
-      main_preamble.tag = static_cast<__u8>(T::tag);
-      ceph_assert(main_preamble.tag != 0);
+    main_preamble.num_segments = std::size(main_segments);
+    main_preamble.tag = static_cast<__u8>(T::tag);
+    ceph_assert(main_preamble.tag != 0);
 
-      std::copy(std::cbegin(main_segments), std::cend(main_segments),
-                std::begin(main_preamble.segments));
+    std::copy(std::cbegin(main_segments), std::cend(main_segments),
+              std::begin(main_preamble.segments));
 
-      main_preamble.crc =
-          ceph_crc32c(0, reinterpret_cast<unsigned char *>(&main_preamble),
-                      sizeof(main_preamble) - sizeof(main_preamble.crc));
+    main_preamble.crc =
+        ceph_crc32c(0, reinterpret_cast<unsigned char *>(&main_preamble),
+                    sizeof(main_preamble) - sizeof(main_preamble.crc));
 
-      preamble_filler.copy_in(sizeof(main_preamble),
-                              reinterpret_cast<const char *>(&main_preamble));
-    }
+    preamble_filler.copy_in(sizeof(main_preamble),
+                            reinterpret_cast<const char *>(&main_preamble));
   }
 
-  Frame() : preamble_filler(payload.append_hole(FRAME_PREAMBLE_SIZE)) {}
-
-  void decode_frame(const ceph::bufferlist &bl) {
-    auto ti = bl.cbegin();
-    static_cast<T *>(this)->decode_payload(ti);
+  Frame()
+    : preamble_filler(segments.front().append_hole(FRAME_PREAMBLE_SIZE)) {
   }
 
-  void decode_payload(bufferlist::const_iterator &ti) {}
-
 public:
-  ceph::bufferlist &get_buffer() {
-    fill_preamble({segment_t{payload.length() - FRAME_PREAMBLE_SIZE,
-                   segment_t::DEFAULT_ALIGNMENT}});
-
-    epilogue_plain_block_t epilogue;
-    ::memset(&epilogue, 0, sizeof(epilogue));
-
-    ceph::bufferlist::const_iterator hdriter(&this->payload, FRAME_PREAMBLE_SIZE);
-    epilogue.crc_values[SegmentIndex::Frame::PAYLOAD] =
-        hdriter.crc32c(hdriter.get_remaining(), -1);
-    this->payload.append(reinterpret_cast<const char*>(&epilogue), sizeof(epilogue));
-
-    return payload;
-  }
 };
 
+
+// ControlFrames are used to manage transceiver state (like connections) and
+// orchestrate transfers of MessageFrames. They use only single segment with
+// marshalling facilities -- derived classes specify frame structure through
+// Args pack while ControlFrame provides common encode/decode machinery.
 template <class C, typename... Args>
-class PayloadFrame : public Frame<C> {
+class ControlFrame : public Frame<C, 1 /* single segment */> {
 protected:
-  // this tuple is only used when decoding values from a payload buffer
+  ceph::bufferlist &get_payload_segment() {
+    return this->segments[SegmentIndex::Control::PAYLOAD];
+  }
+
+  // this tuple is only used when decoding values from a payload segment
   std::tuple<Args...> _values;
 
   // FIXME: for now, we assume specific features for the purpoess of encoding
@@ -245,16 +232,16 @@ protected:
   template <typename T>
   inline void _encode_payload_each(T &t) {
     if constexpr (std::is_same<T, bufferlist const>()) {
-      this->payload.claim_append((bufferlist &)t);
+      this->get_payload_segment().claim_append((bufferlist &)t);
     } else if constexpr (std::is_same<T, std::vector<uint32_t> const>()) {
-      encode((uint32_t)t.size(), this->payload, features);
+      encode((uint32_t)t.size(), this->get_payload_segment(), features);
       for (const auto &elem : t) {
-        encode(elem, this->payload, features);
+        encode(elem, this->get_payload_segment(), features);
       }
     } else if constexpr (std::is_same<T, ceph_msg_header2 const>()) {
-      this->payload.append((char *)&t, sizeof(t));
+      this->get_payload_segment().append((char *)&t, sizeof(t));
     } else {
-      encode(t, this->payload, features);
+      encode(t, this->get_payload_segment(), features);
     }
   }
 
@@ -291,13 +278,37 @@ protected:
     return std::get<N>(_values);
   }
 
-  PayloadFrame() : Frame<C>() {}
+  ControlFrame() : Frame<C, 1 /* single segment */>() {}
 
   void _encode(const Args &... args) {
     (_encode_payload_each(args), ...);
   }
 
+  void _decode(const ceph::bufferlist &bl) {
+    auto ti = bl.cbegin();
+    _decode_payload(ti, std::index_sequence_for<Args...>());
+  }
+
 public:
+  ceph::bufferlist &get_buffer() {
+    this->fill_preamble({
+      segment_t{ get_payload_segment().length() - FRAME_PREAMBLE_SIZE,
+                 segment_t::DEFAULT_ALIGNMENT}
+    });
+
+    epilogue_plain_block_t epilogue;
+    ::memset(&epilogue, 0, sizeof(epilogue));
+
+    ceph::bufferlist::const_iterator hdriter(&this->get_payload_segment(),
+                                             FRAME_PREAMBLE_SIZE);
+    epilogue.crc_values[SegmentIndex::Control::PAYLOAD] =
+        hdriter.crc32c(hdriter.get_remaining(), -1);
+    get_payload_segment().append(reinterpret_cast<const char*>(&epilogue),
+                                 sizeof(epilogue));
+
+    return get_payload_segment();
+  }
+
   static C Encode(const Args &... args) {
     C c;
     c._encode(args...);
@@ -306,53 +317,49 @@ public:
 
   static C Decode(const ceph::bufferlist &payload) {
     C c;
-    c.decode_frame(payload);
+    c._decode(payload);
     return c;
   }
-
-  void decode_payload(bufferlist::const_iterator &ti) {
-    _decode_payload(ti, std::index_sequence_for<Args...>());
-  }
 };
 
-struct HelloFrame : public PayloadFrame<HelloFrame,
+struct HelloFrame : public ControlFrame<HelloFrame,
                                         uint8_t,          // entity type
                                         entity_addr_t> {  // peer address
   static const Tag tag = Tag::HELLO;
-  using PayloadFrame::Encode;
-  using PayloadFrame::Decode;
+  using ControlFrame::Encode;
+  using ControlFrame::Decode;
 
   inline uint8_t &entity_type() { return get_val<0>(); }
   inline entity_addr_t &peer_addr() { return get_val<1>(); }
 
 protected:
-  using PayloadFrame::PayloadFrame;
+  using ControlFrame::ControlFrame;
 };
 
-struct AuthRequestFrame : public PayloadFrame<AuthRequestFrame,
-                                                               uint32_t, // auth method
+struct AuthRequestFrame : public ControlFrame<AuthRequestFrame,
+                                              uint32_t, // auth method
                                               vector<uint32_t>, // preferred modes
                                               bufferlist> { // auth payload
   static const Tag tag = Tag::AUTH_REQUEST;
-  using PayloadFrame::Encode;
-  using PayloadFrame::Decode;
+  using ControlFrame::Encode;
+  using ControlFrame::Decode;
 
   inline uint32_t &method() { return get_val<0>(); }
   inline vector<uint32_t> &preferred_modes() { return get_val<1>(); }
   inline bufferlist &auth_payload() { return get_val<2>(); }
 
 protected:
-  using PayloadFrame::PayloadFrame;
+  using ControlFrame::ControlFrame;
 };
 
-struct AuthBadMethodFrame : public PayloadFrame<AuthBadMethodFrame,
+struct AuthBadMethodFrame : public ControlFrame<AuthBadMethodFrame,
                                                 uint32_t, // method
                                                 int32_t,  // result
                                                 std::vector<uint32_t>,   // allowed methods
                                                 std::vector<uint32_t>> { // allowed modes
   static const Tag tag = Tag::AUTH_BAD_METHOD;
-  using PayloadFrame::Encode;
-  using PayloadFrame::Decode;
+  using ControlFrame::Encode;
+  using ControlFrame::Decode;
 
   inline uint32_t &method() { return get_val<0>(); }
   inline int32_t &result() { return get_val<1>(); }
@@ -360,94 +367,94 @@ struct AuthBadMethodFrame : public PayloadFrame<AuthBadMethodFrame,
   inline std::vector<uint32_t> &allowed_modes() { return get_val<3>(); }
 
 protected:
-  using PayloadFrame::PayloadFrame;
+  using ControlFrame::ControlFrame;
 };
 
-struct AuthReplyMoreFrame : public PayloadFrame<AuthReplyMoreFrame,
+struct AuthReplyMoreFrame : public ControlFrame<AuthReplyMoreFrame,
                                                 bufferlist> { // auth payload
   static const Tag tag = Tag::AUTH_REPLY_MORE;
-  using PayloadFrame::Encode;
-  using PayloadFrame::Decode;
+  using ControlFrame::Encode;
+  using ControlFrame::Decode;
 
   inline bufferlist &auth_payload() { return get_val<0>(); }
 
 protected:
-  using PayloadFrame::PayloadFrame;
+  using ControlFrame::ControlFrame;
 };
 
-struct AuthRequestMoreFrame : public PayloadFrame<AuthRequestMoreFrame,
+struct AuthRequestMoreFrame : public ControlFrame<AuthRequestMoreFrame,
                                                   bufferlist> { // auth payload
   static const Tag tag = Tag::AUTH_REQUEST_MORE;
-  using PayloadFrame::Encode;
-  using PayloadFrame::Decode;
+  using ControlFrame::Encode;
+  using ControlFrame::Decode;
 
   inline bufferlist &auth_payload() { return get_val<0>(); }
 
 protected:
-  using PayloadFrame::PayloadFrame;
+  using ControlFrame::ControlFrame;
 };
 
-struct AuthDoneFrame : public PayloadFrame<AuthDoneFrame,
+struct AuthDoneFrame : public ControlFrame<AuthDoneFrame,
                                            uint64_t, // global id
                                            uint32_t, // connection mode
                                            bufferlist> { // auth method payload
   static const Tag tag = Tag::AUTH_DONE;
-  using PayloadFrame::Encode;
-  using PayloadFrame::Decode;
+  using ControlFrame::Encode;
+  using ControlFrame::Decode;
 
   inline uint64_t &global_id() { return get_val<0>(); }
   inline uint32_t &con_mode() { return get_val<1>(); }
   inline bufferlist &auth_payload() { return get_val<2>(); }
 
 protected:
-  using PayloadFrame::PayloadFrame;
+  using ControlFrame::ControlFrame;
 };
 
 template <class T, typename... Args>
-struct SignedEncryptedFrame : public PayloadFrame<T, Args...> {
+struct SignedEncryptedFrame : public ControlFrame<T, Args...> {
   ceph::bufferlist &get_buffer() {
     // In contrast to Frame::get_buffer() we don't fill preamble here.
-    return this->payload;
+    return this->get_payload_segment();
   }
 
   static T Encode(ceph::crypto::onwire::rxtx_t &session_stream_handlers,
                   const Args &... args) {
-    T c = PayloadFrame<T, Args...>::Encode(args...);
-    c.fill_preamble({segment_t{c.payload.length() - FRAME_PREAMBLE_SIZE,
+    T c = ControlFrame<T, Args...>::Encode(args...);
+    c.fill_preamble({segment_t{c.get_payload_segment().length() - FRAME_PREAMBLE_SIZE,
                      segment_t::DEFAULT_ALIGNMENT}});
 
     if (session_stream_handlers.tx) {
-      c.payload = segment_onwire_bufferlist(std::move(c.payload));
+      c.get_payload_segment() = segment_onwire_bufferlist(std::move(c.get_payload_segment()));
 
       epilogue_secure_block_t epilogue;
       ::memset(&epilogue, 0, sizeof(epilogue));
-      c.payload.append(reinterpret_cast<const char*>(&epilogue), sizeof(epilogue));
+      c.get_payload_segment().append(reinterpret_cast<const char*>(&epilogue), sizeof(epilogue));
 
       ceph_assert(session_stream_handlers.tx);
-      session_stream_handlers.tx->reset_tx_handler({c.payload.length()});
+      session_stream_handlers.tx->reset_tx_handler({c.get_payload_segment().length()});
 
       session_stream_handlers.tx->authenticated_encrypt_update(
-          std::move(c.payload));
-      c.payload = session_stream_handlers.tx->authenticated_encrypt_final();
+          std::move(c.get_payload_segment()));
+      c.get_payload_segment() = session_stream_handlers.tx->authenticated_encrypt_final();
     } else {
       epilogue_plain_block_t epilogue;
       ::memset(&epilogue, 0, sizeof(epilogue));
 
-      ceph::bufferlist::const_iterator hdriter(&c.payload, FRAME_PREAMBLE_SIZE);
-      epilogue.crc_values[SegmentIndex::Frame::PAYLOAD] =
+      ceph::bufferlist::const_iterator hdriter(&c.get_payload_segment(), FRAME_PREAMBLE_SIZE);
+      epilogue.crc_values[SegmentIndex::Control::PAYLOAD] =
           hdriter.crc32c(hdriter.get_remaining(), -1);
-      c.payload.append(reinterpret_cast<const char*>(&epilogue), sizeof(epilogue));
+      c.get_payload_segment().append(reinterpret_cast<const char*>(&epilogue), sizeof(epilogue));
     }
     return c;
   }
 
   static T Decode(ceph::crypto::onwire::rxtx_t &session_stream_handlers,
                   ceph::bufferlist &payload) {
-    return PayloadFrame<T, Args...>::Decode(payload);
+    return ControlFrame<T, Args...>::Decode(payload);
   }
 
 protected:
-  SignedEncryptedFrame() : PayloadFrame<T, Args...>() {}
+  SignedEncryptedFrame() : ControlFrame<T, Args...>() {}
 };
 
 struct ClientIdentFrame
@@ -640,12 +647,12 @@ protected:
 // Body is processed almost independently with the sole junction point
 // being the `extra_payload_len` passed to get_buffer().
 struct MessageHeaderFrame
-    : public PayloadFrame<MessageHeaderFrame, ceph_msg_header2> {
+    : public ControlFrame<MessageHeaderFrame, ceph_msg_header2> {
   static const Tag tag = Tag::MESSAGE;
 
   ceph::bufferlist &get_buffer() {
     // In contrast to Frame::get_buffer() we don't fill preamble here.
-    return this->payload;
+    return this->get_payload_segment();
   }
 
   static MessageHeaderFrame Encode(ceph::crypto::onwire::rxtx_t &session_stream_handlers,
@@ -654,10 +661,10 @@ struct MessageHeaderFrame
                                    const ceph::bufferlist& middle,
                                    const ceph::bufferlist& data) {
     MessageHeaderFrame f =
-        PayloadFrame<MessageHeaderFrame, ceph_msg_header2>::Encode(msg_header);
+        ControlFrame<MessageHeaderFrame, ceph_msg_header2>::Encode(msg_header);
 
     f.fill_preamble({
-      segment_t{ f.payload.length() - FRAME_PREAMBLE_SIZE,
+      segment_t{ f.get_payload_segment().length() - FRAME_PREAMBLE_SIZE,
                 segment_t::DEFAULT_ALIGNMENT },
       segment_t{ front.length(), segment_t::DEFAULT_ALIGNMENT },
       segment_t{ middle.length(), segment_t::DEFAULT_ALIGNMENT },
@@ -668,22 +675,23 @@ struct MessageHeaderFrame
       // we're padding segments to biggest cipher's block size. Although
       // AES-GCM can live without that as it's a stream cipher, we don't
       // to be fixed to stream ciphers only.
-      f.payload = segment_onwire_bufferlist(std::move(f.payload));
+      f.get_payload_segment() =
+          segment_onwire_bufferlist(std::move(f.get_payload_segment()));
       auto front_padded = segment_onwire_bufferlist(front);
       auto middle_padded = segment_onwire_bufferlist(middle);
       auto data_padded = segment_onwire_bufferlist(data);
 
       // let's cipher allocate one huge buffer for entire ciphertext.
       session_stream_handlers.tx->reset_tx_handler({
-        f.payload.length(),
+        f.get_payload_segment().length(),
         front_padded.length(),
         middle_padded.length(),
         data_padded.length()
       });
 
-      ceph_assert(f.payload.length());
+      ceph_assert(f.get_payload_segment().length());
       session_stream_handlers.tx->authenticated_encrypt_update(
-        std::move(f.payload));
+        std::move(f.get_payload_segment()));
 
       // TODO: switch TxHandler from `bl&&` to `const bl&`.
       if (front.length()) {
@@ -707,22 +715,22 @@ struct MessageHeaderFrame
       }
 
       // auth tag will be appended at the end
-      f.payload = session_stream_handlers.tx->authenticated_encrypt_final();
+      f.get_payload_segment() = session_stream_handlers.tx->authenticated_encrypt_final();
     } else {
       epilogue_plain_block_t epilogue;
       ::memset(&epilogue, 0, sizeof(epilogue));
 
-      ceph::bufferlist::const_iterator hdriter(&f.payload, FRAME_PREAMBLE_SIZE);
+      ceph::bufferlist::const_iterator hdriter(&f.get_payload_segment(), FRAME_PREAMBLE_SIZE);
       epilogue.crc_values[SegmentIndex::Msg::HEADER] =
           hdriter.crc32c(hdriter.get_remaining(), -1);
       epilogue.crc_values[SegmentIndex::Msg::FRONT] = front.crc32c(-1);
       epilogue.crc_values[SegmentIndex::Msg::MIDDLE] = middle.crc32c(-1);
       epilogue.crc_values[SegmentIndex::Msg::DATA] = data.crc32c(-1);
 
-      f.payload.append(front);
-      f.payload.append(middle);
-      f.payload.append(data);
-      f.payload.append(reinterpret_cast<const char*>(&epilogue), sizeof(epilogue));
+      f.get_payload_segment().append(front);
+      f.get_payload_segment().append(middle);
+      f.get_payload_segment().append(data);
+      f.get_payload_segment().append(reinterpret_cast<const char*>(&epilogue), sizeof(epilogue));
     }
 
     return f;
@@ -737,7 +745,7 @@ struct MessageHeaderFrame
   inline ceph_msg_header2 &header() { return get_val<0>(); }
 
 protected:
-  using PayloadFrame::PayloadFrame;
+  using ControlFrame::ControlFrame;
 };
 
 } // namespace ceph::msgr::v2