]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: V2 uses segments instead of next_payload_len, part 1.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 14 Feb 2019 00:39:26 +0000 (01:39 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 21 Feb 2019 20:58:35 +0000 (21:58 +0100)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index ade751432da45b942b378112270b8df9861d3811..e8fedf729ef330c31d3d64b058d2fa13c74569d9 100644 (file)
@@ -118,10 +118,7 @@ static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
 
 static constexpr uint8_t CRYPTO_BLOCK_SIZE { 16 };
 
-struct segment_t {
-  __le32 length;
-  __le16 alignment;
-} __attribute__((packed));
+using segment_t = ProtocolV2::segment_t;
 
 struct preamble_main_t {
   static constexpr std::size_t MAX_NUM_SEGMENTS = 2;
@@ -129,14 +126,14 @@ struct preamble_main_t {
   __le16 crc;
   __u8 tag;
   __u8 num_segments;
-  std::array<segment_t, MAX_NUM_SEGMENTS> segments;
+  std::array<ProtocolV2::segment_t, MAX_NUM_SEGMENTS> segments;
 } __attribute__((packed));
 static_assert(sizeof(preamble_main_t) == CRYPTO_BLOCK_SIZE);
 static_assert(std::is_standard_layout<preamble_main_t>::value);
 
 struct preamble_extra_t {
   static constexpr std::size_t MAX_NUM_SEGMENTS = 2;
-  std::array<segment_t, MAX_NUM_SEGMENTS> segments;
+  std::array<ProtocolV2::segment_t, MAX_NUM_SEGMENTS> segments;
   std::array<__u8, 4> always_padding;
 } __attribute__((packed));
 static_assert(sizeof(preamble_extra_t) == CRYPTO_BLOCK_SIZE);
@@ -1452,6 +1449,19 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) {
     // everything through ::decode is unnecessary.
     next_payload_len = main_preamble.segments[0].length;
 
+    rx_segments_todo_rev.clear();
+    for (std::uint8_t idx = main_preamble.num_segments;
+        idx <= rx_segments_todo_rev.capacity() && idx > 0;
+        /* NOP */)
+    {
+      --idx;
+      ldout(cct, 10) << __func__ << " got new segment:"
+                    << " len=" << main_preamble.segments[idx].length
+                    << " align=" << main_preamble.segments[idx].alignment
+                    << dendl;
+      rx_segments_todo_rev.emplace_back(main_preamble.segments[idx]);
+    }
+
     // TODO: move this ugliness into dedicated procedure
     const auto rx_crc = ceph_crc16c(0,
       reinterpret_cast<const unsigned char*>(&main_preamble) + sizeof(main_preamble.crc),
@@ -1489,7 +1499,8 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() {
     case Tag::KEEPALIVE2:
     case Tag::KEEPALIVE2_ACK:
     case Tag::ACK:
-      return READ(next_payload_len, handle_frame_payload);
+      rx_segments_data.clear();
+      return read_frame_segment();
     case Tag::WAIT:
       return handle_wait();
     case Tag::MESSAGE:
@@ -1505,6 +1516,44 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() {
   return nullptr;
 }
 
+CtPtr ProtocolV2::read_frame_segment() {
+  ldout(cct, 20) << __func__ << dendl;
+  ceph_assert(!rx_segments_todo_rev.empty());
+
+#if 0
+  auto rx_buffer = ceph::buffer::ptr_node::create(
+    buffer::create_aligned(rx_segments_todo_rev.back().length,
+                          rx_segments_todo_rev.back().alignment));
+#else
+  auto rx_buffer = ceph::buffer::ptr_node::create(
+    buffer::create(rx_segments_todo_rev.back().length));
+#endif
+
+  rx_segments_todo_rev.pop_back();
+  rx_segments_data.emplace_back();
+  rx_segments_data.back().push_back(std::move(rx_buffer));
+  return READB(rx_segments_data.back().length(),
+              rx_segments_data.back().c_str(),
+              handle_frame_segment);
+}
+
+CtPtr ProtocolV2::handle_frame_segment(char *buffer, int r) {
+  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " read frame segment failed r=" << r << " ("
+                  << cpp_strerror(r) << ")" << dendl;
+    return _fault();
+  }
+
+  if (!rx_segments_todo_rev.empty()) {
+    return read_frame_segment();
+  } else {
+    // TODO: for makeshift only. This will be more generic and throttled
+    return handle_frame_payload(buffer, r);
+  }
+}
+
 CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
@@ -1514,47 +1563,50 @@ CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) {
     return _fault();
   }
 
+  ceph_assert(!rx_segments_data.empty());
+  const auto this_payload_len = rx_segments_data.back().length();
+
   ldout(cct, 30) << __func__ << "\n";
   bufferlist bl;
-  bl.append(buffer, next_payload_len);
+  bl.append(buffer, this_payload_len);
   bl.hexdump(*_dout);
   *_dout << dendl;
 
   switch (next_tag) {
     case Tag::HELLO:
-      return handle_hello(buffer, next_payload_len);
+      return handle_hello(buffer, this_payload_len);
     case Tag::AUTH_REQUEST:
-      return handle_auth_request(buffer, next_payload_len);
+      return handle_auth_request(buffer, this_payload_len);
     case Tag::AUTH_BAD_METHOD:
-      return handle_auth_bad_method(buffer, next_payload_len);
+      return handle_auth_bad_method(buffer, this_payload_len);
     case Tag::AUTH_REPLY_MORE:
-      return handle_auth_reply_more(buffer, next_payload_len);
+      return handle_auth_reply_more(buffer, this_payload_len);
     case Tag::AUTH_REQUEST_MORE:
-      return handle_auth_request_more(buffer, next_payload_len);
+      return handle_auth_request_more(buffer, this_payload_len);
     case Tag::AUTH_DONE:
-      return handle_auth_done(buffer, next_payload_len);
+      return handle_auth_done(buffer, this_payload_len);
     case Tag::CLIENT_IDENT:
-      return handle_client_ident(buffer, next_payload_len);
+      return handle_client_ident(buffer, this_payload_len);
     case Tag::SERVER_IDENT:
-      return handle_server_ident(buffer, next_payload_len);
+      return handle_server_ident(buffer, this_payload_len);
     case Tag::IDENT_MISSING_FEATURES:
-      return handle_ident_missing_features(buffer, next_payload_len);
+      return handle_ident_missing_features(buffer, this_payload_len);
     case Tag::SESSION_RECONNECT:
-      return handle_reconnect(buffer, next_payload_len);
+      return handle_reconnect(buffer, this_payload_len);
     case Tag::SESSION_RESET:
-      return handle_session_reset(buffer, next_payload_len);
+      return handle_session_reset(buffer, this_payload_len);
     case Tag::SESSION_RETRY:
-      return handle_session_retry(buffer, next_payload_len);
+      return handle_session_retry(buffer, this_payload_len);
     case Tag::SESSION_RETRY_GLOBAL:
-      return handle_session_retry_global(buffer, next_payload_len);
+      return handle_session_retry_global(buffer, this_payload_len);
     case Tag::SESSION_RECONNECT_OK:
-      return handle_reconnect_ok(buffer, next_payload_len);
+      return handle_reconnect_ok(buffer, this_payload_len);
     case Tag::KEEPALIVE2:
-      return handle_keepalive2(buffer, next_payload_len);
+      return handle_keepalive2(buffer, this_payload_len);
     case Tag::KEEPALIVE2_ACK:
-      return handle_keepalive2_ack(buffer, next_payload_len);
+      return handle_keepalive2_ack(buffer, this_payload_len);
     case Tag::ACK:
-      return handle_message_ack(buffer, next_payload_len);
+      return handle_message_ack(buffer, this_payload_len);
     default:
       ceph_abort();
   }
index 1514e199b60f29375ed3c7149fdf7781d341e200..38c7cee04bdb6b883ae51eb90ac2eff243333bee 100644 (file)
@@ -4,6 +4,8 @@
 #ifndef _MSG_ASYNC_PROTOCOL_V2_
 #define _MSG_ASYNC_PROTOCOL_V2_
 
+#include <boost/container/static_vector.hpp>
+
 #include "Protocol.h"
 #include "crypto_onwire.h"
 
@@ -102,6 +104,24 @@ private:
   Ct<ProtocolV2> *bannerExchangeCallback;
 
   uint32_t next_payload_len;
+
+public:
+  struct segment_t {
+    __le32 length;
+    __le16 alignment;
+  } __attribute__((packed));
+
+private:
+  static constexpr std::size_t MAX_NUM_SEGMENTS = 4;
+  // segment descriptors are stored in reversed order. This is because
+  // vectors don't support ::pop_front.  We might want to exchange
+  // the container to slightly tuned one in the future.
+  boost::container::static_vector<segment_t,
+                                 MAX_NUM_SEGMENTS> rx_segments_todo_rev;
+  boost::container::static_vector<ceph::bufferlist,
+                                 MAX_NUM_SEGMENTS> rx_segments_data;
+
+
   Tag next_tag;
   ceph_msg_header2 current_header;
   utime_t backoff;  // backoff time
@@ -148,7 +168,7 @@ private:
 
   CONTINUATION_DECL(ProtocolV2, read_frame);
   READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_preamble_main);
-  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame_payload);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame_segment);
   READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_header);
   CONTINUATION_DECL(ProtocolV2, throttle_message);
   CONTINUATION_DECL(ProtocolV2, throttle_bytes);
@@ -162,6 +182,8 @@ private:
   Ct<ProtocolV2> *read_frame();
   Ct<ProtocolV2> *handle_read_frame_preamble_main(char *buffer, int r);
   Ct<ProtocolV2> *handle_read_frame_dispatch();
+  Ct<ProtocolV2> *read_frame_segment();
+  Ct<ProtocolV2> *handle_frame_segment(char *buffer, int r);
   Ct<ProtocolV2> *handle_frame_payload(char *buffer, int r);
 
   Ct<ProtocolV2> *ready();