]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async, v2: drop the throttles bypass.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Fri, 15 Feb 2019 15:40:22 +0000 (16:40 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 21 Feb 2019 22:31:00 +0000 (23:31 +0100)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/msg/async/ProtocolV2.cc

index 1b1a274768afb82d667c55bb97af918e4e2840aa..11adc2d0abca598501c847a6c1dcac127bbebf47 100644 (file)
@@ -717,10 +717,10 @@ void ProtocolV2::reset_recv_state() {
   connection->pendingReadLen.reset();
   connection->writeCallback.reset();
 
-  uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
-                          current_header.data_len;
+  uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \
+                         rx_segments_todo_rev[1].length + \
+                         rx_segments_todo_rev[0].length;
 
-#if 0
   if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
       connection->policy.throttler_messages) {
     ldout(cct, 10) << __func__ << " releasing " << 1
@@ -747,7 +747,6 @@ void ProtocolV2::reset_recv_state() {
         << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
     connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
   }
-#endif
 }
 
 CtPtr ProtocolV2::_fault() {
@@ -1485,8 +1484,14 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) {
     }
   }
 
-  rx_segments_data.clear();
-  return read_frame_segment();
+  // does it need throttle?
+  if (next_tag == Tag::MESSAGE) {
+    state = THROTTLE_MESSAGE;
+    return CONTINUE(throttle_message);
+  } else {
+    rx_segments_data.clear();
+    return read_frame_segment();
+  }
 }
 
 CtPtr ProtocolV2::handle_read_frame_dispatch() {
@@ -1660,7 +1665,7 @@ CtPtr ProtocolV2::handle_message() {
   ldout(cct, 20) << __func__ << dendl;
 
   ceph_assert(rx_segments_data.size() == 4);
-  ceph_assert(state == READY);
+  ceph_assert(state == THROTTLE_DONE);
 
 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
   ltt_recv_stamp = ceph_clock_now();
@@ -1708,11 +1713,6 @@ CtPtr ProtocolV2::handle_message() {
   extra.clear();
   current_header = header;
 
-#if 0
-  state = THROTTLE_MESSAGE;
-  return CONTINUE(throttle_message);
-#endif
-
   // front
   ceph_assert(current_header.front_len == rx_segments_data[1].length());
   ceph_assert(!front.length());
@@ -1869,19 +1869,15 @@ CtPtr ProtocolV2::handle_message_complete() {
 
   INTERCEPT(17);
 
-#if 0
   message->set_byte_throttler(connection->policy.throttler_bytes);
   message->set_message_throttler(connection->policy.throttler_messages);
-#endif
 
   uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
                           current_header.data_len;
 
-#if 0
   // store reservation size in message, so we don't get confused
   // by messages entering the dispatch queue through other paths.
   message->set_dispatch_throttle_size(cur_msg_size);
-#endif
 
   message->set_recv_stamp(recv_stamp);
   message->set_throttle_stamp(throttle_stamp);
@@ -2032,8 +2028,10 @@ CtPtr ProtocolV2::throttle_message() {
 CtPtr ProtocolV2::throttle_bytes() {
   ldout(cct, 20) << __func__ << dendl;
 
-  uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
-                          current_header.data_len;
+  ceph_assert(rx_segments_todo_rev.size() == 4);
+  uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \
+                         rx_segments_todo_rev[1].length + \
+                         rx_segments_todo_rev[0].length;
   if (cur_msg_size) {
     if (connection->policy.throttler_bytes) {
       ldout(cct, 10) << __func__ << " wants " << cur_msg_size
@@ -2065,8 +2063,10 @@ CtPtr ProtocolV2::throttle_bytes() {
 CtPtr ProtocolV2::throttle_dispatch_queue() {
   ldout(cct, 20) << __func__ << dendl;
 
-  uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
-                          current_header.data_len;
+  ceph_assert(rx_segments_todo_rev.size() == 4);
+  uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \
+                         rx_segments_todo_rev[1].length + \
+                         rx_segments_todo_rev[0].length;
 
   if (cur_msg_size) {
     if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
@@ -2091,7 +2091,8 @@ CtPtr ProtocolV2::throttle_dispatch_queue() {
   throttle_stamp = ceph_clock_now();
   state = THROTTLE_DONE;
 
-  return read_message_data_prepare();
+  rx_segments_data.clear();
+  return read_frame_segment();
 }
 
 CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) {