]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
roll directio into FileJournal
authorSage Weil <sage@newdream.net>
Fri, 25 Jan 2008 00:33:32 +0000 (16:33 -0800)
committerSage Weil <sage@newdream.net>
Fri, 25 Jan 2008 00:33:32 +0000 (16:33 -0800)
src/Makefile
src/ebofs/DIOJournal.cc [deleted file]
src/ebofs/DIOJournal.h [deleted file]
src/ebofs/Ebofs.cc
src/ebofs/FileJournal.cc
src/ebofs/FileJournal.h

index 023f5e144f0fdb86e9c479bb330ecb1cc9859144..6f383bcb498681a9bed39daf6bcb8b1d00ec831f 100644 (file)
@@ -53,8 +53,7 @@ EBOFS_OBJS= \
        ebofs/BufferCache.o\
        ebofs/Ebofs.o\
        ebofs/Allocator.o\
-       ebofs/FileJournal.o\
-       ebofs/DIOJournal.o
+       ebofs/FileJournal.o
 
 MDS_OBJS= \
        mds/MDS.o\
diff --git a/src/ebofs/DIOJournal.cc b/src/ebofs/DIOJournal.cc
deleted file mode 100644 (file)
index 9146abc..0000000
+++ /dev/null
@@ -1,438 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-#include "DIOJournal.h"
-#include "Ebofs.h"
-
-#include <stdio.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-
-
-#include "config.h"
-
-#define dout(x) if (x <= g_conf.debug_ebofs) *_dout << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal "
-#define derr(x) if (x <= g_conf.debug_ebofs) *_derr << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal "
-
-
-int DioJournal::create()
-{
-  dout(2) << "create " << fn << dendl;
-
-  // open/create
-  fd = ::open(fn.c_str(), O_RDWR|O_DIRECT);
-  if (fd < 0) {
-    dout(2) << "create failed " << errno << " " << strerror(errno) << dendl;
-    return -errno;
-  }
-  assert(fd > 0);
-
-  //::ftruncate(fd, 0);
-  //::fchmod(fd, 0644);
-
-  // get size
-  struct stat st;
-  ::fstat(fd, &st);
-  dout(2) << "create " << fn << " " << st.st_size << " bytes" << dendl;
-
-  // write empty header
-  memset(&header, 0, sizeof(header));
-  header.clear();
-  header.fsid = ebofs->get_fsid();
-  header.max_size = st.st_size;
-  write_header();
-  
-  // writeable.
-  read_pos = 0;
-  write_pos = sizeof(header);
-
-  ::close(fd);
-
-  return 0;
-}
-
-int DioJournal::open()
-{
-  //dout(1) << "open " << fn << dendl;
-
-  // open and file
-  assert(fd == 0);
-  fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
-  if (fd < 0) {
-    dout(2) << "open failed " << errno << " " << strerror(errno) << dendl;
-    return -errno;
-  }
-  assert(fd > 0);
-
-  // assume writeable, unless...
-  read_pos = 0;
-  write_pos = sizeof(header);
-
-  // read header?
-  read_header();
-  if (header.fsid != ebofs->get_fsid()) {
-    dout(2) << "open journal fsid doesn't match, invalid (someone else's?) journal" << dendl;
-  } 
-  else if (header.num > 0) {
-    // valid header, pick an offset
-    for (int i=0; i<header.num; i++) {
-      if (header.epoch[i] == ebofs->get_super_epoch()) {
-       dout(2) << "using read_pos header pointer "
-               << header.epoch[i] << " at " << header.offset[i]
-               << dendl;
-       read_pos = header.offset[i];
-       write_pos = 0;
-       break;
-      }      
-      else if (header.epoch[i] < ebofs->get_super_epoch()) {
-       dout(2) << "super_epoch is " << ebofs->get_super_epoch() 
-               << ", skipping old " << header.epoch[i] << " at " << header.offset[i]
-               << dendl;
-      }
-      else if (header.epoch[i] > ebofs->get_super_epoch()) {
-       dout(2) << "super_epoch is " << ebofs->get_super_epoch() 
-               << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i]
-               << dendl;
-       break;
-      }
-    }
-  }
-
-  start_writer();
-
-  return 0;
-}
-
-void DioJournal::close()
-{
-  dout(1) << "close " << fn << dendl;
-
-  // stop writer thread
-  stop_writer();
-
-  // close
-  assert(writeq.empty());
-  assert(commitq.empty());
-  assert(fd > 0);
-  ::close(fd);
-  fd = 0;
-}
-
-void DioJournal::start_writer()
-{
-  write_stop = false;
-  write_thread.create();
-}
-
-void DioJournal::stop_writer()
-{
-  write_lock.Lock();
-  {
-    write_stop = true;
-    write_cond.Signal();
-  } 
-  write_lock.Unlock();
-  write_thread.join();
-}
-
-
-void DioJournal::print_header()
-{
-  for (int i=0; i<header.num; i++) {
-    if (i && header.offset[i] < header.offset[i-1]) {
-      assert(header.wrap);
-      dout(10) << "header: wrap at " << header.wrap << dendl;
-    }
-    dout(10) << "header: epoch " << header.epoch[i] << " at " << header.offset[i] << dendl;
-  }
-  //if (header.wrap) dout(10) << "header: wrap at " << header.wrap << dendl;
-}
-void DioJournal::read_header()
-{
-  dout(10) << "read_header" << dendl;
-  char buf[4096];
-  memset(buf, 0, 4096);  // zero out (read may fail)
-  ::lseek(fd, 0, SEEK_SET);
-  int r = ::read(fd, buf, 4096);
-  memcpy(&header, buf, sizeof(header));
-  if (r < 0) 
-    dout(0) << "read_header error " << errno << " " << strerror(errno) << dendl;
-  print_header();
-}
-void DioJournal::write_header()
-{
-  dout(10) << "write_header " << dendl;
-  print_header();
-
-  ::lseek(fd, 0, SEEK_SET);
-  char buf[4096];
-  memcpy(buf, &header, sizeof(header));
-  int r = ::write(fd, buf, 4096);
-  if (r < 0) 
-    dout(0) << "write_header error " << errno << " " << strerror(errno) << dendl;
-}
-
-
-void DioJournal::write_thread_entry()
-{
-  dout(10) << "write_thread_entry start" << dendl;
-  write_lock.Lock();
-  
-  while (!write_stop) {
-    if (writeq.empty()) {
-      // sleep
-      dout(20) << "write_thread_entry going to sleep" << dendl;
-      write_cond.Wait(write_lock);
-      dout(20) << "write_thread_entry woke up" << dendl;
-      continue;
-    }
-    
-    // grab next item
-    epoch_t epoch = writeq.front().first;
-    bufferlist &ebl = writeq.front().second;
-
-    // epoch boundary?
-    if (epoch > header.last_epoch()) {
-      dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl;
-      header.push(epoch, write_pos);
-    }
-
-    off_t size = 2*sizeof(entry_header_t) + ebl.length();
-    size = DIV_ROUND_UP(size, 4096) * 4096;
-
-    // does it fit?
-    if (header.wrap) {
-      // we're wrapped.  don't overwrite ourselves.
-      if (write_pos + size >= header.offset[0]) {
-       dout(10) << "JOURNAL FULL (and wrapped), " << write_pos << "+" << size
-                << " >= " << header.offset[0]
-                << dendl;
-       full = true;
-       writeq.clear();
-       print_header();
-       continue;
-      }
-    } else {
-      // we haven't wrapped.  
-      if (write_pos + size >= header.max_size) {
-       // is there room if we wrap?
-       if ((off_t)sizeof(header_t) + size < header.offset[0]) {
-         // yes!
-         dout(10) << "wrapped from " << write_pos << " to " << sizeof(header_t) << dendl;
-         header.wrap = write_pos;
-         write_pos = sizeof(header_t);
-         header.push(ebofs->get_super_epoch(), write_pos);
-         write_header();
-       } else {
-         // no room.
-         dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << write_pos << "+" << size
-                  << " >= " << header.max_size
-                  << dendl;
-         full = true;
-         writeq.clear();
-         continue;
-       }
-      }
-    }
-
-    // build it
-    bufferptr bp = buffer::create_page_aligned(size);
-    entry_header_t *h = (entry_header_t*)bp.c_str();
-    ebl.copy(0, ebl.length(), bp.c_str()+sizeof(entry_header_t));
-    
-    // add to write buffer
-    dout(15) << "write_thread_entry will write " << write_pos << " : " 
-            << bp.length() 
-            << " epoch " << epoch
-            << dendl;
-      
-    // add it this entry
-    h->epoch = epoch;
-    h->len = ebl.length();
-    h->make_magic(write_pos, header.fsid);
-    memcpy(bp.c_str() + sizeof(*h) + ebl.length(), h, sizeof(*h));
-      
-    Context *oncommit = commitq.front();
-    if (oncommit)
-      writingq.push_back(oncommit);
-      
-    // pop from writeq
-    writeq.pop_front();
-    commitq.pop_front();
-    
-    writing = true;
-    write_lock.Unlock();
-    dout(15) << "write_thread_entry writing " << write_pos << "~" << bp.length() << dendl;
-    
-    ::lseek(fd, write_pos, SEEK_SET);
-    ::write(fd, bp.c_str(), size);
-    
-    write_lock.Lock();    
-    writing = false;
-    write_pos += bp.length();
-    ebofs->queue_finishers(writingq);
-  }
-  write_lock.Unlock();
-  dout(10) << "write_thread_entry finish" << dendl;
-}
-
-bool DioJournal::is_full()
-{
-  Mutex::Locker locker(write_lock);
-  return full;
-}
-
-void DioJournal::submit_entry(bufferlist& e, Context *oncommit)
-{
-  Mutex::Locker locker(write_lock);  // ** lock **
-
-  // dump on queue
-  dout(10) << "submit_entry " << e.length()
-          << " epoch " << ebofs->get_super_epoch()
-          << " " << oncommit << dendl;
-  commitq.push_back(oncommit);
-  if (!full) {
-    writeq.push_back(pair<epoch_t,bufferlist>(ebofs->get_super_epoch(), e));
-    write_cond.Signal(); // kick writer thread
-  }
-}
-
-
-void DioJournal::commit_epoch_start()
-{
-  dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1 
-          << " -- new epoch " << ebofs->get_super_epoch()
-          << dendl;
-
-  Mutex::Locker locker(write_lock);
-
-  // was full -> empty -> now usable?
-  if (full) {
-    if (header.num != 0) {
-      dout(1) << " journal FULL, ignoring this epoch" << dendl;
-      return;
-    }
-    
-    dout(1) << " clearing FULL flag, journal now usable" << dendl;
-    full = false;
-  } 
-}
-
-void DioJournal::commit_epoch_finish()
-{
-  dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << dendl;
-
-   Mutex::Locker locker(write_lock);
-
-   if (full) {
-     // full journal damage control.
-     dout(15) << " journal was FULL, contents now committed, clearing header.  journal still not usable until next epoch." << dendl;
-     header.clear();
-     write_pos = sizeof(header_t);
-   } else {
-     // update header -- trim/discard old (committed) epochs
-     while (header.num && header.epoch[0] < ebofs->get_super_epoch())
-       header.pop();
-   }
-   write_header();
-   
-   // discard any unwritten items in previous epoch
-   epoch_t epoch = ebofs->get_super_epoch();
-   while (!writeq.empty() && writeq.front().first < epoch) {
-     dout(15) << " dropping unwritten and committed " 
-             << write_pos << " : " << writeq.front().second.length()
-             << " epoch " << writeq.front().first 
-             << dendl;
-     // finisher?
-     Context *oncommit = commitq.front();
-     if (oncommit) writingq.push_back(oncommit);
-     
-     // discard.
-     writeq.pop_front();  
-     commitq.pop_front();
-   }
-   
-   // queue the finishers
-   ebofs->queue_finishers(writingq);
-}
-
-
-void DioJournal::make_writeable()
-{
-  if (read_pos)
-    write_pos = read_pos;
-  else
-    write_pos = sizeof(header_t);
-  read_pos = 0;
-}
-
-
-bool DioJournal::read_entry(bufferlist& bl, epoch_t& epoch)
-{
-  if (!read_pos) {
-    dout(2) << "read_entry -- not readable" << dendl;
-    return false;
-  }
-
-  if (read_pos == header.wrap) {
-    // find wrap point
-    for (int i=1; i<header.num; i++) {
-      if (header.offset[i] < read_pos) {
-       assert(header.offset[i-1] < read_pos);
-       read_pos = header.offset[i];
-       break;
-      }
-    }
-    assert(read_pos != header.wrap);
-    dout(10) << "read_entry wrapped from " << header.wrap << " to " << read_pos << dendl;
-  }
-
-  // header
-  entry_header_t h;
-  ::lseek(fd, read_pos, SEEK_SET);
-  ::read(fd, &h, sizeof(h));
-  if (!h.check_magic(read_pos, header.fsid)) {
-    dout(2) << "read_entry " << read_pos << " : bad header magic, end of journal" << dendl;
-    return false;
-  }
-
-  // body
-  bufferptr bp(h.len);
-  ::read(fd, bp.c_str(), h.len);
-
-  // footer
-  entry_header_t f;
-  ::read(fd, &f, sizeof(h));
-  if (!f.check_magic(read_pos, header.fsid) ||
-      h.epoch != f.epoch ||
-      h.len != f.len) {
-    dout(2) << "read_entry " << read_pos << " : bad footer magic, partially entry, end of journal" << dendl;
-    return false;
-  }
-
-
-  // yay!
-  dout(1) << "read_entry " << read_pos << " : " 
-         << " " << h.len << " bytes"
-         << " epoch " << h.epoch 
-         << dendl;
-  
-  bl.push_back(bp);
-  epoch = h.epoch;
-
-  read_pos += 2*sizeof(entry_header_t) + h.len;
-
-  return true;
-}
diff --git a/src/ebofs/DIOJournal.h b/src/ebofs/DIOJournal.h
deleted file mode 100644 (file)
index 38a72cc..0000000
+++ /dev/null
@@ -1,156 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed dio system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See dio COPYING.
- * 
- */
-
-
-#ifndef __EBOFS_DIOJOURNAL_H
-#define __EBOFS_DIOJOURNAL_H
-
-
-#include "Journal.h"
-#include "common/Cond.h"
-#include "common/Mutex.h"
-#include "common/Thread.h"
-
-class DioJournal : public Journal {
-public:
-  /** log header
-   * we allow 4 pointers:
-   *  top/initial,
-   *  one for an epoch boundary (if any),
-   *  one for a wrap in the ring buffer/journal dio,
-   *  one for a second epoch boundary (if any).
-   * the epoch boundary one is useful only for speedier recovery in certain cases
-   * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
-   */
-  struct header_t {
-    uint64_t fsid;
-    int num;
-    off_t wrap;
-    off_t max_size;
-    epoch_t epoch[4];
-    off_t offset[4];
-
-    header_t() : fsid(0), num(0), wrap(0), max_size(0) {}
-
-    void clear() {
-      num = 0;
-      wrap = 0;
-    }
-    void pop() {
-      if (num >= 2 && offset[0] > offset[1]) 
-       wrap = 0;  // we're eliminating a wrap
-      num--;
-      for (int i=0; i<num; i++) {
-       epoch[i] = epoch[i+1];
-       offset[i] = offset[i+1];
-      }
-    }
-    void push(epoch_t e, off_t o) {
-      assert(num < 4);
-      if (num > 2 && 
-         epoch[num-1] == e &&
-         epoch[num-2] == (e-1)) 
-       num--;  // tail was an epoch boundary; replace it.
-      epoch[num] = e;
-      offset[num] = o;
-      num++;
-    }
-    epoch_t last_epoch() {
-      return epoch[num-1];
-    }
-  } header;
-
-  struct entry_header_t {
-    uint64_t epoch;
-    uint64_t len;
-    uint64_t magic1;
-    uint64_t magic2;
-    
-    void make_magic(off_t pos, uint64_t fsid) {
-      magic1 = pos;
-      magic2 = fsid ^ epoch ^ len;
-    }
-    bool check_magic(off_t pos, uint64_t fsid) {
-      return
-       magic1 == (uint64_t)pos &&
-       magic2 == (fsid ^ epoch ^ len);
-    }
-  };
-
-private:
-  string fn;
-
-  bool full, writing;
-  off_t write_pos;      // byte where next entry written goes
-  off_t read_pos;       // 
-
-  int fd;
-
-  // to be journaled
-  list<pair<epoch_t,bufferlist> > writeq;
-  list<Context*> commitq;
-
-  // being journaled
-  list<Context*> writingq;
-  
-  // write thread
-  Mutex write_lock;
-  Cond write_cond;
-  bool write_stop;
-
-  void print_header();
-  void read_header();
-  void write_header();
-  void start_writer();
-  void stop_writer();
-  void write_thread_entry();
-
-  class Writer : public Thread {
-    DioJournal *journal;
-  public:
-    Writer(DioJournal *fj) : journal(fj) {}
-    void *entry() {
-      journal->write_thread_entry();
-      return 0;
-    }
-  } write_thread;
-
- public:
-  DioJournal(Ebofs *e, const char *f) : 
-    Journal(e), fn(f),
-    full(false), writing(false),
-    write_pos(0), read_pos(0),
-    fd(0),
-    write_stop(false), write_thread(this) { }
-  ~DioJournal() {}
-
-  int create();
-  int open();
-  void close();
-
-  void make_writeable();
-
-  // writes
-  void submit_entry(bufferlist& e, Context *oncommit);  // submit an item
-  void commit_epoch_start();   // mark epoch boundary
-  void commit_epoch_finish();  // mark prior epoch as committed (we can expire)
-
-  bool read_entry(bufferlist& bl, epoch_t& e);
-
-  bool is_full();
-
-  // reads
-};
-
-#endif
index e6410abc8430cf540f74288188815d0fbd1c151d..176c263e3f3ec07255ec1eb59cd74422641f7ed9 100644 (file)
@@ -17,7 +17,6 @@
 #include "Ebofs.h"
 
 #include "FileJournal.h"
