git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
filestore: refactor op_queue/journal locking
author Sage Weil <sage@newdream.net>
Tue, 30 Nov 2010 15:51:16 +0000 (07:51 -0800)
committer Sage Weil <sage@newdream.net>
Tue, 30 Nov 2010 15:51:16 +0000 (07:51 -0800)
- Combine journal_lock and lock.
- Move throttling outside of the lock (this fixes potential deadlock in
  parallel journal mode)
- Make interface nomenclature a bit more helpful

Signed-off-by: Sage Weil <sage@newdream.net>
src/os/FileStore.cc
src/os/JournalingObjectStore.cc
src/os/JournalingObjectStore.h

index bda39317f1f53dd227fea059b65acfd1f47ce479..bb31232b9021e0143a9f91bba3a194fd97690dae 100644 (file)
@@ -1237,7 +1237,7 @@ void FileStore::queue_op(Sequencer *posr, uint64_t op_seq, list<Transaction*>& t
   // mark apply start _now_, because we need to drain the entire apply
   // queue during commit in order to put the store in a consistent
   // state.
-  op_apply_start(op_seq);
+  _op_apply_start(op_seq);
 
   Op *o = new Op;
   o->op = op_seq;
@@ -1393,15 +1393,15 @@ int FileStore::queue_transactions(Sequencer *osr, list<Transaction*> &tls,
       journal->throttle();   // make sure we're not ahead of the jouranl
       op_queue_throttle();   // make sure the journal isn't getting ahead of our op queue.
 
-      uint64_t op = op_journal_start(0);
+      uint64_t op = op_submit_start();
       dout(10) << "queue_transactions (parallel) " << op << " " << tls << dendl;
       
-      journal_transactions(tls, op, ondisk);
+      _op_journal_transactions(tls, op, ondisk);
       
       // queue inside journal lock, to preserve ordering
       queue_op(osr, op, tls, onreadable, onreadable_sync);
       
-      op_journal_finish();
+      op_submit_finish();
       return 0;
     }
     else if (g_conf.filestore_journal_writeahead) {
@@ -1409,11 +1409,11 @@ int FileStore::queue_transactions(Sequencer *osr, list<Transaction*> &tls,
       journal->throttle();   // make sure we're not ahead of the journal
       op_queue_throttle();   // make sure the journal isn't getting ahead of our op queue.
 
-      uint64_t op = op_journal_start(0);
+      uint64_t op = op_submit_start();
       dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
-      journal_transactions(tls, op,
-                          new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync));
-      op_journal_finish();
+      _op_journal_transactions(tls, op,
+                              new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync));
+      op_submit_finish();
       return 0;
     }
   }
