]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
* beginnings of ebofs journaling
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 1 Jun 2007 19:23:48 +0000 (19:23 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 1 Jun 2007 19:23:48 +0000 (19:23 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1386 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/ebofs/FileJournal.cc [new file with mode: 0644]
trunk/ceph/ebofs/FileJournal.h [new file with mode: 0644]
trunk/ceph/ebofs/Journal.h [new file with mode: 0644]

diff --git a/trunk/ceph/ebofs/FileJournal.cc b/trunk/ceph/ebofs/FileJournal.cc
new file mode 100644 (file)
index 0000000..e354dd2
--- /dev/null
@@ -0,0 +1,173 @@
+
+#include "FileJournal.h"
+#include "Ebofs.h"
+
+#include "config.h"
+// Debug-output helpers.  dout(x) writes to cout and derr(x) writes to cerr,
+// but only when level x is <= g_conf.debug_ebofs.  Every message starts with
+// the prefix "ebofs(<device name>).journal ".
+// NOTE(review): both macros expand to a bare `if (...) stream <<`, so using
+// one as the body of an unbraced if/else will capture the following `else`.
+// NOTE(review): `dev` has to be visible wherever the macro is expanded; the
+// FileJournal and Journal classes declare no `dev` member -- confirm where it
+// is meant to come from.
+#define dout(x) if (x <= g_conf.debug_ebofs) cout << "ebofs(" << dev.get_device_name() << ").journal "
+#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << dev.get_device_name() << ").journal "
+
+
+
+// Create (or reset) the journal file named by fn as an empty file with
+// mode 0644.
+//
+// The descriptor is only needed while the file is being set up.  It is
+// closed again before returning and fd is put back to 0, the "not open"
+// value that open() asserts on, so open() can be called afterwards on the
+// same object.
+void FileJournal::create()
+{
+  dout(1) << "create " << fn << endl;
+
+  // open/create.  O_CREAT requires an explicit mode argument; without it
+  // the permission bits of a newly created file are indeterminate.
+  fd = ::open(fn.c_str(), O_CREAT|O_WRONLY, 0644);
+  assert(fd > 0);
+
+  // start from an empty file (ftruncate() takes the target length).
+  ::ftruncate(fd, 0);
+  // force the final permissions regardless of the process umask.
+  ::fchmod(fd, 0644);
+
+  ::close(fd);
+  fd = 0;  // closed again; keep the "fd == 0 means not open" convention
+}
+
+
+// Open the existing journal file fn for reading and writing, then start the
+// writer thread.  The journal must not already be open (fd == 0).
+void FileJournal::open()
+{
+  dout(1) << "open " << fn << endl;
+
+  // refuse to open twice
+  assert(fd == 0);
+
+  // open the file
+  const char *path = fn.c_str();
+  fd = ::open(path, O_RDWR);
+  assert(fd > 0);
+
+  // read header?
+  // *** (not implemented yet)
+
+  start_writer();
+}
+
+// Stop the writer thread and close the journal file.
+//
+// The write queue is expected to be empty once the writer has stopped, and
+// the journal must currently be open (fd > 0).  fd is reset to 0 so that
+// open() can be called again.
+void FileJournal::close()
+{
+  dout(1) << "close " << fn << endl;
+
+  // stop writer thread
+  stop_writer();
+
+  // close.  the pending-write queue is the writeq member.
+  assert(writeq.empty());
+  assert(fd > 0);
+  ::close(fd);
+  fd = 0;
+}
+
+// Clear the stop flag and launch the writer thread (the writer_thread
+// member declared in FileJournal.h).
+void FileJournal::start_writer()
+{
+  writer_stop = false;
+  writer_thread.create();
+}
+
+// Ask the writer thread to exit and wait for it to finish.
+void FileJournal::stop_writer()
+{
+  // set the flag and signal under writer_lock, the same lock
+  // write_thread_entry() holds while it tests writer_stop.
+  writer_lock.Lock();
+  {
+    writer_stop = true;
+    writer_cond.Signal();
+  }
+  writer_lock.Unlock();
+
+  // join only after dropping the lock; the writer thread holds writer_lock
+  // around its loop.
+  writer_thread.join();
+}
+
+void FileJournal::write_thread_entry()
+{
+  dout(10) << "write_thread_entry start" << endl;
+  writer_lock.Lock();
+  
+  while (!write_stop) {
+       if (writeq.empty()) {
+         // sleep
+         dout(20) << "write_thread_entry going to sleep" << endl;
+         write_cond.Wait(writer_lock);
+         dout(20) << "write_thread_entry woke up" << endl;
+         continue;
+       }
+
+       // do queued writes
+       while (!writeq.empty()) {
+         // grab next item
+         epoch_t e = writeq.front().first;
+         bufferlist bl;
+         bl.claim(writeq.front().second);
+         writeq.pop_front();
+         Context *oncommit = commitq.front();
+         commitq.pop_front();
+
+         dout(15) << "write_thread_entry writing " << bottom << " : " 
+                          << bl.length() 
+                          << " epoch " << e
+                          << endl;
+         
+         // write epoch, len, data.
+         ::fseek(fd, bottom, SEEK_SET);
+         ::write(fd, &e, sizeof(e));
+
+         uint32_t len = bl.length();
+         ::write(fd, &len, sizeof(len));
+
+         for (list<bufferptr>::const_iterator it = bl.buffers().begin();
+                  it != bl.buffers().end();
+                  it++) {
+               if ((*it).length() == 0) continue;  // blank buffer.
+               ::write(fd, (char*)(*it).c_str(), (*it).length() );
+         }
+
+         // move position pointer
+         bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length();
+
+         // do commit callback
+         if (oncommit) {
+               oncommit->finish(0);
+               delete oncommit;
+         }
+       }
+  }
+  
+  writer_lock.Unlock();
+  dout(10) << "write_thread_entry finish" << endl;
+}
+
+// Queue a journal entry for the writer thread.
+//
+// e is queued on writeq tagged with the current ebofs->super_epoch, and
+// oncommit (may be null) is queued on commitq in the same position.  The
+// writer thread finishes and deletes oncommit after writing the entry.
+//
+// The queues and `bottom` are shared with the writer thread, which accesses
+// them under writer_lock, so the same lock is held here.  Signalling under
+// the lock also keeps the wakeup from slipping in between the writer's
+// "queue empty" test and its Wait().
+// NOTE(review): the writer runs commit callbacks while holding writer_lock;
+// a callback that calls submit_entry() needs that lock to be re-entrant --
+// confirm.
+void FileJournal::submit_entry(bufferlist& e, Context *oncommit)
+{
+  writer_lock.Lock();
+  {
+    dout(10) << "submit_entry " << bottom << " : " << e.length()
+             << " epoch " << ebofs->super_epoch
+             << " " << oncommit << endl;
+
+    // dump on queue
+    writeq.push_back(pair<epoch_t,bufferlist>(ebofs->super_epoch, e));
+    commitq.push_back(oncommit);
+
+    // kick writer thread
+    writer_cond.Signal();
+  }
+  writer_lock.Unlock();
+}
+
+
+// Epoch-boundary hook.  For now it only logs that it was called.
+void FileJournal::commit_epoch_start()
+{
+  // no journal state is touched yet
+  dout(10) << "commit_epoch_start" << endl;
+}
+
+void FileJournal::commit_epoch_finish()
+{
+  dout(10) << "commit_epoch_finish" << endl;
+  
+  // flush any unwritten items in previous epoch
+  writer_lock.Lock();
+  {
+       while (!writeq.empty() &&
+                  writeq.front().first < ebofs->super_epoch) {
+         dout(15) << " dropping uncommitted journal item from prior epoch" << endl;
+         writeq.pop_front();
+         Context *oncommit = commitq.front();
+         commitq.pop_front();
+
+         if (oncommit) {
+               oncommit->finish(0);
+               delete oncommit;
+         }
+       }
+  }
+  writer_lock.Unlock();
+  
+}
diff --git a/trunk/ceph/ebofs/FileJournal.h b/trunk/ceph/ebofs/FileJournal.h
new file mode 100644 (file)
index 0000000..a6611a0
--- /dev/null
@@ -0,0 +1,75 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __EBOFS_FILEJOURNAL_H
+#define __EBOFS_FILEJOURNAL_H
+
+
+#include "Journal.h"
+
+
+class FileJournal : public Journal {
+  string fn;
+
+  off_t max_size;
+  off_t top;            // byte of first entry chronologically
+  off_t bottom;         // byte where next entry goes
+  off_t committing_to;  // offset of epoch boundary, if we are committing
+
+  int fd;
+
+  list<pair<epoch_t,bufferlist> > writeq;  // currently journaling
+  map<off_t,Context*> commitq; // currently journaling
+  
+  // write thread
+  bool writer_stop;
+
+  void write_header();
+  void start_writer();
+  void stop_writer();
+  void write_thread_entry();
+
+  class Writer : public Thread {
+       FileJournal *journal;
+  public:
+       Writer(FileJournal *fj) : journal(fj) {}
+       void *entry() {
+         journal->write_thread();
+         return 0;
+       }
+  } writer_thread;
+
+ public:
+  FileJournal(Ebofs *e, char *f, off_t sz) : 
+       Journal(e),
+       fn(f), max_size(sz),
+       top(0), bottom(0), committing_to(0),
+       fd(0),
+       writer_stop(false), writer_thread(this)
+       { }
+  ~FileJournal() {}
+
+  void create();
+  void open();
+  void close();
+
+  // writes
+  void submit_entry(bufferlist& e, Context *oncommit);  // submit an item
+  void commit_epoch_start();  // mark epoch boundary
+  void commit_epoch_finish(); // mark prior epoch as committed (we can expire)
+
+  // reads
+};
+
+#endif
diff --git a/trunk/ceph/ebofs/Journal.h b/trunk/ceph/ebofs/Journal.h
new file mode 100644 (file)
index 0000000..a432eaa
--- /dev/null
@@ -0,0 +1,39 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __EBOFS_JOURNAL_H
+#define __EBOFS_JOURNAL_H
+
+
+class Journal {
+  Ebofs *ebofs;
+
+ public:
+  Journal(Ebofs *e) : ebofs(e) { }
+  virtual ~Journal() { }
+
+  virtual void create() = 0;
+  virtual void open() = 0;
+  virtual void close() = 0;
+
+  // writes
+  virtual void submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
+  virtual void commit_epoch_start() = 0;  // mark epoch boundary
+  virtual void commit_epoch_finish(list<Context*>& ls) = 0; // mark prior epoch as committed (we can expire)
+
+  // reads/recovery
+  
+};
+
+#endif