]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
common/Throttle: throttle in FIFO order
authorJim Schutt <jaschut@sandia.gov>
Wed, 1 Feb 2012 15:54:25 +0000 (08:54 -0700)
committerGreg Farnum <gregory.farnum@dreamhost.com>
Thu, 2 Feb 2012 18:38:47 +0000 (10:38 -0800)
Under heavy write load from many clients, many reader threads will
be waiting in the policy throttler, all on a single condition variable.
When a wakeup is signalled, any of those threads may receive the
signal.  This increases the variance in the message processing
latency, and in extreme cases can significantly delay a message.

This patch causes threads to exit a throttler in the same order
they entered.

Signed-off-by: Jim Schutt <jaschut@sandia.gov>
Signed-off-by: Greg Farnum <gregory.farnum@dreamhost.com>
src/common/Throttle.h

index f13fde05dde9c0af15f036969a7f38ccf000bc24..48b821e9ec0a6adc11b8d43e3e77faad6c8cf407 100644 (file)
@@ -3,22 +3,30 @@
 
 #include "Mutex.h"
 #include "Cond.h"
+#include <list>
 
 class Throttle {
-  int64_t count, max, waiting;
+  int64_t count, max;
   Mutex lock;
-  Cond cond;
+  list<Cond*> cond;
   
 public:
-  Throttle(int64_t m = 0) : count(0), max(m), waiting(0),
+  Throttle(int64_t m = 0) : count(0), max(m),
                          lock("Throttle::lock") {
     assert(m >= 0);
   }
+  ~Throttle() {
+    while (!cond.empty()) {
+      Cond *cv = cond.front();
+      delete cv;
+      cond.pop_front();
+    }
+  }
 
 private:
   void _reset_max(int64_t m) {
-    if (m < max)
-      cond.SignalOne();
+    if (m < max && !cond.empty())
+      cond.front()->SignalOne();
     max = m;
   }
   bool _should_wait(int64_t c) {
@@ -27,19 +35,22 @@ private:
       ((c < max && count + c > max) ||   // normally stay under max
        (c >= max && count > max));       // except for large c
   }
+
   bool _wait(int64_t c) {
     bool waited = false;
-    if (_should_wait(c)) {
-      waiting += c;
+    if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
+      Cond *cv = new Cond;
+      cond.push_back(cv);
       do {
        waited = true;
-       cond.Wait(lock);
-      } while (_should_wait(c));
-      waiting -= c;
+        cv->Wait(lock);
+      } while (_should_wait(c) || cv != cond.front());
+      delete cv;
+      cond.pop_front();
 
       // wake up the next guy
-      if (waiting)
-       cond.SignalOne();
+      if (!cond.empty())
+        cond.front()->SignalOne();
     }
     return waited;
   }
@@ -85,7 +96,7 @@ public:
   bool get_or_fail(int64_t c = 1) {
     assert (c >= 0);
     Mutex::Locker l(lock);
-    if (_should_wait(c)) return false;
+    if (_should_wait(c) || !cond.empty()) return false;
     count += c;
     return true;
   }
@@ -94,7 +105,8 @@ public:
     assert(c >= 0);
     Mutex::Locker l(lock);
     if (c) {
-      cond.SignalOne();
+      if (!cond.empty())
+        cond.front()->SignalOne();
       count -= c;
       assert(count >= 0); //if count goes negative, we failed somewhere!
     }