]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
simplified FileJournal, journal interface
authorSage Weil <sage@newdream.net>
Thu, 24 Jan 2008 09:47:03 +0000 (01:47 -0800)
committerSage Weil <sage@newdream.net>
Thu, 24 Jan 2008 09:47:03 +0000 (01:47 -0800)
src/ebofs/DIOJournal.cc [new file with mode: 0644]
src/ebofs/DIOJournal.h [new file with mode: 0644]
src/ebofs/Ebofs.cc
src/ebofs/Ebofs.h
src/ebofs/FileJournal.cc
src/ebofs/FileJournal.h
src/ebofs/Journal.h
src/ebofs/Onode.h
src/ebofs/streamtest.cc

diff --git a/src/ebofs/DIOJournal.cc b/src/ebofs/DIOJournal.cc
new file mode 100644 (file)
index 0000000..75eb408
--- /dev/null
@@ -0,0 +1,469 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include "DIOJournal.h"
+#include "Ebofs.h"
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+
+#include "config.h"
+
+#define dout(x) if (x <= g_conf.debug_ebofs) *_dout << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal "
+#define derr(x) if (x <= g_conf.debug_ebofs) *_derr << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal "
+
+
+int DIOJournal::create()
+{
+  dout(2) << "create " << fn << dendl;
+
+  // open/create
+  fd = ::open(fn.c_str(), O_RDWR|O_DIRECT);
+  if (fd < 0) {
+    dout(2) << "create failed " << errno << " " << strerror(errno) << dendl;
+    return -errno;
+  }
+  assert(fd > 0);
+
+  //::ftruncate(fd, 0);
+  //::fchmod(fd, 0644);
+
+  // get size
+  struct stat st;
+  ::fstat(fd, &st);
+  dout(2) << "create " << fn << " " << st.st_size << " bytes" << dendl;
+
+  // write empty header
+  memset(&header, 0, sizeof(header));
+  header.clear();
+  header.fsid = ebofs->get_fsid();
+  header.max_size = st.st_size;
+  write_header();
+  
+  // writeable.
+  read_pos = 0;
+  write_pos = queue_pos = sizeof(header);
+
+  ::close(fd);
+
+  return 0;
+}
+
+int DIOJournal::open()
+{
+  //dout(1) << "open " << fn << dendl;
+
+  // open and file
+  assert(fd == 0);
+  fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
+  if (fd < 0) {
+    dout(2) << "open failed " << errno << " " << strerror(errno) << dendl;
+    return -errno;
+  }
+  assert(fd > 0);
+
+  // assume writeable, unless...
+  read_pos = 0;
+  write_pos = queue_pos = sizeof(header);
+
+  // read header?
+  read_header();
+  if (header.fsid != ebofs->get_fsid()) {
+    dout(2) << "open journal fsid doesn't match, invalid (someone else's?) journal" << dendl;
+  } 
+  else if (header.num > 0) {
+    // valid header, pick an offset
+    for (int i=0; i<header.num; i++) {
+      if (header.epoch[i] == ebofs->get_super_epoch()) {
+       dout(2) << "using read_pos header pointer "
+               << header.epoch[i] << " at " << header.offset[i]
+               << dendl;
+       read_pos = header.offset[i];
+       write_pos = queue_pos = 0;
+       break;
+      }      
+      else if (header.epoch[i] < ebofs->get_super_epoch()) {
+       dout(2) << "super_epoch is " << ebofs->get_super_epoch() 
+               << ", skipping old " << header.epoch[i] << " at " << header.offset[i]
+               << dendl;
+      }
+      else if (header.epoch[i] > ebofs->get_super_epoch()) {
+       dout(2) << "super_epoch is " << ebofs->get_super_epoch() 
+               << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i]
+               << dendl;
+       break;
+      }
+    }
+  }
+
+  start_writer();
+
+  return 0;
+}
+
+void DIOJournal::close()
+{
+  dout(1) << "close " << fn << dendl;
+
+  // stop writer thread
+  stop_writer();
+
+  // close
+  assert(writeq.empty());
+  assert(commitq.empty());
+  assert(fd > 0);
+  ::close(fd);
+  fd = 0;
+}
+
+void DIOJournal::start_writer()
+{
+  write_stop = false;
+  write_thread.create();
+}
+
+void DIOJournal::stop_writer()
+{
+  write_lock.Lock();
+  {
+    write_stop = true;
+    write_cond.Signal();
+  } 
+  write_lock.Unlock();
+  write_thread.join();
+}
+
+
+void DIOJournal::print_header()
+{
+  for (int i=0; i<header.num; i++) {
+    if (i && header.offset[i] < header.offset[i-1]) {
+      assert(header.wrap);
+      dout(10) << "header: wrap at " << header.wrap << dendl;
+    }
+    dout(10) << "header: epoch " << header.epoch[i] << " at " << header.offset[i] << dendl;
+  }
+  //if (header.wrap) dout(10) << "header: wrap at " << header.wrap << dendl;
+}
+void DIOJournal::read_header()
+{
+  dout(10) << "read_header" << dendl;
+  memset(&header, 0, sizeof(header));  // zero out (read may fail)
+  char buf[4096];
+  ::lseek(fd, 0, SEEK_SET);
+  int r = ::read(fd, buf, 4096);
+  memcpy(&header, buf, sizeof(header));
+  if (r < 0) 
+    dout(0) << "read_header error " << errno << " " << strerror(errno) << dendl;
+  print_header();
+}
+void DIOJournal::write_header()
+{
+  dout(10) << "write_header " << dendl;
+  print_header();
+
+  char buf[4096];
+  memcpy(buf, &header, sizeof(header));
+  
+  ::lseek(fd, 0, SEEK_SET);
+  int r = ::write(fd, &buf, 4096);
+  if (r < 0) 
+    dout(0) << "write_header error " << errno << " " << strerror(errno) << dendl;
+}
+
+
+void DIOJournal::write_thread_entry()
+{
+  dout(10) << "write_thread_entry start" << dendl;
+  write_lock.Lock();
+  
+  while (!write_stop) {
+    if (writeq.empty()) {
+      // sleep
+      dout(20) << "write_thread_entry going to sleep" << dendl;
+      assert(write_pos == queue_pos);
+      write_cond.Wait(write_lock);
+      dout(20) << "write_thread_entry woke up" << dendl;
+      continue;
+    }
+    
+    // do queued writes
+    while (!writeq.empty()) {
+      // grab next item
+      epoch_t epoch = writeq.front().first;
+
+      int len = writeq.front().second.length() + sizeof(entry_header_t);
+      int dolen = DIV_ROUND_UP(len,4096) * 4096;
+      bufferptr bp = buffer::create_page_aligned(dolen);
+      entry_header_t *h = (entry_header_t*)bp.c_str();
+      writeq.front().second.copy(0, writeq.front().second.length(), bp.c_str()+sizeof(entry_header_t));
+
+      writeq.pop_front();
+      Context *oncommit = commitq.front();
+      commitq.pop_front();
+      
+      // wrap?
+      if (write_pos == header.wrap) {
+       dout(15) << "write_thread_entry wrapped write_pos at " << write_pos << " to " << sizeof(header_t) << dendl;
+       assert(header.wrap == write_pos);
+       write_header();
+       write_pos = 4096; //sizeof(header_t);
+      }
+
+      // write!
+      dout(-15) << "write_thread_entry writing " << write_pos << " : " 
+               << len << " (" << dolen << ")"
+               << " epoch " << epoch
+              << dendl;
+      
+      // write entry header
+      //entry_header_t h;
+      h->epoch = epoch;
+      h->len = len;//bl.length();
+      h->make_magic(write_pos, header.fsid);
+
+      ::lseek(fd, write_pos, SEEK_SET);
+      /*
+      ::write(fd, &h, sizeof(h));
+      for (list<bufferptr>::const_iterator it = bl.buffers().begin();
+          it != bl.buffers().end();
+          it++) {
+       if ((*it).length() == 0) continue;  // blank buffer.
+       ::write(fd, (char*)(*it).c_str(), (*it).length() );
+      }
+      ::write(fd, &h, sizeof(h));
+      ::fsync(fd);
+      */
+      int r = ::write(fd, bp.c_str(), dolen);
+      if (r != dolen) derr(0) << "write got " << r << " not " << dolen << dendl;
+      
+      // move position pointer
+      write_pos += dolen;
+      
+      if (oncommit) {
+       if (1) {
+         // queue callback
+         ebofs->queue_finisher(oncommit);
+       } else {
+         // callback now
+         oncommit->finish(0);
+         delete oncommit;
+       }
+      }
+    }
+  }
+  
+  write_lock.Unlock();
+  dout(10) << "write_thread_entry finish" << dendl;
+}
+
+bool DIOJournal::submit_entry(bufferlist& e, Context *oncommit)
+{
+  assert(queue_pos != 0);  // bad create(), or journal didn't replay to completion.
+
+  // ** lock **
+  Mutex::Locker locker(write_lock);
+
+  // wrap? full?
+  off_t size = 2*sizeof(entry_header_t) + e.length();
+
+  if (full) return false;  // already marked full.
+
+  if (header.wrap) {
+    // we're wrapped.  don't overwrite ourselves.
+    if (queue_pos + size >= header.offset[0]) {
+      derr(0) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size
+              << " >= " << header.offset[0]
+              << dendl;
+      full = true;
+      print_header();
+      return false;
+    }
+  } else {
+    // we haven't wrapped.  
+    if (queue_pos + size >= header.max_size) {
+      // is there room if we wrap?
+      if ((off_t)sizeof(header_t) + size < header.offset[0]) {
+       // yes!
+       dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << dendl;
+       header.wrap = queue_pos;
+       queue_pos = sizeof(header_t);
+       header.push(ebofs->get_super_epoch(), queue_pos);
+      } else {
+       // no room.
+       derr(0) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size
+                << " >= " << header.max_size
+                << dendl;
+       full = true;
+       return false;
+      }
+    }
+  }
+  
+  dout(10) << "submit_entry " << queue_pos << " : " << e.length()
+          << " epoch " << ebofs->get_super_epoch()
+          << " " << oncommit << dendl;
+  
+  // dump on queue
+  writeq.push_back(pair<epoch_t,bufferlist>(ebofs->get_super_epoch(), e));
+  commitq.push_back(oncommit);
+  
+  queue_pos += size;
+  
+  // kick writer thread
+  write_cond.Signal();
+
+  return true;
+}
+
+
+void DIOJournal::commit_epoch_start()
+{
+  dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1 
+          << " -- new epoch " << ebofs->get_super_epoch()
+          << dendl;
+
+  Mutex::Locker locker(write_lock);
+
+  // was full -> empty -> now usable?
+  if (full) {
+    if (header.num != 0) {
+      dout(1) << " journal FULL, ignoring this epoch" << dendl;
+      return;
+    }
+    
+    derr(0) << " clearing FULL flag, journal now usable" << dendl;
+    full = false;
+  } 
+
+  // note epoch boundary
+  header.push(ebofs->get_super_epoch(), queue_pos);  // note: these entries may not yet be written.
+  //write_header();  // no need to write it now, though...
+}
+
+void DIOJournal::commit_epoch_finish()
+{
+  dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << dendl;
+
+  write_lock.Lock();
+  {
+    if (full) {
+      // full journal damage control.
+      dout(15) << " journal was FULL, contents now committed, clearing header.  journal still not usable until next epoch." << dendl;
+      header.clear();
+      write_pos = queue_pos = sizeof(header_t);
+    } else {
+      // update header -- trim/discard old (committed) epochs
+      while (header.epoch[0] < ebofs->get_super_epoch())
+       header.pop();
+    }
+    write_header();
+
+    // discard any unwritten items in previous epoch, and do callbacks
+    epoch_t epoch = ebofs->get_super_epoch();
+    list<Context*> callbacks;
+    while (!writeq.empty() && writeq.front().first < epoch) {
+      dout(15) << " dropping unwritten and committed " 
+              << write_pos << " : " << writeq.front().second.length()
+              << " epoch " << writeq.front().first 
+              << dendl;
+      // finisher?
+      Context *oncommit = commitq.front();
+      if (oncommit) callbacks.push_back(oncommit);
+
+      write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length();
+
+      // discard.
+      writeq.pop_front();  
+      commitq.pop_front();
+    }
+    
+    // queue the finishers
+    ebofs->queue_finishers(callbacks);
+  }
+  write_lock.Unlock();
+  
+}
+
+
+void DIOJournal::make_writeable()
+{
+  if (read_pos)
+    write_pos = queue_pos = read_pos;
+  else
+    write_pos = queue_pos = sizeof(header_t);
+  read_pos = 0;
+}
+
+
+bool DIOJournal::read_entry(bufferlist& bl, epoch_t& epoch)
+{
+  if (!read_pos) {
+    dout(2) << "read_entry -- not readable" << dendl;
+    return false;
+  }
+
+  if (read_pos == header.wrap) {
+    // find wrap point
+    for (int i=1; i<header.num; i++) {
+      if (header.offset[i] < read_pos) {
+       assert(header.offset[i-1] < read_pos);
+       read_pos = header.offset[i];
+       break;
+      }
+    }
+    assert(read_pos != header.wrap);
+    dout(10) << "read_entry wrapped from " << header.wrap << " to " << read_pos << dendl;
+  }
+
+  // header
+  entry_header_t h;
+  ::lseek(fd, read_pos, SEEK_SET);
+  ::read(fd, &h, sizeof(h));
+  if (!h.check_magic(read_pos, header.fsid)) {
+    dout(2) << "read_entry " << read_pos << " : bad header magic, end of journal" << dendl;
+    return false;
+  }
+
+  // body
+  bufferptr bp(h.len);
+  ::read(fd, bp.c_str(), h.len);
+
+  // footer
+  entry_header_t f;
+  ::read(fd, &f, sizeof(h));
+  if (!f.check_magic(read_pos, header.fsid) ||
+      h.epoch != f.epoch ||
+      h.len != f.len) {
+    dout(2) << "read_entry " << read_pos << " : bad footer magic, partially entry, end of journal" << dendl;
+    return false;
+  }
+
+
+  // yay!
+  dout(1) << "read_entry " << read_pos << " : " 
+         << " " << h.len << " bytes"
+         << " epoch " << h.epoch 
+         << dendl;
+  
+  bl.push_back(bp);
+  epoch = h.epoch;
+
+  read_pos += 2*sizeof(entry_header_t) + h.len;
+
+  return true;
+}
diff --git a/src/ebofs/DIOJournal.h b/src/ebofs/DIOJournal.h
new file mode 100644 (file)
index 0000000..32e6e8e
--- /dev/null
@@ -0,0 +1,149 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __EBOFS_DIOJOURNAL_H
+#define __EBOFS_DIOJOURNAL_H
+
+
+#include "Journal.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Thread.h"
+
+class DIOJournal : public Journal {
+public:
+  /** log header
+   * we allow 4 pointers:
+   *  top/initial,
+   *  one for an epoch boundary (if any),
+   *  one for a wrap in the ring buffer/journal file,
+   *  one for a second epoch boundary (if any).
+   * the epoch boundary one is useful only for speedier recovery in certain cases
+   * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
+   */
+  struct header_t {
+    uint64_t fsid;
+    int num;
+    off_t wrap;
+    off_t max_size;
+    epoch_t epoch[4];
+    off_t offset[4];
+
+    header_t() : fsid(0), num(0), wrap(0), max_size(0) {}
+
+    void clear() {
+      num = 0;
+      wrap = 0;
+    }
+    void pop() {
+      if (num >= 2 && offset[0] > offset[1]) 
+       wrap = 0;  // we're eliminating a wrap
+      num--;
+      for (int i=0; i<num; i++) {
+       epoch[i] = epoch[i+1];
+       offset[i] = offset[i+1];
+      }
+    }
+    void push(epoch_t e, off_t o) {
+      assert(num < 4);
+      if (num > 2 && 
+         epoch[num-1] == e &&
+         epoch[num-2] == (e-1)) 
+       num--;  // tail was an epoch boundary; replace it.
+      epoch[num] = e;
+      offset[num] = o;
+      num++;
+    }
+  } header;
+
+  struct entry_header_t {
+    uint64_t epoch;
+    uint64_t len;
+    uint64_t magic1;
+    uint64_t magic2;
+    
+    void make_magic(off_t pos, uint64_t fsid) {
+      magic1 = pos;
+      magic2 = fsid ^ epoch ^ len;
+    }
+    bool check_magic(off_t pos, uint64_t fsid) {
+      return
+       magic1 == (uint64_t)pos &&
+       magic2 == (fsid ^ epoch ^ len);
+    }
+  };
+
+private:
+  string fn;
+
+  bool full;
+  off_t write_pos;      // byte where next entry written goes
+  off_t queue_pos;      // byte where next entry queued for write goes
+
+  off_t read_pos;       // 
+
+  int fd;
+
+  list<pair<epoch_t,bufferlist> > writeq;  // currently journaling
+  list<Context*> commitq; // currently journaling
+  
+  // write thread
+  Mutex write_lock;
+  Cond write_cond;
+  bool write_stop;
+
+  void print_header();
+  void read_header();
+  void write_header();
+  void start_writer();
+  void stop_writer();
+  void write_thread_entry();
+
+  class Writer : public Thread {
+    DIOJournal *journal;
+  public:
+    Writer(DIOJournal *fj) : journal(fj) {}
+    void *entry() {
+      journal->write_thread_entry();
+      return 0;
+    }
+  } write_thread;
+
+ public:
+  DIOJournal(Ebofs *e, char *f) : 
+    Journal(e), fn(f),
+    full(false),
+    write_pos(0), queue_pos(0), read_pos(0),
+    fd(0),
+    write_stop(false), write_thread(this) { }
+  ~DIOJournal() {}
+
+  int create();
+  int open();
+  void close();
+
+  void make_writeable();
+
+  // writes
+  bool submit_entry(bufferlist& e, Context *oncommit);  // submit an item
+  void commit_epoch_start();   // mark epoch boundary
+  void commit_epoch_finish();  // mark prior epoch as committed (we can expire)
+
+  bool read_entry(bufferlist& bl, epoch_t& e);
+
+  // reads
+};
+
+#endif
index 9376a4f8f953961fcf8a8ee8a9a6e937c936f136..5d1e5f31a26187deaaae3b78098ac3b855e86531 100644 (file)
@@ -1432,17 +1432,14 @@ void Ebofs::sync(Context *onsafe)
   if (onsafe) {
     dirty = true;
 
-    while (1) {
-      if (journal) {  
-       // journal empty transaction
-       Transaction t;
-       bufferlist bl;
-       t._encode(bl);
-       if (journal->submit_entry(bl, onsafe)) break;
-      }
-      commit_waiters[super_epoch].push_back(onsafe);
-      break;
-    }
+    if (journal) {  
+      // journal empty transaction
+      Transaction t;
+      bufferlist bl;
+      t._encode(bl);
+      journal->submit_entry(bl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
   }
   ebofs_lock.Unlock();
 }
@@ -2483,15 +2480,12 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
     delete onsafe;  // kill callback, but still journal below (in case transaction had side effects)
     onsafe = 0;
   }
