git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async, v2: decouple onwire segment length from logical length.
author Radoslaw Zarzynski <rzarzyns@redhat.com>
Tue, 19 Feb 2019 20:56:25 +0000 (21:56 +0100)
committer Radoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 21 Feb 2019 22:31:03 +0000 (23:31 +0100)
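Frame segments can grow on the wire when a crypto stream handler is
active: the payload may be padded up to CRYPTO_BLOCK_SIZE and an
authentication tag is appended at the end of the frame (see
RxHandler::get_extra_size_at_final()). Keep both lengths per segment in
onwire_segment_t, size the rx buffer from the onwire length, and keep
throttling on the logical lengths. For now get_onwire_size() is an
identity mapping; the tag is still accounted for separately by bumping
the last segment's onwire_length.

A follow-up could make get_onwire_size() round up to the crypto block
size. A minimal standalone sketch of such a helper (block_size stands in
for CRYPTO_BLOCK_SIZE and is purely illustrative, not part of this
change):

    #include <cstdint>

    // Hypothetical sketch only. Derives an onwire length from a logical
    // one by rounding up to the crypto block size; block_size == 0 means
    // no crypto handler, so onwire == logical.
    static inline uint32_t onwire_size(uint32_t logical_size,
                                       uint32_t block_size) {
      if (block_size == 0) {
        return logical_size;
      }
      return (logical_size + block_size - 1) / block_size * block_size;
    }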
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index 4b657f49c0f90644f4625da7db717f689c966d0e..485f0ef68bc7b9058a1bf262e0c58a6361466803 100644
@@ -706,9 +706,9 @@ void ProtocolV2::reset_recv_state() {
   connection->pendingReadLen.reset();
   connection->writeCallback.reset();
 
-  uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \
-                         rx_segments_todo_rev[1].length + \
-                         rx_segments_todo_rev[0].length;
+  uint32_t cur_msg_size = rx_segments_desc[1].logical.length + \
+                         rx_segments_desc[2].logical.length + \
+                         rx_segments_desc[3].logical.length;
 
   if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
       connection->policy.throttler_messages) {
@@ -1191,6 +1191,10 @@ bool ProtocolV2::is_queued() {
   return !out_queue.empty() || connection->is_queued();
 }
 
+uint32_t ProtocolV2::get_onwire_size(uint32_t logical_size) const {
+  return logical_size;
+}
+
 CtPtr ProtocolV2::read(CONTINUATION_PARAM(next, ProtocolV2, char *, int),
                        int len, char *buffer) {
   if (!buffer) {
@@ -1440,6 +1444,8 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) {
   }
 
   {
+    // I expect ceph_le32 will make the endian conversion for me. Passing
+    // everything through ::decode is unnecessary.
     const auto& main_preamble = \
       reinterpret_cast<preamble_block_t&>(*preamble.c_str());
     next_tag = static_cast<Tag>(main_preamble.tag);
@@ -1448,25 +1454,30 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) {
     ceph_assert_always(main_preamble.num_segments == 1 ||
       main_preamble.num_segments == 4);
 
-    // I expect ceph_le32 will make the endian conversion for me. Passing
-    // everything through ::decode is unnecessary.
-    rx_segments_todo_rev.clear();
+    rx_segments_desc.clear();
+    rx_segments_data.clear();
     next_payload_len = 0;
-    for (std::uint8_t idx = main_preamble.num_segments;
-        idx <= rx_segments_todo_rev.capacity() && idx > 0;
-        /* NOP */)
-    {
-      --idx;
+
+    if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
+      ldout(cct, 30) << __func__
+                    << " num_segments=" << main_preamble.num_segments
+                    << " is too much" << dendl;
+      return _fault();
+    }
+    for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
       ldout(cct, 10) << __func__ << " got new segment:"
                     << " len=" << main_preamble.segments[idx].length
                     << " align=" << main_preamble.segments[idx].alignment
                     << dendl;
-      rx_segments_todo_rev.emplace_back(main_preamble.segments[idx]);
-      next_payload_len += main_preamble.segments[idx].length;
+      rx_segments_desc.emplace_back(onwire_segment_t{
+       get_onwire_size(main_preamble.segments[idx].length),
+       main_preamble.segments[idx]
+      });
+      next_payload_len += rx_segments_desc.back().onwire_length;
     }
 
     if (session_stream_handlers.rx) {
-      rx_segments_todo_rev.front().length += \
+      rx_segments_desc.back().onwire_length += \
        session_stream_handlers.rx->get_extra_size_at_final();
       next_payload_len += \
        session_stream_handlers.rx->get_extra_size_at_final();
@@ -1489,7 +1500,6 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) {
     state = THROTTLE_MESSAGE;
     return CONTINUE(throttle_message);
   } else {
-    rx_segments_data.clear();
     return read_frame_segment();
   }
 }
@@ -1534,28 +1544,36 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() {
 
 CtPtr ProtocolV2::read_frame_segment() {
   ldout(cct, 20) << __func__ << dendl;
-  ceph_assert(!rx_segments_todo_rev.empty());
+  ceph_assert(!rx_segments_desc.empty());
 
-#if 0
-  auto rx_buffer = ceph::buffer::ptr_node::create(
-    buffer::create_aligned(rx_segments_todo_rev.back().length,
-                          rx_segments_todo_rev.back().alignment));
-#else
-  auto rx_buffer = ceph::buffer::ptr_node::create(
-    buffer::create(rx_segments_todo_rev.back().length));
-#endif
+  // description of current segment to read
+  const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
 
-  if (rx_segments_todo_rev.back().alignment == 0) {
-    rx_segments_data.emplace_back();
+  if (cur_rx_desc.logical.alignment == segment_t::DEFERRED_ALLOCATION) {
+    // This is a special case for the rx_buffers zero-copy optimization
+    // used for Message's data field. It might be dangerous and will be
+    // ultimately replaced by `allocation policies`.
+    rx_segments_data.emplace_back(ceph::bufferlist{});
     return handle_read_frame_dispatch();
   }
 
-  rx_segments_todo_rev.pop_back();
+  std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer> rx_buffer;
+  try {
+    rx_buffer = ceph::buffer::ptr_node::create(buffer::create_aligned(
+      cur_rx_desc.onwire_length, cur_rx_desc.logical.alignment));
+  } catch (std::bad_alloc&) {
+    // Catching because of potential issues with satisfying alignment.
+    ldout(cct, 20) << __func__ << " can't allocate aligned rx_buffer "
+                  << " len=" << cur_rx_desc.onwire_length
+                  << " align=" << cur_rx_desc.logical.alignment
+                  << dendl;
+    return _fault();
+  }
+
   rx_segments_data.emplace_back();
   rx_segments_data.back().push_back(std::move(rx_buffer));
-  return READB(rx_segments_data.back().length(),
-              rx_segments_data.back().c_str(),
-              handle_read_frame_segment);
+  return READB(rx_segments_data.back().length(), rx_segments_data.back().c_str(),
+    handle_read_frame_segment);
 }
 
 CtPtr ProtocolV2::handle_read_frame_segment(char *buffer, int r) {
@@ -1567,11 +1585,12 @@ CtPtr ProtocolV2::handle_read_frame_segment(char *buffer, int r) {
     return _fault();
   }
 
-  if (!rx_segments_todo_rev.empty()) {
-    return read_frame_segment();
+  if (rx_segments_desc.size() == rx_segments_data.size()) {
+    // OK, all segments planned to read are read. Can go with dispatch.
+    return handle_read_frame_dispatch();
   } else {
     // TODO: for makeshift only. This will be more generic and throttled
-    return handle_read_frame_dispatch();
+    return read_frame_segment();
   }
 }
 
@@ -2028,10 +2047,10 @@ CtPtr ProtocolV2::throttle_message() {
 CtPtr ProtocolV2::throttle_bytes() {
   ldout(cct, 20) << __func__ << dendl;
 
-  ceph_assert(rx_segments_todo_rev.size() == 4);
-  uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \
-                         rx_segments_todo_rev[1].length + \
-                         rx_segments_todo_rev[0].length;
+  ceph_assert(rx_segments_desc.size() == 4);
+  uint32_t cur_msg_size = rx_segments_desc[1].logical.length + \
+                         rx_segments_desc[2].logical.length + \
+                         rx_segments_desc[3].logical.length;
   if (cur_msg_size) {
     if (connection->policy.throttler_bytes) {
       ldout(cct, 10) << __func__ << " wants " << cur_msg_size
@@ -2063,10 +2082,10 @@ CtPtr ProtocolV2::throttle_bytes() {
 CtPtr ProtocolV2::throttle_dispatch_queue() {
   ldout(cct, 20) << __func__ << dendl;
 
-  ceph_assert(rx_segments_todo_rev.size() == 4);
-  uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \
-                         rx_segments_todo_rev[1].length + \
-                         rx_segments_todo_rev[0].length;
+  ceph_assert(rx_segments_desc.size() == 4);
+  uint32_t cur_msg_size = rx_segments_desc[1].logical.length + \
+                         rx_segments_desc[2].logical.length + \
+                         rx_segments_desc[3].logical.length;
 
   if (cur_msg_size) {
     if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
@@ -2091,7 +2110,6 @@ CtPtr ProtocolV2::throttle_dispatch_queue() {
   throttle_stamp = ceph_clock_now();
   state = THROTTLE_DONE;
 
-  rx_segments_data.clear();
   return read_frame_segment();
 }
 
index 00b8e3bdd5e511dd3c5eb8bb44e1ac420485a611..8c8e4340df0304ec15fd829ba78b92e4f5e75d68 100644
@@ -117,13 +117,21 @@ public:
     __le16 alignment;
   } __attribute__((packed));
 
+  struct onwire_segment_t {
+    // crypto-processed segment can be expanded on-wire because of:
+    //  * padding to achieve CRYPTO_BLOCK_SIZE alignment,
+    //  * authentication tag. It's appended at the end of message.
+    //    See RxHandler::get_extra_size_at_final().
+    __le32 onwire_length;
+
+    struct segment_t logical;
+  } __attribute__((packed));
+
 private:
   static constexpr std::size_t MAX_NUM_SEGMENTS = 4;
-  // segment descriptors are stored in reversed order. This is because
-  // vectors don't support ::pop_front.  We might want to exchange
-  // the container to slightly tuned one in the future.
-  boost::container::static_vector<segment_t,
-                                 MAX_NUM_SEGMENTS> rx_segments_todo_rev;
+
+  boost::container::static_vector<onwire_segment_t,
+                                 MAX_NUM_SEGMENTS> rx_segments_desc;
   boost::container::static_vector<ceph::bufferlist,
                                  MAX_NUM_SEGMENTS> rx_segments_data;
 
@@ -269,6 +277,8 @@ private:
   Ct<ProtocolV2> *send_server_ident();
   Ct<ProtocolV2> *send_reconnect_ok();
   Ct<ProtocolV2> *server_ready();
+
+  uint32_t get_onwire_size(uint32_t logical_size) const;
 };
 
 #endif /* _MSG_ASYNC_PROTOCOL_V2_ */