]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
JournalingFileStore: move apply/commit sequencing to apply_manager
authorSamuel Just <sam.just@inktank.com>
Sat, 6 Oct 2012 00:33:36 +0000 (17:33 -0700)
committerSamuel Just <sam.just@inktank.com>
Tue, 30 Oct 2012 20:31:09 +0000 (13:31 -0700)
syncing the filestore requires a stable commit point (i.e., all ops
up to applied_seq must have been applied).  Previously, we used
journal_lock to atomically block new applies while waiting for
the remaining ones to finish.  This creates unnecessary contention.
We now use apply_manager to manage that state atomically with its
own lock.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/os/FileStore.cc
src/os/JournalingObjectStore.cc
src/os/JournalingObjectStore.h

index 321307e0e212e697053e6c15955a3cd06fb9efc8..3f2b2bc0e71fff19647d9fa88ccf82c22e050062 100644 (file)
@@ -2222,7 +2222,7 @@ void FileStore::queue_op(OpSequencer *osr, Op *o)
   // mark apply start _now_, because we need to drain the entire apply
   // queue during commit in order to put the store in a consistent
   // state.
-  op_apply_start(o->op);
+  apply_manager.op_apply_start(o->op);
   op_tp.lock();
 
   osr->queue(o);
@@ -2295,7 +2295,7 @@ void FileStore::_do_op(OpSequencer *osr)
 
   dout(5) << "_do_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
   int r = do_transactions(o->tls, o->op);
-  op_apply_finish(o->op);
+  apply_manager.op_apply_finish(o->op);
   dout(10) << "_do_op " << o << " seq " << o->op << " r = " << r
           << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
   
@@ -2409,7 +2409,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
   if (m_filestore_do_dump)
     dump_transactions(tls, op, osr);
 
-  op_apply_start(op);
+  apply_manager.op_apply_start(op);
   int r = do_transactions(tls, op);
     
   if (r >= 0) {
@@ -2427,7 +2427,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
   op_finisher.queue(onreadable, r);
 
   submit_manager.op_submit_finish(op);
-  op_apply_finish(op);
+  apply_manager.op_apply_finish(op);
 
   return r;
 }
@@ -3647,9 +3647,9 @@ void FileStore::sync_entry()
     fin.swap(sync_waiters);
     lock.Unlock();
     
-    if (commit_start()) {
+    if (apply_manager.commit_start()) {
       utime_t start = ceph_clock_now(g_ceph_context);
-      uint64_t cp = committing_seq;
+      uint64_t cp = apply_manager.get_committing_seq();
 
       sync_entry_timeo_lock.Lock();
       SyncEntryTimeout *sync_entry_timeo =
@@ -3697,7 +3697,7 @@ void FileStore::sync_entry()
 
          snaps.push_back(cp);
 
-         commit_started();
+         apply_manager.commit_started();
 
          // wait for commit
          dout(20) << " waiting for transid " << async_args.transid << " to complete" << dendl;
@@ -3728,11 +3728,11 @@ void FileStore::sync_entry()
          assert(r == 0);
          snaps.push_back(cp);
          
-         commit_started();
+         apply_manager.commit_started();
        }
       } else
       {
-       commit_started();
+       apply_manager.commit_started();
 
        if (btrfs) {
          dout(15) << "sync_entry doing btrfs SYNC" << dendl;
@@ -3764,7 +3764,7 @@ void FileStore::sync_entry()
       logger->finc(l_os_commit_lat, lat);
       logger->finc(l_os_commit_len, dur);
 
-      commit_finish();
+      apply_manager.commit_finish();
 
       logger->set(l_os_committing, 0);
 
index 79038c8ee742f78d2318cf6f93e6184585856c8d..42b95c96a58bc71c2adacd60c087b65872d5b8e2 100644 (file)
@@ -38,12 +38,8 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
     fs_op_seq = g_conf->journal_replay_from - 1;
   }
 
-  journal_lock.Lock();
   uint64_t op_seq = fs_op_seq;
-  committed_seq = fs_op_seq;
-  committing_seq = fs_op_seq;
-  applied_seq = fs_op_seq;
-  journal_lock.Unlock();
+  apply_manager.init_seq(fs_op_seq);
 
   if (!journal)
     return 0;
@@ -58,8 +54,6 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
     return err;
   }
 
