header.alignment = block_size;
else
header.alignment = 16; // at least stay word aligned on 64bit machines...
+ header.start = get_top();
print_header();
buffer::ptr bp = prepare_header();
return 0;
}
-int FileJournal::open(epoch_t epoch)
+int FileJournal::open(__u64 next_seq)
{
- dout(2) << "open " << fn << dendl;
+ dout(2) << "open " << fn << " next_seq " << next_seq << dendl;
int err = _open(false);
if (err < 0) return err;
// read header?
read_header();
- dout(10) << "open journal header.fsid = " << header.fsid
+ dout(10) << "open header.fsid = " << header.fsid
//<< " vs expected fsid = " << fsid
<< dendl;
if (header.fsid != fsid) {
- dout(2) << "open journal fsid doesn't match, invalid (someone else's?) journal" << dendl;
+ dout(2) << "open fsid doesn't match, invalid (someone else's?) journal" << dendl;
err = -EINVAL;
}
if (header.max_size > max_size) {
// looks like a valid header.
write_pos = 0; // not writeable yet
- read_pos = 0;
- if (header.num > 0) {
- // pick an offset
- for (int i=0; i<header.num; i++) {
- if (header.epoch[i] == epoch) {
- dout(2) << "using read_pos header pointer "
- << header.epoch[i] << " at " << header.offset[i]
- << dendl;
- read_pos = header.offset[i];
- write_pos = 0;
- break;
- }
- else if (header.epoch[i] < epoch) {
- dout(2) << "super_epoch is " << epoch
- << ", skipping old " << header.epoch[i] << " at " << header.offset[i]
- << dendl;
- }
- else if (header.epoch[i] > epoch) {
- dout(2) << "super_epoch is " << epoch
- << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i]
- << dendl;
- break;
- }
+ // find next entry
+ read_pos = header.start;
+ while (1) {
+ bufferlist bl;
+ __u64 seq;
+ off64_t old_pos = read_pos;
+ if (!read_entry(bl, seq)) {
+ dout(10) << "open reached end of journal." << dendl;
+ break;
}
-
- if (read_pos == 0) {
- dout(0) << "no valid journal segments" << dendl;
- return 0; //hrm return -EINVAL;
+ if (seq > next_seq) {
+ dout(10) << "open entry " << seq << " len " << bl.length() << " > next_seq " << next_seq << dendl;
+ read_pos = -1;
+ return 0;
+ }
+ if (seq == next_seq) {
+ dout(10) << "open reached seq " << seq << dendl;
+ read_pos = old_pos;
+ break;
}
-
- } else {
- dout(0) << "journal was empty" << dendl;
- read_pos = -1;
}
return 0;
// close
assert(writeq.empty());
- assert(commitq.empty());
assert(fd > 0);
::close(fd);
fd = -1;
}
+
void FileJournal::print_header()
{
- for (int i=0; i<header.num; i++) {
- if (i && header.offset[i] < header.offset[i-1]) {
- assert(header.wrap);
- dout(10) << "header: wrap at " << header.wrap << dendl;
- }
- dout(10) << "header: epoch " << header.epoch[i] << " at " << header.offset[i] << dendl;
- }
- //if (header.wrap) dout(10) << "header: wrap at " << header.wrap << dendl;
+ dout(10) << "header: block_size " << header.block_size
+ << " alignment " << header.alignment
+ << " max_size " << header.max_size
+ << dendl;
+ dout(10) << "header: start " << header.start << " wrap " << header.wrap << dendl;
+ dout(10) << " write_pos " << write_pos << dendl;
}
void FileJournal::read_header()
-void FileJournal::check_for_wrap(epoch_t epoch, off64_t pos, off64_t size)
+bool FileJournal::check_for_wrap(__u64 seq, off64_t *pos, off64_t size, bool can_wrap)
{
- // epoch boundary?
- dout(10) << "check_for_wrap epoch " << epoch << " last " << header.last_epoch() << " of " << header.num << dendl;
- if (epoch > header.last_epoch()) {
- dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl;
- header.push(epoch, pos);
- must_write_header = true;
- }
+ dout(20) << "check_for_wrap seq " << seq
+ << " pos " << *pos << " size " << size
+ << " max_size " << header.max_size
+ << " wrap " << header.wrap
+ << dendl;
+
+ // already full?
+ if (full_commit_seq || full_restart_seq)
+ return false;
// does it fit?
if (header.wrap) {
// we're wrapped. don't overwrite ourselves.
- if (pos + size >= header.offset[0]) {
- dout(10) << "JOURNAL FULL (and wrapped), " << pos << "+" << size
- << " >= " << header.offset[0]
- << dendl;
- full = true;
- writeq.clear();
- print_header();
- }
+
+ if (*pos + size < header.start)
+ return true; // fits
+
+ dout(10) << "JOURNAL FULL (and wrapped), " << *pos << "+" << size
+ << " >= " << header.start
+ << dendl;
} else {
// we haven't wrapped.
- if (pos + size >= header.max_size) {
- // is there room if we wrap?
- if (get_top() + size < header.offset[0]) {
- // yes!
- dout(10) << "wrapped from " << pos << " to " << get_top() << dendl;
- header.wrap = pos;
- pos = get_top();
- header.push(epoch, pos);
- must_write_header = true;
- } else {
- // no room.
- dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << pos << "+" << size
- << " >= " << header.max_size
- << dendl;
- full = true;
- writeq.clear();
- }
+
+ if (*pos + size < header.max_size)
+ return true; // fits
+
+ if (!can_wrap)
+ return false; // can't wrap just now..
+
+ // is there room if we wrap?
+ if (get_top() + size < header.start) {
+ // yes!
+ dout(10) << " wrapping from " << *pos << " to " << get_top() << dendl;
+ header.wrap = *pos;
+ *pos = get_top();
+ must_write_header = true;
+ return true;
}
+
+ // no room.
+ dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << *pos << "+" << size
+ << " >= " << header.max_size
+ << dendl;
}
+
+ full_commit_seq = seq;
+ full_restart_seq = seq+1;
+ while (!writeq.empty()) {
+ writing_seq.push_back(writeq.front().seq);
+ writing_fin.push_back(writeq.front().fin);
+ writeq.pop_front();
+ }
+ print_header();
+ return false;
}
int eleft = g_conf.journal_max_write_entries;
int bleft = g_conf.journal_max_write_bytes;
+ if (full_commit_seq || full_restart_seq)
+ return;
+
while (!writeq.empty()) {
// grab next item
- epoch_t epoch = writeq.front().first;
- bufferlist &ebl = writeq.front().second;
+ __u64 seq = writeq.front().seq;
+ bufferlist &ebl = writeq.front().bl;
off64_t size = 2*sizeof(entry_header_t) + ebl.length();
- if (bl.length() > 0 && bleft > 0 && bleft < size) break;
-
- check_for_wrap(epoch, queue_pos, size);
- if (full) break;
- if (bl.length() && must_write_header)
+ bool can_wrap = !bl.length(); // only wrap if this is a new thinger
+ if (!check_for_wrap(seq, &queue_pos, size, can_wrap))
break;
+
+ // set write_pos? (check_for_wrap may have moved it)
+ if (!bl.length())
+ write_pos = queue_pos;
// add to write buffer
- dout(15) << "prepare_multi_write will write " << queue_pos << " : "
- << ebl.length() << " epoch " << epoch << " -> " << size << dendl;
+ dout(15) << "prepare_multi_write will write " << queue_pos << " : seq " << seq
+ << " len " << ebl.length() << " -> " << size
+ << " (left " << eleft << "/" << bleft << ")"
+ << dendl;
// add it this entry
entry_header_t h;
- h.epoch = epoch;
+ h.seq = seq;
h.len = ebl.length();
h.make_magic(queue_pos, header.fsid);
bl.append((const char*)&h, sizeof(h));
bl.claim_append(ebl);
bl.append((const char*)&h, sizeof(h));
- Context *oncommit = commitq.front();
- if (oncommit)
- writingq.push_back(oncommit);
-
+ if (writeq.front().fin) {
+ writing_seq.push_back(seq);
+ writing_fin.push_back(writeq.front().fin);
+ }
+
// pop from writeq
writeq.pop_front();
- commitq.pop_front();
+ journalq.push_back(pair<__u64,off64_t>(seq, queue_pos));
queue_pos += size;
- if (--eleft == 0) break;
- bleft -= size;
- if (bleft == 0) break;
+
+ // pad...
+ if (queue_pos % header.alignment) {
+ int pad = header.alignment - (queue_pos % header.alignment);
+ bufferptr bp(pad);
+ bl.push_back(bp);
+ queue_pos += pad;
+ //dout(20) << " padding with " << pad << " bytes, queue_pos now " << queue_pos << dendl;
+ }
+
+ if (eleft) {
+ if (--eleft == 0) {
+ dout(20) << " hit max events per write " << g_conf.journal_max_write_entries << dendl;
+ break;
+ }
+ }
+ if (bleft) {
+ bleft -= size;
+ if (bleft == 0) {
+ dout(20) << " hit max write size " << g_conf.journal_max_write_bytes << dendl;
+ break;
+ }
+ }
}
}
bool FileJournal::prepare_single_dio_write(bufferlist& bl)
{
// grab next item
- epoch_t epoch = writeq.front().first;
- bufferlist &ebl = writeq.front().second;
+ __u64 seq = writeq.front().seq;
+ bufferlist &ebl = writeq.front().bl;
off64_t size = 2*sizeof(entry_header_t) + ebl.length();
size = ROUND_UP_TO(size, header.alignment);
- check_for_wrap(epoch, write_pos, size);
- if (full) return false;
+ if (!check_for_wrap(seq, &write_pos, size, true))
+ return false;
+ if (full_commit_seq || full_restart_seq) return false;
// build it
- dout(15) << "prepare_single_dio_write will write " << write_pos << " : "
- << ebl.length() << " epoch " << epoch << " -> " << size << dendl;
+ dout(15) << "prepare_single_dio_write will write " << write_pos << " : seq " << seq
+ << " len " << ebl.length() << " -> " << size << dendl;
bufferptr bp = buffer::create_page_aligned(size);
entry_header_t *h = (entry_header_t*)bp.c_str();
- h->epoch = epoch;
+ h->seq = seq;
h->len = ebl.length();
h->make_magic(write_pos, header.fsid);
ebl.copy(0, ebl.length(), bp.c_str()+sizeof(*h));
memcpy(bp.c_str() + sizeof(*h) + ebl.length(), h, sizeof(*h));
bl.push_back(bp);
- Context *oncommit = commitq.front();
- if (oncommit)
- writingq.push_back(oncommit);
+ if (writeq.front().fin) {
+ writing_seq.push_back(seq);
+ writing_fin.push_back(writeq.front().fin);
+ }
// pop from writeq
writeq.pop_front();
- commitq.pop_front();
+ journalq.push_back(pair<__u64,off64_t>(seq, write_pos));
+
return true;
}
return;
buffer::ptr hbp;
- if (must_write_header)
+ if (must_write_header) {
+ must_write_header = false;
hbp = prepare_header();
+ }
writing = true;
write_lock.Unlock();
dout(15) << "do_write writing " << write_pos << "~" << bl.length()
- << (must_write_header ? " + header":"")
+ << (hbp.length() ? " + header":"")
<< dendl;
// header
write_lock.Lock();
writing = false;
- if (memcmp(&old_header, &header, sizeof(header)) == 0) {
- write_pos += bl.length();
- write_pos = ROUND_UP_TO(write_pos, header.alignment);
- finisher->queue(writingq);
+
+ write_pos += bl.length();
+ write_pos = ROUND_UP_TO(write_pos, header.alignment);
+
+ // kick finisher?
+ // only if we haven't filled up recently!
+ if (full_commit_seq || full_restart_seq) {
+ dout(10) << "do_write NOT queueing finisher seq " << writing_seq.front()
+ << ", full_commit_seq|full_restart_seq" << dendl;
} else {
- dout(10) << "do_write finished write but header changed? not moving write_pos." << dendl;
- derr(0) << "do_write finished write but header changed? not moving write_pos." << dendl;
- assert(writingq.empty());
+ dout(20) << "do_write doing finisher queue " << writing_fin << dendl;
+ writing_seq.clear();
+ finisher->queue(writing_fin);
}
}
}
bufferlist bl;
- must_write_header = false;
if (directio)
prepare_single_dio_write(bl);
else
}
-bool FileJournal::is_full()
-{
- Mutex::Locker locker(write_lock);
- return full;
-}
-
-void FileJournal::submit_entry(epoch_t epoch, bufferlist& e, Context *oncommit)
+void FileJournal::submit_entry(__u64 seq, bufferlist& e, Context *oncommit)
{
Mutex::Locker locker(write_lock); // ** lock **
// dump on queue
- dout(10) << "submit_entry " << e.length()
- << " epoch " << epoch
- << " " << oncommit << dendl;
- commitq.push_back(oncommit);
- if (!full) {
- writeq.push_back(pair<epoch_t,bufferlist>(epoch, e));
+ dout(10) << "submit_entry seq " << seq
+ << " len " << e.length()
+ << " (" << oncommit << ")" << dendl;
+
+ if (!full_commit_seq && full_restart_seq &&
+ seq >= full_restart_seq) {
+ dout(10) << " seq " << seq << " >= full_restart_seq " << full_restart_seq
+ << ", restarting journal" << dendl;
+ full_restart_seq = 0;
+ }
+ if (!full_commit_seq && !full_restart_seq) {
+ writeq.push_back(write_item(seq, e, oncommit));
write_cond.Signal(); // kick writer thread
+ } else {
+ // not journaling this. restart writing no sooner than seq + 1.
+ full_restart_seq = seq+1;
+ dout(10) << " journal is/was full, will restart no sooner than seq " << full_restart_seq << dendl;
+ writing_seq.push_back(seq);
+ writing_fin.push_back(oncommit);
}
}
-void FileJournal::commit_epoch_start(epoch_t new_epoch)
+void FileJournal::committed_thru(__u64 seq)
{
- dout(10) << "commit_epoch_start on " << new_epoch-1
- << " -- new epoch " << new_epoch
- << dendl;
+ dout(10) << "committed_thru " << seq << dendl;
Mutex::Locker locker(write_lock);
- // was full -> empty -> now usable?
- if (full) {
- if (header.num != 0) {
- dout(1) << " journal FULL, ignoring this epoch" << dendl;
- return;
- }
-
- dout(1) << " clearing FULL flag, journal now usable" << dendl;
- full = false;
- }
-}
-
-void FileJournal::commit_epoch_finish(epoch_t new_epoch)
-{
- dout(10) << "commit_epoch_finish committed " << (new_epoch-1) << dendl;
+ // was full?
+ if (full_commit_seq && seq >= full_commit_seq) {
+ dout(1) << " seq " << seq << " >= full_commit_seq " << full_commit_seq
+ << ", prior journal contents are now fully committed. resetting journal." << dendl;
+ full_commit_seq = 0;
+ }
- Mutex::Locker locker(write_lock);
-
- if (full) {
- // full journal damage control.
- dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << dendl;
- header.clear();
- write_pos = get_top();
- } else {
- // update header -- trim/discard old (committed) epochs
- print_header();
- while (header.num && header.epoch[0] < new_epoch) {
- dout(10) << " popping epoch " << header.epoch[0] << " < " << new_epoch << dendl;
- header.pop();
- }
- if (header.num == 0) {
- dout(10) << " starting fresh" << dendl;
- write_pos = get_top();
- header.push(new_epoch, write_pos);
+ // adjust start pointer
+ while (!journalq.empty() && journalq.front().first <= seq) {
+ if (journalq.front().second == get_top()) {
+ dout(10) << " committed event at " << journalq.front().second << ", clearing wrap marker" << dendl;
+ header.wrap = 0; // clear wrap marker
}
+ journalq.pop_front();
+ }
+ if (!journalq.empty()) {
+ header.start = journalq.front().second;
+ } else {
+ header.start = write_pos;
}
must_write_header = true;
+ print_header();
- // discard any unwritten items in previous epoch
- while (!writeq.empty() && writeq.front().first < new_epoch) {
- dout(15) << " dropping unwritten and committed "
- << write_pos << " : " << writeq.front().second.length()
- << " epoch " << writeq.front().first
+ // committed but writing
+ while (!writing_seq.empty() && writing_seq.front() < seq) {
+ dout(15) << " finishing committed but writing|waiting seq " << writing_seq.front() << dendl;
+ finisher->queue(writing_fin.front());
+ writing_seq.pop_front();
+ writing_fin.pop_front();
+ }
+
+ // committed but unjournaled items
+ while (!writeq.empty() && writeq.front().seq < seq) {
+ dout(15) << " dropping committed but unwritten seq " << writeq.front().seq
+ << " len " << writeq.front().bl.length()
+ << " (" << writeq.front().fin << ")"
<< dendl;
- // finisher?
- Context *oncommit = commitq.front();
- if (oncommit) writingq.push_back(oncommit);
-
- // discard.
+ if (writeq.front().fin)
+ finisher->queue(writeq.front().fin);
writeq.pop_front();
- commitq.pop_front();
}
- // queue the finishers
- finisher->queue(writingq);
- dout(10) << "commit_epoch_finish done" << dendl;
+ dout(10) << "committed_thru done" << dendl;
}
}
-bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
+bool FileJournal::read_entry(bufferlist& bl, __u64& seq)
{
if (!read_pos) {
dout(2) << "read_entry -- not readable" << dendl;
return false;
}
-
if (read_pos == header.wrap) {
- // find wrap point
- for (int i=1; i<header.num; i++) {
- if (header.offset[i] < read_pos) {
- assert(header.offset[i-1] < read_pos);
- read_pos = header.offset[i];
- break;
- }
- }
- assert(read_pos != header.wrap);
+ read_pos = get_top();
dout(10) << "read_entry wrapped from " << header.wrap << " to " << read_pos << dendl;
}
entry_header_t f;
::read(fd, &f, sizeof(h));
if (!f.check_magic(read_pos, header.fsid) ||
- h.epoch != f.epoch ||
+ h.seq != f.seq ||
h.len != f.len) {
dout(2) << "read_entry " << read_pos << " : bad footer magic, partial entry, end of journal" << dendl;
return false;
// yay!
- dout(1) << "read_entry " << read_pos << " : "
+ dout(1) << "read_entry " << read_pos << " : seq " << h.seq
<< " " << h.len << " bytes"
- << " epoch " << h.epoch
<< dendl;
-
+ bl.clear();
bl.push_back(bp);
- epoch = h.epoch;
+ seq = h.seq;
+
+ journalq.push_back(pair<__u64,off64_t>(h.seq, read_pos));
read_pos += 2*sizeof(entry_header_t) + h.len;
read_pos = ROUND_UP_TO(read_pos, header.alignment);
-
+
return true;
}
#ifndef __EBOFS_FILEJOURNAL_H
#define __EBOFS_FILEJOURNAL_H
+#include <deque>
+using std::deque;
#include "Journal.h"
#include "common/Cond.h"
class FileJournal : public Journal {
public:
- /** log header
- * we allow 4 pointers:
- * top/initial,
- * one for an epoch boundary (if any),
- * one for a wrap in the ring buffer/journal file,
- * one for a second epoch boundary (if any).
- * the epoch boundary one is useful only for speedier recovery in certain cases
- * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
+ /*
+ * journal header
*/
struct header_t {
__u64 fsid;
- __s64 num;
__u32 block_size;
__u32 alignment;
- __s64 max_size;
- __s64 wrap;
- __u32 epoch[4];
- __s64 offset[4];
+ __s64 max_size; // max size of journal ring buffer
+ __s64 wrap; // wrap byte pos (if any)
+ __s64 start; // offset of first entry
- header_t() : fsid(0), num(0), block_size(0), alignment(0), max_size(0), wrap(0) {}
+ header_t() : fsid(0), block_size(0), alignment(0), max_size(0), wrap(0), start(0) {}
void clear() {
- num = 0;
wrap = 0;
- }
- void pop() {
- if (num >= 2 && offset[0] > offset[1])
- wrap = 0; // we're eliminating a wrap
- num--;
- for (int i=0; i<num; i++) {
- epoch[i] = epoch[i+1];
- offset[i] = offset[i+1];
- }
- }
- void push(epoch_t e, off64_t o) {
- assert(num < 4);
- if (num > 2 &&
- epoch[num-1] == e &&
- epoch[num-2] == (e-1))
- num--; // tail was an epoch boundary; replace it.
- epoch[num] = e;
- offset[num] = o;
- num++;
- }
- epoch_t last_epoch() {
- if (num)
- return epoch[num-1];
- else
- return 0;
+ start = block_size;
}
} header;
struct entry_header_t {
- uint64_t epoch;
+ uint64_t seq; // fs op seq #
uint64_t len;
uint64_t magic1;
uint64_t magic2;
void make_magic(off64_t pos, uint64_t fsid) {
magic1 = pos;
- magic2 = fsid ^ epoch ^ len;
+ magic2 = fsid ^ seq ^ len;
}
bool check_magic(off64_t pos, uint64_t fsid) {
return
magic1 == (uint64_t)pos &&
- magic2 == (fsid ^ epoch ^ len);
+ magic2 == (fsid ^ seq ^ len);
}
};
off64_t max_size;
size_t block_size;
bool directio;
- bool full, writing, must_write_header;
- off64_t write_pos; // byte where next entry written goes
+ bool writing, must_write_header;
+ off64_t write_pos; // byte where the next entry to be written will go
off64_t read_pos; //
+ __u64 seq;
+
+ __u64 full_commit_seq; // don't write, wait for this seq to commit
+ __u64 full_restart_seq; // start writing again with this seq
+
int fd;
- // to be journaled
- list<pair<epoch_t,bufferlist> > writeq;
- deque<Context*> commitq;
+ // in journal
+ deque<pair<__u64, off64_t> > journalq; // track seq offsets, so we can trim later.
- // being journaled
- deque<Context*> writingq;
+ // currently being journaled and awaiting callback.
+ // or, awaiting callback bc journal was full.
+ deque<__u64> writing_seq;
+ deque<Context*> writing_fin;
+
+ // waiting to be journaled
+ struct write_item {
+ __u64 seq;
+ bufferlist bl;
+ Context *fin;
+ write_item(__u64 s, bufferlist& b, Context *f) : seq(s), fin(f) { bl.claim(b); }
+ };
+ deque<write_item> writeq;
// write thread
Mutex write_lock;
void stop_writer();
void write_thread_entry();
- void check_for_wrap(epoch_t epoch, off64_t pos, off64_t size);
+ bool check_for_wrap(__u64 seq, off64_t *pos, off64_t size, bool can_wrap);
bool prepare_single_dio_write(bufferlist& bl);
void prepare_multi_write(bufferlist& bl);
void do_write(bufferlist& bl);
Journal(fsid, fin), fn(f),
max_size(0), block_size(0),
directio(dio),
- full(false), writing(false), must_write_header(false),
+ writing(false), must_write_header(false),
write_pos(0), read_pos(0),
+ seq(0),
+ full_commit_seq(0), full_restart_seq(0),
fd(-1),
write_stop(false), write_thread(this) { }
~FileJournal() {}
int create();
- int open(epoch_t epoch);
+ int open(__u64 last_seq);
void close();
bool is_writeable() {
void make_writeable();
// writes
- void submit_entry(epoch_t epoch, bufferlist& e, Context *oncommit); // submit an item
- void commit_epoch_start(epoch_t); // mark epoch boundary
- void commit_epoch_finish(epoch_t); // mark prior epoch as committed (we can expire)
-
- bool read_entry(bufferlist& bl, epoch_t& e);
-
+ void submit_entry(__u64 seq, bufferlist& bl, Context *oncommit); // submit an item
+ void committed_thru(__u64 seq);
bool is_full();
// reads
+ bool read_entry(bufferlist& bl, __u64& seq);
};
#endif