]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
throttle: feed cct, name, and add logging
authorSage Weil <sage@newdream.net>
Mon, 30 Apr 2012 17:42:39 +0000 (10:42 -0700)
committerSage Weil <sage@newdream.net>
Mon, 30 Apr 2012 20:55:05 +0000 (13:55 -0700)
Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
Reviewed-by: Greg Farnum <gregory.farnum@dreamhost.com>
src/Makefile.am
src/ceph_osd.cc
src/common/Throttle.cc [new file with mode: 0644]
src/common/Throttle.h
src/common/config_opts.h
src/msg/SimpleMessenger.h
src/os/FileJournal.h
src/osdc/Objecter.h
src/rgw/rgw_main.cc

index 20cb84fdc720485c71df0382bd2b9e2eb3eaf0d6..b55f38039ec1d05089c49785913072f6705c805e 100644 (file)
@@ -981,6 +981,7 @@ libcommon_files = \
        common/admin_socket_client.cc \
        common/escape.c \
        common/Clock.cc \
+       common/Throttle.cc \
        common/Timer.cc \
        common/Finisher.cc \
        common/environment.cc\
index cce1959d334dbf84db3dfd94a280e133cc8e4b55..f0446adf6b5b36e7c28f266cb0fbae3ad1c12af0 100644 (file)
@@ -327,7 +327,8 @@ int main(int argc, const char **argv)
                    "(no journal)" : g_conf->osd_journal)
        << std::endl;
 
-  Throttle client_throttler(g_conf->osd_client_message_size_cap);
+  Throttle client_throttler(g_ceph_context, "osd_client_bytes",
+                           g_conf->osd_client_message_size_cap);
 
   uint64_t supported =
     CEPH_FEATURE_UID | 
diff --git a/src/common/Throttle.cc b/src/common/Throttle.cc
new file mode 100644 (file)
index 0000000..fafe36d
--- /dev/null
@@ -0,0 +1,118 @@
+
+#include "common/Throttle.h"
+#include "common/dout.h"
+#include "common/ceph_context.h"
+
+#define dout_subsys ceph_subsys_throttle
+
+#undef dout_prefix
+#define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") "
+
+
+Throttle::Throttle(CephContext *cct, std::string n, int64_t m)
+  : cct(cct), name(n),
+    count(0), max(m),
+    lock("Throttle::lock")
+{
+  assert(m >= 0);
+}
+
+Throttle::~Throttle()
+{
+  while (!cond.empty()) {
+    Cond *cv = cond.front();
+    delete cv;
+    cond.pop_front();
+  }
+}
+
+bool Throttle::_wait(int64_t c)
+{
+  bool waited = false;
+  if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
+    Cond *cv = new Cond;
+    cond.push_back(cv);
+    do {
+      if (!waited)
+       ldout(cct, 2) << "_wait waiting..." << dendl;
+      waited = true;
+      cv->Wait(lock);
+    } while (_should_wait(c) || cv != cond.front());
+
+    if (waited)
+      ldout(cct, 3) << "_wait finished waiting" << dendl;
+
+    delete cv;
+    cond.pop_front();
+
+    // wake up the next guy
+    if (!cond.empty())
+      cond.front()->SignalOne();
+  }
+  return waited;
+}
+
+bool Throttle::wait(int64_t m)
+{
+  Mutex::Locker l(lock);
+  if (m) {
+    assert(m > 0);
+    _reset_max(m);
+  }
+  ldout(cct, 5) << "wait" << dendl;
+  return _wait(0);
+}
+
+int64_t Throttle::take(int64_t c)
+{
+  assert(c >= 0);
+  Mutex::Locker l(lock);
+  ldout(cct, 5) << "take " << c << dendl;
+  count += c;
+  return count;
+}
+
+bool Throttle::get(int64_t c, int64_t m)
+{
+  assert(c >= 0);
+  Mutex::Locker l(lock);
+  ldout(cct, 5) << "get " << c << " (" << count << " -> " << (count + c) << ")" << dendl;
+  if (m) {
+    assert(m > 0);
+    _reset_max(m);
+  }
+  bool waited = _wait(c);
+  count += c;
+  return waited;
+}
+
+/* Returns true if it successfully got the requested amount,
+ * or false if it would block.
+ */
+bool Throttle::get_or_fail(int64_t c)
+{
+  assert (c >= 0);
+  Mutex::Locker l(lock);
+  if (_should_wait(c) || !cond.empty()) {
+    ldout(cct, 2) << "get_or_fail " << c << " failed" << dendl;
+    return false;
+  } else {
+    ldout(cct, 5) << "get_or_fail " << c << " success (" << count << " -> " << (count + c) << ")" << dendl;
+    count += c;
+    return true;
+  }
+}
+
+int64_t Throttle::put(int64_t c)
+{
+  assert(c >= 0);
+  Mutex::Locker l(lock);
+  ldout(cct, 5) << "put " << c << " (" << cout << " -> " << (count-c) << ")" << dendl;
+  if (c) {
+    if (!cond.empty())
+      cond.front()->SignalOne();
+    count -= c;
+    assert(count >= 0); //if count goes negative, we failed somewhere!
+  }
+  return count;
+}
index 621f4a7f02a3638bae0a7eef255d43ccb19ffe0b..1e878a2d5101295f7761482e27801623d3378535 100644 (file)
@@ -5,23 +5,18 @@
 #include "Cond.h"
 #include <list>
 
