operations(new Operations<>(*this)),
exclusive_lock(nullptr), object_map(nullptr),
io_work_queue(nullptr), op_work_queue(nullptr),
- external_callback_completions(32),
event_socket_completions(32),
asok_hook(nullptr),
trace_endpoint("librbd")
io::AioCompletion*,
boost::lockfree::allocator<ceph::allocator<void>>> Completions;
- Completions external_callback_completions;
- std::atomic<bool> external_callback_in_progress = {false};
-
Completions event_socket_completions;
EventSocket event_socket;
void AioCompletion::complete_external_callback() {
// ensure librbd external users never experience concurrent callbacks
// from multiple librbd-internal threads.
- ictx->external_callback_completions.push(this);
-
- while (true) {
- if (ictx->external_callback_in_progress.exchange(true)) {
- // another thread is concurrently invoking external callbacks
- break;
- }
-
- AioCompletion* aio_comp;
- while (ictx->external_callback_completions.pop(aio_comp)) {
- aio_comp->complete_cb(aio_comp->rbd_comp, aio_comp->complete_arg);
- aio_comp->complete_event_socket();
- aio_comp->notify_callbacks_complete();
- }
-
- ictx->external_callback_in_progress.store(false);
- if (ictx->external_callback_completions.empty()) {
- // queue still empty implies we didn't have a race between the last failed
- // pop and resetting the in-progress state
- break;
- }
- }
+ get();
+ ictx->op_work_queue->queue(new LambdaContext([this](int r) {
+ complete_cb(rbd_comp, complete_arg);
+ complete_event_socket();
+ notify_callbacks_complete();
+ put();
+ }));
}
void AioCompletion::complete_event_socket() {