]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
filestore: simplify op quescing
authorSage Weil <sage@inktank.com>
Fri, 7 Dec 2012 00:18:07 +0000 (16:18 -0800)
committerSage Weil <sage@inktank.com>
Sat, 8 Dec 2012 17:32:47 +0000 (09:32 -0800)
The delicate balancing with op_apply_start() and that fact that it can
block was making it very hard to determine how long commit_start() should
wait, since requests in the workqueue threads could op_apply_start() in
any order.  For example,

 threadA: gets osr1 from wq
 threadA: gets osr2 from wq
 threadA: dequeue seq 11 from osr1, op_apply_start
 threadC: commit_start on 11
 threadA: op_apply_finish on seq 11
 threadC: commit_started, commit_finish
 threadB: dequeue seq 10 from osr2
   <failed assert, badness>

Instead, rip out all this code, and use the ThreadPool pause() method to
quiesce operations.  Keep some of the (now unnecessary) fields around
for sanity checks (blocked, open_ops, max_applying_seq, etc.).

Signed-off-by: Sage Weil <sage@inktank.com>
src/os/FileStore.cc
src/os/JournalingObjectStore.cc
src/os/JournalingObjectStore.h

index 2c66a5ea7dbe0f4767b9d6b27185934f220d0de2..98ee811e5868f12146b72b61e422f847a08b7a4a 100644 (file)
@@ -3301,6 +3301,7 @@ void FileStore::sync_entry()
     fin.swap(sync_waiters);
     lock.Unlock();
     
+    op_tp.pause();
     if (apply_manager.commit_start()) {
       utime_t start = ceph_clock_now(g_ceph_context);
       uint64_t cp = apply_manager.get_committing_seq();
@@ -3352,6 +3353,7 @@ void FileStore::sync_entry()
          snaps.push_back(cp);
 
          apply_manager.commit_started();
+         op_tp.unpause();
 
          // wait for commit
          dout(20) << " waiting for transid " << async_args.transid << " to complete" << dendl;
@@ -3383,10 +3385,12 @@ void FileStore::sync_entry()
          snaps.push_back(cp);
          
          apply_manager.commit_started();
+         op_tp.unpause();
        }
       } else
       {
        apply_manager.commit_started();
+       op_tp.unpause();
 
        if (btrfs) {
          dout(15) << "sync_entry doing btrfs SYNC" << dendl;
@@ -3446,6 +3450,8 @@ void FileStore::sync_entry()
       sync_entry_timeo_lock.Lock();
       timer.cancel_event(sync_entry_timeo);
       sync_entry_timeo_lock.Unlock();
+    } else {
+      op_tp.unpause();
     }
     
     lock.Lock();
index b1aee62eca8cf379aa3caf2a154bb0e2e9c7f75d..99d34f8bc2622f4f0354a9c2e0a74e5a2650f540 100644 (file)
@@ -109,26 +109,9 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
 uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
 {
   Mutex::Locker l(apply_lock);
-  // if we ops are blocked, or there are already people (left) in
-  // line, get in line.
-  if (op > max_applying_seq &&
-      (blocked || !ops_apply_blocked.empty())) {
-    Cond cond;
-    ops_apply_blocked.push_back(&cond);
-    dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl;
-    // sleep until we are not blocked AND we are at the front of line
-    while (blocked || ops_apply_blocked.front() != &cond)
-      cond.Wait(apply_lock);
-    dout(10) << "op_apply_start " << op << " woke (at front of line)" << dendl;
-    ops_apply_blocked.pop_front();
-    if (!ops_apply_blocked.empty()) {
-      dout(10) << "op_apply_start " << op << "  ...and kicking next in line" << dendl;
-      ops_apply_blocked.front()->Signal();
-    }
-  }
   dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1)
           << ", max_applying_seq " << max_applying_seq << " -> " << MAX(op, max_applying_seq) << dendl;
-
+  assert(!blocked);
   if (op > max_applying_seq)
     max_applying_seq = op;
   assert(op > committed_seq);
@@ -144,8 +127,8 @@ void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
           << ", max_applying_seq " << max_applying_seq
           << ", max_applied_seq " << max_applied_seq << " -> " << MAX(op, max_applied_seq)
           << dendl;
-  if (--open_ops == 0)
-    open_ops_cond.Signal();
+  --open_ops;
+  assert(open_ops >= 0);
 
   // there can be multiple applies in flight; track the max value we
   // note.  note that we can't _read_ this value and learn anything
@@ -194,13 +177,9 @@ bool JournalingObjectStore::ApplyManager::commit_start()
     Mutex::Locker l(apply_lock);
     dout(10) << "commit_start max_applying_seq " << max_applying_seq
             << ", max_applied_seq " << max_applied_seq
+            << ", open_ops " << open_ops
             << dendl;
     blocked = true;
-    while (open_ops > 0) {
-      dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops, "
-              << " max_applying_seq " << max_applying_seq << " max_applied_seq " << max_applied_seq << dendl;
-      open_ops_cond.Wait(apply_lock);
-    }
     assert(open_ops == 0);
     assert(max_applied_seq == max_applying_seq);
     dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
@@ -209,8 +188,6 @@ bool JournalingObjectStore::ApplyManager::commit_start()
       if (max_applied_seq == committed_seq) {
        dout(10) << "commit_start nothing to do" << dendl;
        blocked = false;
-       if (!ops_apply_blocked.empty())
-         ops_apply_blocked.front()->Signal();
        assert(commit_waiters.empty());
        goto out;
       }
@@ -235,8 +212,6 @@ void JournalingObjectStore::ApplyManager::commit_started()
   // allow new ops. (underlying fs should now be committing all prior ops)
   dout(10) << "commit_started committing " << committing_seq << ", unblocking" << dendl;
   blocked = false;
-  if (!ops_apply_blocked.empty())
-    ops_apply_blocked.front()->Signal();
 }
 
 void JournalingObjectStore::ApplyManager::commit_finish()
index ae74c32cd25f3ac7132f84cb332de191408d876c..9443740437253e256ac369e704c78f8f7468279c 100644 (file)
@@ -53,7 +53,6 @@ protected:
     bool blocked;
     Cond blocked_cond;
     int open_ops;
-    Cond open_ops_cond;
     uint64_t max_applying_seq;
     uint64_t max_applied_seq;
 
@@ -61,7 +60,6 @@ protected:
     map<version_t, vector<Context*> > commit_waiters;
     uint64_t committing_seq, committed_seq;
     list<uint64_t> ops_submitting;
-    list<Cond*> ops_apply_blocked;
 
   public:
     ApplyManager(Journal *&j, Finisher &f) :