recovery_thread.set_completion(c);
recovery_thread.create();
- recovery_thread.detach();
submit_thread.create();
// either append() or replay() will follow.
recovery_thread.set_completion(new C_ReopenComplete(this, c));
recovery_thread.create();
- recovery_thread.detach();
}
void MDLog::append()
submit_mutex.Lock();
- while (!stopping) {
+ while (!mds->stopping) {
map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
if (it == pending_events.end()) {
submit_cond.Wait(submit_mutex);
void MDLog::shutdown()
{
+ assert(mds->mds_lock.is_locked_by_me());
+
dout(5) << "shutdown" << dendl;
- if (!submit_thread.is_started())
- return;
+ if (submit_thread.is_started()) {
+ assert(mds->stopping);
- assert(mds->mds_lock.is_locked_by_me());
- mds->mds_lock.Unlock();
+ if (submit_thread.am_self()) {
+ // Called suicide from the thread: trust it to do no work after
+ // returning from suicide, and subsequently respect mds->stopping
+ // and fall out of its loop.
+ } else {
+ mds->mds_lock.Unlock();
+ // Because MDS::stopping is true, it's safe to drop mds_lock: nobody else
+ // picking it up will do anything with it.
+
+ submit_mutex.Lock();
+ submit_cond.Signal();
+ submit_mutex.Unlock();
- submit_mutex.Lock();
- stopping = true;
- submit_cond.Signal();
- submit_mutex.Unlock();
+ mds->mds_lock.Lock();
+
+ submit_thread.join();
+ }
+ }
+
+ // Replay thread can be stuck inside e.g. Journaler::wait_for_readable,
+ // so we need to shutdown the journaler first.
+ if (journaler) {
+ journaler->shutdown();
+ }
+
+ if (replay_thread.is_started() && !replay_thread.am_self()) {
+ mds->mds_lock.Unlock();
+ replay_thread.join();
+ mds->mds_lock.Lock();
+ }
- mds->mds_lock.Lock();
+ if (recovery_thread.is_started() && !recovery_thread.am_self()) {
+ mds->mds_lock.Unlock();
+ recovery_thread.join();
+ mds->mds_lock.Lock();
+ }
}
already_replayed = true;
replay_thread.create();
- replay_thread.detach();
}
/* Read the header from the front journal */
Journaler *front_journal = new Journaler(jp.front, mds->mdsmap->get_metadata_pool(),
CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, &mds->timer, &mds->finisher);
+
+ // Assign to ::journaler so that we can be aborted by ::shutdown while
+ // waiting for journaler recovery
+ {
+ Mutex::Locker l(mds->mds_lock);
+ journaler = front_journal;
+ }
+
C_SaferCond recover_wait;
front_journal->recover(&recover_wait);
dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl;
if (front_journal->get_stream_format() > JOURNAL_FORMAT_MAX) {
dout(0) << "Journal " << jp.front << " is in unknown format " << front_journal->get_stream_format()
<< ", does this MDS daemon require upgrade?" << dendl;
- mds->mds_lock.Lock();
- completion->complete(-EINVAL);
- mds->mds_lock.Unlock();
+ {
+ Mutex::Locker l(mds->mds_lock);
+ if (mds->stopping) {
+ journaler = NULL;
+ delete front_journal;
+ return;
+ }
+ completion->complete(-EINVAL);
+ }
} else if (mds->is_standby_replay() || front_journal->get_stream_format() >= g_conf->mds_journal_format) {
/* The journal is of configured format, or we are in standbyreplay and will
* tolerate replaying old journals until we have to go active. Use front_journal as
* our journaler attribute and complete */
dout(4) << "Recovered journal " << jp.front << " in format " << front_journal->get_stream_format() << dendl;
- journaler = front_journal;
journaler->set_write_error_handler(new C_MDL_WriteError(this));
- mds->mds_lock.Lock();
- completion->complete(0);
- mds->mds_lock.Unlock();
+ {
+ Mutex::Locker l(mds->mds_lock);
+ if (mds->stopping) {
+ return;
+ }
+ completion->complete(0);
+ }
} else {
/* Hand off to reformat routine, which will ultimately set the
* completion when it has done its thing */
dout(1) << "Journal " << jp.front << " has old format "
<< front_journal->get_stream_format() << ", it will now be updated" << dendl;
-
_reformat_journal(jp, front_journal, completion);
}
}
old_journal->erase(&erase_waiter);
int erase_result = erase_waiter.wait();
assert(erase_result == 0);
- delete old_journal;
+ {
+ Mutex::Locker l(mds->mds_lock);
+ if (mds->stopping) {
+ delete new_journal;
+ return;
+ }
+ assert(journaler == old_journal);
+ journaler = NULL;
+ delete old_journal;
+ }
/* Update the pointer to reflect we're back in clean single journal state. */
jp.back = 0;
/* Reset the Journaler object to its default state */
dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl;
- journaler = new_journal;
- journaler->set_readonly();
- journaler->set_write_error_handler(new C_MDL_WriteError(this));
+ {
+ Mutex::Locker l(mds->mds_lock);
+ if (mds->stopping) {
+ delete new_journal;
+ return;
+ }
+ journaler = new_journal;
+ journaler->set_readonly();
+ journaler->set_write_error_handler(new C_MDL_WriteError(this));
+ }
/* Trigger completion */
- mds->mds_lock.Lock();
- completion->complete(0);
- mds->mds_lock.Unlock();
+ {
+ Mutex::Locker l(mds->mds_lock);
+ if (mds->stopping) {
+ return;
+ }
+ completion->complete(0);
+ }
}
!journaler->get_error()) {
C_SaferCond readable_waiter;
journaler->wait_for_readable(&readable_waiter);
- readable_waiter.wait();
+ r = readable_waiter.wait();
}
if (journaler->get_error()) {
r = journaler->get_error();
}
break;
}
-
+
if (!journaler->is_readable() &&
journaler->get_read_pos() == journaler->get_write_pos())
break;
- assert(journaler->is_readable());
+ assert(journaler->is_readable() || mds->stopping);
// read it
uint64_t pos = journaler->get_read_pos();
le->_segment->end = journaler->get_read_pos();
num_events++;
- mds->mds_lock.Lock();
- le->replay(mds);
- mds->mds_lock.Unlock();
+ {
+ Mutex::Locker l(mds->mds_lock);
+ if (mds->stopping) {
+ return;
+ }
+ le->replay(mds);
+ }
}
delete le;
safe_pos = journaler->get_write_safe_pos();
dout(10) << "_replay_thread kicking waiters" << dendl;
- mds->mds_lock.Lock();
- finish_contexts(g_ceph_context, waitfor_replay, r);
- mds->mds_lock.Unlock();
+ {
+ Mutex::Locker l(mds->mds_lock);
+ if (mds->stopping) {
+ return;
+ }
+ finish_contexts(g_ceph_context, waitfor_replay, r);
+ }
dout(10) << "_replay_thread finish" << dendl;
}