case STATE_OPEN_MESSAGE_THROTTLE_BYTES:
{
- uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len;
- if (message_size) {
+ cur_msg_size = current_header.front_len + current_header.middle_len + current_header.data_len;
+ if (cur_msg_size) {
if (policy.throttler_bytes) {
- ldout(async_msgr->cct, 10) << __func__ << " wants " << message_size << " bytes from policy throttler "
+ ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler "
<< policy.throttler_bytes->get_current() << "/"
<< policy.throttler_bytes->get_max() << dendl;
- if (!policy.throttler_bytes->get_or_fail(message_size)) {
- ldout(async_msgr->cct, 10) << __func__ << " wants " << message_size << " bytes from policy throttler "
+ if (!policy.throttler_bytes->get_or_fail(cur_msg_size)) {
+ ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler "
<< policy.throttler_bytes->get_current() << "/"
<< policy.throttler_bytes->get_max() << " failed, just wait." << dendl;
// following thread pool deal with th full message queue isn't a
}
}
+ state = STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE;
+ break;
+ }
+
+ case STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE:
+ {
+ if (cur_msg_size) {
+ if (!dispatch_queue->dispatch_throttler.get_or_fail(cur_msg_size)) {
+ ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from dispatch throttle "
+ << dispatch_queue->dispatch_throttler.get_current() << "/"
+ << dispatch_queue->dispatch_throttler.get_max() << " failed, just wait." << dendl;
+ // following thread pool deal with th full message queue isn't a
+ // short time, so we can wait a ms.
+ if (register_time_events.empty())
+ register_time_events.insert(center->create_time_event(1000, wakeup_handler));
+ break;
+ }
+ }
+
throttle_stamp = ceph_clock_now(msgr->cct);
state = STATE_OPEN_MESSAGE_READ_FRONT;
break;
// store reservation size in message, so we don't get confused
// by messages entering the dispatch queue through other paths.
- uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len;
- message->set_dispatch_throttle_size(message_size);
+ message->set_dispatch_throttle_size(cur_msg_size);
message->set_recv_stamp(recv_stamp);
message->set_throttle_stamp(throttle_stamp);
state = STATE_OPEN;
logger->inc(l_msgr_recv_messages);
- logger->inc(l_msgr_recv_bytes, message_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
+ logger->inc(l_msgr_recv_bytes, cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
async_msgr->ms_fast_preprocess(message);
if (delay_state) {
} else {
dispatch_queue->enqueue(message, message->get_priority(), conn_id);
}
+
break;
}
<< policy.throttler_messages->get_max() << dendl;
policy.throttler_messages->put();
}
- if (state > STATE_OPEN_MESSAGE_THROTTLE_BYTES &&
+ if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE &&
state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
- uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len;
if (policy.throttler_bytes) {
- ldout(async_msgr->cct,10) << __func__ << " releasing " << message_size
+ ldout(async_msgr->cct,10) << __func__ << " releasing " << cur_msg_size
<< " bytes to policy throttler "
<< policy.throttler_bytes->get_current() << "/"
<< policy.throttler_bytes->get_max() << dendl;
- policy.throttler_bytes->put(message_size);
+ policy.throttler_bytes->put(cur_msg_size);
}
+ dispatch_queue->dispatch_throttle_release(cur_msg_size);
}
fault();
}
STATE_OPEN_MESSAGE_HEADER,
STATE_OPEN_MESSAGE_THROTTLE_MESSAGE,
STATE_OPEN_MESSAGE_THROTTLE_BYTES,
+ STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE,
STATE_OPEN_MESSAGE_READ_FRONT,
STATE_OPEN_MESSAGE_READ_MIDDLE,
STATE_OPEN_MESSAGE_READ_DATA_PREPARE,
"STATE_OPEN_MESSAGE_HEADER",
"STATE_OPEN_MESSAGE_THROTTLE_MESSAGE",
"STATE_OPEN_MESSAGE_THROTTLE_BYTES",
+ "STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE",
"STATE_OPEN_MESSAGE_READ_FRONT",
"STATE_OPEN_MESSAGE_READ_MIDDLE",
"STATE_OPEN_MESSAGE_READ_DATA_PREPARE",
utime_t recv_stamp;
utime_t throttle_stamp;
unsigned msg_left;
+ uint64_t cur_msg_size;
ceph_msg_header current_header;
bufferlist data_buf;
bufferlist::iterator data_blp;