list<Op*> q;
list<uint64_t> jq;
Cond cond;
+ uint64_t last_thru_q, last_thru_jq; // Last op to drain through each q
public:
Sequencer *parent;
Mutex apply_lock; // for apply mutual exclusion
}
void dequeue_journal() {
Mutex::Locker l(qlock);
+ assert(jq.front() > last_thru_jq);
+ last_thru_jq = jq.front();
jq.pop_front();
cond.Signal();
}
assert(apply_lock.is_locked());
Mutex::Locker l(qlock);
Op *o = q.front();
+ assert(o->op > last_thru_q);
+ last_thru_q = o->op;
q.pop_front();
cond.Signal();
return o;
if (seq) {
// everything prior to our watermark to drain through either/both queues
- while ((!q.empty() && q.front()->op <= seq) ||
- (!jq.empty() && jq.front() <= seq))
+ while (last_thru_q < seq || last_thru_jq < seq)
cond.Wait(qlock);
}
}
OpSequencer() : qlock("FileStore::OpSequencer::qlock", false, false),
+ last_thru_q(0), last_thru_jq(0),
apply_lock("FileStore::OpSequencer::apply_lock", false, false) {}
~OpSequencer() {
assert(q.empty());