From: Sage Weil Date: Mon, 30 Apr 2012 17:42:39 +0000 (-0700) Subject: throttle: feed cct, name, and add logging X-Git-Tag: v0.47~69^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f3771b0e528e0f120f77389fe3ae7b1880cde309;p=ceph.git throttle: feed cct, name, and add logging Signed-off-by: Sage Weil Reviewed-by: Greg Farnum --- diff --git a/src/Makefile.am b/src/Makefile.am index 20cb84fdc7204..b55f38039ec1d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -981,6 +981,7 @@ libcommon_files = \ common/admin_socket_client.cc \ common/escape.c \ common/Clock.cc \ + common/Throttle.cc \ common/Timer.cc \ common/Finisher.cc \ common/environment.cc\ diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index cce1959d334db..f0446adf6b5b3 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -327,7 +327,8 @@ int main(int argc, const char **argv) "(no journal)" : g_conf->osd_journal) << std::endl; - Throttle client_throttler(g_conf->osd_client_message_size_cap); + Throttle client_throttler(g_ceph_context, "osd_client_bytes", + g_conf->osd_client_message_size_cap); uint64_t supported = CEPH_FEATURE_UID | diff --git a/src/common/Throttle.cc b/src/common/Throttle.cc new file mode 100644 index 0000000000000..fafe36d600fd8 --- /dev/null +++ b/src/common/Throttle.cc @@ -0,0 +1,118 @@ + +#include "common/Throttle.h" +#include "common/dout.h" +#include "common/ceph_context.h" + +#define dout_subsys ceph_subsys_throttle + +#undef dout_prefix +#define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") " + + +Throttle::Throttle(CephContext *cct, std::string n, int64_t m) + : cct(cct), name(n), + count(0), max(m), + lock("Throttle::lock") +{ + assert(m >= 0); +} + +Throttle::~Throttle() +{ + while (!cond.empty()) { + Cond *cv = cond.front(); + delete cv; + cond.pop_front(); + } +} + +bool Throttle::_wait(int64_t c) +{ + bool waited = false; + if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters. + Cond *cv = new Cond; + cond.push_back(cv); + do { + if (!waited) + ldout(cct, 2) << "_wait waiting..." << dendl; + waited = true; + cv->Wait(lock); + } while (_should_wait(c) || cv != cond.front()); + + if (waited) + ldout(cct, 3) << "_wait finished waiting" << dendl; + + delete cv; + cond.pop_front(); + + // wake up the next guy + if (!cond.empty()) + cond.front()->SignalOne(); + } + return waited; +} + +bool Throttle::wait(int64_t m) +{ + Mutex::Locker l(lock); + if (m) { + assert(m > 0); + _reset_max(m); + } + ldout(cct, 5) << "wait" << dendl; + return _wait(0); +} + +int64_t Throttle::take(int64_t c) +{ + assert(c >= 0); + Mutex::Locker l(lock); + ldout(cct, 5) << "take " << c << dendl; + count += c; + return count; +} + +bool Throttle::get(int64_t c, int64_t m) +{ + assert(c >= 0); + Mutex::Locker l(lock); + ldout(cct, 5) << "get " << c << " (" << count << " -> " << (count + c) << ")" << dendl; + if (m) { + assert(m > 0); + _reset_max(m); + } + bool waited = _wait(c); + count += c; + return waited; +} + +/* Returns true if it successfully got the requested amount, + * or false if it would block. + */ +bool Throttle::get_or_fail(int64_t c) +{ + assert (c >= 0); + Mutex::Locker l(lock); + if (_should_wait(c) || !cond.empty()) { + ldout(cct, 2) << "get_or_fail " << c << " failed" << dendl; + return false; + } else { + ldout(cct, 5) << "get_or_fail " << c << " success (" << count << " -> " << (count + c) << ")" << dendl; + count += c; + return true; + } +} + +int64_t Throttle::put(int64_t c) +{ + assert(c >= 0); + Mutex::Locker l(lock); + ldout(cct, 5) << "put " << c << " (" << cout << " -> " << (count-c) << ")" << dendl; + if (c) { + if (!cond.empty()) + cond.front()->SignalOne(); + count -= c; + assert(count >= 0); //if count goes negative, we failed somewhere! + } + return count; +} diff --git a/src/common/Throttle.h b/src/common/Throttle.h index 621f4a7f02a36..1e878a2d51012 100644 --- a/src/common/Throttle.h +++ b/src/common/Throttle.h @@ -5,23 +5,18 @@ #include "Cond.h" #include +class CephContext; + class Throttle { + CephContext *cct; + std::string name; int64_t count, max; Mutex lock; list cond; public: - Throttle(int64_t m = 0) : count(0), max(m), - lock("Throttle::lock") { - assert(m >= 0); - } - ~Throttle() { - while (!cond.empty()) { - Cond *cv = cond.front(); - delete cv; - cond.pop_front(); - } - } + Throttle(CephContext *cct, std::string n, int64_t m = 0); + ~Throttle(); private: void _reset_max(int64_t m) { @@ -36,24 +31,7 @@ private: (c >= max && count > max)); // except for large c } - bool _wait(int64_t c) { - bool waited = false; - if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters. - Cond *cv = new Cond; - cond.push_back(cv); - do { - waited = true; - cv->Wait(lock); - } while (_should_wait(c) || cv != cond.front()); - delete cv; - cond.pop_front(); - - // wake up the next guy - if (!cond.empty()) - cond.front()->SignalOne(); - } - return waited; - } + bool _wait(int64_t c); public: int64_t get_current() { @@ -63,56 +41,17 @@ public: int64_t get_max() { return max; } - bool wait(int64_t m = 0) { - Mutex::Locker l(lock); - if (m) { - assert(m > 0); - _reset_max(m); - } - return _wait(0); - } + bool wait(int64_t m = 0); - int64_t take(int64_t c = 1) { - assert(c >= 0); - Mutex::Locker l(lock); - count += c; - return count; - } + int64_t take(int64_t c = 1); + bool get(int64_t c = 1, int64_t m = 0); - bool get(int64_t c = 1, int64_t m = 0) { - assert(c >= 0); - Mutex::Locker l(lock); - if (m) { - assert(m > 0); - _reset_max(m); - } - bool waited = _wait(c); - count += c; - return waited; - } - - /* Returns true if it successfully got the requested amount, + /** + * Returns true if it successfully got the requested amount, * or false if it would block. */ - bool get_or_fail(int64_t c = 1) { - assert (c >= 0); - Mutex::Locker l(lock); - if (_should_wait(c) || !cond.empty()) return false; - count += c; - return true; - } - - int64_t put(int64_t c = 1) { - assert(c >= 0); - Mutex::Locker l(lock); - if (c) { - if (!cond.empty()) - cond.front()->SignalOne(); - count -= c; - assert(count >= 0); //if count goes negative, we failed somewhere! - } - return count; - } + bool get_or_fail(int64_t c = 1); + int64_t put(int64_t c = 1); }; diff --git a/src/common/config_opts.h b/src/common/config_opts.h index a025bee89ddff..e33c54b405d9e 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -78,6 +78,7 @@ SUBSYS(perfcounter, 1, 5) SUBSYS(rgw, 1, 5) // log level for the Rados gateway SUBSYS(hadoop, 1, 5) SUBSYS(asok, 1, 5) +SUBSYS(throttle, 1, 5) OPTION(key, OPT_STR, "") OPTION(keyfile, OPT_STR, "") diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 2021509c0b5bd..6294aa7d8a1dd 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -577,7 +577,8 @@ public: Messenger(cct, name), accepter(this), lock("SimpleMessenger::lock"), did_bind(false), - dispatch_throttler(cct->_conf->ms_dispatch_throttle_bytes), need_addr(true), + dispatch_throttler(cct, "dispatch_throttler", cct->_conf->ms_dispatch_throttle_bytes), + need_addr(true), nonce(_nonce), destination_stopped(false), my_type(name.type()), global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0), reaper_thread(this), reaper_started(false), reaper_stop(false), diff --git a/src/os/FileJournal.h b/src/os/FileJournal.h index 10f380d20b538..7e2bb315cc060 100644 --- a/src/os/FileJournal.h +++ b/src/os/FileJournal.h @@ -308,6 +308,8 @@ private: full_state(FULL_NOTFULL), fd(-1), writing_seq(0), + throttle_ops(g_ceph_context, "filestore_ops"), + throttle_bytes(g_ceph_context, "filestore_bytes"), write_lock("FileJournal::write_lock"), write_stop(false), write_thread(this), diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index d90e314d1e0de..9b4b251b94b25 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -922,8 +922,8 @@ public: logger(NULL), tick_event(NULL), m_request_state_hook(NULL), num_homeless_ops(0), - op_throttle_bytes(cct->_conf->objecter_inflight_op_bytes), - op_throttle_ops(cct->_conf->objecter_inflight_ops) + op_throttle_bytes(cct, "objecter_bytes", cct->_conf->objecter_inflight_op_bytes), + op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops) { } ~Objecter() { assert(!tick_event); diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index b132f28200af2..c742c1ebd3166 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -180,7 +180,7 @@ class RGWProcess { public: RGWProcess(CephContext *cct, int num_threads) : m_tp(cct, "RGWProcess::m_tp", num_threads), - req_throttle(num_threads * 2), + req_throttle(cct, "rgw_ops", num_threads * 2), req_wq(this, g_conf->rgw_op_thread_timeout, g_conf->rgw_op_thread_suicide_timeout, &m_tp), max_req_id(0) {}