From 1d9f5d27d6c605ab2a9e63378a8f143ae30cefd7 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 5 Oct 2012 13:46:13 -0700 Subject: [PATCH] JournalingFileStore: create submit_manager to order op submission Previously, we ensured op ordering by queueing for journal and the op queue under the journal lock. All that is required is that obtaining an op sequence, queueing for journal, and (for parallel) queueing for application to the fs are done atomically. To that end, submit_manager now handles op submission. Signed-off-by: Samuel Just --- src/os/FileStore.cc | 19 ++++++++----------- src/os/JournalingObjectStore.cc | 22 ++++++++++++---------- src/os/JournalingObjectStore.h | 25 ++++++++++++++++++++++--- 3 files changed, 42 insertions(+), 24 deletions(-) diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index b1ba601590b81..321307e0e212e 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -2219,12 +2219,10 @@ FileStore::Op *FileStore::build_op(list& tls, void FileStore::queue_op(OpSequencer *osr, Op *o) { - assert(journal_lock.is_locked()); - // mark apply start _now_, because we need to drain the entire apply // queue during commit in order to put the store in a consistent // state. - _op_apply_start(o->op); + op_apply_start(o->op); op_tp.lock(); osr->queue(o); @@ -2377,7 +2375,8 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, Op *o = build_op(tls, onreadable, onreadable_sync, osd_op); op_queue_reserve_throttle(o); journal->throttle(); - o->op = op_submit_start(); + uint64_t op_num = submit_manager.op_submit_start(); + o->op = op_num; if (m_filestore_do_dump) dump_transactions(o->tls, o->op, osr); @@ -2387,7 +2386,7 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, _op_journal_transactions(o->tls, o->op, ondisk, osd_op); - // queue inside journal lock, to preserve ordering + // queue inside submit_manager op submission lock queue_op(osr, o); } else if (m_filestore_journal_writeahead) { dout(5) << "queue_transactions (writeahead) " << o->op << " " << o->tls << dendl; @@ -2400,17 +2399,17 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, } else { assert(0); } - op_submit_finish(o->op); + submit_manager.op_submit_finish(op_num); return 0; } - uint64_t op = op_submit_start(); + uint64_t op = submit_manager.op_submit_start(); dout(5) << "queue_transactions (trailing journal) " << op << " " << tls << dendl; if (m_filestore_do_dump) dump_transactions(tls, op, osr); - _op_apply_start(op); + op_apply_start(op); int r = do_transactions(tls, op); if (r >= 0) { @@ -2427,7 +2426,7 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, } op_finisher.queue(onreadable, r); - op_submit_finish(op); + submit_manager.op_submit_finish(op); op_apply_finish(op); return r; @@ -2438,9 +2437,7 @@ void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk) dout(5) << "_journaled_ahead " << o << " seq " << o->op << " " << *osr << " " << o->tls << dendl; // this should queue in order because the journal does it's completions in order. - journal_lock.Lock(); queue_op(osr, o); - journal_lock.Unlock(); osr->dequeue_journal(); diff --git a/src/os/JournalingObjectStore.cc b/src/os/JournalingObjectStore.cc index d42a96008cfec..79038c8ee742f 100644 --- a/src/os/JournalingObjectStore.cc +++ b/src/os/JournalingObjectStore.cc @@ -39,7 +39,7 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq) } journal_lock.Lock(); - op_seq = fs_op_seq; + uint64_t op_seq = fs_op_seq; committed_seq = fs_op_seq; committing_seq = fs_op_seq; applied_seq = fs_op_seq; @@ -106,6 +106,8 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq) replaying = false; + submit_manager.set_op_seq(op_seq); + journal_lock.Unlock(); // done reading, make writeable. @@ -165,25 +167,26 @@ void JournalingObjectStore::op_apply_finish(uint64_t op) journal_lock.Unlock(); } -uint64_t JournalingObjectStore::op_submit_start() +uint64_t JournalingObjectStore::SubmitManager::op_submit_start() { - journal_lock.Lock(); + lock.Lock(); uint64_t op = ++op_seq; dout(10) << "op_submit_start " << op << dendl; ops_submitting.push_back(op); return op; } -void JournalingObjectStore::op_submit_finish(uint64_t op) +void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op) { dout(10) << "op_submit_finish " << op << dendl; if (op != ops_submitting.front()) { - dout(0) << "op_submit_finish " << op << " expected " << ops_submitting.front() + dout(0) << "op_submit_finish " << op << " expected " + << ops_submitting.front() << ", OUT OF ORDER" << dendl; assert(0 == "out of order op_submit_finish"); } ops_submitting.pop_front(); - journal_lock.Unlock(); + lock.Unlock(); } @@ -194,9 +197,9 @@ bool JournalingObjectStore::commit_start() bool ret = false; journal_lock.Lock(); - dout(10) << "commit_start op_seq " << op_seq - << ", applied_seq " << applied_seq - << ", committed_seq " << committed_seq << dendl; + dout(10) << "commit_start op_seq " << submit_manager.get_op_seq() + << ", applied_seq " << applied_seq + << ", committed_seq " << committed_seq << dendl; blocked = true; while (open_ops > 0) { dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl; @@ -265,7 +268,6 @@ void JournalingObjectStore::_op_journal_transactions( list& tls, uint64_t op, Context *onjournal, TrackedOpRef osd_op) { - assert(journal_lock.is_locked()); dout(10) << "op_journal_transactions " << op << " " << tls << dendl; if (journal && journal->is_writeable()) { diff --git a/src/os/JournalingObjectStore.h b/src/os/JournalingObjectStore.h index 54a34da1932c5..d8004c4e4cdad 100644 --- a/src/os/JournalingObjectStore.h +++ b/src/os/JournalingObjectStore.h @@ -21,10 +21,30 @@ class JournalingObjectStore : public ObjectStore { protected: - uint64_t op_seq, applied_seq; + uint64_t applied_seq; uint64_t committing_seq, committed_seq; map > commit_waiters; + class SubmitManager { + Mutex lock; + uint64_t op_seq; + list ops_submitting; + public: + SubmitManager() : + lock("JOS::SubmitManager::lock"), + op_seq(0) + {} + uint64_t op_submit_start(); + void op_submit_finish(uint64_t op); + void set_op_seq(uint64_t seq) { + Mutex::Locker l(lock); + seq = op_seq; + } + uint64_t get_op_seq() { + return op_seq; + } + } submit_manager; + int open_ops; bool blocked; @@ -73,8 +93,7 @@ public: } public: - JournalingObjectStore() : op_seq(0), - applied_seq(0), committing_seq(0), committed_seq(0), + JournalingObjectStore() : applied_seq(0), committing_seq(0), committed_seq(0), open_ops(0), blocked(false), journal(NULL), finisher(g_ceph_context), journal_lock("JournalingObjectStore::journal_lock"), -- 2.39.5