// queue inside journal lock, to preserve ordering
queue_op(osr, op, tls, onreadable, onreadable_sync);
- op_submit_finish();
+ op_submit_finish(op);
return 0;
}
else if (g_conf.filestore_journal_writeahead) {
dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
_op_journal_transactions(tls, op,
new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync));
- op_submit_finish();
+ op_submit_finish(op);
return 0;
}
}
- uint64_t op_seq = op_apply_start(0);
- dout(10) << "queue_transactions (trailing journal) " << op_seq << " " << tls << dendl;
- int r = do_transactions(tls, op_seq);
- op_apply_finish(op_seq);
+ uint64_t op = op_submit_start();
+ dout(10) << "queue_transactions (trailing journal) " << op << " " << tls << dendl;
+
+ _op_apply_start(op);
+ int r = do_transactions(tls, op);
if (r >= 0) {
- op_journal_transactions(tls, op_seq, ondisk);
+ op_journal_transactions(tls, op, ondisk);
} else {
delete ondisk;
}
}
op_finisher.queue(onreadable, r);
+ op_submit_finish(op);
+ op_apply_finish(op);
+
return r;
}
uint64_t JournalingObjectStore::op_submit_start()
{
journal_lock.Lock();
- return ++op_seq;
+ uint64_t op = ++op_seq;
+ dout(10) << "op_submit_start " << op << dendl;
+ ops_submitting.push_back(op);
+ return op;
}
-void JournalingObjectStore::op_submit_finish()
+void JournalingObjectStore::op_submit_finish(uint64_t op)
{
+ dout(10) << "op_submit_finish " << op << dendl;
+ if (op != ops_submitting.front()) {
+ dout(0) << "op_submit_finish " << op << " expected " << ops_submitting.front()
+ << ", OUT OF ORDER" << dendl;
+ }
+ ops_submitting.pop_front();
journal_lock.Unlock();
}
Cond cond;
Mutex journal_lock;
+ list<uint64_t> ops_submitting;
+
protected:
void journal_start();
void journal_stop();
// --
uint64_t op_submit_start();
- void op_submit_finish();
+ void op_submit_finish(uint64_t op_seq);
uint64_t op_apply_start(uint64_t op);
uint64_t _op_apply_start(uint64_t op);