-#include "DIOJournal.h"
 
 #include <errno.h>
 
index 82d82a0b8a6e9aa68f5c312daed102d0509a8e0f..d2d26fb1de12fa16cc6353e412c1ab5b4268c95d 100644 (file)
@@ -45,7 +45,8 @@ int FileJournal::create()
   // get size
   struct stat st;
   ::fstat(fd, &st);
-  dout(2) << "create " << fn << " " << st.st_size << " bytes" << dendl;
+  block_size = st.st_blksize;
+  dout(2) << "create " << fn << " " << st.st_size << " bytes, block size " << block_size << dendl;
 
   // write empty header
   memset(&header, 0, sizeof(header));
@@ -56,7 +57,7 @@ int FileJournal::create()
   
   // writeable.
   read_pos = 0;
-  write_pos = sizeof(header);
+  write_pos = get_top();
 
   ::close(fd);
 
@@ -78,7 +79,7 @@ int FileJournal::open()
 
   // assume writeable, unless...
   read_pos = 0;
-  write_pos = sizeof(header);
+  write_pos = get_top();
 
   // read header?
   read_header();
@@ -161,21 +162,36 @@ void FileJournal::print_header()
 }
 void FileJournal::read_header()
 {
+  int r;
   dout(10) << "read_header" << dendl;
-  memset(&header, 0, sizeof(header));  // zero out (read may fail)
   ::lseek(fd, 0, SEEK_SET);
-  int r = ::read(fd, &header, sizeof(header));
+  if (directio) {
+    buffer::ptr bp = buffer::create_page_aligned(block_size);
+    bp.zero();
+    r = ::read(fd, bp.c_str(), bp.length());
+    memcpy(&header, bp.c_str(), sizeof(header));
+  } else {
+    memset(&header, 0, sizeof(header));  // zero out (read may fail)
+    r = ::read(fd, &header, sizeof(header));
+  }
   if (r < 0) 
     dout(0) << "read_header error " << errno << " " << strerror(errno) << dendl;
   print_header();
 }
 void FileJournal::write_header()
 {
+  int r;
   dout(10) << "write_header " << dendl;
   print_header();
-
   ::lseek(fd, 0, SEEK_SET);
-  int r = ::write(fd, &header, sizeof(header));
+  if (directio) {
+    buffer::ptr bp = buffer::create_page_aligned(block_size);
+    bp.zero();
+    memcpy(bp.c_str(), &header, sizeof(header));
+    r = ::write(fd, bp.c_str(), bp.length());
+  } else {
+    r = ::write(fd, &header, sizeof(header));
+  }
   if (r < 0) 
     dout(0) << "write_header error " << errno << " " << strerror(errno) << dendl;
 }
