]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
filestore: preliminary support for parallel journaling
authorSage Weil <sage@newdream.net>
Thu, 17 Dec 2009 00:23:26 +0000 (16:23 -0800)
committerSage Weil <sage@newdream.net>
Mon, 25 Jan 2010 21:59:55 +0000 (13:59 -0800)
- fixed issue with normal write-behind journal (op_seq wasn't updated
  inside the fs transaction)

- reworked journal interface to support write-behind, parallel, write-ahead
  (hopefully)

src/TODO
src/config.cc
src/config.h
src/os/FileJournal.cc
src/os/FileStore.cc
src/os/FileStore.h
src/os/JournalingObjectStore.cc
src/os/JournalingObjectStore.h

index 2e22f81ad9a5f5c0f3b673a0b8372bba9db7575e..b149f47f5c82a534958e87144522c3b85eefe1ef 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -253,6 +253,7 @@ filestore performance notes
   - acks: never.
   - throttle: ??
   - rmw: rmw must block on prior fs writes.
+  * JourningObjectStore interface needs work?
 
 - separate reads/writes into separate op queues?
 - 
index 5ac40a6e870ed0dba4cba0bdcc4840ebb644ca6f..ac0cb2e72d4e7613344a43abf432bb512481bb80 100644 (file)
@@ -528,6 +528,8 @@ static struct config_option config_optionsp[] = {
        OPTION(filestore_flusher, 0, OPT_BOOL, true),
        OPTION(filestore_flusher_max_fds, 0, OPT_INT, 512),
        OPTION(filestore_sync_flush, 0, OPT_BOOL, false),
+       OPTION(filestore_journal_parallel, 0, OPT_BOOL, false),
+       OPTION(filestore_journal_writeahead, 0, OPT_BOOL, false),
        OPTION(ebofs, 0, OPT_BOOL, false),
        OPTION(ebofs_cloneable, 0, OPT_BOOL, true),
        OPTION(ebofs_verify, 0, OPT_BOOL, false),
index 3ef85867c3df986a9bfc5c778ecb87828820b91e..e43ee18648dd19b59efc41d1a5c2236446ffb317 100644 (file)
@@ -339,6 +339,8 @@ struct md_config_t {
   bool filestore_flusher;
   int filestore_flusher_max_fds;
   bool filestore_sync_flush;
+  bool filestore_journal_parallel;
+  bool filestore_journal_writeahead;
   
   // ebofs
   bool  ebofs;
index 9b6c50c74a140a118f4ad321a6e515ecd6f5df30..03faf07ef822cb0f44cf9ab94ca9f336bdab99c2 100644 (file)
@@ -718,7 +718,7 @@ bool FileJournal::read_entry(bufferlist& bl, __u64& seq)
          << " " << h.len << " bytes"
          << dendl;
 
-  if (seq && h.seq != seq) {
+  if (seq && h.seq < seq) {
     dout(2) << "read_entry " << read_pos << " : got seq " << h.seq << ", expected " << seq << ", stopping" << dendl;
     return false;
   }
index 9c73d838740537e4042054c7203aa934682c4f85..411528910c05fd4db957c91e5af9b6020dcbc900 100644 (file)
@@ -419,21 +419,23 @@ int FileStore::mount()
   snprintf(fn, sizeof(fn), "%s/current/commit_op_seq", basedir.c_str());
   op_fd = ::open(fn, O_CREAT|O_RDWR, 0644);
   assert(op_fd >= 0);
-  op_seq = 0;
-  ::read(op_fd, &op_seq, sizeof(op_seq));
+  __u64 initial_op_seq = 0;
+  ::read(op_fd, &initial_op_seq, sizeof(initial_op_seq));
 
-  dout(5) << "mount op_seq is " << op_seq << dendl;
+  dout(5) << "mount op_seq is " << initial_op_seq << dendl;
 
   // journal
   open_journal();
-  r = journal_replay();
+  r = journal_replay(initial_op_seq);
   if (r == -EINVAL) {
     dout(0) << "mount got EINVAL on journal open, not mounting" << dendl;
     return r;
   }
   journal_start();
   sync_thread.create();
+  op_thread.create();
   flusher_thread.create();
+  op_finisher.start();
 
 
   // is this btrfs?
@@ -500,12 +502,16 @@ int FileStore::umount()
   stop = true;
   sync_cond.Signal();
   flusher_cond.Signal();
+  op_cond.Signal();
   lock.Unlock();
   sync_thread.join();
+  op_thread.join();
   flusher_thread.join();
 
   journal_stop();
 
+  op_finisher.stop();
+
   ::close(fsid_fd);
   ::close(op_fd);
 
@@ -521,23 +527,74 @@ int FileStore::umount()
 }
 
 
+/// -----------------------------
 
+void FileStore::queue_op(__u64 op_seq, list<Transaction*>& tls, Context *onreadable,
+                        Context *oncommit)
+{
+  op_lock.Lock();
+  Op *o = new Op;
+  dout(10) << "queue_op " << o << " " << op_seq << dendl;
+  o->op = op_seq;
+  o->tls.swap(tls);
+  o->onreadable = onreadable;
+  o->oncommit = oncommit;
+  op_queue.push_back(o);
+  op_cond.Signal();
+  op_lock.Unlock();
+}
+
+void FileStore::op_entry()
+{
+  op_lock.Lock();
+  while (1) {
+    while (!op_queue.empty()) {
+      Op *o = op_queue.front();
+      op_queue.pop_front();
+      op_lock.Unlock();
 
-unsigned FileStore::apply_transaction(Transaction &t,
-                                     Context *onjournal,
-                                     Context *ondisk)
+      dout(10) << "op_entry " << o << " " << o->op << " start" << dendl;
+      op_apply_start(o->op, o->oncommit);
+      int r = do_transactions(o->tls, o->op);
+      op_apply_finish();
+      dout(10) << "op_entry " << o << " " << o->op << " r = " << r
+              << ", finisher " << o->onreadable << dendl;
+
+      op_finisher.queue(o->onreadable, r);
+
+      delete o;
+
+      op_lock.Lock();
+    }
+    if (stop)
+      break;
+    op_cond.Wait(op_lock);
+  }
+  op_lock.Unlock();
+}
+
+
+int FileStore::queue_transactions(list<Transaction*> &tls,
+                                 Context *onreadable,
+                                 Context *onjournal,
+                                 Context *ondisk)
 {
-  list<Transaction*> tls;
-  tls.push_back(&t);
-  return apply_transactions(tls, onjournal, ondisk);
+  __u64 op;
+
+  op = op_journal_start(0);
+  journal_transactions(tls, op, onjournal);
+
+  // queue inside journal lock, to preserve ordering
+  queue_op(op, tls, onreadable, ondisk);
+
+  op_journal_finish();
+
+  return 0;
 }
 
-unsigned FileStore::apply_transactions(list<Transaction*> &tls,
-                                      Context *onjournal,
-                                      Context *ondisk)
+int FileStore::do_transactions(list<Transaction*> &tls, __u64 op_seq)
 {
-  int r = 0;
-  op_start();
+  int r;
 
   __u64 bytes = 0, ops = 0;
   for (list<Transaction*>::iterator p = tls.begin();
@@ -549,8 +606,6 @@ unsigned FileStore::apply_transactions(list<Transaction*> &tls,
 
   int id = _transaction_start(bytes, ops);
   if (id < 0) {
-    op_journal_start();
-    op_finish();
     return id;
   }
     
@@ -562,21 +617,56 @@ unsigned FileStore::apply_transactions(list<Transaction*> &tls,
       break;
   }
   
+  ::pwrite(op_fd, &op_seq, sizeof(op_seq), 0);
+  
   _transaction_finish(id);
+  return r;
+}
 
-  op_journal_start();
-  dout(10) << "op_seq is " << op_seq << dendl;
-  if (r >= 0) {
-    journal_transactions(tls, onjournal, ondisk);
+unsigned FileStore::apply_transaction(Transaction &t,
+                                     Context *onjournal,
+                                     Context *ondisk)
+{
+  list<Transaction*> tls;
+  tls.push_back(&t);
+  return apply_transactions(tls, onjournal, ondisk);
+}
+
+unsigned FileStore::apply_transactions(list<Transaction*> &tls,
+                                      Context *onjournal,
+                                      Context *ondisk)
+{
+  int r = 0;
 
-    ::pwrite(op_fd, &op_seq, sizeof(op_seq), 0);
+  if (g_conf.filestore_journal_parallel) {
+    // use op pool
+    Cond my_cond;
+    Mutex my_lock("FileStore::apply_transaction::my_lock");
+    bool done;
+    C_SafeCond *onreadable = new C_SafeCond(&my_lock, &my_cond, &done, &r);
 
+    dout(10) << "apply queued" << dendl;
+    queue_transactions(tls, onreadable, onjournal, ondisk);
+    
+    my_lock.Lock();
+    while (!done)
+      my_cond.Wait(my_lock);
+    my_lock.Unlock();
+    dout(10) << "apply done r = " << r << dendl;
   } else {
-    delete onjournal;
-    delete ondisk;
+    __u64 op_seq = op_apply_start(0, ondisk);
+    r = do_transactions(tls, op_seq);
+    op_apply_finish();
+
+    if (r >= 0) {
+      op_journal_start(op_seq);
+      journal_transactions(tls, op_seq, onjournal);
+      op_journal_finish();
+    } else {
+      delete onjournal;
+      delete ondisk;
+    }
   }
-
-  op_finish();
   return r;
 }
 
index fc1d43e66422eae887df9831b57c2840d4a1e453..48bc9d85175dce2deff0ef7790de3c3d116ae1d9 100644 (file)
@@ -80,6 +80,28 @@ class FileStore : public JournalingObjectStore {
 
   void sync_fs(); // actuall sync underlying fs
 
+  // op thread
+  struct Op {
+    __u64 op;
+    list<Transaction*> tls;
+    Context *onreadable, *oncommit;
+  };
+
+  Finisher op_finisher;
+  Mutex op_lock;
+  Cond op_cond;
+  list<Op*> op_queue;
+  void op_entry();
+  struct OpThread : public Thread {
+    FileStore *fs;
+    OpThread(FileStore *f) : fs(f) {}
+    void *entry() {
+      fs->op_entry();
+      return 0;
+    }
+  } op_thread;
+  void queue_op(__u64 op, list<Transaction*>& tls, Context *onreadable, Context *ondisk);
+
   // flusher thread
   Cond flusher_cond;
   list<__u64> flusher_queue;
@@ -94,6 +116,7 @@ class FileStore : public JournalingObjectStore {
     }
   } flusher_thread;
   bool queue_flusher(int fd, __u64 off, __u64 len);
+
   int open_journal();
 
  public:
@@ -104,7 +127,9 @@ class FileStore : public JournalingObjectStore {
     attrs(this), fake_attrs(false), 
     collections(this), fake_collections(false),
     lock("FileStore::lock"),
-    sync_epoch(0), stop(false), sync_thread(this), flusher_queue_len(0), flusher_thread(this) { }
+    sync_epoch(0), stop(false), sync_thread(this),
+    op_lock("FileStore::op_lock"), op_thread(this),
+    flusher_queue_len(0), flusher_thread(this) { }
 
   int mount();
   int umount();
@@ -113,12 +138,16 @@ class FileStore : public JournalingObjectStore {
 
   int statfs(struct statfs *buf);
 
+  int do_transactions(list<Transaction*> &tls, __u64 op_seq);
   unsigned apply_transaction(Transaction& t, Context *onjournal=0, Context *ondisk=0);
   unsigned apply_transactions(list<Transaction*>& tls, Context *onjournal=0, Context *ondisk=0);
   int _transaction_start(__u64 bytes, __u64 ops);
   void _transaction_finish(int id);
   unsigned _do_transaction(Transaction& t);
 
+  int queue_transactions(list<Transaction*>& tls, Context *onreadable,
+                         Context *onjournal=0, Context *ondisk=0);
+
   // ------------------
   // objects
   int pick_object_revision_lt(sobject_t& oid) {
index dac595cff68ca1ca34ba1be8aaec540894e3afce..f47c3e88ddf36f17667061a973a7201db21ae1c2 100644 (file)
@@ -8,8 +8,29 @@
 #define dout_prefix *_dout << dbeginl << "journal "
 
 
-int JournalingObjectStore::journal_replay()
+
+void JournalingObjectStore::journal_start()
+{
+  dout(10) << "journal_start" << dendl;
+  finisher.start();
+}
+void JournalingObjectStore::journal_stop() 
 {
+  dout(10) << "journal_stop" << dendl;
+  finisher.stop();
+  if (journal) {
+    journal->close();
+    delete journal;
+    journal = 0;
+  }
+}
+
+int JournalingObjectStore::journal_replay(__u64 fs_op_seq)
+{
+  dout(10) << "journal_replay fs op_seq " << fs_op_seq << dendl;
+  op_seq = applied_seq = fs_op_seq;
+
   if (!journal)
     return 0;
 
@@ -56,10 +77,142 @@ int JournalingObjectStore::journal_replay()
     seq++;  // we expect the next op
   }
 
-  committed_op_seq = op_seq;
+  committed_seq = op_seq;
 
   // done reading, make writeable.
   journal->make_writeable();
 
   return count;
 }
+
+
+// ------------------------------------
+
+__u64 JournalingObjectStore::op_apply_start(__u64 op, Context *ondisk) 
+{
+  lock.Lock();
+  while (blocked) {
+    dout(10) << "op_apply_start blocked" << dendl;
+    cond.Wait(lock);
+  }
+  open_ops++;
+
+  if (!op)
+    op = ++op_seq;
+  assert(op > applied_seq);  // !!
+  applied_seq = op;
+
+  dout(10) << "op_apply_start " << op << dendl;
+
+  if (ondisk)
+    commit_waiters[op].push_back(ondisk);
+
+  lock.Unlock();
+  return op;
+}
+
+void JournalingObjectStore::op_apply_finish() 
+{
+  dout(10) << "op_apply_finish" << dendl;
+  lock.Lock();
+  if (--open_ops == 0)
+    cond.Signal();
+  lock.Unlock();
+}
+
+__u64 JournalingObjectStore::op_journal_start(__u64 op)
+{
+  journal_lock.Lock();
+  if (!op) {
+    lock.Lock();
+    op = ++op_seq;
+    lock.Unlock();
+  }
+  return op;
+}
+
+void JournalingObjectStore::op_journal_finish()
+{
+  journal_lock.Unlock();
+}
+
+
+// ------------------------------------------
+
+bool JournalingObjectStore::commit_start() 
+{
+  // suspend new ops...
+  Mutex::Locker l(lock);
+
+  dout(10) << "commit_start" << dendl;
+  blocked = true;
+  while (open_ops > 0) {
+    dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
+    cond.Wait(lock);
+  }
+  
+  if (applied_seq == committed_seq) {
+    dout(10) << "commit_start nothing to do" << dendl;
+    blocked = false;
+    cond.Signal();
+    assert(commit_waiters.empty());
+    return false;
+  }
+  dout(10) << "commit_start" << dendl;
+  return true;
+}
+
+void JournalingObjectStore::commit_started() 
+{
+  Mutex::Locker l(lock);
+  dout(10) << "commit_started" << dendl;
+  // allow new ops. (underlying fs should now be committing all prior ops)
+  committing_seq = applied_seq;
+  blocked = false;
+  cond.Signal();
+}
+
+void JournalingObjectStore::commit_finish()
+{
+  Mutex::Locker l(lock);
+  dout(10) << "commit_finish" << dendl;
+  
+  if (journal)
+    journal->committed_thru(committing_seq);
+  committed_seq = committing_seq;
+  
+  map<version_t, vector<Context*> >::iterator p = commit_waiters.begin();
+  while (p != commit_waiters.end() &&
+        p->first <= committing_seq) {
+    finisher.queue(p->second);
+    commit_waiters.erase(p++);
+  }
+}
+
+void JournalingObjectStore::journal_transaction(ObjectStore::Transaction& t, __u64 op,
+                                               Context *onjournal)
+{
+  Mutex::Locker l(lock);
+  dout(10) << "journal_transaction " << op << dendl;
+  if (journal && journal->is_writeable()) {
+    bufferlist tbl;
+    t.encode(tbl);
+    journal->submit_entry(op, tbl, onjournal);
+  } else if (onjournal)
+    commit_waiters[op].push_back(onjournal);
+}
+
+void JournalingObjectStore::journal_transactions(list<ObjectStore::Transaction*>& tls, __u64 op,
+                                                Context *onjournal)
+{
+  Mutex::Locker l(lock);
+  dout(10) << "journal_transactions " << op << dendl;
+    
+  if (journal && journal->is_writeable()) {
+    bufferlist tbl;
+    for (list<ObjectStore::Transaction*>::iterator p = tls.begin(); p != tls.end(); p++)
+      (*p)->encode(tbl);
+    journal->submit_entry(op, tbl, onjournal);
+  } else if (onjournal)
+    commit_waiters[op].push_back(onjournal);
+}
index f4690ef58859b96c03bb7bc673171b07f2f16009..8ed782a73d37b882c2bc4867d934a5f9d1e932d1 100644 (file)
 
 class JournalingObjectStore : public ObjectStore {
 protected:
-  __u64 op_seq;
-  __u64 committing_op_seq, committed_op_seq;
-  Journal *journal;
-  Finisher finisher;
+  __u64 op_seq, applied_seq;
+  __u64 committing_seq, committed_seq;
   map<version_t, vector<Context*> > commit_waiters;
-  RWLock op_lock;
-  Mutex journal_lock;
-  Mutex lock;
-
-  void journal_start() {
-    finisher.start();
-  }
-  void journal_stop() {
-    finisher.stop();
-    if (journal) {
-      journal->close();
-      delete journal;
-      journal = 0;
-    }
-  }
-  int journal_replay();
-
-  void op_start() {
-    op_lock.get_read();
-  }
-  void op_journal_start() {
-    journal_lock.Lock();
-  }
-  void op_finish() {
-    journal_lock.Unlock();
-    op_lock.put_read();    
-  }
-
-  bool commit_start() {
-    // suspend new ops...
-    op_lock.get_write();
-    Mutex::Locker l(lock);
-    if (op_seq == committed_op_seq) {
-      op_lock.put_write();
-      assert(commit_waiters.empty());
-      return false;
-    }
-    return true;
-  }
-  void commit_started() {
-    Mutex::Locker l(lock);
 
-    // allow new ops
-    // (underlying fs should now be committing all prior ops)
-    committing_op_seq = op_seq;
-    op_lock.put_write();
-  }
-  void commit_finish() {
-    Mutex::Locker l(lock);
+  int open_ops;
+  bool blocked;
 
-    if (journal)
-      journal->committed_thru(committing_op_seq);
-    committed_op_seq = committing_op_seq;
-
-    map<version_t, vector<Context*> >::iterator p = commit_waiters.begin();
-    while (p != commit_waiters.end() &&
-          p->first <= committing_op_seq) {
-      finisher.queue(p->second);
-      commit_waiters.erase(p++);
-    }
-  }
-
-  void journal_transaction(ObjectStore::Transaction& t, Context *onjournal, Context *ondisk) {
-    Mutex::Locker l(lock);
+  Journal *journal;
+  Finisher finisher;
 
-    ++op_seq;
+  Cond cond;
+  Mutex journal_lock;
+  Mutex lock;
 
-    if (journal && journal->is_writeable()) {
-      bufferlist tbl;
-      t.encode(tbl);
-      journal->submit_entry(op_seq, tbl, onjournal);
-    } else if (onjournal)
-      commit_waiters[op_seq].push_back(onjournal);
+protected:
+  void journal_start();
+  void journal_stop();
+  int journal_replay(__u64 fs_op_seq);
 
-    if (ondisk)
-      commit_waiters[op_seq].push_back(ondisk);
-  }
-  void journal_transactions(list<ObjectStore::Transaction*>& tls, Context *onjournal, Context *ondisk) {
-    Mutex::Locker l(lock);
+  // --
+  __u64 op_apply_start(__u64 op, Context *ondisk);
+  void op_apply_finish();
+  __u64 op_journal_start(__u64 op);
+  void op_journal_finish();
 
-    ++op_seq;
+  void journal_transaction(ObjectStore::Transaction& t, __u64 op, Context *onjournal);
+  void journal_transactions(list<ObjectStore::Transaction*>& tls, __u64 op, Context *onjournal);
 
-    if (journal && journal->is_writeable()) {
-      bufferlist tbl;
-      for (list<ObjectStore::Transaction*>::iterator p = tls.begin(); p != tls.end(); p++)
-       (*p)->encode(tbl);
-      journal->submit_entry(op_seq, tbl, onjournal);
-    } else if (onjournal)
-      commit_waiters[op_seq].push_back(onjournal);
+  bool commit_start();
+  void commit_started();  // allow new ops (underlying fs should now be committing all prior ops)
+  void commit_finish();
 
-    if (ondisk)
-      commit_waiters[op_seq].push_back(ondisk);
-  }
 
 public:
-  JournalingObjectStore() : op_seq(0), committing_op_seq(0), committed_op_seq(0), 
-                           journal(0),
-                           op_lock("JournalingObjectStore::op_lock"),
+  JournalingObjectStore() : op_seq(0), 
+                           applied_seq(0), committing_seq(0), committed_seq(0), 
+                           open_ops(0), blocked(false),
+                           journal(NULL),
                            journal_lock("JournalingObjectStore::journal_lock"),
                            lock("JournalingObjectStore::lock") { }