--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "DIOJournal.h"
+#include "Ebofs.h"
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+
+#include "config.h"
+
+// Logging helpers for this file.  Each expands to a bare `if` that streams to
+// *_dout / *_derr when level x is <= g_conf.debug_ebofs, prefixed with the
+// current time and the ebofs device name.  Both reference `ebofs`, so they can
+// only be used where an `ebofs` pointer is in scope; because the expansion is
+// an unbraced `if`, wrap uses in braces when they sit inside an if/else.
+#define dout(x) if (x <= g_conf.debug_ebofs) *_dout << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal "
+#define derr(x) if (x <= g_conf.debug_ebofs) *_derr << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal "
+
+
+/**
+ * Initialize the journal file for a fresh filesystem.
+ *
+ * Opens `fn` with O_DIRECT (no O_CREAT, so the file must already exist),
+ * records its current size as header.max_size, writes an otherwise empty
+ * header stamped with the ebofs fsid, and closes the file again.
+ *
+ * @return 0 on success, or -errno if the file cannot be opened or stat'ed.
+ *         Errors from write_header() are only logged there, not returned.
+ */
+int DIOJournal::create()
+{
+  dout(2) << "create " << fn << dendl;
+
+  // open (existing file only)
+  fd = ::open(fn.c_str(), O_RDWR|O_DIRECT);
+  if (fd < 0) {
+    int err = errno;  // capture first: the logging below may overwrite errno
+    fd = 0;           // 0 is this class's "not open" marker (open() asserts on it)
+    dout(2) << "create failed " << err << " " << strerror(err) << dendl;
+    return -err;
+  }
+  assert(fd > 0);
+
+  //::ftruncate(fd, 0);
+  //::fchmod(fd, 0644);
+
+  // get size
+  struct stat st;
+  if (::fstat(fd, &st) < 0) {
+    int err = errno;
+    dout(2) << "create fstat failed " << err << " " << strerror(err) << dendl;
+    ::close(fd);
+    fd = 0;
+    return -err;
+  }
+  dout(2) << "create " << fn << " " << st.st_size << " bytes" << dendl;
+
+  // write empty header
+  memset(&header, 0, sizeof(header));
+  header.clear();
+  header.fsid = ebofs->get_fsid();
+  header.max_size = st.st_size;
+  write_header();
+
+  // writeable.
+  read_pos = 0;
+  write_pos = queue_pos = sizeof(header);
+
+  ::close(fd);
+  fd = 0;  // leave the object in the "not open" state so open() can follow
+
+  return 0;
+}
+
+/**
+ * Open an existing journal file and start the writer thread.
+ *
+ * The file is opened O_RDWR|O_SYNC and its header is read.  If the header's
+ * fsid matches the filesystem and it lists a pointer for the current super
+ * epoch, read_pos is set to that offset and write_pos/queue_pos are zeroed;
+ * submit_entry() asserts queue_pos != 0, so the journal then stays read-only
+ * until make_writeable() is called.  Otherwise the journal is left writeable
+ * starting right after the header.
+ *
+ * @return 0 on success, or -errno if the file cannot be opened.
+ */
+int DIOJournal::open()
+{
+  //dout(1) << "open " << fn << dendl;
+
+  // open the journal file
+  assert(fd == 0);
+  fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
+  if (fd < 0) {
+    int err = errno;  // capture first: the logging below may overwrite errno
+    fd = 0;           // stay in the "not open" state so open() can be retried
+    dout(2) << "open failed " << err << " " << strerror(err) << dendl;
+    return -err;
+  }
+  assert(fd > 0);
+
+  // assume writeable, unless...
+  read_pos = 0;
+  write_pos = queue_pos = sizeof(header);
+
+  // read header?
+  read_header();
+  if (header.fsid != ebofs->get_fsid()) {
+    dout(2) << "open journal fsid doesn't match, invalid (someone else's?) journal" << dendl;
+  }
+  else if (header.num > 0) {
+    // valid header, pick an offset
+    for (int i=0; i<header.num; i++) {
+      if (header.epoch[i] == ebofs->get_super_epoch()) {
+        dout(2) << "using read_pos header pointer "
+                << header.epoch[i] << " at " << header.offset[i]
+                << dendl;
+        read_pos = header.offset[i];
+        write_pos = queue_pos = 0;  // not writeable until replay completes
+        break;
+      }
+      else if (header.epoch[i] < ebofs->get_super_epoch()) {
+        dout(2) << "super_epoch is " << ebofs->get_super_epoch()
+                << ", skipping old " << header.epoch[i] << " at " << header.offset[i]
+                << dendl;
+      }
+      else if (header.epoch[i] > ebofs->get_super_epoch()) {
+        dout(2) << "super_epoch is " << ebofs->get_super_epoch()
+                << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i]
+                << dendl;
+        break;
+      }
+    }
+  }
+
+  start_writer();
+
+  return 0;
+}
+
+/**
+ * Stop the writer thread and close the journal file.
+ *
+ * Both queues must already be empty and the file must be open; these are
+ * asserted.  fd is reset to 0 afterwards, which open() treats as "not open".
+ */
+void DIOJournal::close()
+{
+  dout(1) << "close " << fn << dendl;
+
+  // stop writer thread
+  stop_writer();
+
+  // close
+  assert(writeq.empty());
+  assert(commitq.empty());
+  assert(fd > 0);
+  if (::close(fd) < 0) {
+    // report the failure; the descriptor is considered gone either way
+    int err = errno;
+    derr(0) << "close error " << err << " " << strerror(err) << dendl;
+  }
+  fd = 0;
+}
+
+// Launch the background writer thread.  write_stop is cleared first because
+// write_thread_entry() loops only while it is false.
+void DIOJournal::start_writer()
+{
+ write_stop = false;
+ write_thread.create();
+}
+
+// Tell the writer thread to exit and wait until it has done so.
+void DIOJournal::stop_writer()
+{
+  {
+    // raise the stop flag and wake the thread while holding write_lock
+    Mutex::Locker l(write_lock);
+    write_stop = true;
+    write_cond.Signal();
+  }
+  // the lock is released before joining: the writer holds write_lock while
+  // it runs and must be able to take it again to observe write_stop
+  write_thread.join();
+}
+
+
+// Log every (epoch, offset) pointer in the in-memory header at debug level 10.
+// When a pointer's offset is lower than its predecessor's, the wrap point is
+// logged first (and header.wrap is asserted to be set).
+void DIOJournal::print_header()
+{
+  int idx = 0;
+  while (idx < header.num) {
+    const bool offset_went_backwards =
+      (idx > 0) && (header.offset[idx-1] > header.offset[idx]);
+    if (offset_went_backwards) {
+      assert(header.wrap);
+      dout(10) << "header: wrap at " << header.wrap << dendl;
+    }
+    dout(10) << "header: epoch " << header.epoch[idx] << " at " << header.offset[idx] << dendl;
+    ++idx;
+  }
+}
+/**
+ * Load the journal header from the first 4096 bytes of the file.
+ *
+ * The in-memory header is zeroed first and is only overwritten when a full
+ * header was actually read and its pointer count fits the epoch/offset
+ * arrays; on a read error, short read or bad count the header stays zeroed
+ * and the problem is logged.
+ */
+void DIOJournal::read_header()
+{
+  dout(10) << "read_header" << dendl;
+  memset(&header, 0, sizeof(header));  // zero out (read may fail)
+
+  char buf[4096];
+  ::lseek(fd, 0, SEEK_SET);
+  int r = ::read(fd, buf, sizeof(buf));
+  if (r < 0) {
+    int err = errno;  // capture before logging can overwrite it
+    dout(0) << "read_header error " << err << " " << strerror(err) << dendl;
+  } else if (r < (int)sizeof(header)) {
+    // do not copy bytes that were never read
+    dout(0) << "read_header short read, got " << r << " bytes" << dendl;
+  } else {
+    memcpy(&header, buf, sizeof(header));
+
+    // num indexes epoch[]/offset[] everywhere; refuse an on-disk value that
+    // would run off those arrays.
+    const int max_ptrs = sizeof(header.epoch) / sizeof(header.epoch[0]);
+    if (header.num < 0 || header.num > max_ptrs) {
+      dout(0) << "read_header bad pointer count " << header.num << ", ignoring header" << dendl;
+      memset(&header, 0, sizeof(header));
+    }
+  }
+  print_header();
+}
+/**
+ * Write the in-memory header to the first 4096 bytes of the journal file.
+ *
+ * The header is staged in a page-aligned, zero-filled 4096-byte buffer:
+ * create() calls this on a descriptor opened with O_DIRECT, which generally
+ * requires aligned user buffers, and zero-filling keeps unrelated memory
+ * contents out of the on-disk block.  Failures are logged, not returned.
+ */
+void DIOJournal::write_header()
+{
+  dout(10) << "write_header " << dendl;
+  print_header();
+
+  bufferptr bp = buffer::create_page_aligned(4096);
+  memset(bp.c_str(), 0, 4096);
+  memcpy(bp.c_str(), &header, sizeof(header));
+
+  ::lseek(fd, 0, SEEK_SET);
+  int r = ::write(fd, bp.c_str(), 4096);
+  if (r < 0) {
+    int err = errno;  // capture before logging can overwrite it
+    dout(0) << "write_header error " << err << " " << strerror(err) << dendl;
+  } else if (r != 4096) {
+    dout(0) << "write_header short write, wrote " << r << " of 4096 bytes" << dendl;
+  }
+}
+
+
+/**
+ * Body of the writer thread.
+ *
+ * Holds write_lock for its whole lifetime except while blocked in
+ * write_cond.Wait().  Each queued entry is written as one page-aligned
+ * buffer: an entry_header_t followed by the payload, zero-padded up to a
+ * multiple of 4096 bytes.  The matching commit callback, if any, is handed
+ * to ebofs->queue_finisher() after the write.  The loop exits once
+ * write_stop is set.
+ *
+ * NOTE(review): submit_entry() advances queue_pos by
+ * 2*sizeof(entry_header_t)+length while this loop advances write_pos by the
+ * padded length, so the write_pos == queue_pos assert below presumably can
+ * fire -- confirm.
+ * NOTE(review): h->len here includes the entry header and no footer is
+ * written, whereas read_entry() treats h.len as the body length and expects
+ * a footer -- confirm the intended on-disk layout.
+ */
+void DIOJournal::write_thread_entry()
+{
+  dout(10) << "write_thread_entry start" << dendl;
+  write_lock.Lock();
+
+  while (!write_stop) {
+    if (writeq.empty()) {
+      // sleep
+      dout(20) << "write_thread_entry going to sleep" << dendl;
+      assert(write_pos == queue_pos);
+      write_cond.Wait(write_lock);
+      dout(20) << "write_thread_entry woke up" << dendl;
+      continue;
+    }
+
+    // do queued writes
+    while (!writeq.empty()) {
+      // grab next item
+      epoch_t epoch = writeq.front().first;
+      bufferlist &payload = writeq.front().second;
+
+      int len = payload.length() + sizeof(entry_header_t);
+      int dolen = DIV_ROUND_UP(len,4096) * 4096;
+      bufferptr bp = buffer::create_page_aligned(dolen);
+      entry_header_t *h = (entry_header_t*)bp.c_str();
+      payload.copy(0, payload.length(), bp.c_str()+sizeof(entry_header_t));
+      // zero the padding so no unrelated memory contents reach the disk
+      memset(bp.c_str() + len, 0, dolen - len);
+
+      // payload must not be touched past this point
+      writeq.pop_front();
+      Context *oncommit = commitq.front();
+      commitq.pop_front();
+
+      // wrap?
+      if (write_pos == header.wrap) {
+        dout(15) << "write_thread_entry wrapped write_pos at " << write_pos << " to " << 4096 << dendl;
+        write_header();
+        write_pos = 4096;  // first block after the 4096-byte header block
+      }
+
+      // write!
+      // NOTE(review): a negative level makes this message unconditional;
+      // looks like a debugging leftover -- confirm before changing.
+      dout(-15) << "write_thread_entry writing " << write_pos << " : "
+                << len << " (" << dolen << ")"
+                << " epoch " << epoch
+                << dendl;
+
+      // fill in entry header
+      h->epoch = epoch;
+      h->len = len;
+      h->make_magic(write_pos, header.fsid);
+
+      ::lseek(fd, write_pos, SEEK_SET);
+      int r = ::write(fd, bp.c_str(), dolen);
+      if (r != dolen) {
+        derr(0) << "write got " << r << " not " << dolen << dendl;
+      }
+
+      // move position pointer
+      write_pos += dolen;
+
+      // queue callback
+      if (oncommit) {
+        ebofs->queue_finisher(oncommit);
+      }
+    }
+  }
+
+  write_lock.Unlock();
+  dout(10) << "write_thread_entry finish" << dendl;
+}
+
+/**
+ * Queue one journal entry for the writer thread.
+ *
+ * Space is accounted as 2*sizeof(entry_header_t) + e.length() bytes starting
+ * at queue_pos.  If the entry does not fit before the end of the journal and
+ * there is room at the front, the queue position wraps to just after the
+ * header; if there is no room the journal is marked full.
+ *
+ * @param e        encoded entry payload
+ * @param oncommit callback queued alongside the entry (may be null)
+ * @return true if the entry was queued, false if the journal is (now) full.
+ */
+bool DIOJournal::submit_entry(bufferlist& e, Context *oncommit)
+{
+  assert(queue_pos != 0);  // bad create(), or journal didn't replay to completion.
+
+  // ** lock **
+  Mutex::Locker locker(write_lock);
+
+  const off_t size = 2*sizeof(entry_header_t) + e.length();
+
+  // already marked full.
+  if (full)
+    return false;
+
+  if (header.wrap) {
+    // we're wrapped. don't overwrite ourselves.
+    if (queue_pos + size >= header.offset[0]) {
+      derr(0) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size
+              << " >= " << header.offset[0]
+              << dendl;
+      full = true;
+      print_header();
+      return false;
+    }
+  } else if (queue_pos + size >= header.max_size) {
+    // we haven't wrapped, but this entry would run off the end.
+    const bool room_at_front = (off_t)sizeof(header_t) + size < header.offset[0];
+    if (!room_at_front) {
+      derr(0) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size
+              << " >= " << header.max_size
+              << dendl;
+      full = true;
+      return false;
+    }
+
+    // wrap around to just past the header
+    dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << dendl;
+    header.wrap = queue_pos;
+    queue_pos = sizeof(header_t);
+    header.push(ebofs->get_super_epoch(), queue_pos);
+  }
+
+  dout(10) << "submit_entry " << queue_pos << " : " << e.length()
+           << " epoch " << ebofs->get_super_epoch()
+           << " " << oncommit << dendl;
+
+  // hand the entry and its callback to the writer
+  writeq.push_back(pair<epoch_t,bufferlist>(ebofs->get_super_epoch(), e));
+  commitq.push_back(oncommit);
+  queue_pos += size;
+
+  // kick writer thread
+  write_cond.Signal();
+  return true;
+}
+
+
+/**
+ * Record an epoch boundary at the current queue position.
+ *
+ * If the journal is marked full and the header still has pointers, this
+ * epoch is skipped.  If it is marked full but the header is empty, the full
+ * flag is cleared and the journal becomes usable again.
+ */
+void DIOJournal::commit_epoch_start()
+{
+  dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1
+           << " -- new epoch " << ebofs->get_super_epoch()
+           << dendl;
+
+  Mutex::Locker locker(write_lock);
+
+  // still full and not yet emptied: nothing to do for this epoch
+  if (full && header.num != 0) {
+    dout(1) << " journal FULL, ignoring this epoch" << dendl;
+    return;
+  }
+
+  // was full -> empty -> now usable
+  if (full) {
+    derr(0) << " clearing FULL flag, journal now usable" << dendl;
+    full = false;
+  }
+
+  // note epoch boundary (these entries may not yet be written)
+  header.push(ebofs->get_super_epoch(), queue_pos);
+  //write_header();  // no need to write it now, though...
+}
+
+/**
+ * Called once the previous epoch has been committed by ebofs.
+ *
+ * Under write_lock: if the journal was full, the header is cleared and the
+ * positions reset to just after the header; otherwise header pointers for
+ * epochs older than the current super epoch are dropped.  The header is then
+ * rewritten, and any still-queued entries from older epochs are discarded
+ * with their callbacks handed to ebofs->queue_finishers().
+ */
+void DIOJournal::commit_epoch_finish()
+{
+  dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << dendl;
+
+  Mutex::Locker locker(write_lock);
+
+  if (full) {
+    // full journal damage control.
+    dout(15) << " journal was FULL, contents now committed, clearing header.  journal still not usable until next epoch." << dendl;
+    header.clear();
+    write_pos = queue_pos = sizeof(header_t);
+  } else {
+    // update header -- trim/discard old (committed) epochs.
+    // stop when the header is empty: pop() on an empty header would drive
+    // num negative and epoch[0] would never change, so the loop would spin.
+    while (header.num > 0 && header.epoch[0] < ebofs->get_super_epoch())
+      header.pop();
+  }
+  write_header();
+
+  // discard any unwritten items in previous epoch, and do callbacks
+  epoch_t epoch = ebofs->get_super_epoch();
+  list<Context*> callbacks;
+  while (!writeq.empty() && writeq.front().first < epoch) {
+    dout(15) << " dropping unwritten and committed "
+             << write_pos << " : " << writeq.front().second.length()
+             << " epoch " << writeq.front().first
+             << dendl;
+    // finisher?
+    Context *oncommit = commitq.front();
+    if (oncommit) callbacks.push_back(oncommit);
+
+    // skip the space submit_entry() reserved for this entry
+    write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length();
+
+    // discard.
+    writeq.pop_front();
+    commitq.pop_front();
+  }
+
+  // queue the finishers
+  ebofs->queue_finishers(callbacks);
+}
+
+
+// Switch from reading to writing: new entries start at the current read
+// position if there is one, otherwise right after the header.  read_pos is
+// cleared afterwards.
+void DIOJournal::make_writeable()
+{
+  const off_t start = read_pos ? read_pos : (off_t)sizeof(header_t);
+  queue_pos = start;
+  write_pos = start;
+  read_pos = 0;
+}
+
+
+/**
+ * Read the next entry at read_pos.
+ *
+ * Expects an entry_header_t, h.len bytes of body, and a trailing
+ * entry_header_t footer whose magic, epoch and len match the header.  Any
+ * short read or mismatch is treated as the end of the journal.
+ *
+ * @param bl    receives the entry body (appended) on success
+ * @param epoch receives the entry's epoch on success
+ * @return true if an entry was read and read_pos advanced; false if the
+ *         journal is not readable or no further valid entry exists.
+ */
+bool DIOJournal::read_entry(bufferlist& bl, epoch_t& epoch)
+{
+  if (!read_pos) {
+    dout(2) << "read_entry -- not readable" << dendl;
+    return false;
+  }
+
+  if (read_pos == header.wrap) {
+    // find wrap point
+    for (int i=1; i<header.num; i++) {
+      if (header.offset[i] < read_pos) {
+        assert(header.offset[i-1] < read_pos);
+        read_pos = header.offset[i];
+        break;
+      }
+    }
+    assert(read_pos != header.wrap);
+    dout(10) << "read_entry wrapped from " << header.wrap << " to " << read_pos << dendl;
+  }
+
+  // header
+  entry_header_t h;
+  ::lseek(fd, read_pos, SEEK_SET);
+  if (::read(fd, &h, sizeof(h)) != (ssize_t)sizeof(h)) {
+    // never run the magic check on a partially filled header
+    dout(2) << "read_entry " << read_pos << " : short read on header, end of journal" << dendl;
+    return false;
+  }
+  if (!h.check_magic(read_pos, header.fsid)) {
+    dout(2) << "read_entry " << read_pos << " : bad header magic, end of journal" << dendl;
+    return false;
+  }
+
+  // body
+  bufferptr bp(h.len);
+  if (::read(fd, bp.c_str(), h.len) != (ssize_t)h.len) {
+    dout(2) << "read_entry " << read_pos << " : short read on body, end of journal" << dendl;
+    return false;
+  }
+
+  // footer
+  entry_header_t f;
+  if (::read(fd, &f, sizeof(f)) != (ssize_t)sizeof(f)) {
+    dout(2) << "read_entry " << read_pos << " : short read on footer, end of journal" << dendl;
+    return false;
+  }
+  if (!f.check_magic(read_pos, header.fsid) ||
+      h.epoch != f.epoch ||
+      h.len != f.len) {
+    dout(2) << "read_entry " << read_pos << " : bad footer magic, partially entry, end of journal" << dendl;
+    return false;
+  }
+
+  // yay!
+  dout(1) << "read_entry " << read_pos << " : "
+          << " " << h.len << " bytes"
+          << " epoch " << h.epoch
+          << dendl;
+
+  bl.push_back(bp);
+  epoch = h.epoch;
+
+  read_pos += 2*sizeof(entry_header_t) + h.len;
+
+  return true;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __EBOFS_DIOJOURNAL_H
+#define __EBOFS_DIOJOURNAL_H
+
+
+#include "Journal.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Thread.h"
+
+/**
+ * Journal implementation that keeps entries in a single file, written by a
+ * dedicated background thread.
+ *
+ * NOTE(review): elsewhere in this change the Journal interface appears to
+ * declare `void submit_entry(...)` and a pure-virtual `is_full()`; this class
+ * still returns bool from submit_entry and has no is_full() -- confirm it is
+ * brought in line before it is built.
+ */
+class DIOJournal : public Journal {
+public:
+  /** log header
+   * we allow 4 pointers:
+   *  top/initial,
+   *  one for an epoch boundary (if any),
+   *  one for a wrap in the ring buffer/journal file,
+   *  one for a second epoch boundary (if any).
+   * the epoch boundary one is useful only for speedier recovery in certain cases
+   * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
+   */
+  struct header_t {
+    uint64_t fsid;      // filesystem this journal belongs to
+    int num;            // number of valid entries in epoch[]/offset[]
+    off_t wrap;         // position at which the journal wrapped; 0 if not wrapped
+    off_t max_size;     // journal size in bytes
+    epoch_t epoch[4];
+    off_t offset[4];
+
+    header_t() : fsid(0), num(0), wrap(0), max_size(0) {}
+
+    // forget all pointers and any wrap (fsid and max_size are kept)
+    void clear() {
+      num = 0;
+      wrap = 0;
+    }
+    // drop the oldest pointer, shifting the rest down
+    void pop() {
+      if (num >= 2 && offset[0] > offset[1])
+        wrap = 0;  // we're eliminating a wrap
+      num--;
+      for (int i=0; i<num; i++) {
+        epoch[i] = epoch[i+1];
+        offset[i] = offset[i+1];
+      }
+    }
+    // append a pointer; if the tail already marks the e-1 -> e boundary it
+    // is overwritten instead
+    void push(epoch_t e, off_t o) {
+      assert(num < 4);
+      if (num > 2 &&
+          epoch[num-1] == e &&
+          epoch[num-2] == (e-1))
+        num--;  // tail was an epoch boundary; replace it.
+      epoch[num] = e;
+      offset[num] = o;
+      num++;
+    }
+  } header;
+
+  /** per-entry header; magic1 ties it to its file position and magic2 to
+   *  the fsid, epoch and length. */
+  struct entry_header_t {
+    uint64_t epoch;
+    uint64_t len;
+    uint64_t magic1;
+    uint64_t magic2;
+
+    // epoch and len must be set before calling this
+    void make_magic(off_t pos, uint64_t fsid) {
+      magic1 = pos;
+      magic2 = fsid ^ epoch ^ len;
+    }
+    bool check_magic(off_t pos, uint64_t fsid) {
+      return
+        magic1 == (uint64_t)pos &&
+        magic2 == (fsid ^ epoch ^ len);
+    }
+  };
+
+private:
+  string fn;         // journal file name
+
+  bool full;         // journal ran out of space
+  off_t write_pos;   // byte where next entry written goes
+  off_t queue_pos;   // byte where next entry queued for write goes
+
+  off_t read_pos;    // byte where the next entry is read; 0 when not readable
+
+  int fd;            // journal file descriptor; 0 when not open
+
+  list<pair<epoch_t,bufferlist> > writeq;  // currently journaling
+  list<Context*> commitq;                  // currently journaling
+
+  // write thread
+  Mutex write_lock;
+  Cond write_cond;
+  bool write_stop;
+
+  void print_header();
+  void read_header();
+  void write_header();
+  void start_writer();
+  void stop_writer();
+  void write_thread_entry();
+
+  // thread wrapper that runs write_thread_entry() on the owning journal
+  class Writer : public Thread {
+    DIOJournal *journal;
+  public:
+    Writer(DIOJournal *fj) : journal(fj) {}
+    void *entry() {
+      journal->write_thread_entry();
+      return 0;
+    }
+  } write_thread;
+
+public:
+  // f is only copied into fn, so it is taken as const char* (callers passing
+  // a plain char* keep working)
+  DIOJournal(Ebofs *e, const char *f) :
+    Journal(e), fn(f),
+    full(false),
+    write_pos(0), queue_pos(0), read_pos(0),
+    fd(0),
+    write_stop(false), write_thread(this) { }
+  ~DIOJournal() {}
+
+  int create();
+  int open();
+  void close();
+
+  void make_writeable();
+
+  // writes
+  bool submit_entry(bufferlist& e, Context *oncommit);  // submit an item
+  void commit_epoch_start();   // mark epoch boundary
+  void commit_epoch_finish();  // mark prior epoch as committed (we can expire)
+
+  // reads
+  bool read_entry(bufferlist& bl, epoch_t& e);
+};
+
+#endif
if (onsafe) {
dirty = true;
- while (1) {
- if (journal) {
- // journal empty transaction
- Transaction t;
- bufferlist bl;
- t._encode(bl);
- if (journal->submit_entry(bl, onsafe)) break;
- }
- commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ // journal empty transaction
+ Transaction t;
+ bufferlist bl;
+ t._encode(bl);
+ journal->submit_entry(bl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
}
ebofs_lock.Unlock();
}
delete onsafe; // kill callback, but still journal below (in case transaction had side effects)
onsafe = 0;
}
- while (1) {
- if (journal) {
- bufferlist bl;
- t._encode(bl);
- if (journal->submit_entry(bl, onsafe)) break;
- }
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ bufferlist bl;
+ t._encode(bl);
+ journal->submit_entry(bl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
ebofs_lock.Unlock();
return r;
// commit waiter
if (r > 0) {
assert((size_t)r == len);
- while (1) {
- if (journal) {
- Transaction t;
- t.write(oid, off, len, bl);
- bufferlist tbl;
- t._encode(tbl);
- if (journal->submit_entry(tbl, onsafe)) break;
- }
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ Transaction t;
+ t.write(oid, off, len, bl);
+ bufferlist tbl;
+ t._encode(tbl);
+ journal->submit_entry(tbl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
} else {
if (onsafe) delete onsafe;
}
// commit waiter
if (r > 0) {
assert((size_t)r == len);
- while (1) {
- if (journal) {
- Transaction t;
- t.zero(oid, off, len);
- bufferlist tbl;
- t._encode(tbl);
- if (journal->submit_entry(tbl, onsafe)) break;
- }
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ Transaction t;
+ t.zero(oid, off, len);
+ bufferlist tbl;
+ t._encode(tbl);
+ journal->submit_entry(tbl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
} else {
if (onsafe) delete onsafe;
}
// journal, wait for commit
if (r >= 0) {
- while (1) {
- if (journal) {
- Transaction t;
- t.remove(oid);
- bufferlist bl;
- t._encode(bl);
- if (journal->submit_entry(bl, onsafe)) break;
- }
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ Transaction t;
+ t.remove(oid);
+ bufferlist bl;
+ t._encode(bl);
+ journal->submit_entry(bl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
} else {
if (onsafe) delete onsafe;
}
// journal, wait for commit
if (r >= 0) {
- while (1) {
- if (journal) {
- Transaction t;
- t.truncate(oid, size);
- bufferlist bl;
- t._encode(bl);
- if (journal->submit_entry(bl, onsafe)) break;
- }
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ Transaction t;
+ t.truncate(oid, size);
+ bufferlist bl;
+ t._encode(bl);
+ journal->submit_entry(bl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
} else {
if (onsafe) delete onsafe;
}
// journal, wait for commit
if (r >= 0) {
- while (1) {
- if (journal) {
- Transaction t;
- t.clone(from, to);
- bufferlist bl;
- t._encode(bl);
- if (journal->submit_entry(bl, onsafe)) break;
- }
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ Transaction t;
+ t.clone(from, to);
+ bufferlist bl;
+ t._encode(bl);
+ journal->submit_entry(bl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
} else {
if (onsafe) delete onsafe;
}
// journal, wait for commit
if (r >= 0) {
- while (1) {
- if (journal) {
- Transaction t;
- t.setattr(oid, name, value, size);
- bufferlist bl;
- t._encode(bl);
- if (journal->submit_entry(bl, onsafe)) break;
- }
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ Transaction t;
+ t.setattr(oid, name, value, size);
+ bufferlist bl;
+ t._encode(bl);
+ journal->submit_entry(bl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
} else {
if (onsafe) delete onsafe;
}
// journal, wait for commit
if (r >= 0) {
- while (1) {
- if (journal) {
- Transaction t;
- t.setattrs(oid, attrset);
- bufferlist bl;
- t._encode(bl);
- if (journal->submit_entry(bl, onsafe)) break;
- }
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ Transaction t;
+ t.setattrs(oid, attrset);
+ bufferlist bl;
+ t._encode(bl);
+ journal->submit_entry(bl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
} else {
if (onsafe) delete onsafe;
}
// journal, wait for commit
if (r >= 0) {
- while (1) {
- if (journal) {
- Transaction t;
- t.rmattr(oid, name);
- bufferlist bl;
- t._encode(bl);
- if (journal->submit_entry(bl, onsafe)) break;
- }
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ Transaction t;
+ t.rmattr(oid, name);
+ bufferlist bl;
+ t._encode(bl);
+ journal->submit_entry(bl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
} else {
if (onsafe) delete onsafe;
}
// journal, wait for commit
if (r >= 0) {
- while (1) {
- if (journal) {
- Transaction t;
- t.create_collection(cid);
- bufferlist bl;
- t._encode(bl);
- if (journal->submit_entry(bl, onsafe)) break;
- }
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ Transaction t;
+ t.create_collection(cid);
+ bufferlist bl;
+ t._encode(bl);
+ journal->submit_entry(bl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
} else {
if (onsafe) delete onsafe;
}
// journal, wait for commit
if (r >= 0) {
- while (1) {
- if (journal) {
- Transaction t;
- t.remove_collection(cid);
- bufferlist bl;
- t._encode(bl);
- if (journal->submit_entry(bl, onsafe)) break;
- }
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ Transaction t;
+ t.remove_collection(cid);
+ bufferlist bl;
+ t._encode(bl);
+ journal->submit_entry(bl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
} else {
if (onsafe) delete onsafe;
}
// journal, wait for commit
if (r >= 0) {
- while (1) {
- if (journal) {
- Transaction t;
- t.collection_add(cid, oid);
- bufferlist bl;
- t._encode(bl);
- if (journal->submit_entry(bl, onsafe)) break;
- }
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ Transaction t;
+ t.collection_add(cid, oid);
+ bufferlist bl;
+ t._encode(bl);
+ journal->submit_entry(bl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
} else {
if (onsafe) delete onsafe;
}
// journal, wait for commit
if (r >= 0) {
- while (1) {
- if (journal) {
- Transaction t;
- t.collection_remove(cid, oid);
- bufferlist bl;
- t._encode(bl);
- if (journal->submit_entry(bl, onsafe)) break;
- }
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ Transaction t;
+ t.collection_remove(cid, oid);
+ bufferlist bl;
+ t._encode(bl);
+ journal->submit_entry(bl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
} else {
if (onsafe) delete onsafe;
}
// journal, wait for commit
if (r >= 0) {
- while (1) {
- if (journal) {
- Transaction t;
- t.collection_setattr(cid, name, value, size);
- bufferlist bl;
- t._encode(bl);
- if (journal->submit_entry(bl, onsafe)) break;
- }
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ Transaction t;
+ t.collection_setattr(cid, name, value, size);
+ bufferlist bl;
+ t._encode(bl);
+ journal->submit_entry(bl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
} else {
if (onsafe) delete onsafe;
}
// journal, wait for commit
if (r >= 0) {
- while (1) {
- if (journal) {
- Transaction t;
- t.collection_rmattr(cid, name);
- bufferlist bl;
- t._encode(bl);
- if (journal->submit_entry(bl, onsafe)) break;
- }
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- break;
- }
+ if (journal) {
+ Transaction t;
+ t.collection_rmattr(cid, name);
+ bufferlist bl;
+ t._encode(bl);
+ journal->submit_entry(bl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
} else {
if (onsafe) delete onsafe;
}
public:
uint64_t get_fsid() { return super_fsid; }
epoch_t get_super_epoch() { return super_epoch; }
+
+  // Register oncommit under the current super_epoch in commit_waiters.
+  // A null callback is ignored.
+  void queue_commit_waiter(Context *oncommit) {
+    if (!oncommit)
+      return;
+    commit_waiters[super_epoch].push_back(oncommit);
+  }
+
+
protected:
public:
- Ebofs(const char *devfn, char *jfn=0) :
+ Ebofs(const char *devfn, const char *jfn=0) :
fake_writes(false),
dev(devfn),
mounted(false), unmounting(false), dirty(false), readonly(false),
super_epoch(0), commit_starting(false), commit_thread_started(false),
commit_thread(this),
- journalfn(jfn), journal(0),
+ journal(0),
free_blocks(0), limbo_blocks(0),
allocator(this),
nodepool(ebofs_lock),
finisher_stop(false), finisher_thread(this) {
for (int i=0; i<EBOFS_NUM_FREE_BUCKETS; i++)
free_tab[i] = 0;
- if (!journalfn) {
+ if (jfn) {
+ journalfn = new char[strlen(jfn) + 1];
+ strcpy(journalfn, jfn);
+ } else {
journalfn = new char[strlen(devfn) + 100];
strcpy(journalfn, devfn);
strcat(journalfn, ".journal");
// writeable.
read_pos = 0;
- write_pos = queue_pos = sizeof(header);
+ write_pos = sizeof(header);
::close(fd);
// assume writeable, unless...
read_pos = 0;
- write_pos = queue_pos = sizeof(header);
+ write_pos = sizeof(header);
// read header?
read_header();
<< header.epoch[i] << " at " << header.offset[i]
<< dendl;
read_pos = header.offset[i];
- write_pos = queue_pos = 0;
+ write_pos = 0;
break;
}
else if (header.epoch[i] < ebofs->get_super_epoch()) {
if (writeq.empty()) {
// sleep
dout(20) << "write_thread_entry going to sleep" << dendl;
- assert(write_pos == queue_pos);
write_cond.Wait(write_lock);
dout(20) << "write_thread_entry woke up" << dendl;
continue;
}
- // do queued writes
+ // gather queued writes
+ off_t queue_pos = write_pos;
+ bufferlist bl;
+
while (!writeq.empty()) {
// grab next item
epoch_t epoch = writeq.front().first;
- bufferlist bl;
- bl.claim(writeq.front().second);
- writeq.pop_front();
- Context *oncommit = commitq.front();
- commitq.pop_front();
+ bufferlist &ebl = writeq.front().second;
+ off_t size = 2*sizeof(entry_header_t) + ebl.length();
- // wrap?
- if (write_pos == header.wrap) {
- dout(15) << "write_thread_entry wrapped write_pos at " << write_pos << " to " << sizeof(header_t) << dendl;
- assert(header.wrap == write_pos);
- write_header();
- write_pos = sizeof(header_t);
+ // epoch boundary?
+ if (epoch > header.last_epoch()) {
+ dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl;
+ header.push(epoch, queue_pos);
}
- // write!
- dout(15) << "write_thread_entry writing " << write_pos << " : "
- << bl.length()
- << " epoch " << epoch
- << dendl;
+ // does it fit?
+ if (header.wrap) {
+ // we're wrapped. don't overwrite ourselves.
+ if (queue_pos + size >= header.offset[0]) {
+ if (queue_pos != write_pos) break; // do what we have, first
+ dout(10) << "JOURNAL FULL (and wrapped), " << queue_pos << "+" << size
+ << " >= " << header.offset[0]
+ << dendl;
+ full = true;
+ writeq.clear();
+ print_header();
+ break;
+ }
+ } else {
+ // we haven't wrapped.
+ if (queue_pos + size >= header.max_size) {
+ if (queue_pos != write_pos) break; // do what we have, first
+ // is there room if we wrap?
+ if ((off_t)sizeof(header_t) + size < header.offset[0]) {
+ // yes!
+ dout(10) << "wrapped from " << queue_pos << " to " << sizeof(header_t) << dendl;
+ header.wrap = queue_pos;
+ queue_pos = sizeof(header_t);
+ header.push(ebofs->get_super_epoch(), queue_pos);
+ write_header();
+ } else {
+ // no room.
+ dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size
+ << " >= " << header.max_size
+ << dendl;
+ full = true;
+ writeq.clear();
+ break;
+ }
+ }
+ }
+
+ // add to write buffer
+ dout(15) << "write_thread_entry will write " << queue_pos << " : "
+ << ebl.length()
+ << " epoch " << epoch
+ << dendl;
- // write entry header
+ // add it this entry
entry_header_t h;
h.epoch = epoch;
- h.len = bl.length();
+ h.len = ebl.length();
h.make_magic(write_pos, header.fsid);
+ bl.append((const char*)&h, sizeof(h));
+ bl.claim_append(ebl);
+ bl.append((const char*)&h, sizeof(h));
+
+ Context *oncommit = commitq.front();
+ if (oncommit)
+ writingq.push_back(oncommit);
+
+ // pop from writeq
+ writeq.pop_front();
+ commitq.pop_front();
+ break;
+ }
- ::lseek(fd, write_pos, SEEK_SET);
- ::write(fd, &h, sizeof(h));
+ // write anything?
+ if (bl.length() > 0) {
+ writing = true;
+ write_lock.Unlock();
+ dout(15) << "write_thread_entry writing " << write_pos << "~" << bl.length() << dendl;
+ ::lseek(fd, write_pos, SEEK_SET);
for (list<bufferptr>::const_iterator it = bl.buffers().begin();
it != bl.buffers().end();
it++) {
if ((*it).length() == 0) continue; // blank buffer.
::write(fd, (char*)(*it).c_str(), (*it).length() );
}
-
- ::write(fd, &h, sizeof(h));
- // move position pointer
- write_pos += 2*sizeof(entry_header_t) + bl.length();
-
- if (oncommit) {
- if (1) {
- // queue callback
- ebofs->queue_finisher(oncommit);
- } else {
- // callback now
- oncommit->finish(0);
- delete oncommit;
- }
- }
+ write_lock.Lock();
+ writing = false;
+ write_pos = queue_pos;
+ ebofs->queue_finishers(writingq);
}
}
-
write_lock.Unlock();
dout(10) << "write_thread_entry finish" << dendl;
}
-bool FileJournal::submit_entry(bufferlist& e, Context *oncommit)
+bool FileJournal::is_full()
{
- assert(queue_pos != 0); // bad create(), or journal didn't replay to completion.
-
- // ** lock **
Mutex::Locker locker(write_lock);
+ return full;
+}
- // wrap? full?
- off_t size = 2*sizeof(entry_header_t) + e.length();
-
- if (full) return false; // already marked full.
+void FileJournal::submit_entry(bufferlist& e, Context *oncommit)
+{
+ Mutex::Locker locker(write_lock); // ** lock **
- if (header.wrap) {
- // we're wrapped. don't overwrite ourselves.
- if (queue_pos + size >= header.offset[0]) {
- dout(10) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size
- << " >= " << header.offset[0]
- << dendl;
- full = true;
- print_header();
- return false;
- }
- } else {
- // we haven't wrapped.
- if (queue_pos + size >= header.max_size) {
- // is there room if we wrap?
- if ((off_t)sizeof(header_t) + size < header.offset[0]) {
- // yes!
- dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << dendl;
- header.wrap = queue_pos;
- queue_pos = sizeof(header_t);
- header.push(ebofs->get_super_epoch(), queue_pos);
- } else {
- // no room.
- dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size
- << " >= " << header.max_size
- << dendl;
- full = true;
- return false;
- }
- }
- }
-
- dout(10) << "submit_entry " << queue_pos << " : " << e.length()
+ // dump on queue
+ dout(10) << "submit_entry " << e.length()
<< " epoch " << ebofs->get_super_epoch()
<< " " << oncommit << dendl;
-
- // dump on queue
- writeq.push_back(pair<epoch_t,bufferlist>(ebofs->get_super_epoch(), e));
commitq.push_back(oncommit);
-
- queue_pos += size;
-
- // kick writer thread
- write_cond.Signal();
-
- return true;
+ if (!full) {
+ writeq.push_back(pair<epoch_t,bufferlist>(ebofs->get_super_epoch(), e));
+ write_cond.Signal(); // kick writer thread
+ }
}
dout(1) << " clearing FULL flag, journal now usable" << dendl;
full = false;
}
-
- // note epoch boundary
- header.push(ebofs->get_super_epoch(), queue_pos); // note: these entries may not yet be written.
- //write_header(); // no need to write it now, though...
}
void FileJournal::commit_epoch_finish()
{
dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << dendl;
- write_lock.Lock();
- {
- if (full) {
- // full journal damage control.
- dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << dendl;
- header.clear();
- write_pos = queue_pos = sizeof(header_t);
- } else {
- // update header -- trim/discard old (committed) epochs
- while (header.epoch[0] < ebofs->get_super_epoch())
- header.pop();
- }
- write_header();
-
- // discard any unwritten items in previous epoch, and do callbacks
- epoch_t epoch = ebofs->get_super_epoch();
- list<Context*> callbacks;
- while (!writeq.empty() && writeq.front().first < epoch) {
- dout(15) << " dropping unwritten and committed "
- << write_pos << " : " << writeq.front().second.length()
- << " epoch " << writeq.front().first
- << dendl;
- // finisher?
- Context *oncommit = commitq.front();
- if (oncommit) callbacks.push_back(oncommit);
-
- write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length();
-
- // discard.
- writeq.pop_front();
- commitq.pop_front();
- }
-
- // queue the finishers
- ebofs->queue_finishers(callbacks);
- }
- write_lock.Unlock();
-
+ Mutex::Locker locker(write_lock);
+
+ if (full) {
+ // full journal damage control.
+ dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << dendl;
+ header.clear();
+ write_pos = sizeof(header_t);
+ } else {
+ // update header -- trim/discard old (committed) epochs
+ while (header.num && header.epoch[0] < ebofs->get_super_epoch())
+ header.pop();
+ }
+ write_header();
+
+ // discard any unwritten items in previous epoch
+ epoch_t epoch = ebofs->get_super_epoch();
+ while (!writeq.empty() && writeq.front().first < epoch) {
+ dout(15) << " dropping unwritten and committed "
+ << write_pos << " : " << writeq.front().second.length()
+ << " epoch " << writeq.front().first
+ << dendl;
+ // finisher?
+ Context *oncommit = commitq.front();
+ if (oncommit) writingq.push_back(oncommit);
+
+ // discard.
+ writeq.pop_front();
+ commitq.pop_front();
+ }
+
+ // queue the finishers
+ ebofs->queue_finishers(writingq);
}
void FileJournal::make_writeable()
{
if (read_pos)
- write_pos = queue_pos = read_pos;
+ write_pos = read_pos;
else
- write_pos = queue_pos = sizeof(header_t);
+ write_pos = sizeof(header_t);
read_pos = 0;
}
offset[num] = o;
num++;
}
+  // Epoch of the most recently pushed pointer.  With no pointers there is
+  // no valid epoch[num-1] to read, so 0 is returned instead of indexing
+  // epoch[-1].
+  // NOTE(review): assumes real epochs are > 0 so that 0 reads as "none" --
+  // confirm.
+  epoch_t last_epoch() {
+    return num > 0 ? epoch[num-1] : 0;
+  }
} header;
struct entry_header_t {
private:
string fn;
- bool full;
+ bool full, writing;
off_t write_pos; // byte where next entry written goes
- off_t queue_pos; // byte where next entry queued for write goes
-
off_t read_pos; //
int fd;
- list<pair<epoch_t,bufferlist> > writeq; // currently journaling
- list<Context*> commitq; // currently journaling
+ // to be journaled
+ list<pair<epoch_t,bufferlist> > writeq;
+ list<Context*> commitq;
+
+ // being journaled
+ list<Context*> writingq;
// write thread
Mutex write_lock;
} write_thread;
public:
- FileJournal(Ebofs *e, char *f) :
+ FileJournal(Ebofs *e, const char *f) :
Journal(e), fn(f),
- full(false),
- write_pos(0), queue_pos(0), read_pos(0),
+ full(false), writing(false),
+ write_pos(0), read_pos(0),
fd(0),
write_stop(false), write_thread(this) { }
~FileJournal() {}
void make_writeable();
// writes
- bool submit_entry(bufferlist& e, Context *oncommit); // submit an item
+ void submit_entry(bufferlist& e, Context *oncommit); // submit an item
void commit_epoch_start(); // mark epoch boundary
void commit_epoch_finish(); // mark prior epoch as committed (we can expire)
bool read_entry(bufferlist& bl, epoch_t& e);
+ bool is_full();
+
// reads
};
// writes
virtual void make_writeable() = 0;
- virtual bool submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
+ virtual void submit_entry(bufferlist& e, Context *oncommit) = 0;
virtual void commit_epoch_start() = 0; // mark epoch boundary
virtual void commit_epoch_finish() = 0; // mark prior epoch as committed (we can expire)
virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0;
+ virtual bool is_full() = 0;
// reads/recovery
// allocation
void verify_extents() {
- if (1) { // do crazy stupid sanity checking
+ if (0) { // do crazy stupid sanity checking
block_t count = 0, pos = 0;
interval_set<block_t> is;
csum_t csum = 0;
*
*/
+#define dout(x) if (x <= g_conf.debug_ebofs) *_dout << dbeginl
+
#include <iostream>
#include "ebofs/Ebofs.h"
C_Commit(off_t o) : off(o) {}
void finish(int r) {
utime_t now = g_clock.now();
- cout << off << "\t"
+ dout(0) << off << "\t"
<< (writes[off].second-writes[off].first) << "\t"
- << (now - writes[off].first) << std::endl;
+ << (now - writes[off].first) << dendl;
writes.erase(off);
}
};
parse_config_options(args);
// args
- if (args.size() != 3) return -1;
+ if (args.size() < 3) return -1;
const char *filename = args[0];
int seconds = atoi(args[1]);
int bytes = atoi(args[2]);
+ const char *journal = 0;
+ if (args.size() >= 4)
+ journal = args[3];
buffer::ptr bp(bytes);
bp.zero();
cout << "#dev " << filename
<< seconds << " seconds, " << bytes << " bytes per write" << std::endl;
- Ebofs fs(filename);
+ Ebofs fs(filename, journal);
if (fs.mkfs() < 0) {
cout << "mkfs failed" << std::endl;
return -1;