ebofs/BlockDevice.o\
ebofs/BufferCache.o\
ebofs/Ebofs.o\
- ebofs/Allocator.o
+ ebofs/Allocator.o\
+ ebofs/FileJournal.o
MDS_OBJS= \
mds/MDS.o\
#include "Ebofs.h"
+#include "FileJournal.h"
+
#include <errno.h>
#ifndef DARWIN
ebofs_lock.Lock();
assert(!mounted);
+ // open dev
int r = dev.open(&idle_kicker);
if (r < 0) {
ebofs_lock.Unlock();
dout(3) << "mount epoch " << super_epoch << endl;
assert(super_epoch == sb->epoch);
+ super_fsid = sb->fsid;
+
free_blocks = sb->free_blocks;
limbo_blocks = sb->limbo_blocks;
allocator.release_limbo();
+
+ // open journal?
+ if (journalfn) {
+ journal = new FileJournal(this, journalfn);
+ if (journal->open() < 0) {
+ dout(-3) << "mount journal " << journalfn << " open failed" << endl;
+ delete journal;
+ journal = 0;
+ } else {
+ dout(-3) << "mount journal " << journalfn << " opened, replaying" << endl;
+
+ while (1) {
+ bufferlist bl;
+ epoch_t e;
+ if (!journal->read_entry(bl, e)) {
+ dout(-3) << "mount replay: end of journal, done." << endl;
+ break;
+ }
+
+ if (e < super_epoch) {
+ dout(-3) << "mount replay: skipping old entry in epoch " << e << " < " << super_epoch << endl;
+ }
+ if (e == super_epoch+1) {
+ super_epoch++;
+ dout(-3) << "mount replay: jumped to next epoch " << super_epoch << endl;
+ }
+ assert(e == super_epoch);
+
+ dout(-3) << "mount replay: applying transaction in epoch " << e << endl;
+ Transaction t;
+ int off = 0;
+ t._decode(bl, off);
+ _apply_transaction(t);
+ }
+ }
+ }
+
dout(3) << "mount starting commit+finisher threads" << endl;
commit_thread.create();
finisher_thread.create();
dout(1) << "mounted " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl;
mounted = true;
+
ebofs_lock.Unlock();
return 0;
}
block_t num_blocks = dev.get_num_blocks();
+ // make a super-random fsid
+ srand(time(0) ^ getpid());
+ super_fsid = (lrand48() << 32) ^ mrand48();
+
free_blocks = 0;
limbo_blocks = 0;
dev.close();
+
+ // create journal?
+ if (journalfn) {
+ journal = new FileJournal(this, journalfn);
+ if (journal->create() < 0) {
+ dout(3) << "mount journal " << journalfn << " created failed" << endl;
+ delete journal;
+ } else {
+ dout(3) << "mount journal " << journalfn << " created" << endl;
+ }
+ }
+
dout(2) << "mkfs: " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl;
ebofs_lock.Unlock();
return 0;
// fill in super
memset(&sb, 0, sizeof(sb));
sb.s_magic = EBOFS_MAGIC;
+ sb.fsid = super_fsid;
sb.epoch = epoch;
sb.num_blocks = dev.get_num_blocks();
<< ", max dirty " << g_conf.ebofs_bc_max_dirty
<< endl;
+ if (journal) journal->commit_epoch_start();
// (async) write onodes+condes (do this first; it currently involves inode reallocation)
commit_inodes_start();
alloc_more_node_space();
}
+ // signal journal
+ if (journal) journal->commit_epoch_finish();
+
// kick waiters
dout(10) << "commit_thread queueing commit + kicking sync waiters" << endl;
- finisher_lock.Lock();
- finisher_queue.splice(finisher_queue.end(), commit_waiters[super_epoch-1]);
+ queue_finishers(commit_waiters[super_epoch-1]);
commit_waiters.erase(super_epoch-1);
- finisher_cond.Signal();
- finisher_lock.Unlock();
sync_cond.Signal();
ebofs_lock.Lock();
if (onsafe) {
dirty = true;
- commit_waiters[super_epoch].push_back(onsafe);
+
+ while (1) {
+ if (journal) {
+ // journal empty transaction
+ Transaction t;
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
}
ebofs_lock.Unlock();
}
ebofs_lock.Lock();
dout(7) << "apply_transaction start (" << t.ops.size() << " ops)" << endl;
+ unsigned r = _apply_transaction(t);
+
+ // journal, wait for commit
+ if (r != 0 && onsafe) {
+ delete onsafe; // kill callback, but still journal below (in case transaction had side effects)
+ onsafe = 0;
+ }
+ while (1) {
+ if (journal) {
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
+
+ ebofs_lock.Unlock();
+ return r;
+}
+
+unsigned Ebofs::_apply_transaction(Transaction& t)
+{
// do ops
unsigned r = 0; // bit fields indicate which ops failed.
int bit = 1;
case Transaction::OP_GETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
pair<void*,int*> pattrval = t.pattrvals.front(); t.pattrvals.pop_front();
if ((*(pattrval.second) = _getattr(oid, attrname, pattrval.first, *(pattrval.second))) < 0) {
dout(7) << "apply_transaction fail on _getattr" << endl;
case Transaction::OP_SETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
case Transaction::OP_RMATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
if (_rmattr(oid, attrname) < 0) {
dout(7) << "apply_transaction fail on _rmattr" << endl;
r &= bit;
case Transaction::OP_COLL_SETATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
case Transaction::OP_COLL_RMATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
if (_collection_rmattr(cid, attrname) < 0) {
dout(7) << "apply_transaction fail on _collection_rmattr" << endl;
r &= bit;
bit = bit << 1;
}
- dout(7) << "apply_transaction finish (r = " << r << ")" << endl;
-
- // set up commit waiter
- //if (r == 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- //} else {
- //if (onsafe) delete onsafe;
- //}
-
- ebofs_lock.Unlock();
+ dout(7) << "_apply_transaction finish (r = " << r << ")" << endl;
return r;
}
}
-/*int Ebofs::write(object_t oid,
- off_t off, size_t len,
- bufferlist& bl, bool fsync)
-{
- // wait?
- if (fsync) {
- // wait for flush.
- Cond cond;
- bool done;
- int flush = 1; // write never returns positive
- Context *c = new C_Cond(&cond, &done, &flush);
- int r = write(oid, off, len, bl, c);
- if (r < 0) return r;
-
- ebofs_lock.Lock();
- {
- while (!done)
- cond.Wait(ebofs_lock);
- assert(flush <= 0);
- }
- ebofs_lock.Unlock();
- if (flush < 0) return flush;
- return r;
- } else {
- // don't wait for flush.
- return write(oid, off, len, bl, (Context*)0);
- }
-}
-*/
-
int Ebofs::write(object_t oid,
off_t off, size_t len,
bufferlist& bl, Context *onsafe)
// commit waiter
if (r > 0) {
assert((size_t)r == len);
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.write(oid, off, len, bl);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
// do it
int r = _remove(oid);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.remove(oid);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _truncate(oid, size);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.truncate(oid, size);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _clone(from, to);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.clone(from, to);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
ebofs_lock.Lock();
int r = _setattr(oid, name, value, size);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.setattr(oid, name, value, size);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
ebofs_lock.Lock();
int r = _setattrs(oid, attrset);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.setattrs(oid, attrset);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _rmattr(oid, name);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.rmattr(oid, name);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _create_collection(cid);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.create_collection(cid);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _destroy_collection(cid);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.remove_collection(cid);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _collection_add(cid, oid);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.collection_add(cid, oid);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _collection_remove(cid, oid);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.collection_remove(cid, oid);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _collection_setattr(cid, name, value, size);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.collection_setattr(cid, name, value, size);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _collection_rmattr(cid, name);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.collection_rmattr(cid, name);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
#include "nodes.h"
#include "Allocator.h"
#include "Table.h"
+#include "Journal.h"
#include "common/Mutex.h"
#include "common/Cond.h"
class Ebofs : public ObjectStore {
- protected:
+protected:
Mutex ebofs_lock; // a beautiful global lock
// ** debuggy **
bool fake_writes;
// ** super **
+public:
BlockDevice dev;
+protected:
bool mounted, unmounting, dirty;
bool readonly;
version_t super_epoch;
bool commit_thread_started, mid_commit;
Cond commit_cond; // to wake up the commit thread
Cond sync_cond;
+ uint64_t super_fsid;
map<version_t, list<Context*> > commit_waiters;
}
} commit_thread;
-
+public:
+ uint64_t get_fsid() { return super_fsid; }
+ epoch_t get_super_epoch() { return super_epoch; }
+protected:
+ // ** journal **
+ char *journalfn;
+ Journal *journal;
+
// ** allocator **
block_t free_blocks, limbo_blocks;
Allocator allocator;
bool finisher_stop;
list<Context*> finisher_queue;
+public:
+ void queue_finisher(Context *c) {
+ finisher_lock.Lock();
+ finisher_queue.push_back(c);
+ finisher_cond.Signal();
+ finisher_lock.Unlock();
+ }
+ void queue_finishers(list<Context*>& ls) {
+ finisher_lock.Lock();
+ finisher_queue.splice(finisher_queue.end(), ls);
+ finisher_cond.Signal();
+ finisher_lock.Unlock();
+ }
+protected:
+
void *finisher_thread_entry();
class FinisherThread : public Thread {
Ebofs *ebofs;
public:
- Ebofs(char *devfn) :
+ Ebofs(char *devfn, char *jfn=0) :
fake_writes(false),
dev(devfn),
mounted(false), unmounting(false), dirty(false), readonly(false),
super_epoch(0), commit_thread_started(false), mid_commit(false),
commit_thread(this),
+ journalfn(jfn), journal(0),
free_blocks(0), limbo_blocks(0),
allocator(this),
nodepool(ebofs_lock),
finisher_stop(false), finisher_thread(this) {
for (int i=0; i<EBOFS_NUM_FREE_BUCKETS; i++)
free_tab[i] = 0;
+ if (!journalfn) {
+ journalfn = new char[strlen(devfn) + 100];
+ strcpy(journalfn, devfn);
+ strcat(journalfn, ".journal");
+ }
}
~Ebofs() {
}
private:
// private interface -- use if caller already holds lock
+ unsigned _apply_transaction(Transaction& t);
+
int _read(object_t oid, off_t off, size_t len, bufferlist& bl);
int _is_cached(object_t oid, off_t off, size_t len);
int _stat(object_t oid, struct stat *st);
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
#include "FileJournal.h"
#include "Ebofs.h"
-#include "config.h"
-#define dout(x) if (x <= g_conf.debug_ebofs) cout << "ebofs(" << dev.get_device_name() << ").journal "
-#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << dev.get_device_name() << ").journal "
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "config.h"
+#undef dout
+#define dout(x) if (true || x <= g_conf.debug_ebofs) cout << "ebofs(" << ebofs->dev.get_device_name() << ").journal "
+#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << ebofs->dev.get_device_name() << ").journal "
-void FileJournal::create()
+int FileJournal::create()
{
dout(1) << "create " << fn << endl;
// open/create
- fd = ::open(fn.c_str(), O_CREAT|O_WRONLY);
+ fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
+ if (fd < 0) {
+ dout(1) << "create failed " << errno << " " << strerror(errno) << endl;
+ return -errno;
+ }
assert(fd > 0);
- ::ftruncate(fd);
- ::fchmod(fd, 0644);
+ //::ftruncate(fd, 0);
+ //::fchmod(fd, 0644);
+
+ // get size
+ struct stat st;
+ ::fstat(fd, &st);
+ dout(1) << "open " << fn << " " << st.st_size << " bytes" << endl;
+
+ // write empty header
+ header.clear();
+ header.fsid = ebofs->get_fsid();
+ header.max_size = st.st_size;
+ write_header();
+
+ // writeable.
+ read_pos = 0;
+ write_pos = queue_pos = sizeof(header);
::close(fd);
-}
+ return 0;
+}
-void FileJournal::open()
+int FileJournal::open()
{
- dout(1) << "open " << fn << endl;
+ //dout(1) << "open " << fn << endl;
// open and file
assert(fd == 0);
- fd = ::open(fn.c_str(), O_RDWR);
+ fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
+ if (fd < 0) {
+ dout(1) << "open failed " << errno << " " << strerror(errno) << endl;
+ return -errno;
+ }
assert(fd > 0);
- // read header?
- // ***
+ // assume writeable, unless...
+ read_pos = 0;
+ write_pos = queue_pos = sizeof(header);
+ // read header?
+ read_header();
+ if (header.num > 0 && header.fsid == ebofs->get_fsid()) {
+ // valid header, pick an offset
+ for (int i=0; i<header.num; i++) {
+ if (header.epoch[i] == ebofs->get_super_epoch()) {
+ dout(2) << "using read_pos header pointer "
+ << header.epoch[i] << " at " << header.offset[i]
+ << endl;
+ read_pos = header.offset[i];
+ write_pos = queue_pos = 0;
+ break;
+ }
+ else if (header.epoch[i] < ebofs->get_super_epoch()) {
+ dout(2) << "super_epoch is " << ebofs->get_super_epoch()
+ << ", skipping old " << header.epoch[i] << " at " << header.offset[i]
+ << endl;
+ }
+ else if (header.epoch[i] > ebofs->get_super_epoch()) {
+ dout(2) << "super_epoch is " << ebofs->get_super_epoch()
+ << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i]
+ << endl;
+ break;
+ }
+ }
+ }
start_writer();
+
+ return 0;
}
void FileJournal::close()
stop_writer();
// close
- assert(q.empty());
+ assert(writeq.empty());
+ assert(commitq.empty());
assert(fd > 0);
::close(fd);
fd = 0;
}
+void FileJournal::print_header()
+{
+ for (int i=0; i<header.num; i++) {
+ if (i && header.offset[i] < header.offset[i-1]) {
+ assert(header.wrap);
+ dout(10) << "header: wrap at " << header.wrap << endl;
+ }
+ dout(10) << "header: epoch " << header.epoch[i] << " at " << header.offset[i] << endl;
+ }
+ //if (header.wrap) dout(10) << "header: wrap at " << header.wrap << endl;
+}
+void FileJournal::read_header()
+{
+ dout(10) << "read_header" << endl;
+ memset(&header, 0, sizeof(header)); // zero out (read may fail)
+ ::lseek(fd, 0, SEEK_SET);
+ int r = ::read(fd, &header, sizeof(header));
+ if (r < 0)
+ dout(0) << "read_header error " << errno << " " << strerror(errno) << endl;
+ print_header();
+}
void FileJournal::write_header()
{
- dout(10) << "write_header" << endl;
-
+ dout(10) << "write_header " << endl;
+ print_header();
+
::lseek(fd, 0, SEEK_SET);
- ::write(fd, &header, sizeof(header));
+ int r = ::write(fd, &header, sizeof(header));
+ if (r < 0)
+ dout(0) << "write_header error " << errno << " " << strerror(errno) << endl;
}
if (writeq.empty()) {
// sleep
dout(20) << "write_thread_entry going to sleep" << endl;
+ assert(write_pos == queue_pos);
write_cond.Wait(write_lock);
dout(20) << "write_thread_entry woke up" << endl;
continue;
// do queued writes
while (!writeq.empty()) {
// grab next item
- epoch_t e = writeq.front().first;
+ epoch_t epoch = writeq.front().first;
bufferlist bl;
bl.claim(writeq.front().second);
writeq.pop_front();
Context *oncommit = commitq.front();
commitq.pop_front();
- dout(15) << "write_thread_entry writing " << bottom << " : "
+ // wrap?
+ if (write_pos == header.wrap) {
+ dout(15) << "write_thread_entry wrapped write_pos at " << write_pos << " to " << sizeof(header_t) << endl;
+ assert(header.wrap == write_pos);
+ write_header();
+ write_pos = sizeof(header_t);
+ }
+
+ // write!
+ dout(15) << "write_thread_entry writing " << write_pos << " : "
<< bl.length()
- << " epoch " << e
+ << " epoch " << epoch
<< endl;
- // write epoch, len, data.
- ::fseek(fd, bottom, SEEK_SET);
- ::write(fd, &e, sizeof(e));
-
- uint32_t len = bl.length();
- ::write(fd, &len, sizeof(len));
+ // write entry header
+ entry_header_t h;
+ h.epoch = epoch;
+ h.len = bl.length();
+ h.make_magic(write_pos, header.fsid);
+
+ ::lseek(fd, write_pos, SEEK_SET);
+ ::write(fd, &h, sizeof(h));
for (list<bufferptr>::const_iterator it = bl.buffers().begin();
it != bl.buffers().end();
if ((*it).length() == 0) continue; // blank buffer.
::write(fd, (char*)(*it).c_str(), (*it).length() );
}
+
+ ::write(fd, &h, sizeof(h));
// move position pointer
- bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length();
+ write_pos += 2*sizeof(entry_header_t) + bl.length();
- // do commit callback
if (oncommit) {
- oncommit->finish(0);
- delete oncommit;
+ if (1) {
+ // queue callback
+ ebofs->queue_finisher(oncommit);
+ } else {
+ // callback now
+ oncommit->finish(0);
+ delete oncommit;
+ }
}
}
}
dout(10) << "write_thread_entry finish" << endl;
}
-void FileJournal::submit_entry(bufferlist& e, Context *oncommit)
+bool FileJournal::submit_entry(bufferlist& e, Context *oncommit)
{
- dout(10) << "submit_entry " << bottom << " : " << e.length()
- << " epoch " << ebofs->super_epoch
+ assert(queue_pos != 0); // bad create(), or journal didn't replay to completion.
+
+ // ** lock **
+ Mutex::Locker locker(write_lock);
+
+ // wrap? full?
+ off_t size = 2*sizeof(entry_header_t) + e.length();
+
+ if (full) return false; // already marked full.
+
+ if (header.wrap) {
+ // we're wrapped. don't overwrite ourselves.
+ if (queue_pos + size >= header.offset[0]) {
+ dout(10) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size
+ << " >= " << header.offset[0]
+ << endl;
+ full = true;
+ print_header();
+ return false;
+ }
+ } else {
+ // we haven't wrapped.
+ if (queue_pos + size >= header.max_size) {
+ // is there room if we wrap?
+ if ((off_t)sizeof(header_t) + size < header.offset[0]) {
+ // yes!
+ dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << endl;
+ header.wrap = queue_pos;
+ queue_pos = sizeof(header_t);
+ header.push(ebofs->get_super_epoch(), queue_pos);
+ } else {
+ // no room.
+ dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size
+ << " >= " << header.max_size
+ << endl;
+ full = true;
+ return false;
+ }
+ }
+ }
+
+ dout(10) << "submit_entry " << queue_pos << " : " << e.length()
+ << " epoch " << ebofs->get_super_epoch()
<< " " << oncommit << endl;
// dump on queue
- writeq.push_back(pair<epoch_t,bufferlist>(ebofs->super_epoch, e));
+ writeq.push_back(pair<epoch_t,bufferlist>(ebofs->get_super_epoch(), e));
commitq.push_back(oncommit);
-
+
+ queue_pos += size;
+
// kick writer thread
write_cond.Signal();
+
+ return true;
}
void FileJournal::commit_epoch_start()
{
- dout(10) << "commit_epoch_start" << endl;
+ dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1
+ << " -- new epoch " << ebofs->get_super_epoch()
+ << endl;
- write_lock.Lock();
- {
- header.epoch2 = ebofs->super_epoch;
- header.top2 = bottom;
- write_header();
- }
- write_lock.Unlock();
+ Mutex::Locker locker(write_lock);
+
+ // was full -> empty -> now usable?
+ if (full) {
+ if (header.num != 0) {
+ dout(1) << " journal FULL, ignoring this epoch" << endl;
+ return;
+ }
+
+ dout(1) << " clearing FULL flag, journal now usable" << endl;
+ full = false;
+ }
+
+ // note epoch boundary
+ header.push(ebofs->get_super_epoch(), queue_pos); // note: these entries may not yet be written.
+ //write_header(); // no need to write it now, though...
}
void FileJournal::commit_epoch_finish()
{
- dout(10) << "commit_epoch_finish" << endl;
+ dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << endl;
write_lock.Lock();
{
- // update header
- header.epoch1 = ebofs->super_epoch;
- header.top1 = header.top2;
- header.epoch2 = 0;
- header.top2 = 0;
+ if (full) {
+ // full journal damage control.
+ dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << endl;
+ header.clear();
+ write_pos = queue_pos = sizeof(header_t);
+ } else {
+ // update header -- trim/discard old (committed) epochs
+ while (header.epoch[0] < ebofs->get_super_epoch())
+ header.pop();
+ }
write_header();
- // flush any unwritten items in previous epoch
- while (!writeq.empty() &&
- writeq.front().first < ebofs->super_epoch) {
- dout(15) << " dropping uncommitted journal item from prior epoch" << endl;
- writeq.pop_front();
+ // discard any unwritten items in previous epoch, and do callbacks
+ epoch_t epoch = ebofs->get_super_epoch();
+ list<Context*> callbacks;
+ while (!writeq.empty() && writeq.front().first < epoch) {
+ dout(15) << " dropping unwritten and committed "
+ << write_pos << " : " << writeq.front().second.length()
+ << " epoch " << writeq.front().first
+ << endl;
+ // finisher?
Context *oncommit = commitq.front();
+ if (oncommit) callbacks.push_back(oncommit);
+
+ write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length();
+
+ // discard.
+ writeq.pop_front();
commitq.pop_front();
-
- if (oncommit) {
- oncommit->finish(0);
- delete oncommit;
- }
}
+
+ // queue the finishers
+ ebofs->queue_finishers(callbacks);
}
write_lock.Unlock();
}
+
+
+void FileJournal::make_writeable()
+{
+ if (read_pos)
+ write_pos = queue_pos = read_pos;
+ else
+ write_pos = queue_pos = sizeof(header_t);
+ read_pos = 0;
+}
+
+
+bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
+{
+ if (!read_pos) {
+ dout(1) << "read_entry -- not readable" << endl;
+ make_writeable();
+ return false;
+ }
+
+ if (read_pos == header.wrap) {
+ // find wrap point
+ for (int i=1; i<header.num; i++) {
+ if (header.offset[i] < read_pos) {
+ assert(header.offset[i-1] < read_pos);
+ read_pos = header.offset[i];
+ break;
+ }
+ }
+ assert(read_pos != header.wrap);
+ dout(10) << "read_entry wrapped from " << header.wrap << " to " << read_pos << endl;
+ }
+
+ // header
+ entry_header_t h;
+ ::lseek(fd, read_pos, SEEK_SET);
+ ::read(fd, &h, sizeof(h));
+ if (!h.check_magic(read_pos, header.fsid)) {
+ dout(1) << "read_entry " << read_pos << " : bad header magic, end of journal" << endl;
+ make_writeable();
+ return false;
+ }
+
+ // body
+ bufferptr bp(h.len);
+ ::read(fd, bp.c_str(), h.len);
+
+ // footer
+ entry_header_t f;
+ ::read(fd, &f, sizeof(h));
+ if (!f.check_magic(read_pos, header.fsid) ||
+ h.epoch != f.epoch ||
+ h.len != f.len) {
+ dout(1) << "read_entry " << read_pos << " : bad footer magic, partially entry, end of journal" << endl;
+ make_writeable();
+ return false;
+ }
+
+
+ // yay!
+ dout(1) << "read_entry " << read_pos << " : "
+ << " " << h.len << " bytes"
+ << " epoch " << h.epoch
+ << endl;
+
+ bl.push_back(bp);
+ epoch = h.epoch;
+
+ read_pos += 2*sizeof(entry_header_t) + h.len;
+
+ return true;
+}
#include "Journal.h"
-
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Thread.h"
class FileJournal : public Journal {
public:
+ /** log header
+ * we allow 3 pointers:
+ * top/initial,
+ * one for an epoch boundary,
+ * and one for a wrap in the ring buffer/journal file.
+ * the epoch boundary one is useful only for speedier recovery in certain cases
+ * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
+ */
struct header_t {
- epoch_t epoch1;
- off_t top1;
- epoch_t epoch2;
- off_t top2;
+ uint64_t fsid;
+ int num;
+ off_t wrap;
+ off_t max_size;
+ epoch_t epoch[3];
+ off_t offset[3];
+
+ header_t() : fsid(0), num(0), wrap(0), max_size(0) {}
+
+ void clear() {
+ num = 0;
+ wrap = 0;
+ }
+ void pop() {
+ if (num >= 2 && offset[0] > offset[1])
+ wrap = 0; // we're eliminating a wrap
+ num--;
+ for (int i=0; i<num; i++) {
+ epoch[i] = epoch[i+1];
+ offset[i] = offset[i+1];
+ }
+ }
+ void push(epoch_t e, off_t o) {
+ assert(num < 3);
+ epoch[num] = e;
+ offset[num] = o;
+ num++;
+ }
} header;
+ struct entry_header_t {
+ uint64_t epoch;
+ uint64_t len;
+ uint64_t magic1;
+ uint64_t magic2;
+
+ void make_magic(off_t pos, uint64_t fsid) {
+ magic1 = pos;
+ magic2 = fsid ^ epoch ^ len;
+ }
+ bool check_magic(off_t pos, uint64_t fsid) {
+ return
+ magic1 == (uint64_t)pos &&
+ magic2 == (fsid ^ epoch ^ len);
+ }
+ };
+
private:
string fn;
- off_t max_size;
- off_t top; // byte of first entry chronologically
- off_t bottom; // byte where next entry goes
- off_t committing_to; // offset of epoch boundary, if we are committing
+ bool full;
+ off_t write_pos; // byte where next entry written goes
+ off_t queue_pos; // byte where next entry queued for write goes
+
+ off_t read_pos; //
int fd;
Cond write_cond;
bool write_stop;
+ void print_header();
+ void read_header();
void write_header();
void start_writer();
void stop_writer();
void write_thread_entry();
+ void make_writeable();
+
class Writer : public Thread {
FileJournal *journal;
public:
Writer(FileJournal *fj) : journal(fj) {}
void *entry() {
- journal->write_thread();
+ journal->write_thread_entry();
return 0;
}
} write_thread;
public:
- FileJournal(Ebofs *e, char *f, off_t sz) :
- Journal(e),
- fn(f), max_size(sz),
- top(0), bottom(0), committing_to(0),
+ FileJournal(Ebofs *e, char *f) :
+ Journal(e), fn(f),
+ full(false),
+ write_pos(0), queue_pos(0), read_pos(0),
fd(0),
- write_stop(false), write_thread(this)
- { }
+ write_stop(false), write_thread(this) { }
~FileJournal() {}
- void create();
- void open();
+ int create();
+ int open();
void close();
// writes
- void submit_entry(bufferlist& e, Context *oncommit); // submit an item
- void commit_epoch_start(); // mark epoch boundary
- void commit_epoch_finish(); // mark prior epoch as committed (we can expire)
+ bool submit_entry(bufferlist& e, Context *oncommit); // submit an item
+ void commit_epoch_start(); // mark epoch boundary
+ void commit_epoch_finish(); // mark prior epoch as committed (we can expire)
+
+ bool read_entry(bufferlist& bl, epoch_t& e);
// reads
};
#ifndef __EBOFS_JOURNAL_H
#define __EBOFS_JOURNAL_H
+class Ebofs;
+
+#include "include/buffer.h"
+#include "include/Context.h"
class Journal {
+protected:
Ebofs *ebofs;
- public:
+public:
Journal(Ebofs *e) : ebofs(e) { }
virtual ~Journal() { }
- virtual void create() = 0;
- virtual void open() = 0;
+ virtual int create() = 0;
+ virtual int open() = 0;
virtual void close() = 0;
// writes
- virtual void submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
+ virtual bool submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
virtual void commit_epoch_start() = 0; // mark epoch boundary
- virtual void commit_epoch_finish(list<Context*>& ls) = 0; // mark prior epoch as committed (we can expire)
+ virtual void commit_epoch_finish() = 0; // mark prior epoch as committed (we can expire)
+ virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0;
// reads/recovery
char *filename = args[0];
int seconds = atoi(args[1]);
int threads = atoi(args[2]);
+ if (!threads) threads = 1;
cout << "dev " << filename << " .. " << threads << " threads .. " << seconds << " seconds" << endl;
// explicit tests
- if (1) {
+ if (0) {
// verify that clone() plays nice with partial writes
object_t oid(1,1);
bufferptr bp(10000);
struct ebofs_super {
- unsigned s_magic;
-
- unsigned epoch; // version of this superblock.
+ uint64_t s_magic;
+ uint64_t fsid;
+
+ epoch_t epoch; // version of this superblock.
- unsigned num_blocks; /* # blocks in filesystem */
+ uint64_t num_blocks; /* # blocks in filesystem */
// some basic stats, for kicks
- unsigned free_blocks; /* unused blocks */
- unsigned limbo_blocks; /* limbo blocks */
+ uint64_t free_blocks; /* unused blocks */
+ uint64_t limbo_blocks; /* limbo blocks */
//unsigned num_objects;
//unsigned num_fragmented;
off += len+1;
}
+// const char* (encode only, string compatible)
+inline void _encode(const char *s, bufferlist& bl)
+{
+ uint32_t len = strlen(s);
+ _encoderaw(len, bl);
+ bl.append(s, len+1);
+}
+
// bufferptr (encapsulated)
inline void _encode(bufferptr& bp, bufferlist& bl)
{
switch (op->get_op()) {
case OSD_OP_WRLOCK:
{ // lock object
- //r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit);
t.setattr(oid, "wrlock", &op->get_client(), sizeof(entity_name_t));
}
break;
list<off_t> offsets;
list<size_t> lengths;
list<const char*> attrnames;
+ list<string> attrnames2;
//list< pair<const void*,int> > attrvals;
list<bufferlist> attrbls;
+ // for reads only (not encoded)
list<bufferlist*> pbls;
list<struct stat*> psts;
list< pair<void*,int*> > pattrvals;
list< map<string,bufferptr>* > pattrsets;
+ const char *get_attrname() {
+ if (attrnames.empty())
+ return attrnames2.front().c_str();
+ else
+ return attrnames.front();
+ }
+ void pop_attrname() {
+ if (attrnames.empty())
+ attrnames2.pop_front();
+ else
+ attrnames.pop_front();
+ }
+
void read(object_t oid, off_t off, size_t len, bufferlist *pbl) {
int op = OP_READ;
ops.push_back(op);
}
// etc.
+
+ void _encode(bufferlist& bl) {
+ ::_encode(ops, bl);
+ ::_encode(bls, bl);
+ ::_encode(oids, bl);
+ ::_encode(cids, bl);
+ ::_encode(offsets, bl);
+ ::_encode(lengths, bl);
+ ::_encode(attrnames, bl);
+ ::_encode(attrbls, bl);
+ }
+ void _decode(bufferlist& bl, int& off) {
+ ::_decode(ops, bl, off);
+ ::_decode(bls, bl, off);
+ ::_decode(oids, bl, off);
+ ::_decode(cids, bl, off);
+ ::_decode(offsets, bl, off);
+ ::_decode(lengths, bl, off);
+ ::_decode(attrnames2, bl, off);
+ ::_decode(attrbls, bl, off);
+ }
};
case Transaction::OP_GETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
pair<void*,int*> pattrval = t.pattrvals.front(); t.pattrvals.pop_front();
*pattrval.second = getattr(oid, attrname, pattrval.first, *pattrval.second);
}
case Transaction::OP_SETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
case Transaction::OP_RMATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
rmattr(oid, attrname, 0);
}
break;
case Transaction::OP_COLL_SETATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
case Transaction::OP_COLL_RMATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
collection_rmattr(cid, attrname, 0);
}
break;