}

void AioImageRequestWQ::shut_down(Context *on_shutdown) {
+  assert(m_image_ctx.owner_lock.is_locked());
+
  {
    RWLock::WLocker locker(m_lock);
    assert(!m_shutdown);
    ...
  }

-  on_shutdown->complete(0);
+  // ensure that all in-flight IO is flushed
+  m_image_ctx.flush(on_shutdown);
}
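
Routing the shutdown callback through ImageCtx::flush() (instead of
completing it directly) defers the caller's continuation until all
in-flight IO has drained. A minimal sketch of a synchronous caller,
assuming Ceph's C_SaferCond helper and an ImageCtx that exposes this
queue as aio_work_queue (neither is shown in this diff):

    C_SaferCond shutdown_ctx;
    {
      // shut_down() now asserts that owner_lock is held
      RWLock::RLocker owner_locker(image_ctx->owner_lock);
      image_ctx->aio_work_queue->shut_down(&shutdown_ctx);
    }
    int r = shutdown_ctx.wait();  // returns only after the flush completes
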
void AioImageRequestWQ::block_writes() {
  ...
}

void AioImageRequestWQ::block_writes(Context *on_blocked) {
+  assert(m_image_ctx.owner_lock.is_locked());
  CephContext *cct = m_image_ctx.cct;
  {
    ...
  }

-  m_image_ctx.op_work_queue->queue(on_blocked);
+  // ensure that all in-flight IO is flushed
+  m_image_ctx.flush(on_blocked);
}
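
The same pattern applies to write blocking: on_blocked used to be
queued for completion as soon as new writes were blocked, even though
previously issued writes could still be in flight. A sketch of pairing
the calls around a maintenance step, assuming the zero-argument
overload elided above waits synchronously on an internal context
(an assumption, not shown here):

    {
      RWLock::RLocker owner_locker(image_ctx->owner_lock);
      image_ctx->aio_work_queue->block_writes();  // returns once in-flight writes flush
    }
    // ... no new writes reach the image here ...
    image_ctx->aio_work_queue->unblock_writes();
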
void AioImageRequestWQ::unblock_writes() {
  ...
}

void AioImageRequestWQ::finish_in_flight_op() {
  ...

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << __func__ << ": completing shut down" << dendl;

+  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  assert(m_on_shutdown != nullptr);
-  m_on_shutdown->complete(0);
+  m_image_ctx.flush(m_on_shutdown);
}
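
This hunk is the deferred counterpart of shut_down() above: when ops
are still in flight, the callback is stashed in m_on_shutdown and the
last completing op finishes the job here. The new owner_lock read lock
matches the locking that the flush path expects, consistent with the
asserts added in the hunks above. The intended flow, sketched as
comments (the worker-side call chain is an assumption, not shown in
this diff):

    // shut_down():            ops in flight -> stash m_on_shutdown, return early
    // worker thread:          process(req) ... then finish_in_flight_op()
    // finish_in_flight_op():  last op done and m_shutdown set
    //                         -> m_image_ctx.flush(m_on_shutdown) under owner_lock
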
bool AioImageRequestWQ::is_journal_required() const {
  ...
}

void AioImageRequestWQ::queue(AioImageRequest *req) {
  ...
                 << "req=" << req << dendl;

  assert(m_image_ctx.owner_lock.is_locked());

-  if (req->is_write_op()) {
+  bool write_op = req->is_write_op();
+  if (write_op) {
    m_queued_writes.inc();
  }

  ThreadPool::PointerWQ<AioImageRequest>::queue(req);

-  if (is_lock_required()) {
+  if (write_op && is_lock_required()) {
    m_image_ctx.exclusive_lock->request_lock(nullptr);
  }
}
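
Two things change here. Caching is_write_op() up front matters because
once the request is handed to ThreadPool::PointerWQ::queue(), a worker
thread may process and delete it, so req must not be dereferenced again;
the cached flag then lets the exclusive-lock request be limited to write
ops, so queued reads no longer trigger lock acquisition. The ordering
constraint, annotated (comments are illustrative, not from the commit):

    bool write_op = req->is_write_op();    // safe: req not yet handed off
    ThreadPool::PointerWQ<AioImageRequest>::queue(req);
    // a worker may have processed and freed req by this point
    if (write_op && is_lock_required()) {  // uses the cached flag, never req
      m_image_ctx.exclusive_lock->request_lock(nullptr);
    }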