@@ -231,9 +247,9 @@ void FileJournal::write_thread_entry()
          // is there room if we wrap?
          if ((off_t)sizeof(header_t) + size < header.offset[0]) {
            // yes!
-           dout(10) << "wrapped from " << queue_pos << " to " << sizeof(header_t) << dendl;
+           dout(10) << "wrapped from " << queue_pos << " to " << get_top() << dendl;
            header.wrap = queue_pos;
-           queue_pos = sizeof(header_t);
+           queue_pos = get_top();
            header.push(ebofs->get_super_epoch(), queue_pos);
            write_header();
          } else {
@@ -297,6 +313,109 @@ void FileJournal::write_thread_entry()
   dout(10) << "write_thread_entry finish" << dendl;
 }
 
+void FileJournal::write_thread_entry_dio()
+{
+  dout(10) << "write_thread_entry start" << dendl;
+  write_lock.Lock();
+  
+  while (!write_stop) {
+    if (writeq.empty()) {
+      // sleep
+      dout(20) << "write_thread_entry going to sleep" << dendl;
+      write_cond.Wait(write_lock);
+      dout(20) << "write_thread_entry woke up" << dendl;
+      continue;
+    }
+    
+    // grab next item
+    epoch_t epoch = writeq.front().first;
+    bufferlist &ebl = writeq.front().second;
+
+    // epoch boundary?
+    if (epoch > header.last_epoch()) {
+      dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl;
+      header.push(epoch, write_pos);
+    }
+
+    off_t size = 2*sizeof(entry_header_t) + ebl.length();
+    size = DIV_ROUND_UP(size, 4096) * 4096;
+
+    // does it fit?
+    if (header.wrap) {
+      // we're wrapped.  don't overwrite ourselves.
+      if (write_pos + size >= header.offset[0]) {
+       dout(10) << "JOURNAL FULL (and wrapped), " << write_pos << "+" << size
+                << " >= " << header.offset[0]
+                << dendl;
+       full = true;
+       writeq.clear();
+       print_header();
+       continue;
+      }
+    } else {
+      // we haven't wrapped.  
+      if (write_pos + size >= header.max_size) {
+       // is there room if we wrap?
+       if ((off_t)sizeof(header_t) + size < header.offset[0]) {
+         // yes!
+         dout(10) << "wrapped from " << write_pos << " to " << sizeof(header_t) << dendl;
+         header.wrap = write_pos;
+         write_pos = sizeof(header_t);
+         header.push(ebofs->get_super_epoch(), write_pos);
+         write_header();
+       } else {
+         // no room.
+         dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << write_pos << "+" << size
+                  << " >= " << header.max_size
+                  << dendl;
+         full = true;
+         writeq.clear();
+         continue;
+       }
+      }
+    }
+
+    // build it
+    bufferptr bp = buffer::create_page_aligned(size);
+    entry_header_t *h = (entry_header_t*)bp.c_str();
+    ebl.copy(0, ebl.length(), bp.c_str()+sizeof(entry_header_t));
+    
+    // add to write buffer
+    dout(15) << "write_thread_entry will write " << write_pos << " : " 
+            << bp.length() 
+            << " epoch " << epoch
+            << dendl;
+      
+    // add it this entry
+    h->epoch = epoch;
+    h->len = ebl.length();
+    h->make_magic(write_pos, header.fsid);
+    memcpy(bp.c_str() + sizeof(*h) + ebl.length(), h, sizeof(*h));
+      
+    Context *oncommit = commitq.front();
+    if (oncommit)
+      writingq.push_back(oncommit);
+      
+    // pop from writeq
+    writeq.pop_front();
+    commitq.pop_front();
+    
+    writing = true;
+    write_lock.Unlock();
+    dout(15) << "write_thread_entry writing " << write_pos << "~" << bp.length() << dendl;
+    
+    ::lseek(fd, write_pos, SEEK_SET);
+    ::write(fd, bp.c_str(), size);
+    
+    write_lock.Lock();    
+    writing = false;
+    write_pos += bp.length();
+    ebofs->queue_finishers(writingq);
+  }
+  write_lock.Unlock();
+  dout(10) << "write_thread_entry finish" << dendl;
+}
+
 bool FileJournal::is_full()
 {
   Mutex::Locker locker(write_lock);
@@ -349,7 +468,7 @@ void FileJournal::commit_epoch_finish()
      // full journal damage control.
      dout(15) << " journal was FULL, contents now committed, clearing header.  journal still not usable until next epoch." << dendl;
      header.clear();
-     write_pos = sizeof(header_t);
+     write_pos = get_top();
    } else {
      // update header -- trim/discard old (committed) epochs
      while (header.num && header.epoch[0] < ebofs->get_super_epoch())
@@ -383,7 +502,7 @@ void FileJournal::make_writeable()
   if (read_pos)
     write_pos = read_pos;
   else
-    write_pos = sizeof(header_t);
+    write_pos = get_top();
   read_pos = 0;
 }
 
