 void *AioImageRequestWQ::_void_dequeue() {
   AioImageRequest<> *peek_item = front();
-  if (peek_item == NULL || m_refresh_in_progress) {
-    return NULL;
+
+  // no IO ops available or refresh in-progress (IO stalled)
+  if (peek_item == nullptr || m_refresh_in_progress) {
+    return nullptr;
   }
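+
+  // check the refresh state once, up front, so the same answer drives
+  // both the write accounting below and the post-dequeue requeue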
+  bool refresh_required = m_image_ctx.state->is_refresh_required();
   {
     RWLock::RLocker locker(m_lock);
     if (peek_item->is_write_op()) {
       if (m_write_blockers > 0) {
-        return NULL;
+        return nullptr;
+      }
+
+      // refresh will requeue the op -- don't count it as in-progress
+      if (!refresh_required) {
+        m_in_progress_writes.inc();
       }
-      m_in_progress_writes.inc();
     } else if (m_require_lock_on_read) {
       return nullptr;
     }
   }

   AioImageRequest<> *item = reinterpret_cast<AioImageRequest<> *>(
     ThreadPool::PointerWQ<AioImageRequest<> >::_void_dequeue());
   assert(peek_item == item);
-  if (m_image_ctx.state->is_refresh_required()) {
+  if (refresh_required) {
     ldout(m_image_ctx.cct, 15) << "image refresh required: delaying IO " << item
                                << dendl;
+
+    // stall IO until the refresh completes
     m_refresh_in_progress = true;
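+
+    // the pool lock cannot be held across the refresh call -- the state
+    // machine may complete in-line and re-enter the work queue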
     get_pool_lock().Unlock();
     m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
     get_pool_lock().Lock();
-    return NULL;
+    return nullptr;
   }
   return item;
 }
 void AioImageRequestWQ::process(AioImageRequest<> *req) {
   {
     RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
     req->send();
   }
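 
+  // the request has been sent: undo its queued accounting and, for
+  // writes, the in-progress write accounting, then drop the in-flight ref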
+  finish_queued_op(req);
+  if (req->is_write_op()) {
+    finish_in_progress_write();
+  }
+  delete req;
+
+  finish_in_flight_op();
+}
+
+void AioImageRequestWQ::finish_queued_op(AioImageRequest<> *req) {
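+  // invoked for every dequeued op, both from process() and from the
+  // refresh error path, to release the queued read/write counters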
+  RWLock::RLocker locker(m_lock);
+  if (req->is_write_op()) {
+    assert(m_queued_writes.read() > 0);
+    m_queued_writes.dec();
+  } else {
+    assert(m_queued_reads.read() > 0);
+    m_queued_reads.dec();
+  }
+}
+
+void AioImageRequestWQ::finish_in_progress_write() {
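+  // flush the image once the final in-progress write drains while a
+  // block_writes request is pending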
   bool writes_blocked = false;
   {
     RWLock::RLocker locker(m_lock);
-    if (req->is_write_op()) {
-      assert(m_queued_writes.read() > 0);
-      m_queued_writes.dec();
-
-      assert(m_in_progress_writes.read() > 0);
-      if (m_in_progress_writes.dec() == 0 &&
-          !m_write_blocker_contexts.empty()) {
-        writes_blocked = true;
-      }
-    } else {
-      assert(m_queued_reads.read() > 0);
-      m_queued_reads.dec();
+    assert(m_in_progress_writes.read() > 0);
+    if (m_in_progress_writes.dec() == 0 &&
+        !m_write_blocker_contexts.empty()) {
+      writes_blocked = true;
     }
   }
 
   if (writes_blocked) {
     RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
     m_image_ctx.flush(new C_BlockedWrites(this));
   }
-  delete req;
-
-  finish_in_flight_op();
 }
 void AioImageRequestWQ::handle_refreshed(int r, AioImageRequest<> *req) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 15) << "resuming IO after image refresh: r=" << r << ", "
                  << "req=" << req << dendl;
   if (r < 0) {
     req->fail(r);
+    // the op must remain valid while its accounting is released, so
+    // only delete it afterwards
+    finish_queued_op(req);
+    delete req;
+
+    finish_in_flight_op();
   } else {
-    process(req);
-    process_finish();
+    // since IO was stalled for refresh -- original IO order is preserved
+    // if we requeue this op for work queue processing
+    requeue(req);
+  }
-    m_refresh_in_progress = false;
-    signal();
+  m_refresh_in_progress = false;
+  signal();
+
+  // refresh might have enabled exclusive lock -- IO stalled until
+  // we acquire the lock
+  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+  if (is_lock_required() && is_lock_request_needed()) {
+    m_image_ctx.exclusive_lock->request_lock(nullptr);
+  }
 }