Cond cond;
ops_apply_blocked.push_back(&cond);
dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl;
- while (blocked && ops_apply_blocked.front() != &cond)
+ // sleep until we are not blocked AND we are at the front of line
+ while (blocked || ops_apply_blocked.front() != &cond)
cond.Wait(journal_lock);
dout(10) << "op_apply_start " << op << " woke (at front of line)" << dendl;
ops_apply_blocked.pop_front();
dout(10) << "op_apply_start " << op << " ...and kicking next in line" << dendl;
ops_apply_blocked.front()->Signal();
}
- } else {
- dout(10) << "op_apply_start " << op << dendl;
}
-
+ dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << dendl;
+ assert(!blocked);
open_ops++;
-
return op;
}
void JournalingObjectStore::op_apply_finish(uint64_t op)
{
- dout(10) << "op_apply_finish " << op << dendl;
journal_lock.Lock();
+ dout(10) << "op_apply_finish " << op << " open_ops " << open_ops << " -> " << (open_ops-1) << dendl;
if (--open_ops == 0)
cond.Signal();
dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
cond.Wait(journal_lock);
}
-
+ dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
+ assert(open_ops == 0);
+
if (applied_seq == committed_seq && !force_commit) {
dout(10) << "commit_start nothing to do" << dendl;
blocked = false;