]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: factor journaling out of ebofs
authorSage Weil <sage@newdream.net>
Wed, 27 Feb 2008 17:57:29 +0000 (09:57 -0800)
committerSage Weil <sage@newdream.net>
Wed, 27 Feb 2008 19:58:26 +0000 (11:58 -0800)
12 files changed:
src/Makefile.am
src/TODO
src/common/Finisher.cc [new file with mode: 0644]
src/common/Finisher.h [new file with mode: 0644]
src/ebofs/Ebofs.cc
src/ebofs/Ebofs.h
src/ebofs/FileJournal.cc [deleted file]
src/ebofs/FileJournal.h [deleted file]
src/ebofs/Journal.h [deleted file]
src/osd/FileJournal.cc [new file with mode: 0644]
src/osd/FileJournal.h [new file with mode: 0644]
src/osd/Journal.h [new file with mode: 0644]

index 30419e5c2f502b7084df0effa054829df035bb03..b0c77b625999adb5067326308922cc2ffcafb516 100644 (file)
@@ -76,6 +76,7 @@ libcommon_a_SOURCES = \
        common/Logger.cc \
        common/Clock.cc \
        common/Timer.cc \
+       common/Finisher.cc \
        mon/MonMap.cc \
        config.cc
 
@@ -100,7 +101,7 @@ libebofs_a_SOURCES = \
        ebofs/BufferCache.cc \
        ebofs/Ebofs.cc \
        ebofs/Allocator.cc \
-       ebofs/FileJournal.cc
+       osd/FileJournal.cc
 
 libmds_a_SOURCES = \
        mds/MDS.cc \
@@ -161,6 +162,7 @@ noinst_HEADERS = \
         common/ThreadPool.h\
         common/Timer.h\
         common/Thread.h\
+        common/Finisher.h\
         crush/builder.h\
         crush/hash.h\
         crush/mapper.h\
@@ -171,11 +173,11 @@ noinst_HEADERS = \
         ebofs/csum.h\
         ebofs/BlockDevice.h\
         ebofs/Ebofs.h\
-        ebofs/FileJournal.h\
+        osd/FileJournal.h\
         ebofs/types.h\
         ebofs/Allocator.h\
         ebofs/BufferCache.h\
-        ebofs/Journal.h\
+        osd/Journal.h\
         ebofs/nodes.h\
         ebofs/Cnode.h\
         ebofs/Onode.h\
index a3ef2bab3a59bbb407e301c4f20d3171cc13f959..7b49ee4f3c003858417a5b8b3c657415efc276f5 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -14,6 +14,9 @@ code cleanup
 - userspace encoding/decoding needs major cleanup
   - use le32 etc annotation
   - probably kill base case in encoder.h, replace with int types, with appropriate swabbing
+- ip quad printfs?
+- addr=?
+- fix ceph_lookup_open
 
 kernel client
 - some bugs
diff --git a/src/common/Finisher.cc b/src/common/Finisher.cc
new file mode 100644 (file)
index 0000000..cd4b4cd
--- /dev/null
@@ -0,0 +1,45 @@
+
+#include "config.h"
+#include "Finisher.h"
+
+void Finisher::start()
+{
+  finisher_thread.create();
+}
+
+void Finisher::stop()
+{
+  finisher_lock.Lock();
+  finisher_stop = true;
+  finisher_cond.Signal();
+  finisher_lock.Unlock();
+  finisher_thread.join();
+}
+
+void *Finisher::finisher_thread_entry()
+{
+  finisher_lock.Lock();
+  //dout_generic(10) << "finisher_thread start" << dendl;
+
+  while (!finisher_stop) {
+    while (!finisher_queue.empty()) {
+      list<Context*> ls;
+      ls.swap(finisher_queue);
+
+      finisher_lock.Unlock();
+
+      finish_contexts(ls, 0);
+
+      finisher_lock.Lock();
+    }
+    if (finisher_stop) break;
+    
+    //dout_generic(30) << "finisher_thread sleeping" << dendl;
+    finisher_cond.Wait(finisher_lock);
+  }
+
+  //dout_generic(10) << "finisher_thread start" << dendl;
+  finisher_lock.Unlock();
+  return 0;
+}
+
diff --git a/src/common/Finisher.h b/src/common/Finisher.h
new file mode 100644 (file)
index 0000000..e3ef5bf
--- /dev/null
@@ -0,0 +1,56 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __CEPH_FINISHER_H
+#define __CEPH_FINISHER_H
+
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/Thread.h"
+
+class Finisher {
+  Mutex          finisher_lock;
+  Cond           finisher_cond;
+  bool           finisher_stop;
+  list<Context*> finisher_queue;
+  
+  void *finisher_thread_entry();
+
+  struct FinisherThread : public Thread {
+    Finisher *fin;    
+    FinisherThread(Finisher *f) : fin(f) {}
+    void* entry() { return (void*)fin->finisher_thread_entry(); }
+  } finisher_thread;
+
+ public:
+  void queue(Context *c) {
+    finisher_lock.Lock();
+    finisher_queue.push_back(c);
+    finisher_cond.Signal();
+    finisher_lock.Unlock();
+  }
+  void queue(list<Context*>& ls) {
+    finisher_lock.Lock();
+    finisher_queue.splice(finisher_queue.end(), ls);
+    finisher_cond.Signal();
+    finisher_lock.Unlock();
+  }
+  
+  void start();
+  void stop();
+
+  Finisher() : finisher_stop(false), finisher_thread(this) {}
+};
+
+#endif
index cbc74c11db1f3ac9e460f7a12e92d466e8321701..89dd8dd86b2700f08ff9ca3376081570fe3f8b45 100644 (file)
@@ -16,7 +16,7 @@
 
 #include "Ebofs.h"
 
-#include "FileJournal.h"
+#include "osd/FileJournal.h"
 
 #include <errno.h>
 
