#include "include/scope_guard.h"
#include "common/Throttle.h"
+#include "common/ceph_time.h"
#include "common/perf_counters.h"
+#include "common/Throttle.h"
+
// re-include our assert to clobber the system one; fix dout:
#include "include/assert.h"
#undef dout_prefix
#define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") "
+using ceph::mono_clock;
+using ceph::mono_time;
+using ceph::uniquely_lock;
+
enum {
l_throttle_first = 532430,
l_throttle_val,
l_throttle_last,
};
-Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, bool _use_perf)
- : cct(cct), name(n), logger(NULL),
- max(m),
- lock("Throttle::lock"),
- use_perf(_use_perf)
+Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m,
+ bool _use_perf)
+ : cct(cct), name(n), max(m), use_perf(_use_perf)
{
assert(m >= 0);
b.add_u64_counter(l_throttle_put_sum, "put_sum", "Put data");
b.add_time_avg(l_throttle_wait, "wait", "Waiting latency");
- logger = b.create_perf_counters();
- cct->get_perfcounters_collection()->add(logger);
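+  // hand the counters to a self-cleaning handle so ~Throttle() no longer
+  // has to unregister and delete them by hand.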
+ logger = { b.create_perf_counters(), cct };
+ cct->get_perfcounters_collection()->add(logger.get());
logger->set(l_throttle_max, max);
}
}
Throttle::~Throttle()
{
- {
- Mutex::Locker l(lock);
- assert(cond.empty());
- }
-
- if (!use_perf)
- return;
-
- if (logger) {
- cct->get_perfcounters_collection()->remove(logger);
- delete logger;
- }
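+  // perf counter cleanup is handled by the logger handle itself now (see the
+  // constructor); all that is left is to check that nobody is still waiting.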
+ auto l = uniquely_lock(lock);
+ assert(conds.empty());
}
void Throttle::_reset_max(int64_t m)
{
- assert(lock.is_locked());
+ // lock must be held.
if (static_cast<int64_t>(max) == m)
return;
- if (!cond.empty())
- cond.front()->SignalOne();
+ if (!conds.empty())
+ conds.front().notify_one();
if (logger)
logger->set(l_throttle_max, m);
max = m;
}
-bool Throttle::_wait(int64_t c)
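+// The caller's unique_lock is passed in so the condition variable can release
+// it while blocking and reacquire it before returning.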
+bool Throttle::_wait(int64_t c, UNIQUE_LOCK_T(lock)& l)
{
- utime_t start;
+ mono_time start;
bool waited = false;
- if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
+ if (_should_wait(c) || !conds.empty()) { // always wait behind other waiters.
{
- auto cv = cond.insert(cond.end(), new Cond);
+ auto cv = conds.emplace(conds.end());
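+      // drop our entry from the waiter list on scope exit, even if wait() throws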
auto w = make_scope_guard([this, cv]() {
- delete *cv;
- cond.erase(cv);
+ conds.erase(cv);
});
waited = true;
ldout(cct, 2) << "_wait waiting..." << dendl;
if (logger)
- start = ceph_clock_now();
-
- do {
- (*cv)->Wait(lock);
- } while ((_should_wait(c) || cv != cond.begin()));
+ start = mono_clock::now();
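+      // sleep until there is room for c and we are at the front of the FIFO
+      // of waiters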
+ cv->wait(l, [this, c, cv]() { return (!_should_wait(c) &&
+ cv == conds.begin()); });
ldout(cct, 2) << "_wait finished waiting" << dendl;
if (logger) {
- utime_t dur = ceph_clock_now() - start;
- logger->tinc(l_throttle_wait, dur);
+ logger->tinc(l_throttle_wait, mono_clock::now() - start);
}
}
// wake up the next guy
- if (!cond.empty())
- cond.front()->SignalOne();
+ if (!conds.empty())
+ conds.front().notify_one();
}
return waited;
}
return false;
}
- Mutex::Locker l(lock);
+ auto l = uniquely_lock(lock);
if (m) {
assert(m > 0);
_reset_max(m);
}
ldout(cct, 10) << "wait" << dendl;
- return _wait(0);
+ return _wait(0, l);
}
int64_t Throttle::take(int64_t c)
assert(c >= 0);
ldout(cct, 10) << "take " << c << dendl;
{
- Mutex::Locker l(lock);
+ auto l = uniquely_lock(lock);
count += c;
}
if (logger) {
}
bool waited = false;
{
- Mutex::Locker l(lock);
+ auto l = uniquely_lock(lock);
if (m) {
assert(m > 0);
_reset_max(m);
}
- waited = _wait(c);
+ waited = _wait(c, l);
count += c;
}
if (logger) {
}
assert (c >= 0);
- Mutex::Locker l(lock);
- if (_should_wait(c) || !cond.empty()) {
+ auto l = uniquely_lock(lock);
+ if (_should_wait(c) || !conds.empty()) {
ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
if (logger) {
logger->inc(l_throttle_get_or_fail_fail);
}
return false;
} else {
- ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load() << " -> " << (count.load() + c) << ")" << dendl;
+ ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load()
+ << " -> " << (count.load() + c) << ")" << dendl;
count += c;
if (logger) {
logger->inc(l_throttle_get_or_fail_success);
}
assert(c >= 0);
- ldout(cct, 10) << "put " << c << " (" << count.load() << " -> " << (count.load()-c) << ")" << dendl;
- Mutex::Locker l(lock);
+ ldout(cct, 10) << "put " << c << " (" << count.load() << " -> "
+ << (count.load()-c) << ")" << dendl;
+ auto l = uniquely_lock(lock);
if (c) {
- if (!cond.empty())
- cond.front()->SignalOne();
- assert(static_cast<int64_t>(count) >= c); // if count goes negative, we failed somewhere!
+ if (!conds.empty())
+ conds.front().notify_one();
+ // if count goes negative, we failed somewhere!
+ assert(static_cast<int64_t>(count) >= c);
count -= c;
if (logger) {
logger->inc(l_throttle_put);
void Throttle::reset()
{
- Mutex::Locker l(lock);
- if (!cond.empty())
- cond.front()->SignalOne();
+ auto l = uniquely_lock(lock);
+ if (!conds.empty())
+ conds.front().notify_one();
count = 0;
if (logger) {
logger->set(l_throttle_val, 0);
l_backoff_throttle_last,
};
-BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, unsigned expected_concurrency, bool _use_perf)
- : cct(cct), name(n), logger(NULL),
+BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n,
+ unsigned expected_concurrency, bool _use_perf)
+ : cct(cct), name(n),
conds(expected_concurrency),///< [in] determines size of conds
use_perf(_use_perf)
{
return;
if (cct->_conf->throttler_perf_counter) {
- PerfCountersBuilder b(cct, string("throttle-") + name, l_backoff_throttle_first, l_backoff_throttle_last);
+ PerfCountersBuilder b(cct, string("throttle-") + name,
+ l_backoff_throttle_first, l_backoff_throttle_last);
b.add_u64(l_backoff_throttle_val, "val", "Currently available throttle");
b.add_u64(l_backoff_throttle_max, "max", "Max value for throttle");
b.add_u64_counter(l_backoff_throttle_get, "get", "Gets");
b.add_u64_counter(l_backoff_throttle_put_sum, "put_sum", "Put data");
b.add_time_avg(l_backoff_throttle_wait, "wait", "Waiting latency");
- logger = b.create_perf_counters();
- cct->get_perfcounters_collection()->add(logger);
+ logger = { b.create_perf_counters(), cct };
+ cct->get_perfcounters_collection()->add(logger.get());
logger->set(l_backoff_throttle_max, max);
}
}
BackoffThrottle::~BackoffThrottle()
{
- {
- locker l(lock);
- assert(waiters.empty());
- }
-
- if (!use_perf)
- return;
-
- if (logger) {
- cct->get_perfcounters_collection()->remove(logger);
- delete logger;
- }
+ auto l = uniquely_lock(lock);
+ assert(waiters.empty());
}
bool BackoffThrottle::set_params(
}
auto ticket = _push_waiter();
- utime_t wait_from = ceph_clock_now();
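+  // measure the wait with the monotonic clock so wall-clock jumps cannot skew it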
+ auto wait_from = mono_clock::now();
bool waited = false;
while (waiters.begin() != ticket) {
if (logger) {
logger->set(l_backoff_throttle_val, current);
if (waited) {
- logger->tinc(l_backoff_throttle_wait, ceph_clock_now() - wait_from);
+ logger->tinc(l_backoff_throttle_wait, mono_clock::now() - wait_from);
}
}
}
SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent)
- : m_lock("SimpleThrottle"),
- m_max(max),
- m_current(0),
- m_ret(0),
- m_ignore_enoent(ignore_enoent)
-{
-}
+ : m_max(max), m_ignore_enoent(ignore_enoent) {}
SimpleThrottle::~SimpleThrottle()
{
- Mutex::Locker l(m_lock);
+ auto l = uniquely_lock(m_lock);
assert(m_current == 0);
assert(waiters == 0);
}
void SimpleThrottle::start_op()
{
- Mutex::Locker l(m_lock);
- while (m_max == m_current) {
- waiters++;
- m_cond.Wait(m_lock);
- waiters--;
- }
+ auto l = uniquely_lock(m_lock);
+ waiters++;
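+  // sleep until an op slot frees up; the predicate is rechecked on every wakeup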
+ m_cond.wait(l, [this]() { return m_max != m_current; });
+ waiters--;
++m_current;
}
void SimpleThrottle::end_op(int r)
{
- Mutex::Locker l(m_lock);
+ auto l = uniquely_lock(m_lock);
--m_current;
if (r < 0 && !m_ret && !(r == -ENOENT && m_ignore_enoent))
m_ret = r;
- m_cond.Signal();
+ m_cond.notify_all();
}
bool SimpleThrottle::pending_error() const
{
- Mutex::Locker l(m_lock);
+ auto l = uniquely_lock(m_lock);
return (m_ret < 0);
}
int SimpleThrottle::wait_for_ret()
{
- Mutex::Locker l(m_lock);
- while (m_current > 0) {
- waiters++;
- m_cond.Wait(m_lock);
- waiters--;
- }
+ auto l = uniquely_lock(m_lock);
+ waiters++;
+ m_cond.wait(l, [this]() { return m_current == 0; });
+ waiters--;
return m_ret;
}
}
OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent)
- : m_lock("OrderedThrottle::m_lock"), m_max(max), m_current(0), m_ret_val(0),
- m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) {
-}
+ : m_max(max), m_ignore_enoent(ignore_enoent) {}
OrderedThrottle::~OrderedThrottle() {
- Mutex::Locker locker(m_lock);
+ auto l = uniquely_lock(m_lock);
assert(waiters == 0);
}
C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
- assert(on_finish != NULL);
+ assert(on_finish);
- Mutex::Locker locker(m_lock);
+ auto l = uniquely_lock(m_lock);
uint64_t tid = m_next_tid++;
m_tid_result[tid] = Result(on_finish);
- C_OrderedThrottle *ctx = new C_OrderedThrottle(this, tid);
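+  // hold the new context in a unique_ptr so it is cleaned up if we never
+  // reach the release() below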
+  auto ctx = std::make_unique<C_OrderedThrottle>(this, tid);
- complete_pending_ops();
+ complete_pending_ops(l);
while (m_max == m_current) {
++waiters;
- m_cond.Wait(m_lock);
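+    // plain wait (no predicate) so complete_pending_ops() runs after every wakeup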
+ m_cond.wait(l);
--waiters;
- complete_pending_ops();
+ complete_pending_ops(l);
}
++m_current;
- return ctx;
+ return ctx.release();
}
void OrderedThrottle::end_op(int r) {
- Mutex::Locker locker(m_lock);
+ auto l = uniquely_lock(m_lock);
assert(m_current > 0);
if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) {
m_ret_val = r;
}
--m_current;
- m_cond.Signal();
+ m_cond.notify_all();
}
void OrderedThrottle::finish_op(uint64_t tid, int r) {
- Mutex::Locker locker(m_lock);
+ auto l = uniquely_lock(m_lock);
- TidResult::iterator it = m_tid_result.find(tid);
+ auto it = m_tid_result.find(tid);
assert(it != m_tid_result.end());
it->second.finished = true;
it->second.ret_val = r;
- m_cond.Signal();
+ m_cond.notify_all();
}
bool OrderedThrottle::pending_error() const {
- Mutex::Locker locker(m_lock);
+ auto l = uniquely_lock(m_lock);
return (m_ret_val < 0);
}
int OrderedThrottle::wait_for_ret() {
- Mutex::Locker locker(m_lock);
- complete_pending_ops();
+ auto l = uniquely_lock(m_lock);
+ complete_pending_ops(l);
while (m_current > 0) {
++waiters;
- m_cond.Wait(m_lock);
+ m_cond.wait(l);
--waiters;
- complete_pending_ops();
+ complete_pending_ops(l);
}
return m_ret_val;
}
-void OrderedThrottle::complete_pending_ops() {
- assert(m_lock.is_locked());
-
+void OrderedThrottle::complete_pending_ops(UNIQUE_LOCK_T(m_lock)& l) {
while (true) {
- TidResult::iterator it = m_tid_result.begin();
+ auto it = m_tid_result.begin();
if (it == m_tid_result.end() || it->first != m_complete_tid ||
!it->second.finished) {
break;
Result result = it->second;
m_tid_result.erase(it);
- m_lock.Unlock();
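+    // invoke the completion without holding the throttle lock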
+ l.unlock();
result.on_finish->complete(result.ret_val);
- m_lock.Lock();
+ l.lock();
++m_complete_tid;
}