From: Haomai Wang Date: Sat, 14 May 2016 14:44:21 +0000 (+0800) Subject: AsyncConnection: reset throttle if mark_down or write failed X-Git-Tag: v11.0.0~138^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1056ce663dd03a32a859bfacf6069e957f185269;p=ceph.git AsyncConnection: reset throttle if mark_down or write failed Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 4c568311ed63..2591b1017dea 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -948,34 +948,6 @@ void AsyncConnection::process() return; fail: - // clean up state internal variables and states - if (state >= STATE_CONNECTING_SEND_CONNECT_MSG && - state <= STATE_CONNECTING_READY) { - delete authorizer; - authorizer = NULL; - got_bad_auth = false; - } - - if (state > STATE_OPEN_MESSAGE_THROTTLE_MESSAGE && - state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH - && policy.throttler_messages) { - ldout(async_msgr->cct,10) << __func__ << " releasing " << 1 - << " message to policy throttler " - << policy.throttler_messages->get_current() << "/" - << policy.throttler_messages->get_max() << dendl; - policy.throttler_messages->put(); - } - if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE && - state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) { - if (policy.throttler_bytes) { - ldout(async_msgr->cct,10) << __func__ << " releasing " << cur_msg_size - << " bytes to policy throttler " - << policy.throttler_bytes->get_current() << "/" - << policy.throttler_bytes->get_max() << dendl; - policy.throttler_bytes->put(cur_msg_size); - } - dispatch_queue->dispatch_throttle_release(cur_msg_size); - } fault(); } @@ -1814,6 +1786,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis assert(!delay_state); } existing->requeue_sent(); + existing->reset_recv_state(); swap(existing->sd, sd); existing->can_write = WriteStatus::NOWRITE; @@ -2145,6 +2118,7 @@ void AsyncConnection::fault() _stop(); return ; } + reset_recv_state(); if (policy.standby && !is_queued()) { ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl; state = STATE_STANDBY; @@ -2220,6 +2194,7 @@ void AsyncConnection::_stop() if (sd >= 0) center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); + reset_recv_state(); dispatch_queue->discard_queue(conn_id); discard_out_queue(); async_msgr->unregister_conn(this); @@ -2363,6 +2338,45 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more) return rc; } +void AsyncConnection::reset_recv_state() +{ + // clean up state internal variables and states + if (state >= STATE_CONNECTING_SEND_CONNECT_MSG && + state <= STATE_CONNECTING_READY) { + delete authorizer; + authorizer = NULL; + got_bad_auth = false; + } + + if (state > STATE_OPEN_MESSAGE_THROTTLE_MESSAGE && + state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH + && policy.throttler_messages) { + ldout(async_msgr->cct, 10) << __func__ << " releasing " << 1 + << " message to policy throttler " + << policy.throttler_messages->get_current() << "/" + << policy.throttler_messages->get_max() << dendl; + policy.throttler_messages->put(); + } + if (state > STATE_OPEN_MESSAGE_THROTTLE_BYTES && + state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) { + if (policy.throttler_bytes) { + ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size + << " bytes to policy throttler " + << policy.throttler_bytes->get_current() << "/" + << policy.throttler_bytes->get_max() << dendl; + policy.throttler_bytes->put(cur_msg_size); + } + } + if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE && + state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) { + ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size + << " bytes to dispatch_queue throttler " + << dispatch_queue->dispatch_throttler.get_current() << "/" + << dispatch_queue->dispatch_throttler.get_max() << dendl; + dispatch_queue->dispatch_throttle_release(cur_msg_size); + } +} + void AsyncConnection::handle_ack(uint64_t seq) { ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl; diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 170f10875784..741085864b6e 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -126,6 +126,7 @@ class AsyncConnection : public Connection { assert(write_lock.is_locked()); return !out_q.empty(); } + void reset_recv_state(); /** * The DelayedDelivery is for injecting delays into Message delivery off