]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
filestore: make OpSequencer::flush() work for writeahead journaling items
authorSage Weil <sage@newdream.net>
Fri, 17 Dec 2010 23:12:17 +0000 (15:12 -0800)
committerSage Weil <sage@newdream.net>
Fri, 17 Dec 2010 23:30:39 +0000 (15:30 -0800)
It was only waiting for items in the op_queue to complete.  The goal is
to wait for anything we've called queue_transactions(&osr,...) on. If we
do writeahead journaling, though, there might be new ops that are still
journaling but not yet submitted to the fs that are missed.

This adds a journal queue to the OpSequencer, and uses it in the writeahead
case only.

Signed-off-by: Sage Weil <sage@newdream.net>
src/os/FileStore.cc
src/os/FileStore.h

index d180f9daa666a8399c3b822f24e8eda20b7517f0..179d1757dae37c498a3a0114c6e5f4738072783c 100644 (file)
@@ -1230,7 +1230,7 @@ int FileStore::umount()
 
 /// -----------------------------
 
-void FileStore::queue_op(Sequencer *posr, uint64_t op_seq, list<Transaction*>& tls,
+void FileStore::queue_op(OpSequencer *osr, uint64_t op_seq, list<Transaction*>& tls,
                         Context *onreadable, Context *onreadable_sync)
 {
   uint64_t bytes = 0, ops = 0;
@@ -1260,18 +1260,6 @@ void FileStore::queue_op(Sequencer *posr, uint64_t op_seq, list<Transaction*>& t
 
   op_tp.lock();
 
-  OpSequencer *osr;
-  if (!posr)
-    posr = &default_osr;
-  if (posr->p) {
-    osr = (OpSequencer *)posr->p;
-    dout(10) << "queue_op existing osr " << osr << "/" << osr->parent << dendl; //<< " w/ q " << osr->q << dendl;
-  } else {
-    osr = new OpSequencer;
-    osr->parent = posr;
-    posr->p = osr;
-    dout(10) << "queue_op new osr " << osr << "/" << osr->parent << dendl;
-  }
   osr->queue(o);
 
   op_queue_len++;
@@ -1361,13 +1349,13 @@ void FileStore::_finish_op(OpSequencer *osr)
 
 struct C_JournaledAhead : public Context {
   FileStore *fs;
-  ObjectStore::Sequencer *osr;
+  FileStore::OpSequencer *osr;
   uint64_t op;
   list<ObjectStore::Transaction*> tls;
   Context *onreadable, *onreadable_sync;
   Context *ondisk;
 
-  C_JournaledAhead(FileStore *f, ObjectStore::Sequencer *os, uint64_t o, list<ObjectStore::Transaction*>& t,
+  C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, uint64_t o, list<ObjectStore::Transaction*>& t,
                   Context *onr, Context *ond, Context *onrsync) :
     fs(f), osr(os), op(o), tls(t), onreadable(onr), onreadable_sync(onrsync), ondisk(ond) { }
   void finish(int r) {
@@ -1382,10 +1370,24 @@ int FileStore::queue_transaction(Sequencer *osr, Transaction *t)
   return queue_transactions(osr, tls, new C_DeleteTransaction(t));
 }
 
-int FileStore::queue_transactions(Sequencer *osr, list<Transaction*> &tls,
+int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
                                  Context *onreadable, Context *ondisk,
                                  Context *onreadable_sync)
 {
+  // set up the sequencer
+  OpSequencer *osr;
+  if (!posr)
+    posr = &default_osr;
+  if (posr->p) {
+    osr = (OpSequencer *)posr->p;
+    dout(10) << "queue_transactions existing osr " << osr << "/" << osr->parent << dendl; //<< " w/ q " << osr->q << dendl;
+  } else {
+    osr = new OpSequencer;
+    osr->parent = posr;
+    posr->p = osr;
+    dout(10) << "queue_transactions new osr " << osr << "/" << osr->parent << dendl;
+  }
+
   if (journal && journal->is_writeable()) {
     if (g_conf.filestore_journal_parallel) {
 
@@ -1414,6 +1416,7 @@ int FileStore::queue_transactions(Sequencer *osr, list<Transaction*> &tls,
 
       uint64_t op = op_submit_start();
       dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
+      osr->queue_journal(op);
       _op_journal_transactions(tls, op,
                               new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync));
       op_submit_finish(op);
@@ -1447,7 +1450,7 @@ int FileStore::queue_transactions(Sequencer *osr, list<Transaction*> &tls,
   return r;
 }
 
-void FileStore::_journaled_ahead(Sequencer *osr, uint64_t op,
+void FileStore::_journaled_ahead(OpSequencer *osr, uint64_t op,
                                 list<Transaction*> &tls,
                                 Context *onreadable, Context *ondisk,
                                 Context *onreadable_sync)
@@ -1456,6 +1459,8 @@ void FileStore::_journaled_ahead(Sequencer *osr, uint64_t op,
 
   op_queue_throttle();
 
+  osr->dequeue_journal();
+
   // this should queue in order because the journal does it's completions in order.
   journal_lock.Lock();
   queue_op(osr, op, tls, onreadable, onreadable_sync);
index 2c062bee146dda9effe191303c02b0823ca6575c..536c7466affe18136f6f79156051c9c174c7f42a 100644 (file)
@@ -101,11 +101,21 @@ class FileStore : public JournalingObjectStore {
   class OpSequencer : public Sequencer_impl {
     Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock)
     list<Op*> q;
+    list<uint64_t> jq;
     Cond cond;
   public:
     Sequencer *parent;
     Mutex apply_lock;  // for apply mutual exclusion
     
+    void queue_journal(uint64_t s) {
+      Mutex::Locker l(qlock);
+      jq.push_back(s);
+    }
+    void dequeue_journal() {
+      Mutex::Locker l(qlock);
+      jq.pop_front();
+      cond.Signal();
+    }
     void queue(Op *o) {
       Mutex::Locker l(qlock);
       q.push_back(o);
@@ -124,9 +134,18 @@ class FileStore : public JournalingObjectStore {
     }
     void flush() {
       Mutex::Locker l(qlock);
-      if (!q.empty()) {
-       uint64_t seq = q.back()->op;
-       while (!q.empty() && q.front()->op <= seq)
+
+      // get max for journal _or_ op queues
+      uint64_t seq = 0;
+      if (!q.empty())
+       seq = q.back()->op;
+      if (!jq.empty() && jq.back() > seq)
+       seq = jq.back();
+
+      if (seq) {
+       // everything prior to our watermark to drain through either/both queues
+       while ((!q.empty() && q.front()->op <= seq) ||
+              (!jq.empty() && jq.front() <= seq))
          cond.Wait(qlock);
       }
     }
@@ -180,9 +199,9 @@ class FileStore : public JournalingObjectStore {
 
   void _do_op(OpSequencer *o);
   void _finish_op(OpSequencer *o);
-  void queue_op(Sequencer *osr, uint64_t op, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync);
+  void queue_op(OpSequencer *osr, uint64_t op, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync);
   void op_queue_throttle();
-  void _journaled_ahead(Sequencer *osr, uint64_t op, list<Transaction*> &tls,
+  void _journaled_ahead(OpSequencer *osr, uint64_t op, list<Transaction*> &tls,
                        Context *onreadable, Context *ondisk, Context *onreadable_sync);
   friend class C_JournaledAhead;