]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async, v2: rework decoding of MessageFrame.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Fri, 8 Mar 2019 21:21:16 +0000 (22:21 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Mon, 11 Mar 2019 00:24:01 +0000 (01:24 +0100)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/msg/async/ProtocolV2.cc
src/msg/async/frames_v2.h

index 35d34fbe052dab373227eec993fa52a0ee46baa3..48fcca695c21133b3d8e2259a8ae4b644a9aa39b 100644 (file)
@@ -1291,40 +1291,32 @@ CtPtr ProtocolV2::handle_message() {
 #endif
   recv_stamp = ceph_clock_now();
 
-  auto header_frame = MessageFrame::Decode(
-    std::move(rx_segments_data[SegmentIndex::Msg::HEADER]));
-  // XXX: paranoid copy just to avoid oops
-  ceph_msg_header2 current_header = header_frame.header();
-
-  ldout(cct, 20) << __func__
-                << " got envelope type=" << current_header.type
-                << " src " << peer_name
-                << " off " << current_header.data_off
-                 << dendl;
-
-  INTERCEPT(16);
+  // we need to get the size before std::moving segments data
+  const size_t cur_msg_size = get_current_msg_size();
+  auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
 
-  const auto front_len = \
-    rx_segments_desc[SegmentIndex::Msg::FRONT].length;
-  const auto middle_len = \
-    rx_segments_desc[SegmentIndex::Msg::MIDDLE].length;
-  const auto data_len = \
-    rx_segments_desc[SegmentIndex::Msg::DATA].length;
+  // XXX: paranoid copy just to avoid oops
+  ceph_msg_header2 current_header = msg_frame.header();
 
   ldout(cct, 5) << __func__
-               << " got " << front_len
-               << " + " << middle_len
-               << " + " << data_len
-               << " byte message" << dendl;
+               << " got " << msg_frame.front_len()
+               << " + " << msg_frame.middle_len()
+               << " + " << msg_frame.data_len()
+               << " byte message."
+               << " envelope type=" << current_header.type
+               << " src " << peer_name
+               << " off " << current_header.data_off
+                << dendl;
 
+  INTERCEPT(16);
   ceph_msg_header header{current_header.seq,
                          current_header.tid,
                          current_header.type,
                          current_header.priority,
                          current_header.version,
-                         front_len,
-                         middle_len,
-                         data_len,
+                         msg_frame.front_len(),
+                         msg_frame.middle_len(),
+                         msg_frame.data_len(),
                          current_header.data_off,
                          peer_name,
                          current_header.compat_version,
@@ -1333,9 +1325,9 @@ CtPtr ProtocolV2::handle_message() {
   ceph_msg_footer footer{0, 0, 0, 0, current_header.flags};
 
   Message *message = decode_message(cct, 0, header, footer,
-      rx_segments_data[SegmentIndex::Msg::FRONT],
-      rx_segments_data[SegmentIndex::Msg::MIDDLE],
-      rx_segments_data[SegmentIndex::Msg::DATA],
+      msg_frame.front(),
+      msg_frame.middle(),
+      msg_frame.data(),
       connection);
   if (!message) {
     ldout(cct, 1) << __func__ << " decode message failed " << dendl;
@@ -1349,11 +1341,6 @@ CtPtr ProtocolV2::handle_message() {
   message->set_byte_throttler(connection->policy.throttler_bytes);
   message->set_message_throttler(connection->policy.throttler_messages);
 
-  const uint32_t cur_msg_size = \
-    rx_segments_desc[SegmentIndex::Msg::FRONT].length + \
-    rx_segments_desc[SegmentIndex::Msg::MIDDLE].length + \
-    rx_segments_desc[SegmentIndex::Msg::DATA].length;
-
   // store reservation size in message, so we don't get confused
   // by messages entering the dispatch queue through other paths.
   message->set_dispatch_throttle_size(cur_msg_size);
index 2ae31f8b77756b73e2ea5d6ea7fa5535a89b11b4..e979743dc2480887c31ba9c4823804594798f1ba 100644 (file)
@@ -688,9 +688,16 @@ struct MessageFrame : public Frame<MessageFrame,
     return f;
   }
 
-  static MessageFrame Decode(ceph::bufferlist &&msg_header) {
+  using rx_segments_t =
+    boost::container::static_vector<ceph::bufferlist,
+                                    ceph::msgr::v2::MAX_NUM_SEGMENTS>;
+  static MessageFrame Decode(rx_segments_t &&recv_segments) {
     MessageFrame f;
-    f.segments[SegmentIndex::Msg::HEADER] = std::move(msg_header);
+    // transfer segments' bufferlists. If a MessageFrame contains less
+    // SegmentsNumV segments, the missing ones will be seen as zeroed.
+    for (__u8 idx = 0; idx < std::size(recv_segments); idx++) {
+      f.segments[idx] = std::move(recv_segments[idx]);
+    }
     return f;
   }
 
@@ -699,6 +706,30 @@ struct MessageFrame : public Frame<MessageFrame,
     return reinterpret_cast<const ceph_msg_header2&>(*hdrbl.c_str());
   }
 
+  ceph::bufferlist &front() {
+    return segments[SegmentIndex::Msg::FRONT];
+  }
+
+  ceph::bufferlist &middle() {
+    return segments[SegmentIndex::Msg::MIDDLE];
+  }
+
+  ceph::bufferlist &data() {
+    return segments[SegmentIndex::Msg::DATA];
+  }
+
+  uint32_t front_len() const {
+    return segments[SegmentIndex::Msg::FRONT].length();
+  }
+
+  uint32_t middle_len() const {
+    return segments[SegmentIndex::Msg::MIDDLE].length();
+  }
+
+  uint32_t data_len() const {
+    return segments[SegmentIndex::Msg::DATA].length();
+  }
+
 protected:
   using Frame::Frame;
 };