]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: open purge queue when transitioning out of standby replay
authorYan, Zheng <zyan@redhat.com>
Wed, 27 Sep 2017 11:59:47 +0000 (19:59 +0800)
committerYan, Zheng <zyan@redhat.com>
Thu, 19 Oct 2017 02:14:29 +0000 (10:14 +0800)
MDS opens the purge queue when it starts standby replay. This is
wrong because purge queue may change during standby replay.

Fixes: http://tracker.ceph.com/issues/19593
Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
(cherry picked from commit e515e84f69eeab90ea5c5831f7d3e684e48fb62e)

src/mds/MDSRank.cc
src/mds/PurgeQueue.cc
src/mds/PurgeQueue.h

index ea90712eeef2db077b47985da0b8c452f4df227b..d5cdc0dfbf66b970af31dfe3a670b149a66719fa 100644 (file)
@@ -1055,6 +1055,14 @@ void MDSRank::boot_start(BootStep step, int r)
         dout(2) << "boot_start " << step << ": opening mds log" << dendl;
         mdlog->open(gather.new_sub());
 
+       if (is_starting()) {
+         dout(2) << "boot_start " << step << ": opening purge queue" << dendl;
+         purge_queue.open(new C_IO_Wrapper(this, gather.new_sub()));
+       } else if (!standby_replaying) {
+         dout(2) << "boot_start " << step << ": opening purge queue (async)" << dendl;
+         purge_queue.open(NULL);
+       }
+
         if (mdsmap->get_tableserver() == whoami) {
           dout(2) << "boot_start " << step << ": opening snap table" << dendl;
           snapserver->set_rank(whoami);
@@ -1073,8 +1081,6 @@ void MDSRank::boot_start(BootStep step, int r)
 
         mdcache->open_mydir_inode(gather.new_sub());
 
-        purge_queue.open(new C_IO_Wrapper(this, gather.new_sub()));
-
         if (is_starting() ||
             whoami == mdsmap->get_root()) {  // load root inode off disk if we are auth
           mdcache->open_root_inode(gather.new_sub());
@@ -1087,8 +1093,17 @@ void MDSRank::boot_start(BootStep step, int r)
       break;
     case MDS_BOOT_PREPARE_LOG:
       if (is_any_replay()) {
-        dout(2) << "boot_start " << step << ": replaying mds log" << dendl;
-        mdlog->replay(new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
+       dout(2) << "boot_start " << step << ": replaying mds log" << dendl;
+       MDSGatherBuilder gather(g_ceph_context,
+           new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
+
+       if (!standby_replaying && !purge_queue.is_recovered()) {
+         dout(2) << "boot_start " << step << ": waiting for purge queue recovered" << dendl;
+         purge_queue.wait_for_recovery(new C_IO_Wrapper(this, gather.new_sub()));
+       }
+
+       mdlog->replay(gather.new_sub());
+       gather.activate();
       } else {
         dout(2) << "boot_start " << step << ": positioning at end of old mds log" << dendl;
         mdlog->append();
@@ -1244,6 +1259,9 @@ void MDSRank::standby_replay_restart()
         new C_MDS_StandbyReplayRestartFinish(
           this,
          mdlog->get_journaler()->get_read_pos()));
+
+      dout(1) << " opening purge queue (async)" << dendl;
+      purge_queue.open(NULL);
     } else {
       dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
               << " (which blacklists prior instance)" << dendl;
index f520240da3dae16647c4a808b9d2bce720bfb68f..98f245b5fd5894ee38948389da18877cd97a7e3a 100644 (file)
@@ -79,7 +79,8 @@ PurgeQueue::PurgeQueue(
     max_purge_ops(0),
     drain_initial(0),
     draining(false),
-    delayed_flush(nullptr)
+    delayed_flush(nullptr),
+    recovered(false)
 {
   assert(cct != nullptr);
   assert(on_error != nullptr);
@@ -147,11 +148,14 @@ void PurgeQueue::open(Context *completion)
 
   Mutex::Locker l(lock);
 
-  journaler.recover(new FunctionContext([this, completion](int r){
+  if (completion)
+    waiting_for_recovery.push_back(completion);
+
+  journaler.recover(new FunctionContext([this](int r){
     if (r == -ENOENT) {
       dout(1) << "Purge Queue not found, assuming this is an upgrade and "
                  "creating it." << dendl;
-      create(completion);
+      create(NULL);
     } else if (r == 0) {
       Mutex::Locker l(lock);
       dout(4) << "open complete" << dendl;
@@ -162,12 +166,13 @@ void PurgeQueue::open(Context *completion)
       if (journaler.last_committed.write_pos < journaler.get_write_pos()) {
        dout(4) << "recovering write_pos" << dendl;
        journaler.set_read_pos(journaler.last_committed.write_pos);
-       _recover(completion);
+       _recover();
        return;
       }
 
       journaler.set_writeable();
-      completion->complete(0);
+      recovered = true;
+      finish_contexts(g_ceph_context, waiting_for_recovery);
     } else {
       derr << "Error " << r << " loading Journaler" << dendl;
       on_error->complete(r);
@@ -175,8 +180,19 @@ void PurgeQueue::open(Context *completion)
   }));
 }
 
+bool PurgeQueue::is_recovered()
+{
+  Mutex::Locker l(lock);
+  return recovered;
+}
 
-void PurgeQueue::_recover(Context *completion)
+void PurgeQueue::wait_for_recovery(Context* c)
+{
+  Mutex::Locker l(lock);
+  waiting_for_recovery.push_back(c);
+}
+
+void PurgeQueue::_recover()
 {
   assert(lock.is_locked_by_me());
 
@@ -185,9 +201,9 @@ void PurgeQueue::_recover(Context *completion)
     if (!journaler.is_readable() &&
        !journaler.get_error() &&
        journaler.get_read_pos() < journaler.get_write_pos()) {
-      journaler.wait_for_readable(new FunctionContext([this, completion](int r) {
+      journaler.wait_for_readable(new FunctionContext([this](int r) {
         Mutex::Locker l(lock);
-       _recover(completion);
+       _recover();
       }));
       return;
     }
@@ -204,7 +220,8 @@ void PurgeQueue::_recover(Context *completion)
       // restore original read_pos
       journaler.set_read_pos(journaler.last_committed.expire_pos);
       journaler.set_writeable();
-      completion->complete(0);
+      recovered = true;
+      finish_contexts(g_ceph_context, waiting_for_recovery);
       return;
     }
 
@@ -219,11 +236,18 @@ void PurgeQueue::create(Context *fin)
   dout(4) << "creating" << dendl;
   Mutex::Locker l(lock);
 
+  if (fin)
+    waiting_for_recovery.push_back(fin);
+
   file_layout_t layout = file_layout_t::get_default();
   layout.pool_id = metadata_pool;
   journaler.set_writeable();
   journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
-  journaler.write_head(fin);
+  journaler.write_head(new FunctionContext([this](int r) {
+    Mutex::Locker l(lock);
+    recovered = true;
+    finish_contexts(g_ceph_context, waiting_for_recovery);
+  }));
 }
 
 /**
index aed66c94ebcdda1f9ae34c25dd96fa2eea383e01..221add80e9e461bf3999d21d2568d297c5474382 100644 (file)
@@ -113,7 +113,7 @@ protected:
   bool draining;
 
   // recover the journal write_pos (drop any partial written entry)
-  void _recover(Context *completion);
+  void _recover();
 
   /**
    * @return true if we were in a position to try and consume something:
@@ -130,6 +130,8 @@ protected:
   void _execute_item_complete(
       uint64_t expire_to);
 
+  bool recovered;
+  std::list<Context*> waiting_for_recovery;
 
 public:
   void init();
@@ -144,6 +146,9 @@ public:
   // Read the Journaler header for an existing queue and start consuming
   void open(Context *completion);
 
+  bool is_recovered();
+  void wait_for_recovery(Context *c);
+
   // Submit one entry to the work queue.  Call back when it is persisted
   // to the queue (there is no callback for when it is executed)
   void push(const PurgeItem &pi, Context *completion);