]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async: initial multi-segment support for V2.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Fri, 15 Feb 2019 02:09:19 +0000 (03:09 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 21 Feb 2019 20:58:37 +0000 (21:58 +0100)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index 74e45fbe2a7b9adb126ed9e2d9f34e92dbf26c92..e6a6a9a000a85fa406ec3d5a3b7e4c05de751b90 100644 (file)
@@ -544,7 +544,10 @@ struct MessageHeaderFrame
   {
     // FIXME: plainsize -> ciphersize; for AES-GCM they are equall apart from auth tag size
     fill_preamble({
-      segment_t { this->payload.length() + front_len + middle_len + data_len + 16 - FRAME_PREAMBLE_SIZE, 16 },
+      segment_t{ this->payload.length() - FRAME_PREAMBLE_SIZE, 8 },
+      segment_t{ front_len, 8 },
+      segment_t{ middle_len, 8 },
+      segment_t{ data_len + 16, segment_t::DEFERRED_ALLOCATION },
     }, {});
   }
 
@@ -1446,13 +1449,13 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) {
     next_tag = static_cast<Tag>(main_preamble.tag);
 
     // FIXME: makeshift solution
-    ceph_assert_always(main_preamble.num_segments == 1);
+    ceph_assert_always(main_preamble.num_segments == 1 ||
+      main_preamble.num_segments == 4);
 
     // I expect ceph_le32 will make the endian conversion for me. Passing
     // everything through ::decode is unnecessary.
-    next_payload_len = main_preamble.segments[0].length;
-
     rx_segments_todo_rev.clear();
+    next_payload_len = 0;
     for (std::uint8_t idx = main_preamble.num_segments;
         idx <= rx_segments_todo_rev.capacity() && idx > 0;
         /* NOP */)
@@ -1463,6 +1466,7 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) {
                     << " align=" << main_preamble.segments[idx].alignment
                     << dendl;
       rx_segments_todo_rev.emplace_back(main_preamble.segments[idx]);
+      next_payload_len += main_preamble.segments[idx].length;
     }
 
     // TODO: move this ugliness into dedicated procedure
@@ -1477,7 +1481,8 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) {
     }
   }
 
-  return handle_read_frame_dispatch();
+  rx_segments_data.clear();
+  return read_frame_segment();
 }
 
 CtPtr ProtocolV2::handle_read_frame_dispatch() {
@@ -1502,8 +1507,7 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() {
     case Tag::KEEPALIVE2:
     case Tag::KEEPALIVE2_ACK:
     case Tag::ACK:
-      rx_segments_data.clear();
-      return read_frame_segment();
+      return handle_frame_payload();
     case Tag::WAIT:
       return handle_wait();
     case Tag::MESSAGE:
@@ -1532,15 +1536,20 @@ CtPtr ProtocolV2::read_frame_segment() {
     buffer::create(rx_segments_todo_rev.back().length));
 #endif
 
+  if (rx_segments_todo_rev.back().alignment == 0) {
+    rx_segments_data.emplace_back();
+    return handle_read_frame_dispatch();
+  }
+
   rx_segments_todo_rev.pop_back();
   rx_segments_data.emplace_back();
   rx_segments_data.back().push_back(std::move(rx_buffer));
   return READB(rx_segments_data.back().length(),
               rx_segments_data.back().c_str(),
-              handle_frame_segment);
+              handle_read_frame_segment);
 }
 
