operations(new Operations<>(*this)),
exclusive_lock(nullptr), object_map(nullptr),
io_work_queue(nullptr), op_work_queue(nullptr),
- completed_reqs(32),
+ external_callback_completions(32),
+ event_socket_completions(32),
asok_hook(nullptr),
trace_endpoint("librbd")
{
ContextWQ *op_work_queue;
- boost::lockfree::queue<
+ typedef boost::lockfree::queue<
io::AioCompletion*,
- boost::lockfree::allocator<ceph::allocator<void>>> completed_reqs;
+ boost::lockfree::allocator<ceph::allocator<void>>> Completions;
+
+ Completions external_callback_completions;
+ std::atomic<bool> external_callback_in_progress = {false};
+
+ Completions event_socket_completions;
EventSocket event_socket;
bool ignore_migrating = false;
ldout(cct, 20) << __func__ << " " << ictx << " numcomp = " << numcomp
<< dendl;
int i = 0;
- while (i < numcomp && ictx->completed_reqs.pop(comps[i])) {
+ while (i < numcomp && ictx->event_socket_completions.pop(comps[i])) {
++i;
}
state = AIO_STATE_CALLBACK;
if (complete_cb) {
- complete_cb(rbd_comp, complete_arg);
+ if (external_callback) {
+ complete_external_callback();
+ } else {
+ complete_cb(rbd_comp, complete_arg);
+ }
}
if (ictx != nullptr && event_notify && ictx->event_socket.is_valid()) {
- ictx->completed_reqs.push(this);
+ ictx->event_socket_completions.push(this);
ictx->event_socket.notify();
}
state = AIO_STATE_COMPLETE;
return r;
}
+void AioCompletion::complete_external_callback() {
+ // ensure librbd external users never experience concurrent callbacks
+ // from multiple librbd-internal threads.
+ ictx->external_callback_completions.push(this);
+
+ while (true) {
+ if (ictx->external_callback_in_progress.exchange(true)) {
+ // another thread is concurrently invoking external callbacks
+ break;
+ }
+
+ AioCompletion* aio_comp;
+ while (ictx->external_callback_completions.pop(aio_comp)) {
+ aio_comp->complete_cb(aio_comp->rbd_comp, aio_comp->complete_arg);
+ }
+
+ ictx->external_callback_in_progress.store(false);
+ if (ictx->external_callback_completions.empty()) {
+ // queue still empty implies we didn't have a race between the last failed
+ // pop and resetting the in-progress state
+ break;
+ }
+ }
+}
+
} // namespace io
} // namespace librbd
bool event_notify = false;
bool was_armed = false;
+ bool external_callback = false;
template <typename T, void (T::*MF)(int)>
static void callback_adapter(completion_t cb, void *arg) {
private:
void queue_complete();
+ void complete_external_callback();
};
RBD::AioCompletion::AioCompletion(void *cb_arg, callback_t complete_cb)
{
- pc = reinterpret_cast<void*>(librbd::io::AioCompletion::create(
- cb_arg, complete_cb, this));
+ auto aio_comp = librbd::io::AioCompletion::create(
+ cb_arg, complete_cb, this);
+ aio_comp->external_callback = true;
+ pc = reinterpret_cast<void*>(aio_comp);
}
bool RBD::AioCompletion::is_complete()