/// -----------------------------
-void FileStore::queue_op(Sequencer *posr, uint64_t op_seq, list<Transaction*>& tls,
+void FileStore::queue_op(OpSequencer *osr, uint64_t op_seq, list<Transaction*>& tls,
Context *onreadable, Context *onreadable_sync)
{
uint64_t bytes = 0, ops = 0;
op_tp.lock();
- OpSequencer *osr;
- if (!posr)
- posr = &default_osr;
- if (posr->p) {
- osr = (OpSequencer *)posr->p;
- dout(10) << "queue_op existing osr " << osr << "/" << osr->parent << dendl; //<< " w/ q " << osr->q << dendl;
- } else {
- osr = new OpSequencer;
- osr->parent = posr;
- posr->p = osr;
- dout(10) << "queue_op new osr " << osr << "/" << osr->parent << dendl;
- }
osr->queue(o);
op_queue_len++;
struct C_JournaledAhead : public Context {
FileStore *fs;
- ObjectStore::Sequencer *osr;
+ FileStore::OpSequencer *osr;
uint64_t op;
list<ObjectStore::Transaction*> tls;
Context *onreadable, *onreadable_sync;
Context *ondisk;
- C_JournaledAhead(FileStore *f, ObjectStore::Sequencer *os, uint64_t o, list<ObjectStore::Transaction*>& t,
+ C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, uint64_t o, list<ObjectStore::Transaction*>& t,
Context *onr, Context *ond, Context *onrsync) :
fs(f), osr(os), op(o), tls(t), onreadable(onr), onreadable_sync(onrsync), ondisk(ond) { }
void finish(int r) {
return queue_transactions(osr, tls, new C_DeleteTransaction(t));
}
-int FileStore::queue_transactions(Sequencer *osr, list<Transaction*> &tls,
+int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
Context *onreadable, Context *ondisk,
Context *onreadable_sync)
{
+ // set up the sequencer
+ OpSequencer *osr;
+ if (!posr)
+ posr = &default_osr;
+ if (posr->p) {
+ osr = (OpSequencer *)posr->p;
+ dout(10) << "queue_transactions existing osr " << osr << "/" << osr->parent << dendl; //<< " w/ q " << osr->q << dendl;
+ } else {
+ osr = new OpSequencer;
+ osr->parent = posr;
+ posr->p = osr;
+ dout(10) << "queue_transactions new osr " << osr << "/" << osr->parent << dendl;
+ }
+
if (journal && journal->is_writeable()) {
if (g_conf.filestore_journal_parallel) {
uint64_t op = op_submit_start();
dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
+ osr->queue_journal(op);
_op_journal_transactions(tls, op,
new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync));
op_submit_finish(op);
return r;
}
-void FileStore::_journaled_ahead(Sequencer *osr, uint64_t op,
+void FileStore::_journaled_ahead(OpSequencer *osr, uint64_t op,
list<Transaction*> &tls,
Context *onreadable, Context *ondisk,
Context *onreadable_sync)
op_queue_throttle();
+ osr->dequeue_journal();
+
// this should queue in order because the journal does its completions in order.
journal_lock.Lock();
queue_op(osr, op, tls, onreadable, onreadable_sync);
class OpSequencer : public Sequencer_impl {
Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock)
list<Op*> q;
+ list<uint64_t> jq;
Cond cond;
public:
Sequencer *parent;
Mutex apply_lock; // for apply mutual exclusion
+ void queue_journal(uint64_t s) { // record op seq s as handed to the journal but not yet journaled
+ Mutex::Locker l(qlock); // jq is guarded by the same lock as q, which flush() also takes
+ jq.push_back(s); // newest entry goes at the tail; flush() reads jq.back() as the latest pending seq
+ }
+ void dequeue_journal() { // drop the oldest pending journal seq and signal cond
+ Mutex::Locker l(qlock);
+ jq.pop_front(); // NOTE(review): assumes a matching queue_journal() ran first so jq is non-empty -- confirm
+ cond.Signal(); // flush() waits on cond and re-checks jq.front() against its watermark
+ }
// Append op o to the tail of this sequencer's op queue (q).
void queue(Op *o) {
Mutex::Locker l(qlock); // qlock guards q
q.push_back(o);
}
// Wait until every entry that is queued when flush() is called -- in the op
// queue (q) or the journal queue (jq) -- has been removed from its queue.
// The highest sequence number present at entry is used as a watermark;
// entries queued later with a higher sequence number do not extend the wait.
// Returns without waiting when the watermark is 0 (both queues empty).
// NOTE(review): this presumes real op sequence numbers are never 0 -- confirm.
void flush() {
Mutex::Locker l(qlock);
- if (!q.empty()) {
- uint64_t seq = q.back()->op;
- while (!q.empty() && q.front()->op <= seq)
+
+ // get max for journal _or_ op queues
+ uint64_t seq = 0; // 0 doubles as "nothing queued"
+ if (!q.empty())
+ seq = q.back()->op;
+ if (!jq.empty() && jq.back() > seq)
+ seq = jq.back();
+
+ if (seq) {
+ // wait for everything at or below our watermark to drain through either/both queues
+ while ((!q.empty() && q.front()->op <= seq) ||
+ (!jq.empty() && jq.front() <= seq))
cond.Wait(qlock); // presumably releases qlock while blocked so the queues can drain -- confirm
}
}
void _do_op(OpSequencer *o);
void _finish_op(OpSequencer *o);
- void queue_op(Sequencer *osr, uint64_t op, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync);
+ void queue_op(OpSequencer *osr, uint64_t op, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync);
void op_queue_throttle();
- void _journaled_ahead(Sequencer *osr, uint64_t op, list<Transaction*> &tls,
+ void _journaled_ahead(OpSequencer *osr, uint64_t op, list<Transaction*> &tls,
Context *onreadable, Context *ondisk, Context *onreadable_sync);
friend class C_JournaledAhead;