-  while (1) {
-    if (journal) {
-      bufferlist bl;
-      t._encode(bl);
-      if (journal->submit_entry(bl, onsafe)) break; 
-    }
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-    break;
-  }
+  if (journal) {
+    bufferlist bl;
+    t._encode(bl);
+    journal->submit_entry(bl, onsafe);
+  } else
+    queue_commit_waiter(onsafe);
 
   ebofs_lock.Unlock();
   return r;
@@ -2900,17 +2894,14 @@ int Ebofs::write(pobject_t oid,
   // commit waiter
   if (r > 0) {
     assert((size_t)r == len);
-    while (1) {
-      if (journal) {
-       Transaction t;
-       t.write(oid, off, len, bl);
-       bufferlist tbl;
-       t._encode(tbl);
-       if (journal->submit_entry(tbl, onsafe)) break;
-      }
-      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-      break;
-    }
+    if (journal) {
+      Transaction t;
+      t.write(oid, off, len, bl);
+      bufferlist tbl;
+      t._encode(tbl);
+      journal->submit_entry(tbl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2929,17 +2920,14 @@ int Ebofs::zero(pobject_t oid, off_t off, size_t len, Context *onsafe)
   // commit waiter
   if (r > 0) {
     assert((size_t)r == len);
-    while (1) {
-      if (journal) {
-       Transaction t;
-       t.zero(oid, off, len);
-       bufferlist tbl;
-       t._encode(tbl);
-       if (journal->submit_entry(tbl, onsafe)) break;
-      }
-      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-      break;
-    }
+    if (journal) {
+      Transaction t;
+      t.zero(oid, off, len);
+      bufferlist tbl;
+      t._encode(tbl);
+      journal->submit_entry(tbl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2973,17 +2961,14 @@ int Ebofs::remove(pobject_t oid, Context *onsafe)
 
   // journal, wait for commit
   if (r >= 0) {
-    while (1) {
-      if (journal) {
-       Transaction t;
-       t.remove(oid);
-       bufferlist bl;
-       t._encode(bl);
-       if (journal->submit_entry(bl, onsafe)) break;
-      }
-      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-      break;
-    }
+    if (journal) {
+      Transaction t;
+      t.remove(oid);
+      bufferlist bl;
+      t._encode(bl);
+      journal->submit_entry(bl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
   } else {
     if (onsafe) delete onsafe;
   }
@@ -3059,17 +3044,14 @@ int Ebofs::truncate(pobject_t oid, off_t size, Context *onsafe)
 
   // journal, wait for commit
   if (r >= 0) {
-    while (1) {
-      if (journal) {
-       Transaction t;
-       t.truncate(oid, size);
-       bufferlist bl;
-       t._encode(bl);
-       if (journal->submit_entry(bl, onsafe)) break;
-      }
-      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-      break;
-    }
+    if (journal) {
+      Transaction t;
+      t.truncate(oid, size);
+      bufferlist bl;
+      t._encode(bl);
+      journal->submit_entry(bl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
   } else {
     if (onsafe) delete onsafe;
   }
@@ -3088,17 +3070,14 @@ int Ebofs::clone(pobject_t from, pobject_t to, Context *onsafe)
 
   // journal, wait for commit
   if (r >= 0) {
-    while (1) {
-      if (journal) {
-       Transaction t;
-       t.clone(from, to);
-       bufferlist bl;
-       t._encode(bl);
-       if (journal->submit_entry(bl, onsafe)) break;
-      }
-      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-      break;
-    }
+    if (journal) {
+      Transaction t;
+      t.clone(from, to);
+      bufferlist bl;
+      t._encode(bl);
+      journal->submit_entry(bl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
   } else {
     if (onsafe) delete onsafe;
   }
@@ -3275,17 +3254,14 @@ int Ebofs::setattr(pobject_t oid, const char *name, const void *value, size_t si
 
   // journal, wait for commit
   if (r >= 0) {
-    while (1) {
-      if (journal) {
-       Transaction t;
-       t.setattr(oid, name, value, size);
-       bufferlist bl;
-       t._encode(bl);
-       if (journal->submit_entry(bl, onsafe)) break;
-      }
-      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-      break;
-    }
+    if (journal) {
+      Transaction t;
+      t.setattr(oid, name, value, size);
+      bufferlist bl;
+      t._encode(bl);
+      journal->submit_entry(bl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
   } else {
     if (onsafe) delete onsafe;
   }
@@ -3318,17 +3294,14 @@ int Ebofs::setattrs(pobject_t oid, map<string,bufferptr>& attrset, Context *onsa
 
   // journal, wait for commit
   if (r >= 0) {
-    while (1) {
-      if (journal) {
-       Transaction t;
-       t.setattrs(oid, attrset);
-       bufferlist bl;
-       t._encode(bl);
-       if (journal->submit_entry(bl, onsafe)) break;
-      }
-      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-      break;
-    }
+    if (journal) {
+      Transaction t;
+      t.setattrs(oid, attrset);
+      bufferlist bl;
+      t._encode(bl);
+      journal->submit_entry(bl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
   } else {
     if (onsafe) delete onsafe;
   }
@@ -3433,17 +3406,14 @@ int Ebofs::rmattr(pobject_t oid, const char *name, Context *onsafe)
 
   // journal, wait for commit
   if (r >= 0) {
-    while (1) {
-      if (journal) {
-       Transaction t;
-       t.rmattr(oid, name);
-       bufferlist bl;
-       t._encode(bl);
-       if (journal->submit_entry(bl, onsafe)) break;
-      }
-      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-      break;
-    }
+    if (journal) {
+      Transaction t;
+      t.rmattr(oid, name);
+      bufferlist bl;
+      t._encode(bl);
+      journal->submit_entry(bl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
   } else {
     if (onsafe) delete onsafe;
   }
@@ -3539,17 +3509,14 @@ int Ebofs::create_collection(coll_t cid, Context *onsafe)
 
   // journal, wait for commit
   if (r >= 0) {
-    while (1) {
-      if (journal) {
-       Transaction t;
-       t.create_collection(cid);
-       bufferlist bl;
-       t._encode(bl);
-       if (journal->submit_entry(bl, onsafe)) break;
-      }
-      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-      break;
-    }
+    if (journal) {
+      Transaction t;
+      t.create_collection(cid);
+      bufferlist bl;
+      t._encode(bl);
+      journal->submit_entry(bl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
   } else {
     if (onsafe) delete onsafe;
   }
@@ -3596,17 +3563,14 @@ int Ebofs::destroy_collection(coll_t cid, Context *onsafe)
 
   // journal, wait for commit
   if (r >= 0) {
-    while (1) {
-      if (journal) {
-       Transaction t;
-       t.remove_collection(cid);
-       bufferlist bl;
-       t._encode(bl);
-       if (journal->submit_entry(bl, onsafe)) break;
-      }
-      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-      break;
-    }
+    if (journal) {
+      Transaction t;
+      t.remove_collection(cid);
+      bufferlist bl;
+      t._encode(bl);
+      journal->submit_entry(bl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
   } else {
     if (onsafe) delete onsafe;
   }
@@ -3660,17 +3624,14 @@ int Ebofs::collection_add(coll_t cid, pobject_t oid, Context *onsafe)
 
   // journal, wait for commit
   if (r >= 0) {
-    while (1) {
-      if (journal) {
-       Transaction t;
-       t.collection_add(cid, oid);
-       bufferlist bl;
-       t._encode(bl);
-       if (journal->submit_entry(bl, onsafe)) break;
-      }
-      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-      break;
-    }
+    if (journal) {
+      Transaction t;
+      t.collection_add(cid, oid);
+      bufferlist bl;
+      t._encode(bl);
+      journal->submit_entry(bl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
   } else {
     if (onsafe) delete onsafe;
   }
@@ -3711,17 +3672,14 @@ int Ebofs::collection_remove(coll_t cid, pobject_t oid, Context *onsafe)
 
   // journal, wait for commit
   if (r >= 0) {
-    while (1) {
-      if (journal) {
-       Transaction t;
-       t.collection_remove(cid, oid);
-       bufferlist bl;
-       t._encode(bl);
-       if (journal->submit_entry(bl, onsafe)) break;
-      }
-      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-      break;
-    }
+    if (journal) {
+      Transaction t;
+      t.collection_remove(cid, oid);
+      bufferlist bl;
+      t._encode(bl);
+      journal->submit_entry(bl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
   } else {
     if (onsafe) delete onsafe;
   }
@@ -3784,17 +3742,14 @@ int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, s
 
   // journal, wait for commit
   if (r >= 0) {
-    while (1) {
-      if (journal) {
-       Transaction t;
-       t.collection_setattr(cid, name, value, size);
-       bufferlist bl;
-       t._encode(bl);
-       if (journal->submit_entry(bl, onsafe)) break;
-      }
-      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-      break;
-    }
+    if (journal) {
+      Transaction t;
+      t.collection_setattr(cid, name, value, size);
+      bufferlist bl;
+      t._encode(bl);
+      journal->submit_entry(bl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
   } else {
     if (onsafe) delete onsafe;
   }
@@ -3892,17 +3847,14 @@ int Ebofs::collection_rmattr(coll_t cid, const char *name, Context *onsafe)
 
   // journal, wait for commit
   if (r >= 0) {
-    while (1) {
-      if (journal) {
-       Transaction t;
-       t.collection_rmattr(cid, name);
-       bufferlist bl;
-       t._encode(bl);
-       if (journal->submit_entry(bl, onsafe)) break;
-      }
-      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-      break;
-    }
+    if (journal) {
+      Transaction t;
+      t.collection_rmattr(cid, name);
+      bufferlist bl;
+      t._encode(bl);
+      journal->submit_entry(bl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
   } else {
     if (onsafe) delete onsafe;
   }
index b0ba503f65ed1fa7efb7107993d8c37a86a7383b..216c1428baa342ca5637549575f57562982e55ab 100644 (file)
@@ -80,6 +80,13 @@ protected:
 public:
   uint64_t get_fsid() { return super_fsid; }
   epoch_t get_super_epoch() { return super_epoch; }
+
+  void queue_commit_waiter(Context *oncommit) {
+    if (oncommit) 
+      commit_waiters[super_epoch].push_back(oncommit);
+  }
+
+
 protected:
 
 
@@ -239,13 +246,13 @@ protected:
 
 
  public:
-  Ebofs(const char *devfn, char *jfn=0) : 
+  Ebofs(const char *devfn, const char *jfn=0) : 
     fake_writes(false),
     dev(devfn), 
     mounted(false), unmounting(false), dirty(false), readonly(false), 
     super_epoch(0), commit_starting(false), commit_thread_started(false),
     commit_thread(this),
-    journalfn(jfn), journal(0),
+    journal(0),
     free_blocks(0), limbo_blocks(0),
     allocator(this),
     nodepool(ebofs_lock),
@@ -258,7 +265,10 @@ protected:
     finisher_stop(false), finisher_thread(this) {
     for (int i=0; i<EBOFS_NUM_FREE_BUCKETS; i++)
       free_tab[i] = 0;
-    if (!journalfn) {
+    if (jfn) {
+      journalfn = new char[strlen(jfn) + 1];
+      strcpy(journalfn, jfn);
+    } else {
       journalfn = new char[strlen(devfn) + 100];
       strcpy(journalfn, devfn);
       strcat(journalfn, ".journal");
index 35a1e6f4127b6fb8dbbda4ca98ed8f42041d74f3..24ec04785cbb2284c9401f142ca29fe606fe78ae 100644 (file)
@@ -56,7 +56,7 @@ int FileJournal::create()
   
   // writeable.
   read_pos = 0;
-  write_pos = queue_pos = sizeof(header);
+  write_pos = sizeof(header);
 
   ::close(fd);
 
@@ -78,7 +78,7 @@ int FileJournal::open()
 
   // assume writeable, unless...
   read_pos = 0;
-  write_pos = queue_pos = sizeof(header);
+  write_pos = sizeof(header);
 
   // read header?
   read_header();
@@ -93,7 +93,7 @@ int FileJournal::open()
                << header.epoch[i] << " at " << header.offset[i]
                << dendl;
        read_pos = header.offset[i];
-       write_pos = queue_pos = 0;
+       write_pos = 0;
        break;
       }      
       else if (header.epoch[i] < ebofs->get_super_epoch()) {
@@ -190,131 +190,132 @@ void FileJournal::write_thread_entry()
     if (writeq.empty()) {
       // sleep
       dout(20) << "write_thread_entry going to sleep" << dendl;
-      assert(write_pos == queue_pos);
       write_cond.Wait(write_lock);
       dout(20) << "write_thread_entry woke up" << dendl;
       continue;
     }
     
-    // do queued writes
+    // gather queued writes
+    off_t queue_pos = write_pos;
+    bufferlist bl;
+
     while (!writeq.empty()) {
       // grab next item
       epoch_t epoch = writeq.front().first;
-      bufferlist bl;
-      bl.claim(writeq.front().second);
-      writeq.pop_front();
-      Context *oncommit = commitq.front();
-      commitq.pop_front();
+      bufferlist &ebl = writeq.front().second;
+      off_t size = 2*sizeof(entry_header_t) + ebl.length();
       
-      // wrap?
-      if (write_pos == header.wrap) {
-       dout(15) << "write_thread_entry wrapped write_pos at " << write_pos << " to " << sizeof(header_t) << dendl;
-       assert(header.wrap == write_pos);
-       write_header();
-       write_pos = sizeof(header_t);
+      // epoch boundary?
+      if (epoch > header.last_epoch()) {
+       dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl;
+       header.push(epoch, queue_pos);
       }
 
-      // write!
-      dout(15) << "write_thread_entry writing " << write_pos << " : " 
-              << bl.length() 
-              << " epoch " << epoch
-              << dendl;
+      // does it fit?
+      if (header.wrap) {
+       // we're wrapped.  don't overwrite ourselves.
+       if (queue_pos + size >= header.offset[0]) {
+         if (queue_pos != write_pos) break;  // do what we have, first
+         dout(10) << "JOURNAL FULL (and wrapped), " << queue_pos << "+" << size
+                  << " >= " << header.offset[0]
+                  << dendl;
+         full = true;
+         writeq.clear();
+         print_header();
+         break;
+       }
+      } else {
+       // we haven't wrapped.  
+       if (queue_pos + size >= header.max_size) {
+         if (queue_pos != write_pos) break;  // do what we have, first
+         // is there room if we wrap?
+         if ((off_t)sizeof(header_t) + size < header.offset[0]) {
+           // yes!
+           dout(10) << "wrapped from " << queue_pos << " to " << sizeof(header_t) << dendl;
+           header.wrap = queue_pos;
+           queue_pos = sizeof(header_t);
+           header.push(ebofs->get_super_epoch(), queue_pos);
+           write_header();
+         } else {
+           // no room.
+           dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size
+                    << " >= " << header.max_size
+                    << dendl;
+           full = true;
+           writeq.clear();
+           break;
+         }
+       }
+      }
+       
+      // add to write buffer
+      dout(15) << "write_thread_entry will write " << queue_pos << " : " 
+               << ebl.length() 
+               << " epoch " << epoch
+               << dendl;
       
-      // write entry header
+      // add it this entry
       entry_header_t h;
       h.epoch = epoch;
-      h.len = bl.length();
+      h.len = ebl.length();
       h.make_magic(write_pos, header.fsid);
+      bl.append((const char*)&h, sizeof(h));
+      bl.claim_append(ebl);
+      bl.append((const char*)&h, sizeof(h));
+      
+      Context *oncommit = commitq.front();
+      if (oncommit)
+       writingq.push_back(oncommit);
+      
+      // pop from writeq
+      writeq.pop_front();
+      commitq.pop_front();
+      break;
+    }
 
-      ::lseek(fd, write_pos, SEEK_SET);
-      ::write(fd, &h, sizeof(h));
+    // write anything?
+    if (bl.length() > 0) {
+      writing = true;
+      write_lock.Unlock();
+      dout(15) << "write_thread_entry writing " << write_pos << "~" << bl.length() << dendl;
       
+      ::lseek(fd, write_pos, SEEK_SET);
       for (list<bufferptr>::const_iterator it = bl.buffers().begin();
           it != bl.buffers().end();
           it++) {
        if ((*it).length() == 0) continue;  // blank buffer.
        ::write(fd, (char*)(*it).c_str(), (*it).length() );
       }
-
-      ::write(fd, &h, sizeof(h));
       
-      // move position pointer
-      write_pos += 2*sizeof(entry_header_t) + bl.length();
-      
-      if (oncommit) {
-       if (1) {
-         // queue callback
-         ebofs->queue_finisher(oncommit);
-       } else {
-         // callback now
-         oncommit->finish(0);
-         delete oncommit;
-       }
-      }
+      write_lock.Lock();    
+      writing = false;
+      write_pos = queue_pos;
+      ebofs->queue_finishers(writingq);
     }
   }
-  
   write_lock.Unlock();
   dout(10) << "write_thread_entry finish" << dendl;
 }
 
-bool FileJournal::submit_entry(bufferlist& e, Context *oncommit)
+bool FileJournal::is_full()
 {
-  assert(queue_pos != 0);  // bad create(), or journal didn't replay to completion.
-
-  // ** lock **
   Mutex::Locker locker(write_lock);
+  return full;
+}
 
-  // wrap? full?
-  off_t size = 2*sizeof(entry_header_t) + e.length();
-
-  if (full) return false;  // already marked full.
+void FileJournal::submit_entry(bufferlist& e, Context *oncommit)
+{
+  Mutex::Locker locker(write_lock);  // ** lock **
 
-  if (header.wrap) {
-    // we're wrapped.  don't overwrite ourselves.
-    if (queue_pos + size >= header.offset[0]) {
-      dout(10) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size
-              << " >= " << header.offset[0]
-              << dendl;
-      full = true;
-      print_header();
-      return false;
-    }
-  } else {
-    // we haven't wrapped.  
-    if (queue_pos + size >= header.max_size) {
-      // is there room if we wrap?
-      if ((off_t)sizeof(header_t) + size < header.offset[0]) {
-       // yes!
-       dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << dendl;
-       header.wrap = queue_pos;
-       queue_pos = sizeof(header_t);
-       header.push(ebofs->get_super_epoch(), queue_pos);
-      } else {
-       // no room.
-       dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size
-                << " >= " << header.max_size
-                << dendl;
-       full = true;
-       return false;
-      }
-    }
-  }
-  
-  dout(10) << "submit_entry " << queue_pos << " : " << e.length()
+  // dump on queue
+  dout(10) << "submit_entry " << e.length()
           << " epoch " << ebofs->get_super_epoch()
           << " " << oncommit << dendl;
-  
-  // dump on queue
-  writeq.push_back(pair<epoch_t,bufferlist>(ebofs->get_super_epoch(), e));
   commitq.push_back(oncommit);
-  
-  queue_pos += size;
-  
-  // kick writer thread
-  write_cond.Signal();
-
-  return true;
+  if (!full) {
+    writeq.push_back(pair<epoch_t,bufferlist>(ebofs->get_super_epoch(), e));
+    write_cond.Signal(); // kick writer thread
+  }
 }
 
 
@@ -336,63 +337,53 @@ void FileJournal::commit_epoch_start()
     dout(1) << " clearing FULL flag, journal now usable" << dendl;
     full = false;
   } 
-
-  // note epoch boundary
-  header.push(ebofs->get_super_epoch(), queue_pos);  // note: these entries may not yet be written.
-  //write_header();  // no need to write it now, though...
 }
 
 void FileJournal::commit_epoch_finish()
 {
   dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << dendl;
 
-  write_lock.Lock();
-  {
-    if (full) {
-      // full journal damage control.
-      dout(15) << " journal was FULL, contents now committed, clearing header.  journal still not usable until next epoch." << dendl;
-      header.clear();
-      write_pos = queue_pos = sizeof(header_t);
-    } else {
-      // update header -- trim/discard old (committed) epochs
-      while (header.epoch[0] < ebofs->get_super_epoch())
-       header.pop();
-    }
-    write_header();
-
-    // discard any unwritten items in previous epoch, and do callbacks
-    epoch_t epoch = ebofs->get_super_epoch();
-    list<Context*> callbacks;
-    while (!writeq.empty() && writeq.front().first < epoch) {
-      dout(15) << " dropping unwritten and committed " 
-              << write_pos << " : " << writeq.front().second.length()
-              << " epoch " << writeq.front().first 
-              << dendl;
-      // finisher?
-      Context *oncommit = commitq.front();
-      if (oncommit) callbacks.push_back(oncommit);
-
-      write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length();
-
-      // discard.
-      writeq.pop_front();  
-      commitq.pop_front();
-    }
-    
-    // queue the finishers
-    ebofs->queue_finishers(callbacks);
-  }
-  write_lock.Unlock();
-  
+   Mutex::Locker locker(write_lock);
+
+   if (full) {
+     // full journal damage control.
+     dout(15) << " journal was FULL, contents now committed, clearing header.  journal still not usable until next epoch." << dendl;
+     header.clear();
+     write_pos = sizeof(header_t);
+   } else {
+     // update header -- trim/discard old (committed) epochs
+     while (header.num && header.epoch[0] < ebofs->get_super_epoch())
+       header.pop();
+   }
+   write_header();
+   
+   // discard any unwritten items in previous epoch
+   epoch_t epoch = ebofs->get_super_epoch();
+   while (!writeq.empty() && writeq.front().first < epoch) {
+     dout(15) << " dropping unwritten and committed " 
+             << write_pos << " : " << writeq.front().second.length()
+             << " epoch " << writeq.front().first 
+             << dendl;
+     // finisher?
+     Context *oncommit = commitq.front();
+     if (oncommit) writingq.push_back(oncommit);
+     
+     // discard.
+     writeq.pop_front();  
+     commitq.pop_front();
+   }
+   
+   // queue the finishers
+   ebofs->queue_finishers(writingq);
 }
 
 
 void FileJournal::make_writeable()
 {
   if (read_pos)
-    write_pos = queue_pos = read_pos;
+    write_pos = read_pos;
   else
-    write_pos = queue_pos = sizeof(header_t);
+    write_pos = sizeof(header_t);
   read_pos = 0;
 }
 
index 446adeb826c718421729c97666fb4b37208c5259..84b3a603f74a8293476ff891b8e367f78a84a23c 100644 (file)
@@ -66,6 +66,9 @@ public:
       offset[num] = o;
       num++;
     }
+    epoch_t last_epoch() {
+      return epoch[num-1];
+    }
   } header;
 
   struct entry_header_t {
@@ -88,16 +91,18 @@ public:
 private:
   string fn;
 
-  bool full;
+  bool full, writing;
   off_t write_pos;      // byte where next entry written goes
-  off_t queue_pos;      // byte where next entry queued for write goes
-
   off_t read_pos;       // 
 
   int fd;
 
-  list<pair<epoch_t,bufferlist> > writeq;  // currently journaling
-  list<Context*> commitq; // currently journaling
+  // to be journaled
+  list<pair<epoch_t,bufferlist> > writeq;
+  list<Context*> commitq;
+
+  // being journaled
+  list<Context*> writingq;
   
   // write thread
   Mutex write_lock;
@@ -122,10 +127,10 @@ private:
   } write_thread;
 
  public:
-  FileJournal(Ebofs *e, char *f) : 
+  FileJournal(Ebofs *e, const char *f) : 
     Journal(e), fn(f),
-    full(false),
-    write_pos(0), queue_pos(0), read_pos(0),
+    full(false), writing(false),
+    write_pos(0), read_pos(0),
     fd(0),
     write_stop(false), write_thread(this) { }
   ~FileJournal() {}
@@ -137,12 +142,14 @@ private:
   void make_writeable();
 
   // writes
-  bool submit_entry(bufferlist& e, Context *oncommit);  // submit an item
+  void submit_entry(bufferlist& e, Context *oncommit);  // submit an item
   void commit_epoch_start();   // mark epoch boundary
   void commit_epoch_finish();  // mark prior epoch as committed (we can expire)
 
   bool read_entry(bufferlist& bl, epoch_t& e);
 
+  bool is_full();
+
   // reads
 };
 
index 9bab0b7f3c1099bab0121a6dcc3523bc73ef039a..1f738dbdaca9a2c4c366c75dea83f09e78caf1f9 100644 (file)
@@ -35,10 +35,11 @@ public:
 
   // writes
   virtual void make_writeable() = 0;
-  virtual bool submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
+  virtual void submit_entry(bufferlist& e, Context *oncommit) = 0;
   virtual void commit_epoch_start() = 0;  // mark epoch boundary
   virtual void commit_epoch_finish() = 0; // mark prior epoch as committed (we can expire)
   virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0;
+  virtual bool is_full() = 0;
 
   // reads/recovery
   
index aa0dca3175c2b2a04a38520a4127692ca4ccacdf..ad365d3f7e2b7f47aa325a397b34210c7d711ae4 100644 (file)
@@ -169,7 +169,7 @@ public:
 
   // allocation
   void verify_extents() {
-    if (1) {  // do crazy stupid sanity checking
+    if (0) {  // do crazy stupid sanity checking
       block_t count = 0, pos = 0;
       interval_set<block_t> is;    
       csum_t csum = 0;
index 6ce7f843f1f9c73734e6bba05967d118952adb04..40d7671543f1883470da126b2c34f8e585eecb20 100644 (file)
@@ -12,6 +12,8 @@
  * 
  */
 
+#define dout(x) if (x <= g_conf.debug_ebofs) *_dout << dbeginl
+
 #include <iostream>
 #include "ebofs/Ebofs.h"
 
@@ -22,9 +24,9 @@ struct C_Commit : public Context {
   C_Commit(off_t o) : off(o) {}
   void finish(int r) {
     utime_t now = g_clock.now();
-    cout << off << "\t" 
+    dout(0) << off << "\t" 
         << (writes[off].second-writes[off].first) << "\t"
-        << (now - writes[off].first) << std::endl;
+        << (now - writes[off].first) << dendl;
     writes.erase(off);
   }
 };
@@ -37,10 +39,13 @@ int main(int argc, const char **argv)
   parse_config_options(args);
 
   // args
-  if (args.size() != 3) return -1;
+  if (args.size() < 3) return -1;
   const char *filename = args[0];
   int seconds = atoi(args[1]);
   int bytes = atoi(args[2]);
+  const char *journal = 0;
+  if (args.size() >= 4)
+    journal = args[3];
 
   buffer::ptr bp(bytes);
   bp.zero();
@@ -51,7 +56,7 @@ int main(int argc, const char **argv)
   cout << "#dev " << filename
        << seconds << " seconds, " << bytes << " bytes per write" << std::endl;
 
-  Ebofs fs(filename);
+  Ebofs fs(filename, journal);
   if (fs.mkfs() < 0) {
     cout << "mkfs failed" << std::endl;
     return -1;