async_op.start_op(*ictx);
}
+void AioCompletion::queue_complete() {
+ uint32_t zero = 0;
+ pending_count.compare_exchange_strong(zero, 1);
+ ceph_assert(zero == 0);
+
+ // ensure completion fires in clean lock context
+ ictx->op_work_queue->queue(new C_AioRequest(this), 0);
+}
+
+void AioCompletion::block(CephContext* cct) {
+ ldout(cct, 20) << dendl;
+ ceph_assert(!was_armed);
+
+ get();
+ ++pending_count;
+}
+
+void AioCompletion::unblock(CephContext* cct) {
+ ldout(cct, 20) << dendl;
+ ceph_assert(was_armed);
+
+ uint32_t previous_pending_count = pending_count--;
+ ceph_assert(previous_pending_count > 0);
+
+ if (previous_pending_count == 1) {
+ queue_complete();
+ }
+ put();
+}
+
void AioCompletion::fail(int r)
{
ceph_assert(ictx != nullptr);
CephContext *cct = ictx->cct;
-
lderr(cct) << cpp_strerror(r) << dendl;
- uint32_t previous_pending_count = pending_count.exchange(1);
- ceph_assert(previous_pending_count == 0);
- // ensure completion fires in clean lock context
- ictx->op_work_queue->queue(new C_AioRequest(this), r);
+ ceph_assert(!was_armed);
+ was_armed = true;
+
+ error_rval = r;
+
+ uint32_t previous_pending_count = pending_count.load();
+ if (previous_pending_count == 0) {
+ queue_complete();
+ }
}
void AioCompletion::set_request_count(uint32_t count) {
ceph_assert(ictx != nullptr);
CephContext *cct = ictx->cct;
- uint32_t previous_pending_count = pending_count.exchange(
- count == 0 ? 1 : count);
- ceph_assert(previous_pending_count == 0);
+ ceph_assert(!was_armed);
+ was_armed = true;
ldout(cct, 20) << "pending=" << count << dendl;
- if (count == 0) {
- // ensure completion fires in clean lock context
- ictx->op_work_queue->queue(new C_AioRequest(this), 0);
- return;
+ uint32_t previous_pending_count = pending_count.fetch_add(count);
+ if (previous_pending_count == 0 && count == 0) {
+ queue_complete();
}
}
std::atomic<ssize_t> rval{0};
std::atomic<int> error_rval{0};
std::atomic<uint32_t> ref{1};
- std::atomic<uint32_t> pending_count{0}; ///< number of requests
+ std::atomic<uint32_t> pending_count{0}; ///< number of requests/blocks
std::atomic<bool> released{false};
ImageCtx *ictx = nullptr;
AsyncOperation async_op;
bool event_notify = false;
+ bool was_armed = false;
template <typename T, void (T::*MF)(int)>
static void callback_adapter(completion_t cb, void *arg) {
return async_op.started();
}
+ void block(CephContext* cct);
+ void unblock(CephContext* cct);
+
void init_time(ImageCtx *i, aio_type_t t);
void start_op();
void fail(int r);
void *get_arg() {
return complete_arg;
}
+
+private:
+ void queue_complete();
+
};
class C_AioRequest : public Context {