+class CephContext;
+
 class Throttle {
+  CephContext *cct;
+  std::string name;
   int64_t count, max;
   Mutex lock;
   list<Cond*> cond;
   
 public:
-  Throttle(int64_t m = 0) : count(0), max(m),
-                         lock("Throttle::lock") {
-    assert(m >= 0);
-  }
-  ~Throttle() {
-    while (!cond.empty()) {
-      Cond *cv = cond.front();
-      delete cv;
-      cond.pop_front();
-    }
-  }
+  Throttle(CephContext *cct, std::string n, int64_t m = 0);
+  ~Throttle();
 
 private:
   void _reset_max(int64_t m) {
@@ -36,24 +31,7 @@ private:
        (c >= max && count > max));       // except for large c
   }
 
-  bool _wait(int64_t c) {
-    bool waited = false;
-    if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
-      Cond *cv = new Cond;
-      cond.push_back(cv);
-      do {
-       waited = true;
-        cv->Wait(lock);
-      } while (_should_wait(c) || cv != cond.front());
-      delete cv;
-      cond.pop_front();
-
-      // wake up the next guy
-      if (!cond.empty())
-        cond.front()->SignalOne();
-    }
-    return waited;
-  }
+  bool _wait(int64_t c);
 
 public:
   int64_t get_current() {
@@ -63,56 +41,17 @@ public:
 
   int64_t get_max() { return max; }
 
-  bool wait(int64_t m = 0) {
-    Mutex::Locker l(lock);
-    if (m) {
-      assert(m > 0);
-      _reset_max(m);
-    }
-    return _wait(0);
-  }
+  bool wait(int64_t m = 0);
 
-  int64_t take(int64_t c = 1) {
-    assert(c >= 0);
-    Mutex::Locker l(lock);
-    count += c;
-    return count;
-  }
+  int64_t take(int64_t c = 1);
+  bool get(int64_t c = 1, int64_t m = 0);
 
-  bool get(int64_t c = 1, int64_t m = 0) {
-    assert(c >= 0);
-    Mutex::Locker l(lock);
-    if (m) {
-      assert(m > 0);
-      _reset_max(m);
-    }
-    bool waited = _wait(c);
-    count += c;
-    return waited;
-  }
-
-  /* Returns true if it successfully got the requested amount,
+  /**
+   * Returns true if it successfully got the requested amount,
    * or false if it would block.
    */
-  bool get_or_fail(int64_t c = 1) {
-    assert (c >= 0);
-    Mutex::Locker l(lock);
-    if (_should_wait(c) || !cond.empty()) return false;
-    count += c;
-    return true;
-  }
-
-  int64_t put(int64_t c = 1) {
-    assert(c >= 0);
-    Mutex::Locker l(lock);
-    if (c) {
-      if (!cond.empty())
-        cond.front()->SignalOne();
-      count -= c;
-      assert(count >= 0); //if count goes negative, we failed somewhere!
-    }
-    return count;
-  }
+  bool get_or_fail(int64_t c = 1);
+  int64_t put(int64_t c = 1);
 };
 
 