@@ -1424,9 +1424,7 @@ int FileStore::queue_transactions(Sequencer *osr, list<Transaction*> &tls,
   op_apply_finish(op_seq);
     
   if (r >= 0) {
-    op_journal_start(op_seq);
-    journal_transactions(tls, op_seq, ondisk);
-    op_journal_finish();
+    op_journal_transactions(tls, op_seq, ondisk);
   } else {
     delete ondisk;
   }
@@ -1452,7 +1450,9 @@ void FileStore::_journaled_ahead(Sequencer *osr, uint64_t op,
   op_queue_throttle();
 
   // this should queue in order because the journal does it's completions in order.
+  journal_lock.Lock();
   queue_op(osr, op, tls, onreadable, onreadable_sync);
+  journal_lock.Unlock();
 
   // do ondisk completions async, to prevent any onreadable_sync completions
   // getting blocked behind an ondisk completion.
index e152c08496984d7fc03449a5167de9ffa9da76d6..3f8a7d9e4feeb286651dd1b2e0780fffc63e0ac8 100644 (file)
@@ -93,10 +93,16 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
 
 uint64_t JournalingObjectStore::op_apply_start(uint64_t op) 
 {
-  lock.Lock();
+  Mutex::Locker l(journal_lock);
+  return _op_apply_start(op);
+}
+
+uint64_t JournalingObjectStore::_op_apply_start(uint64_t op) 
+{
+  assert(journal_lock.is_locked());
   while (blocked) {
     dout(10) << "op_apply_start blocked" << dendl;
-    cond.Wait(lock);
+    cond.Wait(journal_lock);
   }
   open_ops++;
 
@@ -104,14 +110,13 @@ uint64_t JournalingObjectStore::op_apply_start(uint64_t op)
     op = ++op_seq;
   dout(10) << "op_apply_start " << op << dendl;
 
-  lock.Unlock();
   return op;
 }
 
 void JournalingObjectStore::op_apply_finish(uint64_t op) 
 {
   dout(10) << "op_apply_finish" << dendl;
-  lock.Lock();
+  journal_lock.Lock();
   if (--open_ops == 0)
     cond.Signal();
 
@@ -121,21 +126,16 @@ void JournalingObjectStore::op_apply_finish(uint64_t op)
   if (op > applied_seq)
     applied_seq = op;
 
-  lock.Unlock();
+  journal_lock.Unlock();
 }
 
-uint64_t JournalingObjectStore::op_journal_start(uint64_t op)
+uint64_t JournalingObjectStore::op_submit_start()
 {
   journal_lock.Lock();
-  if (!op) {
-    lock.Lock();
-    op = ++op_seq;
-    lock.Unlock();
-  }
-  return op;
+  return ++op_seq;
 }
 
-void JournalingObjectStore::op_journal_finish()
+void JournalingObjectStore::op_submit_finish()
 {
   journal_lock.Unlock();
 }
@@ -145,16 +145,16 @@ void JournalingObjectStore::op_journal_finish()
 
 bool JournalingObjectStore::commit_start() 
 {
-  // suspend new ops...
-  Mutex::Locker l(lock);
+  bool ret = false;
 
+  journal_lock.Lock();
   dout(10) << "commit_start op_seq " << op_seq
           << ", applied_seq " << applied_seq
           << ", committed_seq " << committed_seq << dendl;
   blocked = true;
   while (open_ops > 0) {
     dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
-    cond.Wait(lock);
+    cond.Wait(journal_lock);
   }
   
   if (applied_seq == committed_seq) {
@@ -162,7 +162,7 @@ bool JournalingObjectStore::commit_start()
     blocked = false;
     cond.Signal();
     assert(commit_waiters.empty());
-    return false;
+    goto out;
   }
 
   // we can _only_ read applied_seq here because open_ops == 0 (we've
@@ -170,12 +170,16 @@ bool JournalingObjectStore::commit_start()
   committing_seq = applied_seq;
 
   dout(10) << "commit_start committing " << committing_seq << ", still blocked" << dendl;
-  return true;
+  ret = true;
+
+ out:
+  journal_lock.Unlock();
+  return ret;
 }
 
 void JournalingObjectStore::commit_started() 
 {
-  Mutex::Locker l(lock);
+  Mutex::Locker l(journal_lock);
   // allow new ops. (underlying fs should now be committing all prior ops)
   dout(10) << "commit_started committing " << committing_seq << ", unblocking" << dendl;
   blocked = false;
@@ -184,7 +188,7 @@ void JournalingObjectStore::commit_started()
 
 void JournalingObjectStore::commit_finish()
 {
-  Mutex::Locker l(lock);
+  Mutex::Locker l(journal_lock);
   dout(10) << "commit_finish thru " << committing_seq << dendl;
   
   if (journal)
@@ -199,29 +203,18 @@ void JournalingObjectStore::commit_finish()
   }
 }
 
-void JournalingObjectStore::journal_transaction(ObjectStore::Transaction& t, uint64_t op,
-                                               Context *onjournal)
+void JournalingObjectStore::op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op,
+                                                   Context *onjournal)
 {
-  Mutex::Locker l(lock);
-  dout(10) << "journal_transaction " << op << dendl;
-  if (journal && journal->is_writeable()) {
-    bufferlist tbl;
-    t.encode(tbl);
-
-    int alignment = -1;
-    if ((int)t.get_data_length() >= g_conf.journal_align_min_size)
-      alignment = t.get_data_alignment();
-
-    journal->submit_entry(op, tbl, alignment, onjournal);
-  } else if (onjournal)
-    commit_waiters[op].push_back(onjournal);
+  Mutex::Locker l(journal_lock);
+  _op_journal_transactions(tls, op, onjournal);
 }
 
-void JournalingObjectStore::journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op,
-                                                Context *onjournal)
+void JournalingObjectStore::_op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op,
+                                                    Context *onjournal)
 {
-  Mutex::Locker l(lock);
-  dout(10) << "journal_transactions " << op << dendl;
+  assert(journal_lock.is_locked());
+  dout(10) << "op_journal_transactions " << op << dendl;
     
   if (journal && journal->is_writeable()) {
     bufferlist tbl;
index 6fba821964dfeae500393f81a17adf744ccfefe6..bca6d48eb59e9b5705845b3e0d1bd42281df9111 100644 (file)
@@ -33,7 +33,6 @@ protected:
 
   Cond cond;
   Mutex journal_lock;
-  Mutex lock;
 
 protected:
   void journal_start();
@@ -41,13 +40,15 @@ protected:
   int journal_replay(uint64_t fs_op_seq);
 
   // --
+  uint64_t op_submit_start();
+  void op_submit_finish();
+
   uint64_t op_apply_start(uint64_t op);
+  uint64_t _op_apply_start(uint64_t op);
   void op_apply_finish(uint64_t op);
-  uint64_t op_journal_start(uint64_t op);
-  void op_journal_finish();
 
-  void journal_transaction(ObjectStore::Transaction& t, uint64_t op, Context *onjournal);
-  void journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op, Context *onjournal);
+  void op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op, Context *onjournal);
+  void _op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op, Context *onjournal);
 
   bool commit_start();
   void commit_started();  // allow new ops (underlying fs should now be committing all prior ops)
@@ -59,8 +60,7 @@ public:
                            applied_seq(0), committing_seq(0), committed_seq(0), 
                            open_ops(0), blocked(false),
                            journal(NULL),
-                           journal_lock("JournalingObjectStore::journal_lock"),
-                           lock("JournalingObjectStore::lock") { }
+                           journal_lock("JournalingObjectStore::journal_lock") { }
   
 };