--- /dev/null
+
+#include "FileJournal.h"
+#include "Ebofs.h"
+
+#include "config.h"
+#define dout(x) if (x <= g_conf.debug_ebofs) cout << "ebofs(" << dev.get_device_name() << ").journal "
+#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << dev.get_device_name() << ").journal "
+
+
+
+void FileJournal::create()
+{
+ dout(1) << "create " << fn << endl;
+
+ // open/create
+ fd = ::open(fn.c_str(), O_CREAT|O_WRONLY);
+ assert(fd > 0);
+
+ ::ftruncate(fd);
+ ::fchmod(fd, 0644);
+
+ ::close(fd);
+}
+
+
+void FileJournal::open()
+{
+ dout(1) << "open " << fn << endl;
+
+ // open and file
+ assert(fd == 0);
+ fd = ::open(fn.c_str(), O_RDWR);
+ assert(fd > 0);
+
+ // read header?
+ // ***
+
+
+ start_writer();
+}
+
+void FileJournal::close()
+{
+ dout(1) << "close " << fn << endl;
+
+ // stop writer thread
+ stop_writer();
+
+ // close
+ assert(q.empty());
+ assert(fd > 0);
+ ::close(fd);
+ fd = 0;
+}
+
+void FileJournal::start_writer()
+{
+ writer_stop = false;
+ write_thread.create();
+}
+
+void FileJournal::stop_writer()
+{
+ writer_lock.Lock();
+ {
+ writer_stop = true;
+ writer_cond.Signal();
+ }
+ writer_lock.Unlock();
+ write_thread.join();
+}
+
+void FileJournal::write_thread_entry()
+{
+ dout(10) << "write_thread_entry start" << endl;
+ writer_lock.Lock();
+
+ while (!write_stop) {
+ if (writeq.empty()) {
+ // sleep
+ dout(20) << "write_thread_entry going to sleep" << endl;
+ write_cond.Wait(writer_lock);
+ dout(20) << "write_thread_entry woke up" << endl;
+ continue;
+ }
+
+ // do queued writes
+ while (!writeq.empty()) {
+ // grab next item
+ epoch_t e = writeq.front().first;
+ bufferlist bl;
+ bl.claim(writeq.front().second);
+ writeq.pop_front();
+ Context *oncommit = commitq.front();
+ commitq.pop_front();
+
+ dout(15) << "write_thread_entry writing " << bottom << " : "
+ << bl.length()
+ << " epoch " << e
+ << endl;
+
+ // write epoch, len, data.
+ ::fseek(fd, bottom, SEEK_SET);
+ ::write(fd, &e, sizeof(e));
+
+ uint32_t len = bl.length();
+ ::write(fd, &len, sizeof(len));
+
+ for (list<bufferptr>::const_iterator it = bl.buffers().begin();
+ it != bl.buffers().end();
+ it++) {
+ if ((*it).length() == 0) continue; // blank buffer.
+ ::write(fd, (char*)(*it).c_str(), (*it).length() );
+ }
+
+ // move position pointer
+ bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length();
+
+ // do commit callback
+ if (oncommit) {
+ oncommit->finish(0);
+ delete oncommit;
+ }
+ }
+ }
+
+ writer_lock.Unlock();
+ dout(10) << "write_thread_entry finish" << endl;
+}
+
+void FileJournal::submit_entry(bufferlist& e, Context *oncommit)
+{
+ dout(10) << "submit_entry " << bottom << " : " << e.length()
+ << " epoch " << ebofs->super_epoch
+ << " " << oncommit << endl;
+
+ // dump on queue
+ writeq.push_back(pair<epoch_t,bufferlist>(ebofs->super_epoch, e));
+ commitq.push_back(oncommit);
+
+ // kick writer thread
+ writer_cond.Signal();
+}
+
+
+void FileJournal::commit_epoch_start()
+{
+ dout(10) << "commit_epoch_start" << endl;
+}
+
+void FileJournal::commit_epoch_finish()
+{
+ dout(10) << "commit_epoch_finish" << endl;
+
+ // flush any unwritten items in previous epoch
+ writer_lock.Lock();
+ {
+ while (!writeq.empty() &&
+ writeq.front().first < ebofs->super_epoch) {
+ dout(15) << " dropping uncommitted journal item from prior epoch" << endl;
+ writeq.pop_front();
+ Context *oncommit = commitq.front();
+ commitq.pop_front();
+
+ if (oncommit) {
+ oncommit->finish(0);
+ delete oncommit;
+ }
+ }
+ }
+ writer_lock.Unlock();
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __EBOFS_FILEJOURNAL_H
+#define __EBOFS_FILEJOURNAL_H
+
+
+#include "Journal.h"
+
+
+class FileJournal : public Journal {
+ string fn;
+
+ off_t max_size;
+ off_t top; // byte of first entry chronologically
+ off_t bottom; // byte where next entry goes
+ off_t committing_to; // offset of epoch boundary, if we are committing
+
+ int fd;
+
+ list<pair<epoch_t,bufferlist> > writeq; // currently journaling
+ map<off_t,Context*> commitq; // currently journaling
+
+ // write thread
+ bool writer_stop;
+
+ void write_header();
+ void start_writer();
+ void stop_writer();
+ void write_thread_entry();
+
+ class Writer : public Thread {
+ FileJournal *journal;
+ public:
+ Writer(FileJournal *fj) : journal(fj) {}
+ void *entry() {
+ journal->write_thread();
+ return 0;
+ }
+ } writer_thread;
+
+ public:
+ FileJournal(Ebofs *e, char *f, off_t sz) :
+ Journal(e),
+ fn(f), max_size(sz),
+ top(0), bottom(0), committing_to(0),
+ fd(0),
+ writer_stop(false), writer_thread(this)
+ { }
+ ~FileJournal() {}
+
+ void create();
+ void open();
+ void close();
+
+ // writes
+ void submit_entry(bufferlist& e, Context *oncommit); // submit an item
+ void commit_epoch_start(); // mark epoch boundary
+ void commit_epoch_finish(); // mark prior epoch as committed (we can expire)
+
+ // reads
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __EBOFS_JOURNAL_H
+#define __EBOFS_JOURNAL_H
+
+
+class Journal {
+ Ebofs *ebofs;
+
+ public:
+ Journal(Ebofs *e) : ebofs(e) { }
+ virtual ~Journal() { }
+
+ virtual void create() = 0;
+ virtual void open() = 0;
+ virtual void close() = 0;
+
+ // writes
+ virtual void submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
+ virtual void commit_epoch_start() = 0; // mark epoch boundary
+ virtual void commit_epoch_finish(list<Context*>& ls) = 0; // mark prior epoch as committed (we can expire)
+
+ // reads/recovery
+
+};
+
+#endif