}
void AioImageRequestWQ::block_writes() {
+ C_SaferCond cond_ctx;
+ block_writes(&cond_ctx);
+ cond_ctx.wait();
+}
+
+void AioImageRequestWQ::block_writes(Context *on_blocked) {
CephContext *cct = m_image_ctx.cct;
- Mutex::Locker locker(m_lock);
- ++m_write_blockers;
- ldout(cct, 5) << __func__ << ": " << &m_image_ctx << ", "
- << "num=" << m_write_blockers << dendl;
- if (m_write_blockers == 1) {
- while (m_in_progress_writes > 0) {
- m_cond.Wait(m_lock);
+ {
+ Mutex::Locker locker(m_lock);
+ ++m_write_blockers;
+ ldout(cct, 5) << __func__ << ": " << &m_image_ctx << ", "
+ << "num=" << m_write_blockers << dendl;
+ if (m_in_progress_writes > 0) {
+ m_write_blocker_contexts.push_back(on_blocked);
+ return;
}
}
+ on_blocked->complete(0);
}
void AioImageRequestWQ::unblock_writes() {
req->send();
}
+ Contexts contexts;
{
Mutex::Locker locker(m_lock);
if (req->is_write_op()) {
assert(m_in_progress_writes > 0);
if (--m_in_progress_writes == 0) {
- m_cond.Signal();
+ contexts.swap(m_write_blocker_contexts);
}
}
}
delete req;
+
+ for (Contexts::iterator it = contexts.begin(); it != contexts.end(); ++it) {
+ (*it)->complete(0);
+ }
}
bool AioImageRequestWQ::is_journal_required() const {
}
void block_writes();
+ void block_writes(Context *on_blocked);
void unblock_writes();
void register_lock_listener();
virtual void process(AioImageRequest *req);
private:
+ typedef std::list<Context *> Contexts;
+
struct LockListener : public ImageWatcher::Listener {
AioImageRequestWQ *aio_work_queue;
LockListener(AioImageRequestWQ *_aio_work_queue)
ImageCtx &m_image_ctx;
mutable Mutex m_lock;
- Cond m_cond;
+ Contexts m_write_blocker_contexts;
uint32_t m_write_blockers;
uint32_t m_in_progress_writes;
uint32_t m_queued_writes;