From 9ef345fdf7bc773bfaed47522ae8604215b351da Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Fri, 15 Feb 2019 16:40:22 +0100 Subject: [PATCH] msg/async, v2: drop the throttles bypass. Signed-off-by: Radoslaw Zarzynski --- src/msg/async/ProtocolV2.cc | 43 +++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 1b1a274768afb..11adc2d0abca5 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -717,10 +717,10 @@ void ProtocolV2::reset_recv_state() { connection->pendingReadLen.reset(); connection->writeCallback.reset(); - uint32_t cur_msg_size = current_header.front_len + current_header.middle_len + - current_header.data_len; + uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \ + rx_segments_todo_rev[1].length + \ + rx_segments_todo_rev[0].length; -#if 0 if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE && connection->policy.throttler_messages) { ldout(cct, 10) << __func__ << " releasing " << 1 @@ -747,7 +747,6 @@ void ProtocolV2::reset_recv_state() { << connection->dispatch_queue->dispatch_throttler.get_max() << dendl; connection->dispatch_queue->dispatch_throttle_release(cur_msg_size); } -#endif } CtPtr ProtocolV2::_fault() { @@ -1485,8 +1484,14 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) { } } - rx_segments_data.clear(); - return read_frame_segment(); + // does it need throttle? + if (next_tag == Tag::MESSAGE) { + state = THROTTLE_MESSAGE; + return CONTINUE(throttle_message); + } else { + rx_segments_data.clear(); + return read_frame_segment(); + } } CtPtr ProtocolV2::handle_read_frame_dispatch() { @@ -1660,7 +1665,7 @@ CtPtr ProtocolV2::handle_message() { ldout(cct, 20) << __func__ << dendl; ceph_assert(rx_segments_data.size() == 4); - ceph_assert(state == READY); + ceph_assert(state == THROTTLE_DONE); #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) ltt_recv_stamp = ceph_clock_now(); @@ -1708,11 +1713,6 @@ CtPtr ProtocolV2::handle_message() { extra.clear(); current_header = header; -#if 0 - state = THROTTLE_MESSAGE; - return CONTINUE(throttle_message); -#endif - // front ceph_assert(current_header.front_len == rx_segments_data[1].length()); ceph_assert(!front.length()); @@ -1869,19 +1869,15 @@ CtPtr ProtocolV2::handle_message_complete() { INTERCEPT(17); -#if 0 message->set_byte_throttler(connection->policy.throttler_bytes); message->set_message_throttler(connection->policy.throttler_messages); -#endif uint32_t cur_msg_size = current_header.front_len + current_header.middle_len + current_header.data_len; -#if 0 // store reservation size in message, so we don't get confused // by messages entering the dispatch queue through other paths. message->set_dispatch_throttle_size(cur_msg_size); -#endif message->set_recv_stamp(recv_stamp); message->set_throttle_stamp(throttle_stamp); @@ -2032,8 +2028,10 @@ CtPtr ProtocolV2::throttle_message() { CtPtr ProtocolV2::throttle_bytes() { ldout(cct, 20) << __func__ << dendl; - uint32_t cur_msg_size = current_header.front_len + current_header.middle_len + - current_header.data_len; + ceph_assert(rx_segments_todo_rev.size() == 4); + uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \ + rx_segments_todo_rev[1].length + \ + rx_segments_todo_rev[0].length; if (cur_msg_size) { if (connection->policy.throttler_bytes) { ldout(cct, 10) << __func__ << " wants " << cur_msg_size @@ -2065,8 +2063,10 @@ CtPtr ProtocolV2::throttle_bytes() { CtPtr ProtocolV2::throttle_dispatch_queue() { ldout(cct, 20) << __func__ << dendl; - uint32_t cur_msg_size = current_header.front_len + current_header.middle_len + - current_header.data_len; + ceph_assert(rx_segments_todo_rev.size() == 4); + uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \ + rx_segments_todo_rev[1].length + \ + rx_segments_todo_rev[0].length; if (cur_msg_size) { if (!connection->dispatch_queue->dispatch_throttler.get_or_fail( @@ -2091,7 +2091,8 @@ CtPtr ProtocolV2::throttle_dispatch_queue() { throttle_stamp = ceph_clock_now(); state = THROTTLE_DONE; - return read_message_data_prepare(); + rx_segments_data.clear(); + return read_frame_segment(); } CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) { -- 2.39.5