-CtPtr ProtocolV2::handle_frame_segment(char *buffer, int r) {
+CtPtr ProtocolV2::handle_read_frame_segment(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
@@ -1553,20 +1562,13 @@ CtPtr ProtocolV2::handle_frame_segment(char *buffer, int r) {
     return read_frame_segment();
   } else {
     // TODO: for makeshift only. This will be more generic and throttled
-    return handle_frame_payload(buffer, r);
+    return handle_read_frame_dispatch();
   }
 }
 
-CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " read frame payload failed r=" << r << " ("
-                  << cpp_strerror(r) << ")" << dendl;
-    return _fault();
-  }
-
+CtPtr ProtocolV2::handle_frame_payload() {
   ceph_assert(!rx_segments_data.empty());
+  auto* buffer = rx_segments_data.back().c_str();
   const auto this_payload_len = rx_segments_data.back().length();
 
   ldout(cct, 30) << __func__ << "\n";
@@ -1653,6 +1655,7 @@ CtPtr ProtocolV2::ready() {
 CtPtr ProtocolV2::handle_message() {
   ldout(cct, 20) << __func__ << dendl;
 
+  ceph_assert(rx_segments_data.size() == 4);
   ceph_assert(state == READY);
 
 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
@@ -1660,36 +1663,23 @@ CtPtr ProtocolV2::handle_message() {
 #endif
   recv_stamp = ceph_clock_now();
 
-  const uint32_t header_len = calculate_payload_size(
-    session_security.rx.get(), sizeof(ceph_msg_header2));
-  return READ(header_len, handle_message_header);
-}
-
-CtPtr ProtocolV2::handle_message_header(char *buffer, int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " read message header failed" << dendl;
-    return _fault();
-  }
-
-  const uint32_t header_len = calculate_payload_size(
-    session_security.rx.get(), sizeof(ceph_msg_header2));
-
-  ceph::bufferlist text;
-  text.push_back(buffer::create_static(header_len, buffer));
+  // TODO: move crypto processing to segment reader
   if (auth_meta->is_mode_secure()) {
     ceph_assert(session_stream_handlers.rx);
 
-    text = session_stream_handlers.rx->authenticated_decrypt_update(
-      std::move(text), 8);
+    rx_segments_data[0] = \
+      session_stream_handlers.rx->authenticated_decrypt_update(
+       std::move(rx_segments_data[0]), 8);
   }
-  MessageHeaderFrame header_frame(std::move(text));
+  MessageHeaderFrame header_frame(std::move(rx_segments_data[0]));
   ceph_msg_header2 &header = header_frame.header();
 
-  ldout(cct, 20) << __func__ << " got envelope type=" << header.type << " src "
-                 << peer_name << " front=" << header.front_len
-                 << " data=" << header.data_len << " off " << header.data_off
+  ldout(cct, 20) << __func__
+                << " got envelope type=" << header.type
+                << " src " << peer_name
+                << " front=" << header.front_len
+                 << " data=" << header.data_len
+                << " off " << header.data_off
                  << dendl;
 
   if (messenger->crcflags & MSG_CRC_HEADER) {
@@ -1714,171 +1704,29 @@ CtPtr ProtocolV2::handle_message_header(char *buffer, int r) {
   extra.clear();
   current_header = header;
 
-  next_payload_len -= header_len;
-
 #if 0
   state = THROTTLE_MESSAGE;
   return CONTINUE(throttle_message);
-#else
-  state = READ_MESSAGE_FRONT;
-  return read_message_front();
 #endif
-}
-
-CtPtr ProtocolV2::throttle_message() {
-  ldout(cct, 20) << __func__ << dendl;
-
-  if (connection->policy.throttler_messages) {
-    ldout(cct, 10) << __func__ << " wants " << 1
-                   << " message from policy throttler "
-                   << connection->policy.throttler_messages->get_current()
-                   << "/" << connection->policy.throttler_messages->get_max()
-                   << dendl;
-    if (!connection->policy.throttler_messages->get_or_fail()) {
-      ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
-                     << connection->policy.throttler_messages->get_current()
-                     << "/" << connection->policy.throttler_messages->get_max()
-                     << " failed, just wait." << dendl;
-      // following thread pool deal with th full message queue isn't a
-      // short time, so we can wait a ms.
-      if (connection->register_time_events.empty()) {
-        connection->register_time_events.insert(
-            connection->center->create_time_event(1000,
-                                                  connection->wakeup_handler));
-      }
-      return nullptr;
-    }
-  }
-
-  state = THROTTLE_BYTES;
-  return CONTINUE(throttle_bytes);
-}
-
-CtPtr ProtocolV2::throttle_bytes() {
-  ldout(cct, 20) << __func__ << dendl;
-
-  uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
-                          current_header.data_len;
-  if (cur_msg_size) {
-    if (connection->policy.throttler_bytes) {
-      ldout(cct, 10) << __func__ << " wants " << cur_msg_size
-                     << " bytes from policy throttler "
-                     << connection->policy.throttler_bytes->get_current() << "/"
-                     << connection->policy.throttler_bytes->get_max() << dendl;
-      if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
-        ldout(cct, 10) << __func__ << " wants " << cur_msg_size
-                       << " bytes from policy throttler "
-                       << connection->policy.throttler_bytes->get_current()
-                       << "/" << connection->policy.throttler_bytes->get_max()
-                       << " failed, just wait." << dendl;
-        // following thread pool deal with th full message queue isn't a
-        // short time, so we can wait a ms.
-        if (connection->register_time_events.empty()) {
-          connection->register_time_events.insert(
-              connection->center->create_time_event(
-                  1000, connection->wakeup_handler));
-        }
-        return nullptr;
-      }
-    }
-  }
 
-  state = THROTTLE_DISPATCH_QUEUE;
-  return CONTINUE(throttle_dispatch_queue);
-}
+  // front
+  ceph_assert(current_header.front_len == rx_segments_data[1].length());
+  ceph_assert(!front.length());
+  front = std::move(rx_segments_data[1]);
 
-CtPtr ProtocolV2::throttle_dispatch_queue() {
-  ldout(cct, 20) << __func__ << dendl;
-
-  uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
-                          current_header.data_len;
-
-  if (cur_msg_size) {
-    if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
-            cur_msg_size)) {
-      ldout(cct, 10)
-          << __func__ << " wants " << cur_msg_size
-          << " bytes from dispatch throttle "
-          << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
-          << connection->dispatch_queue->dispatch_throttler.get_max()
-          << " failed, just wait." << dendl;
-      // following thread pool deal with th full message queue isn't a
-      // short time, so we can wait a ms.
-      if (connection->register_time_events.empty()) {
-        connection->register_time_events.insert(
-            connection->center->create_time_event(1000,
-                                                  connection->wakeup_handler));
-      }
-      return nullptr;
-    }
-  }
-
-  throttle_stamp = ceph_clock_now();
-
-  state = READ_MESSAGE_FRONT;
-  return read_message_front();
-}
-
-CtPtr ProtocolV2::read_message_front() {
-  ldout(cct, 20) << __func__ << dendl;
-
-  unsigned front_len = current_header.front_len;
-  if (front_len) {
-    if (!front.length()) {
-      front.push_back(buffer::create(front_len));
-    }
-    return READB(front_len, front.c_str(), handle_message_front);
-  }
-  return read_message_middle();
-}
-
-CtPtr ProtocolV2::handle_message_front(char *buffer, int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " read message front failed" << dendl;
-    return _fault();
-  }
-
-  ldout(cct, 20) << __func__ << " got front " << front.length() << dendl;
-
-  next_payload_len -= current_header.front_len;
-
-  return read_message_middle();
-}
-
-CtPtr ProtocolV2::read_message_middle() {
-  ldout(cct, 20) << __func__ << dendl;
-
-  if (current_header.middle_len) {
-    if (!middle.length()) {
-      middle.push_back(buffer::create(current_header.middle_len));
-    }
-    return READB(current_header.middle_len, middle.c_str(),
-                 handle_message_middle);
-  }
-
-  return read_message_data_prepare();
-}
-
-CtPtr ProtocolV2::handle_message_middle(char *buffer, int r) {
-  ldout(cct, 20) << __func__ << " r" << r << dendl;
-
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " read message middle failed" << dendl;
-    return _fault();
-  }
-
-  ldout(cct, 20) << __func__ << " got middle " << middle.length() << dendl;
-
-  next_payload_len -= current_header.middle_len;
+  // middle
+  ceph_assert(current_header.middle_len == rx_segments_data[2].length());
+  ceph_assert(!middle.length());
+  middle = std::move(rx_segments_data[2]);
 