index a025bee89ddffaf3f79a81a9bc88ab3479f97a85..e33c54b405d9ef75a5b01e1eb5e6736b7481a1d3 100644 (file)
@@ -78,6 +78,7 @@ SUBSYS(perfcounter, 1, 5)
 SUBSYS(rgw, 1, 5)                 // log level for the Rados gateway
 SUBSYS(hadoop, 1, 5)
 SUBSYS(asok, 1, 5)
+SUBSYS(throttle, 1, 5)
 
 OPTION(key, OPT_STR, "")
 OPTION(keyfile, OPT_STR, "")
index 2021509c0b5bd32e2a52677afa2f9f7d9aa9049d..6294aa7d8a1dd22df4c9976a7c8b0b61848ba3e7 100644 (file)
@@ -577,7 +577,8 @@ public:
     Messenger(cct, name),
     accepter(this),
     lock("SimpleMessenger::lock"), did_bind(false),
-    dispatch_throttler(cct->_conf->ms_dispatch_throttle_bytes), need_addr(true),
+    dispatch_throttler(cct, "dispatch_throttler", cct->_conf->ms_dispatch_throttle_bytes),
+    need_addr(true),
     nonce(_nonce), destination_stopped(false), my_type(name.type()),
     global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
     reaper_thread(this), reaper_started(false), reaper_stop(false), 
index 10f380d20b5381bef27879cfd80fba99d06df65b..7e2bb315cc06039daeb18b443e47164fc466b9e6 100644 (file)
@@ -308,6 +308,8 @@ private:
     full_state(FULL_NOTFULL),
     fd(-1),
     writing_seq(0),
+    throttle_ops(g_ceph_context, "filestore_ops"),
+    throttle_bytes(g_ceph_context, "filestore_bytes"),
     write_lock("FileJournal::write_lock"),
     write_stop(false),
     write_thread(this),
index d90e314d1e0de587eace94bc6279cfef11b5309c..9b4b251b94b25a7c0e30edb1e5932f28318d28a8 100644 (file)
@@ -922,8 +922,8 @@ public:
     logger(NULL), tick_event(NULL),
     m_request_state_hook(NULL),
     num_homeless_ops(0),
-    op_throttle_bytes(cct->_conf->objecter_inflight_op_bytes),
-    op_throttle_ops(cct->_conf->objecter_inflight_ops)
+    op_throttle_bytes(cct, "objecter_bytes", cct->_conf->objecter_inflight_op_bytes),
+    op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops)
   { }
   ~Objecter() {
     assert(!tick_event);
index b132f28200af2e6c4882b47d6439cca1235d3358..c742c1ebd3166924261d56538bc9294f452b146b 100644 (file)
@@ -180,7 +180,7 @@ class RGWProcess {
 public:
   RGWProcess(CephContext *cct, int num_threads)
     : m_tp(cct, "RGWProcess::m_tp", num_threads),
-      req_throttle(num_threads * 2),
+      req_throttle(cct, "rgw_ops", num_threads * 2),
       req_wq(this, g_conf->rgw_op_thread_timeout,
             g_conf->rgw_op_thread_suicide_timeout, &m_tp),
       max_req_id(0) {}