From d5cbb94bcb7d9343efbab217a800d84414adcf8a Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Sun, 7 Jul 2019 11:20:13 +0800 Subject: [PATCH] common: s/Mutex/ceph::mutex/ Signed-off-by: Kefu Chai --- src/common/Cond.h | 51 ++++++++++++++++++++------------------ src/common/HeartbeatMap.cc | 11 +++----- src/common/HeartbeatMap.h | 5 ++-- src/common/Throttle.cc | 8 +++--- src/common/Throttle.h | 6 ++--- src/common/TrackedOp.cc | 14 +++++------ src/common/TrackedOp.h | 5 ++-- src/test/heartbeat_map.cc | 1 - 8 files changed, 50 insertions(+), 51 deletions(-) diff --git a/src/common/Cond.h b/src/common/Cond.h index efcb24166e9ca..3afe159b22f48 100644 --- a/src/common/Cond.h +++ b/src/common/Cond.h @@ -126,17 +126,17 @@ class Cond { * assume the caller is holding the appropriate lock. */ class C_Cond : public Context { - Cond *cond; ///< Cond to signal + ceph::condition_variable& cond; ///< Cond to signal bool *done; ///< true if finish() has been called int *rval; ///< return value public: - C_Cond(Cond *c, bool *d, int *r) : cond(c), done(d), rval(r) { + C_Cond(ceph::condition_variable &c, bool *d, int *r) : cond(c), done(d), rval(r) { *done = false; } void finish(int r) override { *done = true; *rval = r; - cond->Signal(); + cond.notify_all(); } }; @@ -148,21 +148,21 @@ public: * already hold it. */ class C_SafeCond : public Context { - Mutex *lock; ///< Mutex to take - Cond *cond; ///< Cond to signal + ceph::mutex& lock; ///< Mutex to take + ceph::condition_variable& cond; ///< Cond to signal bool *done; ///< true after finish() has been called int *rval; ///< return value (optional) public: - C_SafeCond(Mutex *l, Cond *c, bool *d, int *r=0) : lock(l), cond(c), done(d), rval(r) { + C_SafeCond(ceph::mutex& l, ceph::condition_variable& c, bool *d, int *r=0) + : lock(l), cond(c), done(d), rval(r) { *done = false; } void finish(int r) override { - lock->lock(); + std::lock_guard l{lock}; if (rval) *rval = r; *done = true; - cond->Signal(); - lock->unlock(); + cond.notify_all(); } }; @@ -173,13 +173,16 @@ public: * until wait() returns. */ class C_SaferCond : public Context { - Mutex lock; ///< Mutex to take - Cond cond; ///< Cond to signal - bool done; ///< true after finish() has been called - int rval; ///< return value + ceph::mutex lock; ///< Mutex to take + ceph::condition_variable cond; ///< Cond to signal + bool done = false; ///< true after finish() has been called + int rval = 0; ///< return value public: - C_SaferCond() : lock("C_SaferCond"), done(false), rval(0) {} - explicit C_SaferCond(const std::string &name) : lock(name), done(false), rval(0) {} + C_SaferCond() : + C_SaferCond("C_SaferCond") + {} + explicit C_SaferCond(const std::string &name) + : lock(ceph::make_mutex(name)) {} void finish(int r) override { complete(r); } /// We overload complete in order to not delete the context @@ -187,27 +190,27 @@ public: std::lock_guard l(lock); done = true; rval = r; - cond.Signal(); + cond.notify_all(); } /// Returns rval once the Context is called int wait() { - std::lock_guard l(lock); - while (!done) - cond.Wait(lock); + std::unique_lock l{lock}; + cond.wait(l, [this] { return done;}); return rval; } /// Wait until the \c secs expires or \c complete() is called int wait_for(double secs) { - utime_t interval; - interval.set_from_double(secs); - std::lock_guard l{lock}; + std::unique_lock l{lock}; if (done) { return rval; } - cond.WaitInterval(lock, interval); - return done ? rval : ETIMEDOUT; + if (cond.wait_for(l, ceph::make_timespan(secs), [this] { return done; })) { + return rval; + } else { + return ETIMEDOUT; + } } }; diff --git a/src/common/HeartbeatMap.cc b/src/common/HeartbeatMap.cc index d3737478f9377..165fdbb3fac3a 100644 --- a/src/common/HeartbeatMap.cc +++ b/src/common/HeartbeatMap.cc @@ -28,7 +28,6 @@ namespace ceph { HeartbeatMap::HeartbeatMap(CephContext *cct) : m_cct(cct), - m_rwlock("HeartbeatMap::m_rwlock"), m_unhealthy_workers(0), m_total_workers(0) { @@ -41,7 +40,7 @@ HeartbeatMap::~HeartbeatMap() heartbeat_handle_d *HeartbeatMap::add_worker(const string& name, pthread_t thread_id) { - m_rwlock.get_write(); + std::unique_lock locker{m_rwlock}; ldout(m_cct, 10) << "add_worker '" << name << "'" << dendl; heartbeat_handle_d *h = new heartbeat_handle_d(name); ANNOTATE_BENIGN_RACE_SIZED(&h->timeout, sizeof(h->timeout), @@ -51,16 +50,14 @@ heartbeat_handle_d *HeartbeatMap::add_worker(const string& name, pthread_t threa m_workers.push_front(h); h->list_item = m_workers.begin(); h->thread_id = thread_id; - m_rwlock.put_write(); return h; } void HeartbeatMap::remove_worker(const heartbeat_handle_d *h) { - m_rwlock.get_write(); + std::unique_lock locker{m_rwlock}; ldout(m_cct, 10) << "remove_worker '" << h->name << "'" << dendl; m_workers.erase(h->list_item); - m_rwlock.put_write(); delete h; } @@ -119,7 +116,7 @@ bool HeartbeatMap::is_healthy() { int unhealthy = 0; int total = 0; - m_rwlock.get_read(); + m_rwlock.lock_shared(); auto now = ceph::coarse_mono_clock::now(); if (m_cct->_conf->heartbeat_inject_failure) { ldout(m_cct, 0) << "is_healthy injecting failure for next " << m_cct->_conf->heartbeat_inject_failure << " seconds" << dendl; @@ -146,7 +143,7 @@ bool HeartbeatMap::is_healthy() } total++; } - m_rwlock.put_read(); + m_rwlock.unlock_shared(); m_unhealthy_workers = unhealthy; m_total_workers = total; diff --git a/src/common/HeartbeatMap.h b/src/common/HeartbeatMap.h index f7ffd9eb62090..28e16662d3f3e 100644 --- a/src/common/HeartbeatMap.h +++ b/src/common/HeartbeatMap.h @@ -21,7 +21,7 @@ #include #include "common/ceph_time.h" -#include "RWLock.h" +#include "common/ceph_mutex.h" class CephContext; @@ -81,7 +81,8 @@ class HeartbeatMap { private: CephContext *m_cct; - RWLock m_rwlock; + ceph::shared_mutex m_rwlock = + ceph::make_shared_mutex("HeartbeatMap::m_rwlock"); ceph::coarse_mono_clock::time_point m_inject_unhealthy_until; std::list m_workers; std::atomic m_unhealthy_workers = { 0 }; diff --git a/src/common/Throttle.cc b/src/common/Throttle.cc index 40a8e6ed3eb95..70770fbe30128 100644 --- a/src/common/Throttle.cc +++ b/src/common/Throttle.cc @@ -708,11 +708,11 @@ TokenBucketThrottle::TokenBucketThrottle( uint64_t capacity, uint64_t avg, SafeTimer *timer, - Mutex *timer_lock) + ceph::mutex *timer_lock) : m_cct(cct), m_name(name), m_throttle(m_cct, name + "_bucket", capacity), m_avg(avg), m_timer(timer), m_timer_lock(timer_lock), - m_lock(name + "_lock") + m_lock(ceph::make_mutex(name + "_lock")) {} TokenBucketThrottle::~TokenBucketThrottle() { @@ -735,7 +735,7 @@ TokenBucketThrottle::~TokenBucketThrottle() { int TokenBucketThrottle::set_limit(uint64_t average, uint64_t burst) { { - std::lock_guard lock(m_lock); + std::lock_guard lock{m_lock}; if (0 < burst && burst < average) { // the burst should never less than the average. @@ -769,7 +769,7 @@ int TokenBucketThrottle::set_limit(uint64_t average, uint64_t burst) { // The schedule period will be changed when the average rate is set. { - std::lock_guard timer_locker(*m_timer_lock); + std::lock_guard timer_locker{*m_timer_lock}; cancel_timer(); schedule_timer(); } diff --git a/src/common/Throttle.h b/src/common/Throttle.h index a249532b58f48..5dfd92dc62d9c 100644 --- a/src/common/Throttle.h +++ b/src/common/Throttle.h @@ -362,10 +362,10 @@ class TokenBucketThrottle { uint64_t m_avg = 0; uint64_t m_burst = 0; SafeTimer *m_timer; - Mutex *m_timer_lock; + ceph::mutex *m_timer_lock; FunctionContext *m_token_ctx = nullptr; std::list m_blockers; - Mutex m_lock; + ceph::mutex m_lock; // minimum of the filling period. uint64_t m_tick_min = 50; @@ -409,7 +409,7 @@ class TokenBucketThrottle { public: TokenBucketThrottle(CephContext *cct, const std::string &name, uint64_t capacity, uint64_t avg, - SafeTimer *timer, Mutex *timer_lock); + SafeTimer *timer, ceph::mutex *timer_lock); ~TokenBucketThrottle(); diff --git a/src/common/TrackedOp.cc b/src/common/TrackedOp.cc index c0e60e73d5c57..474f16dadb739 100644 --- a/src/common/TrackedOp.cc +++ b/src/common/TrackedOp.cc @@ -149,7 +149,7 @@ OpTracker::OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards): num_optracker_shards(num_shards), complaint_time(0), log_threshold(0), tracking_enabled(tracking), - lock("OpTracker::lock"), cct(cct_) { + cct(cct_) { for (uint32_t i = 0; i < num_optracker_shards; i++) { char lock_name[32] = {0}; snprintf(lock_name, sizeof(lock_name), "%s:%" PRIu32, "OpTracker::ShardedLock", i); @@ -171,7 +171,7 @@ bool OpTracker::dump_historic_ops(Formatter *f, bool by_duration, set fi if (!tracking_enabled) return false; - RWLock::RLocker l(lock); + std::shared_lock l{lock}; utime_t now = ceph_clock_now(); history.dump_ops(now, f, filters, by_duration); return true; @@ -206,7 +206,7 @@ bool OpTracker::dump_historic_slow_ops(Formatter *f, set filters) if (!tracking_enabled) return false; - RWLock::RLocker l(lock); + std::shared_lock l{lock}; utime_t now = ceph_clock_now(); history.dump_slow_ops(now, f, filters); return true; @@ -217,7 +217,7 @@ bool OpTracker::dump_ops_in_flight(Formatter *f, bool print_only_blocked, setopen_object_section("ops_in_flight"); // overall dump uint64_t total_ops_in_flight = 0; f->open_array_section("ops"); // list of TrackedOps @@ -252,7 +252,7 @@ bool OpTracker::register_inflight_op(TrackedOp *i) if (!tracking_enabled) return false; - RWLock::RLocker l(lock); + std::shared_lock l{lock}; uint64_t current_seq = ++seq; uint32_t shard_index = current_seq % num_optracker_shards; ShardedTrackingData* sdata = sharded_in_flight_list[shard_index]; @@ -282,7 +282,7 @@ void OpTracker::unregister_inflight_op(TrackedOp* const i) void OpTracker::record_history_op(TrackedOpRef&& i) { - RWLock::RLocker l(lock); + std::shared_lock l{lock}; history.insert(ceph_clock_now(), std::move(i)); } @@ -296,7 +296,7 @@ bool OpTracker::visit_ops_in_flight(utime_t* oldest_secs, utime_t oldest_op = now; uint64_t total_ops_in_flight = 0; - RWLock::RLocker l(lock); + std::shared_lock l{lock}; for (const auto sdata : sharded_in_flight_list) { ceph_assert(sdata); std::lock_guard locker(sdata->ops_in_flight_lock_sharded); diff --git a/src/common/TrackedOp.h b/src/common/TrackedOp.h index 9a545c894a25d..e70cba1171138 100644 --- a/src/common/TrackedOp.h +++ b/src/common/TrackedOp.h @@ -15,11 +15,10 @@ #define TRACKEDREQUEST_H_ #include +#include "common/ceph_mutex.h" #include "common/histogram.h" -#include "common/RWLock.h" #include "common/Thread.h" #include "common/Clock.h" -#include "common/ceph_mutex.h" #include "include/spinlock.h" #include "msg/Message.h" @@ -108,7 +107,7 @@ class OpTracker { float complaint_time; int log_threshold; std::atomic tracking_enabled; - RWLock lock; + ceph::shared_mutex lock = ceph::make_shared_mutex("OpTracker::lock"); public: CephContext *cct; diff --git a/src/test/heartbeat_map.cc b/src/test/heartbeat_map.cc index 97be94d9d6ca6..7c98c9082349d 100644 --- a/src/test/heartbeat_map.cc +++ b/src/test/heartbeat_map.cc @@ -12,7 +12,6 @@ * */ -#include "common/Mutex.h" #include "common/HeartbeatMap.h" #include "common/ceph_context.h" #include "common/config.h" -- 2.39.5