]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: add DispathQueue throttle
authorHaomai Wang <haomai@xsky.com>
Fri, 6 May 2016 08:52:01 +0000 (16:52 +0800)
committerHaomai Wang <haomai@xsky.com>
Fri, 6 May 2016 16:29:31 +0000 (00:29 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index b9090720b6f3582e0fdaa70b3833b6f1d5c4abf9..b0e4455ec3318bdd36fbbdd83cb2c02d6f898b95 100644 (file)
@@ -634,14 +634,14 @@ void AsyncConnection::process()
 
       case STATE_OPEN_MESSAGE_THROTTLE_BYTES:
         {
-          uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len;
-          if (message_size) {
+          cur_msg_size = current_header.front_len + current_header.middle_len + current_header.data_len;
+          if (cur_msg_size) {
             if (policy.throttler_bytes) {
-              ldout(async_msgr->cct, 10) << __func__ << " wants " << message_size << " bytes from policy throttler "
+              ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler "
                                          << policy.throttler_bytes->get_current() << "/"
                                          << policy.throttler_bytes->get_max() << dendl;
-              if (!policy.throttler_bytes->get_or_fail(message_size)) {
-                ldout(async_msgr->cct, 10) << __func__ << " wants " << message_size << " bytes from policy throttler "
+              if (!policy.throttler_bytes->get_or_fail(cur_msg_size)) {
+                ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler "
                                            << policy.throttler_bytes->get_current() << "/"
                                            << policy.throttler_bytes->get_max() << " failed, just wait." << dendl;
                 // following thread pool deal with th full message queue isn't a
@@ -653,6 +653,25 @@ void AsyncConnection::process()
             }
           }
 
+          state = STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE;
+          break;
+        }
+
+      case STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE:
+        {
+          if (cur_msg_size) {
+            if (!dispatch_queue->dispatch_throttler.get_or_fail(cur_msg_size)) {
+              ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from dispatch throttle "
+                                         << dispatch_queue->dispatch_throttler.get_current() << "/"
+                                         << dispatch_queue->dispatch_throttler.get_max() << " failed, just wait." << dendl;
+              // following thread pool deal with th full message queue isn't a
+              // short time, so we can wait a ms.
+              if (register_time_events.empty())
+                register_time_events.insert(center->create_time_event(1000, wakeup_handler));
+              break;
+            }
+          }
+
           throttle_stamp = ceph_clock_now(msgr->cct);
           state = STATE_OPEN_MESSAGE_READ_FRONT;
           break;
@@ -815,8 +834,7 @@ void AsyncConnection::process()
 
           // store reservation size in message, so we don't get confused
           // by messages entering the dispatch queue through other paths.
-          uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len;
-          message->set_dispatch_throttle_size(message_size);
+          message->set_dispatch_throttle_size(cur_msg_size);
 
           message->set_recv_stamp(recv_stamp);
           message->set_throttle_stamp(throttle_stamp);
@@ -862,7 +880,7 @@ void AsyncConnection::process()
           state = STATE_OPEN;
 
           logger->inc(l_msgr_recv_messages);
-          logger->inc(l_msgr_recv_bytes, message_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
+          logger->inc(l_msgr_recv_bytes, cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
 
           async_msgr->ms_fast_preprocess(message);
           if (delay_state) {
@@ -882,6 +900,7 @@ void AsyncConnection::process()
           } else {
             dispatch_queue->enqueue(message, message->get_priority(), conn_id);
           }
+
           break;
         }
 
@@ -942,16 +961,16 @@ void AsyncConnection::process()
                         << policy.throttler_messages->get_max() << dendl;
     policy.throttler_messages->put();
   }
-  if (state > STATE_OPEN_MESSAGE_THROTTLE_BYTES &&
+  if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE &&
       state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
-    uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len;
     if (policy.throttler_bytes) {
-      ldout(async_msgr->cct,10) << __func__ << " releasing " << message_size
+      ldout(async_msgr->cct,10) << __func__ << " releasing " << cur_msg_size
                           << " bytes to policy throttler "
                           << policy.throttler_bytes->get_current() << "/"
                           << policy.throttler_bytes->get_max() << dendl;
-      policy.throttler_bytes->put(message_size);
+      policy.throttler_bytes->put(cur_msg_size);
     }
+    dispatch_queue->dispatch_throttle_release(cur_msg_size);
   }
   fault();
 }
index fcbaf5e35d5a2f93926f1dd1f5da7b4e65fd0607..2550283a1459b49b787d872fdf609d8078d62a3f 100644 (file)
@@ -210,6 +210,7 @@ class AsyncConnection : public Connection {
     STATE_OPEN_MESSAGE_HEADER,
     STATE_OPEN_MESSAGE_THROTTLE_MESSAGE,
     STATE_OPEN_MESSAGE_THROTTLE_BYTES,
+    STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE,
     STATE_OPEN_MESSAGE_READ_FRONT,
     STATE_OPEN_MESSAGE_READ_MIDDLE,
     STATE_OPEN_MESSAGE_READ_DATA_PREPARE,
@@ -247,6 +248,7 @@ class AsyncConnection : public Connection {
                                         "STATE_OPEN_MESSAGE_HEADER",
                                         "STATE_OPEN_MESSAGE_THROTTLE_MESSAGE",
                                         "STATE_OPEN_MESSAGE_THROTTLE_BYTES",
+                                        "STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE",
                                         "STATE_OPEN_MESSAGE_READ_FRONT",
                                         "STATE_OPEN_MESSAGE_READ_MIDDLE",
                                         "STATE_OPEN_MESSAGE_READ_DATA_PREPARE",
@@ -321,6 +323,7 @@ class AsyncConnection : public Connection {
   utime_t recv_stamp;
   utime_t throttle_stamp;
   unsigned msg_left;
+  uint64_t cur_msg_size;
   ceph_msg_header current_header;
   bufferlist data_buf;
   bufferlist::iterator data_blp;