]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: journal rewrite
authorSage Weil <sage@newdream.net>
Thu, 11 Sep 2008 17:56:16 +0000 (10:56 -0700)
committerSage Weil <sage@newdream.net>
Thu, 11 Sep 2008 23:58:27 +0000 (16:58 -0700)
12 files changed:
src/TODO
src/config.cc
src/config.h
src/ebofs/Ebofs.cc
src/include/types.h
src/os/FileJournal.cc
src/os/FileJournal.h
src/os/Journal.h
src/os/JournalingObjectStore.cc
src/os/JournalingObjectStore.h
src/streamtest.cc
src/vstartnew.sh

index 5f19bb4f5c31ae14c8dc9f24af6ef40358af7062..f18bf553cb2e799c229c2045fe341dd1f1f58f23 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -1,6 +1,7 @@
 v0.4
-- btrfs latency.  update howto.
-- snap garbage collection
+/- btrfs latency.  update howto.
+/- snap garbage collection
+- journal
 
 v0.5
 - ENOSPC
@@ -51,10 +52,12 @@ userspace client
 - fix readdir vs fragment race by keeping a separate frag pos, and ignoring dentries below it
 
 kernel client
+- statfs fsid should xor the ceph_fsid hi/low words
+- async writepages?
+  - we want to saturate network with writeback on a single file
 - add i_frag_mutex to protect the frag tree..
   - get rid of ugly i_lock vs kmalloc juggling  in get_or_create_frag
-- fsync on dir?
-- virtual xattr for exposing dirstat/rstat info (instead of 'cat dirname')
+- fsync on dir?  what is that supposed to do?
 - make writepages maybe skip pages with errors?
   - EIO, or ENOSPC?
   - ... writeback vs ENOSPC vs flush vs close()... hrm...
