From eb020b6a6a75c69f71dc29fd607a4626bed28366 Mon Sep 17 00:00:00 2001 From: Xinze Chi Date: Fri, 6 Nov 2015 21:44:38 +0800 Subject: [PATCH] os: write file journal optimezation Currently, there is single write thread for file journal, so it would be bottleneck. It is important to keep logic of the journal write thread simple. According to the implementation of transaction encoding, it is almost impossible that the write bufferlist would be align. So write journal would call rebuild_aligned almost every time. Because of the memory fragmentation, the bufferlist crc and rebuild would be bottleneck. My implementation would move the complex logic out of journal write thread. Signed-off-by: Xinze Chi Reviewed-by: Haomai Wang --- src/os/FileJournal.cc | 129 ++++++++++++++++++++------------ src/os/FileJournal.h | 22 +++--- src/os/FileStore.cc | 16 ++-- src/os/Journal.h | 5 +- src/os/JournalingObjectStore.cc | 22 +----- src/os/JournalingObjectStore.h | 9 ++- 6 files changed, 116 insertions(+), 87 deletions(-) diff --git a/src/os/FileJournal.cc b/src/os/FileJournal.cc index 4fa419ccbf82a..7a33d313fffde 100644 --- a/src/os/FileJournal.cc +++ b/src/os/FileJournal.cc @@ -893,7 +893,7 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_ // throw out what we have so far full_state = FULL_FULL; while (!writeq_empty()) { - put_throttle(1, peek_write().bl.length()); + put_throttle(1, peek_write().orig_len); pop_write(); } print_header(header); @@ -973,54 +973,39 @@ int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64 write_item &next_write = peek_write(); uint64_t seq = next_write.seq; bufferlist &ebl = next_write.bl; - unsigned head_size = sizeof(entry_header_t); - off64_t base_size = 2*head_size + ebl.length(); - - int alignment = next_write.alignment; // we want to start ebl with this alignment - unsigned pre_pad = 0; - if (alignment >= 0) - pre_pad = ((unsigned int)alignment - (unsigned int)head_size) & ~CEPH_PAGE_MASK; - off64_t size = ROUND_UP_TO(base_size + pre_pad, header.alignment); - unsigned post_pad = size - base_size - pre_pad; + off64_t size = ebl.length(); int r = check_for_full(seq, queue_pos, size); if (r < 0) return r; // ENOSPC or EAGAIN - orig_bytes += ebl.length(); + uint32_t orig_len = next_write.orig_len; + orig_bytes += orig_len; orig_ops++; // add to write buffer dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq - << " len " << ebl.length() << " -> " << size - << " (head " << head_size << " pre_pad " << pre_pad - << " ebl " << ebl.length() << " post_pad " << post_pad << " tail " << head_size << ")" - << " (ebl alignment " << alignment << ")" - << dendl; + << " len " << orig_len << " -> " << size << dendl; - // add it this entry - entry_header_t h; - memset(&h, 0, sizeof(h)); - h.seq = seq; - h.pre_pad = pre_pad; - h.len = ebl.length(); - h.post_pad = post_pad; - h.make_magic(queue_pos, header.get_fsid64()); - h.crc32c = ebl.crc32c(0); - - bl.append((const char*)&h, sizeof(h)); - if (pre_pad) { - bufferptr bp = buffer::create_static(pre_pad, zero_buf); - bl.push_back(bp); - } - bl.claim_append(ebl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy - - if (h.post_pad) { - bufferptr bp = buffer::create_static(post_pad, zero_buf); - bl.push_back(bp); - } - bl.append((const char*)&h, sizeof(h)); - + unsigned seq_offset = offsetof(entry_header_t, seq); + unsigned magic1_offset = offsetof(entry_header_t, magic1); + unsigned magic2_offset = offsetof(entry_header_t, magic2); + + bufferptr headerptr = ebl.buffers().front(); + uint64_t _seq = seq; + uint64_t _queue_pos = queue_pos; + uint64_t magic2 = entry_header_t::make_magic(seq, orig_len, header.get_fsid64()); + headerptr.copy_in(seq_offset, sizeof(uint64_t), (char *)&_seq); + headerptr.copy_in(magic1_offset, sizeof(uint64_t), (char *)&_queue_pos); + headerptr.copy_in(magic2_offset, sizeof(uint64_t), (char *)&magic2); + + bufferptr footerptr = ebl.buffers().back(); + unsigned post_offset = footerptr.length() - sizeof(entry_header_t); + footerptr.copy_in(post_offset + seq_offset, sizeof(uint64_t), (char *)&_seq); + footerptr.copy_in(post_offset + magic1_offset, sizeof(uint64_t), (char *)&_queue_pos); + footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2); + + bl.claim_append(ebl); if (next_write.tracked_op) next_write.tracked_op->mark_event("write_thread_in_journal_buffer"); @@ -1041,8 +1026,7 @@ void FileJournal::align_bl(off64_t pos, bufferlist& bl) // make sure list segments are page aligned if (directio && (!bl.is_aligned(block_size) || !bl.is_n_align_sized(CEPH_MINIMUM_BLOCK_SIZE))) { - bl.rebuild_aligned(CEPH_MINIMUM_BLOCK_SIZE); - dout(10) << __func__ << " total memcopy: " << bl.get_memcopy_count() << dendl; + assert(0 == "bl should be align"); if ((bl.length() & (CEPH_MINIMUM_BLOCK_SIZE - 1)) != 0 || (pos & (CEPH_MINIMUM_BLOCK_SIZE - 1)) != 0) dout(0) << "rebuild_page_aligned failed, " << bl << dendl; @@ -1300,7 +1284,7 @@ void FileJournal::write_thread_entry() if (write_stop) { dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl; while (!writeq_empty()) { - put_throttle(1, peek_write().bl.length()); + put_throttle(1, peek_write().orig_len); pop_write(); } print_header(header); @@ -1576,7 +1560,60 @@ void FileJournal::check_aio_completion() } #endif -void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment, +int FileJournal::prepare_entry(list& tls, bufferlist* tbl) { + dout(10) << "prepare_entry " << tls << dendl; + unsigned data_len = 0; + int data_align = -1; // -1 indicates that we don't care about the alignment + bufferlist bl; + for (list::iterator p = tls.begin(); + p != tls.end(); ++p) { + ObjectStore::Transaction *t = *p; + if (t->get_data_length() > data_len && + (int)t->get_data_length() >= g_conf->journal_align_min_size) { + data_len = t->get_data_length(); + data_align = (t->get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK; + } + ::encode(*t, bl); + } + if (tbl->length()) { + bl.claim_append(*tbl); + } + // add it this entry + entry_header_t h; + unsigned head_size = sizeof(entry_header_t); + off64_t base_size = 2*head_size + bl.length(); + memset(&h, 0, sizeof(h)); + if (data_align >= 0) + h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK; + off64_t size = ROUND_UP_TO(base_size + h.pre_pad, header.alignment); + unsigned post_pad = size - base_size - h.pre_pad; + h.len = bl.length(); + h.post_pad = post_pad; + h.crc32c = bl.crc32c(0); + dout(10) << " len " << bl.length() << " -> " << size + << " (head " << head_size << " pre_pad " << h.pre_pad + << " bl " << bl.length() << " post_pad " << post_pad << " tail " << head_size << ")" + << " (bl alignment " << data_align << ")" + << dendl; + bufferlist ebl; + // header + ebl.append((const char*)&h, sizeof(h)); + if (h.pre_pad) { + ebl.push_back(buffer::create_static(h.pre_pad, zero_buf)); + } + // payload + ebl.claim_append(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy + if (h.post_pad) { + ebl.push_back(buffer::create_static(h.post_pad, zero_buf)); + } + // footer + ebl.append((const char*)&h, sizeof(h)); + ebl.rebuild_aligned(CEPH_MINIMUM_BLOCK_SIZE); + tbl->claim(ebl); + return h.len; +} + +void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len, Context *oncommit, TrackedOpRef osd_op) { // dump on queue @@ -1586,7 +1623,7 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment, assert(e.length() > 0); throttle_ops.take(1); - throttle_bytes.take(e.length()); + throttle_bytes.take(orig_len); if (osd_op) osd_op->mark_event("commit_queued_for_journal_write"); if (logger) { @@ -1604,7 +1641,7 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment, seq, oncommit, ceph_clock_now(g_ceph_context), osd_op)); if (writeq.empty()) writeq_cond.Signal(); - writeq.push_back(write_item(seq, e, alignment, osd_op)); + writeq.push_back(write_item(seq, e, orig_len, osd_op)); } } @@ -1737,7 +1774,7 @@ void FileJournal::committed_thru(uint64_t seq) dout(15) << " dropping committed but unwritten seq " << peek_write().seq << " len " << peek_write().bl.length() << dendl; - put_throttle(1, peek_write().bl.length()); + put_throttle(1, peek_write().orig_len); pop_write(); } diff --git a/src/os/FileJournal.h b/src/os/FileJournal.h index fbe616d301ac7..50bc8109b9e93 100644 --- a/src/os/FileJournal.h +++ b/src/os/FileJournal.h @@ -50,13 +50,13 @@ public: struct write_item { uint64_t seq; bufferlist bl; - int alignment; + uint32_t orig_len; TrackedOpRef tracked_op; - write_item(uint64_t s, bufferlist& b, int al, TrackedOpRef opref) : - seq(s), alignment(al), tracked_op(opref) { + write_item(uint64_t s, bufferlist& b, int ol, TrackedOpRef opref) : + seq(s), orig_len(ol), tracked_op(opref) { bl.claim(b, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy } - write_item() : seq(0), alignment(0) {} + write_item() : seq(0), orig_len(0) {} }; Mutex finisher_lock; @@ -88,7 +88,9 @@ public: completions.pop_front(); } - void submit_entry(uint64_t seq, bufferlist& bl, int alignment, + int prepare_entry(list& tls, bufferlist* tbl); + + void submit_entry(uint64_t seq, bufferlist& bl, uint32_t orig_len, Context *oncommit, TrackedOpRef osd_op = TrackedOpRef()); /// End protected by finisher_lock @@ -203,14 +205,13 @@ public: uint64_t magic1; uint64_t magic2; - void make_magic(off64_t pos, uint64_t fsid) { - magic1 = pos; - magic2 = fsid ^ seq ^ len; + static uint64_t make_magic(uint64_t seq, uint32_t len, uint64_t fsid) { + return (fsid ^ seq ^ len); } bool check_magic(off64_t pos, uint64_t fsid) { return - magic1 == (uint64_t)pos && - magic2 == (fsid ^ seq ^ len); + magic1 == (uint64_t)pos && + magic2 == (fsid ^ seq ^ len); } } __attribute__((__packed__, aligned(4))); @@ -220,7 +221,6 @@ private: string fn; char *zero_buf; - off64_t max_size; size_t block_size; bool directio, aio, force_aio; diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 5b8c2b5edf39b..70b7cc92fd0f8 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -1959,7 +1959,10 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, journal->throttle(); //prepare and encode transactions data out of lock bufferlist tbl; - int data_align = _op_journal_transactions_prepare(o->tls, tbl); + int orig_len = -1; + if (journal && journal->is_writeable()) { + orig_len = journal->prepare_entry(o->tls, &tbl); + } uint64_t op_num = submit_manager.op_submit_start(); o->op = op_num; @@ -1969,7 +1972,7 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, if (m_filestore_journal_parallel) { dout(5) << "queue_transactions (parallel) " << o->op << " " << o->tls << dendl; - _op_journal_transactions(tbl, data_align, o->op, ondisk, osd_op); + _op_journal_transactions(tbl, orig_len, o->op, ondisk, osd_op); // queue inside submit_manager op submission lock queue_op(osr, o); @@ -1978,7 +1981,7 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, osr->queue_journal(o->op); - _op_journal_transactions(tbl, data_align, o->op, + _op_journal_transactions(tbl, orig_len, o->op, new C_JournaledAhead(this, osr, o, ondisk), osd_op); } else { @@ -2011,7 +2014,10 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, //prepare and encode transactions data out of lock bufferlist tbl; - int data_align = _op_journal_transactions_prepare(tls, tbl); + int orig_len = -1; + if (journal && journal->is_writeable()) { + orig_len = journal->prepare_entry(tls, &tbl); + } uint64_t op = submit_manager.op_submit_start(); dout(5) << "queue_transactions (trailing journal) " << op << " " << tls << dendl; @@ -2022,7 +2028,7 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, int r = do_transactions(tls, op); if (r >= 0) { - _op_journal_transactions(tbl, data_align, op, ondisk, osd_op); + _op_journal_transactions(tbl, orig_len, op, ondisk, osd_op); } else { delete ondisk; } diff --git a/src/os/Journal.h b/src/os/Journal.h index 4f8658fb418f2..d5b918698e496 100644 --- a/src/os/Journal.h +++ b/src/os/Journal.h @@ -22,6 +22,7 @@ #include "include/Context.h" #include "common/Finisher.h" #include "common/TrackedOp.h" +#include "os/ObjectStore.h" class PerfCounters; @@ -57,7 +58,7 @@ public: // writes virtual bool is_writeable() = 0; virtual int make_writeable() = 0; - virtual void submit_entry(uint64_t seq, bufferlist& e, int alignment, + virtual void submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len, Context *oncommit, TrackedOpRef osd_op = TrackedOpRef()) = 0; virtual void commit_start(uint64_t seq) = 0; @@ -71,6 +72,8 @@ public: virtual bool should_commit_now() = 0; + virtual int prepare_entry(list& tls, bufferlist* tbl) = 0; + // reads/recovery }; diff --git a/src/os/JournalingObjectStore.cc b/src/os/JournalingObjectStore.cc index 35cf74ae6ae51..599a1b568cb65 100644 --- a/src/os/JournalingObjectStore.cc +++ b/src/os/JournalingObjectStore.cc @@ -251,7 +251,7 @@ void JournalingObjectStore::ApplyManager::commit_finish() } void JournalingObjectStore::_op_journal_transactions( - bufferlist& tbl, int data_align, uint64_t op, + bufferlist& tbl, uint32_t orig_len, uint64_t op, Context *onjournal, TrackedOpRef osd_op) { if (osd_op.get()) @@ -261,27 +261,9 @@ void JournalingObjectStore::_op_journal_transactions( dout(10) << "op_journal_transactions " << op << dendl; if (journal && journal->is_writeable()) { - journal->submit_entry(op, tbl, data_align, onjournal, osd_op); + journal->submit_entry(op, tbl, orig_len, onjournal, osd_op); } else if (onjournal) { apply_manager.add_waiter(op, onjournal); } } -int JournalingObjectStore::_op_journal_transactions_prepare( - list& tls, bufferlist& tbl) -{ - dout(10) << "_op_journal_transactions_prepare " << tls << dendl; - unsigned data_len = 0; - int data_align = -1; // -1 indicates that we don't care about the alignment - for (list::iterator p = tls.begin(); - p != tls.end(); ++p) { - ObjectStore::Transaction *t = *p; - if (t->get_data_length() > data_len && - (int)t->get_data_length() >= g_conf->journal_align_min_size) { - data_len = t->get_data_length(); - data_align = (t->get_data_alignment() - tbl.length()) & ~CEPH_PAGE_MASK; - } - ::encode(*t, tbl); - } - return data_align; -} diff --git a/src/os/JournalingObjectStore.h b/src/os/JournalingObjectStore.h index fbfa20ce0f1f5..42d13f6491a46 100644 --- a/src/os/JournalingObjectStore.h +++ b/src/os/JournalingObjectStore.h @@ -17,6 +17,7 @@ #include "ObjectStore.h" #include "Journal.h" +#include "FileJournal.h" #include "common/RWLock.h" class JournalingObjectStore : public ObjectStore { @@ -114,9 +115,7 @@ protected: void journal_write_close(); int journal_replay(uint64_t fs_op_seq); - int _op_journal_transactions_prepare( - list& tls, bufferlist& tbl); - void _op_journal_transactions(bufferlist& tls, int data_align, uint64_t op, + void _op_journal_transactions(bufferlist& tls, uint32_t orig_len, uint64_t op, Context *onjournal, TrackedOpRef osd_op); virtual int do_transactions(list& tls, uint64_t op_seq) = 0; @@ -136,7 +135,9 @@ public: finisher(g_ceph_context), apply_manager(journal, finisher), replaying(false) {} - + + ~JournalingObjectStore() { + } }; #endif -- 2.39.5