]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Throttle: reduce lock hold periods
authorSamuel Just <sam.just@inktank.com>
Tue, 16 Oct 2012 19:32:20 +0000 (12:32 -0700)
committerSamuel Just <sam.just@inktank.com>
Tue, 30 Oct 2012 20:31:10 +0000 (13:31 -0700)
Previously, we tended to dump a lot of log output under
the Throttle lock.  The log level for most log statements
has been reduced to 10.

Additionally, count and max are now atomic_t and can be
read without the Throttle lock.

Finally, most of the perf counter manipulations have been
moved outside of the lock.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/common/Throttle.cc
src/common/Throttle.h

index c04c4b9085203521353b5c9b4d180d21afd0beb9..da863c5ebb2fc20c08655b28023247feea0e37f9 100644 (file)
@@ -27,7 +27,7 @@ enum {
 
 Throttle::Throttle(CephContext *cct, std::string n, int64_t m)
   : cct(cct), name(n),
-    count(0), max(m),
+               max(m),
     lock("Throttle::lock")
 {
   assert(m >= 0);
@@ -47,7 +47,7 @@ Throttle::Throttle(CephContext *cct, std::string n, int64_t m)
 
   logger = b.create_perf_counters();
   cct->get_perfcounters_collection()->add(logger);
-  logger->set(l_throttle_max, max);
+  logger->set(l_throttle_max, max.read());
 }
 
 Throttle::~Throttle()
@@ -64,10 +64,11 @@ Throttle::~Throttle()
 
 void Throttle::_reset_max(int64_t m)
 {
-  if (m < max && !cond.empty())
+  assert(lock.is_locked());
+  if (m < ((int64_t)max.read()) && !cond.empty())
     cond.front()->SignalOne();
-  max = m;
-  logger->set(l_throttle_max, max);
+  logger->set(l_throttle_max, m);
+  max.set((size_t)m);
 }
 
 bool Throttle::_wait(int64_t c)
@@ -109,36 +110,41 @@ bool Throttle::wait(int64_t m)
     assert(m > 0);
     _reset_max(m);
   }
-  ldout(cct, 5) << "wait" << dendl;
+  ldout(cct, 10) << "wait" << dendl;
   return _wait(0);
 }
 
 int64_t Throttle::take(int64_t c)
 {
   assert(c >= 0);
-  Mutex::Locker l(lock);
-  ldout(cct, 5) << "take " << c << dendl;
-  count += c;
+  ldout(cct, 10) << "take " << c << dendl;
+  {
+    Mutex::Locker l(lock);
+    count.add(c);
+  }
   logger->inc(l_throttle_take);
   logger->inc(l_throttle_take_sum, c);
-  logger->set(l_throttle_val, count);
-  return count;
+  logger->set(l_throttle_val, count.read());
+  return count.read();
 }
 
 bool Throttle::get(int64_t c, int64_t m)
 {
   assert(c >= 0);
-  Mutex::Locker l(lock);
-  ldout(cct, 5) << "get " << c << " (" << count << " -> " << (count + c) << ")" << dendl;
-  if (m) {
-    assert(m > 0);
-    _reset_max(m);
+  ldout(cct, 10) << "get " << c << " (" << count.read() << " -> " << (count.read() + c) << ")" << dendl;
+  bool waited = false;
+  {
+    Mutex::Locker l(lock);
+    if (m) {
+      assert(m > 0);
+      _reset_max(m);
+    }
+    waited = _wait(c);
+    count.add(c);
   }
-  bool waited = _wait(c);
-  count += c;
   logger->inc(l_throttle_get);
   logger->inc(l_throttle_get_sum, c);
-  logger->set(l_throttle_val, count);
+  logger->set(l_throttle_val, count.read());
   return waited;
 }
 
@@ -150,16 +156,16 @@ bool Throttle::get_or_fail(int64_t c)
   assert (c >= 0);
   Mutex::Locker l(lock);
   if (_should_wait(c) || !cond.empty()) {
-    ldout(cct, 2) << "get_or_fail " << c << " failed" << dendl;
+    ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
     logger->inc(l_throttle_get_or_fail_fail);
     return false;
   } else {
-    ldout(cct, 5) << "get_or_fail " << c << " success (" << count << " -> " << (count + c) << ")" << dendl;
-    count += c;
+    ldout(cct, 10) << "get_or_fail " << c << " success (" << count.read() << " -> " << (count.read() + c) << ")" << dendl;
+    count.add(c);
     logger->inc(l_throttle_get_or_fail_success);
     logger->inc(l_throttle_get);
     logger->inc(l_throttle_get_sum, c);
-    logger->set(l_throttle_val, count);
+    logger->set(l_throttle_val, count.read());
     return true;
   }
 }
@@ -167,16 +173,16 @@ bool Throttle::get_or_fail(int64_t c)
 int64_t Throttle::put(int64_t c)
 {
   assert(c >= 0);
+  ldout(cct, 10) << "put " << c << " (" << count.read() << " -> " << (count.read()-c) << ")" << dendl;
   Mutex::Locker l(lock);
-  ldout(cct, 5) << "put " << c << " (" << cout << " -> " << (count-c) << ")" << dendl;
   if (c) {
     if (!cond.empty())
       cond.front()->SignalOne();
-    count -= c;
-    assert(count >= 0); //if count goes negative, we failed somewhere!
+    assert(((int64_t)count.read()) >= c); //if count goes negative, we failed somewhere!
+    count.sub(c);
     logger->inc(l_throttle_put);
     logger->inc(l_throttle_put_sum, c);
-    logger->set(l_throttle_val, count);
+    logger->set(l_throttle_val, count.read());
   }
-  return count;
+  return count.read();
 }
index 38a6519ed3eaa585cd97c88f0c9828862828d83d..e1fbcb494c718527f1a7dd5a8f5bcd736157532d 100644 (file)
@@ -4,6 +4,7 @@
 #include "Mutex.h"
 #include "Cond.h"
 #include <list>
+#include "include/atomic.h"
 
 class CephContext;
 class PerfCounters;
@@ -12,7 +13,7 @@ class Throttle {
   CephContext *cct;
   std::string name;
   PerfCounters *logger;
-  int64_t count, max;
+       ceph::atomic_t count, max;
   Mutex lock;
   list<Cond*> cond;
   
@@ -23,21 +24,22 @@ public:
 private:
   void _reset_max(int64_t m);
   bool _should_wait(int64_t c) {
+               int64_t m = max.read();
+               int64_t cur = count.read();
     return
-      max &&
-      ((c <= max && count + c > max) ||   // normally stay under max
-       (c >= max && count > max));       // except for large c
+      m &&
+      ((c <= m && cur + c > m) || // normally stay under max
+       (c >= m && cur > m));     // except for large c
   }
 
   bool _wait(int64_t c);
 
 public:
   int64_t get_current() {
-    Mutex::Locker l(lock);
-    return count;
+    return count.read();
   }
 
-  int64_t get_max() { return max; }
+  int64_t get_max() { return max.read(); }
 
   bool wait(int64_t m = 0);