]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: reset throttle if mark_down or write failed
authorHaomai Wang <haomai@xsky.com>
Sat, 14 May 2016 14:44:21 +0000 (22:44 +0800)
committerHaomai Wang <haomai@xsky.com>
Fri, 20 May 2016 03:49:55 +0000 (11:49 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index 4c568311ed63b3d050eae905f105312e2ea845d8..2591b1017dea217d5512d22245ba68552c32cd98 100644 (file)
@@ -948,34 +948,6 @@ void AsyncConnection::process()
   return;
 
  fail:
-  // clean up state internal variables and states
-  if (state >= STATE_CONNECTING_SEND_CONNECT_MSG &&
-      state <= STATE_CONNECTING_READY) {
-    delete authorizer;
-    authorizer = NULL;
-    got_bad_auth = false;
-  }
-
-  if (state > STATE_OPEN_MESSAGE_THROTTLE_MESSAGE &&
-      state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
-      && policy.throttler_messages) {
-    ldout(async_msgr->cct,10) << __func__ << " releasing " << 1
-                        << " message to policy throttler "
-                        << policy.throttler_messages->get_current() << "/"
-                        << policy.throttler_messages->get_max() << dendl;
-    policy.throttler_messages->put();
-  }
-  if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE &&
-      state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
-    if (policy.throttler_bytes) {
-      ldout(async_msgr->cct,10) << __func__ << " releasing " << cur_msg_size
-                          << " bytes to policy throttler "
-                          << policy.throttler_bytes->get_current() << "/"
-                          << policy.throttler_bytes->get_max() << dendl;
-      policy.throttler_bytes->put(cur_msg_size);
-    }
-    dispatch_queue->dispatch_throttle_release(cur_msg_size);
-  }
   fault();
 }
 
@@ -1814,6 +1786,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
       assert(!delay_state);
     }
     existing->requeue_sent();
+    existing->reset_recv_state();
 
     swap(existing->sd, sd);
     existing->can_write = WriteStatus::NOWRITE;
@@ -2145,6 +2118,7 @@ void AsyncConnection::fault()
     _stop();
     return ;
   }
+  reset_recv_state();
   if (policy.standby && !is_queued()) {
     ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
     state = STATE_STANDBY;
@@ -2220,6 +2194,7 @@ void AsyncConnection::_stop()
   if (sd >= 0)
     center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
 
+  reset_recv_state();
   dispatch_queue->discard_queue(conn_id);
   discard_out_queue();
   async_msgr->unregister_conn(this);
@@ -2363,6 +2338,45 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
   return rc;
 }
 
+void AsyncConnection::reset_recv_state()
+{
+  // clean up state internal variables and states
+  if (state >= STATE_CONNECTING_SEND_CONNECT_MSG &&
+      state <= STATE_CONNECTING_READY) {
+    delete authorizer;
+    authorizer = NULL;
+    got_bad_auth = false;
+  }
+
+  if (state > STATE_OPEN_MESSAGE_THROTTLE_MESSAGE &&
+      state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
+      && policy.throttler_messages) {
+    ldout(async_msgr->cct, 10) << __func__ << " releasing " << 1
+                               << " message to policy throttler "
+                               << policy.throttler_messages->get_current() << "/"
+                               << policy.throttler_messages->get_max() << dendl;
+    policy.throttler_messages->put();
+  }
+  if (state > STATE_OPEN_MESSAGE_THROTTLE_BYTES &&
+      state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
+    if (policy.throttler_bytes) {
+      ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size
+                                 << " bytes to policy throttler "
+                                 << policy.throttler_bytes->get_current() << "/"
+                                 << policy.throttler_bytes->get_max() << dendl;
+      policy.throttler_bytes->put(cur_msg_size);
+    }
+  }
+  if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE &&
+      state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
+    ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size
+                               << " bytes to dispatch_queue throttler "
+                               << dispatch_queue->dispatch_throttler.get_current() << "/"
+                               << dispatch_queue->dispatch_throttler.get_max() << dendl;
+    dispatch_queue->dispatch_throttle_release(cur_msg_size);
+  }
+}
+
 void AsyncConnection::handle_ack(uint64_t seq)
 {
   ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl;
index 170f108757846b8af7b7b3d93dce39243349c17c..741085864b6e1b6203847c9a540543fd408a942c 100644 (file)
@@ -126,6 +126,7 @@ class AsyncConnection : public Connection {
     assert(write_lock.is_locked());
     return !out_q.empty();
   }
+  void reset_recv_state();
 
    /**
    * The DelayedDelivery is for injecting delays into Message delivery off