From f46b1b473fce0322a672b16c7739e569a45054b6 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 1 Feb 2010 15:44:26 -0800 Subject: [PATCH] journal: make wrapping simpler Take out weirdness that tries to keep journal items contiguous. No reason not to split them across the end/beginning of the journal. In the general case, this is the same # of seeks because we have to rewrite the header anyway. --- src/os/FileJournal.cc | 276 +++++++++++++++++++++--------------------- src/os/FileJournal.h | 13 +- 2 files changed, 148 insertions(+), 141 deletions(-) diff --git a/src/os/FileJournal.cc b/src/os/FileJournal.cc index f5c9a28acef49..241789d407c50 100644 --- a/src/os/FileJournal.cc +++ b/src/os/FileJournal.cc @@ -280,7 +280,7 @@ void FileJournal::print_header() << " alignment " << header.alignment << " max_size " << header.max_size << dendl; - dout(10) << "header: start " << header.start << " wrap " << header.wrap << dendl; + dout(10) << "header: start " << header.start << dendl; dout(10) << " write_pos " << write_pos << dendl; } @@ -306,77 +306,40 @@ void FileJournal::read_header() bufferptr FileJournal::prepare_header() { - bufferptr bp; - if (directio) { - bp = buffer::create_page_aligned(block_size); - bp.zero(); - memcpy(bp.c_str(), &header, sizeof(header)); - } else { - bp = buffer::create(sizeof(header)); - memcpy(bp.c_str(), &header, sizeof(header)); - } + bufferptr bp = buffer::create_page_aligned(get_top()); + bp.zero(); + memcpy(bp.c_str(), &header, sizeof(header)); return bp; } - -bool FileJournal::check_for_wrap(__u64 seq, off64_t *pos, off64_t size, bool can_wrap) +bool FileJournal::check_for_full(__u64 seq, off64_t pos, off64_t size) { - dout(20) << "check_for_wrap seq " << seq - << " pos " << *pos << " size " << size - << " max_size " << header.max_size - << " wrap " << header.wrap - << dendl; - // already full? if (full_commit_seq || full_restart_seq) return false; + off64_t room = (header.max_size - pos) + (header.start - get_top()); + if (do_sync_cond) { - __s64 approxroom = header.wrap ? - header.wrap + *pos - header.start : - header.max_size + header.start - *pos; - if (approxroom < (header.max_size >> 1) && - approxroom + size > (header.max_size >> 1)) + if (room < (header.max_size >> 1) && + room + size > (header.max_size >> 1)) do_sync_cond->Signal(); // initiate a real commit so we can trim } - // does it fit? - if (header.wrap) { - // we're wrapped. don't overwrite ourselves. - - if (*pos + size < header.start) - return true; // fits - - dout(1) << "JOURNAL FULL (and wrapped), " << *pos << "+" << size - << " >= " << header.start - << dendl; - } else { - // we haven't wrapped. - - if (*pos + size < header.max_size) - return true; // fits - - if (!can_wrap) - return false; // can't wrap just now.. - - // is there room if we wrap? - if (get_top() + size < header.start) { - // yes! - dout(10) << " wrapping from " << *pos << " to " << get_top() << dendl; - header.wrap = *pos; - write_pos = *pos = get_top(); + if (room >= size) { + dout(10) << "check_for_full at " << pos << " : " << size << " < " << room << dendl; + if (pos + size > header.max_size) must_write_header = true; - return true; - } - - // no room. - dout(1) << "submit_entry JOURNAL FULL (and can't wrap), " << *pos << "+" << size - << " >= " << header.max_size - << dendl; + return true; } + // full + dout(1) << "check_for_full at " << pos << " : JOURNAL FULL " + << pos << " >= " << room + << " (max_size " << header.max_size << " start " << header.start << ")" + << dendl; full_commit_seq = seq; full_restart_seq = seq+1; while (!writeq.empty()) { @@ -388,8 +351,8 @@ bool FileJournal::check_for_wrap(__u64 seq, off64_t *pos, off64_t size, bool can } print_header(); return false; -} +} void FileJournal::prepare_multi_write(bufferlist& bl) { @@ -403,9 +366,7 @@ void FileJournal::prepare_multi_write(bufferlist& bl) return; while (!writeq.empty()) { - bool can_wrap = !bl.length(); // only wrap if this is a new thinger - - bool r = prepare_single_write(bl, queue_pos, can_wrap); + bool r = prepare_single_write(bl, queue_pos); if (!r) break; @@ -423,10 +384,10 @@ void FileJournal::prepare_multi_write(bufferlist& bl) } } - assert(write_pos + bl.length() == queue_pos); + //assert(write_pos + bl.length() == queue_pos); } -bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, bool can_wrap) +bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos) { // grab next item __u64 seq = writeq.front().seq; @@ -434,7 +395,7 @@ bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, bool off64_t base_size = 2*sizeof(entry_header_t) + ebl.length(); off64_t size = ROUND_UP_TO(base_size, header.alignment); - if (!check_for_wrap(seq, &queue_pos, size, can_wrap)) + if (!check_for_full(seq, queue_pos, size)) return false; // add to write buffer @@ -468,10 +429,32 @@ bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, bool journalq.push_back(pair<__u64,off64_t>(seq, queue_pos)); queue_pos += size; + if (queue_pos > header.max_size) + queue_pos = queue_pos + get_top() - header.max_size; return true; } +void FileJournal::write_bl(off64_t& pos, bufferlist& bl) +{ + ::lseek64(fd, pos, SEEK_SET); + + for (list::const_iterator it = bl.buffers().begin(); + it != bl.buffers().end(); + it++) { + if ((*it).length() == 0) + continue; // blank buffer. + int r = ::write(fd, (char*)(*it).c_str(), (*it).length()); + if (r < 0) { + char buf[80]; + derr(0) << "do_write failed with " << errno << " " << strerror_r(errno, buf, sizeof(buf)) + << " with " << (void*)(*it).c_str() << " len " << (*it).length() + << dendl; + } + pos += (*it).length(); + } +} + void FileJournal::do_write(bufferlist& bl) { // nothing to do? @@ -496,18 +479,8 @@ void FileJournal::do_write(bufferlist& bl) utime_t from = g_clock.now(); - // header - if (hbp.length()) - ::pwrite(fd, hbp.c_str(), hbp.length(), 0); - // entry -#ifdef DARWIN - off_t pos = write_pos; - ::lseek(fd, write_pos, SEEK_SET); -#else off64_t pos = write_pos; - ::lseek64(fd, write_pos, SEEK_SET); -#endif // make sure this is a single, contiguous buffer if (directio && !bl.is_contiguous()) { @@ -516,28 +489,50 @@ void FileJournal::do_write(bufferlist& bl) assert((bl.length() & ~PAGE_MASK) == 0); } - for (list::const_iterator it = bl.buffers().begin(); - it != bl.buffers().end(); - it++) { - if ((*it).length() == 0) continue; // blank buffer. - int r = ::write(fd, (char*)(*it).c_str(), (*it).length()); - if (r < 0) { - char buf[80]; - derr(0) << "do_write failed with " << errno << " " << strerror_r(errno, buf, sizeof(buf)) - << " with " << (void*)(*it).c_str() << " len " << (*it).length() - << dendl; - } - pos += (*it).length(); + // split? + off64_t split = 0; + if (pos + bl.length() > header.max_size) { + bufferlist first, second; + split = header.max_size - pos; + first.substr_of(bl, 0, split); + second.substr_of(bl, split, bl.length() - split); + assert(first.length() + second.length() == bl.length()); + dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length() + << " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl; + + write_bl(pos, first); + assert(pos == header.max_size); + if (hbp.length()) { + // be sneaky: include the header in the second fragment + second.push_front(hbp); + pos = 0; // we included the header + } else + pos = get_top(); // no header, start after that + write_bl(pos, second); + } else { + // header too? + if (hbp.length()) + ::pwrite(fd, hbp.c_str(), hbp.length(), 0); + + write_bl(pos, bl); } + if (!directio) { dout(20) << "do_write fsync" << dendl; #ifdef DARWIN ::fsync(fd); #else - if (is_bdev) - ::sync_file_range(fd, write_pos, bl.length(), - SYNC_FILE_RANGE_WAIT_BEFORE|SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER); - else + if (is_bdev) { + if (split) { + ::sync_file_range(fd, header.max_size - split, split, SYNC_FILE_RANGE_WAIT_BEFORE|SYNC_FILE_RANGE_WRITE); + ::sync_file_range(fd, get_top(), bl.length() - split, SYNC_FILE_RANGE_WAIT_BEFORE|SYNC_FILE_RANGE_WRITE); + ::sync_file_range(fd, header.max_size - split, split, SYNC_FILE_RANGE_WAIT_AFTER); + ::sync_file_range(fd, get_top(), bl.length() - split, SYNC_FILE_RANGE_WAIT_AFTER); + } else { + ::sync_file_range(fd, write_pos, bl.length(), + SYNC_FILE_RANGE_WAIT_BEFORE|SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER); + } + } else ::fdatasync(fd); #endif } @@ -549,8 +544,7 @@ void FileJournal::do_write(bufferlist& bl) writing = false; - write_pos += bl.length(); - //write_pos = ROUND_UP_TO(write_pos, header.alignment); + write_pos = pos; assert(write_pos % header.alignment == 0); // kick finisher? @@ -647,10 +641,6 @@ void FileJournal::committed_thru(__u64 seq) // adjust start pointer while (!journalq.empty() && journalq.front().first <= seq) { - if (journalq.front().second == get_top()) { - dout(10) << " committed event at " << journalq.front().second << ", clearing wrap marker" << dendl; - header.wrap = 0; // clear wrap marker - } journalq.pop_front(); } if (!journalq.empty()) { @@ -698,6 +688,32 @@ void FileJournal::make_writeable() start_writer(); } +void FileJournal::wrap_read_bl(off64_t& pos, int64_t olen, bufferlist& bl) +{ + while (olen > 0) { + while (pos >= header.max_size) + pos = pos + get_top() - header.max_size; + + int64_t len; + if (pos + olen > header.max_size) + len = header.max_size - pos; // partial + else + len = olen; // rest + +#ifdef DARWIN + ::lseek(fd, pos, SEEK_SET); +#else + ::lseek64(fd, pos, SEEK_SET); +#endif + + bufferptr bp = buffer::create(len); + int r = ::read(fd, bp.c_str(), len); + assert(r == len); + bl.push_back(bp); + pos += len; + olen -= len; + } +} bool FileJournal::read_entry(bufferlist& bl, __u64& seq) { @@ -705,71 +721,61 @@ bool FileJournal::read_entry(bufferlist& bl, __u64& seq) dout(2) << "read_entry -- not readable" << dendl; return false; } - if (read_pos == header.wrap) { - read_pos = get_top(); - dout(10) << "read_entry wrapped from " << header.wrap << " to " << read_pos << dendl; - } + + off64_t pos = read_pos; + bl.clear(); // header - entry_header_t h; -#ifdef DARWIN - ::lseek(fd, read_pos, SEEK_SET); -#else - ::lseek64(fd, read_pos, SEEK_SET); -#endif - ::read(fd, &h, sizeof(h)); - if (!h.check_magic(read_pos, header.fsid)) { + entry_header_t *h; + bufferlist hbl; + wrap_read_bl(pos, sizeof(*h), hbl); + h = (entry_header_t *)hbl.c_str(); + + if (!h->check_magic(read_pos, header.fsid)) { dout(2) << "read_entry " << read_pos << " : bad header magic, end of journal" << dendl; return false; } - if (h.pre_pad) - ::lseek64(fd, h.pre_pad, SEEK_CUR); - - // body - bufferptr bp(h.len); - ::read(fd, bp.c_str(), h.len); - - if (h.post_pad) - ::lseek64(fd, h.post_pad, SEEK_CUR); + // pad + body + pad + if (h->pre_pad) + pos += h->pre_pad; + wrap_read_bl(pos, h->len, bl); + if (h->post_pad) + pos += h->post_pad; // footer - entry_header_t f; - ::read(fd, &f, sizeof(h)); - if (!f.check_magic(read_pos, header.fsid) || - h.seq != f.seq || - h.len != f.len) { + entry_header_t *f; + bufferlist fbl; + wrap_read_bl(pos, sizeof(*f), fbl); + f = (entry_header_t *)fbl.c_str(); + if (memcmp(f, h, sizeof(*f))) { dout(2) << "read_entry " << read_pos << " : bad footer magic, partial entry, end of journal" << dendl; return false; } - // yay! - dout(1) << "read_entry " << read_pos << " : seq " << h.seq - << " " << h.len << " bytes" + dout(1) << "read_entry " << read_pos << " : seq " << h->seq + << " " << h->len << " bytes" << dendl; - if (seq && h.seq != seq) { - dout(2) << "read_entry " << read_pos << " : got seq " << h.seq << ", expected " << seq << ", stopping" << dendl; + if (seq && h->seq != seq) { + dout(2) << "read_entry " << read_pos << " : got seq " << h->seq << ", expected " << seq << ", stopping" << dendl; return false; } - if (h.seq < last_committed_seq) { + if (h->seq < last_committed_seq) { dout(0) << "read_entry seq " << seq << " < last_committed_seq " << last_committed_seq << dendl; - assert(h.seq >= last_committed_seq); + assert(h->seq >= last_committed_seq); return false; } - last_committed_seq = h.seq; + last_committed_seq = h->seq; // ok! - bl.clear(); - bl.push_back(bp); - seq = h.seq; - journalq.push_back(pair<__u64,off64_t>(h.seq, read_pos)); + seq = h->seq; + journalq.push_back(pair<__u64,off64_t>(h->seq, read_pos)); - read_pos += 2*sizeof(entry_header_t) + h.pre_pad + h.len + h.post_pad; + read_pos = pos; assert(read_pos % header.alignment == 0); - //read_pos = ROUND_UP_TO(read_pos, header.alignment); - + return true; } diff --git a/src/os/FileJournal.h b/src/os/FileJournal.h index aa90153298388..a0c929b0b4f60 100644 --- a/src/os/FileJournal.h +++ b/src/os/FileJournal.h @@ -36,13 +36,11 @@ public: __u32 block_size; __u32 alignment; __s64 max_size; // max size of journal ring buffer - __s64 wrap; // wrap byte pos (if any) __s64 start; // offset of first entry - header_t() : version(1), flags(0), fsid(0), block_size(0), alignment(0), max_size(0), wrap(0), start(0) {} + header_t() : version(1), flags(0), fsid(0), block_size(0), alignment(0), max_size(0), start(0) {} void clear() { - wrap = 0; start = block_size; } } header; @@ -116,12 +114,15 @@ private: void stop_writer(); void write_thread_entry(); - bool check_for_wrap(__u64 seq, off64_t *pos, off64_t size, bool can_wrap); + bool check_for_full(__u64 seq, off64_t pos, off64_t size); void prepare_multi_write(bufferlist& bl); - bool prepare_single_write(bufferlist& bl, off64_t& queue_pos, bool can_wrap); - bool prepare_single_dio_write(bufferlist& bl, off64_t& queue_pos, bool can_wrap); + bool prepare_single_write(bufferlist& bl, off64_t& queue_pos); + bool prepare_single_dio_write(bufferlist& bl, off64_t& queue_pos); void do_write(bufferlist& bl); + void write_bl(off64_t& pos, bufferlist& bl); + void wrap_read_bl(off64_t& pos, int64_t len, bufferlist& bl); + class Writer : public Thread { FileJournal *journal; public: -- 2.39.5