]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: fix MDLog shutdown process 5001/head
authorJohn Spray <john.spray@redhat.com>
Thu, 18 Jun 2015 10:07:46 +0000 (11:07 +0100)
committerJohn Spray <john.spray@redhat.com>
Thu, 25 Jun 2015 15:19:25 +0000 (16:19 +0100)
We must join threads before completing ::shutdown,
because otherwise these threads might try to use
torn-down resources like the objecter.

The replay/recovery threads may be blocking on
journaler calls like wait_for_readable, so we
must signal them using Journaler::shutdown.  In
order for that to be safe, we must also protect
the assignment of ::journaler from the threads
using the mds_lock.

Fixes: #11985
Signed-off-by: John Spray <john.spray@redhat.com>
src/mds/MDLog.cc
src/mds/MDLog.h

index b62ec748aa3059ce5dfceaf5d1fa86664f0cf079..cb5326bea866c153a803b501f5cc8ca9f33b810d 100644 (file)
@@ -173,7 +173,6 @@ void MDLog::open(MDSInternalContextBase *c)
 
   recovery_thread.set_completion(c);
   recovery_thread.create();
-  recovery_thread.detach();
 
   submit_thread.create();
   // either append() or replay() will follow.
@@ -213,7 +212,6 @@ void MDLog::reopen(MDSInternalContextBase *c)
 
   recovery_thread.set_completion(new C_ReopenComplete(this, c));
   recovery_thread.create();
-  recovery_thread.detach();
 }
 
 void MDLog::append()
@@ -347,7 +345,7 @@ void MDLog::_submit_thread()
 
   submit_mutex.Lock();
 
-  while (!stopping) {
+  while (!mds->stopping) {
     map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
     if (it == pending_events.end()) {
       submit_cond.Wait(submit_mutex);
@@ -460,19 +458,48 @@ void MDLog::cap()
 
 void MDLog::shutdown()
 {
+  assert(mds->mds_lock.is_locked_by_me());
+
   dout(5) << "shutdown" << dendl;
-  if (!submit_thread.is_started())
-    return;
+  if (submit_thread.is_started()) {
+    assert(mds->stopping);
 
-  assert(mds->mds_lock.is_locked_by_me());
-  mds->mds_lock.Unlock();
+    if (submit_thread.am_self()) {
+      // Called suicide from the thread: trust it to do no work after
+      // returning from suicide, and subsequently respect mds->stopping
+      // and fall out of its loop.
+    } else {
+      mds->mds_lock.Unlock();
+      // Because MDS::stopping is true, it's safe to drop mds_lock: nobody else
+      // picking it up will do anything with it.
+   
+      submit_mutex.Lock();
+      submit_cond.Signal();
+      submit_mutex.Unlock();
 
-  submit_mutex.Lock();
-  stopping = true;
-  submit_cond.Signal();
-  submit_mutex.Unlock();
+      mds->mds_lock.Lock();
+
+      submit_thread.join();
+    }
+  }
+
+  // Replay thread can be stuck inside e.g. Journaler::wait_for_readable,
+  // so we need to shutdown the journaler first.
+  if (journaler) {
+    journaler->shutdown();
+  }
+
+  if (replay_thread.is_started() && !replay_thread.am_self()) {
+    mds->mds_lock.Unlock();
+    replay_thread.join();
+    mds->mds_lock.Lock();
+  }
 
-  mds->mds_lock.Lock();
+  if (recovery_thread.is_started() && !recovery_thread.am_self()) {
+    mds->mds_lock.Unlock();
+    recovery_thread.join();
+    mds->mds_lock.Lock();
+  }
 }
 
 
@@ -808,7 +835,6 @@ void MDLog::replay(MDSInternalContextBase *c)
   already_replayed = true;
 
   replay_thread.create();
-  replay_thread.detach();
 }
 
 
@@ -899,6 +925,14 @@ void MDLog::_recovery_thread(MDSInternalContextBase *completion)
   /* Read the header from the front journal */
   Journaler *front_journal = new Journaler(jp.front, mds->mdsmap->get_metadata_pool(),
       CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, &mds->timer, &mds->finisher);
+
+  // Assign to ::journaler so that we can be aborted by ::shutdown while
+  // waiting for journaler recovery
+  {
+    Mutex::Locker l(mds->mds_lock);
+    journaler = front_journal;
+  }
+
   C_SaferCond recover_wait;
   front_journal->recover(&recover_wait);
   dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl;
@@ -916,25 +950,33 @@ void MDLog::_recovery_thread(MDSInternalContextBase *completion)
   if (front_journal->get_stream_format() > JOURNAL_FORMAT_MAX) {
     dout(0) << "Journal " << jp.front << " is in unknown format " << front_journal->get_stream_format()
             << ", does this MDS daemon require upgrade?" << dendl;
-    mds->mds_lock.Lock();
-    completion->complete(-EINVAL);
-    mds->mds_lock.Unlock();
+    {
+      Mutex::Locker l(mds->mds_lock);
+      if (mds->stopping) {
+        journaler = NULL;
+        delete front_journal;
+        return;
+      }
+      completion->complete(-EINVAL);
+    }
   } else if (mds->is_standby_replay() || front_journal->get_stream_format() >= g_conf->mds_journal_format) {
     /* The journal is of configured format, or we are in standbyreplay and will
      * tolerate replaying old journals until we have to go active. Use front_journal as
      * our journaler attribute and complete */
     dout(4) << "Recovered journal " << jp.front << " in format " << front_journal->get_stream_format() << dendl;
-    journaler = front_journal;
     journaler->set_write_error_handler(new C_MDL_WriteError(this));
-    mds->mds_lock.Lock();
-    completion->complete(0);
-    mds->mds_lock.Unlock();
+    {
+      Mutex::Locker l(mds->mds_lock);
+      if (mds->stopping) {
+        return;
+      }
+      completion->complete(0);
+    }
   } else {
     /* Hand off to reformat routine, which will ultimately set the
      * completion when it has done its thing */
     dout(1) << "Journal " << jp.front << " has old format "
       << front_journal->get_stream_format() << ", it will now be updated" << dendl;
-
     _reformat_journal(jp, front_journal, completion);
   }
 }
@@ -1099,7 +1141,16 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa
   old_journal->erase(&erase_waiter);
   int erase_result = erase_waiter.wait();
   assert(erase_result == 0);
-  delete old_journal;
+  {
+    Mutex::Locker l(mds->mds_lock);
+    if (mds->stopping) {
+      delete new_journal;
+      return;
+    }
+    assert(journaler == old_journal);
+    journaler = NULL;
+    delete old_journal;
+  }
 
   /* Update the pointer to reflect we're back in clean single journal state. */
   jp.back = 0;
@@ -1108,14 +1159,25 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa
 
   /* Reset the Journaler object to its default state */
   dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl;
-  journaler = new_journal;
-  journaler->set_readonly();
-  journaler->set_write_error_handler(new C_MDL_WriteError(this));
+  {
+    Mutex::Locker l(mds->mds_lock);
+    if (mds->stopping) {
+      delete new_journal;
+      return;
+    }
+    journaler = new_journal;
+    journaler->set_readonly();
+    journaler->set_write_error_handler(new C_MDL_WriteError(this));
+  }
 
   /* Trigger completion */
-  mds->mds_lock.Lock();
-  completion->complete(0);
-  mds->mds_lock.Unlock();
+  {
+    Mutex::Locker l(mds->mds_lock);
+    if (mds->stopping) {
+      return;
+    }
+    completion->complete(0);
+  }
 }
 
 
