]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async, v2: limit the num_segments to non-empty segments.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Fri, 8 Mar 2019 18:40:37 +0000 (19:40 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Sun, 10 Mar 2019 00:12:01 +0000 (01:12 +0100)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h
src/msg/async/frames_v2.h

index 5c83ea24b94607fdf4d8ee69129c49d327d21c8a..35d34fbe052dab373227eec993fa52a0ee46baa3 100644 (file)
@@ -235,6 +235,16 @@ void ProtocolV2::reset_recv_state() {
   reset_throttle();
 }
 
+size_t ProtocolV2::get_current_msg_size() const {
+  ceph_assert(!rx_segments_desc.empty());
+  size_t sum = 0;
+  // we don't include SegmentIndex::Msg::HEADER.
+  for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
+    sum += rx_segments_desc[idx].length;
+  }
+  return sum;
+}
+
 void ProtocolV2::reset_throttle() {
   if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
       connection->policy.throttler_messages) {
@@ -247,11 +257,7 @@ void ProtocolV2::reset_throttle() {
   }
   if (state > THROTTLE_BYTES && state <= THROTTLE_DONE) {
     if (connection->policy.throttler_bytes) {
-      const uint32_t cur_msg_size = \
-       rx_segments_desc[SegmentIndex::Msg::FRONT].length + \
-       rx_segments_desc[SegmentIndex::Msg::MIDDLE].length + \
-       rx_segments_desc[SegmentIndex::Msg::DATA].length;
-
+      const size_t cur_msg_size = get_current_msg_size();
       ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
                      << " bytes to policy throttler "
                      << connection->policy.throttler_bytes->get_current() << "/"
@@ -260,11 +266,7 @@ void ProtocolV2::reset_throttle() {
     }
   }
   if (state > THROTTLE_DISPATCH_QUEUE && state <= THROTTLE_DONE) {
-    const uint32_t cur_msg_size = \
-      rx_segments_desc[SegmentIndex::Msg::FRONT].length + \
-      rx_segments_desc[SegmentIndex::Msg::MIDDLE].length + \
-      rx_segments_desc[SegmentIndex::Msg::DATA].length;
-
+    const size_t cur_msg_size = get_current_msg_size();
     ldout(cct, 10)
         << __func__ << " releasing " << cur_msg_size
         << " bytes to dispatch_queue throttler "
@@ -988,8 +990,9 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
       return _fault();
     }
 
-    // currently we do support only 4 or 1 segments
-    if (main_preamble.num_segments != 1 && main_preamble.num_segments != 4) {
+    // currently we do support between 1 and MAX_NUM_SEGMENTS segments
+    if (main_preamble.num_segments < 1 ||
+        main_preamble.num_segments > MAX_NUM_SEGMENTS) {
       ldout(cct, 10) << __func__ << " unsupported num_segments="
                     << " tx_crc=" << main_preamble.num_segments << dendl;
       return _fault();
@@ -1281,8 +1284,6 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
 
 CtPtr ProtocolV2::handle_message() {
   ldout(cct, 20) << __func__ << dendl;
-
-  ceph_assert(rx_segments_data.size() == 4);
   ceph_assert(state == THROTTLE_DONE);
 
 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
@@ -1499,11 +1500,7 @@ CtPtr ProtocolV2::throttle_message() {
 CtPtr ProtocolV2::throttle_bytes() {
   ldout(cct, 20) << __func__ << dendl;
 
-  ceph_assert(rx_segments_desc.size() == 4);
-  uint32_t cur_msg_size = \
-    rx_segments_desc[SegmentIndex::Msg::FRONT].length + \
-    rx_segments_desc[SegmentIndex::Msg::MIDDLE].length + \
-    rx_segments_desc[SegmentIndex::Msg::DATA].length;
+  const size_t cur_msg_size = get_current_msg_size();
   if (cur_msg_size) {
     if (connection->policy.throttler_bytes) {
       ldout(cct, 10) << __func__ << " wants " << cur_msg_size
@@ -1535,12 +1532,7 @@ CtPtr ProtocolV2::throttle_bytes() {
 CtPtr ProtocolV2::throttle_dispatch_queue() {
   ldout(cct, 20) << __func__ << dendl;
 
-  ceph_assert(rx_segments_desc.size() == 4);
-  uint32_t cur_msg_size =
-    rx_segments_desc[SegmentIndex::Msg::FRONT].length + \
-    rx_segments_desc[SegmentIndex::Msg::MIDDLE].length + \
-    rx_segments_desc[SegmentIndex::Msg::DATA].length;
-
+  const size_t cur_msg_size = get_current_msg_size();
   if (cur_msg_size) {
     if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
             cur_msg_size)) {
index 96fdc4d570cbfd972b7c5157aee9eadcfc94c1f5..0193d7992a18ea270071d427b8c0f6599b0998a0 100644 (file)
@@ -243,6 +243,7 @@ private:
 
   uint32_t get_onwire_size(uint32_t logical_size) const;
   uint32_t get_epilogue_size() const;
+  size_t get_current_msg_size() const;
 };
 
 #endif /* _MSG_ASYNC_PROTOCOL_V2_ */
index 2fade004b306d1658f9e0e2e1a25fdc8206f976c..2ae31f8b77756b73e2ea5d6ea7fa5535a89b11b4 100644 (file)
@@ -176,6 +176,18 @@ private:
   };
   ceph::bufferlist::contiguous_filler preamble_filler;
 
+  __u8 calc_num_segments(
+    const std::array<segment_t, MAX_NUM_SEGMENTS>& segments)
+  {
+    for (__u8 num = SegmentsNumV; num > 0; num--) {
+      if (segments[num-1].length) {
+        return num;
+      }
+    }
+    // frame always has at least one segment.
+    return 1;
+  }
+
   // craft the main preamble. It's always present regardless of the number
   // of segments message is composed from.
   void fill_preamble() {
@@ -194,13 +206,17 @@ private:
         segments.front().length() - FRAME_PREAMBLE_SIZE;
     main_preamble.segments.front().alignment = alignments.front();
 
+    // there is no business in issuing frame without at least one segment
+    // filled.
     if constexpr(SegmentsNumV > 1) {
       for (__u8 idx = 1; idx < SegmentsNumV; idx++) {
         main_preamble.segments[idx].length = segments[idx].length();
         main_preamble.segments[idx].alignment = alignments[idx];
       }
     }
-    main_preamble.num_segments = SegmentsNumV;
+    // calculate the number of non-empty segments.
+    // TODO: reorder segments to get DATA first
+    main_preamble.num_segments = calc_num_segments(main_preamble.segments);
 
     main_preamble.crc =
         ceph_crc32c(0, reinterpret_cast<unsigned char *>(&main_preamble),
@@ -649,6 +665,12 @@ struct MessageFrame : public Frame<MessageFrame,
                                    segment_t::DEFAULT_ALIGNMENT,
                                    segment_t::DEFAULT_ALIGNMENT,
                                    segment_t::PAGE_SIZE_ALIGNMENT> {
+  struct {
+    uint32_t front;
+    uint32_t middle;
+    uint32_t data;
+  } len;
+
   static const Tag tag = Tag::MESSAGE;
 
   static MessageFrame Encode(const ceph_msg_header2 &msg_header,