index 4d7527c6e2f87a7f374a388a0276b6ce4687ae29..8699474d06f14d8b1ebd94e52682703556ef0bd4 100644 (file)
@@ -450,7 +450,7 @@ md_config_t g_conf = {
   // journal
   journal_dio: false,
   journal_max_write_bytes: 0,
-  journal_max_write_entries: 10,
+  journal_max_write_entries: 100,
 
   // --- block device ---
   bdev_lock: true,
index ef891e0c1a3bf4d3c28a42aed6374ec759315d42..b7819c1154312e3532393d8a338b8f603c3718b8 100644 (file)
@@ -309,8 +309,8 @@ struct md_config_t {
   
   // journal
   bool journal_dio;
-  bool journal_max_write_bytes;
-  bool journal_max_write_entries;
+  int journal_max_write_bytes;
+  int journal_max_write_entries;
 
   // block device
   bool  bdev_lock;
index 1ad6be27c7e7e5584058ff3b468c59cea610ccdc..9bf65b7fb6235303df109391a11e34a7ba185380 100644 (file)
@@ -137,7 +137,7 @@ int Ebofs::mount()
       
       while (1) {
        bufferlist bl;
-       epoch_t e;
+       __u64 e;
        if (!journal->read_entry(bl, e)) {
          dout(3) << "mount replay: end of journal, done." << dendl;
          break;
@@ -503,8 +503,6 @@ int Ebofs::commit_thread_entry()
       while (1) {
        // --- queue up commit writes ---
        bc.poison_commit = false;
-       if (journal) 
-         journal->commit_epoch_start(super_epoch);  // FIXME: make loopable
        commit_inodes_start();      // do this first; it currently involves inode reallocation
        allocator.commit_limbo();   // limbo -> limbo_tab
        nodepool.commit_start(dev, super_epoch);
@@ -557,8 +555,8 @@ int Ebofs::commit_thread_entry()
         alloc_more_node_space();
       }
       
-      // signal journal
-      if (journal) journal->commit_epoch_finish(super_epoch);
+      // trim journal
+      if (journal) journal->committed_thru(super_epoch-1);
 
       // kick waiters
       dout(10) << "commit_thread queueing commit + kicking sync waiters" << dendl;
@@ -2447,6 +2445,10 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
   ebofs_lock.Lock();
   dout(7) << "apply_transaction start (" << t.get_num_ops() << " ops)" << dendl;
 
+  bufferlist bl;
+  if (journal)
+    t.encode(bl);
+
   unsigned r = _apply_transaction(t);
 
   // journal, wait for commit
@@ -2455,8 +2457,6 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
     onsafe = 0;
   }
   if (journal) {
-    bufferlist bl;
-    t.encode(bl);
     journal->submit_entry(super_epoch, bl, onsafe);
   } else
     queue_commit_waiter(onsafe);
index 7929b067552b77c844bdbbf31e780b38c2740a41..ea17a0343b179cc581a1c76d798ff6daf2f409aa 100644 (file)
@@ -102,6 +102,16 @@ inline ostream& operator<<(ostream& out, const vector<A>& v) {
   out << "]";
   return out;
 }
+template<class A>
+inline ostream& operator<<(ostream& out, const deque<A>& v) {
+  out << "<";
+  for (typename deque<A>::const_iterator p = v.begin(); p != v.end(); p++) {
+    if (p != v.begin()) out << ",";
+    out << *p;
+  }
+  out << ">";
+  return out;
+}
 
 template<class A>
 inline ostream& operator<<(ostream& out, const list<A>& ilist) {
index b6cfd1d03382bbc27f39f43a111f725e9ee8691b..5bde9833c568876156e4e14d5a87a805db9e3be2 100644 (file)
@@ -74,6 +74,7 @@ int FileJournal::create()
     header.alignment = block_size;
   else
     header.alignment = 16;  // at least stay word aligned on 64bit machines...
+  header.start = get_top();
   print_header();
 
   buffer::ptr bp = prepare_header();
@@ -89,9 +90,9 @@ int FileJournal::create()
   return 0;
 }
 
-int FileJournal::open(epoch_t epoch)
+int FileJournal::open(__u64 next_seq)
 {
-  dout(2) << "open " << fn << dendl;
+  dout(2) << "open " << fn << " next_seq " << next_seq << dendl;
 
   int err = _open(false);
   if (err < 0) return err;
@@ -102,11 +103,11 @@ int FileJournal::open(epoch_t epoch)
 
   // read header?
   read_header();
-  dout(10) << "open journal header.fsid = " << header.fsid 
+  dout(10) << "open header.fsid = " << header.fsid 
     //<< " vs expected fsid = " << fsid 
           << dendl;
   if (header.fsid != fsid) {
-    dout(2) << "open journal fsid doesn't match, invalid (someone else's?) journal" << dendl;
+    dout(2) << "open fsid doesn't match, invalid (someone else's?) journal" << dendl;
     err = -EINVAL;
   } 
   if (header.max_size > max_size) {
@@ -127,40 +128,27 @@ int FileJournal::open(epoch_t epoch)
 
   // looks like a valid header.
   write_pos = 0;  // not writeable yet
-  read_pos = 0;
 
-  if (header.num > 0) {
-    // pick an offset
-    for (int i=0; i<header.num; i++) {
-      if (header.epoch[i] == epoch) {
-       dout(2) << "using read_pos header pointer "
-               << header.epoch[i] << " at " << header.offset[i]
-               << dendl;
-       read_pos = header.offset[i];
-       write_pos = 0;
-       break;
-      }      
-      else if (header.epoch[i] < epoch) {
-       dout(2) << "super_epoch is " << epoch 
-               << ", skipping old " << header.epoch[i] << " at " << header.offset[i]
-               << dendl;
-      }
-      else if (header.epoch[i] > epoch) {
-       dout(2) << "super_epoch is " << epoch 
-               << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i]
-               << dendl;
-       break;
-      }
+  // find next entry
+  read_pos = header.start;
+  while (1) {
+    bufferlist bl;
+    __u64 seq;
+    off64_t old_pos = read_pos;
+    if (!read_entry(bl, seq)) {
+      dout(10) << "open reached end of journal." << dendl;
+      break;
     }
-
-    if (read_pos == 0) {
-      dout(0) << "no valid journal segments" << dendl;
-      return 0; //hrm return -EINVAL; 
+    if (seq > next_seq) {
+      dout(10) << "open entry " << seq << " len " << bl.length() << " > next_seq " << next_seq << dendl;
+      read_pos = -1;
+      return 0;
+    }
+    if (seq == next_seq) {
+      dout(10) << "open reached seq " << seq << dendl;
+      read_pos = old_pos;
+      break;
     }
-
-  } else {
-    dout(0) << "journal was empty" << dendl;
-    read_pos = -1;
   }
 
   return 0;
@@ -175,7 +163,6 @@ void FileJournal::close()
 
   // close
   assert(writeq.empty());
-  assert(commitq.empty());
   assert(fd > 0);
   ::close(fd);
   fd = -1;
@@ -199,16 +186,15 @@ void FileJournal::stop_writer()
 }
 
 
+
 void FileJournal::print_header()
 {
-  for (int i=0; i<header.num; i++) {
-    if (i && header.offset[i] < header.offset[i-1]) {
-      assert(header.wrap);
-      dout(10) << "header: wrap at " << header.wrap << dendl;
-    }
-    dout(10) << "header: epoch " << header.epoch[i] << " at " << header.offset[i] << dendl;
-  }
-  //if (header.wrap) dout(10) << "header: wrap at " << header.wrap << dendl;
+  dout(10) << "header: block_size " << header.block_size
+          << " alignment " << header.alignment
+          << " max_size " << header.max_size
+          << dendl;
+  dout(10) << "header: start " << header.start << " wrap " << header.wrap << dendl;
+  dout(10) << " write_pos " << write_pos << dendl;
 }
 
 void FileJournal::read_header()
@@ -246,48 +232,62 @@ bufferptr FileJournal::prepare_header()
 
 
 
-void FileJournal::check_for_wrap(epoch_t epoch, off64_t pos, off64_t size)
+bool FileJournal::check_for_wrap(__u64 seq, off64_t *pos, off64_t size, bool can_wrap)
 {
-  // epoch boundary?
-  dout(10) << "check_for_wrap epoch " << epoch << " last " << header.last_epoch() << " of " << header.num << dendl;
-  if (epoch > header.last_epoch()) {
-    dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl;
-    header.push(epoch, pos);
-    must_write_header = true;
-  }
+  dout(20) << "check_for_wrap seq " << seq
+          << " pos " << *pos << " size " << size
+          << " max_size " << header.max_size
+          << " wrap " << header.wrap
+          << dendl;
+  
+  // already full?
+  if (full_commit_seq || full_restart_seq)
+    return false;
 
   // does it fit?
   if (header.wrap) {
     // we're wrapped.  don't overwrite ourselves.
-    if (pos + size >= header.offset[0]) {
-      dout(10) << "JOURNAL FULL (and wrapped), " << pos << "+" << size
-              << " >= " << header.offset[0]
-              << dendl;
-      full = true;
-      writeq.clear();
-      print_header();
-    }
+
+    if (*pos + size < header.start)
+      return true; // fits
+
+    dout(10) << "JOURNAL FULL (and wrapped), " << *pos << "+" << size
+            << " >= " << header.start
+            << dendl;
   } else {
     // we haven't wrapped.  
-    if (pos + size >= header.max_size) {
-      // is there room if we wrap?
-      if (get_top() + size < header.offset[0]) {
-       // yes!
-       dout(10) << "wrapped from " << pos << " to " << get_top() << dendl;
-       header.wrap = pos;
-       pos = get_top();
-       header.push(epoch, pos);
-       must_write_header = true;
-      } else {
-       // no room.
-       dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << pos << "+" << size
-                << " >= " << header.max_size
-                << dendl;
-       full = true;
-       writeq.clear();
-      }
+
+    if (*pos + size < header.max_size)
+      return true; // fits
+
+    if (!can_wrap)
+      return false;  // can't wrap just now..
+
+    // is there room if we wrap?
+    if (get_top() + size < header.start) {
+      // yes!
+      dout(10) << " wrapping from " << *pos << " to " << get_top() << dendl;
+      header.wrap = *pos;
+      *pos = get_top();
+      must_write_header = true;
+      return true;
     }
+
+    // no room.
+    dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << *pos << "+" << size
+            << " >= " << header.max_size
+            << dendl;
   }
+
+  full_commit_seq = seq;
+  full_restart_seq = seq+1;
+  while (!writeq.empty()) {
+    writing_seq.push_back(writeq.front().seq);
+    writing_fin.push_back(writeq.front().fin);
+    writeq.pop_front();
+  }  
+  print_header();
+  return false;
 }
 
 
@@ -299,79 +299,109 @@ void FileJournal::prepare_multi_write(bufferlist& bl)
   int eleft = g_conf.journal_max_write_entries;
   int bleft = g_conf.journal_max_write_bytes;
 
+  if (full_commit_seq || full_restart_seq)
+    return;
+
   while (!writeq.empty()) {
     // grab next item
-    epoch_t epoch = writeq.front().first;
-    bufferlist &ebl = writeq.front().second;
+    __u64 seq = writeq.front().seq;
+    bufferlist &ebl = writeq.front().bl;
     off64_t size = 2*sizeof(entry_header_t) + ebl.length();
 
-    if (bl.length() > 0 && bleft > 0 && bleft < size) break;
-    
-    check_for_wrap(epoch, queue_pos, size);
-    if (full) break;
-    if (bl.length() && must_write_header) 
+    bool can_wrap = !bl.length();  // only wrap if this is a new thinger
+    if (!check_for_wrap(seq, &queue_pos, size, can_wrap))
       break;
+
+    // set write_pos?  (check_for_wrap may have moved it)
+    if (!bl.length())
+      write_pos = queue_pos;
     
     // add to write buffer
-    dout(15) << "prepare_multi_write will write " << queue_pos << " : " 
-            << ebl.length() << " epoch " << epoch << " -> " << size << dendl;
+    dout(15) << "prepare_multi_write will write " << queue_pos << " : seq " << seq
+            << " len " << ebl.length() << " -> " << size
+            << " (left " << eleft << "/" << bleft << ")"
+            << dendl;
     
     // add it this entry
     entry_header_t h;
-    h.epoch = epoch;
+    h.seq = seq;
     h.len = ebl.length();
     h.make_magic(queue_pos, header.fsid);
     bl.append((const char*)&h, sizeof(h));
     bl.claim_append(ebl);
     bl.append((const char*)&h, sizeof(h));
     
-    Context *oncommit = commitq.front();
-    if (oncommit)
-      writingq.push_back(oncommit);
-    
+    if (writeq.front().fin) {
+      writing_seq.push_back(seq);
+      writing_fin.push_back(writeq.front().fin);
+    }
+
     // pop from writeq
     writeq.pop_front();
-    commitq.pop_front();
+    journalq.push_back(pair<__u64,off64_t>(seq, queue_pos));
 
     queue_pos += size;
-    if (--eleft == 0) break;
-    bleft -= size;
-    if (bleft == 0) break;
+
+    // pad...
+    if (queue_pos % header.alignment) {
+      int pad = header.alignment - (queue_pos % header.alignment);
+      bufferptr bp(pad);
+      bl.push_back(bp);
+      queue_pos += pad;
+      //dout(20) << "   padding with " << pad << " bytes, queue_pos now " << queue_pos << dendl;
+    }
+
+    if (eleft) {
+      if (--eleft == 0) {
+       dout(20) << "    hit max events per write " << g_conf.journal_max_write_entries << dendl;
+       break;
+      }
+    }
+    if (bleft) {
+      bleft -= size;
+      if (bleft == 0) {
+       dout(20) << "    hit max write size " << g_conf.journal_max_write_bytes << dendl;
+       break;
+      }
+    }
   }
 }
 
 bool FileJournal::prepare_single_dio_write(bufferlist& bl)
 {
   // grab next item
-  epoch_t epoch = writeq.front().first;
-  bufferlist &ebl = writeq.front().second;
+  __u64 seq = writeq.front().seq;
+  bufferlist &ebl = writeq.front().bl;
     
   off64_t size = 2*sizeof(entry_header_t) + ebl.length();
   size = ROUND_UP_TO(size, header.alignment);
   
-  check_for_wrap(epoch, write_pos, size);
-  if (full) return false;
+  if (!check_for_wrap(seq, &write_pos, size, true))
+    return false;
+  if (full_commit_seq || full_restart_seq) return false;
 
   // build it
-  dout(15) << "prepare_single_dio_write will write " << write_pos << " : 
-          << ebl.length() << " epoch " << epoch << " -> " << size << dendl;
+  dout(15) << "prepare_single_dio_write will write " << write_pos << " : seq " << seq
+          << " len " << ebl.length() << " -> " << size << dendl;
 
   bufferptr bp = buffer::create_page_aligned(size);
   entry_header_t *h = (entry_header_t*)bp.c_str();
-  h->epoch = epoch;
+  h->seq = seq;
   h->len = ebl.length();
   h->make_magic(write_pos, header.fsid);
   ebl.copy(0, ebl.length(), bp.c_str()+sizeof(*h));
   memcpy(bp.c_str() + sizeof(*h) + ebl.length(), h, sizeof(*h));
   bl.push_back(bp);
   
-  Context *oncommit = commitq.front();
-  if (oncommit)
-    writingq.push_back(oncommit);
+  if (writeq.front().fin) {
+    writing_seq.push_back(seq);
+    writing_fin.push_back(writeq.front().fin);
+  }
   
   // pop from writeq
   writeq.pop_front();
-  commitq.pop_front();
+  journalq.push_back(pair<__u64,off64_t>(seq, write_pos));
+
   return true;
 }
 
@@ -382,8 +412,10 @@ void FileJournal::do_write(bufferlist& bl)
     return;
 
   buffer::ptr hbp;
-  if (must_write_header) 
+  if (must_write_header) {
+    must_write_header = false;
     hbp = prepare_header();
+  }
 
   writing = true;
 
@@ -392,7 +424,7 @@ void FileJournal::do_write(bufferlist& bl)
   write_lock.Unlock();
 
   dout(15) << "do_write writing " << write_pos << "~" << bl.length() 
-          << (must_write_header ? " + header":"")
+          << (hbp.length() ? " + header":"")
           << dendl;
   
   // header
@@ -430,14 +462,19 @@ void FileJournal::do_write(bufferlist& bl)
   write_lock.Lock();    
 
   writing = false;
-  if (memcmp(&old_header, &header, sizeof(header)) == 0) {
-    write_pos += bl.length();
-    write_pos = ROUND_UP_TO(write_pos, header.alignment);
-    finisher->queue(writingq);
+
+  write_pos += bl.length();
+  write_pos = ROUND_UP_TO(write_pos, header.alignment);
+
+  // kick finisher?  
+  //  only if we haven't filled up recently!
+  if (full_commit_seq || full_restart_seq) {
+    dout(10) << "do_write NOT queueing finisher seq " << writing_seq.front()
+            << ", full_commit_seq|full_restart_seq" << dendl;
   } else {
-    dout(10) << "do_write finished write but header changed?  not moving write_pos." << dendl;
-    derr(0) << "do_write finished write but header changed?  not moving write_pos." << dendl;
-    assert(writingq.empty());
+    dout(20) << "do_write doing finisher queue " << writing_fin << dendl;
+    writing_seq.clear();
+    finisher->queue(writing_fin);
   }
 }
 
@@ -457,7 +494,6 @@ void FileJournal::write_thread_entry()
     }
     
     bufferlist bl;
-    must_write_header = false;
     if (directio)
       prepare_single_dio_write(bl);
     else
@@ -470,92 +506,83 @@ void FileJournal::write_thread_entry()
 }
 
 
-bool FileJournal::is_full()
-{
-  Mutex::Locker locker(write_lock);
-  return full;
-}
-
-void FileJournal::submit_entry(epoch_t epoch, bufferlist& e, Context *oncommit)
+void FileJournal::submit_entry(__u64 seq, bufferlist& e, Context *oncommit)
 {
   Mutex::Locker locker(write_lock);  // ** lock **
 
   // dump on queue
-  dout(10) << "submit_entry " << e.length()
-          << " epoch " << epoch
-          << " " << oncommit << dendl;
-  commitq.push_back(oncommit);
-  if (!full) {
-    writeq.push_back(pair<epoch_t,bufferlist>(epoch, e));
+  dout(10) << "submit_entry seq " << seq
+          << " len " << e.length()
+          << " (" << oncommit << ")" << dendl;
+  
+  if (!full_commit_seq && full_restart_seq && 
+      seq >= full_restart_seq) {
+    dout(10) << " seq " << seq << " >= full_restart_seq " << full_restart_seq 
+            << ", restarting journal" << dendl;
+    full_restart_seq = 0;
+  }
+  if (!full_commit_seq && !full_restart_seq) {
+    writeq.push_back(write_item(seq, e, oncommit));
     write_cond.Signal(); // kick writer thread
+  } else {
+    // not journaling this.  restart writing no sooner than seq + 1.
+    full_restart_seq = seq+1;
+    dout(10) << " journal is/was full, will restart no sooner than seq " << full_restart_seq << dendl;
+    writing_seq.push_back(seq);
+    writing_fin.push_back(oncommit);
   }
 }
 
 
-void FileJournal::commit_epoch_start(epoch_t new_epoch)
+void FileJournal::committed_thru(__u64 seq)
 {
-  dout(10) << "commit_epoch_start on " << new_epoch-1 
-          << " -- new epoch " << new_epoch
-          << dendl;
+  dout(10) << "committed_thru " << seq << dendl;
 
   Mutex::Locker locker(write_lock);
 
-  // was full -> empty -> now usable?
-  if (full) {
-    if (header.num != 0) {
-      dout(1) << " journal FULL, ignoring this epoch" << dendl;
-      return;
-    }
-
-    dout(1) << " clearing FULL flag, journal now usable" << dendl;
-    full = false;
-  } 
-}
-
-void FileJournal::commit_epoch_finish(epoch_t new_epoch)
-{
-  dout(10) << "commit_epoch_finish committed " << (new_epoch-1) << dendl;
+  // was full?
+  if (full_commit_seq && seq >= full_commit_seq) {
+    dout(1) << " seq " << seq << " >= full_commit_seq " << full_commit_seq 
+           << ", prior journal contents are now fully committed.  resetting journal." << dendl;
+    full_commit_seq = 0;
+  }
 
-  Mutex::Locker locker(write_lock);
-  
-  if (full) {
-    // full journal damage control.
-    dout(15) << " journal was FULL, contents now committed, clearing header.  journal still not usable until next epoch." << dendl;
-    header.clear();
-    write_pos = get_top();
-  } else {
-    // update header -- trim/discard old (committed) epochs
-    print_header();
-    while (header.num && header.epoch[0] < new_epoch) {
-      dout(10) << " popping epoch " << header.epoch[0] << " < " << new_epoch << dendl;
-      header.pop();
-    }
-    if (header.num == 0) {
-      dout(10) << " starting fresh" << dendl;
-      write_pos = get_top();
-      header.push(new_epoch, write_pos);
+  // adjust start pointer
+  while (!journalq.empty() && journalq.front().first <= seq) {
+    if (journalq.front().second == get_top()) {
+      dout(10) << " committed event at " << journalq.front().second << ", clearing wrap marker" << dendl;
+      header.wrap = 0;  // clear wrap marker
     }
+    journalq.pop_front();
+  }
+  if (!journalq.empty()) {
+    header.start = journalq.front().second;
+  } else {
+    header.start = write_pos;
   }
   must_write_header = true;
+  print_header();
   
-  // discard any unwritten items in previous epoch
-  while (!writeq.empty() && writeq.front().first < new_epoch) {
-    dout(15) << " dropping unwritten and committed " 
-            << write_pos << " : " << writeq.front().second.length()
-            << " epoch " << writeq.front().first 
+  // committed but writing
+  while (!writing_seq.empty() && writing_seq.front() < seq) {
+    dout(15) << " finishing committed but writing|waiting seq " << writing_seq.front() << dendl;
+    finisher->queue(writing_fin.front());
+    writing_seq.pop_front();
+    writing_fin.pop_front();
+  }
+  
+  // committed but unjournaled items
+  while (!writeq.empty() && writeq.front().seq < seq) {
+    dout(15) << " dropping committed but unwritten seq " << writeq.front().seq 
+            << " len " << writeq.front().bl.length()
+            << " (" << writeq.front().fin << ")"
             << dendl;
-    // finisher?
-    Context *oncommit = commitq.front();
-    if (oncommit) writingq.push_back(oncommit);
-    
-    // discard.
+    if (writeq.front().fin)
+      finisher->queue(writeq.front().fin);
     writeq.pop_front();  
-    commitq.pop_front();
   }
   
-  // queue the finishers
-  finisher->queue(writingq);
-  dout(10) << "commit_epoch_finish done" << dendl;
+  dout(10) << "committed_thru done" << dendl;
 }
 
 
@@ -574,23 +601,14 @@ void FileJournal::make_writeable()
 }
 
 
-bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
+bool FileJournal::read_entry(bufferlist& bl, __u64& seq)
 {
   if (!read_pos) {
     dout(2) << "read_entry -- not readable" << dendl;
     return false;
   }
-
   if (read_pos == header.wrap) {
-    // find wrap point
-    for (int i=1; i<header.num; i++) {
-      if (header.offset[i] < read_pos) {
-       assert(header.offset[i-1] < read_pos);
-       read_pos = header.offset[i];
-       break;
-      }
-    }
-    assert(read_pos != header.wrap);
+    read_pos = get_top();
     dout(10) << "read_entry wrapped from " << header.wrap << " to " << read_pos << dendl;
   }
 
@@ -615,7 +633,7 @@ bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
   entry_header_t f;
   ::read(fd, &f, sizeof(h));
   if (!f.check_magic(read_pos, header.fsid) ||
-      h.epoch != f.epoch ||
+      h.seq != f.seq ||
       h.len != f.len) {
     dout(2) << "read_entry " << read_pos << " : bad footer magic, partial entry, end of journal" << dendl;
     return false;
@@ -623,16 +641,17 @@ bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
 
 
   // yay!
-  dout(1) << "read_entry " << read_pos << " : 
+  dout(1) << "read_entry " << read_pos << " : seq " << h.seq
          << " " << h.len << " bytes"
-         << " epoch " << h.epoch 
          << dendl;
-  
+  bl.clear();
   bl.push_back(bp);
-  epoch = h.epoch;
+  seq = h.seq;
+
+  journalq.push_back(pair<__u64,off64_t>(h.seq, read_pos));
 
   read_pos += 2*sizeof(entry_header_t) + h.len;
   read_pos = ROUND_UP_TO(read_pos, header.alignment);
-
+  
   return true;
 }
index 3c46397a23dde1298186eb87e117fbf170ed9f87..8d779fce3fced454c348efe8431ea09b6f89dafa 100644 (file)
@@ -16,6 +16,8 @@
 #ifndef __EBOFS_FILEJOURNAL_H
 #define __EBOFS_FILEJOURNAL_H
 
+#include <deque>
+using std::deque;
 
 #include "Journal.h"
 #include "common/Cond.h"
 
 class FileJournal : public Journal {
 public:
-  /** log header
-   * we allow 4 pointers:
-   *  top/initial,
-   *  one for an epoch boundary (if any),
-   *  one for a wrap in the ring buffer/journal file,
-   *  one for a second epoch boundary (if any).
-   * the epoch boundary one is useful only for speedier recovery in certain cases
-   * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
+  /*
+   * journal header
    */
   struct header_t {
     __u64 fsid;
-    __s64 num;
     __u32 block_size;
     __u32 alignment;
-    __s64 max_size;
-    __s64 wrap;
-    __u32 epoch[4];
-    __s64 offset[4];
+    __s64 max_size;   // max size of journal ring buffer
+    __s64 wrap;       // wrap byte pos (if any)
+    __s64 start;      // offset of first entry
 
-    header_t() : fsid(0), num(0), block_size(0), alignment(0), max_size(0), wrap(0) {}
+    header_t() : fsid(0), block_size(0), alignment(0), max_size(0), wrap(0), start(0) {}
 
     void clear() {
-      num = 0;
       wrap = 0;
-    }
-    void pop() {
-      if (num >= 2 && offset[0] > offset[1]) 
-       wrap = 0;  // we're eliminating a wrap
-      num--;
-      for (int i=0; i<num; i++) {
-       epoch[i] = epoch[i+1];
-       offset[i] = offset[i+1];
-      }
-    }
-    void push(epoch_t e, off64_t o) {
-      assert(num < 4);
-      if (num > 2 && 
-         epoch[num-1] == e &&
-         epoch[num-2] == (e-1)) 
-       num--;  // tail was an epoch boundary; replace it.
-      epoch[num] = e;
-      offset[num] = o;
-      num++;
-    }
-    epoch_t last_epoch() {
-      if (num)
-       return epoch[num-1];
-      else
-       return 0;
+      start = block_size;
     }
   } header;
 
   struct entry_header_t {
-    uint64_t epoch;
+    uint64_t seq;  // fs op seq #
     uint64_t len;
     uint64_t magic1;
     uint64_t magic2;
     
     void make_magic(off64_t pos, uint64_t fsid) {
       magic1 = pos;
-      magic2 = fsid ^ epoch ^ len;
+      magic2 = fsid ^ seq ^ len;
     }
     bool check_magic(off64_t pos, uint64_t fsid) {
       return
        magic1 == (uint64_t)pos &&
-       magic2 == (fsid ^ epoch ^ len);
+       magic2 == (fsid ^ seq ^ len);
     }
   };
 
@@ -99,18 +68,33 @@ private:
   off64_t max_size;
   size_t block_size;
   bool directio;
-  bool full, writing, must_write_header;
-  off64_t write_pos;      // byte where next entry written goes
+  bool writing, must_write_header;
+  off64_t write_pos;      // byte where the next entry to be written will go
   off64_t read_pos;       // 
 
+  __u64 seq;
+
+  __u64 full_commit_seq;  // don't write, wait for this seq to commit
+  __u64 full_restart_seq; // start writing again with this seq
+
   int fd;
 
-  // to be journaled
-  list<pair<epoch_t,bufferlist> > writeq;
-  deque<Context*> commitq;
+  // in journal
+  deque<pair<__u64, off64_t> > journalq;  // track seq offsets, so we can trim later.
 
-  // being journaled
-  deque<Context*> writingq;
+  // currently being journaled and awaiting callback.
+  //  or, awaiting callback bc journal was full.
+  deque<__u64> writing_seq;
+  deque<Context*> writing_fin;
+
+  // waiting to be journaled
+  struct write_item {
+    __u64 seq;
+    bufferlist bl;
+    Context *fin;
+    write_item(__u64 s, bufferlist& b, Context *f) : seq(s), fin(f) { bl.claim(b); }
+  };
+  deque<write_item> writeq;
   
   // write thread
   Mutex write_lock;
@@ -125,7 +109,7 @@ private:
   void stop_writer();
   void write_thread_entry();
 
-  void check_for_wrap(epoch_t epoch, off64_t pos, off64_t size);
+  bool check_for_wrap(__u64 seq, off64_t *pos, off64_t size, bool can_wrap);
   bool prepare_single_dio_write(bufferlist& bl);
   void prepare_multi_write(bufferlist& bl);
   void do_write(bufferlist& bl);
@@ -152,14 +136,16 @@ private:
     Journal(fsid, fin), fn(f),
     max_size(0), block_size(0),
     directio(dio),
-    full(false), writing(false), must_write_header(false),
+    writing(false), must_write_header(false),
     write_pos(0), read_pos(0),
+    seq(0), 
+    full_commit_seq(0), full_restart_seq(0),
     fd(-1),
     write_stop(false), write_thread(this) { }
   ~FileJournal() {}
 
   int create();
-  int open(epoch_t epoch);
+  int open(__u64 last_seq);
   void close();
 
   bool is_writeable() {
@@ -168,15 +154,12 @@ private:
   void make_writeable();
 
   // writes
-  void submit_entry(epoch_t epoch, bufferlist& e, Context *oncommit);  // submit an item
-  void commit_epoch_start(epoch_t);   // mark epoch boundary
-  void commit_epoch_finish(epoch_t);  // mark prior epoch as committed (we can expire)
-
-  bool read_entry(bufferlist& bl, epoch_t& e);
-
+  void submit_entry(__u64 seq, bufferlist& bl, Context *oncommit);  // submit an item
+  void committed_thru(__u64 seq);
   bool is_full();
 
   // reads
+  bool read_entry(bufferlist& bl, __u64& seq);
 };
 
 #endif
index a2aff3130f6d21f4f4efd5ae38c456ec6c5ac9e3..4c1bdd6dbfbe6f5f7cc8abbbcb7665d5113730d7 100644 (file)
@@ -30,17 +30,15 @@ public:
   virtual ~Journal() { }
 
   virtual int create() = 0;
-  virtual int open(epoch_t epoch) = 0;
+  virtual int open(__u64 last_seq) = 0;
   virtual void close() = 0;
 
   // writes
   virtual bool is_writeable() = 0;
   virtual void make_writeable() = 0;
-  virtual void submit_entry(epoch_t epoch, bufferlist& e, Context *oncommit) = 0;
-  virtual void commit_epoch_start(epoch_t) = 0;  // mark epoch boundary
-  virtual void commit_epoch_finish(epoch_t) = 0; // mark prior epoch as committed (we can expire)
-  virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0;
-  virtual bool is_full() = 0;
+  virtual void submit_entry(__u64 seq, bufferlist& e, Context *oncommit) = 0;
+  virtual void committed_thru(__u64 seq) = 0;
+  virtual bool read_entry(bufferlist& bl, __u64 &seq) = 0;
 
   // reads/recovery
   
index 00db8a8711fceda8223496e925235a7f84142e20..5fd1c5a94fd2ac7716fa2756f8035c3fc1c38f3c 100644 (file)
@@ -23,7 +23,7 @@ int JournalingObjectStore::journal_replay()
   int count = 0;
   while (1) {
     bufferlist bl;
-    epoch_t e;
+    __u64 e;
     if (!journal->read_entry(bl, e)) {
       dout(3) << "journal_replay: end of journal, done." << dendl;
       break;
index 0600f0a1463c143382826c345c7fb93b55908dbb..de43b37d1cd5bb1ee6f24933559eeb3bcf677c9b 100644 (file)
@@ -17,6 +17,7 @@
 
 #include "ObjectStore.h"
 #include "Journal.h"
+#include "common/RWLock.h"
 
 class JournalingObjectStore : public ObjectStore {
 protected:
@@ -24,6 +25,7 @@ protected:
   Journal *journal;
   Finisher finisher;
   map<version_t, vector<Context*> > commit_waiters;
+  RWLock op_lock;
 
   void journal_start() {
     finisher.start();
@@ -38,15 +40,22 @@ protected:
   }
   int journal_replay();
 
+  void op_start() {
+    op_lock.get_read();
+  }
+  void op_finish() {
+    op_lock.put_read();    
+  }
+
   void commit_start() {
+    op_lock.get_write();
     super_epoch++;
-    if (journal)
-      journal->commit_epoch_start(super_epoch);
+    op_lock.put_write();
   }
   void commit_finish() {
-    finisher.queue(commit_waiters[super_epoch-1]);
     if (journal)
-      journal->commit_epoch_finish(super_epoch);
+      journal->committed_thru(super_epoch-1);
+    finisher.queue(commit_waiters[super_epoch-1]);
   }
 
   void queue_commit_waiter(Context *oncommit) {
index 15d517b615a5fa9cf1e67d750c76d358d855b747..bb70af62c669b30551e042f0b8fc6624eb0fcd20 100644 (file)
@@ -96,8 +96,8 @@ int main(int argc, const char **argv)
   cout << "#dev " << filename
        << ", " << seconds << " seconds, " << bytes << " bytes per write" << std::endl;
 
-  //ObjectStore *fs = new Ebofs(filename, journal);
-  ObjectStore *fs = new FileStore(filename);
+  ObjectStore *fs = new Ebofs(filename, journal);
+  //ObjectStore *fs = new FileStore(filename);
 
   if (g_conf.mkfs && 
       fs->mkfs() < 0) {
index c669e2b14b68d09432fa52dc4bd564514d153afa..836aaffaa652d5711139d9d35549c10cfcefb220 100755 (executable)
@@ -40,9 +40,9 @@ $CEPH_BIN/cmonctl osd setmap -i .ceph_osdmap
 
 for osd in 0 #1 #2 3 #4 5 6 7 8 9 10 11 12 13 14 15
 do
- $SUDO $CEPH_BIN/cosd --mkfs_for_osd $osd dev/osd$osd  # initialize empty object store
+ $SUDO $CEPH_BIN/cosd --debug_journal 20 --mkfs_for_osd $osd dev/osd$osd  # initialize empty object store
 # echo valgrind --leak-check=full --show-reachable=yes $CEPH_BIN/cosd dev/osd$osd --debug_ms 1 --debug_osd 20 --debug_filestore 10 --debug_ebofs 20 #1>out/o$osd #& #--debug_osd 40
- $SUDO $CEPH_BIN/cosd -m $IP:$CEPH_PORT dev/osd$osd -d --debug_ms 1 --debug_osd 20 --debug_filestore 20
+ $SUDO $CEPH_BIN/cosd -m $IP:$CEPH_PORT dev/osd$osd -d --debug_ms 1 --debug_journal 20 --debug_osd 20 --debug_filestore 20 --debug_ebofs 20
 done
 
 # mds