int AioCompletion::wait_for_complete() {
tracepoint(librbd, aio_wait_for_complete_enter, this);
- lock.Lock();
- while (state != AIO_STATE_COMPLETE)
- cond.Wait(lock);
- lock.Unlock();
+ {
+ std::unique_lock<std::mutex> locker(lock);
+ while (state != AIO_STATE_COMPLETE) {
+ cond.wait(locker);
+ }
+ }
tracepoint(librbd, aio_wait_for_complete_exit, 0);
return 0;
}
-void AioCompletion::finalize(ssize_t rval)
-{
- ceph_assert(lock.is_locked());
+void AioCompletion::finalize() {
ceph_assert(ictx != nullptr);
CephContext *cct = ictx->cct;
- ldout(cct, 20) << "r=" << rval << dendl;
- if (rval >= 0 && aio_type == AIO_TYPE_READ) {
+ // finalize any pending error results since we won't be
+ // atomically incrementing rval anymore
+ int err_r = error_rval;
+ if (err_r < 0) {
+ rval = err_r;
+ }
+
+ ssize_t r = rval;
+ ldout(cct, 20) << "r=" << r << dendl;
+ if (r >= 0 && aio_type == AIO_TYPE_READ) {
read_result.assemble_result(cct);
}
}
void AioCompletion::complete() {
- ceph_assert(lock.is_locked());
ceph_assert(ictx != nullptr);
CephContext *cct = ictx->cct;
- tracepoint(librbd, aio_complete_enter, this, rval);
+ ssize_t r = rval;
+ tracepoint(librbd, aio_complete_enter, this, r);
if (ictx->perfcounter != nullptr) {
ceph::timespan elapsed = coarse_mono_clock::now() - start_time;
switch (aio_type) {
}
if ((aio_type == AIO_TYPE_CLOSE) ||
- (aio_type == AIO_TYPE_OPEN && rval < 0)) {
+ (aio_type == AIO_TYPE_OPEN && r < 0)) {
// must destroy ImageCtx prior to invoking callback
delete ictx;
ictx = nullptr;
state = AIO_STATE_CALLBACK;
if (complete_cb) {
- lock.Unlock();
complete_cb(rbd_comp, complete_arg);
- lock.Lock();
}
if (ictx != nullptr && event_notify && ictx->event_socket.is_valid()) {
ictx->completed_reqs_lock.Unlock();
ictx->event_socket.notify();
}
-
state = AIO_STATE_COMPLETE;
- cond.Signal();
+
+ {
+ std::unique_lock<std::mutex> locker(lock);
+ cond.notify_all();
+ }
// note: possible for image to be closed after op marked finished
if (async_op.started()) {
}
void AioCompletion::init_time(ImageCtx *i, aio_type_t t) {
- Mutex::Locker locker(lock);
if (ictx == nullptr) {
ictx = i;
aio_type = t;
}
void AioCompletion::start_op() {
- Mutex::Locker locker(lock);
ceph_assert(ictx != nullptr);
if (aio_type == AIO_TYPE_OPEN || aio_type == AIO_TYPE_CLOSE) {
void AioCompletion::fail(int r)
{
- lock.Lock();
ceph_assert(ictx != nullptr);
CephContext *cct = ictx->cct;
ceph_assert(pending_count == 0);
rval = r;
complete();
- put_unlock();
+ put();
}
void AioCompletion::set_request_count(uint32_t count) {
- lock.Lock();
ceph_assert(ictx != nullptr);
CephContext *cct = ictx->cct;
- ldout(cct, 20) << "pending=" << count << dendl;
- ceph_assert(pending_count == 0);
-
- if (count > 0) {
- pending_count = count;
- lock.Unlock();
- } else {
- pending_count = 1;
- lock.Unlock();
+ uint32_t previous_pending_count = pending_count.exchange(
+ count == 0 ? 1 : count);
+ ceph_assert(previous_pending_count == 0);
+ ldout(cct, 20) << "pending=" << count << dendl;
+ if (count == 0) {
// ensure completion fires in clean lock context
ictx->op_work_queue->queue(new C_AioRequest(this), 0);
+ return;
}
}
void AioCompletion::complete_request(ssize_t r)
{
- lock.Lock();
+ uint32_t previous_pending_count = pending_count--;
+ ceph_assert(previous_pending_count > 0);
+ auto pending_count = previous_pending_count - 1;
+
ceph_assert(ictx != nullptr);
CephContext *cct = ictx->cct;
- if (rval >= 0) {
- if (r < 0 && r != -EEXIST)
- rval = r;
- else if (r > 0)
- rval += r;
+ if (r > 0) {
+ rval += r;
+ } else if (r != -EEXIST) {
+    // multiple in-flight requests may race to record an error; the
+    // first error code stored wins (later compare_exchange calls fail)
+ int zero = 0;
+ error_rval.compare_exchange_strong(zero, r);
}
- ceph_assert(pending_count);
- int count = --pending_count;
ldout(cct, 20) << "cb=" << complete_cb << ", "
<< "pending=" << pending_count << dendl;
- if (!count) {
- finalize(rval);
+ if (pending_count == 0) {
+ finalize();
complete();
}
- put_unlock();
+ put();
}
bool AioCompletion::is_complete() {
tracepoint(librbd, aio_is_complete_enter, this);
- bool done;
- {
- Mutex::Locker l(lock);
- done = this->state == AIO_STATE_COMPLETE;
- }
+ bool done = (this->state != AIO_STATE_PENDING);
tracepoint(librbd, aio_is_complete_exit, done);
return done;
}
ssize_t AioCompletion::get_return_value() {
tracepoint(librbd, aio_get_return_value_enter, this);
- lock.Lock();
ssize_t r = rval;
- lock.Unlock();
tracepoint(librbd, aio_get_return_value_exit, r);
return r;
}
#ifndef CEPH_LIBRBD_IO_AIO_COMPLETION_H
#define CEPH_LIBRBD_IO_AIO_COMPLETION_H
-#include "common/Cond.h"
-#include "common/Mutex.h"
#include "common/ceph_time.h"
#include "include/Context.h"
#include "include/utime.h"
#include "librbd/io/ReadResult.h"
#include "librbd/io/Types.h"
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+
class CephContext;
namespace librbd {
AIO_STATE_COMPLETE,
} aio_state_t;
- mutable Mutex lock;
- Cond cond;
- aio_state_t state;
- ssize_t rval;
- callback_t complete_cb;
- void *complete_arg;
- rbd_completion_t rbd_comp;
- uint32_t pending_count; ///< number of requests
- int ref;
- bool released;
- ImageCtx *ictx;
+ mutable std::mutex lock;
+ std::condition_variable cond;
+
+ callback_t complete_cb = nullptr;
+ void *complete_arg = nullptr;
+ rbd_completion_t rbd_comp = nullptr;
+
+  /// note: atomic only for its built-in memory barrier; state may be
+  /// read without holding 'lock' (see is_complete())
+ std::atomic<aio_state_t> state{AIO_STATE_PENDING};
+
+ std::atomic<ssize_t> rval{0};
+ std::atomic<int> error_rval{0};
+ std::atomic<uint32_t> ref{1};
+ std::atomic<uint32_t> pending_count{0}; ///< number of requests
+ std::atomic<bool> released{false};
+
+ ImageCtx *ictx = nullptr;
coarse_mono_time start_time;
- aio_type_t aio_type;
+ aio_type_t aio_type = AIO_TYPE_NONE;
ReadResult read_result;
AsyncOperation async_op;
xlist<AioCompletion*>::item m_xlist_item;
- bool event_notify;
+ bool event_notify = false;
template <typename T, void (T::*MF)(int)>
static void callback_adapter(completion_t cb, void *arg) {
return comp;
}
- AioCompletion() : lock("AioCompletion::lock", true, false),
- state(AIO_STATE_PENDING), rval(0), complete_cb(NULL),
- complete_arg(NULL), rbd_comp(NULL),
- pending_count(0), ref(1), released(false), ictx(NULL),
- aio_type(AIO_TYPE_NONE), m_xlist_item(this),
- event_notify(false) {
+ AioCompletion() : m_xlist_item(this) {
}
~AioCompletion() {
int wait_for_complete();
- void finalize(ssize_t rval);
+ void finalize();
inline bool is_initialized(aio_type_t type) const {
- Mutex::Locker locker(lock);
+ std::unique_lock<std::mutex> locker(lock);
return ((ictx != nullptr) && (aio_type == type));
}
inline bool is_started() const {
- Mutex::Locker locker(lock);
+ std::unique_lock<std::mutex> locker(lock);
return async_op.started();
}
void set_request_count(uint32_t num);
void add_request() {
- lock.Lock();
ceph_assert(pending_count > 0);
- lock.Unlock();
get();
}
void complete_request(ssize_t r);
ssize_t get_return_value();
void get() {
- lock.Lock();
ceph_assert(ref > 0);
- ref++;
- lock.Unlock();
+ ++ref;
}
void release() {
- lock.Lock();
- ceph_assert(!released);
- released = true;
- put_unlock();
+ bool previous_released = released.exchange(true);
+ ceph_assert(!previous_released);
+ put();
}
void put() {
- lock.Lock();
- put_unlock();
- }
- void put_unlock() {
- ceph_assert(ref > 0);
- int n = --ref;
- lock.Unlock();
- if (!n) {
+ uint32_t previous_ref = ref--;
+ ceph_assert(previous_ref > 0);
+
+ if (previous_ref == 1) {
if (ictx != nullptr && event_notify) {
ictx->completed_reqs_lock.Lock();
m_xlist_item.remove_myself();
}
void set_event_notify(bool s) {
- Mutex::Locker l(lock);
event_notify = s;
}