]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async, v2: drop support for the buggy rx_buffers mechanism.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Fri, 1 Mar 2019 17:33:25 +0000 (18:33 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Fri, 1 Mar 2019 17:41:23 +0000 (18:41 +0100)
See:
  * https://github.com/ceph/ceph/pull/26696,
  * http://tracker.ceph.com/issues/22480.

Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h
src/msg/async/frames_v2.h

index ebf0aa308cc8e554f4bd202f98f6af76e7eb2a7d..20b3f8d151c08cb4fbd8c52b3cc88680e8defe42 100644 (file)
@@ -1103,8 +1103,7 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() {
     case Tag::WAIT:
       return handle_frame_payload();
     case Tag::MESSAGE:
-      // see the comment in ::read_frame_segment().
-      return handle_message_complete();
+      return handle_message();
     default: {
       lderr(cct) << __func__
                  << " received unknown tag=" << static_cast<uint32_t>(next_tag)
@@ -1122,35 +1121,6 @@ CtPtr ProtocolV2::read_frame_segment() {
 
   // description of current segment to read
   const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
-
-  if (cur_rx_desc.alignment == segment_t::DEFERRED_ALLOCATION) {
-    // This is a special case for the rx_buffers zero-copy optimization
-    // used for Message's data field. It might be dangerous and will be
-    // ultimately replaced by `allocation policies`.
-    rx_segments_data.emplace_back(ceph::bufferlist{});
-
-    // XXX: for the sake of unified epilogue handling this becomes even
-    // uglier. We are doing early dispatch of Messages now. The overall
-    // idea is to:
-    //   1. parse ceph_msg_header2 which let us know tid, and thus pick
-    //      up appropriate rx_buffer (early dispatch aka pre-dispatch).
-    //   2. Read data field into selected rx_buffer.
-    //   3. REUNIFY WITH THE MAIN FLOW: read and handle frame epilogue.
-    //   4. Do ::handle_read_frame_dispatch() as for any kind of frame.
-    //      For messages it wll call ::handle_message_complete().
-#if 0
-    ceph_assert_always(next_tag == Tag::MESSAGE);
-#else
-    if (next_tag != Tag::MESSAGE) {
-      ldout(cct, 20) << __func__
-                    << " only message can use DEFERRED_ALLOCATION"
-                    << dendl;
-      return _fault();
-    }
-#endif
-    return handle_message();
-  }
-
   std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer> rx_buffer;
   try {
     rx_buffer = ceph::buffer::ptr_node::create(buffer::create_aligned(
@@ -1285,93 +1255,6 @@ CtPtr ProtocolV2::ready() {
   return CONTINUE(read_frame);
 }
 
-CtPtr ProtocolV2::handle_message() {
-  ldout(cct, 20) << __func__ << dendl;
-
-  ceph_assert(rx_segments_data.size() == 4);
-  ceph_assert(state == THROTTLE_DONE);
-
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  ltt_recv_stamp = ceph_clock_now();
-#endif
-  recv_stamp = ceph_clock_now();
-
-  auto header_frame = MessageHeaderFrame::Decode(
-    std::move(rx_segments_data[SegmentIndex::Msg::HEADER]));
-  ceph_msg_header2 &header = header_frame.header();
-
-  ldout(cct, 20) << __func__
-                << " got envelope type=" << header.type
-                << " src " << peer_name
-                << " off " << header.data_off
-                 << dendl;
-
-  INTERCEPT(16);
-
-  // Reset state
-  data_buf.clear();
-
-  current_header = header;
-  return read_message_data_prepare();
-}
-
-CtPtr ProtocolV2::read_message_data_prepare() {
-  ldout(cct, 20) << __func__ << dendl;
-
-  const auto data_len = \
-    rx_segments_desc[SegmentIndex::Msg::DATA].length;
-  const unsigned data_off = le32_to_cpu(current_header.data_off);
-
-  if (data_len) {
-    // get a buffer
-    map<ceph_tid_t, pair<bufferlist, int>>::iterator p =
-        connection->rx_buffers.find(current_header.tid);
-    if (p != connection->rx_buffers.end()) {
-      ldout(cct, 10) << __func__ << " seleting rx buffer v " << p->second.second
-                     << " at offset " << data_off << " len "
-                     << p->second.first.length() << dendl;
-      data_buf = p->second.first;
-      // make sure it's big enough
-      if (data_buf.length() < data_len)
-        data_buf.push_back(buffer::create(data_len - data_buf.length()));
-      data_blp = data_buf.begin();
-    } else {
-      ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
-                     << data_off << dendl;
-      alloc_aligned_buffer(data_buf, data_len, data_off);
-      data_blp = data_buf.begin();
-    }
-  }
-
-  msg_left = data_len;
-
-  return CONTINUE(read_message_data);
-}
-
-CtPtr ProtocolV2::read_message_data() {
-  ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl;
-
-  if (msg_left > 0) {
-    bufferptr bp = data_blp.get_current_ptr();
-    unsigned read_len = std::min(bp.length(), msg_left);
-
-    return READB(read_len, bp.c_str(), handle_message_data);
-  }
-
-  // FIXME: if (auth_meta->is_mode_secure()) {
-  //  ceph_assert(session_stream_handlers.rx);
-  if (session_stream_handlers.rx && \
-      rx_segments_data[SegmentIndex::Msg::DATA].length()) {
-    rx_segments_data[SegmentIndex::Msg::DATA] =
-        session_stream_handlers.rx->authenticated_decrypt_update(
-           std::move(rx_segments_data[SegmentIndex::Msg::DATA]),
-            segment_t::DEFAULT_ALIGNMENT);
-  }
-
-  state = READ_MESSAGE_COMPLETE;
-  return READ(FRAME_EPILOGUE_SIZE, handle_read_frame_epilogue_main);
-}
-
 CtPtr ProtocolV2::handle_read_frame_epilogue_main(char *buffer, int r)
 {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
@@ -1404,6 +1287,7 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(char *buffer, int r)
     } catch (ceph::crypto::onwire::MsgAuthError &e) {
       ldout(cct, 5) << __func__ << " message authentication failed: "
                    << e.what() << dendl;
+      ceph_assert("oops" == nullptr);
       return _fault();
     }
   } else {
@@ -1428,26 +1312,30 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(char *buffer, int r)
   return handle_read_frame_dispatch();
 }
 
-CtPtr ProtocolV2::handle_message_data(char *buffer, int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+CtPtr ProtocolV2::handle_message() {
+  ldout(cct, 20) << __func__ << dendl;
 
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " read data error " << dendl;
-    return _fault();
-  }
+  ceph_assert(rx_segments_data.size() == 4);
+  ceph_assert(state == THROTTLE_DONE);
 
-  bufferptr bp = data_blp.get_current_ptr();
-  unsigned read_len = std::min(bp.length(), msg_left);
-  ceph_assert(read_len < std::numeric_limits<int>::max());
-  data_blp.advance(read_len);
-  rx_segments_data[SegmentIndex::Msg::DATA].append(bp, 0, read_len);
-  msg_left -= read_len;
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+  ltt_recv_stamp = ceph_clock_now();
+#endif
+  recv_stamp = ceph_clock_now();
+  state = READ_MESSAGE_COMPLETE;
 
-  return CONTINUE(read_message_data);
-}
+  auto header_frame = MessageHeaderFrame::Decode(
+    std::move(rx_segments_data[SegmentIndex::Msg::HEADER]));
+  // XXX: paranoid copy just to avoid oops
+  ceph_msg_header2 current_header = header_frame.header();
 
-CtPtr ProtocolV2::handle_message_complete() {
-  ldout(cct, 20) << __func__ << dendl;
+  ldout(cct, 20) << __func__
+                << " got envelope type=" << current_header.type
+                << " src " << peer_name
+                << " off " << current_header.data_off
+                 << dendl;
+
+  INTERCEPT(16);
 
   const auto front_len = \
     rx_segments_desc[SegmentIndex::Msg::FRONT].length;
@@ -1596,9 +1484,6 @@ CtPtr ProtocolV2::handle_message_complete() {
 
   handle_message_ack(current_header.ack_seq);
 
-  // clean up local buffer references
-  data_buf.clear();
-
   // we might have been reused by another connection
   // let's check if that is the case
   if (state != READY) {
index 3ba55fb595aa95ef645315fea4ac0916dcd0b18c..b6acd2be4db4ab15c5f1e353b07fb7ff5e29e78d 100644 (file)
@@ -94,13 +94,9 @@ private:
 
   ceph::msgr::v2::Tag sent_tag;
   ceph::msgr::v2::Tag next_tag;
-  ceph_msg_header2 current_header;
   utime_t backoff;  // backoff time
   utime_t recv_stamp;
   utime_t throttle_stamp;
-  unsigned msg_left;
-  bufferlist data_buf;
-  bufferlist::iterator data_blp;
 
   bool keepalive;
 
@@ -149,8 +145,6 @@ private:
   CONTINUATION_DECL(ProtocolV2, throttle_message);
   CONTINUATION_DECL(ProtocolV2, throttle_bytes);
   CONTINUATION_DECL(ProtocolV2, throttle_dispatch_queue);
-  CONTINUATION_DECL(ProtocolV2, read_message_data);
-  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_data);
 
   Ct<ProtocolV2> *read_frame();
   Ct<ProtocolV2> *handle_read_frame_preamble_main(char *buffer, int r);
@@ -167,9 +161,6 @@ private:
   Ct<ProtocolV2> *throttle_bytes();
   Ct<ProtocolV2> *throttle_dispatch_queue();
   Ct<ProtocolV2> *read_message_data_prepare();
-  Ct<ProtocolV2> *read_message_data();
-  Ct<ProtocolV2> *handle_message_data(char *buffer, int r);
-  Ct<ProtocolV2> *handle_message_complete();
 
   Ct<ProtocolV2> *handle_keepalive2(ceph::bufferlist &payload);
   Ct<ProtocolV2> *handle_keepalive2_ack(ceph::bufferlist &payload);
index 85f333f5d55d3f73e2490eb79c34174fa0d668d6..507dc471a8abdd5625bf2a16e597f31c36ca3471 100644 (file)
@@ -55,7 +55,7 @@ enum class Tag : __u8 {
 struct segment_t {
   // TODO: this will be dropped with support for `allocation policies`.
   // We need them because of the rx_buffers zero-copy optimization.
-  static constexpr __le16 DEFERRED_ALLOCATION{0x0000};
+  static constexpr __le16 PAGE_SIZE_ALIGNMENT{4096};
 
   static constexpr __le16 DEFAULT_ALIGNMENT = sizeof(void *);
 
@@ -604,7 +604,7 @@ struct MessageHeaderFrame
                 segment_t::DEFAULT_ALIGNMENT },
       segment_t{ front.length(), segment_t::DEFAULT_ALIGNMENT },
       segment_t{ middle.length(), segment_t::DEFAULT_ALIGNMENT },
-      segment_t{ data.length(), segment_t::DEFERRED_ALLOCATION },
+      segment_t{ data.length(), segment_t::PAGE_SIZE_ALIGNMENT },
     });
 
     // FIXME: plainsize -> ciphersize; for AES-GCM they are equall apart from auth tag size