From fef1c8718f77c190a0908d353f38b16b7c3832ab Mon Sep 17 00:00:00 2001 From: Nathan Cutler Date: Wed, 12 Jul 2017 08:40:20 +0200 Subject: [PATCH] Revert "osdc/Journaler: make header write_pos align to boundary of flushed entry" This reverts commit 2e299b50de4a297fee2aec21290632336d239857. Signed-off-by: Nathan Cutler --- src/osdc/Journaler.cc | 36 +++++++++++++++--------------------- src/osdc/Journaler.h | 13 +++---------- 2 files changed, 18 insertions(+), 31 deletions(-) diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index 88208dcec0082..c57616edca47e 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -56,10 +56,9 @@ void Journaler::create(file_layout_t *l, stream_format_t const sf) journal_stream.set_format(sf); _set_layout(l); - prezeroing_pos = prezero_pos = write_pos = flush_pos = - safe_pos = read_pos = requested_pos = received_pos = - expire_pos = trimming_pos = trimmed_pos = - next_safe_pos = layout.get_period(); + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = + read_pos = requested_pos = received_pos = + expire_pos = trimming_pos = trimmed_pos = layout.get_period(); ldout(cct, 1) << "created blank journal at inode 0x" << std::hex << ino << std::dec << ", format=" << stream_format << dendl; @@ -220,7 +219,7 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish) finish->complete(-EINVAL); return; } - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = h.write_pos; expire_pos = h.expire_pos; trimmed_pos = trimming_pos = h.trimmed_pos; @@ -281,7 +280,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) return; } - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = h.write_pos; read_pos = requested_pos = received_pos = expire_pos = h.expire_pos; trimmed_pos = trimming_pos = h.trimmed_pos; @@ -329,7 +328,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end, ldout(cct, 1) << "_finish_reprobe new_end = " << new_end << " (header had " << write_pos << ")." << dendl; - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = new_end; + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = new_end; state = STATE_ACTIVE; onfinish->complete(r); } @@ -356,7 +355,7 @@ void Journaler::_finish_probe_end(int r, uint64_t end) state = STATE_ACTIVE; - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = end; + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = end; out: // done. @@ -492,6 +491,7 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) return; } + assert(start >= safe_pos); assert(start < flush_pos); // calc latency? @@ -501,13 +501,12 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) } // adjust safe_pos - auto it = pending_safe.find(start); - assert(it != pending_safe.end()); - pending_safe.erase(it); + assert(pending_safe.count(start)); + pending_safe.erase(start); if (pending_safe.empty()) - safe_pos = next_safe_pos; + safe_pos = flush_pos; else - safe_pos = pending_safe.begin()->second; + safe_pos = *pending_safe.begin(); ldout(cct, 10) << "_finish_flush safe from " << start << ", pending_safe " << pending_safe @@ -594,10 +593,6 @@ uint64_t Journaler::append_entry(bufferlist& bl) ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro " << write_obj << " flo " << flush_obj << ")" << dendl; _do_flush(write_buf.length() - write_off); - if (write_off) { - // current entry isn't being flushed, set next_safe_pos to the end of previous entry - next_safe_pos = write_pos - wrote; - } } return write_pos; @@ -650,14 +645,13 @@ void Journaler::_do_flush(unsigned amount) SnapContext snapc; Context *onsafe = new C_Flush(this, flush_pos, now); // on COMMIT - pending_safe[flush_pos] = next_safe_pos; + pending_safe.insert(flush_pos); bufferlist write_bl; // adjust pointers if (len == write_buf.length()) { write_bl.swap(write_buf); - next_safe_pos = write_pos; } else { write_buf.splice(0, len, &write_bl); } @@ -984,7 +978,7 @@ void Journaler::_issue_read(uint64_t len) ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos << ", waiting" << dendl; assert(write_pos > requested_pos); - if (pending_safe.empty()) { + if (flush_pos == safe_pos) { _flush(NULL); } assert(flush_pos > safe_pos); @@ -1083,7 +1077,7 @@ bool Journaler::_is_readable() "adjusting write_pos to " << read_pos << dendl; // adjust write_pos - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos; + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = read_pos; assert(write_buf.length() == 0); // reset read state diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index e96ebf3592f41..58269462ea516 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -309,11 +309,6 @@ private: uint64_t flush_pos; ///< where we will flush. if /// write_pos>flush_pos, we're buffering writes. uint64_t safe_pos; ///< what has been committed safely to disk. - - uint64_t next_safe_pos; /// start postion of the first entry that isn't - /// being fully flushed. If we don't flush any - // partial entry, it's equal to flush_pos. - bufferlist write_buf; ///< write buffer. flush_pos + /// write_buf.length() == write_pos. @@ -322,7 +317,7 @@ private: bool waiting_for_zero; interval_set pending_zero; // non-contig bits we've zeroed - std::map pending_safe; // flush_pos -> safe_pos + std::set pending_safe; // when safe through given offset std::map > waitfor_safe; @@ -414,8 +409,7 @@ public: objecter(obj), filer(objecter, f), logger(l), logger_key_lat(lkey), timer(tim), delay_flush_event(0), state(STATE_UNDEF), error(0), - prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0), - safe_pos(0), next_safe_pos(0), + prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0), safe_pos(0), write_buf_throttle(cct, "write_buf_throttle", UINT_MAX - (UINT_MAX >> 3)), waiting_for_zero(false), read_pos(0), requested_pos(0), received_pos(0), @@ -445,7 +439,6 @@ public: write_pos = 0; flush_pos = 0; safe_pos = 0; - next_safe_pos = 0; read_pos = 0; requested_pos = 0; received_pos = 0; @@ -476,7 +469,7 @@ public: void set_writeable(); void set_write_pos(int64_t p) { lock_guard l(lock); - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = p; + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p; } void set_read_pos(int64_t p) { lock_guard l(lock); -- 2.39.5