]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal replay, makefile cleanup
authorSage Weil <sage@newdream.net>
Fri, 25 Jan 2008 05:59:38 +0000 (21:59 -0800)
committerSage Weil <sage@newdream.net>
Fri, 25 Jan 2008 05:59:38 +0000 (21:59 -0800)
src/Makefile
src/ebofs/FileJournal.cc
src/ebofs/FileJournal.h
src/ebofs/mkfs.ebofs.cc
src/ebofs/streamtest.cc
src/ebofs/test.ebofs.cc

index 6f383bcb498681a9bed39daf6bcb8b1d00ec831f..ab1ee7d61df5196adca0ce468eb4073db63e2355 100644 (file)
@@ -226,6 +226,8 @@ streamtest.ebofs: ebofs/streamtest.cc config.cc common/Clock.o ebofs.o
 dupstore: dupstore.cc config.cc ebofs.o common/Clock.o common/Timer.o osd/FakeStore.o
        ${CXX} ${CFLAGS} ${LIBS} $^ -o $@
 
+allebofs: mkfs.ebofs test.ebofs streamtest.ebofs dupstore
+
 
 # hadoop
 libhadoopcephfs.so: client/hadoop/CephFSInterface.cc client.o osdc.o msg/SimpleMessenger.o common.o
index af2111a0ef17501c6ce13e36a315131528d9de2c..73a1009bdbdb2cceec433cc6919ac8290762b017 100644 (file)
 #define derr(x) if (x <= g_conf.debug_ebofs) *_derr << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal "
 
 
-int FileJournal::_open()
+int FileJournal::_open(bool forwrite)
 {
-  int flags = O_RDWR;
-  if (directio) flags |= O_DIRECT;
+  int flags;
+
+  if (forwrite) {
+    flags = O_RDONLY;
+  } else {
+    flags = O_RDWR;
+    if (directio) flags |= O_DIRECT;
+  }
   
+  if (fd >= 0) 
+    ::close(fd);
   fd = ::open(fn.c_str(), flags);
   if (fd < 0) {
     dout(2) << "_open failed " << errno << " " << strerror(errno) << dendl;
     return -errno;
   }
-  assert(fd > 0);
 
   // get size
   struct stat st;
@@ -53,7 +60,7 @@ int FileJournal::create()
 {
   dout(2) << "create " << fn << dendl;
 
-  int err = _open();
+  int err = _open(true);
   if (err < 0) return err;
 
   // write empty header
@@ -69,6 +76,7 @@ int FileJournal::create()
   write_pos = get_top();
 
   ::close(fd);
+  fd = -1;
   dout(2) << "create done" << dendl;
   return 0;
 }
@@ -77,7 +85,7 @@ int FileJournal::open()
 {
   dout(2) << "open " << fn << dendl;
 
-  int err = _open();
+  int err = _open(false);
   if (err < 0) return err;
 
   // assume writeable, unless...
@@ -121,8 +129,6 @@ int FileJournal::open()
       }
     }
   }
-  write_header();
-  start_writer();
 
   return 0;
 }
@@ -139,7 +145,7 @@ void FileJournal::close()
   assert(commitq.empty());
   assert(fd > 0);
   ::close(fd);
-  fd = 0;
+  fd = -1;
 }
 
 void FileJournal::start_writer()
@@ -311,7 +317,6 @@ void FileJournal::prepare_multi_write(bufferlist& bl)
   }
 }
 
-
 bool FileJournal::prepare_single_dio_write(bufferlist& bl)
 {
   // grab next item
@@ -359,6 +364,8 @@ void FileJournal::do_write(bufferlist& bl)
 
   writing = true;
 
+  header_t old_header = header;
+
   write_lock.Unlock();
 
   dout(15) << "do_write writing " << write_pos << "~" << bl.length() 
@@ -378,12 +385,18 @@ void FileJournal::do_write(bufferlist& bl)
     ::pwrite(fd, (char*)(*it).c_str(), (*it).length(), pos);
     pos += (*it).length();
   }
+  ::fdatasync(fd);
       
   write_lock.Lock();    
 
   writing = false;
-  write_pos += bl.length();
-  ebofs->queue_finishers(writingq);
+  if (memcmp(&old_header, &header, sizeof(header)) == 0) {
+    write_pos += bl.length();
+    ebofs->queue_finishers(writingq);
+  } else {
+    derr(0) << "do_write finished write but header changed?  not moving write_pos." << dendl;
+    assert(writingq.empty());
+  }
 }
 
 
@@ -451,7 +464,7 @@ void FileJournal::commit_epoch_start()
       dout(1) << " journal FULL, ignoring this epoch" << dendl;
       return;
     }
-    
+
     dout(1) << " clearing FULL flag, journal now usable" << dendl;
     full = false;
   } 
@@ -506,11 +519,16 @@ void FileJournal::commit_epoch_finish(epoch_t new_epoch)
 
 void FileJournal::make_writeable()
 {
+  _open(true);
+
   if (read_pos)
     write_pos = read_pos;
   else
     write_pos = get_top();
   read_pos = 0;
+
+  must_write_header = true;
+  start_writer();
 }
 
 
