From 9601b291322dff7f229ffd8d84a912a9ea94cd4d Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 5 Oct 2012 17:33:36 -0700 Subject: [PATCH] JournalingFileStore: move apply/commit sequencing to apply_manager Syncing the filestore requires a stable commit point (i.e., all ops up to applied_seq must have been applied). Previously, we used journal_lock to atomically block new applies while waiting for the remaining ones to finish. This creates unnecessary contention. We now use apply_manager to manage that state atomically with its own lock. Signed-off-by: Samuel Just --- src/os/FileStore.cc | 20 ++--- src/os/JournalingObjectStore.cc | 127 ++++++++++++++------------------ src/os/JournalingObjectStore.h | 95 +++++++++++++++--------- 3 files changed, 127 insertions(+), 115 deletions(-) diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 321307e0e212e..3f2b2bc0e71ff 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -2222,7 +2222,7 @@ void FileStore::queue_op(OpSequencer *osr, Op *o) // mark apply start _now_, because we need to drain the entire apply // queue during commit in order to put the store in a consistent // state. 
- op_apply_start(o->op); + apply_manager.op_apply_start(o->op); op_tp.lock(); osr->queue(o); @@ -2295,7 +2295,7 @@ void FileStore::_do_op(OpSequencer *osr) dout(5) << "_do_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl; int r = do_transactions(o->tls, o->op); - op_apply_finish(o->op); + apply_manager.op_apply_finish(o->op); dout(10) << "_do_op " << o << " seq " << o->op << " r = " << r << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl; @@ -2409,7 +2409,7 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, if (m_filestore_do_dump) dump_transactions(tls, op, osr); - op_apply_start(op); + apply_manager.op_apply_start(op); int r = do_transactions(tls, op); if (r >= 0) { @@ -2427,7 +2427,7 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, op_finisher.queue(onreadable, r); submit_manager.op_submit_finish(op); - op_apply_finish(op); + apply_manager.op_apply_finish(op); return r; } @@ -3647,9 +3647,9 @@ void FileStore::sync_entry() fin.swap(sync_waiters); lock.Unlock(); - if (commit_start()) { + if (apply_manager.commit_start()) { utime_t start = ceph_clock_now(g_ceph_context); - uint64_t cp = committing_seq; + uint64_t cp = apply_manager.get_committing_seq(); sync_entry_timeo_lock.Lock(); SyncEntryTimeout *sync_entry_timeo = @@ -3697,7 +3697,7 @@ void FileStore::sync_entry() snaps.push_back(cp); - commit_started(); + apply_manager.commit_started(); // wait for commit dout(20) << " waiting for transid " << async_args.transid << " to complete" << dendl; @@ -3728,11 +3728,11 @@ void FileStore::sync_entry() assert(r == 0); snaps.push_back(cp); - commit_started(); + apply_manager.commit_started(); } } else { - commit_started(); + apply_manager.commit_started(); if (btrfs) { dout(15) << "sync_entry doing btrfs SYNC" << dendl; @@ -3764,7 +3764,7 @@ void FileStore::sync_entry() logger->finc(l_os_commit_lat, lat); logger->finc(l_os_commit_len, dur); - commit_finish(); + 
apply_manager.commit_finish(); logger->set(l_os_committing, 0); diff --git a/src/os/JournalingObjectStore.cc b/src/os/JournalingObjectStore.cc index 79038c8ee742f..42b95c96a58bc 100644 --- a/src/os/JournalingObjectStore.cc +++ b/src/os/JournalingObjectStore.cc @@ -38,12 +38,8 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq) fs_op_seq = g_conf->journal_replay_from - 1; } - journal_lock.Lock(); uint64_t op_seq = fs_op_seq; - committed_seq = fs_op_seq; - committing_seq = fs_op_seq; - applied_seq = fs_op_seq; - journal_lock.Unlock(); + apply_manager.init_seq(fs_op_seq); if (!journal) return 0; @@ -58,8 +54,6 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq) return err; } - journal_lock.Lock(); - replaying = true; int count = 0; @@ -85,14 +79,11 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq) tls.push_back(t); } - open_ops++; - journal_lock.Unlock(); + apply_manager.op_apply_start(seq); int r = do_transactions(tls, seq); - journal_lock.Lock(); - open_ops--; - cond.Signal(); + apply_manager.op_apply_finish(seq); - op_seq = applied_seq = seq; + op_seq = seq; while (!tls.empty()) { delete tls.front(); @@ -100,16 +91,12 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq) } dout(3) << "journal_replay: r = " << r << ", op_seq now " << op_seq << dendl; - assert(op_seq == seq); - seq++; // we expect the next op } replaying = false; submit_manager.set_op_seq(op_seq); - journal_lock.Unlock(); - // done reading, make writeable. 
journal->make_writeable(); @@ -119,16 +106,9 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq) // ------------------------------------ -uint64_t JournalingObjectStore::op_apply_start(uint64_t op) -{ - Mutex::Locker l(journal_lock); - return _op_apply_start(op); -} - -uint64_t JournalingObjectStore::_op_apply_start(uint64_t op) +uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op) { - assert(journal_lock.is_locked()); - + Mutex::Locker l(apply_lock); // if we ops are blocked, or there are already people (left) in // line, get in line. if (blocked || !ops_apply_blocked.empty()) { @@ -137,7 +117,7 @@ uint64_t JournalingObjectStore::_op_apply_start(uint64_t op) dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl; // sleep until we are not blocked AND we are at the front of line while (blocked || ops_apply_blocked.front() != &cond) - cond.Wait(journal_lock); + cond.Wait(apply_lock); dout(10) << "op_apply_start " << op << " woke (at front of line)" << dendl; ops_apply_blocked.pop_front(); if (!ops_apply_blocked.empty()) { @@ -147,24 +127,24 @@ uint64_t JournalingObjectStore::_op_apply_start(uint64_t op) } dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << dendl; assert(!blocked); + open_ops++; return op; } -void JournalingObjectStore::op_apply_finish(uint64_t op) +void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op) { - journal_lock.Lock(); - dout(10) << "op_apply_finish " << op << " open_ops " << open_ops << " -> " << (open_ops-1) << dendl; + Mutex::Locker l(apply_lock); + dout(10) << "op_apply_finish " << op << " open_ops " << open_ops + << " -> " << (open_ops-1) << dendl; if (--open_ops == 0) - cond.Signal(); + open_ops_cond.Signal(); // there can be multiple applies in flight; track the max value we // note. note that we can't _read_ this value and learn anything // meaningful unless/until we've quiesced all in-flight applies. 
if (op > applied_seq) applied_seq = op; - - journal_lock.Unlock(); } uint64_t JournalingObjectStore::SubmitManager::op_submit_start() @@ -192,51 +172,57 @@ void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op) // ------------------------------------------ -bool JournalingObjectStore::commit_start() +void JournalingObjectStore::ApplyManager::add_waiter(uint64_t op, Context *c) { - bool ret = false; - - journal_lock.Lock(); - dout(10) << "commit_start op_seq " << submit_manager.get_op_seq() - << ", applied_seq " << applied_seq - << ", committed_seq " << committed_seq << dendl; - blocked = true; - while (open_ops > 0) { - dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl; - cond.Wait(journal_lock); - } - dout(10) << "commit_start blocked, all open_ops have completed" << dendl; - assert(open_ops == 0); + Mutex::Locker l(com_lock); + assert(c); + commit_waiters[op].push_back(c); +} - if (applied_seq == committed_seq) { - dout(10) << "commit_start nothing to do" << dendl; - blocked = false; - if (!ops_apply_blocked.empty()) - ops_apply_blocked.front()->Signal(); - assert(commit_waiters.empty()); - goto out; - } +bool JournalingObjectStore::ApplyManager::commit_start() +{ + bool ret = false; + { + Mutex::Locker l(apply_lock); + dout(10) << "commit_start " + << ", applied_seq " << applied_seq << dendl; + blocked = true; + while (open_ops > 0) { + dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl; + open_ops_cond.Wait(apply_lock); + } + assert(open_ops == 0); + + dout(10) << "commit_start blocked, all open_ops have completed" << dendl; + { + Mutex::Locker l(com_lock); + if (applied_seq == committed_seq) { + dout(10) << "commit_start nothing to do" << dendl; + blocked = false; + if (!ops_apply_blocked.empty()) + ops_apply_blocked.front()->Signal(); + assert(commit_waiters.empty()); + goto out; + } - com_lock.Lock(); - // we can _only_ read applied_seq here because open_ops == 0 (we've - 
// quiesced all in-flight applies). - committing_seq = applied_seq; - com_lock.Unlock(); + committing_seq = applied_seq; - dout(10) << "commit_start committing " << committing_seq << ", still blocked" << dendl; + dout(10) << "commit_start committing " << committing_seq + << ", still blocked" << dendl; + } + } ret = true; out: if (journal) journal->commit_start(); // tell the journal too - journal_lock.Unlock(); return ret; } -void JournalingObjectStore::commit_started() +void JournalingObjectStore::ApplyManager::commit_started() { - Mutex::Locker l(journal_lock); + Mutex::Locker l(apply_lock); // allow new ops. (underlying fs should now be committing all prior ops) dout(10) << "commit_started committing " << committing_seq << ", unblocking" << dendl; blocked = false; @@ -244,21 +230,19 @@ void JournalingObjectStore::commit_started() ops_apply_blocked.front()->Signal(); } -void JournalingObjectStore::commit_finish() +void JournalingObjectStore::ApplyManager::commit_finish() { - Mutex::Locker l(journal_lock); + Mutex::Locker l(com_lock); dout(10) << "commit_finish thru " << committing_seq << dendl; if (journal) journal->committed_thru(committing_seq); - com_lock.Lock(); committed_seq = committing_seq; - com_lock.Unlock(); map >::iterator p = commit_waiters.begin(); while (p != commit_waiters.end() && - p->first <= committing_seq) { + p->first <= committing_seq) { finisher.queue(p->second); commit_waiters.erase(p++); } @@ -285,6 +269,7 @@ void JournalingObjectStore::_op_journal_transactions( ::encode(*t, tbl); } journal->submit_entry(op, tbl, data_align, onjournal, osd_op); - } else if (onjournal) - commit_waiters[op].push_back(onjournal); + } else if (onjournal) { + apply_manager.add_waiter(op, onjournal); + } } diff --git a/src/os/JournalingObjectStore.h b/src/os/JournalingObjectStore.h index d8004c4e4cdad..5bfa86b8f9f92 100644 --- a/src/os/JournalingObjectStore.h +++ b/src/os/JournalingObjectStore.h @@ -21,9 +21,9 @@ class JournalingObjectStore : public ObjectStore 
{ protected: - uint64_t applied_seq; - uint64_t committing_seq, committed_seq; - map > commit_waiters; + Journal *journal; + Finisher finisher; + class SubmitManager { Mutex lock; @@ -38,25 +38,69 @@ protected: void op_submit_finish(uint64_t op); void set_op_seq(uint64_t seq) { Mutex::Locker l(lock); - seq = op_seq; + op_seq = seq; } uint64_t get_op_seq() { return op_seq; } } submit_manager; - int open_ops; - bool blocked; + class ApplyManager { + Journal *&journal; + Finisher &finisher; - Journal *journal; - Finisher finisher; + Mutex apply_lock; + bool blocked; + Cond blocked_cond; + int open_ops; + Cond open_ops_cond; + uint64_t applied_seq; - Cond cond; - Mutex journal_lock; - Mutex com_lock; + Mutex com_lock; + map > commit_waiters; + uint64_t committing_seq, committed_seq; + list ops_submitting; + list ops_apply_blocked; - list ops_submitting; - list ops_apply_blocked; + public: + ApplyManager(Journal *&j, Finisher &f) : + journal(j), finisher(f), + apply_lock("JOS::ApplyManager::apply_lock"), + blocked(false), + open_ops(0), + applied_seq(0), + com_lock("JOS::ApplyManager::com_lock"), + committing_seq(0), committed_seq(0) {} + void add_waiter(uint64_t, Context*); + uint64_t op_apply_start(uint64_t op); + void op_apply_finish(uint64_t op); + bool commit_start(); + void commit_started(); + void commit_finish(); + bool is_committing() { + Mutex::Locker l(com_lock); + return committing_seq != committed_seq; + } + uint64_t get_committed_seq() { + Mutex::Locker l(com_lock); + return committed_seq; + } + uint64_t get_committing_seq() { + Mutex::Locker l(com_lock); + return committing_seq; + } + void init_seq(uint64_t fs_op_seq) { + { + Mutex::Locker l(com_lock); + committed_seq = fs_op_seq; + committing_seq = fs_op_seq; + } + { + Mutex::Locker l(apply_lock); + applied_seq = fs_op_seq; + } + } + } apply_manager; bool replaying; @@ -65,39 +109,22 @@ protected: void journal_stop(); int journal_replay(uint64_t fs_op_seq); - // -- - uint64_t op_submit_start(); - void 
op_submit_finish(uint64_t op_seq); - - uint64_t op_apply_start(uint64_t op); - uint64_t _op_apply_start(uint64_t op); - void op_apply_finish(uint64_t op); - void _op_journal_transactions(list& tls, uint64_t op, Context *onjournal, TrackedOpRef osd_op); virtual int do_transactions(list& tls, uint64_t op_seq) = 0; - bool commit_start(); - void commit_started(); // allow new ops (underlying fs should now be committing all prior ops) - void commit_finish(); - public: bool is_committing() { - Mutex::Locker l(com_lock); - return committing_seq != committed_seq; + return apply_manager.is_committing(); } uint64_t get_committed_seq() { - Mutex::Locker l(com_lock); - return committed_seq; + return apply_manager.get_committed_seq(); } public: - JournalingObjectStore() : applied_seq(0), committing_seq(0), committed_seq(0), - open_ops(0), blocked(false), - journal(NULL), finisher(g_ceph_context), - journal_lock("JournalingObjectStore::journal_lock"), - com_lock("JournalingObjectStore::com_lock"), + JournalingObjectStore() : journal(NULL), finisher(g_ceph_context), + apply_manager(journal, finisher), replaying(false) {} }; -- 2.39.5