]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
JournalingFileStore: create submit_manager to order op submission
authorSamuel Just <sam.just@inktank.com>
Fri, 5 Oct 2012 20:46:13 +0000 (13:46 -0700)
committerSamuel Just <sam.just@inktank.com>
Tue, 30 Oct 2012 20:31:09 +0000 (13:31 -0700)
Previously, we ensured op ordering by queueing for journal and
the op queue under the journal lock.  All that is required is
that obtaining an op sequence, queueing for journal, and
(for parallel) queueing for application to the fs are done
atomically.  To that end, submit_manager now handles op submission.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/os/FileStore.cc
src/os/JournalingObjectStore.cc
src/os/JournalingObjectStore.h

index b1ba601590b81cf96ed7032186d3be4a371e822c..321307e0e212e697053e6c15955a3cd06fb9efc8 100644 (file)
@@ -2219,12 +2219,10 @@ FileStore::Op *FileStore::build_op(list<Transaction*>& tls,
 
 void FileStore::queue_op(OpSequencer *osr, Op *o)
 {
-  assert(journal_lock.is_locked());
-
   // mark apply start _now_, because we need to drain the entire apply
   // queue during commit in order to put the store in a consistent
   // state.
-  _op_apply_start(o->op);
+  op_apply_start(o->op);
   op_tp.lock();
 
   osr->queue(o);
@@ -2377,7 +2375,8 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
     Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
     op_queue_reserve_throttle(o);
     journal->throttle();
-    o->op = op_submit_start();
+    uint64_t op_num = submit_manager.op_submit_start();
+    o->op = op_num;
 
     if (m_filestore_do_dump)
       dump_transactions(o->tls, o->op, osr);
@@ -2387,7 +2386,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
       
       _op_journal_transactions(o->tls, o->op, ondisk, osd_op);
       
-      // queue inside journal lock, to preserve ordering
+      // queue inside submit_manager op submission lock
       queue_op(osr, o);
     } else if (m_filestore_journal_writeahead) {
       dout(5) << "queue_transactions (writeahead) " << o->op << " " << o->tls << dendl;
@@ -2400,17 +2399,17 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
     } else {
       assert(0);
     }
-    op_submit_finish(o->op);
+    submit_manager.op_submit_finish(op_num);
     return 0;
   }
 
-  uint64_t op = op_submit_start();
+  uint64_t op = submit_manager.op_submit_start();
   dout(5) << "queue_transactions (trailing journal) " << op << " " << tls << dendl;
 
   if (m_filestore_do_dump)
     dump_transactions(tls, op, osr);
 
-  _op_apply_start(op);
+  op_apply_start(op);
   int r = do_transactions(tls, op);
     
   if (r >= 0) {
@@ -2427,7 +2426,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
   }
   op_finisher.queue(onreadable, r);
 
-  op_submit_finish(op);
+  submit_manager.op_submit_finish(op);
   op_apply_finish(op);
 
   return r;
@@ -2438,9 +2437,7 @@ void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
   dout(5) << "_journaled_ahead " << o << " seq " << o->op << " " << *osr << " " << o->tls << dendl;
 
   // this should queue in order because the journal does it's completions in order.
-  journal_lock.Lock();
   queue_op(osr, o);
-  journal_lock.Unlock();
 
   osr->dequeue_journal();
 
index d42a96008cfec6f156a3d432090cb61647ab430b..79038c8ee742f78d2318cf6f93e6184585856c8d 100644 (file)
@@ -39,7 +39,7 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
   }
 
   journal_lock.Lock();
-  op_seq = fs_op_seq;
+  uint64_t op_seq = fs_op_seq;
   committed_seq = fs_op_seq;
   committing_seq = fs_op_seq;
   applied_seq = fs_op_seq;
@@ -106,6 +106,8 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
 
   replaying = false;
 
+  submit_manager.set_op_seq(op_seq);
+
   journal_lock.Unlock();
 
   // done reading, make writeable.
@@ -165,25 +167,26 @@ void JournalingObjectStore::op_apply_finish(uint64_t op)
   journal_lock.Unlock();
 }
 
-uint64_t JournalingObjectStore::op_submit_start()
+uint64_t JournalingObjectStore::SubmitManager::op_submit_start()
 {
-  journal_lock.Lock();
+  lock.Lock();
   uint64_t op = ++op_seq;
   dout(10) << "op_submit_start " << op << dendl;
   ops_submitting.push_back(op);
   return op;
 }
 
-void JournalingObjectStore::op_submit_finish(uint64_t op)
+void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op)
 {
   dout(10) << "op_submit_finish " << op << dendl;
   if (op != ops_submitting.front()) {
-    dout(0) << "op_submit_finish " << op << " expected " << ops_submitting.front()
+    dout(0) << "op_submit_finish " << op << " expected "
+           << ops_submitting.front()
            << ", OUT OF ORDER" << dendl;
     assert(0 == "out of order op_submit_finish");
   }
   ops_submitting.pop_front();
-  journal_lock.Unlock();
+  lock.Unlock();
 }
 
 
@@ -194,9 +197,9 @@ bool JournalingObjectStore::commit_start()
   bool ret = false;
 
   journal_lock.Lock();
-  dout(10) << "commit_start op_seq " << op_seq
-          << ", applied_seq " << applied_seq
-          << ", committed_seq " << committed_seq << dendl;
+  dout(10) << "commit_start op_seq " << submit_manager.get_op_seq()
+                                        << ", applied_seq " << applied_seq
+                                        << ", committed_seq " << committed_seq << dendl;
   blocked = true;
   while (open_ops > 0) {
     dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
@@ -265,7 +268,6 @@ void JournalingObjectStore::_op_journal_transactions(
   list<ObjectStore::Transaction*>& tls, uint64_t op,
   Context *onjournal, TrackedOpRef osd_op)
 {
-  assert(journal_lock.is_locked());
   dout(10) << "op_journal_transactions " << op << " " << tls << dendl;
 
   if (journal && journal->is_writeable()) {
index 54a34da1932c528a2175234a58c322e5ca543a2f..d8004c4e4cdadb1ff4c5773fb6fbd750969a712e 100644 (file)
 
 class JournalingObjectStore : public ObjectStore {
 protected:
-  uint64_t op_seq, applied_seq;
+  uint64_t applied_seq;
   uint64_t committing_seq, committed_seq;
   map<version_t, vector<Context*> > commit_waiters;
 
+  class SubmitManager {
+    Mutex lock;
+    uint64_t op_seq;
+    list<uint64_t> ops_submitting;
+  public:
+    SubmitManager() :
+      lock("JOS::SubmitManager::lock"),
+      op_seq(0)
+    {}
+    uint64_t op_submit_start();
+    void op_submit_finish(uint64_t op);
+    void set_op_seq(uint64_t seq) {
+      Mutex::Locker l(lock);
+      seq = op_seq;
+    }
+    uint64_t get_op_seq() {
+      return op_seq;
+    }
+  } submit_manager;
+
   int open_ops;
   bool blocked;
 
@@ -73,8 +93,7 @@ public:
   }
 
 public:
-  JournalingObjectStore() : op_seq(0), 
-                           applied_seq(0), committing_seq(0), committed_seq(0), 
+  JournalingObjectStore() : applied_seq(0), committing_seq(0), committed_seq(0),
                            open_ops(0), blocked(false),
                            journal(NULL), finisher(g_ceph_context),
                            journal_lock("JournalingObjectStore::journal_lock"),