@@ -120,8 +120,8 @@ int Ebofs::mount()
 
   // open journal
   if (journalfn) {
-    journal = new FileJournal(this, journalfn, g_conf.ebofs_journal_dio);
-    int err = journal->open();
+    journal = new FileJournal(sb->fsid, &finisher, journalfn, g_conf.ebofs_journal_dio);
+    int err = journal->open(super_epoch);
     if (err < 0) {
       dout(3) << "mount journal " << journalfn << " open failed" << dendl;
       delete journal;
@@ -167,7 +167,7 @@ int Ebofs::mount()
 
   dout(3) << "mount starting commit+finisher threads" << dendl;
   commit_thread.create();
-  finisher_thread.create();
+  finisher.start();
 
   dout(1) << "mounted " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks())
          << (journal ? ", with journal":", no journal")
@@ -268,7 +268,7 @@ int Ebofs::mkfs()
 
   // create journal?
   if (journalfn) {
-    Journal *journal = new FileJournal(this, journalfn, g_conf.ebofs_journal_dio);
+    Journal *journal = new FileJournal(super_fsid, &finisher, journalfn, g_conf.ebofs_journal_dio);
     if (journal->create() < 0) {
       dout(3) << "mount journal " << journalfn << " created failed" << dendl;
     } else {
@@ -329,12 +329,7 @@ int Ebofs::umount()
 
   // kick finisher thread
   dout(5) << "umount stopping finisher thread" << dendl;
-  finisher_lock.Lock();
-  finisher_stop = true;
-  finisher_cond.Signal();
-  finisher_lock.Unlock();
-
-  finisher_thread.join();
+  finisher.stop();
 
   trim_bc(0);
   trim_inodes(0);
@@ -502,7 +497,7 @@ int Ebofs::commit_thread_entry()
        // --- queue up commit writes ---
        bc.poison_commit = false;
        if (journal) 
-         journal->commit_epoch_start();  // FIXME: make loopable
+         journal->commit_epoch_start(super_epoch);  // FIXME: make loopable
        commit_inodes_start();      // do this first; it currently involves inode reallocation
        allocator.commit_limbo();   // limbo -> limbo_tab
        nodepool.commit_start(dev, super_epoch);
@@ -560,7 +555,7 @@ int Ebofs::commit_thread_entry()
 
       // kick waiters
       dout(10) << "commit_thread queueing commit + kicking sync waiters" << dendl;
-      queue_finishers(commit_waiters[super_epoch-1]);
+      finisher.queue(commit_waiters[super_epoch-1]);
       commit_waiters.erase(super_epoch-1);
       sync_cond.Signal();
 
@@ -621,35 +616,6 @@ void Ebofs::alloc_more_node_space()
 }
 
 
-void *Ebofs::finisher_thread_entry()
-{
-  finisher_lock.Lock();
-  dout(10) << "finisher_thread start" << dendl;
-
-  while (!finisher_stop) {
-    while (!finisher_queue.empty()) {
-      list<Context*> ls;
-      ls.swap(finisher_queue);
-
-      finisher_lock.Unlock();
-
-      //ebofs_lock.Lock();            // um.. why lock this?  -sage
-      finish_contexts(ls, 0);
-      //ebofs_lock.Unlock();
-
-      finisher_lock.Lock();
-    }
-    if (finisher_stop) break;
-    
-    dout(30) << "finisher_thread sleeping" << dendl;
-    finisher_cond.Wait(finisher_lock);
-  }
-
-  dout(10) << "finisher_thread start" << dendl;
-  finisher_lock.Unlock();
-  return 0;
-}
-
 
 // *** onodes ***
 
@@ -1435,7 +1401,7 @@ void Ebofs::sync(Context *onsafe)
       Transaction t;
       bufferlist bl;
       t._encode(bl);
-      journal->submit_entry(bl, onsafe);
+      journal->submit_entry(super_epoch, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
@@ -2481,7 +2447,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
   if (journal) {
     bufferlist bl;
     t._encode(bl);
-    journal->submit_entry(bl, onsafe);
+    journal->submit_entry(super_epoch, bl, onsafe);
   } else
     queue_commit_waiter(onsafe);
 
@@ -2897,7 +2863,7 @@ int Ebofs::write(pobject_t oid,
       t.write(oid, off, len, bl);
       bufferlist tbl;
       t._encode(tbl);
-      journal->submit_entry(tbl, onsafe);
+      journal->submit_entry(super_epoch, tbl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   } else {
@@ -2923,7 +2889,7 @@ int Ebofs::zero(pobject_t oid, off_t off, size_t len, Context *onsafe)
       t.zero(oid, off, len);
       bufferlist tbl;
       t._encode(tbl);
-      journal->submit_entry(tbl, onsafe);
+      journal->submit_entry(super_epoch, tbl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   } else {
@@ -2964,7 +2930,7 @@ int Ebofs::remove(pobject_t oid, Context *onsafe)
       t.remove(oid);
       bufferlist bl;
       t._encode(bl);
-      journal->submit_entry(bl, onsafe);
+      journal->submit_entry(super_epoch, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   } else {
@@ -3047,7 +3013,7 @@ int Ebofs::truncate(pobject_t oid, off_t size, Context *onsafe)
       t.truncate(oid, size);
       bufferlist bl;
       t._encode(bl);
-      journal->submit_entry(bl, onsafe);
+      journal->submit_entry(super_epoch, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   } else {
@@ -3073,7 +3039,7 @@ int Ebofs::clone(pobject_t from, pobject_t to, Context *onsafe)
       t.clone(from, to);
       bufferlist bl;
       t._encode(bl);
-      journal->submit_entry(bl, onsafe);
+      journal->submit_entry(super_epoch, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   } else {
@@ -3257,7 +3223,7 @@ int Ebofs::setattr(pobject_t oid, const char *name, const void *value, size_t si
       t.setattr(oid, name, value, size);
       bufferlist bl;
       t._encode(bl);
-      journal->submit_entry(bl, onsafe);
+      journal->submit_entry(super_epoch, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   } else {
@@ -3297,7 +3263,7 @@ int Ebofs::setattrs(pobject_t oid, map<string,bufferptr>& attrset, Context *onsa
       t.setattrs(oid, attrset);
       bufferlist bl;
       t._encode(bl);
-      journal->submit_entry(bl, onsafe);
+      journal->submit_entry(super_epoch, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   } else {
@@ -3409,7 +3375,7 @@ int Ebofs::rmattr(pobject_t oid, const char *name, Context *onsafe)
       t.rmattr(oid, name);
       bufferlist bl;
       t._encode(bl);
-      journal->submit_entry(bl, onsafe);
+      journal->submit_entry(super_epoch, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   } else {
@@ -3512,7 +3478,7 @@ int Ebofs::create_collection(coll_t cid, Context *onsafe)
       t.create_collection(cid);
       bufferlist bl;
       t._encode(bl);
-      journal->submit_entry(bl, onsafe);
+      journal->submit_entry(super_epoch, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   } else {
@@ -3566,7 +3532,7 @@ int Ebofs::destroy_collection(coll_t cid, Context *onsafe)
       t.remove_collection(cid);
       bufferlist bl;
       t._encode(bl);
-      journal->submit_entry(bl, onsafe);
+      journal->submit_entry(super_epoch, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   } else {
@@ -3627,7 +3593,7 @@ int Ebofs::collection_add(coll_t cid, pobject_t oid, Context *onsafe)
       t.collection_add(cid, oid);
       bufferlist bl;
       t._encode(bl);
-      journal->submit_entry(bl, onsafe);
+      journal->submit_entry(super_epoch, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   } else {
@@ -3675,7 +3641,7 @@ int Ebofs::collection_remove(coll_t cid, pobject_t oid, Context *onsafe)
       t.collection_remove(cid, oid);
       bufferlist bl;
       t._encode(bl);
-      journal->submit_entry(bl, onsafe);
+      journal->submit_entry(super_epoch, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   } else {
@@ -3745,7 +3711,7 @@ int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, s
       t.collection_setattr(cid, name, value, size);
       bufferlist bl;
       t._encode(bl);
-      journal->submit_entry(bl, onsafe);
+      journal->submit_entry(super_epoch, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   } else {
@@ -3850,7 +3816,7 @@ int Ebofs::collection_rmattr(coll_t cid, const char *name, Context *onsafe)
       t.collection_rmattr(cid, name);
       bufferlist bl;
       t._encode(bl);
-      journal->submit_entry(bl, onsafe);
+      journal->submit_entry(super_epoch, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   } else {
index 216c1428baa342ca5637549575f57562982e55ab..10cb660e75ce2a51adfb3c6c963c1f7feed89485 100644 (file)
@@ -30,10 +30,11 @@ using namespace __gnu_cxx;
 #include "nodes.h"
 #include "Allocator.h"
 #include "Table.h"
-#include "Journal.h"
+#include "osd/Journal.h"
 
 #include "common/Mutex.h"
 #include "common/Cond.h"
+#include "common/Finisher.h"
 
 #include "osd/ObjectStore.h"
 
@@ -208,36 +209,7 @@ protected:
   int attempt_read(Onode *on, off_t off, size_t len, bufferlist& bl, 
                   Cond *will_wait_on, bool *will_wait_on_bool);
 
-  // ** finisher **
-  // async write notification to users
-  Mutex          finisher_lock;
-  Cond           finisher_cond;
-  bool           finisher_stop;
-  list<Context*> finisher_queue;
-
-public:
-  void queue_finisher(Context *c) {
-    finisher_lock.Lock();
-    finisher_queue.push_back(c);
-    finisher_cond.Signal();
-    finisher_lock.Unlock();
-  }
-  void queue_finishers(list<Context*>& ls) {
-    finisher_lock.Lock();
-    finisher_queue.splice(finisher_queue.end(), ls);
-    finisher_cond.Signal();
-    finisher_lock.Unlock();
-  }
-protected:
-
-  void *finisher_thread_entry();
-  class FinisherThread : public Thread {
-    Ebofs *ebofs;
-  public:
-    FinisherThread(Ebofs *e) : ebofs(e) {}
-    void* entry() { return (void*)ebofs->finisher_thread_entry(); }
-  } finisher_thread;
-
+  Finisher finisher;
 
   void alloc_more_node_space();
 
@@ -261,8 +233,7 @@ protected:
     cnode_lru(g_conf.ebofs_cc_size),
     inodes_flushing(0),
     bc(dev, ebofs_lock),
-    idle_kicker(this),
-    finisher_stop(false), finisher_thread(this) {
+    idle_kicker(this) {
     for (int i=0; i<EBOFS_NUM_FREE_BUCKETS; i++)
       free_tab[i] = 0;
     if (jfn) {
diff --git a/src/ebofs/FileJournal.cc b/src/ebofs/FileJournal.cc
deleted file mode 100644 (file)
index 35d16e9..0000000
+++ /dev/null
@@ -1,621 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-#include "FileJournal.h"
-#include "Ebofs.h"
-
-#include <stdio.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-
-
-#include "config.h"
-
-#define dout(x) if (x <= g_conf.debug_ebofs) *_dout << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal "
-#define derr(x) if (x <= g_conf.debug_ebofs) *_derr << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal "
-
-
-int FileJournal::_open(bool forwrite)
-{
-  int flags;
-
-  if (forwrite) {
-    flags = O_RDWR;
-    if (directio) flags |= O_DIRECT;
-  } else {
-    flags = O_RDONLY;
-  }
-  
-  if (fd >= 0) 
-    ::close(fd);
-  fd = ::open(fn.c_str(), flags);
-  if (fd < 0) {
-    dout(2) << "_open failed " << errno << " " << strerror(errno) << dendl;
-    return -errno;
-  }
-
-  // get size
-  struct stat st;
-  int r = ::fstat(fd, &st);
-  assert(r == 0);
-  max_size = st.st_size;
-  block_size = st.st_blksize;
-  dout(2) << "_open " << fn << " fd " << fd 
-         << ": " << st.st_size << " bytes, block size " << block_size << dendl;
-
-  return 0;
-}
-
-int FileJournal::create()
-{
-  dout(2) << "create " << fn << dendl;
-
-  int err = _open(true);
-  if (err < 0) return err;
-
-  // write empty header
-  memset(&header, 0, sizeof(header));
-  header.clear();
-  header.fsid = ebofs->get_fsid();
-  header.max_size = max_size;
-  header.block_size = block_size;
-  if (directio)
-    header.alignment = block_size;
-  else
-    header.alignment = 16;  // at least stay word aligned on 64bit machines...
-  print_header();
-
-  buffer::ptr bp = prepare_header();
-  int r = ::pwrite(fd, bp.c_str(), bp.length(), 0);
-  if (r < 0) {
-    dout(0) << "create write header error " << errno << " " << strerror(errno) << dendl;
-    return -errno;
-  }
-
-  ::close(fd);
-  fd = -1;
-  dout(2) << "create done" << dendl;
-  return 0;
-}
-
-int FileJournal::open()
-{
-  dout(2) << "open " << fn << dendl;
-
-  int err = _open(false);
-  if (err < 0) return err;
-
-  // assume writeable, unless...
-  read_pos = 0;
-  write_pos = get_top();
-
-  // read header?
-  read_header();
-  if (header.fsid != ebofs->get_fsid()) {
-    dout(2) << "open journal fsid doesn't match, invalid (someone else's?) journal" << dendl;
-    err = -EINVAL;
-  } 
-  if (header.max_size > max_size) {
-    dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl;
-    err = -EINVAL;
-  }
-  if (header.block_size != block_size) {
-    dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl;
-    err = -EINVAL;
-  }
-  if (header.alignment != block_size && directio) {
-    derr(0) << "open journal alignment " << header.alignment << " does not match block size " 
-           << block_size << " (required for direct_io journal mode)" << dendl;
-    err = -EINVAL;
-  }
-  if (err)
-    return err;
-
-  // looks like a valid header.
-  write_pos = 0;  // not writeable yet
-  read_pos = 0;
-
-  if (header.num > 0) {
-    // pick an offset
-    for (int i=0; i<header.num; i++) {
-      if (header.epoch[i] == ebofs->get_super_epoch()) {
-       dout(2) << "using read_pos header pointer "
-               << header.epoch[i] << " at " << header.offset[i]
-               << dendl;
-       read_pos = header.offset[i];
-       write_pos = 0;
-       break;
-      }      
-      else if (header.epoch[i] < ebofs->get_super_epoch()) {
-       dout(2) << "super_epoch is " << ebofs->get_super_epoch() 
-               << ", skipping old " << header.epoch[i] << " at " << header.offset[i]
-               << dendl;
-      }
-      else if (header.epoch[i] > ebofs->get_super_epoch()) {
-       dout(2) << "super_epoch is " << ebofs->get_super_epoch() 
-               << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i]
-               << dendl;
-       break;
-      }
-    }
-
-    if (read_pos == 0) {
-      dout(0) << "no valid journal segments" << dendl;
-      return -EINVAL;
-    }
-
-  } else {
-    dout(0) << "journal was empty" << dendl;
-    read_pos = get_top();
-  }
-
-  return 0;
-}
-
-void FileJournal::close()
-{
-  dout(1) << "close " << fn << dendl;
-
-  // stop writer thread
-  stop_writer();
-
-  // close
-  assert(writeq.empty());
-  assert(commitq.empty());
-  assert(fd > 0);
-  ::close(fd);
-  fd = -1;
-}
-
-void FileJournal::start_writer()
-{
-  write_stop = false;
-  write_thread.create();
-}
-
-void FileJournal::stop_writer()
-{
-  write_lock.Lock();
-  {
-    write_stop = true;
-    write_cond.Signal();
-  } 
-  write_lock.Unlock();
-  write_thread.join();
-}
-
-
-void FileJournal::print_header()
-{
-  for (int i=0; i<header.num; i++) {
-    if (i && header.offset[i] < header.offset[i-1]) {
-      assert(header.wrap);
-      dout(10) << "header: wrap at " << header.wrap << dendl;
-    }
-    dout(10) << "header: epoch " << header.epoch[i] << " at " << header.offset[i] << dendl;
-  }
-  //if (header.wrap) dout(10) << "header: wrap at " << header.wrap << dendl;
-}
-
-void FileJournal::read_header()
-{
-  int r;
-  dout(10) << "read_header" << dendl;
-  if (directio) {
-    buffer::ptr bp = buffer::create_page_aligned(block_size);
-    bp.zero();
-    r = ::pread(fd, bp.c_str(), bp.length(), 0);
-    memcpy(&header, bp.c_str(), sizeof(header));
-  } else {
-    memset(&header, 0, sizeof(header));  // zero out (read may fail)
-    r = ::pread(fd, &header, sizeof(header), 0);
-  }
-  if (r < 0) 
-    dout(0) << "read_header error " << errno << " " << strerror(errno) << dendl;
-  print_header();
-}
-
-bufferptr FileJournal::prepare_header()
-{
-  bufferptr bp;
-  if (directio) {
-    bp = buffer::create_page_aligned(block_size);
-    bp.zero();
-    memcpy(bp.c_str(), &header, sizeof(header));
-  } else {
-    bp = buffer::create(sizeof(header));
-    memcpy(bp.c_str(), &header, sizeof(header));
-  }
-  return bp;
-}
-
-
-
-
-void FileJournal::check_for_wrap(epoch_t epoch, off64_t pos, off64_t size)
-{
-  // epoch boundary?
-  dout(10) << "check_for_wrap epoch " << epoch << " last " << header.last_epoch() << " of " << header.num << dendl;
-  if (epoch > header.last_epoch()) {
-    dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl;
-    header.push(epoch, pos);
-    must_write_header = true;
-  }
-
-  // does it fit?
-  if (header.wrap) {
-    // we're wrapped.  don't overwrite ourselves.
-    if (pos + size >= header.offset[0]) {
-      dout(10) << "JOURNAL FULL (and wrapped), " << pos << "+" << size
-              << " >= " << header.offset[0]
-              << dendl;
-      full = true;
-      writeq.clear();
-      print_header();
-    }
-  } else {
-    // we haven't wrapped.  
-    if (pos + size >= header.max_size) {
-      // is there room if we wrap?
-      if (get_top() + size < header.offset[0]) {
-       // yes!
-       dout(10) << "wrapped from " << pos << " to " << get_top() << dendl;
-       header.wrap = pos;
-       pos = get_top();
-       header.push(ebofs->get_super_epoch(), pos);
-       must_write_header = true;
-      } else {
-       // no room.
-       dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << pos << "+" << size
-                << " >= " << header.max_size
-                << dendl;
-       full = true;
-       writeq.clear();
-      }
-    }
-  }
-}
-
-
-void FileJournal::prepare_multi_write(bufferlist& bl)
-{
-  // gather queued writes
-  off64_t queue_pos = write_pos;
-
-  int eleft = g_conf.ebofs_journal_max_write_entries;
-  int bleft = g_conf.ebofs_journal_max_write_bytes;
-
-  while (!writeq.empty()) {
-    // grab next item
-    epoch_t epoch = writeq.front().first;
-    bufferlist &ebl = writeq.front().second;
-    off64_t size = 2*sizeof(entry_header_t) + ebl.length();
-
-    if (bl.length() > 0 && bleft > 0 && bleft < size) break;
-    
-    check_for_wrap(epoch, queue_pos, size);
-    if (full) break;
-    if (bl.length() && must_write_header) 
-      break;
-    
-    // add to write buffer
-    dout(15) << "prepare_multi_write will write " << queue_pos << " : " 
-            << ebl.length() << " epoch " << epoch << " -> " << size << dendl;
-    
-    // add it this entry
-    entry_header_t h;
-    h.epoch = epoch;
-    h.len = ebl.length();
-    h.make_magic(queue_pos, header.fsid);
-    bl.append((const char*)&h, sizeof(h));
-    bl.claim_append(ebl);
-    bl.append((const char*)&h, sizeof(h));
-    
-    Context *oncommit = commitq.front();
-    if (oncommit)
-      writingq.push_back(oncommit);
-    
-    // pop from writeq
-    writeq.pop_front();
-    commitq.pop_front();
-
-    queue_pos += size;
-    if (--eleft == 0) break;
-    bleft -= size;
-    if (bleft == 0) break;
-  }
-}
-
-bool FileJournal::prepare_single_dio_write(bufferlist& bl)
-{
-  // grab next item
-  epoch_t epoch = writeq.front().first;
-  bufferlist &ebl = writeq.front().second;
-    
-  off64_t size = 2*sizeof(entry_header_t) + ebl.length();
-  size = ROUND_UP_TO(size, header.alignment);
-  
-  check_for_wrap(epoch, write_pos, size);
-  if (full) return false;
-
-  // build it
-  dout(15) << "prepare_single_dio_write will write " << write_pos << " : " 
-          << ebl.length() << " epoch " << epoch << " -> " << size << dendl;
-
-  bufferptr bp = buffer::create_page_aligned(size);
-  entry_header_t *h = (entry_header_t*)bp.c_str();
-  h->epoch = epoch;
-  h->len = ebl.length();
-  h->make_magic(write_pos, header.fsid);
-  ebl.copy(0, ebl.length(), bp.c_str()+sizeof(*h));
-  memcpy(bp.c_str() + sizeof(*h) + ebl.length(), h, sizeof(*h));
-  bl.push_back(bp);
-  
-  Context *oncommit = commitq.front();
-  if (oncommit)
-    writingq.push_back(oncommit);
-  
-  // pop from writeq
-  writeq.pop_front();
-  commitq.pop_front();
-  return true;
-}
-
-void FileJournal::do_write(bufferlist& bl)
-{
-  // nothing to do?
-  if (bl.length() == 0 && !must_write_header) 
-    return;
-
-  buffer::ptr hbp;
-  if (must_write_header) 
-    hbp = prepare_header();
-
-  writing = true;
-
-  header_t old_header = header;
-
-  write_lock.Unlock();
-
-  dout(15) << "do_write writing " << write_pos << "~" << bl.length() 
-          << (must_write_header ? " + header":"")
-          << dendl;
-  
-  // header
-  if (hbp.length())
-    ::pwrite(fd, hbp.c_str(), hbp.length(), 0);
-  
-  // entry
-  off64_t pos = write_pos;
-  ::lseek64(fd, write_pos, SEEK_SET);
-  for (list<bufferptr>::const_iterator it = bl.buffers().begin();
-       it != bl.buffers().end();
-       it++) {
-    if ((*it).length() == 0) continue;  // blank buffer.
-    int r = ::write(fd, (char*)(*it).c_str(), (*it).length());
-    if (r < 0)
-      derr(0) << "do_write failed with " << errno << " " << strerror(errno) 
-             << " with " << (void*)(*it).c_str() << " len " << (*it).length()
-             << dendl;
-    pos += (*it).length();
-  }
-  if (!directio)
-    ::fdatasync(fd);
-      
-  write_lock.Lock();    
-
-  writing = false;
-  if (memcmp(&old_header, &header, sizeof(header)) == 0) {
-    write_pos += bl.length();
-    write_pos = ROUND_UP_TO(write_pos, header.alignment);
-    ebofs->queue_finishers(writingq);
-  } else {
-    dout(10) << "do_write finished write but header changed?  not moving write_pos." << dendl;
-    derr(0) << "do_write finished write but header changed?  not moving write_pos." << dendl;
-    assert(writingq.empty());
-  }
-}
-
-
-void FileJournal::write_thread_entry()
-{
-  dout(10) << "write_thread_entry start" << dendl;
-  write_lock.Lock();
-  
-  while (!write_stop) {
-    if (writeq.empty()) {
-      // sleep
-      dout(20) << "write_thread_entry going to sleep" << dendl;
-      write_cond.Wait(write_lock);
-      dout(20) << "write_thread_entry woke up" << dendl;
-      continue;
-    }
-    
-    bufferlist bl;
-    must_write_header = false;
-    if (directio)
-      prepare_single_dio_write(bl);
-    else
-      prepare_multi_write(bl);
-    do_write(bl);
-  }
-
-  write_lock.Unlock();
-  dout(10) << "write_thread_entry finish" << dendl;
-}
-
-
-bool FileJournal::is_full()
-{
-  Mutex::Locker locker(write_lock);
-  return full;
-}
-
-void FileJournal::submit_entry(bufferlist& e, Context *oncommit)
-{
-  Mutex::Locker locker(write_lock);  // ** lock **
-
-  // dump on queue
-  dout(10) << "submit_entry " << e.length()
-          << " epoch " << ebofs->get_super_epoch()
-          << " " << oncommit << dendl;
-  commitq.push_back(oncommit);
-  if (!full) {
-    writeq.push_back(pair<epoch_t,bufferlist>(ebofs->get_super_epoch(), e));
-    write_cond.Signal(); // kick writer thread
-  }
-}
-
-
-void FileJournal::commit_epoch_start()
-{
-  dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1 
-          << " -- new epoch " << ebofs->get_super_epoch()
-          << dendl;
-
-  Mutex::Locker locker(write_lock);
-
-  // was full -> empty -> now usable?
-  if (full) {
-    if (header.num != 0) {
-      dout(1) << " journal FULL, ignoring this epoch" << dendl;
-      return;
-    }
-
-    dout(1) << " clearing FULL flag, journal now usable" << dendl;
-    full = false;
-  } 
-}
-
-void FileJournal::commit_epoch_finish(epoch_t new_epoch)
-{
-  dout(10) << "commit_epoch_finish committed " << (new_epoch-1) << dendl;
-
-  Mutex::Locker locker(write_lock);
-  
-  if (full) {
-    // full journal damage control.
-    dout(15) << " journal was FULL, contents now committed, clearing header.  journal still not usable until next epoch." << dendl;
-    header.clear();
-    write_pos = get_top();
-  } else {
-    // update header -- trim/discard old (committed) epochs
-    print_header();
-    while (header.num && header.epoch[0] < new_epoch) {
-      dout(10) << " popping epoch " << header.epoch[0] << " < " << new_epoch << dendl;
-      header.pop();
-    }
-    if (header.num == 0) {
-      dout(10) << " starting fresh" << dendl;
-      write_pos = get_top();
-      header.push(new_epoch, write_pos);
-    }
-  }
-  must_write_header = true;
-  
-  // discard any unwritten items in previous epoch
-  while (!writeq.empty() && writeq.front().first < new_epoch) {
-    dout(15) << " dropping unwritten and committed " 
-            << write_pos << " : " << writeq.front().second.length()
-            << " epoch " << writeq.front().first 
-            << dendl;
-    // finisher?
-    Context *oncommit = commitq.front();
-    if (oncommit) writingq.push_back(oncommit);
-    
-    // discard.
-    writeq.pop_front();  
-    commitq.pop_front();
-  }
-  
-  // queue the finishers
-  ebofs->queue_finishers(writingq);
-  dout(10) << "commit_epoch_finish done" << dendl;
-}
-
-
-void FileJournal::make_writeable()
-{
-  _open(true);
-
-  if (read_pos)
-    write_pos = read_pos;
-  else
-    write_pos = get_top();
-  read_pos = 0;
-
-  must_write_header = true;
-  start_writer();
-}
-
-
-bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
-{
-  if (!read_pos) {
-    dout(2) << "read_entry -- not readable" << dendl;
-    return false;
-  }
-
-  if (read_pos == header.wrap) {
-    // find wrap point
-    for (int i=1; i<header.num; i++) {
-      if (header.offset[i] < read_pos) {
-       assert(header.offset[i-1] < read_pos);
-       read_pos = header.offset[i];
-       break;
-      }
-    }
-    assert(read_pos != header.wrap);
-    dout(10) << "read_entry wrapped from " << header.wrap << " to " << read_pos << dendl;
-  }
-
-  // header
-  entry_header_t h;
-  ::lseek64(fd, read_pos, SEEK_SET);
-  ::read(fd, &h, sizeof(h));
-  if (!h.check_magic(read_pos, header.fsid)) {
-    dout(2) << "read_entry " << read_pos << " : bad header magic, end of journal" << dendl;
-    return false;
-  }
-
-  // body
-  bufferptr bp(h.len);
-  ::read(fd, bp.c_str(), h.len);
-
-  // footer
-  entry_header_t f;
-  ::read(fd, &f, sizeof(h));
-  if (!f.check_magic(read_pos, header.fsid) ||
-      h.epoch != f.epoch ||
-      h.len != f.len) {
-    dout(2) << "read_entry " << read_pos << " : bad footer magic, partial entry, end of journal" << dendl;
-    return false;
-  }
-
-
-  // yay!
-  dout(1) << "read_entry " << read_pos << " : " 
-         << " " << h.len << " bytes"
-         << " epoch " << h.epoch 
-         << dendl;
-  
-  bl.push_back(bp);
-  epoch = h.epoch;
-
-  read_pos += 2*sizeof(entry_header_t) + h.len;
-  read_pos = ROUND_UP_TO(read_pos, header.alignment);
-
-  return true;
-}
diff --git a/src/ebofs/FileJournal.h b/src/ebofs/FileJournal.h
deleted file mode 100644 (file)
index 03c3363..0000000
+++ /dev/null
@@ -1,179 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef __EBOFS_FILEJOURNAL_H
-#define __EBOFS_FILEJOURNAL_H
-
-
-#include "Journal.h"
-#include "common/Cond.h"
-#include "common/Mutex.h"
-#include "common/Thread.h"
-
-class FileJournal : public Journal {
-public:
-  /** log header
-   * we allow 4 pointers:
-   *  top/initial,
-   *  one for an epoch boundary (if any),
-   *  one for a wrap in the ring buffer/journal file,
-   *  one for a second epoch boundary (if any).
-   * the epoch boundary one is useful only for speedier recovery in certain cases
-   * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
-   */
-  struct header_t {
-    __u64 fsid;
-    __s64 num;
-    __u32 block_size;
-    __u32 alignment;
-    __s64 max_size;
-    __s64 wrap;
-    __u32 epoch[4];
-    __s64 offset[4];
-
-    header_t() : fsid(0), num(0), block_size(0), alignment(0), max_size(0), wrap(0) {}
-
-    void clear() {
-      num = 0;
-      wrap = 0;
-    }
-    void pop() {
-      if (num >= 2 && offset[0] > offset[1]) 
-       wrap = 0;  // we're eliminating a wrap
-      num--;
-      for (int i=0; i<num; i++) {
-       epoch[i] = epoch[i+1];
-       offset[i] = offset[i+1];
-      }
-    }
-    void push(epoch_t e, off64_t o) {
-      assert(num < 4);
-      if (num > 2 && 
-         epoch[num-1] == e &&
-         epoch[num-2] == (e-1)) 
-       num--;  // tail was an epoch boundary; replace it.
-      epoch[num] = e;
-      offset[num] = o;
-      num++;
-    }
-    epoch_t last_epoch() {
-      if (num)
-       return epoch[num-1];
-      else
-       return 0;
-    }
-  } header;
-
-  struct entry_header_t {
-    uint64_t epoch;
-    uint64_t len;
-    uint64_t magic1;
-    uint64_t magic2;
-    
-    void make_magic(off64_t pos, uint64_t fsid) {
-      magic1 = pos;
-      magic2 = fsid ^ epoch ^ len;
-    }
-    bool check_magic(off64_t pos, uint64_t fsid) {
-      return
-       magic1 == (uint64_t)pos &&
-       magic2 == (fsid ^ epoch ^ len);
-    }
-  };
-
-private:
-  string fn;
-
-  off64_t max_size;
-  size_t block_size;
-  bool directio;
-  bool full, writing, must_write_header;
-  off64_t write_pos;      // byte where next entry written goes
-  off64_t read_pos;       // 
-
-  int fd;
-
-  // to be journaled
-  list<pair<epoch_t,bufferlist> > writeq;
-  list<Context*> commitq;
-
-  // being journaled
-  list<Context*> writingq;
-  
-  // write thread
-  Mutex write_lock;
-  Cond write_cond;
-  bool write_stop;
-
-  int _open(bool wr);
-  void print_header();
-  void read_header();
-  bufferptr prepare_header();
-  void start_writer();
-  void stop_writer();
-  void write_thread_entry();
-
-  void check_for_wrap(epoch_t epoch, off64_t pos, off64_t size);
-  bool prepare_single_dio_write(bufferlist& bl);
-  void prepare_multi_write(bufferlist& bl);
-  void do_write(bufferlist& bl);
-
-  class Writer : public Thread {
-    FileJournal *journal;
-  public:
-    Writer(FileJournal *fj) : journal(fj) {}
-    void *entry() {
-      journal->write_thread_entry();
-      return 0;
-    }
-  } write_thread;
-
-  off64_t get_top() {
-    if (directio)
-      return block_size;
-    else
-      return sizeof(header);
-  }
-
- public:
-  FileJournal(Ebofs *e, const char *f, bool dio=false) : 
-    Journal(e), fn(f),
-    max_size(0), block_size(0),
-    directio(dio),
-    full(false), writing(false), must_write_header(false),
-    write_pos(0), read_pos(0),
-    fd(-1),
-    write_stop(false), write_thread(this) { }
-  ~FileJournal() {}
-
-  int create();
-  int open();
-  void close();
-
-  void make_writeable();
-
-  // writes
-  void submit_entry(bufferlist& e, Context *oncommit);  // submit an item
-  void commit_epoch_start();   // mark epoch boundary
-  void commit_epoch_finish(epoch_t);  // mark prior epoch as committed (we can expire)
-
-  bool read_entry(bufferlist& bl, epoch_t& e);
-
-  bool is_full();
-
-  // reads
-};
-
-#endif
diff --git a/src/ebofs/Journal.h b/src/ebofs/Journal.h
deleted file mode 100644 (file)
index 6e50e83..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef __EBOFS_JOURNAL_H
-#define __EBOFS_JOURNAL_H
-
-class Ebofs;
-
-#include "include/buffer.h"
-#include "include/Context.h"
-
-class Journal {
-protected:
-  Ebofs *ebofs;
-
-public:
-  Journal(Ebofs *e) : ebofs(e) { }
-  virtual ~Journal() { }
-
-  virtual int create() = 0;
-  virtual int open() = 0;
-  virtual void close() = 0;
-
-  // writes
-  virtual void make_writeable() = 0;
-  virtual void submit_entry(bufferlist& e, Context *oncommit) = 0;
-  virtual void commit_epoch_start() = 0;  // mark epoch boundary
-  virtual void commit_epoch_finish(epoch_t) = 0; // mark prior epoch as committed (we can expire)
-  virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0;
-  virtual bool is_full() = 0;
-
-  // reads/recovery
-  
-};
-
-#endif
diff --git a/src/osd/FileJournal.cc b/src/osd/FileJournal.cc
new file mode 100644 (file)
index 0000000..f5062a4
--- /dev/null
@@ -0,0 +1,620 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include "FileJournal.h"
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+
+#include "config.h"
+
+#define dout(x) if (x <= g_conf.debug_ebofs) *_dout << dbeginl << g_clock.now() << " ebofs(" << fn << ").journal "
+#define derr(x) if (x <= g_conf.debug_ebofs) *_derr << dbeginl << g_clock.now() << " ebofs(" << fn << ").journal "
+
+
+int FileJournal::_open(bool forwrite)
+{
+  int flags;
+
+  if (forwrite) {
+    flags = O_RDWR;
+    if (directio) flags |= O_DIRECT;
+  } else {
+    flags = O_RDONLY;
+  }
+  
+  if (fd >= 0) 
+    ::close(fd);
+  fd = ::open(fn.c_str(), flags);
+  if (fd < 0) {
+    dout(2) << "_open failed " << errno << " " << strerror(errno) << dendl;
+    return -errno;
+  }
+
+  // get size
+  struct stat st;
+  int r = ::fstat(fd, &st);
+  assert(r == 0);
+  max_size = st.st_size;
+  block_size = st.st_blksize;
+  dout(2) << "_open " << fn << " fd " << fd 
+         << ": " << st.st_size << " bytes, block size " << block_size << dendl;
+
+  return 0;
+}
+
+int FileJournal::create()
+{
+  dout(2) << "create " << fn << dendl;
+
+  int err = _open(true);
+  if (err < 0) return err;
+
+  // write empty header
+  memset(&header, 0, sizeof(header));
+  header.clear();
+  header.fsid = fsid;
+  header.max_size = max_size;
+  header.block_size = block_size;
+  if (directio)
+    header.alignment = block_size;
+  else
+    header.alignment = 16;  // at least stay word aligned on 64bit machines...
+  print_header();
+
+  buffer::ptr bp = prepare_header();
+  int r = ::pwrite(fd, bp.c_str(), bp.length(), 0);
+  if (r < 0) {
+    dout(0) << "create write header error " << errno << " " << strerror(errno) << dendl;
+    return -errno;
+  }
+
+  ::close(fd);
+  fd = -1;
+  dout(2) << "create done" << dendl;
+  return 0;
+}
+
+int FileJournal::open(epoch_t epoch)
+{
+  dout(2) << "open " << fn << dendl;
+
+  int err = _open(false);
+  if (err < 0) return err;
+
+  // assume writeable, unless...
+  read_pos = 0;
+  write_pos = get_top();
+
+  // read header?
+  read_header();
+  if (header.fsid != fsid) {
+    dout(2) << "open journal fsid doesn't match, invalid (someone else's?) journal" << dendl;
+    err = -EINVAL;
+  } 
+  if (header.max_size > max_size) {
+    dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl;
+    err = -EINVAL;
+  }
+  if (header.block_size != block_size) {
+    dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl;
+    err = -EINVAL;
+  }
+  if (header.alignment != block_size && directio) {
+    derr(0) << "open journal alignment " << header.alignment << " does not match block size " 
+           << block_size << " (required for direct_io journal mode)" << dendl;
+    err = -EINVAL;
+  }
+  if (err)
+    return err;
+
+  // looks like a valid header.
+  write_pos = 0;  // not writeable yet
+  read_pos = 0;
+
+  if (header.num > 0) {
+    // pick an offset
+    for (int i=0; i<header.num; i++) {
+      if (header.epoch[i] == epoch) {
+       dout(2) << "using read_pos header pointer "
+               << header.epoch[i] << " at " << header.offset[i]
+               << dendl;
+       read_pos = header.offset[i];
+       write_pos = 0;
+       break;
+      }      
+      else if (header.epoch[i] < epoch) {
+       dout(2) << "super_epoch is " << epoch 
+               << ", skipping old " << header.epoch[i] << " at " << header.offset[i]
+               << dendl;
+      }
+      else if (header.epoch[i] > epoch) {
+       dout(2) << "super_epoch is " << epoch 
+               << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i]
+               << dendl;
+       break;
+      }
+    }
+
+    if (read_pos == 0) {
+      dout(0) << "no valid journal segments" << dendl;
+      return -EINVAL;
+    }
+
+  } else {
+    dout(0) << "journal was empty" << dendl;
+    read_pos = get_top();
+  }
+
+  return 0;
+}
+
+void FileJournal::close()
+{
+  dout(1) << "close " << fn << dendl;
+
+  // stop writer thread
+  stop_writer();
+
+  // close
+  assert(writeq.empty());
+  assert(commitq.empty());
+  assert(fd > 0);
+  ::close(fd);
+  fd = -1;
+}
+
+void FileJournal::start_writer()
+{
+  write_stop = false;
+  write_thread.create();
+}
+
+void FileJournal::stop_writer()
+{
+  write_lock.Lock();
+  {
+    write_stop = true;
+    write_cond.Signal();
+  } 
+  write_lock.Unlock();
+  write_thread.join();
+}
+
+
+void FileJournal::print_header()
+{
+  for (int i=0; i<header.num; i++) {
+    if (i && header.offset[i] < header.offset[i-1]) {
+      assert(header.wrap);
+      dout(10) << "header: wrap at " << header.wrap << dendl;
+    }
+    dout(10) << "header: epoch " << header.epoch[i] << " at " << header.offset[i] << dendl;
+  }
+  //if (header.wrap) dout(10) << "header: wrap at " << header.wrap << dendl;
+}
+
+void FileJournal::read_header()
+{
+  int r;
+  dout(10) << "read_header" << dendl;
+  if (directio) {
+    buffer::ptr bp = buffer::create_page_aligned(block_size);
+    bp.zero();
+    r = ::pread(fd, bp.c_str(), bp.length(), 0);
+    memcpy(&header, bp.c_str(), sizeof(header));
+  } else {
+    memset(&header, 0, sizeof(header));  // zero out (read may fail)
+    r = ::pread(fd, &header, sizeof(header), 0);
+  }
+  if (r < 0) 
+    dout(0) << "read_header error " << errno << " " << strerror(errno) << dendl;
+  print_header();
+}
+
+bufferptr FileJournal::prepare_header()
+{
+  bufferptr bp;
+  if (directio) {
+    bp = buffer::create_page_aligned(block_size);
+    bp.zero();
+    memcpy(bp.c_str(), &header, sizeof(header));
+  } else {
+    bp = buffer::create(sizeof(header));
+    memcpy(bp.c_str(), &header, sizeof(header));
+  }
+  return bp;
+}
+
+
+
+
+void FileJournal::check_for_wrap(epoch_t epoch, off64_t pos, off64_t size)
+{
+  // epoch boundary?
+  dout(10) << "check_for_wrap epoch " << epoch << " last " << header.last_epoch() << " of " << header.num << dendl;
+  if (epoch > header.last_epoch()) {
+    dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl;
+    header.push(epoch, pos);
+    must_write_header = true;
+  }
+
+  // does it fit?
+  if (header.wrap) {
+    // we're wrapped.  don't overwrite ourselves.
+    if (pos + size >= header.offset[0]) {
+      dout(10) << "JOURNAL FULL (and wrapped), " << pos << "+" << size
+              << " >= " << header.offset[0]
+              << dendl;
+      full = true;
+      writeq.clear();
+      print_header();
+    }
+  } else {
+    // we haven't wrapped.  
+    if (pos + size >= header.max_size) {
+      // is there room if we wrap?
+      if (get_top() + size < header.offset[0]) {
+       // yes!
+       dout(10) << "wrapped from " << pos << " to " << get_top() << dendl;
+       header.wrap = pos;
+       pos = get_top();
+       header.push(epoch, pos);
+       must_write_header = true;
+      } else {
+       // no room.
+       dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << pos << "+" << size
+                << " >= " << header.max_size
+                << dendl;
+       full = true;
+       writeq.clear();
+      }
+    }
+  }
+}
+
+
+void FileJournal::prepare_multi_write(bufferlist& bl)
+{
+  // gather queued writes
+  off64_t queue_pos = write_pos;
+
+  int eleft = g_conf.ebofs_journal_max_write_entries;
+  int bleft = g_conf.ebofs_journal_max_write_bytes;
+
+  while (!writeq.empty()) {
+    // grab next item
+    epoch_t epoch = writeq.front().first;
+    bufferlist &ebl = writeq.front().second;
+    off64_t size = 2*sizeof(entry_header_t) + ebl.length();
+
+    if (bl.length() > 0 && bleft > 0 && bleft < size) break;
+    
+    check_for_wrap(epoch, queue_pos, size);
+    if (full) break;
+    if (bl.length() && must_write_header) 
+      break;
+    
+    // add to write buffer
+    dout(15) << "prepare_multi_write will write " << queue_pos << " : " 
+            << ebl.length() << " epoch " << epoch << " -> " << size << dendl;
+    
+    // add it this entry
+    entry_header_t h;
+    h.epoch = epoch;
+    h.len = ebl.length();
+    h.make_magic(queue_pos, header.fsid);
+    bl.append((const char*)&h, sizeof(h));
+    bl.claim_append(ebl);
+    bl.append((const char*)&h, sizeof(h));
+    
+    Context *oncommit = commitq.front();
+    if (oncommit)
+      writingq.push_back(oncommit);
+    
+    // pop from writeq
+    writeq.pop_front();
+    commitq.pop_front();
+
+    queue_pos += size;
+    if (--eleft == 0) break;
+    bleft -= size;
+    if (bleft == 0) break;
+  }
+}
+
+bool FileJournal::prepare_single_dio_write(bufferlist& bl)
+{
+  // grab next item
+  epoch_t epoch = writeq.front().first;
+  bufferlist &ebl = writeq.front().second;
+    
+  off64_t size = 2*sizeof(entry_header_t) + ebl.length();
+  size = ROUND_UP_TO(size, header.alignment);
+  
+  check_for_wrap(epoch, write_pos, size);
+  if (full) return false;
+
+  // build it
+  dout(15) << "prepare_single_dio_write will write " << write_pos << " : " 
+          << ebl.length() << " epoch " << epoch << " -> " << size << dendl;
+
+  bufferptr bp = buffer::create_page_aligned(size);
+  entry_header_t *h = (entry_header_t*)bp.c_str();
+  h->epoch = epoch;
+  h->len = ebl.length();
+  h->make_magic(write_pos, header.fsid);
+  ebl.copy(0, ebl.length(), bp.c_str()+sizeof(*h));
+  memcpy(bp.c_str() + sizeof(*h) + ebl.length(), h, sizeof(*h));
+  bl.push_back(bp);
+  
+  Context *oncommit = commitq.front();
+  if (oncommit)
+    writingq.push_back(oncommit);
+  
+  // pop from writeq
+  writeq.pop_front();
+  commitq.pop_front();
+  return true;
+}
+
+void FileJournal::do_write(bufferlist& bl)
+{
+  // nothing to do?
+  if (bl.length() == 0 && !must_write_header) 
+    return;
+
+  buffer::ptr hbp;
+  if (must_write_header) 
+    hbp = prepare_header();
+
+  writing = true;
+
+  header_t old_header = header;
+
+  write_lock.Unlock();
+
+  dout(15) << "do_write writing " << write_pos << "~" << bl.length() 
+          << (must_write_header ? " + header":"")
+          << dendl;
+  
+  // header
+  if (hbp.length())
+    ::pwrite(fd, hbp.c_str(), hbp.length(), 0);
+  
+  // entry
+  off64_t pos = write_pos;
+  ::lseek64(fd, write_pos, SEEK_SET);
+  for (list<bufferptr>::const_iterator it = bl.buffers().begin();
+       it != bl.buffers().end();
+       it++) {
+    if ((*it).length() == 0) continue;  // blank buffer.
+    int r = ::write(fd, (char*)(*it).c_str(), (*it).length());
+    if (r < 0)
+      derr(0) << "do_write failed with " << errno << " " << strerror(errno) 
+             << " with " << (void*)(*it).c_str() << " len " << (*it).length()
+             << dendl;
+    pos += (*it).length();
+  }
+  if (!directio)
+    ::fdatasync(fd);
+      
+  write_lock.Lock();    
+
+  writing = false;
+  if (memcmp(&old_header, &header, sizeof(header)) == 0) {
+    write_pos += bl.length();
+    write_pos = ROUND_UP_TO(write_pos, header.alignment);
+    finisher->queue(writingq);
+  } else {
+    dout(10) << "do_write finished write but header changed?  not moving write_pos." << dendl;
+    derr(0) << "do_write finished write but header changed?  not moving write_pos." << dendl;
+    assert(writingq.empty());
+  }
+}
+
+
+void FileJournal::write_thread_entry()
+{
+  dout(10) << "write_thread_entry start" << dendl;
+  write_lock.Lock();
+  
+  while (!write_stop) {
+    if (writeq.empty()) {
+      // sleep
+      dout(20) << "write_thread_entry going to sleep" << dendl;
+      write_cond.Wait(write_lock);
+      dout(20) << "write_thread_entry woke up" << dendl;
+      continue;
+    }
+    
+    bufferlist bl;
+    must_write_header = false;
+    if (directio)
+      prepare_single_dio_write(bl);
+    else
+      prepare_multi_write(bl);
+    do_write(bl);
+  }
+
+  write_lock.Unlock();
+  dout(10) << "write_thread_entry finish" << dendl;
+}
+
+
+bool FileJournal::is_full()
+{
+  Mutex::Locker locker(write_lock);
+  return full;
+}
+
+void FileJournal::submit_entry(epoch_t epoch, bufferlist& e, Context *oncommit)
+{
+  Mutex::Locker locker(write_lock);  // ** lock **
+
+  // dump on queue
+  dout(10) << "submit_entry " << e.length()
+          << " epoch " << epoch
+          << " " << oncommit << dendl;
+  commitq.push_back(oncommit);
+  if (!full) {
+    writeq.push_back(pair<epoch_t,bufferlist>(epoch, e));
+    write_cond.Signal(); // kick writer thread
+  }
+}
+
+
+void FileJournal::commit_epoch_start(epoch_t new_epoch)
+{
+  dout(10) << "commit_epoch_start on " << new_epoch-1 
+          << " -- new epoch " << new_epoch
+          << dendl;
+
+  Mutex::Locker locker(write_lock);
+
+  // was full -> empty -> now usable?
+  if (full) {
+    if (header.num != 0) {
+      dout(1) << " journal FULL, ignoring this epoch" << dendl;
+      return;
+    }
+
+    dout(1) << " clearing FULL flag, journal now usable" << dendl;
+    full = false;
+  } 
+}
+
+void FileJournal::commit_epoch_finish(epoch_t new_epoch)
+{
+  dout(10) << "commit_epoch_finish committed " << (new_epoch-1) << dendl;
+
+  Mutex::Locker locker(write_lock);
+  
+  if (full) {
+    // full journal damage control.
+    dout(15) << " journal was FULL, contents now committed, clearing header.  journal still not usable until next epoch." << dendl;
+    header.clear();
+    write_pos = get_top();
+  } else {
+    // update header -- trim/discard old (committed) epochs
+    print_header();
+    while (header.num && header.epoch[0] < new_epoch) {
+      dout(10) << " popping epoch " << header.epoch[0] << " < " << new_epoch << dendl;
+      header.pop();
+    }
+    if (header.num == 0) {
+      dout(10) << " starting fresh" << dendl;
+      write_pos = get_top();
+      header.push(new_epoch, write_pos);
+    }
+  }
+  must_write_header = true;
+  
+  // discard any unwritten items in previous epoch
+  while (!writeq.empty() && writeq.front().first < new_epoch) {
+    dout(15) << " dropping unwritten and committed " 
+            << write_pos << " : " << writeq.front().second.length()
+            << " epoch " << writeq.front().first 
+            << dendl;
+    // finisher?
+    Context *oncommit = commitq.front();
+    if (oncommit) writingq.push_back(oncommit);
+    
+    // discard.
+    writeq.pop_front();  
+    commitq.pop_front();
+  }
+  
+  // queue the finishers
+  finisher->queue(writingq);
+  dout(10) << "commit_epoch_finish done" << dendl;
+}
+
+
+void FileJournal::make_writeable()
+{
+  _open(true);
+
+  if (read_pos)
+    write_pos = read_pos;
+  else
+    write_pos = get_top();
+  read_pos = 0;
+
+  must_write_header = true;
+  start_writer();
+}
+
+
+bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
+{
+  if (!read_pos) {
+    dout(2) << "read_entry -- not readable" << dendl;
+    return false;
+  }
+
+  if (read_pos == header.wrap) {
+    // find wrap point
+    for (int i=1; i<header.num; i++) {
+      if (header.offset[i] < read_pos) {
+       assert(header.offset[i-1] < read_pos);
+       read_pos = header.offset[i];
+       break;
+      }
+    }
+    assert(read_pos != header.wrap);
+    dout(10) << "read_entry wrapped from " << header.wrap << " to " << read_pos << dendl;
+  }
+
+  // header
+  entry_header_t h;
+  ::lseek64(fd, read_pos, SEEK_SET);
+  ::read(fd, &h, sizeof(h));
+  if (!h.check_magic(read_pos, header.fsid)) {
+    dout(2) << "read_entry " << read_pos << " : bad header magic, end of journal" << dendl;
+    return false;
+  }
+
+  // body
+  bufferptr bp(h.len);
+  ::read(fd, bp.c_str(), h.len);
+
+  // footer
+  entry_header_t f;
+  ::read(fd, &f, sizeof(h));
+  if (!f.check_magic(read_pos, header.fsid) ||
+      h.epoch != f.epoch ||
+      h.len != f.len) {
+    dout(2) << "read_entry " << read_pos << " : bad footer magic, partial entry, end of journal" << dendl;
+    return false;
+  }
+
+
+  // yay!
+  dout(1) << "read_entry " << read_pos << " : " 
+         << " " << h.len << " bytes"
+         << " epoch " << h.epoch 
+         << dendl;
+  
+  bl.push_back(bp);
+  epoch = h.epoch;
+
+  read_pos += 2*sizeof(entry_header_t) + h.len;
+  read_pos = ROUND_UP_TO(read_pos, header.alignment);
+
+  return true;
+}
diff --git a/src/osd/FileJournal.h b/src/osd/FileJournal.h
new file mode 100644 (file)
index 0000000..6e108f2
--- /dev/null
@@ -0,0 +1,179 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __EBOFS_FILEJOURNAL_H
+#define __EBOFS_FILEJOURNAL_H
+
+
+#include "Journal.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Thread.h"
+
+class FileJournal : public Journal {
+public:
+  /** log header
+   * we allow 4 pointers:
+   *  top/initial,
+   *  one for an epoch boundary (if any),
+   *  one for a wrap in the ring buffer/journal file,
+   *  one for a second epoch boundary (if any).
+   * the epoch boundary one is useful only for speedier recovery in certain cases
+   * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
+   */
+  struct header_t {
+    __u64 fsid;
+    __s64 num;
+    __u32 block_size;
+    __u32 alignment;
+    __s64 max_size;
+    __s64 wrap;
+    __u32 epoch[4];
+    __s64 offset[4];
+
+    header_t() : fsid(0), num(0), block_size(0), alignment(0), max_size(0), wrap(0) {}
+
+    void clear() {
+      num = 0;
+      wrap = 0;
+    }
+    void pop() {
+      if (num >= 2 && offset[0] > offset[1]) 
+       wrap = 0;  // we're eliminating a wrap
+      num--;
+      for (int i=0; i<num; i++) {
+       epoch[i] = epoch[i+1];
+       offset[i] = offset[i+1];
+      }
+    }
+    void push(epoch_t e, off64_t o) {
+      assert(num < 4);
+      if (num > 2 && 
+         epoch[num-1] == e &&
+         epoch[num-2] == (e-1)) 
+       num--;  // tail was an epoch boundary; replace it.
+      epoch[num] = e;
+      offset[num] = o;
+      num++;
+    }
+    epoch_t last_epoch() {
+      if (num)
+       return epoch[num-1];
+      else
+       return 0;
+    }
+  } header;
+
+  struct entry_header_t {
+    uint64_t epoch;
+    uint64_t len;
+    uint64_t magic1;
+    uint64_t magic2;
+    
+    void make_magic(off64_t pos, uint64_t fsid) {
+      magic1 = pos;
+      magic2 = fsid ^ epoch ^ len;
+    }
+    bool check_magic(off64_t pos, uint64_t fsid) {
+      return
+       magic1 == (uint64_t)pos &&
+       magic2 == (fsid ^ epoch ^ len);
+    }
+  };
+
+private:
+  string fn;
+
+  off64_t max_size;
+  size_t block_size;
+  bool directio;
+  bool full, writing, must_write_header;
+  off64_t write_pos;      // byte where next entry written goes
+  off64_t read_pos;       // 
+
+  int fd;
+
+  // to be journaled
+  list<pair<epoch_t,bufferlist> > writeq;
+  list<Context*> commitq;
+
+  // being journaled
+  list<Context*> writingq;
+  
+  // write thread
+  Mutex write_lock;
+  Cond write_cond;
+  bool write_stop;
+
+  int _open(bool wr);
+  void print_header();
+  void read_header();
+  bufferptr prepare_header();
+  void start_writer();
+  void stop_writer();
+  void write_thread_entry();
+
+  void check_for_wrap(epoch_t epoch, off64_t pos, off64_t size);
+  bool prepare_single_dio_write(bufferlist& bl);
+  void prepare_multi_write(bufferlist& bl);
+  void do_write(bufferlist& bl);
+
+  class Writer : public Thread {
+    FileJournal *journal;
+  public:
+    Writer(FileJournal *fj) : journal(fj) {}
+    void *entry() {
+      journal->write_thread_entry();
+      return 0;
+    }
+  } write_thread;
+
+  off64_t get_top() {
+    if (directio)
+      return block_size;
+    else
+      return sizeof(header);
+  }
+
+ public:
+  FileJournal(__u64 fsid, Finisher *fin, const char *f, bool dio=false) : 
+    Journal(fsid, fin), fn(f),
+    max_size(0), block_size(0),
+    directio(dio),
+    full(false), writing(false), must_write_header(false),
+    write_pos(0), read_pos(0),
+    fd(-1),
+    write_stop(false), write_thread(this) { }
+  ~FileJournal() {}
+
+  int create();
+  int open(epoch_t epoch);
+  void close();
+
+  void make_writeable();
+
+  // writes
+  void submit_entry(epoch_t epoch, bufferlist& e, Context *oncommit);  // submit an item
+  void commit_epoch_start(epoch_t);   // mark epoch boundary
+  void commit_epoch_finish(epoch_t);  // mark prior epoch as committed (we can expire)
+
+  bool read_entry(bufferlist& bl, epoch_t& e);
+
+  bool is_full();
+
+  // reads
+};
+
+#endif
diff --git a/src/osd/Journal.h b/src/osd/Journal.h
new file mode 100644 (file)
index 0000000..9fb9077
--- /dev/null
@@ -0,0 +1,48 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __EBOFS_JOURNAL_H
+#define __EBOFS_JOURNAL_H
+
+#include "include/buffer.h"
+#include "include/Context.h"
+#include "common/Finisher.h"
+
+class Journal {
+protected:
+  __u64 fsid;
+  Finisher *finisher;
+
+public:
+  Journal(__u64 f, Finisher *fin) : fsid(f), finisher(fin) { }
+  virtual ~Journal() { }
+
+  virtual int create() = 0;
+  virtual int open(epoch_t epoch) = 0;
+  virtual void close() = 0;
+
+  // writes
+  virtual void make_writeable() = 0;
+  virtual void submit_entry(epoch_t epoch, bufferlist& e, Context *oncommit) = 0;
+  virtual void commit_epoch_start(epoch_t) = 0;  // mark epoch boundary
+  virtual void commit_epoch_finish(epoch_t) = 0; // mark prior epoch as committed (we can expire)
+  virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0;
+  virtual bool is_full() = 0;
+
+  // reads/recovery
+  
+};
+
+#endif