From e515e84f69eeab90ea5c5831f7d3e684e48fb62e Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 27 Sep 2017 19:59:47 +0800 Subject: [PATCH] mds: open purge queue when transitioning out of standby replay MDS opens the purge queue when it starts standby replay. This is wrong because purge queue may change during standby replay. Fixes: http://tracker.ceph.com/issues/19593 Signed-off-by: "Yan, Zheng" --- src/mds/MDSRank.cc | 26 +++++++++++++++++++++---- src/mds/PurgeQueue.cc | 44 +++++++++++++++++++++++++++++++++---------- src/mds/PurgeQueue.h | 7 ++++++- 3 files changed, 62 insertions(+), 15 deletions(-) diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index ea90712eeef..d5cdc0dfbf6 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -1055,6 +1055,14 @@ void MDSRank::boot_start(BootStep step, int r) dout(2) << "boot_start " << step << ": opening mds log" << dendl; mdlog->open(gather.new_sub()); + if (is_starting()) { + dout(2) << "boot_start " << step << ": opening purge queue" << dendl; + purge_queue.open(new C_IO_Wrapper(this, gather.new_sub())); + } else if (!standby_replaying) { + dout(2) << "boot_start " << step << ": opening purge queue (async)" << dendl; + purge_queue.open(NULL); + } + if (mdsmap->get_tableserver() == whoami) { dout(2) << "boot_start " << step << ": opening snap table" << dendl; snapserver->set_rank(whoami); @@ -1073,8 +1081,6 @@ void MDSRank::boot_start(BootStep step, int r) mdcache->open_mydir_inode(gather.new_sub()); - purge_queue.open(new C_IO_Wrapper(this, gather.new_sub())); - if (is_starting() || whoami == mdsmap->get_root()) { // load root inode off disk if we are auth mdcache->open_root_inode(gather.new_sub()); @@ -1087,8 +1093,17 @@ void MDSRank::boot_start(BootStep step, int r) break; case MDS_BOOT_PREPARE_LOG: if (is_any_replay()) { - dout(2) << "boot_start " << step << ": replaying mds log" << dendl; - mdlog->replay(new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE)); + dout(2) << "boot_start " << step << ": replaying mds log" << dendl; + MDSGatherBuilder gather(g_ceph_context, + new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE)); + + if (!standby_replaying && !purge_queue.is_recovered()) { + dout(2) << "boot_start " << step << ": waiting for purge queue recovered" << dendl; + purge_queue.wait_for_recovery(new C_IO_Wrapper(this, gather.new_sub())); + } + + mdlog->replay(gather.new_sub()); + gather.activate(); } else { dout(2) << "boot_start " << step << ": positioning at end of old mds log" << dendl; mdlog->append(); @@ -1244,6 +1259,9 @@ void MDSRank::standby_replay_restart() new C_MDS_StandbyReplayRestartFinish( this, mdlog->get_journaler()->get_read_pos())); + + dout(1) << " opening purge queue (async)" << dendl; + purge_queue.open(NULL); } else { dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch() << " (which blacklists prior instance)" << dendl; diff --git a/src/mds/PurgeQueue.cc b/src/mds/PurgeQueue.cc index bdea6ac2810..e05c70ec4fc 100644 --- a/src/mds/PurgeQueue.cc +++ b/src/mds/PurgeQueue.cc @@ -79,7 +79,8 @@ PurgeQueue::PurgeQueue( max_purge_ops(0), drain_initial(0), draining(false), - delayed_flush(nullptr) + delayed_flush(nullptr), + recovered(false) { assert(cct != nullptr); assert(on_error != nullptr); @@ -147,11 +148,14 @@ void PurgeQueue::open(Context *completion) Mutex::Locker l(lock); - journaler.recover(new FunctionContext([this, completion](int r){ + if (completion) + waiting_for_recovery.push_back(completion); + + journaler.recover(new FunctionContext([this](int r){ if (r == -ENOENT) { dout(1) << "Purge Queue not found, assuming this is an upgrade and " "creating it." << dendl; - create(completion); + create(NULL); } else if (r == 0) { Mutex::Locker l(lock); dout(4) << "open complete" << dendl; @@ -162,12 +166,13 @@ void PurgeQueue::open(Context *completion) if (journaler.last_committed.write_pos < journaler.get_write_pos()) { dout(4) << "recovering write_pos" << dendl; journaler.set_read_pos(journaler.last_committed.write_pos); - _recover(completion); + _recover(); return; } journaler.set_writeable(); - completion->complete(0); + recovered = true; + finish_contexts(g_ceph_context, waiting_for_recovery); } else { derr << "Error " << r << " loading Journaler" << dendl; on_error->complete(r); @@ -175,8 +180,19 @@ void PurgeQueue::open(Context *completion) })); } +bool PurgeQueue::is_recovered() +{ + Mutex::Locker l(lock); + return recovered; +} -void PurgeQueue::_recover(Context *completion) +void PurgeQueue::wait_for_recovery(Context* c) +{ + Mutex::Locker l(lock); + waiting_for_recovery.push_back(c); +} + +void PurgeQueue::_recover() { assert(lock.is_locked_by_me()); @@ -185,9 +201,9 @@ void PurgeQueue::_recover(Context *completion) if (!journaler.is_readable() && !journaler.get_error() && journaler.get_read_pos() < journaler.get_write_pos()) { - journaler.wait_for_readable(new FunctionContext([this, completion](int r) { + journaler.wait_for_readable(new FunctionContext([this](int r) { Mutex::Locker l(lock); - _recover(completion); + _recover(); })); return; } @@ -204,7 +220,8 @@ void PurgeQueue::_recover(Context *completion) // restore original read_pos journaler.set_read_pos(journaler.last_committed.expire_pos); journaler.set_writeable(); - completion->complete(0); + recovered = true; + finish_contexts(g_ceph_context, waiting_for_recovery); return; } @@ -219,11 +236,18 @@ void PurgeQueue::create(Context *fin) dout(4) << "creating" << dendl; Mutex::Locker l(lock); + if (fin) + waiting_for_recovery.push_back(fin); + file_layout_t layout = file_layout_t::get_default(); layout.pool_id = metadata_pool; journaler.set_writeable(); journaler.create(&layout, JOURNAL_FORMAT_RESILIENT); - journaler.write_head(fin); + journaler.write_head(new FunctionContext([this](int r) { + Mutex::Locker l(lock); + recovered = true; + finish_contexts(g_ceph_context, waiting_for_recovery); + })); } /** diff --git a/src/mds/PurgeQueue.h b/src/mds/PurgeQueue.h index aed66c94ebc..221add80e9e 100644 --- a/src/mds/PurgeQueue.h +++ b/src/mds/PurgeQueue.h @@ -113,7 +113,7 @@ protected: bool draining; // recover the journal write_pos (drop any partial written entry) - void _recover(Context *completion); + void _recover(); /** * @return true if we were in a position to try and consume something: @@ -130,6 +130,8 @@ protected: void _execute_item_complete( uint64_t expire_to); + bool recovered; + std::list waiting_for_recovery; public: void init(); @@ -144,6 +146,9 @@ public: // Read the Journaler header for an existing queue and start consuming void open(Context *completion); + bool is_recovered(); + void wait_for_recovery(Context *c); + // Submit one entry to the work queue. Call back when it is persisted // to the queue (there is no callback for when it is executed) void push(const PurgeItem &pi, Context *completion); -- 2.47.3