CODE_ENVIRONMENT_DAEMON,
CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
- Mutex mutex("main");
+ ceph::mutex mutex = ceph::make_mutex("main");
SafeTimer init_timer(g_ceph_context, mutex);
init_timer.init();
- mutex.Lock();
+ mutex.lock();
init_timer.add_event_after(g_conf()->rgw_init_timeout, new C_InitTimeout);
- mutex.Unlock();
+ mutex.unlock();
common_init_finish(g_ceph_context);
g_conf().get_val<bool>("rgw_dynamic_resharding"));
if (!store) {
- mutex.Lock();
+ mutex.lock();
init_timer.cancel_all_events();
init_timer.shutdown();
- mutex.Unlock();
+ mutex.unlock();
derr << "Couldn't init storage provider (RADOS)" << dendl;
return -EIO;
rgw_rest_init(g_ceph_context, store, store->svc.zone->get_zonegroup());
- mutex.Lock();
+ mutex.lock();
init_timer.cancel_all_events();
init_timer.shutdown();
- mutex.Unlock();
+ mutex.unlock();
if (r)
return -EIO;
CEPH_ENTITY_TYPE_CLIENT,
CODE_ENVIRONMENT_UTILITY, 0);
- Mutex mutex("main");
+ ceph::mutex mutex = ceph::make_mutex("main");
SafeTimer init_timer(g_ceph_context, mutex);
init_timer.init();
- mutex.Lock();
+ mutex.lock();
init_timer.add_event_after(g_conf()->rgw_init_timeout, new C_InitTimeout);
- mutex.Unlock();
+ mutex.unlock();
common_init_finish(g_ceph_context);
store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false, false);
if (!store) {
- mutex.Lock();
+ mutex.lock();
init_timer.cancel_all_events();
init_timer.shutdown();
- mutex.Unlock();
+ mutex.unlock();
derr << "Couldn't init storage provider (RADOS)" << dendl;
return -EIO;
}
- mutex.Lock();
+ mutex.lock();
init_timer.cancel_all_events();
init_timer.shutdown();
- mutex.Unlock();
+ mutex.unlock();
rgw_user_init(store);
* it later, so we keep two lists under the map */
map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
- lock.Lock();
+ lock.lock();
map<rgw_bucket_shard, bool> entries;
entries.swap(cur_cycle);
- lock.Unlock();
+ lock.unlock();
map<rgw_bucket_shard, bool>::iterator iter;
string section;
void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
if (!changes.find(bs, status)) {
status = ChangeStatusPtr(new ChangeStatus);
changes.add(bs, status);
void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
cur_cycle[bs] = true;
}
void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, real_time& expiration)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
ChangeStatusPtr status;
_get_change(bs, status);
int index = choose_oid(bs);
mark_modified(index, bs);
- lock.Lock();
+ lock.lock();
ChangeStatusPtr status;
_get_change(bs, status);
- lock.Unlock();
+ lock.unlock();
real_time now = real_clock::now();
- status->lock->Lock();
+ status->lock.lock();
ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
if (now < status->cur_expiration) {
/* no need to send, recently completed */
- status->lock->Unlock();
+ status->lock.unlock();
register_renew(bs);
return 0;
ceph_assert(cond);
status->cond->get();
- status->lock->Unlock();
+ status->lock.unlock();
int ret = cond->wait();
cond->put();
expiration = now;
expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
- status->lock->Unlock();
+ status->lock.unlock();
bufferlist bl;
rgw_data_change change;
now = real_clock::now();
- status->lock->Lock();
+ status->lock.lock();
} while (!ret && real_clock::now() > expiration);
status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */
status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
status->cond = NULL;
- status->lock->Unlock();
+ status->lock.unlock();
cond->done(ret);
cond->put();
break;
int interval = cct->_conf->rgw_data_log_window * 3 / 4;
- lock.Lock();
- cond.WaitInterval(lock, utime_t(interval, 0));
- lock.Unlock();
+ std::unique_lock locker{lock};
+ cond.wait_for(locker, std::chrono::seconds(interval));
} while (!log->going_down());
return NULL;
void RGWDataChangesLog::ChangesRenewThread::stop()
{
- Mutex::Locker l(lock);
- cond.Signal();
+ std::lock_guard l{lock};
+ cond.notify_all();
}
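
The ChangesRenewThread hunk above is the first instance of a pattern this patch repeats for the GC, lifecycle, object-expirer, reshard and quota-sync workers: the thread sleeps on a condition variable with a timeout, and stop() wakes it. Below is a minimal standalone sketch of that idiom, using the plain std types that ceph::mutex and ceph::condition_variable resolve to in non-debug builds; the class and member names are illustrative, not taken from the patch.

    #include <chrono>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    class PeriodicWorker {
      std::mutex lock;
      std::condition_variable cond;
      bool going_down = false;

    public:
      void run(std::chrono::seconds interval) {
        std::unique_lock locker{lock};
        while (!going_down) {
          std::cout << "tick\n";   // stands in for the worker's real work
          // wake up after `interval`, or immediately once stop() notifies
          cond.wait_for(locker, interval, [this] { return going_down; });
        }
      }

      void stop() {
        std::lock_guard l{lock};
        going_down = true;
        cond.notify_all();
      }
    };

    int main() {
      PeriodicWorker w;
      std::thread t([&w] { w.run(std::chrono::seconds(1)); });
      std::this_thread::sleep_for(std::chrono::seconds(3));
      w.stop();
      t.join();
    }

The converted RGW workers keep the re-check in their own loop (while (!going_down())) rather than passing a predicate, but the locking shape is the same.
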
void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
{
auto key = bs.get_key();
- modified_lock.get_read();
+ modified_lock.lock_shared();
map<int, set<string> >::iterator iter = modified_shards.find(shard_id);
if (iter != modified_shards.end()) {
set<string>& keys = iter->second;
}
- modified_lock.unlock();
+ modified_lock.unlock_shared();
- RWLock::WLocker wl(modified_lock);
+ std::unique_lock wl{modified_lock};
modified_shards[shard_id].insert(key);
}
void RGWDataChangesLog::read_clear_modified(map<int, set<string> > &modified)
{
- RWLock::WLocker wl(modified_lock);
+ std::unique_lock wl{modified_lock};
modified.swap(modified_shards);
modified_shards.clear();
}
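
mark_modified() keeps its two-phase shape after the conversion: a cheap existence check under a shared lock, then a second, exclusive acquisition for the insert. Here is a reduced sketch of that check-then-upgrade idiom with hypothetical names; note the window between releasing the shared lock and taking the exclusive one, which is harmless because the insert is idempotent.

    #include <mutex>
    #include <set>
    #include <shared_mutex>
    #include <string>

    class ModifiedKeys {
      std::shared_mutex lock;
      std::set<std::string> keys;

    public:
      void mark_modified(const std::string& key) {
        {
          std::shared_lock rl{lock};       // cheap read-side existence check
          if (keys.count(key))
            return;                        // already recorded, nothing to do
        }
        // The shared lock is released before the exclusive lock is taken, so
        // another thread may insert the same key in between; that race is
        // harmless because the insert below is idempotent.
        std::unique_lock wl{lock};
        keys.insert(key);
      }
    };

    int main() {
      ModifiedKeys m;
      m.mark_modified("bucket:0");
      m.mark_modified("bucket:0");         // second call returns on the read path
      return 0;
    }
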
int num_shards;
string *oids;
- Mutex lock;
- RWLock modified_lock;
+ ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock");
+ ceph::shared_mutex modified_lock =
+ ceph::make_shared_mutex("RGWDataChangesLog::modified_lock");
map<int, set<string> > modified_shards;
std::atomic<bool> down_flag = { false };
struct ChangeStatus {
real_time cur_expiration;
real_time cur_sent;
- bool pending;
- RefCountedCond *cond;
- Mutex *lock;
+ bool pending = false;
+ RefCountedCond *cond = nullptr;
+ ceph::mutex lock =
+ ceph::make_mutex("RGWDataChangesLog::ChangeStatus");
- ChangeStatus() : pending(false), cond(NULL) {
- lock = new Mutex("RGWDataChangesLog::ChangeStatus");
- }
-
- ~ChangeStatus() {
- delete lock;
- }
+ ChangeStatus() = default;
+ ~ChangeStatus() = default;
};
typedef std::shared_ptr<ChangeStatus> ChangeStatusPtr;
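
ChangeStatus is representative of another recurring change: a heap-allocated Mutex plus a hand-written constructor and destructor collapses into an in-class ceph::mutex with a default member initializer, so the defaulted special members are enough. A before/after sketch, with std::mutex standing in for ceph::mutex:

    #include <mutex>

    // Before: the lock was a pointer, forcing a hand-written constructor and
    // destructor just to new/delete it.
    struct ChangeStatusOld {
      std::mutex *lock;
      ChangeStatusOld() : lock(new std::mutex) {}
      ~ChangeStatusOld() { delete lock; }
    };

    // After: the mutex lives in the object and is initialized in-class, so the
    // defaulted special members suffice (rule of zero); the patch spells the
    // initializer ceph::make_mutex("RGWDataChangesLog::ChangeStatus").
    struct ChangeStatusNew {
      bool pending = false;
      std::mutex lock;
      ChangeStatusNew() = default;
      ~ChangeStatusNew() = default;
    };

    int main() {
      ChangeStatusNew s;
      std::lock_guard l{s.lock};   // callers now lock the member directly
      return 0;
    }
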
class ChangesRenewThread : public Thread {
CephContext *cct;
RGWDataChangesLog *log;
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("ChangesRenewThread::lock");
+ ceph::condition_variable cond;
public:
- ChangesRenewThread(CephContext *_cct, RGWDataChangesLog *_log) : cct(_cct), log(_log), lock("ChangesRenewThread::lock") {}
+ ChangesRenewThread(CephContext *_cct, RGWDataChangesLog *_log) : cct(_cct), log(_log) {}
void *entry() override;
void stop();
};
public:
RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store),
- lock("RGWDataChangesLog::lock"), modified_lock("RGWDataChangesLog::modified_lock"),
changes(cct->_conf->rgw_data_log_changes_size) {
num_shards = cct->_conf->rgw_data_log_num_shards;
int ObjectCache::get(const string& name, ObjectCacheInfo& info, uint32_t mask, rgw_cache_entry_info *cache_info)
{
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
if (!enabled) {
return -ENOENT;
(ceph::coarse_mono_clock::now() - iter->second.info.time_added) > expiry) {
ldout(cct, 10) << "cache get: name=" << name << " : expiry miss" << dendl;
lock.unlock();
- lock.get_write();
+ lock.lock();
// check that wasn't already removed by other thread
iter = cache_map.find(name);
if (iter != cache_map.end()) {
ldout(cct, 20) << "cache get: touching lru, lru_counter=" << lru_counter
<< " promotion_ts=" << entry->lru_promotion_ts << dendl;
lock.unlock();
- lock.get_write(); /* promote lock to writer */
+ lock.lock(); /* promote lock to writer */
/* need to redo this because entry might have dropped off the cache */
iter = cache_map.find(name);
bool ObjectCache::chain_cache_entry(std::initializer_list<rgw_cache_entry_info*> cache_info_entries,
RGWChainedCache::Entry *chained_entry)
{
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
if (!enabled) {
return false;
void ObjectCache::put(const string& name, ObjectCacheInfo& info, rgw_cache_entry_info *cache_info)
{
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
if (!enabled) {
return;
bool ObjectCache::remove(const string& name)
{
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
if (!enabled) {
return false;
void ObjectCache::set_enabled(bool status)
{
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
enabled = status;
void ObjectCache::invalidate_all()
{
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
do_invalidate_all();
}
}
void ObjectCache::chain_cache(RGWChainedCache *cache) {
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
chained_cache.push_back(cache);
}
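
The ObjectCache hunks swap RWLock::RLocker and RWLock::WLocker for std::shared_lock and std::unique_lock over a ceph::shared_mutex. A compact reader/writer sketch of that split on a plain std::shared_mutex; the cache below is illustrative, not the RGW ObjectCache.

    #include <map>
    #include <mutex>
    #include <optional>
    #include <shared_mutex>
    #include <string>

    class SmallCache {
      mutable std::shared_mutex lock;   // the patch uses ceph::make_shared_mutex("ObjectCache")
      std::map<std::string, std::string> entries;

    public:
      // Readers share the lock, so concurrent get() calls do not serialize.
      std::optional<std::string> get(const std::string& key) const {
        std::shared_lock l{lock};
        auto it = entries.find(key);
        if (it == entries.end())
          return std::nullopt;
        return it->second;
      }

      // Writers take the lock exclusively, excluding readers and other writers.
      void put(const std::string& key, const std::string& value) {
        std::unique_lock l{lock};
        entries[key] = value;
      }
    };

    int main() {
      SmallCache c;
      c.put("name", "value");
      return c.get("name") ? 0 : 1;
    }
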
void ObjectCache::unchain_cache(RGWChainedCache *cache) {
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
auto iter = chained_cache.begin();
for (; iter != chained_cache.end(); ++iter) {
#include "include/types.h"
#include "include/utime.h"
#include "include/ceph_assert.h"
-#include "common/RWLock.h"
+#include "common/ceph_mutex.h"
enum {
UPDATE_OBJ,
unsigned long lru_size;
unsigned long lru_counter;
unsigned long lru_window;
- RWLock lock;
+ ceph::shared_mutex lock = ceph::make_shared_mutex("ObjectCache");
CephContext *cct;
vector<RGWChainedCache *> chained_cache;
void do_invalidate_all();
public:
- ObjectCache() : lru_size(0), lru_counter(0), lru_window(0), lock("ObjectCache"), cct(NULL), enabled(false) { }
+ ObjectCache() : lru_size(0), lru_counter(0), lru_window(0), cct(NULL), enabled(false) { }
~ObjectCache();
int get(const std::string& name, ObjectCacheInfo& bl, uint32_t mask, rgw_cache_entry_info *cache_info);
std::optional<ObjectCacheInfo> get(const std::string& name) {
template<typename F>
void for_each(const F& f) {
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
if (enabled) {
auto now = ceph::coarse_mono_clock::now();
for (const auto& [name, entry] : cache_map) {
int RGWCivetWebFrontend::process(struct mg_connection* const conn)
{
/* Hold a read lock over access to env.store for reconfiguration. */
- RWLock::RLocker lock(env.mutex);
+ std::shared_lock lock{env.mutex};
RGWCivetWeb cw_client(conn);
auto real_client_io = rgw::io::add_reordering(
}
};
-RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct), lock("RGWCompletionManager::lock"),
+RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct),
timer(cct, lock)
{
timer.init();
RGWCompletionManager::~RGWCompletionManager()
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
timer.cancel_all_events();
timer.shutdown();
}
void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
_complete(cn, io_id, user_info);
}
void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (cn) {
cns.insert(cn);
}
void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (cn) {
cns.erase(cn);
}
return;
}
complete_reqs.push_back(io_completion{io_id, user_info});
- cond.Signal();
+ cond.notify_all();
}
int RGWCompletionManager::get_next(io_completion *io)
{
- Mutex::Locker l(lock);
+ std::unique_lock l{lock};
while (complete_reqs.empty()) {
if (going_down) {
return -ECANCELED;
}
- cond.Wait(lock);
+ cond.wait(l);
}
*io = complete_reqs.front();
complete_reqs_set.erase(io->io_id);
bool RGWCompletionManager::try_get_next(io_completion *io)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (complete_reqs.empty()) {
return false;
}
void RGWCompletionManager::go_down()
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
for (auto cn : cns) {
cn->unregister();
}
going_down = true;
- cond.Signal();
+ cond.notify_all();
}
void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
ceph_assert(waiters.find(opaque) == waiters.end());
waiters[opaque] = user_info;
timer.add_event_after(interval, new WaitContext(this, opaque));
void RGWCompletionManager::wakeup(void *opaque)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
_wakeup(opaque);
}
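
get_next() is the one RGWCompletionManager method that has to block, which is why it takes a std::unique_lock (condition_variable::wait needs a lock it can release and reacquire) while the non-blocking methods use std::lock_guard. A self-contained queue sketch of the same shape, with illustrative names; -ECANCELED in the patch corresponds to the empty optional here.

    #include <condition_variable>
    #include <deque>
    #include <mutex>
    #include <optional>
    #include <utility>

    template <typename T>
    class CompletionQueue {
      std::mutex lock;
      std::condition_variable cond;
      std::deque<T> items;
      bool going_down = false;

    public:
      void push(T item) {
        std::lock_guard l{lock};            // no waiting here, lock_guard suffices
        items.push_back(std::move(item));
        cond.notify_all();
      }

      void shutdown() {
        std::lock_guard l{lock};
        going_down = true;
        cond.notify_all();
      }

      // Blocks for the next item; empty optional once shut down and drained.
      std::optional<T> pop() {
        std::unique_lock l{lock};           // wait() must be able to drop the lock
        while (items.empty()) {
          if (going_down)
            return std::nullopt;
          cond.wait(l);
        }
        T item = std::move(items.front());
        items.pop_front();
        return item;
      }
    };

    int main() {
      CompletionQueue<int> q;
      q.push(42);
      auto v = q.pop();
      q.shutdown();
      return (v && *v == 42) ? 0 : 1;
    }
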
stringstream& RGWCoroutine::Status::set_status()
{
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
string s = status.str();
status.str(string());
if (!timestamp.is_zero()) {
RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr),
io_id(_io_id),
- user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) {
+ user_data(_user_data), registered(true) {
c = librados::Rados::aio_create_completion((void *)this, NULL,
_aio_completion_notifier_cb);
}
void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
RGWCompletionManager::io_completion& io, int *blocked_count)
{
- ceph_assert(lock.is_wlocked());
+ ceph_assert(ceph_mutex_is_wlocked(lock));
RGWCoroutinesStack *stack = static_cast<RGWCoroutinesStack *>(io.user_info);
if (context_stacks.find(stack) == context_stacks.end()) {
return;
void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
{
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
_schedule(env, stack);
}
void RGWCoroutinesManager::_schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
{
- ceph_assert(lock.is_wlocked());
+ ceph_assert(ceph_mutex_is_wlocked(lock));
if (!stack->is_scheduled) {
env->scheduled_stacks->push_back(stack);
stack->set_is_scheduled(true);
uint64_t run_context = ++run_context_count;
- lock.get_write();
+ lock.lock();
set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
list<RGWCoroutinesStack *> scheduled_stacks;
for (auto& st : stacks) {
ret = stack->operate(&env);
- lock.get_write();
+ lock.lock();
stack->set_is_scheduled(false);
if (ret < 0) {
while (blocked_count - interval_wait_count >= ops_window) {
lock.unlock();
ret = completion_mgr->get_next(&io);
- lock.get_write();
+ lock.lock();
if (ret < 0) {
ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
}
while (scheduled_stacks.empty() && blocked_count > 0) {
lock.unlock();
ret = completion_mgr->get_next(&io);
- lock.get_write();
+ lock.lock();
if (ret < 0) {
ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
}
}
void RGWCoroutinesManager::dump(Formatter *f) const {
- RWLock::RLocker rl(lock);
+ std::shared_lock rl{lock};
f->open_array_section("run_contexts");
for (auto& i : run_contexts) {
void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr)
{
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
if (managers.find(mgr) == managers.end()) {
managers.insert(mgr);
get();
void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr)
{
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
if (managers.find(mgr) != managers.end()) {
managers.erase(mgr);
put();
const cmdmap_t& cmdmap,
std::string_view format,
bufferlist& out) {
- RWLock::RLocker rl(lock);
+ std::shared_lock rl{lock};
stringstream ss;
JSONFormatter f;
::encode_json("cr_managers", *this, &f);
using NotifierRef = boost::intrusive_ptr<RGWAioCompletionNotifier>;
set<NotifierRef> cns;
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("RGWCompletionManager::lock");
+ ceph::condition_variable cond;
SafeTimer timer;
RGWCompletionManager *completion_mgr;
rgw_io_id io_id;
void *user_data;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("RGWAioCompletionNotifier");
bool registered;
public:
RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data);
~RGWAioCompletionNotifier() override {
c->release();
- lock.Lock();
+ lock.lock();
bool need_unregister = registered;
if (registered) {
completion_mgr->get();
}
registered = false;
- lock.Unlock();
+ lock.unlock();
if (need_unregister) {
completion_mgr->unregister_completion_notifier(this);
completion_mgr->put();
}
void unregister() {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (!registered) {
return;
}
}
void cb() {
- lock.Lock();
+ lock.lock();
if (!registered) {
- lock.Unlock();
+ lock.unlock();
put();
return;
}
completion_mgr->get();
registered = false;
- lock.Unlock();
+ lock.unlock();
completion_mgr->complete(this, io_id, user_data);
completion_mgr->put();
put();
struct Status {
CephContext *cct;
- RWLock lock;
+ ceph::shared_mutex lock =
+ ceph::make_shared_mutex("RGWCoroutine::Status::lock");
int max_history;
utime_t timestamp;
stringstream status;
- explicit Status(CephContext *_cct) : cct(_cct), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY) {}
+ explicit Status(CephContext *_cct) : cct(_cct), max_history(MAX_COROUTINE_HISTORY) {}
deque<StatusItem> history;
CephContext *cct;
set<RGWCoroutinesManager *> managers;
- RWLock lock;
+ ceph::shared_mutex lock =
+ ceph::make_shared_mutex("RGWCoroutinesRegistry::lock");
string admin_command;
public:
- explicit RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesRegistry::lock") {}
+ explicit RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct) {}
~RGWCoroutinesManagerRegistry() override;
void add(RGWCoroutinesManager *mgr);
std::atomic<int64_t> max_io_id = { 0 };
- RWLock lock;
+ mutable ceph::shared_mutex lock =
+ ceph::make_shared_mutex("RGWCoroutinesManager::lock");
RGWIOIDProvider io_id_provider;
void put_completion_notifier(RGWAioCompletionNotifier *cn);
public:
- RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"),
+ RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct),
cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {
completion_mgr = new RGWCompletionManager(cct);
if (cr_registry) {
int retcode;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("RGWAsyncRadosRequest::lock");
protected:
virtual int _send_request() = 0;
public:
- RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0),
- lock("RGWAsyncRadosRequest::lock") {
+ RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn)
+ : caller(_caller), notifier(_cn), retcode(0) {
}
~RGWAsyncRadosRequest() override {
if (notifier) {
get();
retcode = _send_request();
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (notifier) {
notifier->cb(); // drops its own ref
notifier = nullptr;
void finish() {
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (notifier) {
// we won't call notifier->cb() to drop its ref, so drop it here
notifier->put();
class RGWAsyncWait : public RGWAsyncRadosRequest {
CephContext *cct;
- Mutex *lock;
- Cond *cond;
- utime_t interval;
+ ceph::mutex *lock;
+ ceph::condition_variable *cond;
+ std::chrono::seconds interval;
protected:
int _send_request() override {
- Mutex::Locker l(*lock);
- return cond->WaitInterval(*lock, interval);
+ std::unique_lock l{*lock};
+ return (cond->wait_for(l, interval) == std::cv_status::timeout ?
+ ETIMEDOUT : 0);
}
public:
RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct,
- Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn),
- cct(_cct),
- lock(_lock), cond(_cond), interval(_secs, 0) {}
+ ceph::mutex *_lock, ceph::condition_variable *_cond, int _secs)
+ : RGWAsyncRadosRequest(caller, cn),
+ cct(_cct),
+ lock(_lock), cond(_cond), interval(_secs) {}
void wakeup() {
- Mutex::Locker l(*lock);
- cond->Signal();
+ std::lock_guard l{*lock};
+ cond->notify_all();
}
};
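
RGWAsyncWait maps std::cv_status::timeout back onto the ETIMEDOUT value that Cond::WaitInterval used to return. A standalone sketch of that mapping; as with the old Cond, a wait without a predicate may also return early on a notify or a spurious wakeup, in which case the function returns 0.

    #include <cerrno>
    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    // Sleep until notified or until `interval` elapses;
    // returns 0 on a wakeup and ETIMEDOUT on timeout.
    int timed_wait(std::mutex& lock, std::condition_variable& cond,
                   std::chrono::seconds interval) {
      std::unique_lock l{lock};
      return (cond.wait_for(l, interval) == std::cv_status::timeout ?
              ETIMEDOUT : 0);
    }

    int main() {
      std::mutex m;
      std::condition_variable cv;
      // nothing ever notifies, so this returns ETIMEDOUT after about a second
      return timed_wait(m, cv, std::chrono::seconds(1)) == ETIMEDOUT ? 0 : 1;
    }
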
class RGWWaitCR : public RGWSimpleCoroutine {
CephContext *cct;
RGWAsyncRadosProcessor *async_rados;
- Mutex *lock;
- Cond *cond;
+ ceph::mutex *lock;
+ ceph::condition_variable *cond;
int secs;
RGWAsyncWait *req;
public:
RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct,
- Mutex *_lock, Cond *_cond,
+ ceph::mutex *_lock, ceph::condition_variable *_cond,
int _secs) : RGWSimpleCoroutine(_cct), cct(_cct),
async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) {
}
int interval;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("RGWContinuousLeaseCR");
std::atomic<bool> going_down = { false };
bool locked{false};
: RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
obj(_obj), lock_name(_lock_name),
cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
- interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller)
+ interval(_interval), caller(_caller)
{}
int operate() override;
bool is_locked() {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return locked;
}
void set_locked(bool status) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
locked = status;
}
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
-RGWCRHTTPGetDataCB::RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, RGWHTTPStreamRWRequest *_req) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), req(_req) {
+RGWCRHTTPGetDataCB::RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, RGWHTTPStreamRWRequest *_req) : env(_env), cr(_cr), req(_req) {
io_id = req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ |RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
req->set_in_cb(this);
}
{
uint64_t bl_len = bl.length();
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (!got_all_extra_data) {
uint64_t max = extra_data_len - extra_data.length();
bool need_to_unpause = false;
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (data.length() == 0) {
return;
};
class RGWCRHTTPGetDataCB : public RGWHTTPStreamRWRequest::ReceiveCB {
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("RGWCRHTTPGetDataCB");
RGWCoroutinesEnv *env;
RGWCoroutine *cr;
RGWHTTPStreamRWRequest *req;
list<rgw_data_change_log_entry>::iterator log_iter;
bool truncated;
- Mutex inc_lock;
- Cond inc_cond;
+ ceph::mutex inc_lock = ceph::make_mutex("RGWDataSyncShardCR::inc_lock");
+ ceph::condition_variable inc_cond;
boost::asio::coroutine incremental_cr;
boost::asio::coroutine full_cr;
pool(_pool),
shard_id(_shard_id),
sync_marker(_marker),
- marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
+ marker_tracker(NULL), truncated(false),
total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT), tn(_tn) {
}
void append_modified_shards(set<string>& keys) {
- Mutex::Locker l(inc_lock);
+ std::lock_guard l{inc_lock};
modified_shards.insert(keys.begin(), keys.end());
}
return set_cr_error(-ECANCELED);
}
current_modified.clear();
- inc_lock.Lock();
+ inc_lock.lock();
current_modified.swap(modified_shards);
- inc_lock.Unlock();
+ inc_lock.unlock();
if (current_modified.size() > 0) {
tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
}
void append_modified_shards(set<string>& keys) {
- Mutex::Locker l(cr_lock());
+ std::lock_guard l{cr_lock()};
RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
if (!cr) {
RGWDataSyncShardMarkerTrack *marker_tracker;
- Mutex shard_crs_lock;
+ ceph::mutex shard_crs_lock =
+ ceph::make_mutex("RGWDataSyncCR::shard_crs_lock");
map<int, RGWDataSyncShardControlCR *> shard_crs;
bool *reset_backoff;
sync_env(_sync_env),
num_shards(_num_shards),
marker_tracker(NULL),
- shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
reset_backoff(_reset_backoff), tn(_tn) {
}
RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->svc.zone->get_zone_params().log_pool,
iter->first, iter->second, tn);
cr->get();
- shard_crs_lock.Lock();
+ shard_crs_lock.lock();
shard_crs[iter->first] = cr;
- shard_crs_lock.Unlock();
+ shard_crs_lock.unlock();
spawn(cr, true);
}
}
}
void wakeup(int shard_id, set<string>& keys) {
- Mutex::Locker l(shard_crs_lock);
+ std::lock_guard l{shard_crs_lock};
map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
if (iter == shard_crs.end()) {
return;
}
void wakeup(int shard_id, set<string>& keys) {
- Mutex& m = cr_lock();
+ ceph::mutex& m = cr_lock();
- m.Lock();
+ m.lock();
RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
if (!cr) {
- m.Unlock();
+ m.unlock();
return;
}
cr->get();
- m.Unlock();
+ m.unlock();
if (cr) {
tn->log(20, SSTR("notify shard=" << shard_id << " keys=" << keys));
};
void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
- RWLock::RLocker rl(lock);
+ std::shared_lock rl{lock};
if (!data_sync_cr) {
return;
}
int RGWRemoteDataLog::run_sync(int num_shards)
{
- lock.get_write();
+ lock.lock();
data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards, tn);
data_sync_cr->get(); // run() will drop a ref, so take another
lock.unlock();
int r = run(data_sync_cr);
- lock.get_write();
+ lock.lock();
data_sync_cr->put();
data_sync_cr = NULL;
lock.unlock();
RGWDataSyncEnv sync_env;
- RWLock lock;
+ ceph::shared_mutex lock = ceph::make_shared_mutex("RGWRemoteDataLog::lock");
RGWDataSyncControlCR *data_sync_cr;
RGWSyncTraceNodeRef tn;
: RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
dpp(dpp), store(_store), async_rados(async_rados),
http_manager(store->ctx(), completion_mgr),
- lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
+ data_sync_cr(NULL),
initialized(false) {}
int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& module,
secs -= end.sec();
- lock.Lock();
- cond.WaitInterval(lock, utime_t(secs, 0));
- lock.Unlock();
+ std::unique_lock locker{lock};
+ cond.wait_for(locker, std::chrono::seconds(secs));
} while (!gc->going_down());
return NULL;
void RGWGC::GCWorker::stop()
{
- Mutex::Locker l(lock);
- cond.Signal();
+ std::lock_guard l{lock};
+ cond.notify_all();
}
#include "include/types.h"
#include "include/rados/librados.hpp"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/Cond.h"
#include "common/Thread.h"
#include "rgw_common.h"
const DoutPrefixProvider *dpp;
CephContext *cct;
RGWGC *gc;
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("GCWorker");
+ ceph::condition_variable cond;
public:
- GCWorker(const DoutPrefixProvider *_dpp, CephContext *_cct, RGWGC *_gc) : dpp(_dpp), cct(_cct), gc(_gc), lock("GCWorker") {}
+ GCWorker(const DoutPrefixProvider *_dpp, CephContext *_cct, RGWGC *_gc) : dpp(_dpp), cct(_cct), gc(_gc) {}
void *entry() override;
void stop();
};
bool write_paused{false};
bool read_paused{false};
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("rgw_http_req_data::lock");
+ ceph::condition_variable cond;
using Signature = void(boost::system::error_code);
using Completion = ceph::async::Completion<Signature>;
std::unique_ptr<Completion> completion;
- rgw_http_req_data() : id(-1), lock("rgw_http_req_data::lock") {
+ rgw_http_req_data() : id(-1) {
memset(error_buf, 0, sizeof(error_buf));
}
}
int wait(optional_yield y) {
- Mutex::Locker l(lock);
+ std::unique_lock l{lock};
if (done) {
return ret;
}
dout(20) << "WARNING: blocking http request" << dendl;
}
#endif
- cond.Wait(lock);
+ cond.wait(l);
return ret;
}
void set_state(int bitmask);
void finish(int r) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
ret = r;
if (curl_handle)
do_curl_easy_cleanup(curl_handle);
boost::system::error_code ec(-ret, boost::system::system_category());
Completion::post(std::move(completion), ec);
} else {
- cond.Signal();
+ cond.notify_all();
}
}
}
bool is_done() {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return done;
}
int get_retcode() {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return ret;
}
RGWHTTPManager *get_manager() {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return mgr;
}
#define MAXIDLE 5
class RGWCurlHandles : public Thread {
public:
- Mutex cleaner_lock;
- std::vector<RGWCurlHandle*>saved_curl;
+ ceph::mutex cleaner_lock = ceph::make_mutex("RGWCurlHandles::cleaner_lock");
+ std::vector<RGWCurlHandle*> saved_curl;
int cleaner_shutdown;
- Cond cleaner_cond;
+ ceph::condition_variable cleaner_cond;
RGWCurlHandles() :
- cleaner_lock{"RGWCurlHandles::cleaner_lock"},
cleaner_shutdown{0} {
}
RGWCurlHandle* curl = 0;
CURL* h;
{
- Mutex::Locker lock(cleaner_lock);
+ std::lock_guard lock{cleaner_lock};
if (!saved_curl.empty()) {
curl = *saved_curl.begin();
saved_curl.erase(saved_curl.begin());
release_curl_handle_now(curl);
} else {
curl_easy_reset(**curl);
- Mutex::Locker lock(cleaner_lock);
+ std::lock_guard lock{cleaner_lock};
curl->lastuse = mono_clock::now();
saved_curl.insert(saved_curl.begin(), 1, curl);
}
void* RGWCurlHandles::entry()
{
RGWCurlHandle* curl;
- Mutex::Locker lock(cleaner_lock);
+ std::unique_lock lock{cleaner_lock};
for (;;) {
if (cleaner_shutdown) {
if (saved_curl.empty())
break;
} else {
- utime_t release = ceph_clock_now() + utime_t(MAXIDLE,0);
- cleaner_cond.WaitUntil(cleaner_lock, release);
+ cleaner_cond.wait_for(lock, std::chrono::seconds(MAXIDLE));
}
mono_time now = mono_clock::now();
while (!saved_curl.empty()) {
void RGWCurlHandles::stop()
{
- Mutex::Locker lock(cleaner_lock);
+ std::lock_guard lock{cleaner_lock};
cleaner_shutdown = 1;
- cleaner_cond.Signal();
+ cleaner_cond.notify_all();
}
void RGWCurlHandles::flush_curl_handles()
rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
size_t len = size * nmemb;
- Mutex::Locker l(req_data->lock);
+ std::lock_guard l{req_data->lock};
if (!req_data->registered) {
return len;
RGWHTTPClient *client;
{
- Mutex::Locker l(req_data->lock);
+ std::lock_guard l{req_data->lock};
if (!req_data->registered) {
return len;
}
if (pause) {
dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl;
skip_bytes = len;
- Mutex::Locker l(req_data->lock);
+ std::lock_guard l{req_data->lock};
req_data->read_paused = true;
return CURL_WRITEFUNC_PAUSE;
}
RGWHTTPClient *client;
{
- Mutex::Locker l(req_data->lock);
+ std::lock_guard l{req_data->lock};
if (!req_data->registered) {
return 0;
if (ret == 0 &&
pause) {
- Mutex::Locker l(req_data->lock);
+ std::lock_guard l{req_data->lock};
req_data->write_paused = true;
return CURL_READFUNC_PAUSE;
}
return ret;
}
-Mutex& RGWHTTPClient::get_req_lock()
+ceph::mutex& RGWHTTPClient::get_req_lock()
{
return req_data->lock;
}
void RGWHTTPClient::_set_write_paused(bool pause)
{
- ceph_assert(req_data->lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(req_data->lock));
RGWHTTPManager *mgr = req_data->mgr;
if (pause == req_data->write_paused) {
void RGWHTTPClient::_set_read_paused(bool pause)
{
- ceph_assert(req_data->lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(req_data->lock));
RGWHTTPManager *mgr = req_data->mgr;
if (pause == req_data->read_paused) {
* RGWHTTPManager has two modes of operation: threaded and non-threaded.
*/
RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
- completion_mgr(_cm), is_started(false),
- reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
- reqs_thread(NULL)
+ completion_mgr(_cm)
{
multi_handle = (void *)curl_multi_init();
thread_pipe[0] = -1;
void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
{
- RWLock::WLocker rl(reqs_lock);
+ std::unique_lock rl{reqs_lock};
req_data->id = num_reqs;
req_data->registered = true;
reqs[num_reqs] = req_data;
bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
{
- RWLock::WLocker rl(reqs_lock);
+ std::unique_lock rl{reqs_lock};
if (!req_data->registered) {
return false;
}
void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
{
- RWLock::WLocker rl(reqs_lock);
+ std::unique_lock rl{reqs_lock};
_complete_request(req_data);
}
reqs.erase(iter);
}
{
- Mutex::Locker l(req_data->lock);
+ std::lock_guard l{req_data->lock};
req_data->mgr = nullptr;
}
if (completion_mgr) {
void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
{
- RWLock::WLocker wl(reqs_lock);
+ std::unique_lock wl{reqs_lock};
_unlink_request(req_data);
}
void RGWHTTPManager::manage_pending_requests()
{
- reqs_lock.get_read();
+ reqs_lock.lock_shared();
if (max_threaded_req == num_reqs &&
unregistered_reqs.empty() &&
reqs_change_state.empty()) {
- reqs_lock.unlock();
+ reqs_lock.unlock_shared();
return;
}
- reqs_lock.unlock();
+ reqs_lock.unlock_shared();
- RWLock::WLocker wl(reqs_lock);
+ std::unique_lock wl{reqs_lock};
if (!unregistered_reqs.empty()) {
for (auto& r : unregistered_reqs) {
{
rgw_http_req_data *req_data = client->get_req_data();
- ceph_assert(req_data->lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(req_data->lock));
/* can only do that if threaded */
if (!is_started) {
}
- RWLock::WLocker rl(reqs_lock);
+ std::unique_lock rl{reqs_lock};
for (auto r : unregistered_reqs) {
_unlink_request(r);
}
size_t nmemb,
void *_info);
- Mutex& get_req_lock();
+ ceph::mutex& get_req_lock();
/* needs to be called under req_lock() */
void _set_write_paused(bool pause);
CephContext *cct;
RGWCompletionManager *completion_mgr;
void *multi_handle;
- bool is_started;
+ bool is_started = false;
std::atomic<unsigned> going_down { 0 };
std::atomic<unsigned> is_stopped { 0 };
- RWLock reqs_lock;
+ ceph::shared_mutex reqs_lock = ceph::make_shared_mutex("RGWHTTPManager::reqs_lock");
map<uint64_t, rgw_http_req_data *> reqs;
list<rgw_http_req_data *> unregistered_reqs;
list<set_state> reqs_change_state;
map<uint64_t, rgw_http_req_data *> complete_reqs;
- int64_t num_reqs;
- int64_t max_threaded_req;
+ int64_t num_reqs = 0;
+ int64_t max_threaded_req = 0;
int thread_pipe[2];
void register_request(rgw_http_req_data *req_data);
void *entry() override;
};
- ReqsThread *reqs_thread;
+ ReqsThread *reqs_thread = nullptr;
void *reqs_thread_entry();
bool TokenCache::find(const std::string& token_id,
rgw::keystone::TokenEnvelope& token)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return find_locked(token_id, token);
}
bool TokenCache::find_locked(const std::string& token_id,
rgw::keystone::TokenEnvelope& token)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
map<string, token_entry>::iterator iter = tokens.find(token_id);
if (iter == tokens.end()) {
if (perfcounter) perfcounter->inc(l_rgw_keystone_token_cache_miss);
bool TokenCache::find_admin(rgw::keystone::TokenEnvelope& token)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return find_locked(admin_token_id, token);
}
bool TokenCache::find_barbican(rgw::keystone::TokenEnvelope& token)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return find_locked(barbican_token_id, token);
}
void TokenCache::add(const std::string& token_id,
const rgw::keystone::TokenEnvelope& token)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
add_locked(token_id, token);
}
void TokenCache::add_locked(const std::string& token_id,
const rgw::keystone::TokenEnvelope& token)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
map<string, token_entry>::iterator iter = tokens.find(token_id);
if (iter != tokens.end()) {
token_entry& e = iter->second;
void TokenCache::add_admin(const rgw::keystone::TokenEnvelope& token)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
rgw_get_token_id(token.token.id, admin_token_id);
add_locked(admin_token_id, token);
void TokenCache::add_barbican(const rgw::keystone::TokenEnvelope& token)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
rgw_get_token_id(token.token.id, barbican_token_id);
add_locked(barbican_token_id, token);
void TokenCache::invalidate(const std::string& token_id)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
map<string, token_entry>::iterator iter = tokens.find(token_id);
if (iter == tokens.end())
return;
#include "rgw_common.h"
#include "rgw_http_client.h"
-#include "common/Cond.h"
+#include "common/ceph_mutex.h"
#include "global/global_init.h"
#include <atomic>
std::map<std::string, token_entry> tokens;
std::list<std::string> tokens_lru;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("rgw::keystone::TokenCache");
const size_t max;
explicit TokenCache(const rgw::keystone::Config& config)
: cct(g_ceph_context),
- lock("rgw::keystone::TokenCache"),
max(cct->_conf->rgw_keystone_token_cache_size) {
}
ldpp_dout(dpp, 5) << "schedule life cycle next start time: " << rgw_to_asctime(next) << dendl;
- lock.Lock();
- cond.WaitInterval(lock, utime_t(secs, 0));
- lock.Unlock();
+ std::unique_lock l{lock};
+ cond.wait_for(l, std::chrono::seconds(secs));
} while (!lc->going_down());
return NULL;
void RGWLC::LCWorker::stop()
{
- Mutex::Locker l(lock);
- cond.Signal();
+ std::lock_guard l{lock};
+ cond.notify_all();
}
bool RGWLC::going_down()
#include "include/types.h"
#include "include/rados/librados.hpp"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/Cond.h"
#include "common/iso_8601.h"
#include "common/Thread.h"
const DoutPrefixProvider *dpp;
CephContext *cct;
RGWLC *lc;
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("LCWorker");
+ ceph::condition_variable cond;
public:
- LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWLC *_lc) : dpp(_dpp), cct(_cct), lc(_lc), lock("LCWorker") {}
+ LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWLC *_lc) : dpp(_dpp), cct(_cct), lc(_lc) {}
void *entry() override;
void stop();
bool should_work(utime_t& now);
CephContext *cct;
RGWRados *store;
map<rgw_user_bucket, RGWUsageBatch> usage_map;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("UsageLogger");
int32_t num_entries;
- Mutex timer_lock;
+ ceph::mutex timer_lock = ceph::make_mutex("UsageLogger::timer_lock");
SafeTimer timer;
utime_t round_timestamp;
}
public:
- UsageLogger(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("UsageLogger"), num_entries(0), timer_lock("UsageLogger::timer_lock"), timer(cct, timer_lock) {
+ UsageLogger(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), num_entries(0), timer(cct, timer_lock) {
timer.init();
- Mutex::Locker l(timer_lock);
+ std::lock_guard l{timer_lock};
set_timer();
utime_t ts = ceph_clock_now();
recalc_round_timestamp(ts);
}
~UsageLogger() {
- Mutex::Locker l(timer_lock);
+ std::lock_guard l{timer_lock};
flush();
timer.cancel_all_events();
timer.shutdown();
}
void insert_user(utime_t& timestamp, const rgw_user& user, rgw_usage_log_entry& entry) {
- lock.Lock();
+ lock.lock();
if (timestamp.sec() > round_timestamp + 3600)
recalc_round_timestamp(timestamp);
entry.epoch = round_timestamp.sec();
if (account)
num_entries++;
bool need_flush = (num_entries > cct->_conf->rgw_usage_log_flush_threshold);
- lock.Unlock();
+ lock.unlock();
if (need_flush) {
- Mutex::Locker l(timer_lock);
+ std::lock_guard l{timer_lock};
flush();
}
}
void flush() {
map<rgw_user_bucket, RGWUsageBatch> old_map;
- lock.Lock();
+ lock.lock();
old_map.swap(usage_map);
num_entries = 0;
- lock.Unlock();
+ lock.unlock();
store->log_usage(old_map);
}
bl.append("[");
}
-OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog), lock("OpsLogSocket")
+OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog)
{
formatter = new JSONFormatter;
delim.append(",\n");
{
bufferlist bl;
- lock.Lock();
+ lock.lock();
rgw_format_ops_log_entry(entry, formatter);
formatter_to_bl(bl);
- lock.Unlock();
+ lock.unlock();
append_output(bl);
}
class OpsLogSocket : public OutputDataSocket {
Formatter *formatter;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("OpsLogSocket");
void formatter_to_bl(bufferlist& bl);
if (g_conf()->daemonize) {
global_init_daemonize(g_ceph_context);
}
- Mutex mutex("main");
+ ceph::mutex mutex = ceph::make_mutex("main");
SafeTimer init_timer(g_ceph_context, mutex);
init_timer.init();
- mutex.Lock();
+ mutex.lock();
init_timer.add_event_after(g_conf()->rgw_init_timeout, new C_InitTimeout);
- mutex.Unlock();
+ mutex.unlock();
common_init_finish(g_ceph_context);
g_conf().get_val<bool>("rgw_dynamic_resharding"),
g_conf()->rgw_cache_enabled);
if (!store) {
- mutex.Lock();
+ mutex.lock();
init_timer.cancel_all_events();
init_timer.shutdown();
- mutex.Unlock();
+ mutex.unlock();
derr << "Couldn't init storage provider (RADOS)" << dendl;
return EIO;
rgw_rest_init(g_ceph_context, store, store->svc.zone->get_zonegroup());
- mutex.Lock();
+ mutex.lock();
init_timer.cancel_all_events();
init_timer.shutdown();
- mutex.Unlock();
+ mutex.unlock();
rgw_user_init(store);
rgw_bucket_init(store->meta_mgr);
}
- lock.unlock();
+ lock.unlock_shared();
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
modified_shards.insert(shard_id);
}
void RGWMetadataLog::read_clear_modified(set<int> &modified)
{
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
modified.swap(modified_shards);
modified_shards.clear();
}
secs -= end.sec();
- lock.Lock();
- cond.WaitInterval(lock, utime_t(secs, 0));
- lock.Unlock();
+ std::unique_lock l{lock};
+ cond.wait_for(l, std::chrono::seconds(secs));
} while (!oe->going_down());
return NULL;
void RGWObjectExpirer::OEWorker::stop()
{
- Mutex::Locker l(lock);
- cond.Signal();
+ std::lock_guard l{lock};
+ cond.notify_all();
}
#include "common/Formatter.h"
#include "common/errno.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/Cond.h"
#include "common/Thread.h"
class OEWorker : public Thread {
CephContext *cct;
RGWObjectExpirer *oe;
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("OEWorker");
+ ceph::condition_variable cond;
public:
OEWorker(CephContext * const cct,
RGWObjectExpirer * const oe)
: cct(cct),
- oe(oe),
- lock("OEWorker") {
+ oe(oe) {
}
void *entry() override;
#include "common/lru_map.h"
#include "common/RefCountedObj.h"
#include "common/Thread.h"
-#include "common/Mutex.h"
-#include "common/RWLock.h"
+#include "common/ceph_mutex.h"
#include "rgw_common.h"
#include "rgw_rados.h"
CephContext *cct;
RGWUserStatsCache *stats;
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::BucketsSyncThread");
+ ceph::condition_variable cond;
public:
- BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::BucketsSyncThread") {}
+ BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {}
void *entry() override {
ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
if (stats->going_down())
break;
- lock.Lock();
- cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_bucket_sync_interval, 0));
- lock.Unlock();
+ std::unique_lock locker{lock};
+ cond.wait_for(
+ locker,
+ std::chrono::seconds(cct->_conf->rgw_user_quota_bucket_sync_interval));
} while (!stats->going_down());
ldout(cct, 20) << "BucketsSyncThread: done" << dendl;
}
void stop() {
- Mutex::Locker l(lock);
- cond.Signal();
+ std::lock_guard l{lock};
+ cond.notify_all();
}
};
CephContext *cct;
RGWUserStatsCache *stats;
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::UserSyncThread");
+ ceph::condition_variable cond;
public:
- UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::UserSyncThread") {}
+ UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {}
void *entry() override {
ldout(cct, 20) << "UserSyncThread: start" << dendl;
if (stats->going_down())
break;
- lock.Lock();
- cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_sync_interval, 0));
- lock.Unlock();
+ std::unique_lock l{lock};
+ cond.wait_for(l, std::chrono::seconds(cct->_conf->rgw_user_quota_sync_interval));
} while (!stats->going_down());
ldout(cct, 20) << "UserSyncThread: done" << dendl;
}
void stop() {
- Mutex::Locker l(lock);
- cond.Signal();
+ std::lock_guard l{lock};
+ cond.notify_all();
}
};
void *RGWRadosThread::Worker::entry() {
uint64_t msec = processor->interval_msec();
- utime_t interval = utime_t(msec / 1000, (msec % 1000) * 1000000);
+ auto interval = std::chrono::milliseconds(msec);
do {
- utime_t start = ceph_clock_now();
+ auto start = ceph::real_clock::now();
int r = processor->process();
if (r < 0) {
dout(0) << "ERROR: processor->process() returned error r=" << r << dendl;
if (processor->going_down())
break;
- utime_t end = ceph_clock_now();
- end -= start;
+ auto end = ceph::real_clock::now() - start;
uint64_t cur_msec = processor->interval_msec();
if (cur_msec != msec) { /* was it reconfigured? */
msec = cur_msec;
- interval = utime_t(msec / 1000, (msec % 1000) * 1000000);
+ interval = std::chrono::milliseconds(msec);
}
if (cur_msec > 0) {
if (interval <= end)
continue; // next round
- utime_t wait_time = interval;
- wait_time -= end;
-
+ auto wait_time = interval - end;
wait_interval(wait_time);
} else {
wait();
void RGWRados::wakeup_meta_sync_shards(set<int>& shard_ids)
{
- Mutex::Locker l(meta_sync_thread_lock);
+ std::lock_guard l{meta_sync_thread_lock};
if (meta_sync_processor_thread) {
meta_sync_processor_thread->wakeup_sync_shards(shard_ids);
}
void RGWRados::wakeup_data_sync_shards(const string& source_zone, map<int, set<string> >& shard_ids)
{
ldout(ctx(), 20) << __func__ << ": source_zone=" << source_zone << ", shard_ids=" << shard_ids << dendl;
- Mutex::Locker l(data_sync_thread_lock);
+ std::lock_guard l{data_sync_thread_lock};
map<string, RGWDataSyncProcessorThread *>::iterator iter = data_sync_processor_threads.find(source_zone);
if (iter == data_sync_processor_threads.end()) {
ldout(ctx(), 10) << __func__ << ": couldn't find sync thread for zone " << source_zone << ", skipping async data sync processing" << dendl;
RGWMetaSyncStatusManager* RGWRados::get_meta_sync_manager()
{
- Mutex::Locker l(meta_sync_thread_lock);
+ std::lock_guard l{meta_sync_thread_lock};
if (meta_sync_processor_thread) {
return meta_sync_processor_thread->get_manager();
}
RGWDataSyncStatusManager* RGWRados::get_data_sync_manager(const std::string& source_zone)
{
- Mutex::Locker l(data_sync_thread_lock);
+ std::lock_guard l{data_sync_thread_lock};
auto thread = data_sync_processor_threads.find(source_zone);
if (thread == data_sync_processor_threads.end()) {
return nullptr;
class RGWIndexCompletionManager;
struct complete_op_data {
- Mutex lock{"complete_op_data"};
+ ceph::mutex lock = ceph::make_mutex("complete_op_data");
AioCompletion *rados_completion{nullptr};
int manager_shard_id{-1};
RGWIndexCompletionManager *manager{nullptr};
bool stopped{false};
void stop() {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
stopped = true;
}
};
list<complete_op_data *> completions;
- Mutex completions_lock;
+ ceph::mutex completions_lock =
+ ceph::make_mutex("RGWIndexCompletionThread::completions_lock");
public:
RGWIndexCompletionThread(RGWRados *_store)
- : RGWRadosThread(_store, "index-complete"), store(_store), completions_lock("RGWIndexCompletionThread::completions_lock") {}
+ : RGWRadosThread(_store, "index-complete"), store(_store) {}
int process() override;
void add_completion(complete_op_data *completion) {
{
- Mutex::Locker l(completions_lock);
+ std::lock_guard l{completions_lock};
completions.push_back(completion);
}
list<complete_op_data *> comps;
{
- Mutex::Locker l(completions_lock);
+ std::lock_guard l{completions_lock};
completions.swap(comps);
}
class RGWIndexCompletionManager {
RGWRados *store{nullptr};
- vector<Mutex *> locks;
+ ceph::containers::tiny_vector<ceph::mutex> locks;
vector<set<complete_op_data *> > completions;
RGWIndexCompletionThread *completion_thread{nullptr};
public:
- RGWIndexCompletionManager(RGWRados *_store) : store(_store) {
+ RGWIndexCompletionManager(RGWRados *_store) :
+ store(_store),
+ locks{ceph::make_lock_container<ceph::mutex>(
+ store->ctx()->_conf->rgw_thread_pool_size,
+ [](const size_t i) {
+ return ceph::make_mutex("RGWIndexCompletionManager::lock::" +
+ std::to_string(i));
+ })}
+ {
num_shards = store->ctx()->_conf->rgw_thread_pool_size;
-
- for (int i = 0; i < num_shards; i++) {
- char buf[64];
- snprintf(buf, sizeof(buf), "RGWIndexCompletionManager::lock::%d", i);
- locks.push_back(new Mutex(buf));
- }
-
completions.resize(num_shards);
}
~RGWIndexCompletionManager() {
stop();
-
- for (auto l : locks) {
- delete l;
- }
}
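
RGWIndexCompletionManager drops the vector of heap-allocated Mutex pointers in favour of ceph::containers::tiny_vector filled by make_lock_container, because mutexes are neither copyable nor movable and so cannot be grown into a plain std::vector. A portable sketch of the same per-shard-lock idea using std::deque, which never relocates its elements; the names are illustrative.

    #include <cstddef>
    #include <deque>
    #include <mutex>
    #include <string>
    #include <utility>

    // Stands in for a ceph::mutex created with a per-shard name.
    struct ShardLock {
      std::string name;
      std::mutex mutex;
      explicit ShardLock(std::string n) : name(std::move(n)) {}
    };

    int main() {
      const std::size_t num_shards = 8;     // rgw_thread_pool_size in the patch

      // std::deque never relocates existing elements, so it can hold
      // non-copyable, non-movable lock objects and still grow one at a time.
      std::deque<ShardLock> locks;
      for (std::size_t i = 0; i < num_shards; ++i)
        locks.emplace_back("shard::" + std::to_string(i));

      std::lock_guard l{locks[3].mutex};    // lock a single shard
      return 0;
    }
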
int next_shard() {
}
for (int i = 0; i < num_shards; ++i) {
- Mutex::Locker l(*locks[i]);
+ std::lock_guard l{locks[i]};
for (auto c : completions[i]) {
c->stop();
}
static void obj_complete_cb(completion_t cb, void *arg)
{
complete_op_data *completion = (complete_op_data *)arg;
- completion->lock.Lock();
+ completion->lock.lock();
if (completion->stopped) {
- completion->lock.Unlock(); /* can drop lock, no one else is referencing us */
+ completion->lock.unlock(); /* can drop lock, no one else is referencing us */
delete completion;
return;
}
bool need_delete = completion->manager->handle_completion(cb, completion);
- completion->lock.Unlock();
+ completion->lock.unlock();
if (need_delete) {
delete completion;
}
entry->rados_completion = librados::Rados::aio_create_completion(entry, NULL, obj_complete_cb);
- Mutex::Locker l(*locks[shard_id]);
+ std::lock_guard l{locks[shard_id]};
completions[shard_id].insert(entry);
}
{
int shard_id = arg->manager_shard_id;
{
- Mutex::Locker l(*locks[shard_id]);
+ std::lock_guard l{locks[shard_id]};
auto& comps = completions[shard_id];
{
cct->get_admin_socket()->unregister_commands(this);
if (run_sync_thread) {
- Mutex::Locker l(meta_sync_thread_lock);
+ std::lock_guard l{meta_sync_thread_lock};
meta_sync_processor_thread->stop();
- Mutex::Locker dl(data_sync_thread_lock);
+ std::lock_guard dl{data_sync_thread_lock};
for (auto iter : data_sync_processor_threads) {
RGWDataSyncProcessorThread *thread = iter.second;
thread->stop();
if (run_sync_thread) {
delete meta_sync_processor_thread;
meta_sync_processor_thread = NULL;
- Mutex::Locker dl(data_sync_thread_lock);
+ std::lock_guard dl{data_sync_thread_lock};
for (auto iter : data_sync_processor_threads) {
RGWDataSyncProcessorThread *thread = iter.second;
delete thread;
<< pt.second.name << " present in zonegroup" << dendl;
}
}
- Mutex::Locker l(meta_sync_thread_lock);
+ std::lock_guard l{meta_sync_thread_lock};
meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this, async_rados);
ret = meta_sync_processor_thread->init();
if (ret < 0) {
}
data_log->set_observer(&*bucket_trim);
- Mutex::Locker dl(data_sync_thread_lock);
+ std::lock_guard dl{data_sync_thread_lock};
for (auto source_zone : svc.zone->get_data_sync_source_zones()) {
ldout(cct, 5) << "starting data sync thread for zone " << source_zone->name << dendl;
auto *thread = new RGWDataSyncProcessorThread(this, async_rados, source_zone);
map<RGWObjCategory, RGWStorageStats> stats;
int ret_code;
bool should_cb;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("RGWGetBucketStatsContext");
public:
RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb, uint32_t _pendings)
- : cb(_cb), pendings(_pendings), stats(), ret_code(0), should_cb(true),
- lock("RGWGetBucketStatsContext") {}
+ : cb(_cb), pendings(_pendings), stats(), ret_code(0), should_cb(true)
+ {}
void handle_response(int r, rgw_bucket_dir_header& header) override {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (should_cb) {
if ( r >= 0) {
accumulate_raw_stats(header, stats);
}
void unset_cb() {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
should_cb = false;
}
};
uint64_t RGWRados::next_bucket_id()
{
- Mutex::Locker l(bucket_id_lock);
+ std::lock_guard l{bucket_id_lock};
return ++max_bucket_id;
}
class RGWObjectCtx {
RGWRados *store;
- RWLock lock{"RGWObjectCtx"};
+ ceph::shared_mutex lock = ceph::make_shared_mutex("RGWObjectCtx");
void *s{nullptr};
std::map<rgw_obj, RGWObjState> objs_state;
RGWObjState *get_state(const rgw_obj& obj) {
RGWObjState *result;
typename std::map<rgw_obj, RGWObjState>::iterator iter;
- lock.get_read();
+ lock.lock_shared();
assert (!obj.empty());
iter = objs_state.find(obj);
if (iter != objs_state.end()) {
result = &iter->second;
- lock.unlock();
+ lock.unlock_shared();
} else {
- lock.unlock();
- lock.get_write();
+ lock.unlock_shared();
+ lock.lock();
result = &objs_state[obj];
lock.unlock();
}
}
void set_atomic(rgw_obj& obj) {
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
assert (!obj.empty());
objs_state[obj].is_atomic = true;
}
void set_prefetch_data(const rgw_obj& obj) {
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
assert (!obj.empty());
objs_state[obj].prefetch_data = true;
}
void invalidate(const rgw_obj& obj) {
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
auto iter = objs_state.find(obj);
if (iter == objs_state.end()) {
return;
void get_bucket_instance_ids(const RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result);
std::atomic<int64_t> max_req_id = { 0 };
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("rados_timer_lock");
SafeTimer *timer;
RGWGC *gc;
boost::optional<rgw::BucketTrimManager> bucket_trim;
RGWSyncLogTrimThread *sync_log_trimmer{nullptr};
- Mutex meta_sync_thread_lock;
- Mutex data_sync_thread_lock;
+ ceph::mutex meta_sync_thread_lock = ceph::make_mutex("meta_sync_thread_lock");
+ ceph::mutex data_sync_thread_lock = ceph::make_mutex("data_sync_thread_lock");
librados::IoCtx root_pool_ctx; // .rgw
friend class RGWWatcher;
- Mutex bucket_id_lock;
+ ceph::mutex bucket_id_lock = ceph::make_mutex("rados_bucket_id");
// This field represents the number of bucket index object shards
uint32_t bucket_index_max_shards;
bool use_cache{false};
public:
- RGWRados(): lock("rados_timer_lock"), timer(NULL),
+ RGWRados(): timer(NULL),
gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false),
run_sync_thread(false), run_reshard_thread(false), async_rados(nullptr), meta_notifier(NULL),
data_notifier(NULL), meta_sync_processor_thread(NULL),
- meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"),
- bucket_id_lock("rados_bucket_id"),
bucket_index_max_shards(0),
max_bucket_id(0), cct(NULL),
binfo_cache(NULL), obj_tombstone_cache(nullptr),
class Worker : public Thread {
CephContext *cct;
RGWRadosThread *processor;
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("RGWRadosThread::Worker");
+ ceph::condition_variable cond;
void wait() {
- Mutex::Locker l(lock);
- cond.Wait(lock);
+ std::unique_lock l{lock};
+ cond.wait(l);
};
- void wait_interval(const utime_t& wait_time) {
- Mutex::Locker l(lock);
- cond.WaitInterval(lock, wait_time);
+ void wait_interval(const ceph::real_clock::duration& wait_time) {
+ std::unique_lock l{lock};
+ cond.wait_for(l, wait_time);
}
public:
- Worker(CephContext *_cct, RGWRadosThread *_p) : cct(_cct), processor(_p), lock("RGWRadosThread::Worker") {}
+ Worker(CephContext *_cct, RGWRadosThread *_p) : cct(_cct), processor(_p) {}
void *entry() override;
void signal() {
- Mutex::Locker l(lock);
- cond.Signal();
+ std::lock_guard l{lock};
+ cond.notify_all();
}
};
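
The Worker::entry() hunk replaces utime_t second/nanosecond bookkeeping with std::chrono durations, so the interval arithmetic and comparisons are unit-checked by the type system. A small sketch of that arithmetic; it uses steady_clock rather than the patch's ceph::real_clock, and the names are illustrative.

    #include <chrono>
    #include <cstdint>
    #include <iostream>
    #include <thread>

    int main() {
      using namespace std::chrono;

      const std::uint64_t msec = 250;                  // interval_msec() in the patch
      auto interval = milliseconds(msec);

      auto start = steady_clock::now();
      std::this_thread::sleep_for(milliseconds(40));   // stands in for processor->process()
      auto elapsed = steady_clock::now() - start;

      if (interval <= elapsed) {
        std::cout << "work overran the interval, start the next round now\n";
      } else {
        auto wait_time = interval - elapsed;           // mixed-unit arithmetic stays exact
        std::cout << "sleep for "
                  << duration_cast<milliseconds>(wait_time).count() << " ms\n";
      }
      return 0;
    }
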
service_map_meta(service_map_meta),
frontends(frontends),
timer(store->ctx(), mutex, USE_SAFE_TIMER_CALLBACKS),
- mutex("RGWRealmReloader"),
+ mutex(ceph::make_mutex("RGWRealmReloader")),
reload_scheduled(nullptr)
{
timer.init();
RGWRealmReloader::~RGWRealmReloader()
{
- Mutex::Locker lock(mutex);
+ std::lock_guard lock{mutex};
timer.shutdown();
}
CephContext *const cct = store->ctx();
- Mutex::Locker lock(mutex);
+ std::lock_guard lock{mutex};
if (reload_scheduled) {
ldout(cct, 4) << "Notification on realm, reconfiguration "
"already scheduled" << dendl;
}
reload_scheduled = new C_Reload(this);
- cond.SignalOne(); // wake reload() if it blocked on a bad configuration
+ cond.notify_one(); // wake reload() if it blocked on a bad configuration
// schedule reload() without delay
timer.add_event_after(0, reload_scheduled);
{
// allow a new notify to reschedule us. it's important that we do this
// before we start loading the new realm, or we could miss some updates
- Mutex::Locker lock(mutex);
+ std::lock_guard lock{mutex};
reload_scheduled = nullptr;
}
RGWRados* store_cleanup = nullptr;
{
- Mutex::Locker lock(mutex);
+ std::unique_lock lock{mutex};
// failure to recreate RGWRados is not a recoverable error, but we
// don't want to assert or abort the entire cluster. instead, just
"configuration update. Waiting for a new update." << dendl;
// sleep until another event is scheduled
- while (!reload_scheduled)
- cond.Wait(mutex);
-
+ cond.wait(lock, [this] { return reload_scheduled; });
ldout(cct, 1) << "Woke up with a new configuration, retrying "
"RGWRados initialization." << dendl;
}
/// Finisher because it allows us to cancel events that were scheduled while
/// reload() is still running
SafeTimer timer;
- Mutex mutex; //< protects access to timer and reload_scheduled
- Cond cond; //< to signal reload() after an invalid realm config
+ ceph::mutex mutex; //< protects access to timer and reload_scheduled
+ ceph::condition_variable cond; //< to signal reload() after an invalid realm config
C_Reload* reload_scheduled; //< reload() context if scheduled
};
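
The reload() hunk replaces the explicit while (!reload_scheduled) cond.Wait(mutex); loop with the predicate overload of condition_variable::wait, which runs the same re-check loop internally. A minimal equivalence sketch with hypothetical names:

    #include <condition_variable>
    #include <mutex>
    #include <thread>

    std::mutex mutex;
    std::condition_variable cond;
    bool scheduled = false;

    void wait_manual() {                 // the pre-patch shape
      std::unique_lock lock{mutex};
      while (!scheduled)                 // loop guards against spurious wakeups
        cond.wait(lock);
    }

    void wait_predicate() {              // the post-patch shape
      std::unique_lock lock{mutex};
      cond.wait(lock, [] { return scheduled; });   // same loop, run by the library
    }

    int main() {
      std::thread waiter(wait_predicate);
      {
        std::lock_guard l{mutex};
        scheduled = true;
      }
      cond.notify_one();
      waiter.join();
      return 0;
    }
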
secs -= end.sec();
- lock.Lock();
- cond.WaitInterval(lock, utime_t(secs, 0));
- lock.Unlock();
+ std::unique_lock locker{lock};
+ cond.wait_for(locker, std::chrono::seconds(secs));
} while (!reshard->going_down());
return NULL;
void RGWReshard::ReshardWorker::stop()
{
- Mutex::Locker l(lock);
- cond.Signal();
+ std::lock_guard l{lock};
+ cond.notify_all();
}
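
The reshard worker's sleep becomes wait_for() on a unique_lock, and stop() wakes it early with notify_all(). Below is a sketch of that stoppable sleep loop, with a plain bool standing in for reshard->going_down() and a predicate added so a stop() racing with the wait cannot be missed; the interval is hard-coded only for illustration:

    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    std::mutex lock;
    std::condition_variable cond;
    bool going_down = false;         // placeholder for reshard->going_down()

    void worker_loop() {
      std::unique_lock locker{lock};
      while (!going_down) {
        locker.unlock();
        // ... one round of reshard processing, lock not held ...
        locker.lock();
        // Sleep up to the interval, but return immediately once stop() signals.
        cond.wait_for(locker, std::chrono::seconds(10),
                      [] { return going_down; });
      }
    }

    void stop() {
      std::lock_guard l{lock};
      going_down = true;
      cond.notify_all();
    }
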
class ReshardWorker : public Thread {
CephContext *cct;
RGWReshard *reshard;
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("ReshardWorker");
+ ceph::condition_variable cond;
public:
ReshardWorker(CephContext * const _cct,
- RGWReshard * const _reshard)
+ RGWReshard * const _reshard)
: cct(_cct),
- reshard(_reshard),
- lock("ReshardWorker") {
+ reshard(_reshard) {
}
void *entry() override;
}
void RGWHTTPStreamRWRequest::set_stream_write(bool s) {
- Mutex::Locker wl(write_lock);
+ std::lock_guard wl{write_lock};
stream_writes = s;
}
void RGWHTTPStreamRWRequest::unpause_receive()
{
- Mutex::Locker req_locker(get_req_lock());
+ std::lock_guard req_locker{get_req_lock()};
if (!read_paused) {
_set_read_paused(false);
}
void RGWHTTPStreamRWRequest::add_send_data(bufferlist& bl)
{
- Mutex::Locker req_locker(get_req_lock());
- Mutex::Locker wl(write_lock);
+ std::scoped_lock locker{get_req_lock(), write_lock};
outbl.claim_append(bl);
_set_write_paused(false);
}
uint64_t RGWHTTPStreamRWRequest::get_pending_send_size()
{
- Mutex::Locker wl(write_lock);
+ std::lock_guard wl{write_lock};
return outbl.length();
}
void RGWHTTPStreamRWRequest::finish_write()
{
- Mutex::Locker req_locker(get_req_lock());
- Mutex::Locker wl(write_lock);
+ std::scoped_lock locker{get_req_lock(), write_lock};
write_stream_complete = true;
_set_write_paused(false);
}
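
add_send_data() and finish_write() previously took two Mutex::Lockers back to back; the conversion collapses them into one std::scoped_lock, which acquires both mutexes with std::lock's deadlock-avoidance algorithm and releases them in reverse order. A standalone sketch of the idiom (the string buffer and flag are placeholders for the bufferlist and pause state):

    #include <mutex>
    #include <string>

    std::mutex req_lock;        // stands in for get_req_lock()
    std::mutex write_lock;
    std::string outbl;          // stands in for the bufferlist
    bool write_paused = true;

    void add_send_data(const std::string& data) {
      // Locks both mutexes atomically; unlike two nested lock_guards,
      // the argument order is not part of the locking contract.
      std::scoped_lock locker{req_lock, write_lock};
      outbl.append(data);
      write_paused = false;
    }
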
uint64_t out_len;
uint64_t send_size;
{
- Mutex::Locker wl(write_lock);
+ std::lock_guard wl{write_lock};
if (outbl.length() == 0) {
if ((stream_writes && !write_stream_complete) ||
class ReceiveCB;
private:
- Mutex lock;
- Mutex write_lock;
+ ceph::mutex lock =
+ ceph::make_mutex("RGWHTTPStreamRWRequest");
+ ceph::mutex write_lock =
+ ceph::make_mutex("RGWHTTPStreamRWRequest::write_lock");
ReceiveCB *cb{nullptr};
RGWWriteDrainCB *write_drain_cb{nullptr};
bufferlist outbl;
};
RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url,
- param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params),
- lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock") {
+ param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params) {
}
RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, ReceiveCB *_cb,
param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params),
- lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock"), cb(_cb) {
+ cb(_cb) {
}
virtual ~RGWHTTPStreamRWRequest() override {}
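
Because the lock members now carry their names as default member initializers (see the lock/write_lock declarations above), both constructors shed the repeated lock(...) and write_lock(...) entries from their init lists. The same refactor in miniature, using a toy named-mutex type rather than the ceph one:

    #include <mutex>
    #include <string>
    #include <utility>

    // Toy stand-in for a mutex that carries a debug name (NOT the ceph type).
    struct NamedMutex {
      std::string name;
      std::mutex m;
      explicit NamedMutex(std::string n) : name(std::move(n)) {}
    };

    class StreamRequest {
      // Default member initializers: the name lives next to the declaration,
      // so no constructor has to repeat it in its init list.
      NamedMutex lock{"StreamRequest"};
      NamedMutex write_lock{"StreamRequest::write_lock"};
      int* cb = nullptr;

     public:
      StreamRequest() = default;
      explicit StreamRequest(int* c) : cb(c) {}  // locks are still initialized
    };
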
// retry the operation until it succeeds
while (true) {
yield {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
cr = alloc_cr();
cr->get();
call(cr);
}
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
cr->put();
cr = NULL;
}
shard_objs[i] = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sync_env.shard_obj_name(i));
}
- RWLock::WLocker wl(ts_to_shard_lock);
+ std::unique_lock wl{ts_to_shard_lock};
for (int i = 0; i < num_shards; i++) {
clone_markers.push_back(string());
utime_shard ut;
string raw_key;
rgw_mdlog_entry mdlog_entry;
- Mutex inc_lock;
- Cond inc_cond;
+ ceph::mutex inc_lock = ceph::make_mutex("RGWMetaSyncShardCR::inc_lock");
+ ceph::condition_variable inc_cond;
boost::asio::coroutine incremental_cr;
boost::asio::coroutine full_cr;
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
period(period), realm_epoch(realm_epoch), mdlog(mdlog),
shard_id(_shard_id), sync_marker(_marker),
- period_marker(period_marker), inc_lock("RGWMetaSyncShardCR::inc_lock"),
+ period_marker(period_marker),
reset_backoff(_reset_backoff), tn(_tn) {
*reset_backoff = false;
}
class RGWBackoffControlCR : public RGWCoroutine
{
RGWCoroutine *cr;
- Mutex lock;
+ ceph::mutex lock;
RGWSyncBackoff backoff;
bool reset_backoff;
return &reset_backoff;
}
- Mutex& cr_lock() {
+ ceph::mutex& cr_lock() {
return lock;
}
}
public:
- RGWBackoffControlCR(CephContext *_cct, bool _exit_on_error) : RGWCoroutine(_cct), cr(NULL), lock("RGWBackoffControlCR::lock:" + stringify(this)),
- reset_backoff(false), exit_on_error(_exit_on_error) {
+ RGWBackoffControlCR(CephContext *_cct, bool _exit_on_error)
+ : RGWCoroutine(_cct),
+ cr(nullptr),
+ lock(ceph::make_mutex("RGWBackoffControlCR::lock:" + stringify(this))),
+ reset_backoff(false), exit_on_error(_exit_on_error) {
}
~RGWBackoffControlCR() override {
}
};
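
RGWBackoffControlCR is the one case above where the mutex cannot take an in-class initializer: its name embeds the object address via stringify(this), which is only available inside the constructor, hence the bare ceph::mutex member plus the ceph::make_mutex(...) call in the init list. The same shape in miniature, again with a toy named-mutex type instead of the ceph one:

    #include <mutex>
    #include <sstream>
    #include <string>
    #include <utility>

    // Toy mutex-with-a-name (NOT the ceph type); the point is only that its
    // constructor needs a string built at run time.
    struct NamedMutex {
      std::string name;
      std::mutex m;
      explicit NamedMutex(std::string n) : name(std::move(n)) {}
    };

    class BackoffControlSketch {
      NamedMutex lock;   // no in-class initializer: the name depends on `this`

      static std::string name_for(const void* p) {
        std::ostringstream oss;
        oss << "BackoffControlSketch::lock:" << p;   // mimics stringify(this)
        return oss.str();
      }

     public:
      BackoffControlSketch() : lock(name_for(this)) {}
    };
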
- RWLock ts_to_shard_lock;
+ ceph::shared_mutex ts_to_shard_lock = ceph::make_shared_mutex("ts_to_shard_lock");
map<utime_shard, int> ts_to_shard;
vector<string> clone_markers;
public:
RGWMetaSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
- : store(_store), master_log(this, store, async_rados, this),
- ts_to_shard_lock("ts_to_shard_lock") {}
+ : store(_store), master_log(this, store, async_rados, this)
+ {}
int init();
int read_sync_status(rgw_meta_sync_status *sync_status) {
class RGWSyncModulesManager {
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("RGWSyncModulesManager");
map<string, RGWSyncModuleRef> modules;
public:
- RGWSyncModulesManager() : lock("RGWSyncModulesManager") {}
+ RGWSyncModulesManager() = default;
void register_module(const string& name, RGWSyncModuleRef& module, bool is_default = false) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
modules[name] = module;
if (is_default) {
modules[string()] = module;
}
bool get_module(const string& name, RGWSyncModuleRef *module) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
auto iter = modules.find(name);
if (iter == modules.end()) {
return false;
#include <atomic>
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/shunique_lock.h"
#include "common/admin_socket.h"
uint16_t state{0};
std::string status;
- Mutex lock{"RGWSyncTraceNode::lock"};
+ ceph::mutex lock = ceph::make_mutex("RGWSyncTraceNode::lock");
std::string type;
std::string id;
void RGWSI_Notify::add_watcher(int i)
{
ldout(cct, 20) << "add_watcher() i=" << i << dendl;
- RWLock::WLocker l(watchers_lock);
+ std::unique_lock l{watchers_lock};
watchers_set.insert(i);
if (watchers_set.size() == (size_t)num_watchers) {
ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
void RGWSI_Notify::remove_watcher(int i)
{
ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
- RWLock::WLocker l(watchers_lock);
+ std::unique_lock l{watchers_lock};
size_t orig_size = watchers_set.size();
watchers_set.erase(i);
if (orig_size == (size_t)num_watchers &&
uint64_t notifier_id,
bufferlist& bl)
{
- RWLock::RLocker l(watchers_lock);
+ std::shared_lock l{watchers_lock};
if (cb) {
return cb->watch_cb(notify_id, cookie, notifier_id, bl);
}
void RGWSI_Notify::set_enabled(bool status)
{
- RWLock::WLocker l(watchers_lock);
+ std::unique_lock l{watchers_lock};
_set_enabled(status);
}
void RGWSI_Notify::register_watch_cb(CB *_cb)
{
- RWLock::WLocker l(watchers_lock);
+ std::unique_lock l{watchers_lock};
cb = _cb;
_set_enabled(enabled);
}
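
The RGWSI_Notify changes map RWLock::WLocker/RLocker onto std::unique_lock/std::shared_lock over a shared mutex: the mutating paths (add_watcher, remove_watcher, set_enabled, register_watch_cb) lock exclusively, while the notification callback path only needs shared access. A condensed reader/writer sketch with std::shared_mutex (the set and accessor are placeholders):

    #include <mutex>         // std::unique_lock
    #include <set>
    #include <shared_mutex>  // std::shared_mutex, std::shared_lock

    std::shared_mutex watchers_lock;
    std::set<int> watchers_set;

    void add_watcher(int i) {
      std::unique_lock l{watchers_lock};   // exclusive: mutates the set
      watchers_set.insert(i);
    }

    bool has_watcher(int i) {
      std::shared_lock l{watchers_lock};   // shared: readers run in parallel
      return watchers_set.count(i) != 0;
    }
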
RGWSI_RADOS *rados_svc{nullptr};
RGWSI_Finisher *finisher_svc{nullptr};
- RWLock watchers_lock{"watchers_lock"};
+ ceph::shared_mutex watchers_lock = ceph::make_shared_mutex("watchers_lock");
rgw_pool control_pool;
int num_watchers{0};
}
boost::optional<T> find(const string& key) {
- RWLock::RLocker rl(lock);
+ std::shared_lock rl{lock};
auto iter = entries.find(key);
if (iter == entries.end()) {
return boost::none;
void chain_cb(const string& key, void *data) override {
T *entry = static_cast<T *>(data);
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
entries[key].first = *entry;
if (expiry.count() > 0) {
entries[key].second = ceph::coarse_mono_clock::now();
}
void invalidate(const string& key) override {
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
entries.erase(key);
}
void invalidate_all() override {
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
entries.clear();
}
}; /* RGWChainedCacheImpl */
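
RGWChainedCacheImpl follows the same split: find() takes a shared_lock, while chain_cb/invalidate/invalidate_all take a unique_lock. A reduced, self-contained version of that read-mostly map (expiry handling and the chained-cache plumbing are omitted; names are illustrative):

    #include <map>
    #include <mutex>
    #include <optional>
    #include <shared_mutex>
    #include <string>

    template <typename T>
    class ChainedCacheSketch {
      std::shared_mutex lock;
      std::map<std::string, T> entries;

     public:
      std::optional<T> find(const std::string& key) {
        std::shared_lock rl{lock};                 // read path: shared
        auto iter = entries.find(key);
        if (iter == entries.end()) return std::nullopt;
        return iter->second;
      }

      void put(const std::string& key, const T& entry) {
        std::unique_lock wl{lock};                 // write path: exclusive
        entries.insert_or_assign(key, entry);
      }

      void invalidate(const std::string& key) {
        std::unique_lock wl{lock};
        entries.erase(key);
      }
    };
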
class RGWSysObjectCtxBase {
std::map<rgw_raw_obj, RGWSysObjState> objs_state;
- RWLock lock;
+ ceph::shared_mutex lock = ceph::make_shared_mutex("RGWSysObjectCtxBase");
public:
- explicit RGWSysObjectCtxBase() : lock("RGWSysObjectCtxBase") {}
+ RGWSysObjectCtxBase() = default;
- RGWSysObjectCtxBase(const RGWSysObjectCtxBase& rhs) : objs_state(rhs.objs_state),
- lock("RGWSysObjectCtxBase") {}
- RGWSysObjectCtxBase(const RGWSysObjectCtxBase&& rhs) : objs_state(std::move(rhs.objs_state)),
- lock("RGWSysObjectCtxBase") {}
+ RGWSysObjectCtxBase(const RGWSysObjectCtxBase& rhs) : objs_state(rhs.objs_state) {}
+ RGWSysObjectCtxBase(const RGWSysObjectCtxBase&& rhs) : objs_state(std::move(rhs.objs_state)) {}
RGWSysObjState *get_state(const rgw_raw_obj& obj) {
RGWSysObjState *result;
std::map<rgw_raw_obj, RGWSysObjState>::iterator iter;
- lock.get_read();
+ lock.lock_shared();
assert (!obj.empty());
iter = objs_state.find(obj);
if (iter != objs_state.end()) {
result = &iter->second;
- lock.unlock();
+ lock.unlock_shared();
} else {
- lock.unlock();
- lock.get_write();
+ lock.unlock_shared();
+ lock.lock();
result = &objs_state[obj];
lock.unlock();
}
return result;
}
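
get_state() keeps the pre-existing drop-shared-then-take-exclusive dance: like RWLock before it, ceph::shared_mutex has no atomic upgrade, so another thread may insert the entry between unlock_shared() and lock(). The code stays correct only because operator[] then finds that entry rather than inserting a duplicate, and because std::map references are not invalidated by later insertions. A sketch of the same pattern that highlights the gap (types and names are placeholders):

    #include <map>
    #include <shared_mutex>
    #include <string>

    std::shared_mutex lock;
    std::map<std::string, int> objs_state;

    int* get_state(const std::string& key) {
      lock.lock_shared();
      auto iter = objs_state.find(key);
      if (iter != objs_state.end()) {
        int* result = &iter->second;
        lock.unlock_shared();
        return result;
      }
      lock.unlock_shared();
      // Gap: another thread may create the entry right here. That is fine,
      // because operator[] below simply finds it instead of inserting twice.
      lock.lock();
      int* result = &objs_state[key];
      lock.unlock();
      return result;
    }
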
void set_prefetch_data(rgw_raw_obj& obj) {
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
assert (!obj.empty());
objs_state[obj].prefetch_data = true;
}
void invalidate(const rgw_raw_obj& obj) {
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
auto iter = objs_state.find(obj);
if (iter == objs_state.end()) {
return;