]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
filestore: add writeahead journal support
authorSage Weil <sage@newdream.net>
Thu, 7 Jan 2010 22:35:28 +0000 (14:35 -0800)
committerSage Weil <sage@newdream.net>
Mon, 25 Jan 2010 21:59:56 +0000 (13:59 -0800)
src/os/FileStore.cc
src/os/FileStore.h

index 411528910c05fd4db957c91e5af9b6020dcbc900..6c19e6fa5bb1525c8eb0a40d5f3ea18faed4093a 100644 (file)
@@ -437,6 +437,8 @@ int FileStore::mount()
   flusher_thread.create();
   op_finisher.start();
 
+  if (journal && g_conf.filestore_journal_writeahead)
+    journal->set_wait_on_full(true);
 
   // is this btrfs?
   Transaction empty;
@@ -573,23 +575,79 @@ void FileStore::op_entry()
   op_lock.Unlock();
 }
 
+struct C_JournaledAhead : public Context {
+  FileStore *fs;
+  __u64 op;
+  list<ObjectStore::Transaction*> tls;
+  Context *onreadable;
+  Context *onjournal;
+  Context *ondisk;
+
+  C_JournaledAhead(FileStore *f, __u64 o, list<ObjectStore::Transaction*>& t,
+             Context *onr, Context *onj, Context *ond) :
+    fs(f), op(o), tls(t), onreadable(onr), onjournal(onj), ondisk(ond) { }
+  void finish(int r) {
+    fs->_journaled_ahead(op, tls, onreadable, onjournal, ondisk);
+  }
+};
 
 int FileStore::queue_transactions(list<Transaction*> &tls,
                                  Context *onreadable,
                                  Context *onjournal,
                                  Context *ondisk)
 {
-  __u64 op;
+  if (journal && journal->is_writeable()) {
+    if (g_conf.filestore_journal_parallel) {
+      __u64 op = op_journal_start(0);
+      dout(10) << "queue_transactions (parallel) " << op << " " << tls << dendl;
+      
+      journal_transactions(tls, op, onjournal);
+      
+      // queue inside journal lock, to preserve ordering
+      queue_op(op, tls, onreadable, ondisk);
+      
+      op_journal_finish();
+      return 0;
+    }
+    else if (g_conf.filestore_journal_writeahead) {
+      __u64 op = op_journal_start(0);
+      dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
+      journal_transactions(tls, op, new C_JournaledAhead(this, op, tls, onreadable, onjournal, ondisk));
+      op_journal_finish();
+      return 0;
+    }
+  }
 
-  op = op_journal_start(0);
-  journal_transactions(tls, op, onjournal);
+  __u64 op_seq = op_apply_start(0, ondisk);
+  dout(10) << "queue_transactions (trailing journal) " << op_seq << " " << tls << dendl;
+  int r = do_transactions(tls, op_seq);
+  op_apply_finish();
+  op_finisher.queue(onreadable, r);
+    
+  if (r >= 0) {
+    op_journal_start(op_seq);
+    journal_transactions(tls, op_seq, onjournal);
+    op_journal_finish();
+  } else {
+    delete onjournal;
+    delete ondisk;
+  }
+  return r;
+}
 
-  // queue inside journal lock, to preserve ordering
+void FileStore::_journaled_ahead(__u64 op,
+                                list<Transaction*> &tls,
+                                Context *onreadable,
+                                Context *onjournal,
+                                Context *ondisk)
+{
+  dout(10) << "_journaled_ahead " << op << " " << tls << dendl;
+  // this should queue in order because the journal does it's completions in order.
   queue_op(op, tls, onreadable, ondisk);
-
-  op_journal_finish();
-
-  return 0;
+  if (onjournal) {
+    onjournal->finish(0);
+    delete onjournal;
+  }
 }
 
 int FileStore::do_transactions(list<Transaction*> &tls, __u64 op_seq)
@@ -638,7 +696,8 @@ unsigned FileStore::apply_transactions(list<Transaction*> &tls,
 {
   int r = 0;
 
-  if (g_conf.filestore_journal_parallel) {
+  if (journal && journal->is_writeable() &&
+      (g_conf.filestore_journal_parallel || g_conf.filestore_journal_writeahead)) {
     // use op pool
     Cond my_cond;
     Mutex my_lock("FileStore::apply_transaction::my_lock");
index 48bc9d85175dce2deff0ef7790de3c3d116ae1d9..dd876a15a14ebb6617ca2ce43889cdfae71f4355 100644 (file)
@@ -101,6 +101,8 @@ class FileStore : public JournalingObjectStore {
     }
   } op_thread;
   void queue_op(__u64 op, list<Transaction*>& tls, Context *onreadable, Context *ondisk);
+  void _journaled_ahead(__u64 op, list<Transaction*> &tls, Context *onreadable,        Context *onjournal, Context *ondisk);
+  friend class C_JournaledAhead;
 
   // flusher thread
   Cond flusher_cond;