+  next_payload_len -= sizeof(ceph_msg_header2);
+  next_payload_len -= front.length();
+  next_payload_len -= middle.length();
   return read_message_data_prepare();
 }
 
 CtPtr ProtocolV2::read_message_data_prepare() {
   ldout(cct, 20) << __func__ << dendl;
-
   unsigned data_len = le32_to_cpu(current_header.data_len);
   unsigned data_off = le32_to_cpu(current_header.data_off);
 
@@ -1967,9 +1815,11 @@ CtPtr ProtocolV2::handle_message_extra_bytes(char *buffer, int r) {
 CtPtr ProtocolV2::handle_message_complete() {
   ldout(cct, 20) << __func__ << dendl;
 
-  ldout(cct, 5) << __func__ << " got " << front.length() << " + "
-                << middle.length() << " + " << data.length() << " byte message"
-                << dendl;
+  ldout(cct, 5) << __func__
+               << " got " << front.length()
+               << " + " << middle.length()
+               << " + " << data.length()
+               << " byte message" << dendl;
 
   ceph_msg_header header{current_header.seq,
                          current_header.tid,
@@ -2145,6 +1995,101 @@ CtPtr ProtocolV2::handle_message_complete() {
   return CONTINUE(read_frame);
 }
 
+
+CtPtr ProtocolV2::throttle_message() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  if (connection->policy.throttler_messages) {
+    ldout(cct, 10) << __func__ << " wants " << 1
+                   << " message from policy throttler "
+                   << connection->policy.throttler_messages->get_current()
+                   << "/" << connection->policy.throttler_messages->get_max()
+                   << dendl;
+    if (!connection->policy.throttler_messages->get_or_fail()) {
+      ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
+                     << connection->policy.throttler_messages->get_current()
+                     << "/" << connection->policy.throttler_messages->get_max()
+                     << " failed, just wait." << dendl;
+      // following thread pool deal with th full message queue isn't a
+      // short time, so we can wait a ms.
+      if (connection->register_time_events.empty()) {
+        connection->register_time_events.insert(
+            connection->center->create_time_event(1000,
+                                                  connection->wakeup_handler));
+      }
+      return nullptr;
+    }
+  }
+
+  state = THROTTLE_BYTES;
+  return CONTINUE(throttle_bytes);
+}
+
+CtPtr ProtocolV2::throttle_bytes() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
+                          current_header.data_len;
+  if (cur_msg_size) {
+    if (connection->policy.throttler_bytes) {
+      ldout(cct, 10) << __func__ << " wants " << cur_msg_size
+                     << " bytes from policy throttler "
+                     << connection->policy.throttler_bytes->get_current() << "/"
+                     << connection->policy.throttler_bytes->get_max() << dendl;
+      if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
+        ldout(cct, 10) << __func__ << " wants " << cur_msg_size
+                       << " bytes from policy throttler "
+                       << connection->policy.throttler_bytes->get_current()
+                       << "/" << connection->policy.throttler_bytes->get_max()
+                       << " failed, just wait." << dendl;
+        // following thread pool deal with th full message queue isn't a
+        // short time, so we can wait a ms.
+        if (connection->register_time_events.empty()) {
+          connection->register_time_events.insert(
+              connection->center->create_time_event(
+                  1000, connection->wakeup_handler));
+        }
+        return nullptr;
+      }
+    }
+  }
+
+  state = THROTTLE_DISPATCH_QUEUE;
+  return CONTINUE(throttle_dispatch_queue);
+}
+
+CtPtr ProtocolV2::throttle_dispatch_queue() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
+                          current_header.data_len;
+
+  if (cur_msg_size) {
+    if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
+            cur_msg_size)) {
+      ldout(cct, 10)
+          << __func__ << " wants " << cur_msg_size
+          << " bytes from dispatch throttle "
+          << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
+          << connection->dispatch_queue->dispatch_throttler.get_max()
+          << " failed, just wait." << dendl;
+      // following thread pool deal with th full message queue isn't a
+      // short time, so we can wait a ms.
+      if (connection->register_time_events.empty()) {
+        connection->register_time_events.insert(
+            connection->center->create_time_event(1000,
+                                                  connection->wakeup_handler));
+      }
+      return nullptr;
+    }
+  }
+
+  throttle_stamp = ceph_clock_now();
+
+  state = READ_MESSAGE_FRONT;
+  return read_message_data_prepare();
+}
+
 CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) {
   ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
 
index 38c7cee04bdb6b883ae51eb90ac2eff243333bee..8d556e08ab26e6fc7e69c889d169b973bb7af6c3 100644 (file)
@@ -107,6 +107,7 @@ private:
 
 public:
   struct segment_t {
+    static constexpr __le16 DEFERRED_ALLOCATION { 0x0000 };
     __le32 length;
     __le16 alignment;
   } __attribute__((packed));
@@ -168,13 +169,10 @@ private:
 
   CONTINUATION_DECL(ProtocolV2, read_frame);
   READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_preamble_main);
