* assume the caller is holding the appropriate lock.
*/
class C_Cond : public Context {
- Cond *cond; ///< Cond to signal
+ ceph::condition_variable& cond; ///< Cond to signal
bool *done; ///< true if finish() has been called
int *rval; ///< return value
public:
- C_Cond(Cond *c, bool *d, int *r) : cond(c), done(d), rval(r) {
+ C_Cond(ceph::condition_variable &c, bool *d, int *r) : cond(c), done(d), rval(r) {
*done = false;
}
void finish(int r) override {
*done = true;
*rval = r;
- cond->Signal();
+ cond.notify_all();
}
};
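
For context, typical use of C_Cond looks like the sketch below: the waiter sleeps on the condition variable under the caller's lock, and the thread that invokes finish() already holds that same lock, as the comment above requires. std:: types stand in for the ceph:: aliases (which resolve to them in non-debug builds), and all names are illustrative.

// Caller-side sketch for C_Cond. The lambda models whoever calls
// finish(): it holds the lock, so finish() only records the result and
// notifies. Not Ceph code; a stand-alone illustration.
#include <condition_variable>
#include <mutex>
#include <thread>

int main() {
  std::mutex lock;
  std::condition_variable cond;
  bool done = false;
  int rval = 0;

  std::thread completer([&] {
    std::lock_guard l{lock};          // the caller of finish() holds the lock
    done = true;                      // ...so finish() just sets done/rval
    rval = 0;
    cond.notify_all();                // ...and wakes the waiter
  });

  std::unique_lock l{lock};           // waiter checks done under the same lock
  cond.wait(l, [&] { return done; });
  completer.join();
  return rval;
}
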
* already hold it.
*/
class C_SafeCond : public Context {
- Mutex *lock; ///< Mutex to take
- Cond *cond; ///< Cond to signal
+ ceph::mutex& lock; ///< Mutex to take
+ ceph::condition_variable& cond; ///< Cond to signal
bool *done; ///< true after finish() has been called
int *rval; ///< return value (optional)
public:
- C_SafeCond(Mutex *l, Cond *c, bool *d, int *r=0) : lock(l), cond(c), done(d), rval(r) {
+ C_SafeCond(ceph::mutex& l, ceph::condition_variable& c, bool *d, int *r=0)
+ : lock(l), cond(c), done(d), rval(r) {
*done = false;
}
void finish(int r) override {
- lock->lock();
+ std::lock_guard l{lock};
if (rval)
*rval = r;
*done = true;
- cond->Signal();
- lock->unlock();
+ cond.notify_all();
}
};
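
The waiter-side counterpart for C_SafeCond is simpler, because finish() takes the mutex itself; the waiting thread only has to sleep on done under that mutex. A minimal sketch, again with std:: stand-ins and an invented function name:

// Waiter-side sketch for C_SafeCond: finish() locks, stores rval, sets
// done and notifies, so the waiter just blocks until done flips.
#include <condition_variable>
#include <mutex>

int wait_for_completion(std::mutex& lock,
                        std::condition_variable& cond,
                        const bool& done, const int& rval) {
  std::unique_lock l{lock};
  cond.wait(l, [&] { return done; });   // woken by C_SafeCond::finish()
  return rval;                          // stored under the lock by finish()
}
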
* until wait() returns.
*/
class C_SaferCond : public Context {
- Mutex lock; ///< Mutex to take
- Cond cond; ///< Cond to signal
- bool done; ///< true after finish() has been called
- int rval; ///< return value
+ ceph::mutex lock; ///< Mutex to take
+ ceph::condition_variable cond; ///< Cond to signal
+ bool done = false; ///< true after finish() has been called
+ int rval = 0; ///< return value
public:
- C_SaferCond() : lock("C_SaferCond"), done(false), rval(0) {}
- explicit C_SaferCond(const std::string &name) : lock(name), done(false), rval(0) {}
+ C_SaferCond() : C_SaferCond("C_SaferCond") {}
+ explicit C_SaferCond(const std::string &name)
+ : lock(ceph::make_mutex(name)) {}
void finish(int r) override { complete(r); }
/// We overload complete in order to not delete the context
void complete(int r) override {
std::lock_guard l(lock);
done = true;
rval = r;
- cond.Signal();
+ cond.notify_all();
}
/// Returns rval once the Context is called
int wait() {
- std::lock_guard l(lock);
- while (!done)
- cond.Wait(lock);
+ std::unique_lock l{lock};
+ cond.wait(l, [this] { return done;});
return rval;
}
/// Wait until \c secs expires or \c complete() is called
int wait_for(double secs) {
- utime_t interval;
- interval.set_from_double(secs);
- std::lock_guard l{lock};
+ std::unique_lock l{lock};
if (done) {
return rval;
}
- cond.WaitInterval(lock, interval);
- return done ? rval : ETIMEDOUT;
+ if (cond.wait_for(l, ceph::make_timespan(secs), [this] { return done; })) {
+ return rval;
+ } else {
+ return ETIMEDOUT;
+ }
}
};
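
C_SaferCond bundles the mutex, condition variable and result into the context itself, so callers only construct it and block in wait() or wait_for(). The sketch below mirrors that shape with std:: types; SaferCondSketch and the helper thread are invented for illustration, and ETIMEDOUT follows the same positive-errno convention as the header above.

// Usage-shape sketch for C_SaferCond::wait_for(): bounded wait that
// returns the stored rval on completion, ETIMEDOUT on expiry.
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

struct SaferCondSketch {
  std::mutex lock;
  std::condition_variable cond;
  bool done = false;
  int rval = 0;

  void complete(int r) {                // what finish()/complete() do above
    std::lock_guard l{lock};
    done = true;
    rval = r;
    cond.notify_all();
  }
  int wait_for(double secs) {
    std::unique_lock l{lock};
    if (cond.wait_for(l, std::chrono::duration<double>(secs),
                      [this] { return done; }))
      return rval;                      // completed before the deadline
    return ETIMEDOUT;
  }
};

int main() {
  SaferCondSketch ctx;
  std::thread completer([&] { ctx.complete(0); });
  int r = ctx.wait_for(5.0);            // 0 unless the thread is very slow
  completer.join();
  return r;
}
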
HeartbeatMap::HeartbeatMap(CephContext *cct)
: m_cct(cct),
- m_rwlock("HeartbeatMap::m_rwlock"),
m_unhealthy_workers(0),
m_total_workers(0)
{
heartbeat_handle_d *HeartbeatMap::add_worker(const string& name, pthread_t thread_id)
{
- m_rwlock.get_write();
+ std::unique_lock locker{m_rwlock};
ldout(m_cct, 10) << "add_worker '" << name << "'" << dendl;
heartbeat_handle_d *h = new heartbeat_handle_d(name);
ANNOTATE_BENIGN_RACE_SIZED(&h->timeout, sizeof(h->timeout),
m_workers.push_front(h);
h->list_item = m_workers.begin();
h->thread_id = thread_id;
- m_rwlock.put_write();
return h;
}
void HeartbeatMap::remove_worker(const heartbeat_handle_d *h)
{
- m_rwlock.get_write();
+ std::unique_lock locker{m_rwlock};
ldout(m_cct, 10) << "remove_worker '" << h->name << "'" << dendl;
m_workers.erase(h->list_item);
- m_rwlock.put_write();
delete h;
}
{
int unhealthy = 0;
int total = 0;
- m_rwlock.get_read();
+ m_rwlock.lock_shared();
auto now = ceph::coarse_mono_clock::now();
if (m_cct->_conf->heartbeat_inject_failure) {
ldout(m_cct, 0) << "is_healthy injecting failure for next " << m_cct->_conf->heartbeat_inject_failure << " seconds" << dendl;
}
total++;
}
- m_rwlock.put_read();
+ m_rwlock.unlock_shared();
m_unhealthy_workers = unhealthy;
m_total_workers = total;
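
As an aside, the explicit lock_shared()/unlock_shared() pair above could also be written with an RAII guard: ceph::shared_mutex models the standard SharedMutex requirements, so std::shared_lock covers readers and std::unique_lock covers writers (the form add_worker()/remove_worker() already use). A small sketch with std::shared_mutex as a stand-in and invented bookkeeping:

// RAII form of the reader/writer split above; std::shared_mutex stands in
// for ceph::shared_mutex, and the worker list is simplified to strings.
#include <list>
#include <shared_mutex>
#include <string>

struct WorkerTableSketch {
  mutable std::shared_mutex rwlock;
  std::list<std::string> workers;

  std::size_t healthy_check() const {
    std::shared_lock l{rwlock};         // reader: shared ownership, released
    return workers.size();              // automatically on any return path
  }
  void add_worker(const std::string& name) {
    std::unique_lock l{rwlock};         // writer: exclusive ownership
    workers.push_back(name);
  }
};
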
#include <pthread.h>
#include "common/ceph_time.h"
-#include "RWLock.h"
+#include "common/ceph_mutex.h"
class CephContext;
private:
CephContext *m_cct;
- RWLock m_rwlock;
+ ceph::shared_mutex m_rwlock =
+ ceph::make_shared_mutex("HeartbeatMap::m_rwlock");
ceph::coarse_mono_clock::time_point m_inject_unhealthy_until;
std::list<heartbeat_handle_d*> m_workers;
std::atomic<unsigned> m_unhealthy_workers = { 0 };
uint64_t capacity,
uint64_t avg,
SafeTimer *timer,
- Mutex *timer_lock)
+ ceph::mutex *timer_lock)
: m_cct(cct), m_name(name),
m_throttle(m_cct, name + "_bucket", capacity),
m_avg(avg), m_timer(timer), m_timer_lock(timer_lock),
- m_lock(name + "_lock")
+ m_lock(ceph::make_mutex(name + "_lock"))
{}
TokenBucketThrottle::~TokenBucketThrottle() {
int TokenBucketThrottle::set_limit(uint64_t average, uint64_t burst) {
{
- std::lock_guard<Mutex> lock(m_lock);
+ std::lock_guard lock{m_lock};
if (0 < burst && burst < average) {
// the burst should never be less than the average.
// The schedule period will be changed when the average rate is set.
{
- std::lock_guard<Mutex> timer_locker(*m_timer_lock);
+ std::lock_guard timer_locker{*m_timer_lock};
cancel_timer();
schedule_timer();
}
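
The locking shape in set_limit() is worth noting: the throttle's own mutex guards its internal state, while the timer mutex is shared with the SafeTimer's owner and is taken separately, in its own scope, just around the cancel/reschedule calls. A minimal sketch of that shape, with std::mutex standing in for ceph::mutex and every name invented:

// Scoping sketch for set_limit(): state updates under the object's own
// mutex, timer rescheduling under the externally owned timer mutex.
#include <mutex>

struct ThrottleSketch {
  std::mutex m_lock;                    // protects bucket/average state
  std::mutex *m_timer_lock;             // owned by whoever owns the SafeTimer
  unsigned m_avg = 0;

  explicit ThrottleSketch(std::mutex *timer_lock) : m_timer_lock(timer_lock) {}

  void set_limit_sketch(unsigned average) {
    {
      std::lock_guard lock{m_lock};     // first scope: update state only
      m_avg = average;
    }
    {
      std::lock_guard timer_locker{*m_timer_lock};  // second scope: timer
      // cancel_timer(); schedule_timer();          // work under its mutex
    }
  }
};
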
uint64_t m_avg = 0;
uint64_t m_burst = 0;
SafeTimer *m_timer;
- Mutex *m_timer_lock;
+ ceph::mutex *m_timer_lock;
FunctionContext *m_token_ctx = nullptr;
std::list<Blocker> m_blockers;
- Mutex m_lock;
+ ceph::mutex m_lock;
// minimum of the filling period.
uint64_t m_tick_min = 50;
public:
TokenBucketThrottle(CephContext *cct, const std::string &name,
uint64_t capacity, uint64_t avg,
- SafeTimer *timer, Mutex *timer_lock);
+ SafeTimer *timer, ceph::mutex *timer_lock);
~TokenBucketThrottle();
num_optracker_shards(num_shards),
complaint_time(0), log_threshold(0),
tracking_enabled(tracking),
- lock("OpTracker::lock"), cct(cct_) {
+ cct(cct_) {
for (uint32_t i = 0; i < num_optracker_shards; i++) {
char lock_name[32] = {0};
snprintf(lock_name, sizeof(lock_name), "%s:%" PRIu32, "OpTracker::ShardedLock", i);
if (!tracking_enabled)
return false;
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
utime_t now = ceph_clock_now();
history.dump_ops(now, f, filters, by_duration);
return true;
if (!tracking_enabled)
return false;
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
utime_t now = ceph_clock_now();
history.dump_slow_ops(now, f, filters);
return true;
if (!tracking_enabled)
return false;
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
f->open_object_section("ops_in_flight"); // overall dump
uint64_t total_ops_in_flight = 0;
f->open_array_section("ops"); // list of TrackedOps
if (!tracking_enabled)
return false;
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
uint64_t current_seq = ++seq;
uint32_t shard_index = current_seq % num_optracker_shards;
ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
void OpTracker::record_history_op(TrackedOpRef&& i)
{
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
history.insert(ceph_clock_now(), std::move(i));
}
utime_t oldest_op = now;
uint64_t total_ops_in_flight = 0;
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
for (const auto sdata : sharded_in_flight_list) {
ceph_assert(sdata);
std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
#define TRACKEDREQUEST_H_
#include <atomic>
+#include "common/ceph_mutex.h"
#include "common/histogram.h"
-#include "common/RWLock.h"
#include "common/Thread.h"
#include "common/Clock.h"
-#include "common/ceph_mutex.h"
#include "include/spinlock.h"
#include "msg/Message.h"
float complaint_time;
int log_threshold;
std::atomic<bool> tracking_enabled;
- RWLock lock;
+ ceph::shared_mutex lock = ceph::make_shared_mutex("OpTracker::lock");
public:
CephContext *cct;
*
*/
-#include "common/Mutex.h"
#include "common/HeartbeatMap.h"
#include "common/ceph_context.h"
#include "common/config.h"