From 2e299b50de4a297fee2aec21290632336d239857 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 12 Apr 2017 16:00:18 +0800 Subject: [PATCH] osdc/Journaler: make header write_pos align to boundary of flushed entry This can speed up the process that detects and drops partial written entry in the log tail. Signed-off-by: "Yan, Zheng" (cherry picked from commit 8ae2962b79903e217fda83cea4140af64b5d6883) Conflicts: src/osdc/Journaler.cc - 8d4f6b92cba is not being backported to jewel src/osdc/Journaler.h - Journaler::Journaler initializer list is different in jewel, compared to master --- src/osdc/Journaler.cc | 36 +++++++++++++++++++++--------------- src/osdc/Journaler.h | 13 ++++++++++--- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index c57616edca47e..88208dcec0082 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -56,9 +56,10 @@ void Journaler::create(file_layout_t *l, stream_format_t const sf) journal_stream.set_format(sf); _set_layout(l); - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = - read_pos = requested_pos = received_pos = - expire_pos = trimming_pos = trimmed_pos = layout.get_period(); + prezeroing_pos = prezero_pos = write_pos = flush_pos = + safe_pos = read_pos = requested_pos = received_pos = + expire_pos = trimming_pos = trimmed_pos = + next_safe_pos = layout.get_period(); ldout(cct, 1) << "created blank journal at inode 0x" << std::hex << ino << std::dec << ", format=" << stream_format << dendl; @@ -219,7 +220,7 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish) finish->complete(-EINVAL); return; } - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = h.write_pos; expire_pos = h.expire_pos; trimmed_pos = trimming_pos = h.trimmed_pos; @@ -280,7 +281,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) return; } - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = h.write_pos; read_pos = requested_pos = received_pos = expire_pos = h.expire_pos; trimmed_pos = trimming_pos = h.trimmed_pos; @@ -328,7 +329,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end, ldout(cct, 1) << "_finish_reprobe new_end = " << new_end << " (header had " << write_pos << ")." << dendl; - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = new_end; + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = new_end; state = STATE_ACTIVE; onfinish->complete(r); } @@ -355,7 +356,7 @@ void Journaler::_finish_probe_end(int r, uint64_t end) state = STATE_ACTIVE; - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = end; + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = end; out: // done. @@ -491,7 +492,6 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) return; } - assert(start >= safe_pos); assert(start < flush_pos); // calc latency? @@ -501,12 +501,13 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) } // adjust safe_pos - assert(pending_safe.count(start)); - pending_safe.erase(start); + auto it = pending_safe.find(start); + assert(it != pending_safe.end()); + pending_safe.erase(it); if (pending_safe.empty()) - safe_pos = flush_pos; + safe_pos = next_safe_pos; else - safe_pos = *pending_safe.begin(); + safe_pos = pending_safe.begin()->second; ldout(cct, 10) << "_finish_flush safe from " << start << ", pending_safe " << pending_safe @@ -593,6 +594,10 @@ uint64_t Journaler::append_entry(bufferlist& bl) ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro " << write_obj << " flo " << flush_obj << ")" << dendl; _do_flush(write_buf.length() - write_off); + if (write_off) { + // current entry isn't being flushed, set next_safe_pos to the end of previous entry + next_safe_pos = write_pos - wrote; + } } return write_pos; @@ -645,13 +650,14 @@ void Journaler::_do_flush(unsigned amount) SnapContext snapc; Context *onsafe = new C_Flush(this, flush_pos, now); // on COMMIT - pending_safe.insert(flush_pos); + pending_safe[flush_pos] = next_safe_pos; bufferlist write_bl; // adjust pointers if (len == write_buf.length()) { write_bl.swap(write_buf); + next_safe_pos = write_pos; } else { write_buf.splice(0, len, &write_bl); } @@ -978,7 +984,7 @@ void Journaler::_issue_read(uint64_t len) ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos << ", waiting" << dendl; assert(write_pos > requested_pos); - if (flush_pos == safe_pos) { + if (pending_safe.empty()) { _flush(NULL); } assert(flush_pos > safe_pos); @@ -1077,7 +1083,7 @@ bool Journaler::_is_readable() "adjusting write_pos to " << read_pos << dendl; // adjust write_pos - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = read_pos; + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos; assert(write_buf.length() == 0); // reset read state diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index 58269462ea516..e96ebf3592f41 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -309,6 +309,11 @@ private: uint64_t flush_pos; ///< where we will flush. if /// write_pos>flush_pos, we're buffering writes. uint64_t safe_pos; ///< what has been committed safely to disk. + + uint64_t next_safe_pos; /// start postion of the first entry that isn't + /// being fully flushed. If we don't flush any + // partial entry, it's equal to flush_pos. + bufferlist write_buf; ///< write buffer. flush_pos + /// write_buf.length() == write_pos. @@ -317,7 +322,7 @@ private: bool waiting_for_zero; interval_set pending_zero; // non-contig bits we've zeroed - std::set pending_safe; + std::map pending_safe; // flush_pos -> safe_pos // when safe through given offset std::map > waitfor_safe; @@ -409,7 +414,8 @@ public: objecter(obj), filer(objecter, f), logger(l), logger_key_lat(lkey), timer(tim), delay_flush_event(0), state(STATE_UNDEF), error(0), - prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0), safe_pos(0), + prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0), + safe_pos(0), next_safe_pos(0), write_buf_throttle(cct, "write_buf_throttle", UINT_MAX - (UINT_MAX >> 3)), waiting_for_zero(false), read_pos(0), requested_pos(0), received_pos(0), @@ -439,6 +445,7 @@ public: write_pos = 0; flush_pos = 0; safe_pos = 0; + next_safe_pos = 0; read_pos = 0; requested_pos = 0; received_pos = 0; @@ -469,7 +476,7 @@ public: void set_writeable(); void set_write_pos(int64_t p) { lock_guard l(lock); - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p; + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = p; } void set_read_pos(int64_t p) { lock_guard l(lock); -- 2.39.5