-  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame_segment);
-  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_header);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_segment);
   CONTINUATION_DECL(ProtocolV2, throttle_message);
   CONTINUATION_DECL(ProtocolV2, throttle_bytes);
   CONTINUATION_DECL(ProtocolV2, throttle_dispatch_queue);
-  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_front);
-  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_middle);
   CONTINUATION_DECL(ProtocolV2, read_message_data);
   READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_data);
   READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_extra_bytes);
@@ -183,20 +181,15 @@ private:
   Ct<ProtocolV2> *handle_read_frame_preamble_main(char *buffer, int r);
   Ct<ProtocolV2> *handle_read_frame_dispatch();
   Ct<ProtocolV2> *read_frame_segment();
-  Ct<ProtocolV2> *handle_frame_segment(char *buffer, int r);
-  Ct<ProtocolV2> *handle_frame_payload(char *buffer, int r);
+  Ct<ProtocolV2> *handle_read_frame_segment(char *buffer, int r);
+  Ct<ProtocolV2> *handle_frame_payload();
 
   Ct<ProtocolV2> *ready();
 
   Ct<ProtocolV2> *handle_message();
-  Ct<ProtocolV2> *handle_message_header(char *buffer, int r);
   Ct<ProtocolV2> *throttle_message();
   Ct<ProtocolV2> *throttle_bytes();
   Ct<ProtocolV2> *throttle_dispatch_queue();
-  Ct<ProtocolV2> *read_message_front();
-  Ct<ProtocolV2> *handle_message_front(char *buffer, int r);
-  Ct<ProtocolV2> *read_message_middle();
-  Ct<ProtocolV2> *handle_message_middle(char *buffer, int r);
   Ct<ProtocolV2> *read_message_data_prepare();
   Ct<ProtocolV2> *read_message_data();
   Ct<ProtocolV2> *handle_message_data(char *buffer, int r);