common/admin_socket_client.cc \
common/escape.c \
common/Clock.cc \
+ common/Throttle.cc \
common/Timer.cc \
common/Finisher.cc \
common/environment.cc\
"(no journal)" : g_conf->osd_journal)
<< std::endl;
- Throttle client_throttler(g_conf->osd_client_message_size_cap);
+ Throttle client_throttler(g_ceph_context, "osd_client_bytes",
+ g_conf->osd_client_message_size_cap);
uint64_t supported =
CEPH_FEATURE_UID |
--- /dev/null
+
+#include "common/Throttle.h"
+#include "common/dout.h"
+#include "common/ceph_context.h"
+
+#define dout_subsys ceph_subsys_throttle
+
+#undef dout_prefix
+#define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") "
+
+
+Throttle::Throttle(CephContext *cct, std::string n, int64_t m)
+ : cct(cct), name(n),
+ count(0), max(m),
+ lock("Throttle::lock")
+{
+ assert(m >= 0);
+}
+
+Throttle::~Throttle()
+{
+ while (!cond.empty()) {
+ Cond *cv = cond.front();
+ delete cv;
+ cond.pop_front();
+ }
+}
+
+bool Throttle::_wait(int64_t c)
+{
+ bool waited = false;
+ if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
+ Cond *cv = new Cond;
+ cond.push_back(cv);
+ do {
+ if (!waited)
+ ldout(cct, 2) << "_wait waiting..." << dendl;
+ waited = true;
+ cv->Wait(lock);
+ } while (_should_wait(c) || cv != cond.front());
+
+ if (waited)
+ ldout(cct, 3) << "_wait finished waiting" << dendl;
+
+ delete cv;
+ cond.pop_front();
+
+ // wake up the next guy
+ if (!cond.empty())
+ cond.front()->SignalOne();
+ }
+ return waited;
+}
+
+bool Throttle::wait(int64_t m)
+{
+ Mutex::Locker l(lock);
+ if (m) {
+ assert(m > 0);
+ _reset_max(m);
+ }
+ ldout(cct, 5) << "wait" << dendl;
+ return _wait(0);
+}
+
+int64_t Throttle::take(int64_t c)
+{
+ assert(c >= 0);
+ Mutex::Locker l(lock);
+ ldout(cct, 5) << "take " << c << dendl;
+ count += c;
+ return count;
+}
+
+bool Throttle::get(int64_t c, int64_t m)
+{
+ assert(c >= 0);
+ Mutex::Locker l(lock);
+ ldout(cct, 5) << "get " << c << " (" << count << " -> " << (count + c) << ")" << dendl;
+ if (m) {
+ assert(m > 0);
+ _reset_max(m);
+ }
+ bool waited = _wait(c);
+ count += c;
+ return waited;
+}
+
+/* Returns true if it successfully got the requested amount,
+ * or false if it would block.
+ */
+bool Throttle::get_or_fail(int64_t c)
+{
+ assert (c >= 0);
+ Mutex::Locker l(lock);
+ if (_should_wait(c) || !cond.empty()) {
+ ldout(cct, 2) << "get_or_fail " << c << " failed" << dendl;
+ return false;
+ } else {
+ ldout(cct, 5) << "get_or_fail " << c << " success (" << count << " -> " << (count + c) << ")" << dendl;
+ count += c;
+ return true;
+ }
+}
+
+int64_t Throttle::put(int64_t c)
+{
+ assert(c >= 0);
+ Mutex::Locker l(lock);
+ ldout(cct, 5) << "put " << c << " (" << cout << " -> " << (count-c) << ")" << dendl;
+ if (c) {
+ if (!cond.empty())
+ cond.front()->SignalOne();
+ count -= c;
+ assert(count >= 0); //if count goes negative, we failed somewhere!
+ }
+ return count;
+}
#include "Cond.h"
#include <list>
+class CephContext;
+
class Throttle {
+ CephContext *cct;
+ std::string name;
int64_t count, max;
Mutex lock;
list<Cond*> cond;
public:
- Throttle(int64_t m = 0) : count(0), max(m),
- lock("Throttle::lock") {
- assert(m >= 0);
- }
- ~Throttle() {
- while (!cond.empty()) {
- Cond *cv = cond.front();
- delete cv;
- cond.pop_front();
- }
- }
+ Throttle(CephContext *cct, std::string n, int64_t m = 0);
+ ~Throttle();
private:
void _reset_max(int64_t m) {
(c >= max && count > max)); // except for large c
}
- bool _wait(int64_t c) {
- bool waited = false;
- if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
- Cond *cv = new Cond;
- cond.push_back(cv);
- do {
- waited = true;
- cv->Wait(lock);
- } while (_should_wait(c) || cv != cond.front());
- delete cv;
- cond.pop_front();
-
- // wake up the next guy
- if (!cond.empty())
- cond.front()->SignalOne();
- }
- return waited;
- }
+ bool _wait(int64_t c);
public:
int64_t get_current() {
int64_t get_max() { return max; }
- bool wait(int64_t m = 0) {
- Mutex::Locker l(lock);
- if (m) {
- assert(m > 0);
- _reset_max(m);
- }
- return _wait(0);
- }
+ bool wait(int64_t m = 0);
- int64_t take(int64_t c = 1) {
- assert(c >= 0);
- Mutex::Locker l(lock);
- count += c;
- return count;
- }
+ int64_t take(int64_t c = 1);
+ bool get(int64_t c = 1, int64_t m = 0);
- bool get(int64_t c = 1, int64_t m = 0) {
- assert(c >= 0);
- Mutex::Locker l(lock);
- if (m) {
- assert(m > 0);
- _reset_max(m);
- }
- bool waited = _wait(c);
- count += c;
- return waited;
- }
-
- /* Returns true if it successfully got the requested amount,
+ /**
+ * Returns true if it successfully got the requested amount,
* or false if it would block.
*/
- bool get_or_fail(int64_t c = 1) {
- assert (c >= 0);
- Mutex::Locker l(lock);
- if (_should_wait(c) || !cond.empty()) return false;
- count += c;
- return true;
- }
-
- int64_t put(int64_t c = 1) {
- assert(c >= 0);
- Mutex::Locker l(lock);
- if (c) {
- if (!cond.empty())
- cond.front()->SignalOne();
- count -= c;
- assert(count >= 0); //if count goes negative, we failed somewhere!
- }
- return count;
- }
+ bool get_or_fail(int64_t c = 1);
+ int64_t put(int64_t c = 1);
};
SUBSYS(rgw, 1, 5) // log level for the Rados gateway
SUBSYS(hadoop, 1, 5)
SUBSYS(asok, 1, 5)
+SUBSYS(throttle, 1, 5)
OPTION(key, OPT_STR, "")
OPTION(keyfile, OPT_STR, "")
Messenger(cct, name),
accepter(this),
lock("SimpleMessenger::lock"), did_bind(false),
- dispatch_throttler(cct->_conf->ms_dispatch_throttle_bytes), need_addr(true),
+ dispatch_throttler(cct, "dispatch_throttler", cct->_conf->ms_dispatch_throttle_bytes),
+ need_addr(true),
nonce(_nonce), destination_stopped(false), my_type(name.type()),
global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
reaper_thread(this), reaper_started(false), reaper_stop(false),
full_state(FULL_NOTFULL),
fd(-1),
writing_seq(0),
+ throttle_ops(g_ceph_context, "filestore_ops"),
+ throttle_bytes(g_ceph_context, "filestore_bytes"),
write_lock("FileJournal::write_lock"),
write_stop(false),
write_thread(this),
logger(NULL), tick_event(NULL),
m_request_state_hook(NULL),
num_homeless_ops(0),
- op_throttle_bytes(cct->_conf->objecter_inflight_op_bytes),
- op_throttle_ops(cct->_conf->objecter_inflight_ops)
+ op_throttle_bytes(cct, "objecter_bytes", cct->_conf->objecter_inflight_op_bytes),
+ op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops)
{ }
~Objecter() {
assert(!tick_event);
public:
RGWProcess(CephContext *cct, int num_threads)
: m_tp(cct, "RGWProcess::m_tp", num_threads),
- req_throttle(num_threads * 2),
+ req_throttle(cct, "rgw_ops", num_threads * 2),
req_wq(this, g_conf->rgw_op_thread_timeout,
g_conf->rgw_op_thread_suicide_timeout, &m_tp),
max_req_id(0) {}