From: Sage Weil
Date: Thu, 11 Sep 2008 17:56:16 +0000 (-0700)
Subject: journal: journal rewrite
X-Git-Tag: v0.4~118
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=37c3ad36d4ded2c300cce4b87eda12f29efd846d;p=ceph.git

journal: journal rewrite
---

diff --git a/src/TODO b/src/TODO
index 5f19bb4f5c31..f18bf553cb2e 100644
--- a/src/TODO
+++ b/src/TODO
@@ -1,6 +1,7 @@
 v0.4
-- btrfs latency. update howto.
-- snap garbage collection
+/- btrfs latency. update howto.
+/- snap garbage collection
+- journal
 
 v0.5
 - ENOSPC
@@ -51,10 +52,12 @@ userspace client
 - fix readdir vs fragment race by keeping a separate frag pos, and ignoring dentries below it
 
 kernel client
+- statfs fsid should xor the ceph_fsid hi/low words
+- async writepages?
+  - we want to saturate network with writeback on a single file
 - add i_frag_mutex to protect the frag tree..
 - get rid of ugly i_lock vs kmalloc juggling in get_or_create_frag
-- fsync on dir?
-- virtual xattr for exposing dirstat/rstat info (instead of 'cat dirname')
+- fsync on dir?  what is that supposed to do?
 - make writepages maybe skip pages with errors?
   - EIO, or ENOSPC?
   - ... writeback vs ENOSPC vs flush vs close()... hrm...
diff --git a/src/config.cc b/src/config.cc
index 4d7527c6e2f8..8699474d06f1 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -450,7 +450,7 @@ md_config_t g_conf = {
   // journal
   journal_dio: false,
   journal_max_write_bytes: 0,
-  journal_max_write_entries: 10,
+  journal_max_write_entries: 100,
 
   // --- block device ---
   bdev_lock: true,
diff --git a/src/config.h b/src/config.h
index ef891e0c1a3b..b7819c115431 100644
--- a/src/config.h
+++ b/src/config.h
@@ -309,8 +309,8 @@ struct md_config_t {
 
   // journal
   bool journal_dio;
-  bool journal_max_write_bytes;
-  bool journal_max_write_entries;
+  int journal_max_write_bytes;
+  int journal_max_write_entries;
 
   // block device
   bool bdev_lock;
diff --git a/src/ebofs/Ebofs.cc b/src/ebofs/Ebofs.cc
index 1ad6be27c7e7..9bf65b7fb623 100644
--- a/src/ebofs/Ebofs.cc
+++ b/src/ebofs/Ebofs.cc
@@ -137,7 +137,7 @@ int Ebofs::mount()
   while (1) {
     bufferlist bl;
-    epoch_t e;
+    __u64 e;
     if (!journal->read_entry(bl, e)) {
      dout(3) << "mount replay: end of journal, done."
              << dendl;
      break;
@@ -503,8 +503,6 @@ int Ebofs::commit_thread_entry()
   while (1) {
     // --- queue up commit writes ---
     bc.poison_commit = false;
-    if (journal)
-      journal->commit_epoch_start(super_epoch);  // FIXME: make loopable
     commit_inodes_start();      // do this first; it currently involves inode reallocation
     allocator.commit_limbo();   // limbo -> limbo_tab
     nodepool.commit_start(dev, super_epoch);
@@ -557,8 +555,8 @@ int Ebofs::commit_thread_entry()
       alloc_more_node_space();
     }
 
-    // signal journal
-    if (journal) journal->commit_epoch_finish(super_epoch);
+    // trim journal
+    if (journal) journal->committed_thru(super_epoch-1);
 
     // kick waiters
     dout(10) << "commit_thread queueing commit + kicking sync waiters" << dendl;
@@ -2447,6 +2445,10 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
   ebofs_lock.Lock();
   dout(7) << "apply_transaction start (" << t.get_num_ops() << " ops)" << dendl;
 
+  bufferlist bl;
+  if (journal)
+    t.encode(bl);
+
   unsigned r = _apply_transaction(t);
 
   // journal, wait for commit
@@ -2455,8 +2457,6 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
     onsafe = 0;
   }
   if (journal) {
-    bufferlist bl;
-    t.encode(bl);
     journal->submit_entry(super_epoch, bl, onsafe);
   } else
     queue_commit_waiter(onsafe);
diff --git a/src/include/types.h b/src/include/types.h
index 7929b067552b..ea17a0343b17 100644
--- a/src/include/types.h
+++ b/src/include/types.h
@@ -102,6 +102,16 @@ inline ostream& operator<<(ostream& out, const vector& v) {
   out << "]";
   return out;
 }
+template<class A>
+inline ostream& operator<<(ostream& out, const deque<A>& v) {
+  out << "<";
+  for (typename deque<A>::const_iterator p = v.begin(); p != v.end(); p++) {
+    if (p != v.begin()) out << ",";
+    out << *p;
+  }
+  out << ">";
+  return out;
+}
 
 template<class A>
 inline ostream& operator<<(ostream& out, const list<A>& ilist) {
diff --git a/src/os/FileJournal.cc b/src/os/FileJournal.cc
index b6cfd1d03382..5bde9833c568 100644
--- a/src/os/FileJournal.cc
+++ b/src/os/FileJournal.cc
@@ -74,6 +74,7 @@ int FileJournal::create()
     header.alignment = block_size;
   else
     header.alignment = 16;  // at least stay word aligned on 64bit machines...
+  header.start = get_top();
   print_header();
 
   buffer::ptr bp = prepare_header();
@@ -89,9 +90,9 @@ int FileJournal::create()
   return 0;
 }
 
-int FileJournal::open(epoch_t epoch)
+int FileJournal::open(__u64 next_seq)
 {
-  dout(2) << "open " << fn << dendl;
+  dout(2) << "open " << fn << " next_seq " << next_seq << dendl;
   int err = _open(false);
   if (err < 0)
     return err;
@@ -102,11 +103,11 @@ int FileJournal::open(epoch_t epoch)
 
   // read header?
   read_header();
-  dout(10) << "open journal header.fsid = " << header.fsid
+  dout(10) << "open header.fsid = " << header.fsid
     //<< " vs expected fsid = " << fsid
            << dendl;
   if (header.fsid != fsid) {
-    dout(2) << "open journal fsid doesn't match, invalid (someone else's?) journal" << dendl;
+    dout(2) << "open fsid doesn't match, invalid (someone else's?) journal" << dendl;
     err = -EINVAL;
   }
   if (header.max_size > max_size) {
@@ -127,40 +128,27 @@ int FileJournal::open(epoch_t epoch)
 
   // looks like a valid header.
   write_pos = 0;  // not writeable yet
-  read_pos = 0;
-  if (header.num > 0) {
-    // pick an offset
-    for (int i=0; i
-    if (header.epoch[i] > epoch) {
-      dout(2) << "super_epoch is " << epoch
-              << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i]
-              << dendl;
-      break;
-    }
+  // find next entry
+  read_pos = header.start;
+  while (1) {
+    bufferlist bl;
+    __u64 seq;
+    off64_t old_pos = read_pos;
+    if (!read_entry(bl, seq)) {
+      dout(10) << "open reached end of journal."
+               << dendl;
+      break;
+    }
-
-  if (read_pos == 0) {
-    dout(0) << "no valid journal segments" << dendl;
-    return 0;  //hrm  return -EINVAL;
+    if (seq > next_seq) {
+      dout(10) << "open entry " << seq << " len " << bl.length() << " > next_seq " << next_seq << dendl;
+      read_pos = -1;
+      return 0;
+    }
+    if (seq == next_seq) {
+      dout(10) << "open reached seq " << seq << dendl;
+      read_pos = old_pos;
+      break;
+    }
-  }
-
-  } else {
-    dout(0) << "journal was empty" << dendl;
-    read_pos = -1;
   }
 
   return 0;
@@ -175,7 +163,6 @@ void FileJournal::close()
 
   // close
   assert(writeq.empty());
-  assert(commitq.empty());
  assert(fd > 0);
  ::close(fd);
  fd = -1;
@@ -199,16 +186,15 @@ void FileJournal::stop_writer()
 }
 
+
 void FileJournal::print_header()
 {
-  for (int i=0; i
-  if (epoch > header.last_epoch()) {
-    dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl;
-    header.push(epoch, pos);
-    must_write_header = true;
-  }
+  dout(20) << "check_for_wrap seq " << seq
+           << " pos " << *pos << " size " << size
+           << " max_size " << header.max_size
+           << " wrap " << header.wrap
+           << dendl;
+
+  // already full?
+  if (full_commit_seq || full_restart_seq)
+    return false;
 
   // does it fit?
   if (header.wrap) {
     // we're wrapped.  don't overwrite ourselves.
-    if (pos + size >= header.offset[0]) {
-      dout(10) << "JOURNAL FULL (and wrapped), " << pos << "+" << size
-               << " >= " << header.offset[0]
-               << dendl;
-      full = true;
-      writeq.clear();
-      print_header();
-    }
+
+    if (*pos + size < header.start)
+      return true;  // fits
+
+    dout(10) << "JOURNAL FULL (and wrapped), " << *pos << "+" << size
+             << " >= " << header.start
+             << dendl;
   } else {
     // we haven't wrapped.
-    if (pos + size >= header.max_size) {
-      // is there room if we wrap?
-      if (get_top() + size < header.offset[0]) {
-        // yes!
-        dout(10) << "wrapped from " << pos << " to " << get_top() << dendl;
-        header.wrap = pos;
-        pos = get_top();
-        header.push(epoch, pos);
-        must_write_header = true;
-      } else {
-        // no room.
-        dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << pos << "+" << size
-                 << " >= " << header.max_size
-                 << dendl;
-        full = true;
-        writeq.clear();
-      }
+
+    if (*pos + size < header.max_size)
+      return true;  // fits
+
+    if (!can_wrap)
+      return false;  // can't wrap just now..
+
+    // is there room if we wrap?
+    if (get_top() + size < header.start) {
+      // yes!
+      dout(10) << " wrapping from " << *pos << " to " << get_top() << dendl;
+      header.wrap = *pos;
+      *pos = get_top();
+      must_write_header = true;
+      return true;
     }
+
+    // no room.
+    dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << *pos << "+" << size
+             << " >= " << header.max_size
+             << dendl;
   }
+
+  full_commit_seq = seq;
+  full_restart_seq = seq+1;
+  while (!writeq.empty()) {
+    writing_seq.push_back(writeq.front().seq);
+    writing_fin.push_back(writeq.front().fin);
+    writeq.pop_front();
+  }
+  print_header();
+  return false;
 }
@@ -299,79 +299,109 @@ void FileJournal::prepare_multi_write(bufferlist& bl)
   int eleft = g_conf.journal_max_write_entries;
   int bleft = g_conf.journal_max_write_bytes;
 
+  if (full_commit_seq || full_restart_seq)
+    return;
+
   while (!writeq.empty()) {
     // grab next item
-    epoch_t epoch = writeq.front().first;
-    bufferlist &ebl = writeq.front().second;
+    __u64 seq = writeq.front().seq;
+    bufferlist &ebl = writeq.front().bl;
     off64_t size = 2*sizeof(entry_header_t) + ebl.length();
 
-    if (bl.length() > 0 && bleft > 0 && bleft < size) break;
-
-    check_for_wrap(epoch, queue_pos, size);
-    if (full) break;
-    if (bl.length() && must_write_header)
+    bool can_wrap = !bl.length();  // only wrap if this is a new thinger
+    if (!check_for_wrap(seq, &queue_pos, size, can_wrap))
       break;
+
+    // set write_pos?  (check_for_wrap may have moved it)
+    if (!bl.length())
+      write_pos = queue_pos;
 
     // add to write buffer
-    dout(15) << "prepare_multi_write will write " << queue_pos << " : "
-             << ebl.length() << " epoch " << epoch << " -> " << size << dendl;
+    dout(15) << "prepare_multi_write will write " << queue_pos << " : seq " << seq
+             << " len " << ebl.length() << " -> " << size
+             << " (left " << eleft << "/" << bleft << ")"
+             << dendl;
 
     // add it this entry
     entry_header_t h;
-    h.epoch = epoch;
+    h.seq = seq;
     h.len = ebl.length();
     h.make_magic(queue_pos, header.fsid);
     bl.append((const char*)&h, sizeof(h));
     bl.claim_append(ebl);
     bl.append((const char*)&h, sizeof(h));
 
-    Context *oncommit = commitq.front();
-    if (oncommit)
-      writingq.push_back(oncommit);
-
+    if (writeq.front().fin) {
+      writing_seq.push_back(seq);
+      writing_fin.push_back(writeq.front().fin);
+    }
+
     // pop from writeq
     writeq.pop_front();
-    commitq.pop_front();
+    journalq.push_back(pair<__u64,off64_t>(seq, queue_pos));
 
     queue_pos += size;
-    if (--eleft == 0) break;
-    bleft -= size;
-    if (bleft == 0) break;
+
+    // pad...
+    if (queue_pos % header.alignment) {
+      int pad = header.alignment - (queue_pos % header.alignment);
+      bufferptr bp(pad);
+      bl.push_back(bp);
+      queue_pos += pad;
+      //dout(20) << " padding with " << pad << " bytes, queue_pos now " << queue_pos << dendl;
+    }
+
+    if (eleft) {
+      if (--eleft == 0) {
+        dout(20) << " hit max events per write " << g_conf.journal_max_write_entries << dendl;
+        break;
+      }
+    }
+    if (bleft) {
+      bleft -= size;
+      if (bleft == 0) {
+        dout(20) << " hit max write size " << g_conf.journal_max_write_bytes << dendl;
+        break;
+      }
+    }
   }
 }
 
 bool FileJournal::prepare_single_dio_write(bufferlist& bl)
 {
   // grab next item
-  epoch_t epoch = writeq.front().first;
-  bufferlist &ebl = writeq.front().second;
+  __u64 seq = writeq.front().seq;
+  bufferlist &ebl = writeq.front().bl;
   off64_t size = 2*sizeof(entry_header_t) + ebl.length();
   size = ROUND_UP_TO(size, header.alignment);
 
-  check_for_wrap(epoch, write_pos, size);
-  if (full) return false;
+  if (!check_for_wrap(seq, &write_pos, size, true))
+    return false;
+  if (full_commit_seq || full_restart_seq) return false;
 
   // build it
-  dout(15) << "prepare_single_dio_write will write " << write_pos << " : "
-           << ebl.length() << " epoch " << epoch << " -> " << size << dendl;
+  dout(15) << "prepare_single_dio_write will write " << write_pos << " : seq " << seq
+           << " len " << ebl.length() << " -> " << size << dendl;
   bufferptr bp = buffer::create_page_aligned(size);
   entry_header_t *h = (entry_header_t*)bp.c_str();
-  h->epoch = epoch;
+  h->seq = seq;
   h->len = ebl.length();
   h->make_magic(write_pos, header.fsid);
   ebl.copy(0, ebl.length(), bp.c_str()+sizeof(*h));
   memcpy(bp.c_str() + sizeof(*h) + ebl.length(), h, sizeof(*h));
   bl.push_back(bp);
 
-  Context *oncommit = commitq.front();
-  if (oncommit)
-    writingq.push_back(oncommit);
+  if (writeq.front().fin) {
+    writing_seq.push_back(seq);
+    writing_fin.push_back(writeq.front().fin);
  }
 
   // pop from writeq
   writeq.pop_front();
-  commitq.pop_front();
+  journalq.push_back(pair<__u64,off64_t>(seq, write_pos));
+
   return true;
 }
@@ -382,8 +412,10 @@ void FileJournal::do_write(bufferlist& bl)
     return;
 
   buffer::ptr hbp;
-  if (must_write_header)
+  if (must_write_header) {
+    must_write_header = false;
     hbp = prepare_header();
+  }
 
   writing = true;
@@ -392,7 +424,7 @@ void FileJournal::do_write(bufferlist& bl)
   write_lock.Unlock();
 
   dout(15) << "do_write writing " << write_pos << "~" << bl.length()
-           << (must_write_header ? " + header":"")
+           << (hbp.length() ? " + header":"")
            << dendl;
 
   // header
@@ -430,14 +462,19 @@ void FileJournal::do_write(bufferlist& bl)
 
   write_lock.Lock();
   writing = false;
-  if (memcmp(&old_header, &header, sizeof(header)) == 0) {
-    write_pos += bl.length();
-    write_pos = ROUND_UP_TO(write_pos, header.alignment);
-    finisher->queue(writingq);
+
+  write_pos += bl.length();
+  write_pos = ROUND_UP_TO(write_pos, header.alignment);
+
+  // kick finisher?
+  //  only if we haven't filled up recently!
+  if (full_commit_seq || full_restart_seq) {
+    dout(10) << "do_write NOT queueing finisher seq " << writing_seq.front()
+             << ", full_commit_seq|full_restart_seq" << dendl;
   } else {
-    dout(10) << "do_write finished write but header changed? not moving write_pos." << dendl;
-    derr(0) << "do_write finished write but header changed? not moving write_pos."
-            << dendl;
-    assert(writingq.empty());
+    dout(20) << "do_write doing finisher queue " << writing_fin << dendl;
+    writing_seq.clear();
+    finisher->queue(writing_fin);
   }
 }
@@ -457,7 +494,6 @@ void FileJournal::write_thread_entry()
     }
 
     bufferlist bl;
-    must_write_header = false;
     if (directio)
       prepare_single_dio_write(bl);
     else
@@ -470,92 +506,83 @@ void FileJournal::write_thread_entry()
 }
 
 
-bool FileJournal::is_full()
-{
-  Mutex::Locker locker(write_lock);
-  return full;
-}
-
-void FileJournal::submit_entry(epoch_t epoch, bufferlist& e, Context *oncommit)
+void FileJournal::submit_entry(__u64 seq, bufferlist& e, Context *oncommit)
 {
   Mutex::Locker locker(write_lock);  // ** lock **
 
   // dump on queue
-  dout(10) << "submit_entry " << e.length()
-           << " epoch " << epoch
-           << " " << oncommit << dendl;
-  commitq.push_back(oncommit);
-  if (!full) {
-    writeq.push_back(pair<epoch_t,bufferlist>(epoch, e));
+  dout(10) << "submit_entry seq " << seq
+           << " len " << e.length()
+           << " (" << oncommit << ")" << dendl;
+
+  if (!full_commit_seq && full_restart_seq &&
+      seq >= full_restart_seq) {
+    dout(10) << " seq " << seq << " >= full_restart_seq " << full_restart_seq
+             << ", restarting journal" << dendl;
+    full_restart_seq = 0;
+  }
+  if (!full_commit_seq && !full_restart_seq) {
+    writeq.push_back(write_item(seq, e, oncommit));
     write_cond.Signal(); // kick writer thread
+  } else {
+    // not journaling this.  restart writing no sooner than seq + 1.
+    full_restart_seq = seq+1;
+    dout(10) << " journal is/was full, will restart no sooner than seq " << full_restart_seq << dendl;
+    writing_seq.push_back(seq);
+    writing_fin.push_back(oncommit);
   }
 }
 
-void FileJournal::commit_epoch_start(epoch_t new_epoch)
+void FileJournal::committed_thru(__u64 seq)
 {
-  dout(10) << "commit_epoch_start on " << new_epoch-1
-           << " -- new epoch " << new_epoch
-           << dendl;
+  dout(10) << "committed_thru " << seq << dendl;
   Mutex::Locker locker(write_lock);
 
-  // was full -> empty -> now usable?
-  if (full) {
-    if (header.num != 0) {
-      dout(1) << " journal FULL, ignoring this epoch" << dendl;
-      return;
-    }
-
-    dout(1) << " clearing FULL flag, journal now usable" << dendl;
-    full = false;
-  }
-}
-
-void FileJournal::commit_epoch_finish(epoch_t new_epoch)
-{
-  dout(10) << "commit_epoch_finish committed " << (new_epoch-1) << dendl;
+  // was full?
+  if (full_commit_seq && seq >= full_commit_seq) {
+    dout(1) << " seq " << seq << " >= full_commit_seq " << full_commit_seq
+            << ", prior journal contents are now fully committed. resetting journal." << dendl;
+    full_commit_seq = 0;
+  }
 
-  Mutex::Locker locker(write_lock);
-
-  if (full) {
-    // full journal damage control.
-    dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch."
-             << dendl;
-    header.clear();
-    write_pos = get_top();
-  } else {
-    // update header -- trim/discard old (committed) epochs
-    print_header();
-    while (header.num && header.epoch[0] < new_epoch) {
-      dout(10) << " popping epoch " << header.epoch[0] << " < " << new_epoch << dendl;
-      header.pop();
-    }
-    if (header.num == 0) {
-      dout(10) << " starting fresh" << dendl;
-      write_pos = get_top();
-      header.push(new_epoch, write_pos);
+  // adjust start pointer
+  while (!journalq.empty() && journalq.front().first <= seq) {
+    if (journalq.front().second == get_top()) {
+      dout(10) << " committed event at " << journalq.front().second << ", clearing wrap marker" << dendl;
+      header.wrap = 0;  // clear wrap marker
     }
+    journalq.pop_front();
+  }
+  if (!journalq.empty()) {
+    header.start = journalq.front().second;
+  } else {
+    header.start = write_pos;
   }
   must_write_header = true;
+  print_header();
 
-  // discard any unwritten items in previous epoch
-  while (!writeq.empty() && writeq.front().first < new_epoch) {
-    dout(15) << " dropping unwritten and committed "
-             << write_pos << " : " << writeq.front().second.length()
-             << " epoch " << writeq.front().first
+  // committed but writing
+  while (!writing_seq.empty() && writing_seq.front() < seq) {
+    dout(15) << " finishing committed but writing|waiting seq " << writing_seq.front() << dendl;
+    finisher->queue(writing_fin.front());
+    writing_seq.pop_front();
+    writing_fin.pop_front();
+  }
+
+  // committed but unjournaled items
+  while (!writeq.empty() && writeq.front().seq < seq) {
+    dout(15) << " dropping committed but unwritten seq " << writeq.front().seq
+             << " len " << writeq.front().bl.length()
+             << " (" << writeq.front().fin << ")"
              << dendl;
-    // finisher?
-    Context *oncommit = commitq.front();
-    if (oncommit) writingq.push_back(oncommit);
-
-    // discard.
+    if (writeq.front().fin)
+      finisher->queue(writeq.front().fin);
     writeq.pop_front();
-    commitq.pop_front();
   }
 
-  // queue the finishers
-  finisher->queue(writingq);
-  dout(10) << "commit_epoch_finish done" << dendl;
+  dout(10) << "committed_thru done" << dendl;
 }
@@ -574,23 +601,14 @@ void FileJournal::make_writeable()
 }
 
 
-bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
+bool FileJournal::read_entry(bufferlist& bl, __u64& seq)
 {
   if (!read_pos) {
     dout(2) << "read_entry -- not readable" << dendl;
     return false;
   }
 
-  if (read_pos == header.wrap) {
-    // find wrap point
-    for (int i=1; i
+  journalq.push_back(pair<__u64,off64_t>(h.seq, read_pos));
   read_pos += 2*sizeof(entry_header_t) + h.len;
   read_pos = ROUND_UP_TO(read_pos, header.alignment);
-
+
   return true;
 }
diff --git a/src/os/FileJournal.h b/src/os/FileJournal.h
index 3c46397a23dd..8d779fce3fce 100644
--- a/src/os/FileJournal.h
+++ b/src/os/FileJournal.h
@@ -16,6 +16,8 @@
 #ifndef __EBOFS_FILEJOURNAL_H
 #define __EBOFS_FILEJOURNAL_H
 
+#include <deque>
+using std::deque;
 
 #include "Journal.h"
 #include "common/Cond.h"
@@ -24,72 +26,39 @@ class FileJournal : public Journal {
 public:
-  /** log header
-   *  we allow 4 pointers:
-   *    top/initial,
-   *    one for an epoch boundary (if any),
-   *    one for a wrap in the ring buffer/journal file,
-   *    one for a second epoch boundary (if any).
-   *  the epoch boundary one is useful only for speedier recovery in certain cases
-   *  (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
+  /*
+   * journal header
    */
   struct header_t {
     __u64 fsid;
-    __s64 num;
     __u32 block_size;
     __u32 alignment;
-    __s64 max_size;
-    __s64 wrap;
-    __u32 epoch[4];
-    __s64 offset[4];
+    __s64 max_size;   // max size of journal ring buffer
+    __s64 wrap;       // wrap byte pos (if any)
+    __s64 start;      // offset of first entry
 
-    header_t() : fsid(0), num(0), block_size(0), alignment(0), max_size(0), wrap(0) {}
+    header_t() : fsid(0), block_size(0), alignment(0), max_size(0), wrap(0), start(0) {}
 
     void clear() {
-      num = 0;
       wrap = 0;
-    }
-    void pop() {
-      if (num >= 2 && offset[0] > offset[1])
-        wrap = 0;  // we're eliminating a wrap
-      num--;
-      for (int i=0; i
-      if (num > 2 &&
-          epoch[num-1] == e &&
-          epoch[num-2] == (e-1))
-        num--;   // tail was an epoch boundary; replace it.
-      epoch[num] = e;
-      offset[num] = o;
-      num++;
-    }
-    epoch_t last_epoch() {
-      if (num)
-        return epoch[num-1];
-      else
-        return 0;
+      start = block_size;
     }
   } header;
 
   struct entry_header_t {
-    uint64_t epoch;
+    uint64_t seq;  // fs op seq #
     uint64_t len;
     uint64_t magic1;
     uint64_t magic2;
     void make_magic(off64_t pos, uint64_t fsid) {
       magic1 = pos;
-      magic2 = fsid ^ epoch ^ len;
+      magic2 = fsid ^ seq ^ len;
     }
     bool check_magic(off64_t pos, uint64_t fsid) {
       return magic1 == (uint64_t)pos &&
-        magic2 == (fsid ^ epoch ^ len);
+        magic2 == (fsid ^ seq ^ len);
     }
   };
@@ -99,18 +68,33 @@ private:
   off64_t max_size;
   size_t block_size;
   bool directio;
-  bool full, writing, must_write_header;
-  off64_t write_pos;      // byte where next entry written goes
+  bool writing, must_write_header;
+  off64_t write_pos;      // byte where the next entry to be written will go
   off64_t read_pos;       //
+
+  __u64 seq;
+
+  __u64 full_commit_seq;  // don't write, wait for this seq to commit
+  __u64 full_restart_seq; // start writing again with this seq
+
   int fd;
 
   // to be journaled
-  list<pair<epoch_t,bufferlist> > writeq;
-  deque<Context*> commitq;
+  // in journal
+  deque<pair<__u64, off64_t> > journalq;  // track seq offsets, so we can trim later.
 
-  // being journaled
-  deque<Context*> writingq;
+  // currently being journaled and awaiting callback.
+  //  or, awaiting callback bc journal was full.
+  deque<__u64> writing_seq;
+  deque<Context*> writing_fin;
+
+  // waiting to be journaled
+  struct write_item {
+    __u64 seq;
+    bufferlist bl;
+    Context *fin;
+    write_item(__u64 s, bufferlist& b, Context *f) : seq(s), fin(f) { bl.claim(b); }
+  };
+  deque<write_item> writeq;
 
   // write thread
   Mutex write_lock;
@@ -125,7 +109,7 @@ private:
   void stop_writer();
   void write_thread_entry();
 
-  void check_for_wrap(epoch_t epoch, off64_t pos, off64_t size);
+  bool check_for_wrap(__u64 seq, off64_t *pos, off64_t size, bool can_wrap);
   bool prepare_single_dio_write(bufferlist& bl);
   void prepare_multi_write(bufferlist& bl);
   void do_write(bufferlist& bl);
@@ -152,14 +136,16 @@ private:
     Journal(fsid, fin), fn(f),
     max_size(0), block_size(0),
     directio(dio),
-    full(false), writing(false), must_write_header(false),
+    writing(false), must_write_header(false),
     write_pos(0), read_pos(0),
+    seq(0),
+    full_commit_seq(0), full_restart_seq(0),
     fd(-1),
     write_stop(false), write_thread(this) { }
   ~FileJournal() {}
 
   int create();
-  int open(epoch_t epoch);
+  int open(__u64 last_seq);
   void close();
 
   bool is_writeable() {
@@ -168,15 +154,12 @@ private:
   void make_writeable();
 
   // writes
-  void submit_entry(epoch_t epoch, bufferlist& e, Context *oncommit);  // submit an item
-  void commit_epoch_start(epoch_t);   // mark epoch boundary
-  void commit_epoch_finish(epoch_t);  // mark prior epoch as committed (we can expire)
-
-  bool read_entry(bufferlist& bl, epoch_t& e);
-
+  void submit_entry(__u64 seq, bufferlist& bl, Context *oncommit);  // submit an item
+  void committed_thru(__u64 seq);
   bool is_full();
 
   // reads
+  bool read_entry(bufferlist& bl, __u64& seq);
 };
 
 #endif
diff --git a/src/os/Journal.h b/src/os/Journal.h
index a2aff3130f6d..4c1bdd6dbfbe 100644
--- a/src/os/Journal.h
+++ b/src/os/Journal.h
@@ -30,17 +30,15 @@ public:
   virtual ~Journal() { }
 
   virtual int create() = 0;
-  virtual int open(epoch_t epoch) = 0;
+  virtual int open(__u64 last_seq) = 0;
   virtual void close() = 0;
 
   // writes
   virtual bool is_writeable() = 0;
   virtual void make_writeable() = 0;
-  virtual void submit_entry(epoch_t epoch, bufferlist& e, Context *oncommit) = 0;
-  virtual void commit_epoch_start(epoch_t) = 0;   // mark epoch boundary
-  virtual void commit_epoch_finish(epoch_t) = 0;  // mark prior epoch as committed (we can expire)
-  virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0;
-  virtual bool is_full() = 0;
+  virtual void submit_entry(__u64 seq, bufferlist& e, Context *oncommit) = 0;
+  virtual void committed_thru(__u64 seq) = 0;
+  virtual bool read_entry(bufferlist& bl, __u64 &seq) = 0;
 
   // reads/recovery
diff --git a/src/os/JournalingObjectStore.cc b/src/os/JournalingObjectStore.cc
index 00db8a8711fc..5fd1c5a94fd2 100644
--- a/src/os/JournalingObjectStore.cc
+++ b/src/os/JournalingObjectStore.cc
@@ -23,7 +23,7 @@ int JournalingObjectStore::journal_replay()
   int count = 0;
   while (1) {
     bufferlist bl;
-    epoch_t e;
+    __u64 e;
     if (!journal->read_entry(bl, e)) {
      dout(3) << "journal_replay: end of journal, done."
              << dendl;
      break;
diff --git a/src/os/JournalingObjectStore.h b/src/os/JournalingObjectStore.h
index 0600f0a1463c..de43b37d1cd5 100644
--- a/src/os/JournalingObjectStore.h
+++ b/src/os/JournalingObjectStore.h
@@ -17,6 +17,7 @@
 
 #include "ObjectStore.h"
 #include "Journal.h"
+#include "common/RWLock.h"
 
 class JournalingObjectStore : public ObjectStore {
 protected:
@@ -24,6 +25,7 @@ protected:
   Journal *journal;
   Finisher finisher;
   map > commit_waiters;
+  RWLock op_lock;
 
   void journal_start() {
     finisher.start();
@@ -38,15 +40,22 @@ protected:
   }
   int journal_replay();
 
+  void op_start() {
+    op_lock.get_read();
+  }
+  void op_finish() {
+    op_lock.put_read();
+  }
+
   void commit_start() {
+    op_lock.get_write();
     super_epoch++;
-    if (journal)
-      journal->commit_epoch_start(super_epoch);
+    op_lock.put_write();
   }
   void commit_finish() {
-    finisher.queue(commit_waiters[super_epoch-1]);
     if (journal)
-      journal->commit_epoch_finish(super_epoch);
+      journal->committed_thru(super_epoch-1);
+    finisher.queue(commit_waiters[super_epoch-1]);
   }
   void queue_commit_waiter(Context *oncommit) {
diff --git a/src/streamtest.cc b/src/streamtest.cc
index 15d517b615a5..bb70af62c669 100644
--- a/src/streamtest.cc
+++ b/src/streamtest.cc
@@ -96,8 +96,8 @@ int main(int argc, const char **argv)
   cout << "#dev " << filename
        << ", " << seconds << " seconds, " << bytes << " bytes per write" << std::endl;
 
-  //ObjectStore *fs = new Ebofs(filename, journal);
-  ObjectStore *fs = new FileStore(filename);
+  ObjectStore *fs = new Ebofs(filename, journal);
+  //ObjectStore *fs = new FileStore(filename);
 
   if (g_conf.mkfs &&
      fs->mkfs() < 0) {
diff --git a/src/vstartnew.sh b/src/vstartnew.sh
index c669e2b14b68..836aaffaa652 100755
--- a/src/vstartnew.sh
+++ b/src/vstartnew.sh
@@ -40,9 +40,9 @@ $CEPH_BIN/cmonctl osd setmap -i .ceph_osdmap
 
 for osd in 0 #1 #2 3 #4 5 6 7 8 9 10 11 12 13 14 15
 do
-    $SUDO $CEPH_BIN/cosd --mkfs_for_osd $osd dev/osd$osd   # initialize empty object store
+    $SUDO $CEPH_BIN/cosd --debug_journal 20 --mkfs_for_osd $osd dev/osd$osd   # initialize empty object store
#    echo valgrind --leak-check=full --show-reachable=yes $CEPH_BIN/cosd dev/osd$osd --debug_ms 1 --debug_osd 20 --debug_filestore 10 --debug_ebofs 20 #1>out/o$osd #&   #--debug_osd 40
-    $SUDO $CEPH_BIN/cosd -m $IP:$CEPH_PORT dev/osd$osd -d --debug_ms 1 --debug_osd 20 --debug_filestore 20
+    $SUDO $CEPH_BIN/cosd -m $IP:$CEPH_PORT dev/osd$osd -d --debug_ms 1 --debug_journal 20 --debug_osd 20 --debug_filestore 20 --debug_ebofs 20
 done
 
 # mds
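
For readers skimming the patch, the core of the new trimming scheme in FileJournal::committed_thru() can be modeled with plain standard-library types. The sketch below is illustrative only, not part of the patch; the type name JournalIndex and the constants in main() are invented for the example. It mirrors how the rewrite keeps a (seq, offset) pair per journaled entry and, once the backing store has committed through a given op sequence number, advances header.start to the oldest entry that still matters (or to write_pos when nothing is left), clearing the wrap marker when the wrapped region has been fully trimmed.

// Illustrative model of the seq-based trimming introduced by this patch.
// Stand-in types; the real code lives in src/os/FileJournal.{h,cc}.
#include <cstdint>
#include <deque>
#include <iostream>
#include <utility>

struct JournalIndex {
  std::deque<std::pair<uint64_t, int64_t> > journalq; // (seq, byte offset), oldest first
  int64_t start = 0;      // offset of first live entry (header.start)
  int64_t write_pos = 0;  // where the next entry will be written
  int64_t wrap = 0;       // nonzero while the ring buffer is wrapped
  int64_t top = 4096;     // first byte past the header (get_top())

  // Mirrors FileJournal::committed_thru(): drop entries whose seq <= committed,
  // clear the wrap marker once a committed entry restarted at the top of the
  // ring, and point start at the oldest entry that is still needed.
  void committed_thru(uint64_t seq) {
    while (!journalq.empty() && journalq.front().first <= seq) {
      if (journalq.front().second == top)
        wrap = 0;               // the wrapped region has been fully trimmed
      journalq.pop_front();
    }
    start = journalq.empty() ? write_pos : journalq.front().second;
  }
};

int main() {
  JournalIndex j;
  j.journalq = {{1, 4096}, {2, 8192}, {3, 12288}};
  j.write_pos = 16384;
  j.committed_thru(2);                 // entries 1 and 2 are now on disk
  std::cout << j.start << "\n";        // 12288: entry 3 becomes the new start
  j.committed_thru(3);
  std::cout << j.start << "\n";        // 16384: journal empty, start == write_pos
}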
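The other behavioral change worth calling out is how a full journal is handled. Instead of a single full flag that clears the write queue, the rewrite remembers full_commit_seq (writing stays off until the backing store commits through that seq) and full_restart_seq (the first seq that may be journaled again). The following is a simplified, stand-alone rendering of that state machine under the same hedging as above: it is not the patch's code, and names such as FullState are invented for illustration.

// Simplified model of the full_commit_seq / full_restart_seq handling in this patch.
#include <cstdint>
#include <iostream>

struct FullState {
  uint64_t full_commit_seq = 0;   // journal stops writing until this seq commits
  uint64_t full_restart_seq = 0;  // first seq that may be journaled again

  // Called when an entry with sequence number 'seq' does not fit (see check_for_wrap).
  void mark_full(uint64_t seq) {
    full_commit_seq = seq;
    full_restart_seq = seq + 1;
  }

  // Mirrors the submit_entry() logic: returns true if this entry gets journaled.
  bool submit(uint64_t seq) {
    if (!full_commit_seq && full_restart_seq && seq >= full_restart_seq)
      full_restart_seq = 0;                   // restart journaling at this seq
    if (!full_commit_seq && !full_restart_seq)
      return true;                            // journal normally
    full_restart_seq = seq + 1;               // skip it; restart no sooner than seq+1
    return false;
  }

  // Mirrors committed_thru(): once the filestore commits past full_commit_seq,
  // the old journal contents no longer matter and writing may resume.
  void committed_thru(uint64_t seq) {
    if (full_commit_seq && seq >= full_commit_seq)
      full_commit_seq = 0;
  }
};

int main() {
  FullState s;
  s.mark_full(10);                     // entry 10 did not fit
  std::cout << s.submit(11) << "\n";   // 0: still full, entry skipped
  s.committed_thru(10);                // backing store caught up
  std::cout << s.submit(13) << "\n";   // 1: 13 >= restart seq, journaling resumes
}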