connection->pendingReadLen.reset();
connection->writeCallback.reset();
- uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
- current_header.data_len;
+ uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \
+ rx_segments_todo_rev[1].length + \
+ rx_segments_todo_rev[0].length;
-#if 0
if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
connection->policy.throttler_messages) {
ldout(cct, 10) << __func__ << " releasing " << 1
<< connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
}
-#endif
}
CtPtr ProtocolV2::_fault() {
}
}
- rx_segments_data.clear();
- return read_frame_segment();
+ // does it need throttle?
+ if (next_tag == Tag::MESSAGE) {
+ state = THROTTLE_MESSAGE;
+ return CONTINUE(throttle_message);
+ } else {
+ rx_segments_data.clear();
+ return read_frame_segment();
+ }
}
CtPtr ProtocolV2::handle_read_frame_dispatch() {
ldout(cct, 20) << __func__ << dendl;
ceph_assert(rx_segments_data.size() == 4);
- ceph_assert(state == READY);
+ ceph_assert(state == THROTTLE_DONE);
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
ltt_recv_stamp = ceph_clock_now();
extra.clear();
current_header = header;
-#if 0
- state = THROTTLE_MESSAGE;
- return CONTINUE(throttle_message);
-#endif
-
// front
ceph_assert(current_header.front_len == rx_segments_data[1].length());
ceph_assert(!front.length());
INTERCEPT(17);
-#if 0
message->set_byte_throttler(connection->policy.throttler_bytes);
message->set_message_throttler(connection->policy.throttler_messages);
-#endif
uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
current_header.data_len;
-#if 0
// store reservation size in message, so we don't get confused
// by messages entering the dispatch queue through other paths.
message->set_dispatch_throttle_size(cur_msg_size);
-#endif
message->set_recv_stamp(recv_stamp);
message->set_throttle_stamp(throttle_stamp);
CtPtr ProtocolV2::throttle_bytes() {
ldout(cct, 20) << __func__ << dendl;
- uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
- current_header.data_len;
+ ceph_assert(rx_segments_todo_rev.size() == 4);
+ uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \
+ rx_segments_todo_rev[1].length + \
+ rx_segments_todo_rev[0].length;
if (cur_msg_size) {
if (connection->policy.throttler_bytes) {
ldout(cct, 10) << __func__ << " wants " << cur_msg_size
CtPtr ProtocolV2::throttle_dispatch_queue() {
ldout(cct, 20) << __func__ << dendl;
- uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
- current_header.data_len;
+ ceph_assert(rx_segments_todo_rev.size() == 4);
+ uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \
+ rx_segments_todo_rev[1].length + \
+ rx_segments_todo_rev[0].length;
if (cur_msg_size) {
if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
throttle_stamp = ceph_clock_now();
state = THROTTLE_DONE;
- return read_message_data_prepare();
+ rx_segments_data.clear();
+ return read_frame_segment();
}
CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) {