From 3a235b0f2136c50823cafefe705eff610b0cded8 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 17 Dec 2010 15:12:17 -0800 Subject: [PATCH] filestore: make OpSequencer::flush() work for writeahead journaling items It was only waiting for items in the op_queue to complete. The goal is to wait for anything we've called queue_transactions(&osr,...) on. If we do writeahead journaling, though, there might be new ops that are still journaling but not yet submitted to the fs that are missed. This adds a journal queue to the OpSequencer, and uses it in the writeahead case only. Signed-off-by: Sage Weil --- src/os/FileStore.cc | 39 ++++++++++++++++++++++----------------- src/os/FileStore.h | 29 ++++++++++++++++++++++++----- 2 files changed, 46 insertions(+), 22 deletions(-) diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index d180f9daa666a..179d1757dae37 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -1230,7 +1230,7 @@ int FileStore::umount() /// ----------------------------- -void FileStore::queue_op(Sequencer *posr, uint64_t op_seq, list& tls, +void FileStore::queue_op(OpSequencer *osr, uint64_t op_seq, list& tls, Context *onreadable, Context *onreadable_sync) { uint64_t bytes = 0, ops = 0; @@ -1260,18 +1260,6 @@ void FileStore::queue_op(Sequencer *posr, uint64_t op_seq, list& t op_tp.lock(); - OpSequencer *osr; - if (!posr) - posr = &default_osr; - if (posr->p) { - osr = (OpSequencer *)posr->p; - dout(10) << "queue_op existing osr " << osr << "/" << osr->parent << dendl; //<< " w/ q " << osr->q << dendl; - } else { - osr = new OpSequencer; - osr->parent = posr; - posr->p = osr; - dout(10) << "queue_op new osr " << osr << "/" << osr->parent << dendl; - } osr->queue(o); op_queue_len++; @@ -1361,13 +1349,13 @@ void FileStore::_finish_op(OpSequencer *osr) struct C_JournaledAhead : public Context { FileStore *fs; - ObjectStore::Sequencer *osr; + FileStore::OpSequencer *osr; uint64_t op; list tls; Context *onreadable, *onreadable_sync; Context *ondisk; - C_JournaledAhead(FileStore *f, ObjectStore::Sequencer *os, uint64_t o, list& t, + C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, uint64_t o, list& t, Context *onr, Context *ond, Context *onrsync) : fs(f), osr(os), op(o), tls(t), onreadable(onr), onreadable_sync(onrsync), ondisk(ond) { } void finish(int r) { @@ -1382,10 +1370,24 @@ int FileStore::queue_transaction(Sequencer *osr, Transaction *t) return queue_transactions(osr, tls, new C_DeleteTransaction(t)); } -int FileStore::queue_transactions(Sequencer *osr, list &tls, +int FileStore::queue_transactions(Sequencer *posr, list &tls, Context *onreadable, Context *ondisk, Context *onreadable_sync) { + // set up the sequencer + OpSequencer *osr; + if (!posr) + posr = &default_osr; + if (posr->p) { + osr = (OpSequencer *)posr->p; + dout(10) << "queue_transactions existing osr " << osr << "/" << osr->parent << dendl; //<< " w/ q " << osr->q << dendl; + } else { + osr = new OpSequencer; + osr->parent = posr; + posr->p = osr; + dout(10) << "queue_transactions new osr " << osr << "/" << osr->parent << dendl; + } + if (journal && journal->is_writeable()) { if (g_conf.filestore_journal_parallel) { @@ -1414,6 +1416,7 @@ int FileStore::queue_transactions(Sequencer *osr, list &tls, uint64_t op = op_submit_start(); dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl; + osr->queue_journal(op); _op_journal_transactions(tls, op, new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync)); op_submit_finish(op); @@ -1447,7 +1450,7 @@ int FileStore::queue_transactions(Sequencer *osr, list &tls, return r; } -void FileStore::_journaled_ahead(Sequencer *osr, uint64_t op, +void FileStore::_journaled_ahead(OpSequencer *osr, uint64_t op, list &tls, Context *onreadable, Context *ondisk, Context *onreadable_sync) @@ -1456,6 +1459,8 @@ void FileStore::_journaled_ahead(Sequencer *osr, uint64_t op, op_queue_throttle(); + osr->dequeue_journal(); + // this should queue in order because the journal does it's completions in order. journal_lock.Lock(); queue_op(osr, op, tls, onreadable, onreadable_sync); diff --git a/src/os/FileStore.h b/src/os/FileStore.h index 2c062bee146dd..536c7466affe1 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -101,11 +101,21 @@ class FileStore : public JournalingObjectStore { class OpSequencer : public Sequencer_impl { Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock) list q; + list jq; Cond cond; public: Sequencer *parent; Mutex apply_lock; // for apply mutual exclusion + void queue_journal(uint64_t s) { + Mutex::Locker l(qlock); + jq.push_back(s); + } + void dequeue_journal() { + Mutex::Locker l(qlock); + jq.pop_front(); + cond.Signal(); + } void queue(Op *o) { Mutex::Locker l(qlock); q.push_back(o); @@ -124,9 +134,18 @@ class FileStore : public JournalingObjectStore { } void flush() { Mutex::Locker l(qlock); - if (!q.empty()) { - uint64_t seq = q.back()->op; - while (!q.empty() && q.front()->op <= seq) + + // get max for journal _or_ op queues + uint64_t seq = 0; + if (!q.empty()) + seq = q.back()->op; + if (!jq.empty() && jq.back() > seq) + seq = jq.back(); + + if (seq) { + // everything prior to our watermark to drain through either/both queues + while ((!q.empty() && q.front()->op <= seq) || + (!jq.empty() && jq.front() <= seq)) cond.Wait(qlock); } } @@ -180,9 +199,9 @@ class FileStore : public JournalingObjectStore { void _do_op(OpSequencer *o); void _finish_op(OpSequencer *o); - void queue_op(Sequencer *osr, uint64_t op, list& tls, Context *onreadable, Context *onreadable_sync); + void queue_op(OpSequencer *osr, uint64_t op, list& tls, Context *onreadable, Context *onreadable_sync); void op_queue_throttle(); - void _journaled_ahead(Sequencer *osr, uint64_t op, list &tls, + void _journaled_ahead(OpSequencer *osr, uint64_t op, list &tls, Context *onreadable, Context *ondisk, Context *onreadable_sync); friend class C_JournaledAhead; -- 2.39.5