From: Jim Schutt Date: Wed, 1 Feb 2012 15:54:25 +0000 (-0700) Subject: common/Throttle: throttle in FIFO order X-Git-Tag: v0.44~100 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=83432af2adce75676b734d2b99dd88372ede833a;p=ceph.git common/Throttle: throttle in FIFO order Under heavy write load from many clients, many reader threads will be waiting in the policy throttler, all on a single condition variable. When a wakeup is signalled, any of those threads may receive the signal. This increases the variance in the message processing latency, and in extreme cases can significantly delay a message. This patch causes threads to exit a throttler in the same order they entered. Signed-off-by: Jim Schutt Signed-off-by: Greg Farnum --- diff --git a/src/common/Throttle.h b/src/common/Throttle.h index f13fde05dde9..48b821e9ec0a 100644 --- a/src/common/Throttle.h +++ b/src/common/Throttle.h @@ -3,22 +3,30 @@ #include "Mutex.h" #include "Cond.h" +#include class Throttle { - int64_t count, max, waiting; + int64_t count, max; Mutex lock; - Cond cond; + list cond; public: - Throttle(int64_t m = 0) : count(0), max(m), waiting(0), + Throttle(int64_t m = 0) : count(0), max(m), lock("Throttle::lock") { assert(m >= 0); } + ~Throttle() { + while (!cond.empty()) { + Cond *cv = cond.front(); + delete cv; + cond.pop_front(); + } + } private: void _reset_max(int64_t m) { - if (m < max) - cond.SignalOne(); + if (m < max && !cond.empty()) + cond.front()->SignalOne(); max = m; } bool _should_wait(int64_t c) { @@ -27,19 +35,22 @@ private: ((c < max && count + c > max) || // normally stay under max (c >= max && count > max)); // except for large c } + bool _wait(int64_t c) { bool waited = false; - if (_should_wait(c)) { - waiting += c; + if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters. + Cond *cv = new Cond; + cond.push_back(cv); do { waited = true; - cond.Wait(lock); - } while (_should_wait(c)); - waiting -= c; + cv->Wait(lock); + } while (_should_wait(c) || cv != cond.front()); + delete cv; + cond.pop_front(); // wake up the next guy - if (waiting) - cond.SignalOne(); + if (!cond.empty()) + cond.front()->SignalOne(); } return waited; } @@ -85,7 +96,7 @@ public: bool get_or_fail(int64_t c = 1) { assert (c >= 0); Mutex::Locker l(lock); - if (_should_wait(c)) return false; + if (_should_wait(c) || !cond.empty()) return false; count += c; return true; } @@ -94,7 +105,8 @@ public: assert(c >= 0); Mutex::Locker l(lock); if (c) { - cond.SignalOne(); + if (!cond.empty()) + cond.front()->SignalOne(); count -= c; assert(count >= 0); //if count goes negative, we failed somewhere! }