From 8c737cfce81fa81c015879844e45e71dded77d95 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 29 Jan 2010 16:34:17 -0800 Subject: [PATCH] journal: group entries into single io in directio mode --- src/include/buffer.h | 9 ++- src/os/FileJournal.cc | 140 +++++++++++++++--------------------------- src/os/FileJournal.h | 3 +- 3 files changed, 60 insertions(+), 92 deletions(-) diff --git a/src/include/buffer.h b/src/include/buffer.h index 43ce02ba0bfc5..936650986b4bf 100644 --- a/src/include/buffer.h +++ b/src/include/buffer.h @@ -804,8 +804,15 @@ public: } } + bool is_contiguous() { + return &(*_buffers.begin()) == &(*_buffers.rbegin()); + } void rebuild() { - ptr nb(_len); + ptr nb; + if ((_len & ~PAGE_MASK) == 0) + nb = buffer::create_page_aligned(_len); + else + nb = buffer::create(_len); unsigned pos = 0; for (std::list::iterator it = _buffers.begin(); it != _buffers.end(); diff --git a/src/os/FileJournal.cc b/src/os/FileJournal.cc index 474859bddb39d..72321f2ec9c8b 100644 --- a/src/os/FileJournal.cc +++ b/src/os/FileJournal.cc @@ -322,7 +322,7 @@ bool FileJournal::check_for_wrap(__u64 seq, off64_t *pos, off64_t size, bool can // yes! dout(10) << " wrapping from " << *pos << " to " << get_top() << dendl; header.wrap = *pos; - *pos = get_top(); + write_pos = *pos = get_top(); must_write_header = true; return true; } @@ -353,122 +353,77 @@ void FileJournal::prepare_multi_write(bufferlist& bl) off64_t queue_pos = write_pos; int eleft = g_conf.journal_max_write_entries; - int bleft = g_conf.journal_max_write_bytes; + unsigned bmax = g_conf.journal_max_write_bytes; if (full_commit_seq || full_restart_seq) return; - + while (!writeq.empty()) { - // grab next item - __u64 seq = writeq.front().seq; - bufferlist &ebl = writeq.front().bl; - off64_t size = 2*sizeof(entry_header_t) + ebl.length(); - bool can_wrap = !bl.length(); // only wrap if this is a new thinger - if (!check_for_wrap(seq, &queue_pos, size, can_wrap)) - break; - - // set write_pos? (check_for_wrap may have moved it) - if (!bl.length()) - write_pos = queue_pos; - - // add to write buffer - dout(15) << "prepare_multi_write will write " << queue_pos << " : seq " << seq - << " len " << ebl.length() << " -> " << size - << " (left " << eleft << "/" << bleft << ")" - << dendl; - - // add it this entry - entry_header_t h; - h.seq = seq; - h.flags = 0; - h.len = ebl.length(); - h.pre_pad = 0; - h.post_pad = 0; - h.make_magic(queue_pos, header.fsid); - - - // pad? - if ((queue_pos + size) % header.alignment) { - h.post_pad = header.alignment - ((queue_pos + size) % header.alignment); - size += h.post_pad; - //dout(20) << " padding with " << h.post_pad << " bytes, queue_pos now " << queue_pos << dendl; - } - dout(15) << " pad " << h.pre_pad << " " << h.post_pad << " len " << h.len << dendl; - - bl.append((const char*)&h, sizeof(h)); - bl.claim_append(ebl); - if (h.post_pad) { - bufferptr bp = buffer::create_static(h.post_pad, zero_buf); - bl.push_back(bp); - } - bl.append((const char*)&h, sizeof(h)); - if (writeq.front().fin) { - writing_seq.push_back(seq); - writing_fin.push_back(writeq.front().fin); - } - - // pop from writeq - writeq.pop_front(); - journalq.push_back(pair<__u64,off64_t>(seq, queue_pos)); - - queue_pos += size; + bool r = prepare_single_write(bl, queue_pos, can_wrap); + if (!r) + break; if (eleft) { if (--eleft == 0) { - dout(20) << " hit max events per write " << g_conf.journal_max_write_entries << dendl; + dout(20) << "prepare_multi_write hit max events per write " << g_conf.journal_max_write_entries << dendl; break; } } - if (bleft) { - bleft -= size; - if (bleft == 0) { - dout(20) << " hit max write size " << g_conf.journal_max_write_bytes << dendl; + if (bmax) { + if (bl.length() >= bmax) { + dout(20) << "prepare_multi_write hit max write size " << g_conf.journal_max_write_bytes << dendl; break; } } } + + assert(write_pos + bl.length() == queue_pos); } -bool FileJournal::prepare_single_dio_write(bufferlist& bl) +bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, bool can_wrap) { // grab next item __u64 seq = writeq.front().seq; bufferlist &ebl = writeq.front().bl; - off64_t base_size = 2*sizeof(entry_header_t) + ebl.length(); off64_t size = ROUND_UP_TO(base_size, header.alignment); - - if (!check_for_wrap(seq, &write_pos, size, true)) + + if (!check_for_wrap(seq, &queue_pos, size, can_wrap)) return false; - if (full_commit_seq || full_restart_seq) return false; - - // build it - dout(15) << "prepare_single_dio_write will write " << write_pos << " : seq " << seq - << " len " << ebl.length() << " -> " << size << " (base " << base_size << ")" << dendl; - - bufferptr bp = buffer::create_page_aligned(size); - entry_header_t *h = (entry_header_t*)bp.c_str(); - h->seq = seq; - h->flags = 0; - h->len = ebl.length(); - h->pre_pad = 0; - h->post_pad = size - base_size; - dout(15) << " pad " << h->pre_pad << " " << h->post_pad << dendl; - h->make_magic(write_pos, header.fsid); - ebl.copy(0, ebl.length(), bp.c_str()+sizeof(*h)+h->pre_pad); - memcpy(bp.c_str() + sizeof(*h) + h->pre_pad + ebl.length() + h->post_pad, h, sizeof(*h)); - bl.push_back(bp); - + + // add to write buffer + dout(15) << "prepare_single_write will write " << queue_pos << " : seq " << seq + << " len " << ebl.length() << " -> " << size + << dendl; + + // add it this entry + entry_header_t h; + h.seq = seq; + h.pre_pad = 0; + h.len = ebl.length(); + h.post_pad = size - base_size; + h.make_magic(queue_pos, header.fsid); + + bl.append((const char*)&h, sizeof(h)); + bl.claim_append(ebl); + if (h.post_pad) { + bufferptr bp = buffer::create_static(h.post_pad, zero_buf); + bl.push_back(bp); + } + bl.append((const char*)&h, sizeof(h)); + if (writeq.front().fin) { writing_seq.push_back(seq); writing_fin.push_back(writeq.front().fin); } - + // pop from writeq writeq.pop_front(); - journalq.push_back(pair<__u64,off64_t>(seq, write_pos)); + journalq.push_back(pair<__u64,off64_t>(seq, queue_pos)); + + queue_pos += size; return true; } @@ -507,6 +462,14 @@ void FileJournal::do_write(bufferlist& bl) off64_t pos = write_pos; ::lseek64(fd, write_pos, SEEK_SET); #endif + + // make sure this is a single, contiguous buffer + if (directio && !bl.is_contiguous()) { + bl.rebuild(); + //dout(0) << "len " << bl.length() << " page_mask " << PAGE_MASK << " ~" << ~PAGE_MASK << dendl; + assert((bl.length() & ~PAGE_MASK) == 0); + } + for (list::const_iterator it = bl.buffers().begin(); it != bl.buffers().end(); it++) { @@ -567,10 +530,7 @@ void FileJournal::write_thread_entry() } bufferlist bl; - if (directio) - prepare_single_dio_write(bl); - else - prepare_multi_write(bl); + prepare_multi_write(bl); do_write(bl); } diff --git a/src/os/FileJournal.h b/src/os/FileJournal.h index c4d2baa91535d..c0a95054d585e 100644 --- a/src/os/FileJournal.h +++ b/src/os/FileJournal.h @@ -116,8 +116,9 @@ private: void write_thread_entry(); bool check_for_wrap(__u64 seq, off64_t *pos, off64_t size, bool can_wrap); - bool prepare_single_dio_write(bufferlist& bl); void prepare_multi_write(bufferlist& bl); + bool prepare_single_write(bufferlist& bl, off64_t& queue_pos, bool can_wrap); + bool prepare_single_dio_write(bufferlist& bl, off64_t& queue_pos, bool can_wrap); void do_write(bufferlist& bl); class Writer : public Thread { -- 2.39.5