@@ -1133,7 +1195,7 @@ void MDLog::_replay_thread()
           !journaler->get_error()) {
       C_SaferCond readable_waiter;
       journaler->wait_for_readable(&readable_waiter);
-      readable_waiter.wait();
+      r = readable_waiter.wait();
     }
     if (journaler->get_error()) {
       r = journaler->get_error();
@@ -1192,12 +1254,12 @@ void MDLog::_replay_thread()
       }
       break;
     }
-    
+
     if (!journaler->is_readable() &&
        journaler->get_read_pos() == journaler->get_write_pos())
       break;
     
-    assert(journaler->is_readable());
+    assert(journaler->is_readable() || mds->stopping);
     
     // read it
     uint64_t pos = journaler->get_read_pos();
@@ -1256,9 +1318,13 @@ void MDLog::_replay_thread()
       le->_segment->end = journaler->get_read_pos();
       num_events++;
 
-      mds->mds_lock.Lock();
-      le->replay(mds);
-      mds->mds_lock.Unlock();
+      {
+        Mutex::Locker l(mds->mds_lock);
+        if (mds->stopping) {
+          return;
+        }
+        le->replay(mds);
+      }
     }
     delete le;
 
@@ -1277,9 +1343,13 @@ void MDLog::_replay_thread()
   safe_pos = journaler->get_write_safe_pos();
 
   dout(10) << "_replay_thread kicking waiters" << dendl;
-  mds->mds_lock.Lock();
-  finish_contexts(g_ceph_context, waitfor_replay, r);  
-  mds->mds_lock.Unlock();
+  {
+    Mutex::Locker l(mds->mds_lock);
+    if (mds->stopping) {
+      return;
+    }
+    finish_contexts(g_ceph_context, waitfor_replay, r);  
+  }
 
   dout(10) << "_replay_thread finish" << dendl;
 }
index bc0cf413e0f246dc45ff1aeaf0bd001f34002abc..71081ea174a45f2e1ab56f72030eaaddc03fbbb0 100644 (file)
@@ -72,8 +72,6 @@ protected:
 
   bool capped;
 
-  bool stopping;
-
   // Log position which is persistent *and* for which
   // submit_entry wait_for_safe callbacks have already
   // been called.
@@ -188,7 +186,6 @@ public:
                  num_events(0), 
                  unflushed(0),
                  capped(false),
-                 stopping(false),
                  safe_pos(0),
                  journaler(0),
                  logger(0),