#ifndef CEPH_LIBRADOS_AIOCOMPLETIONIMPL_H
#define CEPH_LIBRADOS_AIOCOMPLETIONIMPL_H
-#include "common/Cond.h"
-#include "common/Mutex.h"
-
+#include "common/ceph_mutex.h"
#include "include/buffer.h"
#include "include/xlist.h"
#include "osd/osd_types.h"
class IoCtxImpl;
struct librados::AioCompletionImpl {
- Mutex lock;
- Cond cond;
- int ref, rval;
- bool released;
- bool complete;
- version_t objver;
- ceph_tid_t tid;
+ ceph::mutex lock = ceph::make_mutex("AioCompletionImpl lock", false);
+ ceph::condition_variable cond;
+ int ref = 1, rval = 0;
+ bool released = false;
+ bool complete = false;
+ version_t objver = 0;
+ ceph_tid_t tid = 0;
- rados_callback_t callback_complete, callback_safe;
- void *callback_complete_arg, *callback_safe_arg;
+ rados_callback_t callback_complete = nullptr, callback_safe = nullptr;
+ void *callback_complete_arg = nullptr, *callback_safe_arg = nullptr;
// for read
- bool is_read;
+ bool is_read = false;
bufferlist bl;
- bufferlist *blp;
- char *out_buf;
+ bufferlist *blp = nullptr;
+ char *out_buf = nullptr;
- IoCtxImpl *io;
- ceph_tid_t aio_write_seq;
+ IoCtxImpl *io = nullptr;
+ ceph_tid_t aio_write_seq = 0;
xlist<AioCompletionImpl*>::item aio_write_list_item;
- AioCompletionImpl() : lock("AioCompletionImpl lock", false, false),
- ref(1), rval(0), released(false),
- complete(false),
- objver(0),
- tid(0),
- callback_complete(0),
- callback_safe(0),
- callback_complete_arg(0),
- callback_safe_arg(0),
- is_read(false), blp(nullptr), out_buf(nullptr),
- io(NULL), aio_write_seq(0), aio_write_list_item(this) { }
+ AioCompletionImpl() : aio_write_list_item(this) { }
int set_complete_callback(void *cb_arg, rados_callback_t cb) {
- lock.Lock();
+ std::scoped_lock l{lock};
callback_complete = cb;
callback_complete_arg = cb_arg;
- lock.Unlock();
return 0;
}
int set_safe_callback(void *cb_arg, rados_callback_t cb) {
- lock.Lock();
+ std::scoped_lock l{lock};
callback_safe = cb;
callback_safe_arg = cb_arg;
- lock.Unlock();
return 0;
}
int wait_for_complete() {
- lock.Lock();
- while (!complete)
- cond.Wait(lock);
- lock.Unlock();
+ std::unique_lock l{lock};
+ cond.wait(l, [this] { return complete; });
return 0;
}
int wait_for_safe() {
return wait_for_complete();
}
int is_complete() {
- lock.Lock();
- int r = complete;
- lock.Unlock();
- return r;
+ std::scoped_lock l{lock};
+ return complete;
}
int is_safe() {
return is_complete();
}
int wait_for_complete_and_cb() {
- lock.Lock();
- while (!complete || callback_complete || callback_safe)
- cond.Wait(lock);
- lock.Unlock();
+ std::unique_lock l{lock};
+ cond.wait(l, [this] { return complete && !callback_complete && !callback_safe; });
return 0;
}
int wait_for_safe_and_cb() {
return wait_for_complete_and_cb();
}
int is_complete_and_cb() {
- lock.Lock();
- int r = complete && !callback_complete && !callback_safe;
- lock.Unlock();
- return r;
+ std::scoped_lock l{lock};
+ return complete && !callback_complete && !callback_safe;
}
int is_safe_and_cb() {
return is_complete_and_cb();
}
int get_return_value() {
- lock.Lock();
- int r = rval;
- lock.Unlock();
- return r;
+ std::scoped_lock l{lock};
+ return rval;
}
uint64_t get_version() {
- lock.Lock();
- version_t v = objver;
- lock.Unlock();
- return v;
+ std::scoped_lock l{lock};
+ return objver;
}
void get() {
- lock.Lock();
+ std::scoped_lock l{lock};
_get();
- lock.Unlock();
}
void _get() {
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ceph_assert(ref > 0);
++ref;
}
void release() {
- lock.Lock();
+ lock.lock();
ceph_assert(!released);
released = true;
put_unlock();
}
void put() {
- lock.Lock();
+ lock.lock();
put_unlock();
}
void put_unlock() {
ceph_assert(ref > 0);
int n = --ref;
- lock.Unlock();
+ lock.unlock();
if (!n)
delete this;
}
if (cb_safe)
cb_safe(c, cb_safe_arg);
- c->lock.Lock();
+ c->lock.lock();
c->callback_complete = NULL;
c->callback_safe = NULL;
- c->cond.Signal();
+ c->cond.notify_all();
c->put_unlock();
}
};
}
void finish(int r) override {
- c->lock.Lock();
+ c->lock.lock();
c->rval = r;
c->complete = true;
- c->lock.Unlock();
+ c->lock.unlock();
rados_callback_t cb_complete = c->callback_complete;
void *cb_complete_arg = c->callback_complete_arg;
if (cb_safe)
cb_safe(c, cb_safe_arg);
- c->lock.Lock();
+ c->lock.lock();
c->callback_complete = NULL;
c->callback_safe = NULL;
- c->cond.Signal();
+ c->cond.notify_all();
c->put_unlock();
}
};
c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter,
linger_op));
- c->lock.Lock();
+ c->lock.lock();
c->rval = r;
c->complete = true;
- c->cond.Signal();
+ c->cond.notify_all();
if (c->callback_complete ||
c->callback_safe) {
};
struct C_aio_notify_Complete : public C_aio_linger_Complete {
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("C_aio_notify_Complete::lock");
bool acked = false;
bool finished = false;
int ret_val = 0;
C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op)
- : C_aio_linger_Complete(_c, _linger_op, false),
- lock("C_aio_notify_Complete::lock") {
+ : C_aio_linger_Complete(_c, _linger_op, false) {
}
void handle_ack(int r) {
// invoked by C_aio_notify_Ack
- lock.Lock();
+ lock.lock();
acked = true;
complete_unlock(r);
}
void complete(int r) override {
// invoked by C_notify_Finish
- lock.Lock();
+ lock.lock();
finished = true;
complete_unlock(r);
}
}
if (acked && finished) {
- lock.Unlock();
+ lock.unlock();
cancel = true;
C_aio_linger_Complete::complete(ret_val);
} else {
- lock.Unlock();
+ lock.unlock();
}
}
};
}
void finish(int r) override {
- c->lock.Lock();
+ c->lock.lock();
c->rval = r;
c->complete = true;
- c->cond.Signal();
+ c->cond.notify_all();
if (c->callback_complete || c->callback_safe) {
client->finisher.queue(new librados::C_AioComplete(c));
} // anonymous namespace
} // namespace librados
-librados::IoCtxImpl::IoCtxImpl() :
- ref_cnt(0), client(NULL), poolid(0), assert_ver(0), last_objver(0),
- notify_timeout(30), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
- aio_write_seq(0), objecter(NULL)
-{
-}
+librados::IoCtxImpl::IoCtxImpl() = default;
librados::IoCtxImpl::IoCtxImpl(RadosClient *c, Objecter *objecter,
int64_t poolid, snapid_t s)
- : ref_cnt(0), client(c), poolid(poolid), snap_seq(s),
- assert_ver(0), last_objver(0),
+ : client(c), poolid(poolid), snap_seq(s),
notify_timeout(c->cct->_conf->client_notify_timeout),
- oloc(poolid), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
+ oloc(poolid),
aio_write_seq(0), objecter(objecter)
{
}
void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c)
{
get();
- aio_write_list_lock.Lock();
+ std::scoped_lock l{aio_write_list_lock};
ceph_assert(c->io == this);
c->aio_write_seq = ++aio_write_seq;
ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c
<< " write_seq " << aio_write_seq << dendl;
aio_write_list.push_back(&c->aio_write_list_item);
- aio_write_list_lock.Unlock();
}
void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
{
ldout(client->cct, 20) << "complete_aio_write " << c << dendl;
- aio_write_list_lock.Lock();
+ aio_write_list_lock.lock();
ceph_assert(c->io == this);
c->aio_write_list_item.remove_myself();
aio_write_waiters.erase(waiters++);
}
- aio_write_cond.Signal();
- aio_write_list_lock.Unlock();
+ aio_write_cond.notify_all();
+ aio_write_list_lock.unlock();
put();
}
void librados::IoCtxImpl::flush_aio_writes()
{
ldout(client->cct, 20) << "flush_aio_writes" << dendl;
- aio_write_list_lock.Lock();
- ceph_tid_t seq = aio_write_seq;
- while (!aio_write_list.empty() &&
- aio_write_list.front()->aio_write_seq <= seq)
- aio_write_cond.Wait(aio_write_list_lock);
- aio_write_list_lock.Unlock();
+ std::unique_lock l{aio_write_list_lock};
+ aio_write_cond.wait(l, [seq=aio_write_seq, this] {
+ return (aio_write_list.empty() ||
+ aio_write_list.front()->aio_write_seq > seq);
+ });
}
string librados::IoCtxImpl::get_cached_pool_name()
int reply;
string sName(snapName);
- Mutex mylock ("IoCtxImpl::snap_create::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::snap_create::mylock");
+ ceph::condition_variable cond;
bool done;
- Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
+ Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply);
reply = objecter->create_pool_snap(poolid, sName, onfinish);
if (reply < 0) {
delete onfinish;
} else {
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done; });
}
return reply;
}
{
int reply;
- Mutex mylock("IoCtxImpl::selfmanaged_snap_create::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::selfmanaged_snap_create::mylock");
+ ceph::condition_variable cond;
bool done;
- Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
+ Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply);
snapid_t snapid;
reply = objecter->allocate_selfmanaged_snap(poolid, &snapid, onfinish);
if (reply < 0) {
delete onfinish;
} else {
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
+ {
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done; });
+ }
if (reply == 0)
*psnapid = snapid;
}
int reply;
string sName(snapName);
- Mutex mylock ("IoCtxImpl::snap_remove::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::snap_remove::mylock");
+ ceph::condition_variable cond;
bool done;
- Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
+ Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply);
reply = objecter->delete_pool_snap(poolid, sName, onfinish);
if (reply < 0) {
delete onfinish;
} else {
- mylock.Lock();
- while(!done)
- cond.Wait(mylock);
- mylock.Unlock();
+ unique_lock l{mylock};
+ cond.wait(l, [&done] { return done; });
}
return reply;
}
{
int reply;
- Mutex mylock("IoCtxImpl::snap_rollback::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::snap_rollback::mylock");
+ ceph::condition_variable cond;
bool done;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply);
+ Context *onack = new C_SafeCond(mylock, cond, &done, &reply);
::ObjectOperation op;
prepare_assert_ops(&op);
op, snapc, ceph::real_clock::now(), 0,
onack, NULL);
- mylock.Lock();
- while (!done) cond.Wait(mylock);
- mylock.Unlock();
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done; });
return reply;
}
{
int reply;
- Mutex mylock("IoCtxImpl::selfmanaged_snap_remove::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::selfmanaged_snap_remove::mylock");
+ ceph::condition_variable cond;
bool done;
objecter->delete_selfmanaged_snap(poolid, snapid_t(snapid),
- new C_SafeCond(&mylock, &cond, &done, &reply));
+ new C_SafeCond(mylock, cond, &done, &reply));
- mylock.Lock();
- while (!done) cond.Wait(mylock);
- mylock.Unlock();
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done; });
return (int)reply;
}
int librados::IoCtxImpl::nlist(Objecter::NListContext *context, int max_entries)
{
- Cond cond;
bool done;
int r = 0;
- Mutex mylock("IoCtxImpl::nlist::mylock");
+ ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::nlist::mylock");
+ ceph::condition_variable cond;
if (context->at_end())
return 0;
context->max_entries = max_entries;
context->nspace = oloc.nspace;
- objecter->list_nobjects(context, new C_SafeCond(&mylock, &cond, &done, &r));
-
- mylock.Lock();
- while(!done)
- cond.Wait(mylock);
- mylock.Unlock();
+ objecter->list_nobjects(context, new C_SafeCond(mylock, cond, &done, &r));
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done; });
return r;
}
if (!o->size())
return 0;
- Mutex mylock("IoCtxImpl::operate::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::operate::mylock");
+ ceph::condition_variable cond;
bool done;
int r;
version_t ver;
- Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r);
+ Context *oncommit = new C_SafeCond(mylock, cond, &done, &r);
int op = o->ops[0].op.op;
ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid
oncommit, &ver);
objecter->op_submit(objecter_op);
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
+ {
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
+ }
ldout(client->cct, 10) << "Objecter returned from "
<< ceph_osd_op_name(op) << " r=" << r << dendl;
if (!o->size())
return 0;
- Mutex mylock("IoCtxImpl::operate_read::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::operate_read::mylock");
+ ceph::condition_variable cond;
bool done;
int r;
version_t ver;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ Context *onack = new C_SafeCond(mylock, cond, &done, &r);
int op = o->ops[0].op.op;
ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl;
onack, &ver);
objecter->op_submit(objecter_op);
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
+ {
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done; });
+ }
ldout(client->cct, 10) << "Objecter returned from "
<< ceph_osd_op_name(op) << " r=" << r << dendl;
{
bufferlist bl;
- Mutex mylock("IoCtxImpl::read::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::read::mylock");
+ ceph::condition_variable cond;
bool done;
int r;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ Context *onack = new C_SafeCond(mylock, cond, &done, &r);
objecter->mapext(oid, oloc,
off, len, snap_seq, &bl, 0,
onack);
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
+ {
+ unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
+ }
ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl;
if (r < 0)
void librados::IoCtxImpl::C_aio_stat_Ack::finish(int r)
{
- c->lock.Lock();
+ c->lock.lock();
c->rval = r;
c->complete = true;
- c->cond.Signal();
+ c->cond.notify_all();
if (r >= 0 && pmtime) {
*pmtime = real_clock::to_time_t(mtime);
void librados::IoCtxImpl::C_aio_stat2_Ack::finish(int r)
{
- c->lock.Lock();
+ c->lock.lock();
c->rval = r;
c->complete = true;
- c->cond.Signal();
+ c->cond.notify_all();
if (r >= 0 && pts) {
*pts = real_clock::to_timespec(mtime);
void librados::IoCtxImpl::C_aio_Complete::finish(int r)
{
- c->lock.Lock();
+ c->lock.lock();
// Leave an existing rval unless r != 0
if (r)
c->rval = r; // This clears the error set in C_ObjectOperation_scrub_ls::finish()
c->complete = true;
- c->cond.Signal();
+ c->cond.notify_all();
if (r == 0 && c->blp && c->blp->length() > 0) {
if (c->out_buf && !c->blp->is_contiguous()) {
#include <atomic>
#include "common/Cond.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/snap_types.h"
#include "common/zipkin_trace.h"
#include "include/types.h"
struct librados::IoCtxImpl {
std::atomic<uint64_t> ref_cnt = { 0 };
- RadosClient *client;
- int64_t poolid;
+ RadosClient *client = nullptr;
+ int64_t poolid = 0;
snapid_t snap_seq;
::SnapContext snapc;
- uint64_t assert_ver;
- version_t last_objver;
- uint32_t notify_timeout;
+ uint64_t assert_ver = 0;
+ version_t last_objver = 0;
+ uint32_t notify_timeout = 30;
object_locator_t oloc;
- Mutex aio_write_list_lock;
- ceph_tid_t aio_write_seq;
- Cond aio_write_cond;
+ ceph::mutex aio_write_list_lock =
+ ceph::make_mutex("librados::IoCtxImpl::aio_write_list_lock");
+ ceph_tid_t aio_write_seq = 0;
+ ceph::condition_variable aio_write_cond;
xlist<AioCompletionImpl*> aio_write_list;
map<ceph_tid_t, std::list<AioCompletionImpl*> > aio_write_waiters;
- Objecter *objecter;
+ Objecter *objecter = nullptr;
IoCtxImpl();
IoCtxImpl(RadosClient *c, Objecter *objecter,
#ifndef CEPH_LIBRADOS_POOLASYNCCOMPLETIONIMPL_H
#define CEPH_LIBRADOS_POOLASYNCCOMPLETIONIMPL_H
-#include "common/Cond.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "include/Context.h"
#include "include/rados/librados.h"
#include "include/rados/librados.hpp"
namespace librados {
struct PoolAsyncCompletionImpl {
- Mutex lock;
- Cond cond;
- int ref, rval;
- bool released;
- bool done;
+ ceph::mutex lock = ceph::make_mutex("PoolAsyncCompletionImpl lock");
+ ceph::condition_variable cond;
+ int ref = 1;
+ int rval = 0;
+ bool released = false;
+ bool done = false;
- rados_callback_t callback;
- void *callback_arg;
+ rados_callback_t callback = 0;
+ void *callback_arg = nullptr;;
- PoolAsyncCompletionImpl() : lock("PoolAsyncCompletionImpl lock"),
- ref(1), rval(0), released(false), done(false),
- callback(0), callback_arg(0) {}
+ PoolAsyncCompletionImpl() = default;
int set_callback(void *cb_arg, rados_callback_t cb) {
- lock.Lock();
+ std::scoped_lock l{lock};
callback = cb;
callback_arg = cb_arg;
- lock.Unlock();
return 0;
}
int wait() {
- lock.Lock();
- while (!done)
- cond.Wait(lock);
- lock.Unlock();
+ std::unique_lock l{lock};
+ cond.wait(l, [this] { return done;});
return 0;
}
int is_complete() {
- lock.Lock();
- int r = done;
- lock.Unlock();
- return r;
+ std::scoped_lock l{lock};
+ return done;
}
int get_return_value() {
- lock.Lock();
- int r = rval;
- lock.Unlock();
- return r;
+ std::scoped_lock l{lock};
+ return rval;
}
void get() {
- lock.Lock();
+ std::scoped_lock l{lock};
ceph_assert(ref > 0);
ref++;
- lock.Unlock();
}
void release() {
- lock.Lock();
+ lock.lock();
ceph_assert(!released);
released = true;
put_unlock();
}
void put() {
- lock.Lock();
+ lock.lock();
put_unlock();
}
void put_unlock() {
ceph_assert(ref > 0);
int n = --ref;
- lock.Unlock();
+ lock.unlock();
if (!n)
delete this;
}
}
void finish(int r) override {
- c->lock.Lock();
+ c->lock.lock();
c->rval = r;
c->done = true;
- c->cond.Signal();
+ c->cond.notify_all();
if (c->callback) {
rados_callback_t cb = c->callback;
void *cb_arg = c->callback_arg;
- c->lock.Unlock();
+ c->lock.unlock();
cb(c, cb_arg);
- c->lock.Lock();
+ c->lock.lock();
}
- c->lock.Unlock();
+ c->lock.unlock();
}
};
}
messenger(NULL),
instance_id(0),
objecter(NULL),
- lock("librados::RadosClient::lock"),
timer(cct, lock),
refcnt(1),
log_last_version(0), log_cb(NULL), log_cb2(NULL), log_cb_arg(NULL),
objecter->set_client_incarnation(0);
objecter->start();
- lock.Lock();
+ lock.lock();
timer.init();
state = CONNECTED;
instance_id = monclient.get_global_id();
- lock.Unlock();
+ lock.unlock();
ldout(cct, 1) << "init done" << dendl;
err = 0;
void librados::RadosClient::shutdown()
{
- lock.Lock();
+ std::unique_lock l{lock};
if (state == DISCONNECTED) {
- lock.Unlock();
return;
}
state = DISCONNECTED;
instance_id = 0;
timer.shutdown(); // will drop+retake lock
- lock.Unlock();
+ l.unlock();
if (need_objecter) {
objecter->shutdown();
}
int librados::RadosClient::watch_flush()
{
ldout(cct, 10) << __func__ << " enter" << dendl;
- Mutex mylock("RadosClient::watch_flush::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::watch_flush::mylock");
+ ceph::condition_variable cond;
bool done;
- objecter->linger_callback_flush(new C_SafeCond(&mylock, &cond, &done));
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
+ objecter->linger_callback_flush(new C_SafeCond(mylock, cond, &done));
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done; });
ldout(cct, 10) << __func__ << " exit" << dendl;
return 0;
}
c->lock.lock();
c->rval = r;
c->complete = true;
- c->cond.Signal();
+ c->cond.notify_all();
if (c->callback_complete ||
c->callback_safe) {
bool librados::RadosClient::_dispatch(Message *m)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
switch (m->get_type()) {
// OSD
case CEPH_MSG_OSD_MAP:
- cond.Signal();
+ cond.notify_all();
m->put();
break;
int librados::RadosClient::wait_for_osdmap()
{
- ceph_assert(!lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_not_locked_by_me(lock));
if (state != CONNECTED) {
return -ENOTCONN;
});
if (need_map) {
- std::lock_guard l(lock);
+ std::unique_lock l(lock);
ceph::timespan timeout{0};
if (cct->_conf->rados_mon_op_timeout > 0) {
ldout(cct, 10) << __func__ << " waiting" << dendl;
while (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
if (timeout == timeout.zero()) {
- cond.Wait(lock);
+ cond.wait(l);
} else {
- int r = cond.WaitInterval(lock, timeout);
- if (r == ETIMEDOUT) {
+ if (cond.wait_for(l, timeout) == std::cv_status::timeout) {
lderr(cct) << "timed out waiting for first osdmap from monitors"
<< dendl;
return -ETIMEDOUT;
int librados::RadosClient::wait_for_latest_osdmap()
{
- Mutex mylock("RadosClient::wait_for_latest_osdmap");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::wait_for_latest_osdmap");
+ ceph::condition_variable cond;
bool done;
- objecter->wait_for_latest_osdmap(new C_SafeCond(&mylock, &cond, &done));
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
+ objecter->wait_for_latest_osdmap(new C_SafeCond(mylock, cond, &done));
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] {return done;});
return 0;
}
map<string,::pool_stat_t> *result,
bool *per_pool)
{
- Mutex mylock("RadosClient::get_pool_stats::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::get_pool_stats::mylock");
+ ceph::condition_variable cond;
bool done;
int ret = 0;
objecter->get_pool_stats(pools, result, per_pool,
- new C_SafeCond(&mylock, &cond, &done,
+ new C_SafeCond(mylock, cond, &done,
&ret));
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
+ unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
return ret;
}
int librados::RadosClient::get_fs_stats(ceph_statfs& stats)
{
- Mutex mylock ("RadosClient::get_fs_stats::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::get_fs_stats::mylock");
+ ceph::condition_variable cond;
bool done;
int ret = 0;
-
- lock.Lock();
- objecter->get_fs_stats(stats, boost::optional<int64_t> (),
- new C_SafeCond(&mylock, &cond, &done, &ret));
- lock.Unlock();
-
- mylock.Lock();
- while (!done) cond.Wait(mylock);
- mylock.Unlock();
-
+ {
+ std::lock_guard l{mylock};
+ objecter->get_fs_stats(stats, boost::optional<int64_t> (),
+ new C_SafeCond(mylock, cond, &done, &ret));
+ }
+ {
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
+ }
return ret;
}
return r;
}
- Mutex mylock ("RadosClient::pool_create::mylock");
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::pool_create::mylock");
int reply;
- Cond cond;
+ ceph::condition_variable cond;
bool done;
- Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
+ Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply);
reply = objecter->create_pool(name, onfinish, crush_rule);
if (reply < 0) {
delete onfinish;
} else {
- mylock.Lock();
- while(!done)
- cond.Wait(mylock);
- mylock.Unlock();
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done; });
}
return reply;
}
return r;
}
- Mutex mylock("RadosClient::pool_delete::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::pool_delete::mylock");
+ ceph::condition_variable cond;
bool done;
int ret;
- Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &ret);
+ Context *onfinish = new C_SafeCond(mylock, cond, &done, &ret);
ret = objecter->delete_pool(name, onfinish);
if (ret < 0) {
delete onfinish;
} else {
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
}
return ret;
}
bufferlist *outbl, string *outs,
Context *on_finish)
{
- lock.Lock();
+ std::lock_guard l{lock};
monclient.start_mon_command(cmd, inbl, outbl, outs, on_finish);
- lock.Unlock();
}
int librados::RadosClient::mgr_command(const vector<string>& cmd,
if (r < 0)
return r;
- lock.Unlock();
+ lock.unlock();
if (conf->rados_mon_op_timeout) {
r = cond.wait_for(conf->rados_mon_op_timeout);
} else {
r = cond.wait();
}
- lock.Lock();
+ lock.lock();
return r;
}
const bufferlist &inbl,
bufferlist *outbl, string *outs)
{
- Mutex mylock("RadosClient::mon_command::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::mon_command::mylock");
+ ceph::condition_variable cond;
bool done;
int rval;
- lock.Lock();
- monclient.start_mon_command(rank, cmd, inbl, outbl, outs,
- new C_SafeCond(&mylock, &cond, &done, &rval));
- lock.Unlock();
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
+ {
+ std::lock_guard l{mylock};
+ monclient.start_mon_command(rank, cmd, inbl, outbl, outs,
+ new C_SafeCond(mylock, cond, &done, &rval));
+ }
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
return rval;
}
const bufferlist &inbl,
bufferlist *outbl, string *outs)
{
- Mutex mylock("RadosClient::mon_command::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::mon_command::mylock");
+ ceph::condition_variable cond;
bool done;
int rval;
- lock.Lock();
- monclient.start_mon_command(name, cmd, inbl, outbl, outs,
- new C_SafeCond(&mylock, &cond, &done, &rval));
- lock.Unlock();
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
+ {
+ std::lock_guard l{mylock};
+ monclient.start_mon_command(name, cmd, inbl, outbl, outs,
+ new C_SafeCond(mylock, cond, &done, &rval));
+ }
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
return rval;
}
const bufferlist& inbl,
bufferlist *poutbl, string *prs)
{
- Mutex mylock("RadosClient::osd_command::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::osd_command::mylock");
+ ceph::condition_variable cond;
bool done;
int ret;
ceph_tid_t tid;
if (osd < 0)
return -EINVAL;
- lock.Lock();
- // XXX do anything with tid?
- objecter->osd_command(osd, cmd, inbl, &tid, poutbl, prs,
- new C_SafeCond(&mylock, &cond, &done, &ret));
- lock.Unlock();
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
+ {
+ std::lock_guard l{mylock};
+ // XXX do anything with tid?
+ objecter->osd_command(osd, cmd, inbl, &tid, poutbl, prs,
+ new C_SafeCond(mylock, cond, &done, &ret));
+ }
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
return ret;
}
const bufferlist& inbl,
bufferlist *poutbl, string *prs)
{
- Mutex mylock("RadosClient::pg_command::mylock");
- Cond cond;
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::pg_command::mylock");
+ ceph::condition_variable cond;
bool done;
int ret;
ceph_tid_t tid;
- lock.Lock();
- objecter->pg_command(pgid, cmd, inbl, &tid, poutbl, prs,
- new C_SafeCond(&mylock, &cond, &done, &ret));
- lock.Unlock();
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
+
+ {
+ std::lock_guard l{lock};
+ objecter->pg_command(pgid, cmd, inbl, &tid, poutbl, prs,
+ new C_SafeCond(mylock, cond, &done, &ret));
+ }
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
return ret;
}
void librados::RadosClient::handle_log(MLog *m)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ldout(cct, 10) << __func__ << " version " << m->version << dendl;
if (log_last_version < m->version) {
#include "common/config_fwd.h"
#include "common/Cond.h"
-#include "common/Mutex.h"
-#include "common/RWLock.h"
#include "common/Timer.h"
+#include "common/ceph_mutex.h"
#include "common/ceph_time.h"
#include "include/rados/librados.h"
#include "include/rados/librados.hpp"
Objecter *objecter;
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("librados::RadosClient::lock");
+ ceph::condition_variable cond;
SafeTimer timer;
int refcnt;
rados_callback_t cb = completion->callback_complete;
void *cb_arg = completion->callback_complete_arg;
cb(completion, cb_arg);
- completion->lock.Lock();
+ completion->lock.lock();
completion->callback_complete = NULL;
- completion->cond.Signal();
+ completion->cond.notify_all();
completion->put_unlock();
}
};
void libradosstriper::MultiAioCompletionImpl::complete_request(ssize_t r)
{
- lock.Lock();
+ lock.lock();
if (rval >= 0) {
if (r < 0 && r != -EEXIST)
rval = r;
void libradosstriper::MultiAioCompletionImpl::safe_request(ssize_t r)
{
- lock.Lock();
+ lock.lock();
if (rval >= 0) {
if (r < 0 && r != -EEXIST)
rval = r;
void libradosstriper::MultiAioCompletionImpl::finish_adding_requests()
{
- lock.Lock();
+ std::scoped_lock l{lock};
ceph_assert(building);
building = false;
if (!pending_complete)
complete();
if (!pending_safe)
safe();
- lock.Unlock();
}
void intrusive_ptr_add_ref(libradosstriper::MultiAioCompletionImpl* ptr)
#ifndef CEPH_LIBRADOSSTRIPERSTRIPER_MULTIAIOCOMPLETIONIMPL_H
#define CEPH_LIBRADOSSTRIPERSTRIPER_MULTIAIOCOMPLETIONIMPL_H
-#include "common/Cond.h"
-#include "common/Mutex.h"
-
+#include <list>
+#include <mutex>
+#include "common/ceph_mutex.h"
#include "include/radosstriper/libradosstriper.hpp"
struct libradosstriper::MultiAioCompletionImpl {
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("MultiAioCompletionImpl lock", false);
+ ceph::condition_variable cond;
int ref, rval;
int pending_complete, pending_safe;
rados_callback_t callback_complete, callback_safe;
bufferlist bl; /// only used for read case in C api of rados striper
std::list<bufferlist*> bllist; /// keep temporary buffer lists used for destriping
- MultiAioCompletionImpl() : lock("MultiAioCompletionImpl lock", false, false),
- ref(1), rval(0),
+ MultiAioCompletionImpl()
+ : ref(1), rval(0),
pending_complete(0), pending_safe(0),
callback_complete(0), callback_safe(0),
callback_complete_arg(0), callback_safe_arg(0),
}
int set_complete_callback(void *cb_arg, rados_callback_t cb) {
- lock.Lock();
+ std::scoped_lock l{lock};
callback_complete = cb;
callback_complete_arg = cb_arg;
- lock.Unlock();
return 0;
}
int set_safe_callback(void *cb_arg, rados_callback_t cb) {
- lock.Lock();
+ std::scoped_lock l{lock};
callback_safe = cb;
callback_safe_arg = cb_arg;
- lock.Unlock();
return 0;
}
int wait_for_complete() {
- lock.Lock();
- while (pending_complete)
- cond.Wait(lock);
- lock.Unlock();
+ std::unique_lock l{lock};
+ cond.wait(l, [this] { return !pending_complete; });
return 0;
}
int wait_for_safe() {
- lock.Lock();
- while (pending_safe)
- cond.Wait(lock);
- lock.Unlock();
+ std::unique_lock l{lock};
+ cond.wait(l, [this] { return !pending_safe; });
return 0;
}
bool is_complete() {
- lock.Lock();
- int r = pending_complete;
- lock.Unlock();
- return 0 == r;
+ std::scoped_lock l{lock};
+ return pending_complete == 0;
}
bool is_safe() {
- lock.Lock();
- int r = pending_safe;
- lock.Unlock();
- return r == 0;
+ std::scoped_lock l{lock};
+ return pending_safe == 0;
}
void wait_for_complete_and_cb() {
- lock.Lock();
- while (pending_complete || callback_complete)
- cond.Wait(lock);
- lock.Unlock();
+ std::unique_lock l{lock};
+ cond.wait(l, [this] { return !pending_complete && !callback_complete; });
}
void wait_for_safe_and_cb() {
- lock.Lock();
- while (pending_safe || callback_safe)
- cond.Wait(lock);
- lock.Unlock();
+ std::unique_lock l{lock};
+ cond.wait(l, [this] { return !pending_safe && !callback_safe; });
}
bool is_complete_and_cb() {
- lock.Lock();
- bool r = ((0 == pending_complete) && !callback_complete);
- lock.Unlock();
- return r;
+ std::scoped_lock l{lock};
+ return ((0 == pending_complete) && !callback_complete);
}
bool is_safe_and_cb() {
- lock.Lock();
- int r = ((0 == pending_safe) && !callback_safe);
- lock.Unlock();
- return r;
+ std::scoped_lock l{lock};
+ return ((0 == pending_safe) && !callback_safe);
}
int get_return_value() {
- lock.Lock();
- int r = rval;
- lock.Unlock();
- return r;
+ std::scoped_lock l{lock};
+ return rval;
}
void get() {
- lock.Lock();
+ std::scoped_lock l{lock};
_get();
- lock.Unlock();
}
void _get() {
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ceph_assert(ref > 0);
++ref;
}
void put() {
- lock.Lock();
+ lock.lock();
put_unlock();
}
void put_unlock() {
ceph_assert(ref > 0);
int n = --ref;
- lock.Unlock();
+ lock.unlock();
if (!n)
delete this;
}
void add_request() {
- lock.Lock();
+ std::scoped_lock l{lock};
pending_complete++;
_get();
pending_safe++;
_get();
- lock.Unlock();
}
void add_safe_request() {
- lock.Lock();
+ std::scoped_lock l{lock};
pending_complete++;
_get();
- lock.Unlock();
}
void complete() {
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
if (callback_complete) {
callback_complete(this, callback_complete_arg);
callback_complete = 0;
}
- cond.Signal();
+ cond.notify_all();
}
void safe() {
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
if (callback_safe) {
callback_safe(this, callback_safe_arg);
callback_safe = 0;
}
- cond.Signal();
+ cond.notify_all();
};
void complete_request(ssize_t r);
///////////////////////// constructor /////////////////////////////
libradosstriper::RadosStriperImpl::RadosStriperImpl(librados::IoCtx& ioctx, librados::IoCtxImpl *ioctx_impl) :
- m_refCnt(0),lock("RadosStriper Refcont", false, false), m_radosCluster(ioctx), m_ioCtx(ioctx), m_ioCtxImpl(ioctx_impl),
+ m_refCnt(0), m_radosCluster(ioctx), m_ioCtx(ioctx), m_ioCtxImpl(ioctx_impl),
m_layout(default_file_layout) {}
///////////////////////// layout /////////////////////////////
if (ret < 0)
return ret;
//wait all CompletionData are released
- lock.Lock();
- while (m_refCnt > 1)
- cond.Wait(lock);
- lock.Unlock();
+ std::unique_lock l{lock};
+ cond.wait(l, [this] {return m_refCnt <= 1;});
return ret;
}
// reference counting
void get() {
- lock.Lock();
+ std::lock_guard l{lock};
m_refCnt ++ ;
- lock.Unlock();
}
void put() {
bool deleteme = false;
- lock.Lock();
+ lock.lock();
m_refCnt --;
if (m_refCnt == 0)
deleteme = true;
- cond.Signal();
- lock.Unlock();
+ cond.notify_all();
+ lock.unlock();
if (deleteme)
delete this;
}
}
// reference counting
- Cond cond;
+ std::condition_variable cond;
int m_refCnt;
- Mutex lock;
+ std::mutex lock;
// Context
void finish(int r) override
{
- lock_guard locker{array_lock};
+ std::lock_guard locker{array_lock};
cout << "TestContext " << num << std::endl;
test_array[array_idx++] = num;
}