From: Sage Weil Date: Fri, 7 Dec 2012 00:18:07 +0000 (-0800) Subject: filestore: simplify op quescing X-Git-Tag: v0.56~84^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d9dce4e9273adb4279519d65a0d8bfdfecb5c516;p=ceph.git filestore: simplify op quescing The delicate balancing with op_apply_start() and that fact that it can block was making it very hard to determine how long commit_start() should wait, since requests in the workqueue threads could op_apply_start() in any order. For example, threadA: gets osr1 from wq threadA: gets osr2 from wq threadA: dequeue seq 11 from osr1, op_apply_start threadC: commit_start on 11 threadA: op_apply_finish on seq 11 threadC: commit_started, commit_finish threadB: dequeue seq 10 from osr2 Instead, rip out all this code, and use the ThreadPool pause() method to quiesce operations. Keep some of the (now unnecessary) fields around for sanity checks (blocked, open_ops, max_applying_seq, etc.). Signed-off-by: Sage Weil --- diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 2c66a5ea7dbe..98ee811e5868 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -3301,6 +3301,7 @@ void FileStore::sync_entry() fin.swap(sync_waiters); lock.Unlock(); + op_tp.pause(); if (apply_manager.commit_start()) { utime_t start = ceph_clock_now(g_ceph_context); uint64_t cp = apply_manager.get_committing_seq(); @@ -3352,6 +3353,7 @@ void FileStore::sync_entry() snaps.push_back(cp); apply_manager.commit_started(); + op_tp.unpause(); // wait for commit dout(20) << " waiting for transid " << async_args.transid << " to complete" << dendl; @@ -3383,10 +3385,12 @@ void FileStore::sync_entry() snaps.push_back(cp); apply_manager.commit_started(); + op_tp.unpause(); } } else { apply_manager.commit_started(); + op_tp.unpause(); if (btrfs) { dout(15) << "sync_entry doing btrfs SYNC" << dendl; @@ -3446,6 +3450,8 @@ void FileStore::sync_entry() sync_entry_timeo_lock.Lock(); timer.cancel_event(sync_entry_timeo); sync_entry_timeo_lock.Unlock(); + } else { + op_tp.unpause(); } lock.Lock(); diff --git a/src/os/JournalingObjectStore.cc b/src/os/JournalingObjectStore.cc index b1aee62eca8c..99d34f8bc262 100644 --- a/src/os/JournalingObjectStore.cc +++ b/src/os/JournalingObjectStore.cc @@ -109,26 +109,9 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq) uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op) { Mutex::Locker l(apply_lock); - // if we ops are blocked, or there are already people (left) in - // line, get in line. - if (op > max_applying_seq && - (blocked || !ops_apply_blocked.empty())) { - Cond cond; - ops_apply_blocked.push_back(&cond); - dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl; - // sleep until we are not blocked AND we are at the front of line - while (blocked || ops_apply_blocked.front() != &cond) - cond.Wait(apply_lock); - dout(10) << "op_apply_start " << op << " woke (at front of line)" << dendl; - ops_apply_blocked.pop_front(); - if (!ops_apply_blocked.empty()) { - dout(10) << "op_apply_start " << op << " ...and kicking next in line" << dendl; - ops_apply_blocked.front()->Signal(); - } - } dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << ", max_applying_seq " << max_applying_seq << " -> " << MAX(op, max_applying_seq) << dendl; - + assert(!blocked); if (op > max_applying_seq) max_applying_seq = op; assert(op > committed_seq); @@ -144,8 +127,8 @@ void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op) << ", max_applying_seq " << max_applying_seq << ", max_applied_seq " << max_applied_seq << " -> " << MAX(op, max_applied_seq) << dendl; - if (--open_ops == 0) - open_ops_cond.Signal(); + --open_ops; + assert(open_ops >= 0); // there can be multiple applies in flight; track the max value we // note. note that we can't _read_ this value and learn anything @@ -194,13 +177,9 @@ bool JournalingObjectStore::ApplyManager::commit_start() Mutex::Locker l(apply_lock); dout(10) << "commit_start max_applying_seq " << max_applying_seq << ", max_applied_seq " << max_applied_seq + << ", open_ops " << open_ops << dendl; blocked = true; - while (open_ops > 0) { - dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops, " - << " max_applying_seq " << max_applying_seq << " max_applied_seq " << max_applied_seq << dendl; - open_ops_cond.Wait(apply_lock); - } assert(open_ops == 0); assert(max_applied_seq == max_applying_seq); dout(10) << "commit_start blocked, all open_ops have completed" << dendl; @@ -209,8 +188,6 @@ bool JournalingObjectStore::ApplyManager::commit_start() if (max_applied_seq == committed_seq) { dout(10) << "commit_start nothing to do" << dendl; blocked = false; - if (!ops_apply_blocked.empty()) - ops_apply_blocked.front()->Signal(); assert(commit_waiters.empty()); goto out; } @@ -235,8 +212,6 @@ void JournalingObjectStore::ApplyManager::commit_started() // allow new ops. (underlying fs should now be committing all prior ops) dout(10) << "commit_started committing " << committing_seq << ", unblocking" << dendl; blocked = false; - if (!ops_apply_blocked.empty()) - ops_apply_blocked.front()->Signal(); } void JournalingObjectStore::ApplyManager::commit_finish() diff --git a/src/os/JournalingObjectStore.h b/src/os/JournalingObjectStore.h index ae74c32cd25f..944374043725 100644 --- a/src/os/JournalingObjectStore.h +++ b/src/os/JournalingObjectStore.h @@ -53,7 +53,6 @@ protected: bool blocked; Cond blocked_cond; int open_ops; - Cond open_ops_cond; uint64_t max_applying_seq; uint64_t max_applied_seq; @@ -61,7 +60,6 @@ protected: map > commit_waiters; uint64_t committing_seq, committed_seq; list ops_submitting; - list ops_apply_blocked; public: ApplyManager(Journal *&j, Finisher &f) :