From 48f3c91004cc93fa5506f050291e62e5474dc757 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 12 Apr 2017 18:06:23 +0800 Subject: [PATCH] mds: drop partial entry and adjust write_pos when opening PurgeQueue At tail journal, there can be partial written entry. Before appending new entries to the journal, we need to drop any partial written entry and adjust write_pos. For mds log, partial written entry is detected and dropped when replaying the journal. For PurgeQueue journal, we don't replay the whole journal when MDS starts. Before appending new entry to the journal, we need to drop any partial written entry and adjust write_pos. Previous patch makes the journal header write_pos align to boundary of fully flushed entry. We can start finding partial written entry from the journal header write_pos. It should be fast even when the purge queue is very large. Fixes: http://tracker.ceph.com/issues/19450 Signed-off-by: "Yan, Zheng" --- src/mds/PurgeQueue.cc | 56 +++++++++++++++++++++++++++++++++++++++---- src/mds/PurgeQueue.h | 3 +++ 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/src/mds/PurgeQueue.cc b/src/mds/PurgeQueue.cc index 7bea41bfab3a0..2cf61938afcd7 100644 --- a/src/mds/PurgeQueue.cc +++ b/src/mds/PurgeQueue.cc @@ -139,17 +139,65 @@ void PurgeQueue::open(Context *completion) } else if (r == 0) { Mutex::Locker l(lock); dout(4) << "open complete" << dendl; - if (r == 0) { - journaler.set_writeable(); + + // Journaler only guarantees entries before head write_pos have been + // fully flushed. Before appending new entries, we need to find and + // drop any partial written entry. + if (journaler.last_committed.write_pos < journaler.get_write_pos()) { + dout(4) << "recovering write_pos" << dendl; + journaler.set_read_pos(journaler.last_committed.write_pos); + _recover(completion); + return; } - completion->complete(r); + + journaler.set_writeable(); + completion->complete(0); } else { derr << "Error " << r << " loading Journaler" << dendl; - on_error->complete(0); + on_error->complete(r); } })); } + +void PurgeQueue::_recover(Context *completion) +{ + assert(lock.is_locked_by_me()); + + // Journaler::is_readable() adjusts write_pos if partial entry is encountered + while (1) { + if (!journaler.is_readable() && + !journaler.get_error() && + journaler.get_read_pos() < journaler.get_write_pos()) { + journaler.wait_for_readable(new FunctionContext([this, completion](int r) { + Mutex::Locker l(lock); + _recover(completion); + })); + return; + } + + if (journaler.get_error()) { + int r = journaler.get_error(); + derr << "Error " << r << " recovering write_pos" << dendl; + on_error->complete(r); + return; + } + + if (journaler.get_read_pos() == journaler.get_write_pos()) { + dout(4) << "write_pos recovered" << dendl; + // restore original read_pos + journaler.set_read_pos(journaler.last_committed.expire_pos); + journaler.set_writeable(); + completion->complete(0); + return; + } + + bufferlist bl; + bool readable = journaler.try_read_entry(bl); + assert(readable); // we checked earlier + } +} + void PurgeQueue::create(Context *fin) { dout(4) << "creating" << dendl; diff --git a/src/mds/PurgeQueue.h b/src/mds/PurgeQueue.h index 33b8c8617998a..b9699ddf3e3d4 100644 --- a/src/mds/PurgeQueue.h +++ b/src/mds/PurgeQueue.h @@ -112,6 +112,9 @@ protected: // Has drain() ever been called on this instance? bool draining; + // recover the journal write_pos (drop any partial written entry) + void _recover(Context *completion); + /** * @return true if we were in a position to try and consume something: * does not mean we necessarily did. -- 2.39.5