return;
fail:
- // clean up state internal variables and states
- if (state >= STATE_CONNECTING_SEND_CONNECT_MSG &&
- state <= STATE_CONNECTING_READY) {
- delete authorizer;
- authorizer = NULL;
- got_bad_auth = false;
- }
-
- if (state > STATE_OPEN_MESSAGE_THROTTLE_MESSAGE &&
- state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
- && policy.throttler_messages) {
- ldout(async_msgr->cct,10) << __func__ << " releasing " << 1
- << " message to policy throttler "
- << policy.throttler_messages->get_current() << "/"
- << policy.throttler_messages->get_max() << dendl;
- policy.throttler_messages->put();
- }
- if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE &&
- state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
- if (policy.throttler_bytes) {
- ldout(async_msgr->cct,10) << __func__ << " releasing " << cur_msg_size
- << " bytes to policy throttler "
- << policy.throttler_bytes->get_current() << "/"
- << policy.throttler_bytes->get_max() << dendl;
- policy.throttler_bytes->put(cur_msg_size);
- }
- dispatch_queue->dispatch_throttle_release(cur_msg_size);
- }
fault();
}
assert(!delay_state);
}
existing->requeue_sent();
+ existing->reset_recv_state();
swap(existing->sd, sd);
existing->can_write = WriteStatus::NOWRITE;
_stop();
return ;
}
+ reset_recv_state();
if (policy.standby && !is_queued()) {
ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
state = STATE_STANDBY;
if (sd >= 0)
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+ reset_recv_state();
dispatch_queue->discard_queue(conn_id);
discard_out_queue();
async_msgr->unregister_conn(this);
return rc;
}
+void AsyncConnection::reset_recv_state()
+{
+ // clean up state internal variables and states
+ if (state >= STATE_CONNECTING_SEND_CONNECT_MSG &&
+ state <= STATE_CONNECTING_READY) {
+ delete authorizer;
+ authorizer = NULL;
+ got_bad_auth = false;
+ }
+
+ if (state > STATE_OPEN_MESSAGE_THROTTLE_MESSAGE &&
+ state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
+ && policy.throttler_messages) {
+ ldout(async_msgr->cct, 10) << __func__ << " releasing " << 1
+ << " message to policy throttler "
+ << policy.throttler_messages->get_current() << "/"
+ << policy.throttler_messages->get_max() << dendl;
+ policy.throttler_messages->put();
+ }
+ if (state > STATE_OPEN_MESSAGE_THROTTLE_BYTES &&
+ state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
+ if (policy.throttler_bytes) {
+ ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size
+ << " bytes to policy throttler "
+ << policy.throttler_bytes->get_current() << "/"
+ << policy.throttler_bytes->get_max() << dendl;
+ policy.throttler_bytes->put(cur_msg_size);
+ }
+ }
+ if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE &&
+ state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
+ ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size
+ << " bytes to dispatch_queue throttler "
+ << dispatch_queue->dispatch_throttler.get_current() << "/"
+ << dispatch_queue->dispatch_throttler.get_max() << dendl;
+ dispatch_queue->dispatch_throttle_release(cur_msg_size);
+ }
+}
+
void AsyncConnection::handle_ack(uint64_t seq)
{
ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl;