From: Haomai Wang Date: Fri, 6 May 2016 08:52:01 +0000 (+0800) Subject: AsyncConnection: add DispathQueue throttle X-Git-Tag: v11.0.0~138^2~11 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=485a2c80dc1e3ae338fe464eb57ca614af919cc3;p=ceph.git AsyncConnection: add DispathQueue throttle Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index b9090720b6f3..b0e4455ec331 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -634,14 +634,14 @@ void AsyncConnection::process() case STATE_OPEN_MESSAGE_THROTTLE_BYTES: { - uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len; - if (message_size) { + cur_msg_size = current_header.front_len + current_header.middle_len + current_header.data_len; + if (cur_msg_size) { if (policy.throttler_bytes) { - ldout(async_msgr->cct, 10) << __func__ << " wants " << message_size << " bytes from policy throttler " + ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler " << policy.throttler_bytes->get_current() << "/" << policy.throttler_bytes->get_max() << dendl; - if (!policy.throttler_bytes->get_or_fail(message_size)) { - ldout(async_msgr->cct, 10) << __func__ << " wants " << message_size << " bytes from policy throttler " + if (!policy.throttler_bytes->get_or_fail(cur_msg_size)) { + ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler " << policy.throttler_bytes->get_current() << "/" << policy.throttler_bytes->get_max() << " failed, just wait." << dendl; // following thread pool deal with th full message queue isn't a @@ -653,6 +653,25 @@ void AsyncConnection::process() } } + state = STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE; + break; + } + + case STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE: + { + if (cur_msg_size) { + if (!dispatch_queue->dispatch_throttler.get_or_fail(cur_msg_size)) { + ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from dispatch throttle " + << dispatch_queue->dispatch_throttler.get_current() << "/" + << dispatch_queue->dispatch_throttler.get_max() << " failed, just wait." << dendl; + // following thread pool deal with th full message queue isn't a + // short time, so we can wait a ms. + if (register_time_events.empty()) + register_time_events.insert(center->create_time_event(1000, wakeup_handler)); + break; + } + } + throttle_stamp = ceph_clock_now(msgr->cct); state = STATE_OPEN_MESSAGE_READ_FRONT; break; @@ -815,8 +834,7 @@ void AsyncConnection::process() // store reservation size in message, so we don't get confused // by messages entering the dispatch queue through other paths. - uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len; - message->set_dispatch_throttle_size(message_size); + message->set_dispatch_throttle_size(cur_msg_size); message->set_recv_stamp(recv_stamp); message->set_throttle_stamp(throttle_stamp); @@ -862,7 +880,7 @@ void AsyncConnection::process() state = STATE_OPEN; logger->inc(l_msgr_recv_messages); - logger->inc(l_msgr_recv_bytes, message_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer)); + logger->inc(l_msgr_recv_bytes, cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer)); async_msgr->ms_fast_preprocess(message); if (delay_state) { @@ -882,6 +900,7 @@ void AsyncConnection::process() } else { dispatch_queue->enqueue(message, message->get_priority(), conn_id); } + break; } @@ -942,16 +961,16 @@ void AsyncConnection::process() << policy.throttler_messages->get_max() << dendl; policy.throttler_messages->put(); } - if (state > STATE_OPEN_MESSAGE_THROTTLE_BYTES && + if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE && state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) { - uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len; if (policy.throttler_bytes) { - ldout(async_msgr->cct,10) << __func__ << " releasing " << message_size + ldout(async_msgr->cct,10) << __func__ << " releasing " << cur_msg_size << " bytes to policy throttler " << policy.throttler_bytes->get_current() << "/" << policy.throttler_bytes->get_max() << dendl; - policy.throttler_bytes->put(message_size); + policy.throttler_bytes->put(cur_msg_size); } + dispatch_queue->dispatch_throttle_release(cur_msg_size); } fault(); } diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index fcbaf5e35d5a..2550283a1459 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -210,6 +210,7 @@ class AsyncConnection : public Connection { STATE_OPEN_MESSAGE_HEADER, STATE_OPEN_MESSAGE_THROTTLE_MESSAGE, STATE_OPEN_MESSAGE_THROTTLE_BYTES, + STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE, STATE_OPEN_MESSAGE_READ_FRONT, STATE_OPEN_MESSAGE_READ_MIDDLE, STATE_OPEN_MESSAGE_READ_DATA_PREPARE, @@ -247,6 +248,7 @@ class AsyncConnection : public Connection { "STATE_OPEN_MESSAGE_HEADER", "STATE_OPEN_MESSAGE_THROTTLE_MESSAGE", "STATE_OPEN_MESSAGE_THROTTLE_BYTES", + "STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE", "STATE_OPEN_MESSAGE_READ_FRONT", "STATE_OPEN_MESSAGE_READ_MIDDLE", "STATE_OPEN_MESSAGE_READ_DATA_PREPARE", @@ -321,6 +323,7 @@ class AsyncConnection : public Connection { utime_t recv_stamp; utime_t throttle_stamp; unsigned msg_left; + uint64_t cur_msg_size; ceph_msg_header current_header; bufferlist data_buf; bufferlist::iterator data_blp;