case STATE_OPEN_MESSAGE_THROTTLE_MESSAGE:
{
if (policy.throttler_messages) {
- ldout(async_msgr->cct,10) << __func__ << " wants " << 1 << " message from policy throttler "
- << policy.throttler_messages->get_current() << "/"
- << policy.throttler_messages->get_max() << dendl;
- // FIXME: may block
- policy.throttler_messages->get();
+ ldout(async_msgr->cct, 10) << __func__ << " wants " << 1 << " message from policy throttler "
+ << policy.throttler_messages->get_current() << "/"
+ << policy.throttler_messages->get_max() << dendl;
+ if (!policy.throttler_messages->get_or_fail()) {
+ ldout(async_msgr->cct, 1) << __func__ << " wants 1 message from policy throttle "
+ << policy.throttler_messages->get_current() << "/"
+ << policy.throttler_messages->get_max() << " failed, just wait." << dendl;
+ center->dispatch_event_external(read_handler);
+ break;
+ }
}
state = STATE_OPEN_MESSAGE_THROTTLE_BYTES;
uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len;
if (message_size) {
if (policy.throttler_bytes) {
- ldout(async_msgr->cct,10) << __func__ << " wants " << message_size << " bytes from policy throttler "
- << policy.throttler_bytes->get_current() << "/"
- << policy.throttler_bytes->get_max() << dendl;
- // FIXME: may block
- policy.throttler_bytes->get(message_size);
+ ldout(async_msgr->cct, 10) << __func__ << " wants " << message_size << " bytes from policy throttler "
+ << policy.throttler_bytes->get_current() << "/"
+ << policy.throttler_bytes->get_max() << dendl;
+ if (!policy.throttler_bytes->get_or_fail(message_size)) {
+ ldout(async_msgr->cct, 10) << __func__ << " wants " << message_size << " bytes from policy throttler "
+ << policy.throttler_bytes->get_current() << "/"
+ << policy.throttler_bytes->get_max() << " failed, just wait." << dendl;
+ center->dispatch_event_external(read_handler);
+ break;
+ }
}
}