-  journal_lock.Lock();
-
   replaying = true;
 
   int count = 0;
@@ -85,14 +79,11 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
       tls.push_back(t);
     }
 
-    open_ops++;
-    journal_lock.Unlock();
+    apply_manager.op_apply_start(seq);
     int r = do_transactions(tls, seq);
-    journal_lock.Lock();
-    open_ops--;
-    cond.Signal();
+    apply_manager.op_apply_finish(seq);
 
-    op_seq = applied_seq = seq;
+    op_seq = seq;
 
     while (!tls.empty()) {
       delete tls.front(); 
@@ -100,16 +91,12 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
     }
 
     dout(3) << "journal_replay: r = " << r << ", op_seq now " << op_seq << dendl;
-    assert(op_seq == seq);
-    seq++;  // we expect the next op
   }
 
   replaying = false;
 
   submit_manager.set_op_seq(op_seq);
 
-  journal_lock.Unlock();
-
   // done reading, make writeable.
   journal->make_writeable();
 
@@ -119,16 +106,9 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
 
 // ------------------------------------
 
-uint64_t JournalingObjectStore::op_apply_start(uint64_t op) 
-{
-  Mutex::Locker l(journal_lock);
-  return _op_apply_start(op);
-}
-
-uint64_t JournalingObjectStore::_op_apply_start(uint64_t op) 
+uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
 {
-  assert(journal_lock.is_locked());
-
+  Mutex::Locker l(apply_lock);
   // if we ops are blocked, or there are already people (left) in
   // line, get in line.
   if (blocked || !ops_apply_blocked.empty()) {
@@ -137,7 +117,7 @@ uint64_t JournalingObjectStore::_op_apply_start(uint64_t op)
     dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl;
     // sleep until we are not blocked AND we are at the front of line
     while (blocked || ops_apply_blocked.front() != &cond)
-      cond.Wait(journal_lock);
+      cond.Wait(apply_lock);
     dout(10) << "op_apply_start " << op << " woke (at front of line)" << dendl;
     ops_apply_blocked.pop_front();
     if (!ops_apply_blocked.empty()) {
@@ -147,24 +127,24 @@ uint64_t JournalingObjectStore::_op_apply_start(uint64_t op)
   }
   dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << dendl;
   assert(!blocked);
+
   open_ops++;
   return op;
 }
 
-void JournalingObjectStore::op_apply_finish(uint64_t op) 
+void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
 {
-  journal_lock.Lock();
-  dout(10) << "op_apply_finish " << op << " open_ops " << open_ops << " -> " << (open_ops-1) << dendl;
+  Mutex::Locker l(apply_lock);
+  dout(10) << "op_apply_finish " << op << " open_ops " << open_ops
+                                        << " -> " << (open_ops-1) << dendl;
   if (--open_ops == 0)
-    cond.Signal();
+    open_ops_cond.Signal();
 
   // there can be multiple applies in flight; track the max value we
   // note.  note that we can't _read_ this value and learn anything
   // meaningful unless/until we've quiesced all in-flight applies.
   if (op > applied_seq)
     applied_seq = op;
-
-  journal_lock.Unlock();
 }
 
 uint64_t JournalingObjectStore::SubmitManager::op_submit_start()
@@ -192,51 +172,57 @@ void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op)
 
 // ------------------------------------------
 
-bool JournalingObjectStore::commit_start() 
+void JournalingObjectStore::ApplyManager::add_waiter(uint64_t op, Context *c)
 {
-  bool ret = false;
-
-  journal_lock.Lock();
-  dout(10) << "commit_start op_seq " << submit_manager.get_op_seq()
-                                        << ", applied_seq " << applied_seq
-                                        << ", committed_seq " << committed_seq << dendl;
-  blocked = true;
-  while (open_ops > 0) {
-    dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
-    cond.Wait(journal_lock);
-  }
-  dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
-  assert(open_ops == 0);
+  Mutex::Locker l(com_lock);
+  assert(c);
+  commit_waiters[op].push_back(c);
+}
 
