From 39cf07118583166287ef0faa1811ae8efc9bef85 Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 18 Jun 2015 11:07:46 +0100 Subject: [PATCH] mds: fix MDLog shutdown process We must join threads before completing ::shutdown, because otherwise these threads might try to use torn-down resources like the objecter. The replay/recovery threads may be blocking on journaler calls like wait_for_readable, so we must signal them using Journaler::shutdown. In order for that to be safe, we must also protect the assignment of ::journaler from the threads using the mds_lock. Fixes: #11985 Signed-off-by: John Spray --- src/mds/MDLog.cc | 144 +++++++++++++++++++++++++++++++++++------------ src/mds/MDLog.h | 3 - 2 files changed, 107 insertions(+), 40 deletions(-) diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index b62ec748aa305..cb5326bea866c 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -173,7 +173,6 @@ void MDLog::open(MDSInternalContextBase *c) recovery_thread.set_completion(c); recovery_thread.create(); - recovery_thread.detach(); submit_thread.create(); // either append() or replay() will follow. @@ -213,7 +212,6 @@ void MDLog::reopen(MDSInternalContextBase *c) recovery_thread.set_completion(new C_ReopenComplete(this, c)); recovery_thread.create(); - recovery_thread.detach(); } void MDLog::append() @@ -347,7 +345,7 @@ void MDLog::_submit_thread() submit_mutex.Lock(); - while (!stopping) { + while (!mds->stopping) { map >::iterator it = pending_events.begin(); if (it == pending_events.end()) { submit_cond.Wait(submit_mutex); @@ -460,19 +458,48 @@ void MDLog::cap() void MDLog::shutdown() { + assert(mds->mds_lock.is_locked_by_me()); + dout(5) << "shutdown" << dendl; - if (!submit_thread.is_started()) - return; + if (submit_thread.is_started()) { + assert(mds->stopping); - assert(mds->mds_lock.is_locked_by_me()); - mds->mds_lock.Unlock(); + if (submit_thread.am_self()) { + // Called suicide from the thread: trust it to do no work after + // returning from suicide, and subsequently respect mds->stopping + // and fall out of its loop. + } else { + mds->mds_lock.Unlock(); + // Because MDS::stopping is true, it's safe to drop mds_lock: nobody else + // picking it up will do anything with it. + + submit_mutex.Lock(); + submit_cond.Signal(); + submit_mutex.Unlock(); - submit_mutex.Lock(); - stopping = true; - submit_cond.Signal(); - submit_mutex.Unlock(); + mds->mds_lock.Lock(); + + submit_thread.join(); + } + } + + // Replay thread can be stuck inside e.g. Journaler::wait_for_readable, + // so we need to shutdown the journaler first. + if (journaler) { + journaler->shutdown(); + } + + if (replay_thread.is_started() && !replay_thread.am_self()) { + mds->mds_lock.Unlock(); + replay_thread.join(); + mds->mds_lock.Lock(); + } - mds->mds_lock.Lock(); + if (recovery_thread.is_started() && !recovery_thread.am_self()) { + mds->mds_lock.Unlock(); + recovery_thread.join(); + mds->mds_lock.Lock(); + } } @@ -808,7 +835,6 @@ void MDLog::replay(MDSInternalContextBase *c) already_replayed = true; replay_thread.create(); - replay_thread.detach(); } @@ -899,6 +925,14 @@ void MDLog::_recovery_thread(MDSInternalContextBase *completion) /* Read the header from the front journal */ Journaler *front_journal = new Journaler(jp.front, mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, &mds->timer, &mds->finisher); + + // Assign to ::journaler so that we can be aborted by ::shutdown while + // waiting for journaler recovery + { + Mutex::Locker l(mds->mds_lock); + journaler = front_journal; + } + C_SaferCond recover_wait; front_journal->recover(&recover_wait); dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl; @@ -916,25 +950,33 @@ void MDLog::_recovery_thread(MDSInternalContextBase *completion) if (front_journal->get_stream_format() > JOURNAL_FORMAT_MAX) { dout(0) << "Journal " << jp.front << " is in unknown format " << front_journal->get_stream_format() << ", does this MDS daemon require upgrade?" << dendl; - mds->mds_lock.Lock(); - completion->complete(-EINVAL); - mds->mds_lock.Unlock(); + { + Mutex::Locker l(mds->mds_lock); + if (mds->stopping) { + journaler = NULL; + delete front_journal; + return; + } + completion->complete(-EINVAL); + } } else if (mds->is_standby_replay() || front_journal->get_stream_format() >= g_conf->mds_journal_format) { /* The journal is of configured format, or we are in standbyreplay and will * tolerate replaying old journals until we have to go active. Use front_journal as * our journaler attribute and complete */ dout(4) << "Recovered journal " << jp.front << " in format " << front_journal->get_stream_format() << dendl; - journaler = front_journal; journaler->set_write_error_handler(new C_MDL_WriteError(this)); - mds->mds_lock.Lock(); - completion->complete(0); - mds->mds_lock.Unlock(); + { + Mutex::Locker l(mds->mds_lock); + if (mds->stopping) { + return; + } + completion->complete(0); + } } else { /* Hand off to reformat routine, which will ultimately set the * completion when it has done its thing */ dout(1) << "Journal " << jp.front << " has old format " << front_journal->get_stream_format() << ", it will now be updated" << dendl; - _reformat_journal(jp, front_journal, completion); } } @@ -1099,7 +1141,16 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa old_journal->erase(&erase_waiter); int erase_result = erase_waiter.wait(); assert(erase_result == 0); - delete old_journal; + { + Mutex::Locker l(mds->mds_lock); + if (mds->stopping) { + delete new_journal; + return; + } + assert(journaler == old_journal); + journaler = NULL; + delete old_journal; + } /* Update the pointer to reflect we're back in clean single journal state. */ jp.back = 0; @@ -1108,14 +1159,25 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa /* Reset the Journaler object to its default state */ dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl; - journaler = new_journal; - journaler->set_readonly(); - journaler->set_write_error_handler(new C_MDL_WriteError(this)); + { + Mutex::Locker l(mds->mds_lock); + if (mds->stopping) { + delete new_journal; + return; + } + journaler = new_journal; + journaler->set_readonly(); + journaler->set_write_error_handler(new C_MDL_WriteError(this)); + } /* Trigger completion */ - mds->mds_lock.Lock(); - completion->complete(0); - mds->mds_lock.Unlock(); + { + Mutex::Locker l(mds->mds_lock); + if (mds->stopping) { + return; + } + completion->complete(0); + } } @@ -1133,7 +1195,7 @@ void MDLog::_replay_thread() !journaler->get_error()) { C_SaferCond readable_waiter; journaler->wait_for_readable(&readable_waiter); - readable_waiter.wait(); + r = readable_waiter.wait(); } if (journaler->get_error()) { r = journaler->get_error(); @@ -1192,12 +1254,12 @@ void MDLog::_replay_thread() } break; } - + if (!journaler->is_readable() && journaler->get_read_pos() == journaler->get_write_pos()) break; - assert(journaler->is_readable()); + assert(journaler->is_readable() || mds->stopping); // read it uint64_t pos = journaler->get_read_pos(); @@ -1256,9 +1318,13 @@ void MDLog::_replay_thread() le->_segment->end = journaler->get_read_pos(); num_events++; - mds->mds_lock.Lock(); - le->replay(mds); - mds->mds_lock.Unlock(); + { + Mutex::Locker l(mds->mds_lock); + if (mds->stopping) { + return; + } + le->replay(mds); + } } delete le; @@ -1277,9 +1343,13 @@ void MDLog::_replay_thread() safe_pos = journaler->get_write_safe_pos(); dout(10) << "_replay_thread kicking waiters" << dendl; - mds->mds_lock.Lock(); - finish_contexts(g_ceph_context, waitfor_replay, r); - mds->mds_lock.Unlock(); + { + Mutex::Locker l(mds->mds_lock); + if (mds->stopping) { + return; + } + finish_contexts(g_ceph_context, waitfor_replay, r); + } dout(10) << "_replay_thread finish" << dendl; } diff --git a/src/mds/MDLog.h b/src/mds/MDLog.h index bc0cf413e0f24..71081ea174a45 100644 --- a/src/mds/MDLog.h +++ b/src/mds/MDLog.h @@ -72,8 +72,6 @@ protected: bool capped; - bool stopping; - // Log position which is persistent *and* for which // submit_entry wait_for_safe callbacks have already // been called. @@ -188,7 +186,6 @@ public: num_events(0), unflushed(0), capped(false), - stopping(false), safe_pos(0), journaler(0), logger(0), -- 2.39.5