]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/JournalingObjectStore: un-break op quiescing during journal replay
authorSage Weil <sage@inktank.com>
Wed, 12 Dec 2012 01:15:56 +0000 (17:15 -0800)
committerSage Weil <sage@inktank.com>
Wed, 12 Dec 2012 01:15:56 +0000 (17:15 -0800)
Commit d9dce4e9273adb4279519d65a0d8bfdfecb5c516 broke journal replay
because the commit thread may try to do a commit, and the ops are not
being applied via the normal work queue.  Add back in a simpler form of the
old op quiescing (simpler because there is a single thread doing the
replay).

Signed-off-by: Sage Weil <sage@inktank.com>
Reviewed-by: Samuel Just <sam.just@inktank.com>
src/os/JournalingObjectStore.cc

index b91b6dc207c4d4a2c9e24ff68e5d17f5126c31fe..971fd15b8242308ba43d15c17f1150e1b1855587 100644 (file)
@@ -109,6 +109,11 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
 uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
 {
   Mutex::Locker l(apply_lock);
+  while (blocked) {
+    // note: this only happens during journal replay
+    dout(10) << "op_apply_start blocked, waiting" << dendl;
+    blocked_cond.Wait(apply_lock);
+  }
   dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << dendl;
   assert(!blocked);
   assert(op > committed_seq);
@@ -126,6 +131,11 @@ void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
   --open_ops;
   assert(open_ops >= 0);
 
+  // signal a blocked commit_start (only needed during journal replay)
+  if (blocked) {
+    blocked_cond.Signal();
+  }
+
   // there can be multiple applies in flight; track the max value we
   // note.  note that we can't _read_ this value and learn anything
   // meaningful unless/until we've quiesced all in-flight applies.
@@ -173,6 +183,10 @@ bool JournalingObjectStore::ApplyManager::commit_start()
             << ", open_ops " << open_ops
             << dendl;
     blocked = true;
+    while (open_ops > 0) {
+      dout(10) << "commit_start waiting for " << open_ops << " open ops to drain" << dendl;
+      blocked_cond.Wait(apply_lock);
+    }
     assert(open_ops == 0);
     dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
     {
@@ -204,6 +218,7 @@ void JournalingObjectStore::ApplyManager::commit_started()
   // allow new ops. (underlying fs should now be committing all prior ops)
   dout(10) << "commit_started committing " << committing_seq << ", unblocking" << dendl;
   blocked = false;
+  blocked_cond.Signal();
 }
 
 void JournalingObjectStore::ApplyManager::commit_finish()