-  if (applied_seq == committed_seq) {
-    dout(10) << "commit_start nothing to do" << dendl;
-    blocked = false;
-    if (!ops_apply_blocked.empty())
-      ops_apply_blocked.front()->Signal();
-    assert(commit_waiters.empty());
-    goto out;
-  }
+bool JournalingObjectStore::ApplyManager::commit_start()
+{
+  bool ret = false;
 
+  {
+    Mutex::Locker l(apply_lock);
+    dout(10) << "commit_start "
+            << ", applied_seq " << applied_seq << dendl;
+    blocked = true;
+    while (open_ops > 0) {
+      dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
+      open_ops_cond.Wait(apply_lock);
+    }
+    assert(open_ops == 0);
+
+    dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
+    {
+      Mutex::Locker l(com_lock);
+      if (applied_seq == committed_seq) {
+       dout(10) << "commit_start nothing to do" << dendl;
+       blocked = false;
+       if (!ops_apply_blocked.empty())
+         ops_apply_blocked.front()->Signal();
+       assert(commit_waiters.empty());
+       goto out;
+      }
 
-  com_lock.Lock();
-  // we can _only_ read applied_seq here because open_ops == 0 (we've
-  // quiesced all in-flight applies).
-  committing_seq = applied_seq;
-  com_lock.Unlock();
+      committing_seq = applied_seq;
 
-  dout(10) << "commit_start committing " << committing_seq << ", still blocked" << dendl;
+      dout(10) << "commit_start committing " << committing_seq
+              << ", still blocked" << dendl;
+    }
+  }
   ret = true;
 
  out:
   if (journal)
     journal->commit_start();  // tell the journal too
-  journal_lock.Unlock();
   return ret;
 }
 
-void JournalingObjectStore::commit_started() 
+void JournalingObjectStore::ApplyManager::commit_started()
 {
-  Mutex::Locker l(journal_lock);
+  Mutex::Locker l(apply_lock);
   // allow new ops. (underlying fs should now be committing all prior ops)
   dout(10) << "commit_started committing " << committing_seq << ", unblocking" << dendl;
   blocked = false;
@@ -244,21 +230,19 @@ void JournalingObjectStore::commit_started()
     ops_apply_blocked.front()->Signal();
 }
 
-void JournalingObjectStore::commit_finish()
+void JournalingObjectStore::ApplyManager::commit_finish()
 {
-  Mutex::Locker l(journal_lock);
+  Mutex::Locker l(com_lock);
   dout(10) << "commit_finish thru " << committing_seq << dendl;
   
   if (journal)
     journal->committed_thru(committing_seq);
 
-  com_lock.Lock();
   committed_seq = committing_seq;
-  com_lock.Unlock();
   
   map<version_t, vector<Context*> >::iterator p = commit_waiters.begin();
   while (p != commit_waiters.end() &&
-        p->first <= committing_seq) {
+    p->first <= committing_seq) {
     finisher.queue(p->second);
     commit_waiters.erase(p++);
   }
@@ -285,6 +269,7 @@ void JournalingObjectStore::_op_journal_transactions(
       ::encode(*t, tbl);
     }
     journal->submit_entry(op, tbl, data_align, onjournal, osd_op);
-  } else if (onjournal)
-    commit_waiters[op].push_back(onjournal);
+  } else if (onjournal) {
+    apply_manager.add_waiter(op, onjournal);
+  }
 }