index 84b3a603f74a8293476ff891b8e367f78a84a23c..050b6f251be5f1610c8021eeea9f068d679552a5 100644 (file)
@@ -91,6 +91,8 @@ public:
 private:
   string fn;
 
+  size_t block_size;
+  bool directio;
   bool full, writing;
   off_t write_pos;      // byte where next entry written goes
   off_t read_pos;       // 
@@ -115,20 +117,32 @@ private:
   void start_writer();
   void stop_writer();
   void write_thread_entry();
+  void write_thread_entry_dio();
 
   class Writer : public Thread {
     FileJournal *journal;
   public:
     Writer(FileJournal *fj) : journal(fj) {}
     void *entry() {
-      journal->write_thread_entry();
+      if (journal->directio)
+       journal->write_thread_entry_dio();
+      else
+       journal->write_thread_entry();
       return 0;
     }
   } write_thread;
 
+  off_t get_top() {
+    if (directio)
+      return block_size;
+    else
+      return sizeof(header);
+  }
+
  public:
-  FileJournal(Ebofs *e, const char *f) : 
+  FileJournal(Ebofs *e, const char *f, bool dio=false) : 
     Journal(e), fn(f),
+    directio(dio),
     full(false), writing(false),
     write_pos(0), read_pos(0),
     fd(0),