using ceph::mono_clock;
using ceph::mono_time;
-using ceph::uniquely_lock;
enum {
l_throttle_first = 532430,
Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m,
bool _use_perf)
- : cct(cct), name(n), max(m), use_perf(_use_perf)
+ : cct(cct), name(n), max(m),
+ lock(ceph::make_mutex(name)),
+ use_perf(_use_perf)
{
ceph_assert(m >= 0);
Throttle::~Throttle()
{
- auto l = uniquely_lock(lock);
+ std::lock_guard l(lock);
ceph_assert(conds.empty());
}
max = m;
}
-bool Throttle::_wait(int64_t c, UNIQUE_LOCK_T(lock)& l)
+bool Throttle::_wait(int64_t c, std::unique_lock<ceph::mutex>& l)
{
mono_time start;
bool waited = false;
return false;
}
- auto l = uniquely_lock(lock);
+ std::unique_lock l(lock);
if (m) {
ceph_assert(m > 0);
_reset_max(m);
ceph_assert(c >= 0);
ldout(cct, 10) << "take " << c << dendl;
{
- auto l = uniquely_lock(lock);
+ std::lock_guard l(lock);
count += c;
}
if (logger) {
}
bool waited = false;
{
- auto l = uniquely_lock(lock);
+ std::unique_lock l(lock);
if (m) {
ceph_assert(m > 0);
_reset_max(m);
}
assert (c >= 0);
- auto l = uniquely_lock(lock);
+ std::lock_guard l(lock);
if (_should_wait(c) || !conds.empty()) {
ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
if (logger) {
ceph_assert(c >= 0);
ldout(cct, 10) << "put " << c << " (" << count.load() << " -> "
<< (count.load()-c) << ")" << dendl;
- auto l = uniquely_lock(lock);
+ std::lock_guard l(lock);
if (c) {
if (!conds.empty())
conds.front().notify_one();
void Throttle::reset()
{
- auto l = uniquely_lock(lock);
+ std::lock_guard l(lock);
if (!conds.empty())
conds.front().notify_one();
count = 0;
BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n,
unsigned expected_concurrency, bool _use_perf)
: cct(cct), name(n),
+ lock(ceph::make_mutex(name)),
conds(expected_concurrency),///< [in] determines size of conds
use_perf(_use_perf)
{
BackoffThrottle::~BackoffThrottle()
{
- auto l = uniquely_lock(lock);
+ std::lock_guard l(lock);
ceph_assert(waiters.empty());
}
SimpleThrottle::~SimpleThrottle()
{
- auto l = uniquely_lock(m_lock);
+ std::lock_guard l(m_lock);
ceph_assert(m_current == 0);
ceph_assert(waiters == 0);
}
void SimpleThrottle::start_op()
{
- auto l = uniquely_lock(m_lock);
+ std::unique_lock l(m_lock);
waiters++;
m_cond.wait(l, [this]() { return m_max != m_current; });
waiters--;
void SimpleThrottle::end_op(int r)
{
- auto l = uniquely_lock(m_lock);
+ std::lock_guard l(m_lock);
--m_current;
if (r < 0 && !m_ret && !(r == -ENOENT && m_ignore_enoent))
m_ret = r;
bool SimpleThrottle::pending_error() const
{
- auto l = uniquely_lock(m_lock);
+ std::lock_guard l(m_lock);
return (m_ret < 0);
}
int SimpleThrottle::wait_for_ret()
{
- auto l = uniquely_lock(m_lock);
+ std::unique_lock l(m_lock);
waiters++;
m_cond.wait(l, [this]() { return m_current == 0; });
waiters--;
: m_max(max), m_ignore_enoent(ignore_enoent) {}
OrderedThrottle::~OrderedThrottle() {
- auto l = uniquely_lock(m_lock);
+ std::lock_guard l(m_lock);
ceph_assert(waiters == 0);
}
C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
ceph_assert(on_finish);
- auto l = uniquely_lock(m_lock);
+ std::unique_lock l(m_lock);
uint64_t tid = m_next_tid++;
m_tid_result[tid] = Result(on_finish);
auto ctx = std::make_unique<C_OrderedThrottle>(this, tid);
}
void OrderedThrottle::end_op(int r) {
- auto l = uniquely_lock(m_lock);
+ std::lock_guard l(m_lock);
ceph_assert(m_current > 0);
if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) {
}
void OrderedThrottle::finish_op(uint64_t tid, int r) {
- auto l = uniquely_lock(m_lock);
+ std::lock_guard l(m_lock);
auto it = m_tid_result.find(tid);
ceph_assert(it != m_tid_result.end());
}
bool OrderedThrottle::pending_error() const {
- auto l = uniquely_lock(m_lock);
+ std::lock_guard l(m_lock);
return (m_ret_val < 0);
}
int OrderedThrottle::wait_for_ret() {
- auto l = uniquely_lock(m_lock);
+ std::unique_lock l(m_lock);
complete_pending_ops(l);
while (m_current > 0) {
return m_ret_val;
}
-void OrderedThrottle::complete_pending_ops(UNIQUE_LOCK_T(m_lock)& l) {
+void OrderedThrottle::complete_pending_ops(std::unique_lock<ceph::mutex>& l) {
while (true) {
auto it = m_tid_result.begin();
if (it == m_tid_result.end() || it->first != m_complete_tid ||
#include <atomic>
#include <chrono>
-#include <condition_variable>
#include <iostream>
#include <list>
#include <map>
-#include <mutex>
+#include "common/ceph_mutex.h"
#include "include/Context.h"
#include "common/ThrottleInterface.h"
#include "common/Timer.h"
const std::string name;
PerfCountersRef logger;
std::atomic<int64_t> count = { 0 }, max = { 0 };
- std::mutex lock;
- std::list<std::condition_variable> conds;
+ ceph::mutex lock;
+ std::list<ceph::condition_variable> conds;
const bool use_perf;
public:
(c >= m && cur > m)); // except for large c
}
- bool _wait(int64_t c, UNIQUE_LOCK_T(lock)& l);
+ bool _wait(int64_t c, std::unique_lock<ceph::mutex>& l);
public:
/**
return _should_wait(c);
}
void reset_max(int64_t m) {
- auto l = ceph::uniquely_lock(lock);
+ std::lock_guard l(lock);
_reset_max(m);
}
};
const std::string name;
PerfCountersRef logger;
- std::mutex lock;
- using locker = std::unique_lock<std::mutex>;
+ ceph::mutex lock;
+ using locker = std::unique_lock<ceph::mutex>;
unsigned next_cond = 0;
/// allocated once to avoid constantly allocating new ones
- vector<std::condition_variable> conds;
+ vector<ceph::condition_variable> conds;
const bool use_perf;
/// pointers into conds
- list<std::condition_variable*> waiters;
+ list<ceph::condition_variable*> waiters;
- std::list<std::condition_variable*>::iterator _push_waiter() {
+ std::list<ceph::condition_variable*>::iterator _push_waiter() {
unsigned next = next_cond++;
if (next_cond == conds.size())
next_cond = 0;
bool pending_error() const;
int wait_for_ret();
private:
- mutable std::mutex m_lock;
- std::condition_variable m_cond;
+ mutable ceph::mutex m_lock = ceph::make_mutex("SimpleThrottle::m_lock");
+ ceph::condition_variable m_cond;
uint64_t m_max;
uint64_t m_current = 0;
int m_ret = 0;
typedef std::map<uint64_t, Result> TidResult;
- mutable std::mutex m_lock;
- std::condition_variable m_cond;
+ mutable ceph::mutex m_lock = ceph::make_mutex("OrderedThrottle::m_lock");
+ ceph::condition_variable m_cond;
uint64_t m_max;
uint64_t m_current = 0;
int m_ret_val = 0;
TidResult m_tid_result;
- void complete_pending_ops(UNIQUE_LOCK_T(m_lock)& l);
+ void complete_pending_ops(std::unique_lock<ceph::mutex>& l);
uint32_t waiters = 0;
};