@@ -536,8 +554,7 @@ bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
 
   // header
   entry_header_t h;
-  ::lseek(fd, read_pos, SEEK_SET);
-  ::read(fd, &h, sizeof(h));
+  ::pread(fd, &h, sizeof(h), read_pos);
   if (!h.check_magic(read_pos, header.fsid)) {
     dout(2) << "read_entry " << read_pos << " : bad header magic, end of journal" << dendl;
     return false;
index 9365645ffdf76a084bdfb500105dff0866995ccf..a0a5cc40ad33a35616b91488cfd94d09a5dc87f9 100644 (file)
@@ -116,7 +116,7 @@ private:
   Cond write_cond;
   bool write_stop;
 
-  int _open();
+  int _open(bool wr);
   void print_header();
   void read_header();
   bufferptr prepare_header();
@@ -154,7 +154,7 @@ private:
     directio(dio),
     full(false), writing(false), must_write_header(false),
     write_pos(0), read_pos(0),
-    fd(0),
+    fd(-1),
     write_stop(false), write_thread(this) { }
   ~FileJournal() {}
 
index d1d5975e7fd65a925c6155bf7cda6df29fd6c93d..2f29ab93cd4ff63c72d6ece5d2d1a84a82e50329 100644 (file)
 #include "ebofs/Ebofs.h"
 
 
-int main(int argc, char **argv)
+int main(int argc, const char **argv)
 {
   // args
-  vector<char*> args;
+  vector<const char*> args;
   argv_to_vec(argc, argv, args);
   parse_config_options(args);
 
@@ -29,7 +29,7 @@ int main(int argc, char **argv)
     cerr << "usage: mkfs.ebofs [options] <device file>" << std::endl;
     return -1;
   }
-  char *filename = args[0];
+  const char *filename = args[0];
 
   // mkfs
   Ebofs mfs(filename);
index 41470591e0ded98e89450ca2d238f434256dc6a8..144d52b1d9f0a637337e28cb870a57f27cb8c26e 100644 (file)
 #include <iostream>
 #include "ebofs/Ebofs.h"
 
-map<off_t, pair<utime_t,utime_t> > writes;
+struct io {
+  utime_t start, ack, commit;
+  bool done() {
+    return ack.sec() && commit.sec();
+  }
+};
+map<off_t,io> writes;
 
 Mutex lock;
 
+
+void pr(off_t off)
+{
+  io &i = writes[off];
+  dout(0) << off << "\t" 
+         << (i.ack - i.start) << "\t"
+         << (i.commit - i.start) << dendl;
+  writes.erase(off);
+}
+
+void set_start(off_t off, utime_t t)
+{
+  Mutex::Locker l(lock);
+  writes[off].start = t;
+}
+
+void set_ack(off_t off, utime_t t)
+{
+  Mutex::Locker l(lock);
+  writes[off].ack = t;
+  if (writes[off].done())
+    pr(off);
+}
+
+void set_commit(off_t off, utime_t t)
+{
+  Mutex::Locker l(lock);
+  writes[off].commit = t;
+  if (writes[off].done())
+    pr(off);
+}
+
+
 struct C_Commit : public Context {
   off_t off;
   C_Commit(off_t o) : off(o) {}
   void finish(int r) {
     Mutex::Locker l(lock);
-    utime_t now = g_clock.now();
-    dout(0) << off << "\t" 
-        << (writes[off].second-writes[off].first) << "\t"
-        << (now - writes[off].first) << dendl;
-    writes.erase(off);
+    set_commit(off, g_clock.now());
   }
 };
 
@@ -78,17 +113,11 @@ int main(int argc, const char **argv)
   cout << "# offset\tack\tcommit" << std::endl;
   while (now < end) {
     object_t oid(1,1);
-    utime_t start;
-    {
-      Mutex::Locker l(lock);
-      start = writes[pos].first = now;
-    }
+    utime_t start = now;
+    set_start(pos, now);
     fs.write(oid, pos, bytes, bl, new C_Commit(pos));
     now = g_clock.now();
-    {
-      Mutex::Locker l(lock);
-      writes[pos].second = now;
-    }
+    set_ack(pos, now);
     pos += bytes;
 
     // wait?
index dd78b8ff9a11ebb7a4914c8a73f11d2401536e1d..8e4bfcdbd4597059b8ee6b8c0d7361bb98483266 100644 (file)
@@ -44,7 +44,7 @@ public:
       coll_t cid = rand() % 50;
       off_t off = rand() % 10000;//0;//rand() % 1000000;
       off_t len = 1+rand() % 100000;
-      char *a = "one";
+      const char *a = "one";
       if (rand() % 2) a = "two";
       int l = 3;//rand() % 10;
 
@@ -150,15 +150,15 @@ public:
   }
 };
 
-int main(int argc, char **argv)
+int main(int argc, const char **argv)
 {
-  vector<char*> args;
+  vector<const char*> args;
   argv_to_vec(argc, argv, args);
   parse_config_options(args);
 
   // args
   if (args.size() != 3) return -1;
-  char *filename = args[0];
+  const char *filename = args[0];
   int seconds = atoi(args[1]);
   int threads = atoi(args[2]);
   if (!threads) threads = 1;