From 02292616a320753f638c9fa62b6b714126af4012 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Sun, 7 Jul 2019 12:40:52 +0800 Subject: [PATCH] rgw: s/Mutex/ceph::mutex/ Signed-off-by: Kefu Chai --- src/rgw/librgw.cc | 14 ++--- src/rgw/librgw_admin_user.cc | 14 ++--- src/rgw/rgw_bucket.cc | 41 ++++++------- src/rgw/rgw_bucket.h | 28 ++++----- src/rgw/rgw_cache.cc | 20 +++--- src/rgw/rgw_cache.h | 8 +-- src/rgw/rgw_civetweb_frontend.cc | 2 +- src/rgw/rgw_coroutine.cc | 52 ++++++++-------- src/rgw/rgw_coroutine.h | 33 +++++----- src/rgw/rgw_cr_rados.h | 46 +++++++------- src/rgw/rgw_cr_rest.cc | 6 +- src/rgw/rgw_cr_rest.h | 2 +- src/rgw/rgw_data_sync.cc | 38 ++++++------ src/rgw/rgw_data_sync.h | 4 +- src/rgw/rgw_gc.cc | 9 ++- src/rgw/rgw_gc.h | 8 +-- src/rgw/rgw_http_client.cc | 82 ++++++++++++------------- src/rgw/rgw_http_client.h | 12 ++-- src/rgw/rgw_keystone.cc | 18 +++--- src/rgw/rgw_keystone.h | 5 +- src/rgw/rgw_lc.cc | 9 ++- src/rgw/rgw_lc.h | 8 +-- src/rgw/rgw_log.cc | 26 ++++---- src/rgw/rgw_log.h | 2 +- src/rgw/rgw_main.cc | 14 ++--- src/rgw/rgw_metadata.cc | 4 +- src/rgw/rgw_object_expirer_core.cc | 9 ++- src/rgw/rgw_object_expirer_core.h | 9 ++- src/rgw/rgw_quota.cc | 35 ++++++----- src/rgw/rgw_rados.cc | 91 +++++++++++++--------------- src/rgw/rgw_rados.h | 48 +++++++-------- src/rgw/rgw_realm_reloader.cc | 16 +++-- src/rgw/rgw_realm_reloader.h | 4 +- src/rgw/rgw_reshard.cc | 9 ++- src/rgw/rgw_reshard.h | 9 ++- src/rgw/rgw_rest_client.cc | 14 ++--- src/rgw/rgw_rest_client.h | 11 ++-- src/rgw/rgw_sync.cc | 12 ++-- src/rgw/rgw_sync.h | 17 +++--- src/rgw/rgw_sync_module.h | 8 +-- src/rgw/rgw_sync_trace.h | 4 +- src/rgw/services/svc_notify.cc | 10 +-- src/rgw/services/svc_notify.h | 2 +- src/rgw/services/svc_sys_obj_cache.h | 8 +-- src/rgw/services/svc_sys_obj_core.h | 22 +++---- 45 files changed, 411 insertions(+), 432 deletions(-) diff --git a/src/rgw/librgw.cc b/src/rgw/librgw.cc index 5e8578f0baf..4f5b31a76cb 100644 --- a/src/rgw/librgw.cc +++ b/src/rgw/librgw.cc @@ -491,12 +491,12 @@ namespace rgw { CODE_ENVIRONMENT_DAEMON, CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); - Mutex mutex("main"); + ceph::mutex mutex = ceph::make_mutex("main"); SafeTimer init_timer(g_ceph_context, mutex); init_timer.init(); - mutex.Lock(); + mutex.lock(); init_timer.add_event_after(g_conf()->rgw_init_timeout, new C_InitTimeout); - mutex.Unlock(); + mutex.unlock(); common_init_finish(g_ceph_context); @@ -514,10 +514,10 @@ namespace rgw { g_conf().get_val("rgw_dynamic_resharding")); if (!store) { - mutex.Lock(); + mutex.lock(); init_timer.cancel_all_events(); init_timer.shutdown(); - mutex.Unlock(); + mutex.unlock(); derr << "Couldn't init storage provider (RADOS)" << dendl; return -EIO; @@ -527,10 +527,10 @@ namespace rgw { rgw_rest_init(g_ceph_context, store, store->svc.zone->get_zonegroup()); - mutex.Lock(); + mutex.lock(); init_timer.cancel_all_events(); init_timer.shutdown(); - mutex.Unlock(); + mutex.unlock(); if (r) return -EIO; diff --git a/src/rgw/librgw_admin_user.cc b/src/rgw/librgw_admin_user.cc index 928f04cb329..ada78c13a90 100644 --- a/src/rgw/librgw_admin_user.cc +++ b/src/rgw/librgw_admin_user.cc @@ -89,31 +89,31 @@ namespace rgw { CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); - Mutex mutex("main"); + ceph::mutex mutex = ceph::make_mutex("main"); SafeTimer init_timer(g_ceph_context, mutex); init_timer.init(); - mutex.Lock(); + mutex.lock(); init_timer.add_event_after(g_conf()->rgw_init_timeout, new C_InitTimeout); - mutex.Unlock(); + mutex.unlock(); common_init_finish(g_ceph_context); store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false, false); if (!store) { - mutex.Lock(); + mutex.lock(); init_timer.cancel_all_events(); init_timer.shutdown(); - mutex.Unlock(); + mutex.unlock(); derr << "Couldn't init storage provider (RADOS)" << dendl; return -EIO; } - mutex.Lock(); + mutex.lock(); init_timer.cancel_all_events(); init_timer.shutdown(); - mutex.Unlock(); + mutex.unlock(); rgw_user_init(store); diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index dff579b9ecb..369cd80da34 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -2140,10 +2140,10 @@ int RGWDataChangesLog::renew_entries() * it later, so we keep two lists under the map */ map, list > > m; - lock.Lock(); + lock.lock(); map entries; entries.swap(cur_cycle); - lock.Unlock(); + lock.unlock(); map::iterator iter; string section; @@ -2197,7 +2197,7 @@ int RGWDataChangesLog::renew_entries() void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status) { - ceph_assert(lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(lock)); if (!changes.find(bs, status)) { status = ChangeStatusPtr(new ChangeStatus); changes.add(bs, status); @@ -2206,13 +2206,13 @@ void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; cur_cycle[bs] = true; } void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, real_time& expiration) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; ChangeStatusPtr status; _get_change(bs, status); @@ -2239,22 +2239,22 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { int index = choose_oid(bs); mark_modified(index, bs); - lock.Lock(); + lock.lock(); ChangeStatusPtr status; _get_change(bs, status); - lock.Unlock(); + lock.unlock(); real_time now = real_clock::now(); - status->lock->Lock(); + status->lock.lock(); ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl; if (now < status->cur_expiration) { /* no need to send, recently completed */ - status->lock->Unlock(); + status->lock.unlock(); register_renew(bs); return 0; @@ -2268,7 +2268,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { ceph_assert(cond); status->cond->get(); - status->lock->Unlock(); + status->lock.unlock(); int ret = cond->wait(); cond->put(); @@ -2292,7 +2292,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { expiration = now; expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window); - status->lock->Unlock(); + status->lock.unlock(); bufferlist bl; rgw_data_change change; @@ -2308,7 +2308,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { now = real_clock::now(); - status->lock->Lock(); + status->lock.lock(); } while (!ret && real_clock::now() > expiration); @@ -2318,7 +2318,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */ status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window); status->cond = NULL; - status->lock->Unlock(); + status->lock.unlock(); cond->done(ret); cond->put(); @@ -2470,9 +2470,8 @@ void *RGWDataChangesLog::ChangesRenewThread::entry() { break; int interval = cct->_conf->rgw_data_log_window * 3 / 4; - lock.Lock(); - cond.WaitInterval(lock, utime_t(interval, 0)); - lock.Unlock(); + std::unique_lock locker{lock}; + cond.wait_for(locker, std::chrono::seconds(interval)); } while (!log->going_down()); return NULL; @@ -2480,14 +2479,14 @@ void *RGWDataChangesLog::ChangesRenewThread::entry() { void RGWDataChangesLog::ChangesRenewThread::stop() { - Mutex::Locker l(lock); - cond.Signal(); + std::lock_guard l{lock}; + cond.notify_all(); } void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs) { auto key = bs.get_key(); - modified_lock.get_read(); + modified_lock.lock_shared(); map >::iterator iter = modified_shards.find(shard_id); if (iter != modified_shards.end()) { set& keys = iter->second; @@ -2498,13 +2497,13 @@ void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs) } modified_lock.unlock(); - RWLock::WLocker wl(modified_lock); + std::unique_lock wl{modified_lock}; modified_shards[shard_id].insert(key); } void RGWDataChangesLog::read_clear_modified(map > &modified) { - RWLock::WLocker wl(modified_lock); + std::unique_lock wl{modified_lock}; modified.swap(modified_shards); modified_shards.clear(); } diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index 9e7dce4e595..d062d52f6b5 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -458,8 +458,9 @@ class RGWDataChangesLog { int num_shards; string *oids; - Mutex lock; - RWLock modified_lock; + ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock"); + ceph::shared_mutex modified_lock = + ceph::make_shared_mutex("RGWDataChangesLog::modified_lock"); map > modified_shards; std::atomic down_flag = { false }; @@ -467,17 +468,13 @@ class RGWDataChangesLog { struct ChangeStatus { real_time cur_expiration; real_time cur_sent; - bool pending; - RefCountedCond *cond; - Mutex *lock; + bool pending = false; + RefCountedCond *cond = nullptr; + ceph::mutex lock = + ceph::make_mutex("RGWDataChangesLog::ChangeStatus"); - ChangeStatus() : pending(false), cond(NULL) { - lock = new Mutex("RGWDataChangesLog::ChangeStatus"); - } - - ~ChangeStatus() { - delete lock; - } + ChangeStatus() = default; + ~ChangeStatus() = default; }; typedef std::shared_ptr ChangeStatusPtr; @@ -493,11 +490,11 @@ class RGWDataChangesLog { class ChangesRenewThread : public Thread { CephContext *cct; RGWDataChangesLog *log; - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("ChangesRenewThread::lock"); + ceph::condition_variable cond; public: - ChangesRenewThread(CephContext *_cct, RGWDataChangesLog *_log) : cct(_cct), log(_log), lock("ChangesRenewThread::lock") {} + ChangesRenewThread(CephContext *_cct, RGWDataChangesLog *_log) : cct(_cct), log(_log) {} void *entry() override; void stop(); }; @@ -507,7 +504,6 @@ class RGWDataChangesLog { public: RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), - lock("RGWDataChangesLog::lock"), modified_lock("RGWDataChangesLog::modified_lock"), changes(cct->_conf->rgw_data_log_changes_size) { num_shards = cct->_conf->rgw_data_log_num_shards; diff --git a/src/rgw/rgw_cache.cc b/src/rgw/rgw_cache.cc index 5acc9f9df50..38f7b739c5d 100644 --- a/src/rgw/rgw_cache.cc +++ b/src/rgw/rgw_cache.cc @@ -11,7 +11,7 @@ int ObjectCache::get(const string& name, ObjectCacheInfo& info, uint32_t mask, rgw_cache_entry_info *cache_info) { - RWLock::RLocker l(lock); + std::shared_lock l{lock}; if (!enabled) { return -ENOENT; @@ -28,7 +28,7 @@ int ObjectCache::get(const string& name, ObjectCacheInfo& info, uint32_t mask, r (ceph::coarse_mono_clock::now() - iter->second.info.time_added) > expiry) { ldout(cct, 10) << "cache get: name=" << name << " : expiry miss" << dendl; lock.unlock(); - lock.get_write(); + lock.lock(); // check that wasn't already removed by other thread iter = cache_map.find(name); if (iter != cache_map.end()) { @@ -48,7 +48,7 @@ int ObjectCache::get(const string& name, ObjectCacheInfo& info, uint32_t mask, r ldout(cct, 20) << "cache get: touching lru, lru_counter=" << lru_counter << " promotion_ts=" << entry->lru_promotion_ts << dendl; lock.unlock(); - lock.get_write(); /* promote lock to writer */ + lock.lock(); /* promote lock to writer */ /* need to redo this because entry might have dropped off the cache */ iter = cache_map.find(name); @@ -90,7 +90,7 @@ int ObjectCache::get(const string& name, ObjectCacheInfo& info, uint32_t mask, r bool ObjectCache::chain_cache_entry(std::initializer_list cache_info_entries, RGWChainedCache::Entry *chained_entry) { - RWLock::WLocker l(lock); + std::unique_lock l{lock}; if (!enabled) { return false; @@ -132,7 +132,7 @@ bool ObjectCache::chain_cache_entry(std::initializer_list void ObjectCache::put(const string& name, ObjectCacheInfo& info, rgw_cache_entry_info *cache_info) { - RWLock::WLocker l(lock); + std::unique_lock l{lock}; if (!enabled) { return; @@ -204,7 +204,7 @@ void ObjectCache::put(const string& name, ObjectCacheInfo& info, rgw_cache_entry bool ObjectCache::remove(const string& name) { - RWLock::WLocker l(lock); + std::unique_lock l{lock}; if (!enabled) { return false; @@ -288,7 +288,7 @@ void ObjectCache::invalidate_lru(ObjectCacheEntry& entry) void ObjectCache::set_enabled(bool status) { - RWLock::WLocker l(lock); + std::unique_lock l{lock}; enabled = status; @@ -299,7 +299,7 @@ void ObjectCache::set_enabled(bool status) void ObjectCache::invalidate_all() { - RWLock::WLocker l(lock); + std::unique_lock l{lock}; do_invalidate_all(); } @@ -319,12 +319,12 @@ void ObjectCache::do_invalidate_all() } void ObjectCache::chain_cache(RGWChainedCache *cache) { - RWLock::WLocker l(lock); + std::unique_lock l{lock}; chained_cache.push_back(cache); } void ObjectCache::unchain_cache(RGWChainedCache *cache) { - RWLock::WLocker l(lock); + std::unique_lock l{lock}; auto iter = chained_cache.begin(); for (; iter != chained_cache.end(); ++iter) { diff --git a/src/rgw/rgw_cache.h b/src/rgw/rgw_cache.h index b0696237fe6..36d5e0e83a6 100644 --- a/src/rgw/rgw_cache.h +++ b/src/rgw/rgw_cache.h @@ -11,7 +11,7 @@ #include "include/types.h" #include "include/utime.h" #include "include/ceph_assert.h" -#include "common/RWLock.h" +#include "common/ceph_mutex.h" enum { UPDATE_OBJ, @@ -160,7 +160,7 @@ class ObjectCache { unsigned long lru_size; unsigned long lru_counter; unsigned long lru_window; - RWLock lock; + ceph::shared_mutex lock = ceph::make_shared_mutex("ObjectCache"); CephContext *cct; vector chained_cache; @@ -176,7 +176,7 @@ class ObjectCache { void do_invalidate_all(); public: - ObjectCache() : lru_size(0), lru_counter(0), lru_window(0), lock("ObjectCache"), cct(NULL), enabled(false) { } + ObjectCache() : lru_size(0), lru_counter(0), lru_window(0), cct(NULL), enabled(false) { } ~ObjectCache(); int get(const std::string& name, ObjectCacheInfo& bl, uint32_t mask, rgw_cache_entry_info *cache_info); std::optional get(const std::string& name) { @@ -187,7 +187,7 @@ public: template void for_each(const F& f) { - RWLock::RLocker l(lock); + std::shared_lock l{lock}; if (enabled) { auto now = ceph::coarse_mono_clock::now(); for (const auto& [name, entry] : cache_map) { diff --git a/src/rgw/rgw_civetweb_frontend.cc b/src/rgw/rgw_civetweb_frontend.cc index 18aedede0f2..b2cb2b5ff6c 100644 --- a/src/rgw/rgw_civetweb_frontend.cc +++ b/src/rgw/rgw_civetweb_frontend.cc @@ -55,7 +55,7 @@ static int civetweb_callback(struct mg_connection* conn) int RGWCivetWebFrontend::process(struct mg_connection* const conn) { /* Hold a read lock over access to env.store for reconfiguration. */ - RWLock::RLocker lock(env.mutex); + std::shared_lock lock{env.mutex}; RGWCivetWeb cw_client(conn); auto real_client_io = rgw::io::add_reordering( diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index 4ef9a6ac94e..1ed88ba4716 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -23,7 +23,7 @@ public: } }; -RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct), lock("RGWCompletionManager::lock"), +RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct), timer(cct, lock) { timer.init(); @@ -31,20 +31,20 @@ RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct), lock( RGWCompletionManager::~RGWCompletionManager() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; timer.cancel_all_events(); timer.shutdown(); } void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; _complete(cn, io_id, user_info); } void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (cn) { cns.insert(cn); } @@ -52,7 +52,7 @@ void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (cn) { cns.erase(cn); } @@ -69,17 +69,17 @@ void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_ return; } complete_reqs.push_back(io_completion{io_id, user_info}); - cond.Signal(); + cond.notify_all(); } int RGWCompletionManager::get_next(io_completion *io) { - Mutex::Locker l(lock); + std::unique_lock l{lock}; while (complete_reqs.empty()) { if (going_down) { return -ECANCELED; } - cond.Wait(lock); + cond.wait(l); } *io = complete_reqs.front(); complete_reqs_set.erase(io->io_id); @@ -89,7 +89,7 @@ int RGWCompletionManager::get_next(io_completion *io) bool RGWCompletionManager::try_get_next(io_completion *io) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (complete_reqs.empty()) { return false; } @@ -101,17 +101,17 @@ bool RGWCompletionManager::try_get_next(io_completion *io) void RGWCompletionManager::go_down() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; for (auto cn : cns) { cn->unregister(); } going_down = true; - cond.Signal(); + cond.notify_all(); } void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; ceph_assert(waiters.find(opaque) == waiters.end()); waiters[opaque] = user_info; timer.add_event_after(interval, new WaitContext(this, opaque)); @@ -119,7 +119,7 @@ void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void RGWCompletionManager::wakeup(void *opaque) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; _wakeup(opaque); } @@ -176,7 +176,7 @@ void RGWCoroutine::StatusItem::dump(Formatter *f) const { stringstream& RGWCoroutine::Status::set_status() { - RWLock::WLocker l(lock); + std::unique_lock l{lock}; string s = status.str(); status.str(string()); if (!timestamp.is_zero()) { @@ -437,7 +437,7 @@ static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg) RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr), io_id(_io_id), - user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) { + user_data(_user_data), registered(true) { c = librados::Rados::aio_create_completion((void *)this, NULL, _aio_completion_notifier_cb); } @@ -532,7 +532,7 @@ bool RGWCoroutinesStack::consume_io_finish(const rgw_io_id& io_id) void RGWCoroutinesManager::handle_unblocked_stack(set& context_stacks, list& scheduled_stacks, RGWCompletionManager::io_completion& io, int *blocked_count) { - ceph_assert(lock.is_wlocked()); + ceph_assert(ceph_mutex_is_wlocked(lock)); RGWCoroutinesStack *stack = static_cast(io.user_info); if (context_stacks.find(stack) == context_stacks.end()) { return; @@ -558,13 +558,13 @@ void RGWCoroutinesManager::handle_unblocked_stack(set& con void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack) { - RWLock::WLocker wl(lock); + std::unique_lock wl{lock}; _schedule(env, stack); } void RGWCoroutinesManager::_schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack) { - ceph_assert(lock.is_wlocked()); + ceph_assert(ceph_mutex_is_wlocked(lock)); if (!stack->is_scheduled) { env->scheduled_stacks->push_back(stack); stack->set_is_scheduled(true); @@ -594,7 +594,7 @@ int RGWCoroutinesManager::run(list& stacks) uint64_t run_context = ++run_context_count; - lock.get_write(); + lock.lock(); set& context_stacks = run_contexts[run_context]; list scheduled_stacks; for (auto& st : stacks) { @@ -622,7 +622,7 @@ int RGWCoroutinesManager::run(list& stacks) ret = stack->operate(&env); - lock.get_write(); + lock.lock(); stack->set_is_scheduled(false); if (ret < 0) { @@ -691,7 +691,7 @@ int RGWCoroutinesManager::run(list& stacks) while (blocked_count - interval_wait_count >= ops_window) { lock.unlock(); ret = completion_mgr->get_next(&io); - lock.get_write(); + lock.lock(); if (ret < 0) { ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl; } @@ -702,7 +702,7 @@ next: while (scheduled_stacks.empty() && blocked_count > 0) { lock.unlock(); ret = completion_mgr->get_next(&io); - lock.get_write(); + lock.lock(); if (ret < 0) { ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl; } @@ -779,7 +779,7 @@ RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCo } void RGWCoroutinesManager::dump(Formatter *f) const { - RWLock::RLocker rl(lock); + std::shared_lock rl{lock}; f->open_array_section("run_contexts"); for (auto& i : run_contexts) { @@ -811,7 +811,7 @@ string RGWCoroutinesManager::get_id() void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr) { - RWLock::WLocker wl(lock); + std::unique_lock wl{lock}; if (managers.find(mgr) == managers.end()) { managers.insert(mgr); get(); @@ -820,7 +820,7 @@ void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr) void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr) { - RWLock::WLocker wl(lock); + std::unique_lock wl{lock}; if (managers.find(mgr) != managers.end()) { managers.erase(mgr); put(); @@ -855,7 +855,7 @@ bool RGWCoroutinesManagerRegistry::call(std::string_view command, const cmdmap_t& cmdmap, std::string_view format, bufferlist& out) { - RWLock::RLocker rl(lock); + std::shared_lock rl{lock}; stringstream ss; JSONFormatter f; ::encode_json("cr_managers", *this, &f); diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index e8173b3f016..4d33b2718c3 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -47,8 +47,8 @@ class RGWCompletionManager : public RefCountedObject { using NotifierRef = boost::intrusive_ptr; set cns; - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("RGWCompletionManager::lock"); + ceph::condition_variable cond; SafeTimer timer; @@ -87,20 +87,20 @@ class RGWAioCompletionNotifier : public RefCountedObject { RGWCompletionManager *completion_mgr; rgw_io_id io_id; void *user_data; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("RGWAioCompletionNotifier"); bool registered; public: RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data); ~RGWAioCompletionNotifier() override { c->release(); - lock.Lock(); + lock.lock(); bool need_unregister = registered; if (registered) { completion_mgr->get(); } registered = false; - lock.Unlock(); + lock.unlock(); if (need_unregister) { completion_mgr->unregister_completion_notifier(this); completion_mgr->put(); @@ -112,7 +112,7 @@ public: } void unregister() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (!registered) { return; } @@ -120,15 +120,15 @@ public: } void cb() { - lock.Lock(); + lock.lock(); if (!registered) { - lock.Unlock(); + lock.unlock(); put(); return; } completion_mgr->get(); registered = false; - lock.Unlock(); + lock.unlock(); completion_mgr->complete(this, io_id, user_data); completion_mgr->put(); put(); @@ -198,13 +198,14 @@ class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine { struct Status { CephContext *cct; - RWLock lock; + ceph::shared_mutex lock = + ceph::make_shared_mutex("RGWCoroutine::Status::lock"); int max_history; utime_t timestamp; stringstream status; - explicit Status(CephContext *_cct) : cct(_cct), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY) {} + explicit Status(CephContext *_cct) : cct(_cct), max_history(MAX_COROUTINE_HISTORY) {} deque history; @@ -542,12 +543,13 @@ class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocket CephContext *cct; set managers; - RWLock lock; + ceph::shared_mutex lock = + ceph::make_shared_mutex("RGWCoroutinesRegistry::lock"); string admin_command; public: - explicit RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesRegistry::lock") {} + explicit RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct) {} ~RGWCoroutinesManagerRegistry() override; void add(RGWCoroutinesManager *mgr); @@ -569,7 +571,8 @@ class RGWCoroutinesManager { std::atomic max_io_id = { 0 }; - RWLock lock; + mutable ceph::shared_mutex lock = + ceph::make_shared_mutex("RGWCoroutinesManager::lock"); RGWIOIDProvider io_id_provider; @@ -585,7 +588,7 @@ protected: void put_completion_notifier(RGWAioCompletionNotifier *cn); public: - RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"), + RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) { completion_mgr = new RGWCompletionManager(cct); if (cr_registry) { diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index f9dceebf8e1..795d74b43d3 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -21,13 +21,13 @@ class RGWAsyncRadosRequest : public RefCountedObject { int retcode; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("RGWAsyncRadosRequest::lock"); protected: virtual int _send_request() = 0; public: - RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0), - lock("RGWAsyncRadosRequest::lock") { + RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) + : caller(_caller), notifier(_cn), retcode(0) { } ~RGWAsyncRadosRequest() override { if (notifier) { @@ -39,7 +39,7 @@ public: get(); retcode = _send_request(); { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (notifier) { notifier->cb(); // drops its own ref notifier = nullptr; @@ -52,7 +52,7 @@ public: void finish() { { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (notifier) { // we won't call notifier->cb() to drop its ref, so drop it here notifier->put(); @@ -674,38 +674,40 @@ public: class RGWAsyncWait : public RGWAsyncRadosRequest { CephContext *cct; - Mutex *lock; - Cond *cond; - utime_t interval; + ceph::mutex *lock; + ceph::condition_variable *cond; + std::chrono::seconds interval; protected: int _send_request() override { - Mutex::Locker l(*lock); - return cond->WaitInterval(*lock, interval); + std::unique_lock l{*lock}; + return (cond->wait_for(l, interval) == std::cv_status::timeout ? + ETIMEDOUT : 0); } public: RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct, - Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn), - cct(_cct), - lock(_lock), cond(_cond), interval(_secs, 0) {} + ceph::mutex *_lock, ceph::condition_variable *_cond, int _secs) + : RGWAsyncRadosRequest(caller, cn), + cct(_cct), + lock(_lock), cond(_cond), interval(_secs) {} void wakeup() { - Mutex::Locker l(*lock); - cond->Signal(); + std::lock_guard l{*lock}; + cond->notify_all(); } }; class RGWWaitCR : public RGWSimpleCoroutine { CephContext *cct; RGWAsyncRadosProcessor *async_rados; - Mutex *lock; - Cond *cond; + ceph::mutex *lock; + ceph::condition_variable *cond; int secs; RGWAsyncWait *req; public: RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct, - Mutex *_lock, Cond *_cond, + ceph::mutex *_lock, ceph::condition_variable *_cond, int _secs) : RGWSimpleCoroutine(_cct), cct(_cct), async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) { } @@ -1207,7 +1209,7 @@ class RGWContinuousLeaseCR : public RGWCoroutine { int interval; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("RGWContinuousLeaseCR"); std::atomic going_down = { false }; bool locked{false}; @@ -1222,18 +1224,18 @@ public: : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), obj(_obj), lock_name(_lock_name), cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)), - interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller) + interval(_interval), caller(_caller) {} int operate() override; bool is_locked() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; return locked; } void set_locked(bool status) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; locked = status; } diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc index 9c2b4f0f5f0..c105aa26e30 100644 --- a/src/rgw/rgw_cr_rest.cc +++ b/src/rgw/rgw_cr_rest.cc @@ -13,7 +13,7 @@ #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw -RGWCRHTTPGetDataCB::RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, RGWHTTPStreamRWRequest *_req) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), req(_req) { +RGWCRHTTPGetDataCB::RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, RGWHTTPStreamRWRequest *_req) : env(_env), cr(_cr), req(_req) { io_id = req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ |RGWHTTPClient::HTTPCLIENT_IO_CONTROL); req->set_in_cb(this); } @@ -28,7 +28,7 @@ int RGWCRHTTPGetDataCB::handle_data(bufferlist& bl, bool *pause) { { uint64_t bl_len = bl.length(); - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (!got_all_extra_data) { uint64_t max = extra_data_len - extra_data.length(); @@ -59,7 +59,7 @@ void RGWCRHTTPGetDataCB::claim_data(bufferlist *dest, uint64_t max) { bool need_to_unpause = false; { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (data.length() == 0) { return; diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index 48106d7e3a8..7def3446a49 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -386,7 +386,7 @@ public: }; class RGWCRHTTPGetDataCB : public RGWHTTPStreamRWRequest::ReceiveCB { - Mutex lock; + ceph::mutex lock = ceph::make_mutex("RGWCRHTTPGetDataCB"); RGWCoroutinesEnv *env; RGWCoroutine *cr; RGWHTTPStreamRWRequest *req; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index afb839c3fd2..4c81ffe7571 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1126,8 +1126,8 @@ class RGWDataSyncShardCR : public RGWCoroutine { list::iterator log_iter; bool truncated; - Mutex inc_lock; - Cond inc_cond; + ceph::mutex inc_lock = ceph::make_mutex("RGWDataSyncShardCR::inc_lock"); + ceph::condition_variable inc_cond; boost::asio::coroutine incremental_cr; boost::asio::coroutine full_cr; @@ -1175,7 +1175,7 @@ public: pool(_pool), shard_id(_shard_id), sync_marker(_marker), - marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"), + marker_tracker(NULL), truncated(false), total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL), lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES), retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT), tn(_tn) { @@ -1195,7 +1195,7 @@ public: } void append_modified_shards(set& keys) { - Mutex::Locker l(inc_lock); + std::lock_guard l{inc_lock}; modified_shards.insert(keys.begin(), keys.end()); } @@ -1371,9 +1371,9 @@ public: return set_cr_error(-ECANCELED); } current_modified.clear(); - inc_lock.Lock(); + inc_lock.lock(); current_modified.swap(modified_shards); - inc_lock.Unlock(); + inc_lock.unlock(); if (current_modified.size() > 0) { tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */ @@ -1543,7 +1543,7 @@ public: } void append_modified_shards(set& keys) { - Mutex::Locker l(cr_lock()); + std::lock_guard l{cr_lock()}; RGWDataSyncShardCR *cr = static_cast(get_cr()); if (!cr) { @@ -1562,7 +1562,8 @@ class RGWDataSyncCR : public RGWCoroutine { RGWDataSyncShardMarkerTrack *marker_tracker; - Mutex shard_crs_lock; + ceph::mutex shard_crs_lock = + ceph::make_mutex("RGWDataSyncCR::shard_crs_lock"); map shard_crs; bool *reset_backoff; @@ -1575,7 +1576,6 @@ public: sync_env(_sync_env), num_shards(_num_shards), marker_tracker(NULL), - shard_crs_lock("RGWDataSyncCR::shard_crs_lock"), reset_backoff(_reset_backoff), tn(_tn) { } @@ -1654,9 +1654,9 @@ public: RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->svc.zone->get_zone_params().log_pool, iter->first, iter->second, tn); cr->get(); - shard_crs_lock.Lock(); + shard_crs_lock.lock(); shard_crs[iter->first] = cr; - shard_crs_lock.Unlock(); + shard_crs_lock.unlock(); spawn(cr, true); } } @@ -1675,7 +1675,7 @@ public: } void wakeup(int shard_id, set& keys) { - Mutex::Locker l(shard_crs_lock); + std::lock_guard l{shard_crs_lock}; map::iterator iter = shard_crs.find(shard_id); if (iter == shard_crs.end()) { return; @@ -1835,17 +1835,17 @@ public: } void wakeup(int shard_id, set& keys) { - Mutex& m = cr_lock(); + ceph::mutex& m = cr_lock(); - m.Lock(); + m.lock(); RGWDataSyncCR *cr = static_cast(get_cr()); if (!cr) { - m.Unlock(); + m.unlock(); return; } cr->get(); - m.Unlock(); + m.unlock(); if (cr) { tn->log(20, SSTR("notify shard=" << shard_id << " keys=" << keys)); @@ -1857,7 +1857,7 @@ public: }; void RGWRemoteDataLog::wakeup(int shard_id, set& keys) { - RWLock::RLocker rl(lock); + std::shared_lock rl{lock}; if (!data_sync_cr) { return; } @@ -1866,14 +1866,14 @@ void RGWRemoteDataLog::wakeup(int shard_id, set& keys) { int RGWRemoteDataLog::run_sync(int num_shards) { - lock.get_write(); + lock.lock(); data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards, tn); data_sync_cr->get(); // run() will drop a ref, so take another lock.unlock(); int r = run(data_sync_cr); - lock.get_write(); + lock.lock(); data_sync_cr->put(); data_sync_cr = NULL; lock.unlock(); diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 440ef153cb2..29bc68a3cda 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -278,7 +278,7 @@ class RGWRemoteDataLog : public RGWCoroutinesManager { RGWDataSyncEnv sync_env; - RWLock lock; + ceph::shared_mutex lock = ceph::make_shared_mutex("RGWRemoteDataLog::lock"); RGWDataSyncControlCR *data_sync_cr; RGWSyncTraceNodeRef tn; @@ -291,7 +291,7 @@ public: : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), dpp(dpp), store(_store), async_rados(async_rados), http_manager(store->ctx(), completion_mgr), - lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL), + data_sync_cr(NULL), initialized(false) {} int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& module, diff --git a/src/rgw/rgw_gc.cc b/src/rgw/rgw_gc.cc index 687b6a7c358..775ab0c3643 100644 --- a/src/rgw/rgw_gc.cc +++ b/src/rgw/rgw_gc.cc @@ -489,9 +489,8 @@ void *RGWGC::GCWorker::entry() { secs -= end.sec(); - lock.Lock(); - cond.WaitInterval(lock, utime_t(secs, 0)); - lock.Unlock(); + std::unique_lock locker{lock}; + cond.wait_for(locker, std::chrono::seconds(secs)); } while (!gc->going_down()); return NULL; @@ -499,6 +498,6 @@ void *RGWGC::GCWorker::entry() { void RGWGC::GCWorker::stop() { - Mutex::Locker l(lock); - cond.Signal(); + std::lock_guard l{lock}; + cond.notify_all(); } diff --git a/src/rgw/rgw_gc.h b/src/rgw/rgw_gc.h index f8f24e97e5d..73423b26880 100644 --- a/src/rgw/rgw_gc.h +++ b/src/rgw/rgw_gc.h @@ -7,7 +7,7 @@ #include "include/types.h" #include "include/rados/librados.hpp" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/Cond.h" #include "common/Thread.h" #include "rgw_common.h" @@ -31,11 +31,11 @@ class RGWGC : public DoutPrefixProvider { const DoutPrefixProvider *dpp; CephContext *cct; RGWGC *gc; - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("GCWorker"); + ceph::condition_variable cond; public: - GCWorker(const DoutPrefixProvider *_dpp, CephContext *_cct, RGWGC *_gc) : dpp(_dpp), cct(_cct), gc(_gc), lock("GCWorker") {} + GCWorker(const DoutPrefixProvider *_dpp, CephContext *_cct, RGWGC *_gc) : dpp(_dpp), cct(_cct), gc(_gc) {} void *entry() override; void stop(); }; diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index b382990a40e..01b36f0ca87 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -45,14 +45,14 @@ struct rgw_http_req_data : public RefCountedObject { bool write_paused{false}; bool read_paused{false}; - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("rgw_http_req_data::lock"); + ceph::condition_variable cond; using Signature = void(boost::system::error_code); using Completion = ceph::async::Completion; std::unique_ptr completion; - rgw_http_req_data() : id(-1), lock("rgw_http_req_data::lock") { + rgw_http_req_data() : id(-1) { memset(error_buf, 0, sizeof(error_buf)); } @@ -65,7 +65,7 @@ struct rgw_http_req_data : public RefCountedObject { } int wait(optional_yield y) { - Mutex::Locker l(lock); + std::unique_lock l{lock}; if (done) { return ret; } @@ -82,14 +82,14 @@ struct rgw_http_req_data : public RefCountedObject { dout(20) << "WARNING: blocking http request" << dendl; } #endif - cond.Wait(lock); + cond.wait(l); return ret; } void set_state(int bitmask); void finish(int r) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; ret = r; if (curl_handle) do_curl_easy_cleanup(curl_handle); @@ -104,7 +104,7 @@ struct rgw_http_req_data : public RefCountedObject { boost::system::error_code ec(-ret, boost::system::system_category()); Completion::post(std::move(completion), ec); } else { - cond.Signal(); + cond.notify_all(); } } @@ -113,17 +113,17 @@ struct rgw_http_req_data : public RefCountedObject { } bool is_done() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; return done; } int get_retcode() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; return ret; } RGWHTTPManager *get_manager() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; return mgr; } @@ -154,13 +154,12 @@ void rgw_http_req_data::set_state(int bitmask) { #define MAXIDLE 5 class RGWCurlHandles : public Thread { public: - Mutex cleaner_lock; - std::vectorsaved_curl; + ceph::mutex cleaner_lock = ceph::make_mutex("RGWCurlHandles::cleaner_lock"); + std::vector saved_curl; int cleaner_shutdown; - Cond cleaner_cond; + ceph::condition_variable cleaner_cond; RGWCurlHandles() : - cleaner_lock{"RGWCurlHandles::cleaner_lock"}, cleaner_shutdown{0} { } @@ -176,7 +175,7 @@ RGWCurlHandle* RGWCurlHandles::get_curl_handle() { RGWCurlHandle* curl = 0; CURL* h; { - Mutex::Locker lock(cleaner_lock); + std::lock_guard lock{cleaner_lock}; if (!saved_curl.empty()) { curl = *saved_curl.begin(); saved_curl.erase(saved_curl.begin()); @@ -203,7 +202,7 @@ void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl) release_curl_handle_now(curl); } else { curl_easy_reset(**curl); - Mutex::Locker lock(cleaner_lock); + std::lock_guard lock{cleaner_lock}; curl->lastuse = mono_clock::now(); saved_curl.insert(saved_curl.begin(), 1, curl); } @@ -212,15 +211,14 @@ void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl) void* RGWCurlHandles::entry() { RGWCurlHandle* curl; - Mutex::Locker lock(cleaner_lock); + std::unique_lock lock{cleaner_lock}; for (;;) { if (cleaner_shutdown) { if (saved_curl.empty()) break; } else { - utime_t release = ceph_clock_now() + utime_t(MAXIDLE,0); - cleaner_cond.WaitUntil(cleaner_lock, release); + cleaner_cond.wait_for(lock, std::chrono::seconds(MAXIDLE)); } mono_time now = mono_clock::now(); while (!saved_curl.empty()) { @@ -238,9 +236,9 @@ void* RGWCurlHandles::entry() void RGWCurlHandles::stop() { - Mutex::Locker lock(cleaner_lock); + std::lock_guard lock{cleaner_lock}; cleaner_shutdown = 1; - cleaner_cond.Signal(); + cleaner_cond.notify_all(); } void RGWCurlHandles::flush_curl_handles() @@ -304,7 +302,7 @@ size_t RGWHTTPClient::receive_http_header(void * const ptr, rgw_http_req_data *req_data = static_cast(_info); size_t len = size * nmemb; - Mutex::Locker l(req_data->lock); + std::lock_guard l{req_data->lock}; if (!req_data->registered) { return len; @@ -331,7 +329,7 @@ size_t RGWHTTPClient::receive_http_data(void * const ptr, RGWHTTPClient *client; { - Mutex::Locker l(req_data->lock); + std::lock_guard l{req_data->lock}; if (!req_data->registered) { return len; } @@ -354,7 +352,7 @@ size_t RGWHTTPClient::receive_http_data(void * const ptr, if (pause) { dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl; skip_bytes = len; - Mutex::Locker l(req_data->lock); + std::lock_guard l{req_data->lock}; req_data->read_paused = true; return CURL_WRITEFUNC_PAUSE; } @@ -374,7 +372,7 @@ size_t RGWHTTPClient::send_http_data(void * const ptr, RGWHTTPClient *client; { - Mutex::Locker l(req_data->lock); + std::lock_guard l{req_data->lock}; if (!req_data->registered) { return 0; @@ -392,7 +390,7 @@ size_t RGWHTTPClient::send_http_data(void * const ptr, if (ret == 0 && pause) { - Mutex::Locker l(req_data->lock); + std::lock_guard l{req_data->lock}; req_data->write_paused = true; return CURL_READFUNC_PAUSE; } @@ -400,14 +398,14 @@ size_t RGWHTTPClient::send_http_data(void * const ptr, return ret; } -Mutex& RGWHTTPClient::get_req_lock() +ceph::mutex& RGWHTTPClient::get_req_lock() { return req_data->lock; } void RGWHTTPClient::_set_write_paused(bool pause) { - ceph_assert(req_data->lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(req_data->lock)); RGWHTTPManager *mgr = req_data->mgr; if (pause == req_data->write_paused) { @@ -422,7 +420,7 @@ void RGWHTTPClient::_set_write_paused(bool pause) void RGWHTTPClient::_set_read_paused(bool pause) { - ceph_assert(req_data->lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(req_data->lock)); RGWHTTPManager *mgr = req_data->mgr; if (pause == req_data->read_paused) { @@ -799,9 +797,7 @@ void *RGWHTTPManager::ReqsThread::entry() * RGWHTTPManager has two modes of operation: threaded and non-threaded. */ RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct), - completion_mgr(_cm), is_started(false), - reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0), - reqs_thread(NULL) + completion_mgr(_cm) { multi_handle = (void *)curl_multi_init(); thread_pipe[0] = -1; @@ -816,7 +812,7 @@ RGWHTTPManager::~RGWHTTPManager() { void RGWHTTPManager::register_request(rgw_http_req_data *req_data) { - RWLock::WLocker rl(reqs_lock); + std::unique_lock rl{reqs_lock}; req_data->id = num_reqs; req_data->registered = true; reqs[num_reqs] = req_data; @@ -826,7 +822,7 @@ void RGWHTTPManager::register_request(rgw_http_req_data *req_data) bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data) { - RWLock::WLocker rl(reqs_lock); + std::unique_lock rl{reqs_lock}; if (!req_data->registered) { return false; } @@ -839,7 +835,7 @@ bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data) void RGWHTTPManager::complete_request(rgw_http_req_data *req_data) { - RWLock::WLocker rl(reqs_lock); + std::unique_lock rl{reqs_lock}; _complete_request(req_data); } @@ -850,7 +846,7 @@ void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data) reqs.erase(iter); } { - Mutex::Locker l(req_data->lock); + std::lock_guard l{req_data->lock}; req_data->mgr = nullptr; } if (completion_mgr) { @@ -906,22 +902,22 @@ void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data) void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data) { - RWLock::WLocker wl(reqs_lock); + std::unique_lock wl{reqs_lock}; _unlink_request(req_data); } void RGWHTTPManager::manage_pending_requests() { - reqs_lock.get_read(); + reqs_lock.lock_shared(); if (max_threaded_req == num_reqs && unregistered_reqs.empty() && reqs_change_state.empty()) { - reqs_lock.unlock(); + reqs_lock.unlock_shared(); return; } - reqs_lock.unlock(); + reqs_lock.unlock_shared(); - RWLock::WLocker wl(reqs_lock); + std::unique_lock wl{reqs_lock}; if (!unregistered_reqs.empty()) { for (auto& r : unregistered_reqs) { @@ -1019,7 +1015,7 @@ int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetSt { rgw_http_req_data *req_data = client->get_req_data(); - ceph_assert(req_data->lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(req_data->lock)); /* can only do that if threaded */ if (!is_started) { @@ -1195,7 +1191,7 @@ void *RGWHTTPManager::reqs_thread_entry() } - RWLock::WLocker rl(reqs_lock); + std::unique_lock rl{reqs_lock}; for (auto r : unregistered_reqs) { _unlink_request(r); } diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index 35c11e1fd4e..0e48218a7b7 100644 --- a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -131,7 +131,7 @@ protected: size_t nmemb, void *_info); - Mutex& get_req_lock(); + ceph::mutex& get_req_lock(); /* needs to be called under req_lock() */ void _set_write_paused(bool pause); @@ -305,17 +305,17 @@ class RGWHTTPManager { CephContext *cct; RGWCompletionManager *completion_mgr; void *multi_handle; - bool is_started; + bool is_started = false; std::atomic going_down { 0 }; std::atomic is_stopped { 0 }; - RWLock reqs_lock; + ceph::shared_mutex reqs_lock = ceph::make_shared_mutex("RGWHTTPManager::reqs_lock"); map reqs; list unregistered_reqs; list reqs_change_state; map complete_reqs; - int64_t num_reqs; - int64_t max_threaded_req; + int64_t num_reqs = 0; + int64_t max_threaded_req = 0; int thread_pipe[2]; void register_request(rgw_http_req_data *req_data); @@ -339,7 +339,7 @@ class RGWHTTPManager { void *entry() override; }; - ReqsThread *reqs_thread; + ReqsThread *reqs_thread = nullptr; void *reqs_thread_entry(); diff --git a/src/rgw/rgw_keystone.cc b/src/rgw/rgw_keystone.cc index 45b728c7393..3fc68488d93 100644 --- a/src/rgw/rgw_keystone.cc +++ b/src/rgw/rgw_keystone.cc @@ -370,14 +370,14 @@ int TokenEnvelope::parse(CephContext* const cct, bool TokenCache::find(const std::string& token_id, rgw::keystone::TokenEnvelope& token) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; return find_locked(token_id, token); } bool TokenCache::find_locked(const std::string& token_id, rgw::keystone::TokenEnvelope& token) { - ceph_assert(lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(lock)); map::iterator iter = tokens.find(token_id); if (iter == tokens.end()) { if (perfcounter) perfcounter->inc(l_rgw_keystone_token_cache_miss); @@ -404,14 +404,14 @@ bool TokenCache::find_locked(const std::string& token_id, bool TokenCache::find_admin(rgw::keystone::TokenEnvelope& token) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; return find_locked(admin_token_id, token); } bool TokenCache::find_barbican(rgw::keystone::TokenEnvelope& token) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; return find_locked(barbican_token_id, token); } @@ -419,14 +419,14 @@ bool TokenCache::find_barbican(rgw::keystone::TokenEnvelope& token) void TokenCache::add(const std::string& token_id, const rgw::keystone::TokenEnvelope& token) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; add_locked(token_id, token); } void TokenCache::add_locked(const std::string& token_id, const rgw::keystone::TokenEnvelope& token) { - ceph_assert(lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(lock)); map::iterator iter = tokens.find(token_id); if (iter != tokens.end()) { token_entry& e = iter->second; @@ -449,7 +449,7 @@ void TokenCache::add_locked(const std::string& token_id, void TokenCache::add_admin(const rgw::keystone::TokenEnvelope& token) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; rgw_get_token_id(token.token.id, admin_token_id); add_locked(admin_token_id, token); @@ -457,7 +457,7 @@ void TokenCache::add_admin(const rgw::keystone::TokenEnvelope& token) void TokenCache::add_barbican(const rgw::keystone::TokenEnvelope& token) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; rgw_get_token_id(token.token.id, barbican_token_id); add_locked(barbican_token_id, token); @@ -465,7 +465,7 @@ void TokenCache::add_barbican(const rgw::keystone::TokenEnvelope& token) void TokenCache::invalidate(const std::string& token_id) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; map::iterator iter = tokens.find(token_id); if (iter == tokens.end()) return; diff --git a/src/rgw/rgw_keystone.h b/src/rgw/rgw_keystone.h index 128a6229c3e..67432b34a93 100644 --- a/src/rgw/rgw_keystone.h +++ b/src/rgw/rgw_keystone.h @@ -11,7 +11,7 @@ #include "rgw_common.h" #include "rgw_http_client.h" -#include "common/Cond.h" +#include "common/ceph_mutex.h" #include "global/global_init.h" #include @@ -217,13 +217,12 @@ class TokenCache { std::map tokens; std::list tokens_lru; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("rgw::keystone::TokenCache"); const size_t max; explicit TokenCache(const rgw::keystone::Config& config) : cct(g_ceph_context), - lock("rgw::keystone::TokenCache"), max(cct->_conf->rgw_keystone_token_cache_size) { } diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index e0dd3a5bd57..eb4721c0643 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -211,9 +211,8 @@ void *RGWLC::LCWorker::entry() { ldpp_dout(dpp, 5) << "schedule life cycle next start time: " << rgw_to_asctime(next) << dendl; - lock.Lock(); - cond.WaitInterval(lock, utime_t(secs, 0)); - lock.Unlock(); + std::unique_lock l{lock}; + cond.wait_for(l, std::chrono::seconds(secs)); } while (!lc->going_down()); return NULL; @@ -1305,8 +1304,8 @@ std::ostream& RGWLC::gen_prefix(std::ostream& out) const void RGWLC::LCWorker::stop() { - Mutex::Locker l(lock); - cond.Signal(); + std::lock_guard l{lock}; + cond.notify_all(); } bool RGWLC::going_down() diff --git a/src/rgw/rgw_lc.h b/src/rgw/rgw_lc.h index b1f05500524..adf56811b95 100644 --- a/src/rgw/rgw_lc.h +++ b/src/rgw/rgw_lc.h @@ -12,7 +12,7 @@ #include "include/types.h" #include "include/rados/librados.hpp" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/Cond.h" #include "common/iso_8601.h" #include "common/Thread.h" @@ -462,11 +462,11 @@ class RGWLC : public DoutPrefixProvider { const DoutPrefixProvider *dpp; CephContext *cct; RGWLC *lc; - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("LCWorker"); + ceph::condition_variable cond; public: - LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWLC *_lc) : dpp(_dpp), cct(_cct), lc(_lc), lock("LCWorker") {} + LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWLC *_lc) : dpp(_dpp), cct(_cct), lc(_lc) {} void *entry() override; void stop(); bool should_work(utime_t& now); diff --git a/src/rgw/rgw_log.cc b/src/rgw/rgw_log.cc index 74b192b91ca..2b0b59eb6fb 100644 --- a/src/rgw/rgw_log.cc +++ b/src/rgw/rgw_log.cc @@ -91,9 +91,9 @@ class UsageLogger { CephContext *cct; RGWRados *store; map usage_map; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("UsageLogger"); int32_t num_entries; - Mutex timer_lock; + ceph::mutex timer_lock = ceph::make_mutex("UsageLogger::timer_lock"); SafeTimer timer; utime_t round_timestamp; @@ -112,16 +112,16 @@ class UsageLogger { } public: - UsageLogger(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("UsageLogger"), num_entries(0), timer_lock("UsageLogger::timer_lock"), timer(cct, timer_lock) { + UsageLogger(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), num_entries(0), timer(cct, timer_lock) { timer.init(); - Mutex::Locker l(timer_lock); + std::lock_guard l{timer_lock}; set_timer(); utime_t ts = ceph_clock_now(); recalc_round_timestamp(ts); } ~UsageLogger() { - Mutex::Locker l(timer_lock); + std::lock_guard l{timer_lock}; flush(); timer.cancel_all_events(); timer.shutdown(); @@ -132,7 +132,7 @@ public: } void insert_user(utime_t& timestamp, const rgw_user& user, rgw_usage_log_entry& entry) { - lock.Lock(); + lock.lock(); if (timestamp.sec() > round_timestamp + 3600) recalc_round_timestamp(timestamp); entry.epoch = round_timestamp.sec(); @@ -144,9 +144,9 @@ public: if (account) num_entries++; bool need_flush = (num_entries > cct->_conf->rgw_usage_log_flush_threshold); - lock.Unlock(); + lock.unlock(); if (need_flush) { - Mutex::Locker l(timer_lock); + std::lock_guard l{timer_lock}; flush(); } } @@ -161,10 +161,10 @@ public: void flush() { map old_map; - lock.Lock(); + lock.lock(); old_map.swap(usage_map); num_entries = 0; - lock.Unlock(); + lock.unlock(); store->log_usage(old_map); } @@ -290,7 +290,7 @@ void OpsLogSocket::init_connection(bufferlist& bl) bl.append("["); } -OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog), lock("OpsLogSocket") +OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog) { formatter = new JSONFormatter; delim.append(",\n"); @@ -305,10 +305,10 @@ void OpsLogSocket::log(struct rgw_log_entry& entry) { bufferlist bl; - lock.Lock(); + lock.lock(); rgw_format_ops_log_entry(entry, formatter); formatter_to_bl(bl); - lock.Unlock(); + lock.unlock(); append_output(bl); } diff --git a/src/rgw/rgw_log.h b/src/rgw/rgw_log.h index 9614624d6a9..5cd105d7e01 100644 --- a/src/rgw/rgw_log.h +++ b/src/rgw/rgw_log.h @@ -117,7 +117,7 @@ WRITE_CLASS_ENCODER(rgw_log_entry) class OpsLogSocket : public OutputDataSocket { Formatter *formatter; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("OpsLogSocket"); void formatter_to_bl(bufferlist& bl); diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index 374d8aef2dd..b907bf09933 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -272,12 +272,12 @@ int main(int argc, const char **argv) if (g_conf()->daemonize) { global_init_daemonize(g_ceph_context); } - Mutex mutex("main"); + ceph::mutex mutex = ceph::make_mutex("main"); SafeTimer init_timer(g_ceph_context, mutex); init_timer.init(); - mutex.Lock(); + mutex.lock(); init_timer.add_event_after(g_conf()->rgw_init_timeout, new C_InitTimeout); - mutex.Unlock(); + mutex.unlock(); common_init_finish(g_ceph_context); @@ -310,10 +310,10 @@ int main(int argc, const char **argv) g_conf().get_val("rgw_dynamic_resharding"), g_conf()->rgw_cache_enabled); if (!store) { - mutex.Lock(); + mutex.lock(); init_timer.cancel_all_events(); init_timer.shutdown(); - mutex.Unlock(); + mutex.unlock(); derr << "Couldn't init storage provider (RADOS)" << dendl; return EIO; @@ -326,10 +326,10 @@ int main(int argc, const char **argv) rgw_rest_init(g_ceph_context, store, store->svc.zone->get_zonegroup()); - mutex.Lock(); + mutex.lock(); init_timer.cancel_all_events(); init_timer.shutdown(); - mutex.Unlock(); + mutex.unlock(); rgw_user_init(store); rgw_bucket_init(store->meta_mgr); diff --git a/src/rgw/rgw_metadata.cc b/src/rgw/rgw_metadata.cc index 30968be3020..bcfea6c39a5 100644 --- a/src/rgw/rgw_metadata.cc +++ b/src/rgw/rgw_metadata.cc @@ -259,13 +259,13 @@ void RGWMetadataLog::mark_modified(int shard_id) } lock.unlock(); - RWLock::WLocker wl(lock); + std::unique_lock wl{lock}; modified_shards.insert(shard_id); } void RGWMetadataLog::read_clear_modified(set &modified) { - RWLock::WLocker wl(lock); + std::unique_lock wl{lock}; modified.swap(modified_shards); modified_shards.clear(); } diff --git a/src/rgw/rgw_object_expirer_core.cc b/src/rgw/rgw_object_expirer_core.cc index 93d240f6e84..546c61ff947 100644 --- a/src/rgw/rgw_object_expirer_core.cc +++ b/src/rgw/rgw_object_expirer_core.cc @@ -278,9 +278,8 @@ void *RGWObjectExpirer::OEWorker::entry() { secs -= end.sec(); - lock.Lock(); - cond.WaitInterval(lock, utime_t(secs, 0)); - lock.Unlock(); + std::unique_lock l{lock}; + cond.wait_for(l, std::chrono::seconds(secs)); } while (!oe->going_down()); return NULL; @@ -288,7 +287,7 @@ void *RGWObjectExpirer::OEWorker::entry() { void RGWObjectExpirer::OEWorker::stop() { - Mutex::Locker l(lock); - cond.Signal(); + std::lock_guard l{lock}; + cond.notify_all(); } diff --git a/src/rgw/rgw_object_expirer_core.h b/src/rgw/rgw_object_expirer_core.h index c3caff5cc51..248c7c14d4b 100644 --- a/src/rgw/rgw_object_expirer_core.h +++ b/src/rgw/rgw_object_expirer_core.h @@ -19,7 +19,7 @@ #include "common/Formatter.h" #include "common/errno.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/Cond.h" #include "common/Thread.h" @@ -49,15 +49,14 @@ protected: class OEWorker : public Thread { CephContext *cct; RGWObjectExpirer *oe; - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("OEWorker"); + ceph::condition_variable cond; public: OEWorker(CephContext * const cct, RGWObjectExpirer * const oe) : cct(cct), - oe(oe), - lock("OEWorker") { + oe(oe) { } void *entry() override; diff --git a/src/rgw/rgw_quota.cc b/src/rgw/rgw_quota.cc index 2f5e64a24e1..e61e0de0b1d 100644 --- a/src/rgw/rgw_quota.cc +++ b/src/rgw/rgw_quota.cc @@ -18,8 +18,7 @@ #include "common/lru_map.h" #include "common/RefCountedObj.h" #include "common/Thread.h" -#include "common/Mutex.h" -#include "common/RWLock.h" +#include "common/ceph_mutex.h" #include "rgw_common.h" #include "rgw_rados.h" @@ -448,11 +447,11 @@ class RGWUserStatsCache : public RGWQuotaCache { CephContext *cct; RGWUserStatsCache *stats; - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::BucketsSyncThread"); + ceph::condition_variable cond; public: - BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::BucketsSyncThread") {} + BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {} void *entry() override { ldout(cct, 20) << "BucketsSyncThread: start" << dendl; @@ -474,9 +473,10 @@ class RGWUserStatsCache : public RGWQuotaCache { if (stats->going_down()) break; - lock.Lock(); - cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_bucket_sync_interval, 0)); - lock.Unlock(); + std::unique_lock locker{lock}; + cond.wait_for( + locker, + std::chrono::seconds(cct->_conf->rgw_user_quota_bucket_sync_interval)); } while (!stats->going_down()); ldout(cct, 20) << "BucketsSyncThread: done" << dendl; @@ -484,8 +484,8 @@ class RGWUserStatsCache : public RGWQuotaCache { } void stop() { - Mutex::Locker l(lock); - cond.Signal(); + std::lock_guard l{lock}; + cond.notify_all(); } }; @@ -500,11 +500,11 @@ class RGWUserStatsCache : public RGWQuotaCache { CephContext *cct; RGWUserStatsCache *stats; - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::UserSyncThread"); + ceph::condition_variable cond; public: - UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::UserSyncThread") {} + UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {} void *entry() override { ldout(cct, 20) << "UserSyncThread: start" << dendl; @@ -517,9 +517,8 @@ class RGWUserStatsCache : public RGWQuotaCache { if (stats->going_down()) break; - lock.Lock(); - cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_sync_interval, 0)); - lock.Unlock(); + std::unique_lock l{lock}; + cond.wait_for(l, std::chrono::seconds(cct->_conf->rgw_user_quota_sync_interval)); } while (!stats->going_down()); ldout(cct, 20) << "UserSyncThread: done" << dendl; @@ -527,8 +526,8 @@ class RGWUserStatsCache : public RGWQuotaCache { } void stop() { - Mutex::Locker l(lock); - cond.Signal(); + std::lock_guard l{lock}; + cond.notify_all(); } }; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 726564f1313..e0d064045e5 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -626,10 +626,10 @@ void RGWRadosThread::stop() void *RGWRadosThread::Worker::entry() { uint64_t msec = processor->interval_msec(); - utime_t interval = utime_t(msec / 1000, (msec % 1000) * 1000000); + auto interval = std::chrono::milliseconds(msec); do { - utime_t start = ceph_clock_now(); + auto start = ceph::real_clock::now(); int r = processor->process(); if (r < 0) { dout(0) << "ERROR: processor->process() returned error r=" << r << dendl; @@ -638,22 +638,19 @@ void *RGWRadosThread::Worker::entry() { if (processor->going_down()) break; - utime_t end = ceph_clock_now(); - end -= start; + auto end = ceph::real_clock::now() - start; uint64_t cur_msec = processor->interval_msec(); if (cur_msec != msec) { /* was it reconfigured? */ msec = cur_msec; - interval = utime_t(msec / 1000, (msec % 1000) * 1000000); + interval = std::chrono::milliseconds(msec); } if (cur_msec > 0) { if (interval <= end) continue; // next round - utime_t wait_time = interval; - wait_time -= end; - + auto wait_time = interval - end; wait_interval(wait_time); } else { wait(); @@ -897,7 +894,7 @@ public: void RGWRados::wakeup_meta_sync_shards(set& shard_ids) { - Mutex::Locker l(meta_sync_thread_lock); + std::lock_guard l{meta_sync_thread_lock}; if (meta_sync_processor_thread) { meta_sync_processor_thread->wakeup_sync_shards(shard_ids); } @@ -906,7 +903,7 @@ void RGWRados::wakeup_meta_sync_shards(set& shard_ids) void RGWRados::wakeup_data_sync_shards(const string& source_zone, map >& shard_ids) { ldout(ctx(), 20) << __func__ << ": source_zone=" << source_zone << ", shard_ids=" << shard_ids << dendl; - Mutex::Locker l(data_sync_thread_lock); + std::lock_guard l{data_sync_thread_lock}; map::iterator iter = data_sync_processor_threads.find(source_zone); if (iter == data_sync_processor_threads.end()) { ldout(ctx(), 10) << __func__ << ": couldn't find sync thread for zone " << source_zone << ", skipping async data sync processing" << dendl; @@ -920,7 +917,7 @@ void RGWRados::wakeup_data_sync_shards(const string& source_zone, mapget_manager(); } @@ -929,7 +926,7 @@ RGWMetaSyncStatusManager* RGWRados::get_meta_sync_manager() RGWDataSyncStatusManager* RGWRados::get_data_sync_manager(const std::string& source_zone) { - Mutex::Locker l(data_sync_thread_lock); + std::lock_guard l{data_sync_thread_lock}; auto thread = data_sync_processor_threads.find(source_zone); if (thread == data_sync_processor_threads.end()) { return nullptr; @@ -1023,7 +1020,7 @@ int RGWRados::get_max_chunk_size(const rgw_placement_rule& placement_rule, const class RGWIndexCompletionManager; struct complete_op_data { - Mutex lock{"complete_op_data"}; + ceph::mutex lock = ceph::make_mutex("complete_op_data"); AioCompletion *rados_completion{nullptr}; int manager_shard_id{-1}; RGWIndexCompletionManager *manager{nullptr}; @@ -1041,7 +1038,7 @@ struct complete_op_data { bool stopped{false}; void stop() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; stopped = true; } }; @@ -1055,16 +1052,17 @@ class RGWIndexCompletionThread : public RGWRadosThread { list completions; - Mutex completions_lock; + ceph::mutex completions_lock = + ceph::make_mutex("RGWIndexCompletionThread::completions_lock"); public: RGWIndexCompletionThread(RGWRados *_store) - : RGWRadosThread(_store, "index-complete"), store(_store), completions_lock("RGWIndexCompletionThread::completions_lock") {} + : RGWRadosThread(_store, "index-complete"), store(_store) {} int process() override; void add_completion(complete_op_data *completion) { { - Mutex::Locker l(completions_lock); + std::lock_guard l{completions_lock}; completions.push_back(completion); } @@ -1077,7 +1075,7 @@ int RGWIndexCompletionThread::process() list comps; { - Mutex::Locker l(completions_lock); + std::lock_guard l{completions_lock}; completions.swap(comps); } @@ -1123,7 +1121,7 @@ int RGWIndexCompletionThread::process() class RGWIndexCompletionManager { RGWRados *store{nullptr}; - vector locks; + ceph::containers::tiny_vector locks; vector > completions; RGWIndexCompletionThread *completion_thread{nullptr}; @@ -1134,23 +1132,20 @@ class RGWIndexCompletionManager { public: - RGWIndexCompletionManager(RGWRados *_store) : store(_store) { + RGWIndexCompletionManager(RGWRados *_store) : + store(_store), + locks{ceph::make_lock_container( + store->ctx()->_conf->rgw_thread_pool_size, + [](const size_t i) { + return ceph::make_mutex("RGWIndexCompletionManager::lock::" + + std::to_string(i)); + })} + { num_shards = store->ctx()->_conf->rgw_thread_pool_size; - - for (int i = 0; i < num_shards; i++) { - char buf[64]; - snprintf(buf, sizeof(buf), "RGWIndexCompletionManager::lock::%d", i); - locks.push_back(new Mutex(buf)); - } - completions.resize(num_shards); } ~RGWIndexCompletionManager() { stop(); - - for (auto l : locks) { - delete l; - } } int next_shard() { @@ -1186,7 +1181,7 @@ public: } for (int i = 0; i < num_shards; ++i) { - Mutex::Locker l(*locks[i]); + std::lock_guard l{locks[i]}; for (auto c : completions[i]) { c->stop(); } @@ -1198,14 +1193,14 @@ public: static void obj_complete_cb(completion_t cb, void *arg) { complete_op_data *completion = (complete_op_data *)arg; - completion->lock.Lock(); + completion->lock.lock(); if (completion->stopped) { - completion->lock.Unlock(); /* can drop lock, no one else is referencing us */ + completion->lock.unlock(); /* can drop lock, no one else is referencing us */ delete completion; return; } bool need_delete = completion->manager->handle_completion(cb, completion); - completion->lock.Unlock(); + completion->lock.unlock(); if (need_delete) { delete completion; } @@ -1253,7 +1248,7 @@ void RGWIndexCompletionManager::create_completion(const rgw_obj& obj, entry->rados_completion = librados::Rados::aio_create_completion(entry, NULL, obj_complete_cb); - Mutex::Locker l(*locks[shard_id]); + std::lock_guard l{locks[shard_id]}; completions[shard_id].insert(entry); } @@ -1261,7 +1256,7 @@ bool RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_d { int shard_id = arg->manager_shard_id; { - Mutex::Locker l(*locks[shard_id]); + std::lock_guard l{locks[shard_id]}; auto& comps = completions[shard_id]; @@ -1285,10 +1280,10 @@ void RGWRados::finalize() { cct->get_admin_socket()->unregister_commands(this); if (run_sync_thread) { - Mutex::Locker l(meta_sync_thread_lock); + std::lock_guard l{meta_sync_thread_lock}; meta_sync_processor_thread->stop(); - Mutex::Locker dl(data_sync_thread_lock); + std::lock_guard dl{data_sync_thread_lock}; for (auto iter : data_sync_processor_threads) { RGWDataSyncProcessorThread *thread = iter.second; thread->stop(); @@ -1303,7 +1298,7 @@ void RGWRados::finalize() if (run_sync_thread) { delete meta_sync_processor_thread; meta_sync_processor_thread = NULL; - Mutex::Locker dl(data_sync_thread_lock); + std::lock_guard dl{data_sync_thread_lock}; for (auto iter : data_sync_processor_threads) { RGWDataSyncProcessorThread *thread = iter.second; delete thread; @@ -1540,7 +1535,7 @@ int RGWRados::init_complete() << pt.second.name << " present in zonegroup" << dendl; } } - Mutex::Locker l(meta_sync_thread_lock); + std::lock_guard l{meta_sync_thread_lock}; meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this, async_rados); ret = meta_sync_processor_thread->init(); if (ret < 0) { @@ -1561,7 +1556,7 @@ int RGWRados::init_complete() } data_log->set_observer(&*bucket_trim); - Mutex::Locker dl(data_sync_thread_lock); + std::lock_guard dl{data_sync_thread_lock}; for (auto source_zone : svc.zone->get_data_sync_source_zones()) { ldout(cct, 5) << "starting data sync thread for zone " << source_zone->name << dendl; auto *thread = new RGWDataSyncProcessorThread(this, async_rados, source_zone); @@ -8036,15 +8031,15 @@ class RGWGetBucketStatsContext : public RGWGetDirHeader_CB { map stats; int ret_code; bool should_cb; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("RGWGetBucketStatsContext"); public: RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb, uint32_t _pendings) - : cb(_cb), pendings(_pendings), stats(), ret_code(0), should_cb(true), - lock("RGWGetBucketStatsContext") {} + : cb(_cb), pendings(_pendings), stats(), ret_code(0), should_cb(true) + {} void handle_response(int r, rgw_bucket_dir_header& header) override { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (should_cb) { if ( r >= 0) { accumulate_raw_stats(header, stats); @@ -8064,7 +8059,7 @@ public: } void unset_cb() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; should_cb = false; } }; @@ -10057,7 +10052,7 @@ uint64_t RGWRados::instance_id() uint64_t RGWRados::next_bucket_id() { - Mutex::Locker l(bucket_id_lock); + std::lock_guard l{bucket_id_lock}; return ++max_bucket_id; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index e7b9a134fa8..4e71ed41aea 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1066,7 +1066,7 @@ class RGWGetUserHeader_CB; class RGWObjectCtx { RGWRados *store; - RWLock lock{"RGWObjectCtx"}; + ceph::shared_mutex lock = ceph::make_shared_mutex("RGWObjectCtx"); void *s{nullptr}; std::map objs_state; @@ -1085,15 +1085,15 @@ public: RGWObjState *get_state(const rgw_obj& obj) { RGWObjState *result; typename std::map::iterator iter; - lock.get_read(); + lock.lock_shared(); assert (!obj.empty()); iter = objs_state.find(obj); if (iter != objs_state.end()) { result = &iter->second; - lock.unlock(); + lock.unlock_shared(); } else { - lock.unlock(); - lock.get_write(); + lock.unlock_shared(); + lock.lock(); result = &objs_state[obj]; lock.unlock(); } @@ -1101,18 +1101,18 @@ public: } void set_atomic(rgw_obj& obj) { - RWLock::WLocker wl(lock); + std::unique_lock wl{lock}; assert (!obj.empty()); objs_state[obj].is_atomic = true; } void set_prefetch_data(const rgw_obj& obj) { - RWLock::WLocker wl(lock); + std::unique_lock wl{lock}; assert (!obj.empty()); objs_state[obj].prefetch_data = true; } void invalidate(const rgw_obj& obj) { - RWLock::WLocker wl(lock); + std::unique_lock wl{lock}; auto iter = objs_state.find(obj); if (iter == objs_state.end()) { return; @@ -1213,7 +1213,7 @@ class RGWRados : public AdminSocketHook void get_bucket_instance_ids(const RGWBucketInfo& bucket_info, int shard_id, map *result); std::atomic max_req_id = { 0 }; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("rados_timer_lock"); SafeTimer *timer; RGWGC *gc; @@ -1236,8 +1236,8 @@ class RGWRados : public AdminSocketHook boost::optional bucket_trim; RGWSyncLogTrimThread *sync_log_trimmer{nullptr}; - Mutex meta_sync_thread_lock; - Mutex data_sync_thread_lock; + ceph::mutex meta_sync_thread_lock = ceph::make_mutex("meta_sync_thread_lock"); + ceph::mutex data_sync_thread_lock = ceph::make_mutex("data_sync_thread_lock"); librados::IoCtx root_pool_ctx; // .rgw @@ -1246,7 +1246,7 @@ class RGWRados : public AdminSocketHook friend class RGWWatcher; - Mutex bucket_id_lock; + ceph::mutex bucket_id_lock = ceph::make_mutex("rados_bucket_id"); // This field represents the number of bucket index object shards uint32_t bucket_index_max_shards; @@ -1299,12 +1299,10 @@ protected: bool use_cache{false}; public: - RGWRados(): lock("rados_timer_lock"), timer(NULL), + RGWRados(): timer(NULL), gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false), run_sync_thread(false), run_reshard_thread(false), async_rados(nullptr), meta_notifier(NULL), data_notifier(NULL), meta_sync_processor_thread(NULL), - meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"), - bucket_id_lock("rados_bucket_id"), bucket_index_max_shards(0), max_bucket_id(0), cct(NULL), binfo_cache(NULL), obj_tombstone_cache(nullptr), @@ -2581,25 +2579,25 @@ class RGWRadosThread { class Worker : public Thread { CephContext *cct; RGWRadosThread *processor; - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("RGWRadosThread::Worker"); + ceph::condition_variable cond; void wait() { - Mutex::Locker l(lock); - cond.Wait(lock); + std::unique_lock l{lock}; + cond.wait(l); }; - void wait_interval(const utime_t& wait_time) { - Mutex::Locker l(lock); - cond.WaitInterval(lock, wait_time); + void wait_interval(const ceph::real_clock::duration& wait_time) { + std::unique_lock l{lock}; + cond.wait_for(l, wait_time); } public: - Worker(CephContext *_cct, RGWRadosThread *_p) : cct(_cct), processor(_p), lock("RGWRadosThread::Worker") {} + Worker(CephContext *_cct, RGWRadosThread *_p) : cct(_cct), processor(_p) {} void *entry() override; void signal() { - Mutex::Locker l(lock); - cond.Signal(); + std::lock_guard l{lock}; + cond.notify_all(); } }; diff --git a/src/rgw/rgw_realm_reloader.cc b/src/rgw/rgw_realm_reloader.cc index 1fd48db0a53..32872427402 100644 --- a/src/rgw/rgw_realm_reloader.cc +++ b/src/rgw/rgw_realm_reloader.cc @@ -31,7 +31,7 @@ RGWRealmReloader::RGWRealmReloader(RGWRados*& store, std::mapctx(), mutex, USE_SAFE_TIMER_CALLBACKS), - mutex("RGWRealmReloader"), + mutex(ceph::make_mutex("RGWRealmReloader")), reload_scheduled(nullptr) { timer.init(); @@ -39,7 +39,7 @@ RGWRealmReloader::RGWRealmReloader(RGWRados*& store, std::mapctx(); - Mutex::Locker lock(mutex); + std::lock_guard lock{mutex}; if (reload_scheduled) { ldout(cct, 4) << "Notification on realm, reconfiguration " "already scheduled" << dendl; @@ -68,7 +68,7 @@ void RGWRealmReloader::handle_notify(RGWRealmNotify type, } reload_scheduled = new C_Reload(this); - cond.SignalOne(); // wake reload() if it blocked on a bad configuration + cond.notify_one(); // wake reload() if it blocked on a bad configuration // schedule reload() without delay timer.add_event_after(0, reload_scheduled); @@ -96,7 +96,7 @@ void RGWRealmReloader::reload() { // allow a new notify to reschedule us. it's important that we do this // before we start loading the new realm, or we could miss some updates - Mutex::Locker lock(mutex); + std::lock_guard lock{mutex}; reload_scheduled = nullptr; } @@ -115,7 +115,7 @@ void RGWRealmReloader::reload() RGWRados* store_cleanup = nullptr; { - Mutex::Locker lock(mutex); + std::unique_lock lock{mutex}; // failure to recreate RGWRados is not a recoverable error, but we // don't want to assert or abort the entire cluster. instead, just @@ -126,9 +126,7 @@ void RGWRealmReloader::reload() "configuration update. Waiting for a new update." << dendl; // sleep until another event is scheduled - while (!reload_scheduled) - cond.Wait(mutex); - + cond.wait(lock, [this] { return reload_scheduled; }); ldout(cct, 1) << "Woke up with a new configuration, retrying " "RGWRados initialization." << dendl; } diff --git a/src/rgw/rgw_realm_reloader.h b/src/rgw/rgw_realm_reloader.h index 1277429e5b3..24d10ae6217 100644 --- a/src/rgw/rgw_realm_reloader.h +++ b/src/rgw/rgw_realm_reloader.h @@ -55,8 +55,8 @@ class RGWRealmReloader : public RGWRealmWatcher::Watcher { /// Finisher because it allows us to cancel events that were scheduled while /// reload() is still running SafeTimer timer; - Mutex mutex; //< protects access to timer and reload_scheduled - Cond cond; //< to signal reload() after an invalid realm config + ceph::mutex mutex; //< protects access to timer and reload_scheduled + ceph::condition_variable cond; //< to signal reload() after an invalid realm config C_Reload* reload_scheduled; //< reload() context if scheduled }; diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc index f57350866f5..7ca79339524 100644 --- a/src/rgw/rgw_reshard.cc +++ b/src/rgw/rgw_reshard.cc @@ -1132,9 +1132,8 @@ void *RGWReshard::ReshardWorker::entry() { secs -= end.sec(); - lock.Lock(); - cond.WaitInterval(lock, utime_t(secs, 0)); - lock.Unlock(); + std::unique_lock locker{lock}; + cond.wait_for(locker, std::chrono::seconds(secs)); } while (!reshard->going_down()); return NULL; @@ -1142,6 +1141,6 @@ void *RGWReshard::ReshardWorker::entry() { void RGWReshard::ReshardWorker::stop() { - Mutex::Locker l(lock); - cond.Signal(); + std::lock_guard l{lock}; + cond.notify_all(); } diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h index 213fc238d2e..66ba2a2760f 100644 --- a/src/rgw/rgw_reshard.h +++ b/src/rgw/rgw_reshard.h @@ -134,15 +134,14 @@ protected: class ReshardWorker : public Thread { CephContext *cct; RGWReshard *reshard; - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("ReshardWorker"); + ceph::condition_variable cond; public: ReshardWorker(CephContext * const _cct, - RGWReshard * const _reshard) + RGWReshard * const _reshard) : cct(_cct), - reshard(_reshard), - lock("ReshardWorker") { + reshard(_reshard) { } void *entry() override; diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index e8aa2977944..ee017784e04 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -908,13 +908,13 @@ int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len, bool *pause) } void RGWHTTPStreamRWRequest::set_stream_write(bool s) { - Mutex::Locker wl(write_lock); + std::lock_guard wl{write_lock}; stream_writes = s; } void RGWHTTPStreamRWRequest::unpause_receive() { - Mutex::Locker req_locker(get_req_lock()); + std::lock_guard req_locker{get_req_lock()}; if (!read_paused) { _set_read_paused(false); } @@ -922,22 +922,20 @@ void RGWHTTPStreamRWRequest::unpause_receive() void RGWHTTPStreamRWRequest::add_send_data(bufferlist& bl) { - Mutex::Locker req_locker(get_req_lock()); - Mutex::Locker wl(write_lock); + std::scoped_lock locker{get_req_lock(), write_lock}; outbl.claim_append(bl); _set_write_paused(false); } uint64_t RGWHTTPStreamRWRequest::get_pending_send_size() { - Mutex::Locker wl(write_lock); + std::lock_guard wl{write_lock}; return outbl.length(); } void RGWHTTPStreamRWRequest::finish_write() { - Mutex::Locker req_locker(get_req_lock()); - Mutex::Locker wl(write_lock); + std::scoped_lock locker{get_req_lock(), write_lock}; write_stream_complete = true; _set_write_paused(false); } @@ -947,7 +945,7 @@ int RGWHTTPStreamRWRequest::send_data(void *ptr, size_t len, bool *pause) uint64_t out_len; uint64_t send_size; { - Mutex::Locker wl(write_lock); + std::lock_guard wl{write_lock}; if (outbl.length() == 0) { if ((stream_writes && !write_stream_complete) || diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index 8f9b2c16f54..d02e3052bf5 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -100,8 +100,10 @@ public: class ReceiveCB; private: - Mutex lock; - Mutex write_lock; + ceph::mutex lock = + ceph::make_mutex("RGWHTTPStreamRWRequest"); + ceph::mutex write_lock = + ceph::make_mutex("RGWHTTPStreamRWRequest::write_lock"); ReceiveCB *cb{nullptr}; RGWWriteDrainCB *write_drain_cb{nullptr}; bufferlist outbl; @@ -132,12 +134,11 @@ public: }; RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, - param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params), - lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock") { + param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params) { } RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params), - lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock"), cb(_cb) { + cb(_cb) { } virtual ~RGWHTTPStreamRWRequest() override {} diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 1c3ee7d45e9..7f044c4429b 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -92,13 +92,13 @@ int RGWBackoffControlCR::operate() { // retry the operation until it succeeds while (true) { yield { - Mutex::Locker l(lock); + std::lock_guard l{lock}; cr = alloc_cr(); cr->get(); call(cr); } { - Mutex::Locker l(lock); + std::lock_guard l{lock}; cr->put(); cr = NULL; } @@ -341,7 +341,7 @@ int RGWMetaSyncStatusManager::init() shard_objs[i] = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sync_env.shard_obj_name(i)); } - RWLock::WLocker wl(ts_to_shard_lock); + std::unique_lock wl{ts_to_shard_lock}; for (int i = 0; i < num_shards; i++) { clone_markers.push_back(string()); utime_shard ut; @@ -1417,8 +1417,8 @@ class RGWMetaSyncShardCR : public RGWCoroutine { string raw_key; rgw_mdlog_entry mdlog_entry; - Mutex inc_lock; - Cond inc_cond; + ceph::mutex inc_lock = ceph::make_mutex("RGWMetaSyncShardCR::inc_lock"); + ceph::condition_variable inc_cond; boost::asio::coroutine incremental_cr; boost::asio::coroutine full_cr; @@ -1451,7 +1451,7 @@ public: : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog), shard_id(_shard_id), sync_marker(_marker), - period_marker(period_marker), inc_lock("RGWMetaSyncShardCR::inc_lock"), + period_marker(period_marker), reset_backoff(_reset_backoff), tn(_tn) { *reset_backoff = false; } diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index 4e342918ab3..8e6ec69ef3b 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -132,7 +132,7 @@ public: class RGWBackoffControlCR : public RGWCoroutine { RGWCoroutine *cr; - Mutex lock; + ceph::mutex lock; RGWSyncBackoff backoff; bool reset_backoff; @@ -144,7 +144,7 @@ protected: return &reset_backoff; } - Mutex& cr_lock() { + ceph::mutex& cr_lock() { return lock; } @@ -153,8 +153,11 @@ protected: } public: - RGWBackoffControlCR(CephContext *_cct, bool _exit_on_error) : RGWCoroutine(_cct), cr(NULL), lock("RGWBackoffControlCR::lock:" + stringify(this)), - reset_backoff(false), exit_on_error(_exit_on_error) { + RGWBackoffControlCR(CephContext *_cct, bool _exit_on_error) + : RGWCoroutine(_cct), + cr(nullptr), + lock(ceph::make_mutex("RGWBackoffControlCR::lock:" + stringify(this))), + reset_backoff(false), exit_on_error(_exit_on_error) { } ~RGWBackoffControlCR() override { @@ -263,14 +266,14 @@ class RGWMetaSyncStatusManager : public DoutPrefixProvider { } }; - RWLock ts_to_shard_lock; + ceph::shared_mutex ts_to_shard_lock = ceph::make_shared_mutex("ts_to_shard_lock"); map ts_to_shard; vector clone_markers; public: RGWMetaSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados) - : store(_store), master_log(this, store, async_rados, this), - ts_to_shard_lock("ts_to_shard_lock") {} + : store(_store), master_log(this, store, async_rados, this) + {} int init(); int read_sync_status(rgw_meta_sync_status *sync_status) { diff --git a/src/rgw/rgw_sync_module.h b/src/rgw/rgw_sync_module.h index aa68934c06b..7006223c2a9 100644 --- a/src/rgw/rgw_sync_module.h +++ b/src/rgw/rgw_sync_module.h @@ -80,14 +80,14 @@ typedef std::shared_ptr RGWSyncModuleRef; class RGWSyncModulesManager { - Mutex lock; + ceph::mutex lock = ceph::make_mutex("RGWSyncModulesManager"); map modules; public: - RGWSyncModulesManager() : lock("RGWSyncModulesManager") {} + RGWSyncModulesManager() = default; void register_module(const string& name, RGWSyncModuleRef& module, bool is_default = false) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; modules[name] = module; if (is_default) { modules[string()] = module; @@ -95,7 +95,7 @@ public: } bool get_module(const string& name, RGWSyncModuleRef *module) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; auto iter = modules.find(name); if (iter == modules.end()) { return false; diff --git a/src/rgw/rgw_sync_trace.h b/src/rgw/rgw_sync_trace.h index d2925cf18a5..2c2e676dbf2 100644 --- a/src/rgw/rgw_sync_trace.h +++ b/src/rgw/rgw_sync_trace.h @@ -6,7 +6,7 @@ #include -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/shunique_lock.h" #include "common/admin_socket.h" @@ -41,7 +41,7 @@ class RGWSyncTraceNode final { uint16_t state{0}; std::string status; - Mutex lock{"RGWSyncTraceNode::lock"}; + ceph::mutex lock = ceph::make_mutex("RGWSyncTraceNode::lock"); std::string type; std::string id; diff --git a/src/rgw/services/svc_notify.cc b/src/rgw/services/svc_notify.cc index bf61d137839..7ef4257e9bc 100644 --- a/src/rgw/services/svc_notify.cc +++ b/src/rgw/services/svc_notify.cc @@ -309,7 +309,7 @@ int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle) void RGWSI_Notify::add_watcher(int i) { ldout(cct, 20) << "add_watcher() i=" << i << dendl; - RWLock::WLocker l(watchers_lock); + std::unique_lock l{watchers_lock}; watchers_set.insert(i); if (watchers_set.size() == (size_t)num_watchers) { ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl; @@ -320,7 +320,7 @@ void RGWSI_Notify::add_watcher(int i) void RGWSI_Notify::remove_watcher(int i) { ldout(cct, 20) << "remove_watcher() i=" << i << dendl; - RWLock::WLocker l(watchers_lock); + std::unique_lock l{watchers_lock}; size_t orig_size = watchers_set.size(); watchers_set.erase(i); if (orig_size == (size_t)num_watchers && @@ -335,7 +335,7 @@ int RGWSI_Notify::watch_cb(uint64_t notify_id, uint64_t notifier_id, bufferlist& bl) { - RWLock::RLocker l(watchers_lock); + std::shared_lock l{watchers_lock}; if (cb) { return cb->watch_cb(notify_id, cookie, notifier_id, bl); } @@ -344,7 +344,7 @@ int RGWSI_Notify::watch_cb(uint64_t notify_id, void RGWSI_Notify::set_enabled(bool status) { - RWLock::WLocker l(watchers_lock); + std::unique_lock l{watchers_lock}; _set_enabled(status); } @@ -468,7 +468,7 @@ int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl, void RGWSI_Notify::register_watch_cb(CB *_cb) { - RWLock::WLocker l(watchers_lock); + std::unique_lock l{watchers_lock}; cb = _cb; _set_enabled(enabled); } diff --git a/src/rgw/services/svc_notify.h b/src/rgw/services/svc_notify.h index 85780d36617..9e53b05101b 100644 --- a/src/rgw/services/svc_notify.h +++ b/src/rgw/services/svc_notify.h @@ -28,7 +28,7 @@ private: RGWSI_RADOS *rados_svc{nullptr}; RGWSI_Finisher *finisher_svc{nullptr}; - RWLock watchers_lock{"watchers_lock"}; + ceph::shared_mutex watchers_lock = ceph::make_shared_mutex("watchers_lock"); rgw_pool control_pool; int num_watchers{0}; diff --git a/src/rgw/services/svc_sys_obj_cache.h b/src/rgw/services/svc_sys_obj_cache.h index 69c6e10436c..14ea5bc6b05 100644 --- a/src/rgw/services/svc_sys_obj_cache.h +++ b/src/rgw/services/svc_sys_obj_cache.h @@ -137,7 +137,7 @@ public: } boost::optional find(const string& key) { - RWLock::RLocker rl(lock); + std::shared_lock rl{lock}; auto iter = entries.find(key); if (iter == entries.end()) { return boost::none; @@ -164,7 +164,7 @@ public: void chain_cb(const string& key, void *data) override { T *entry = static_cast(data); - RWLock::WLocker wl(lock); + std::unique_lock wl{lock}; entries[key].first = *entry; if (expiry.count() > 0) { entries[key].second = ceph::coarse_mono_clock::now(); @@ -172,12 +172,12 @@ public: } void invalidate(const string& key) override { - RWLock::WLocker wl(lock); + std::unique_lock wl{lock}; entries.erase(key); } void invalidate_all() override { - RWLock::WLocker wl(lock); + std::unique_lock wl{lock}; entries.clear(); } }; /* RGWChainedCacheImpl */ diff --git a/src/rgw/services/svc_sys_obj_core.h b/src/rgw/services/svc_sys_obj_core.h index 38edf094547..0b6f3f6d3bc 100644 --- a/src/rgw/services/svc_sys_obj_core.h +++ b/src/rgw/services/svc_sys_obj_core.h @@ -52,28 +52,26 @@ struct RGWSysObjState { class RGWSysObjectCtxBase { std::map objs_state; - RWLock lock; + ceph::shared_mutex lock = ceph::make_shared_mutex("RGWSysObjectCtxBase"); public: - explicit RGWSysObjectCtxBase() : lock("RGWSysObjectCtxBase") {} + RGWSysObjectCtxBase() = default; - RGWSysObjectCtxBase(const RGWSysObjectCtxBase& rhs) : objs_state(rhs.objs_state), - lock("RGWSysObjectCtxBase") {} - RGWSysObjectCtxBase(const RGWSysObjectCtxBase&& rhs) : objs_state(std::move(rhs.objs_state)), - lock("RGWSysObjectCtxBase") {} + RGWSysObjectCtxBase(const RGWSysObjectCtxBase& rhs) : objs_state(rhs.objs_state) {} + RGWSysObjectCtxBase(const RGWSysObjectCtxBase&& rhs) : objs_state(std::move(rhs.objs_state)) {} RGWSysObjState *get_state(const rgw_raw_obj& obj) { RGWSysObjState *result; std::map::iterator iter; - lock.get_read(); + lock.lock_shared(); assert (!obj.empty()); iter = objs_state.find(obj); if (iter != objs_state.end()) { result = &iter->second; - lock.unlock(); + lock.unlock_shared(); } else { - lock.unlock(); - lock.get_write(); + lock.unlock_shared(); + lock.lock(); result = &objs_state[obj]; lock.unlock(); } @@ -81,12 +79,12 @@ public: } void set_prefetch_data(rgw_raw_obj& obj) { - RWLock::WLocker wl(lock); + std::unique_lock wl{lock}; assert (!obj.empty()); objs_state[obj].prefetch_data = true; } void invalidate(const rgw_raw_obj& obj) { - RWLock::WLocker wl(lock); + std::unique_lock wl{lock}; auto iter = objs_state.find(obj); if (iter == objs_state.end()) { return; -- 2.39.5