index d8004c4e4cdadb1ff4c5773fb6fbd750969a712e..5bfa86b8f9f9281b3009456ffed4f5d91e470185 100644 (file)
@@ -21,9 +21,9 @@
 
 class JournalingObjectStore : public ObjectStore {
 protected:
-  uint64_t applied_seq;
-  uint64_t committing_seq, committed_seq;
-  map<version_t, vector<Context*> > commit_waiters;
+  Journal *journal;
+  Finisher finisher;
+
 
   class SubmitManager {
     Mutex lock;
@@ -38,25 +38,69 @@ protected:
     void op_submit_finish(uint64_t op);
     void set_op_seq(uint64_t seq) {
       Mutex::Locker l(lock);
-      seq = op_seq;
+      op_seq = seq;
     }
     uint64_t get_op_seq() {
       return op_seq;
     }
   } submit_manager;
 
-  int open_ops;
-  bool blocked;
+  class ApplyManager {
+    Journal *&journal;
+    Finisher &finisher;
 
-  Journal *journal;
-  Finisher finisher;
+    Mutex apply_lock;
+    bool blocked;
+    Cond blocked_cond;
+    int open_ops;
+    Cond open_ops_cond;
+    uint64_t applied_seq;
 
-  Cond cond;
-  Mutex journal_lock;
-  Mutex com_lock;
+    Mutex com_lock;
+    map<version_t, vector<Context*> > commit_waiters;
+    uint64_t committing_seq, committed_seq;
+    list<uint64_t> ops_submitting;
+    list<Cond*> ops_apply_blocked;
 
-  list<uint64_t> ops_submitting;
-  list<Cond*> ops_apply_blocked;
+  public:
+    ApplyManager(Journal *&j, Finisher &f) :
+      journal(j), finisher(f),
+      apply_lock("JOS::ApplyManager::apply_lock"),
+      blocked(false),
+      open_ops(0),
+      applied_seq(0),
+      com_lock("JOS::ApplyManager::com_lock"),
+      committing_seq(0), committed_seq(0) {}
+    void add_waiter(uint64_t, Context*);
+    uint64_t op_apply_start(uint64_t op);
+    void op_apply_finish(uint64_t op);
+    bool commit_start();
+    void commit_started();
+    void commit_finish();
+    bool is_committing() {
+      Mutex::Locker l(com_lock);
+      return committing_seq != committed_seq;
+    }
+    uint64_t get_committed_seq() {
+      Mutex::Locker l(com_lock);
+      return committed_seq;
+    }
+    uint64_t get_committing_seq() {
+      Mutex::Locker l(com_lock);
+      return committing_seq;
+    }
+    void init_seq(uint64_t fs_op_seq) {
+      {
+       Mutex::Locker l(com_lock);
+       committed_seq = fs_op_seq;
+       committing_seq = fs_op_seq;
+      }
+      {
+       Mutex::Locker l(apply_lock);
+       applied_seq = fs_op_seq;
+      }
+    }
+  } apply_manager;
 
   bool replaying;
 
@@ -65,39 +109,22 @@ protected:
   void journal_stop();
   int journal_replay(uint64_t fs_op_seq);
 
-  // --
-  uint64_t op_submit_start();
-  void op_submit_finish(uint64_t op_seq);
-
-  uint64_t op_apply_start(uint64_t op);
-  uint64_t _op_apply_start(uint64_t op);
-  void op_apply_finish(uint64_t op);
-
   void _op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op,
                                Context *onjournal, TrackedOpRef osd_op);
 
   virtual int do_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op_seq) = 0;
 
-  bool commit_start();
-  void commit_started();  // allow new ops (underlying fs should now be committing all prior ops)
-  void commit_finish();
-  
 public:
   bool is_committing() {
-    Mutex::Locker l(com_lock);
-    return committing_seq != committed_seq;
+    return apply_manager.is_committing();
   }
   uint64_t get_committed_seq() {
-    Mutex::Locker l(com_lock);
-    return committed_seq;
+    return apply_manager.get_committed_seq();
   }
 
 public:
-  JournalingObjectStore() : applied_seq(0), committing_seq(0), committed_seq(0),
-                           open_ops(0), blocked(false),
-                           journal(NULL), finisher(g_ceph_context),
-                           journal_lock("JournalingObjectStore::journal_lock"),
-                           com_lock("JournalingObjectStore::com_lock"),
+  JournalingObjectStore() : journal(NULL), finisher(g_ceph_context),
+                           apply_manager(journal, finisher),
                            replaying(false) {}
   
 };