]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: image replay now uses asynchronous journal shutdown 9309/head
authorJason Dillaman <dillaman@redhat.com>
Wed, 25 May 2016 06:31:11 +0000 (02:31 -0400)
committerJason Dillaman <dillaman@redhat.com>
Thu, 26 May 2016 12:43:18 +0000 (08:43 -0400)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/rbd_mirror/test_ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h

index db854beca035dbd0559339f7f4e68d9a6b3fcabd..58ed3654920d34ff8018af30ee6d1e9df6ef4611 100644 (file)
@@ -536,78 +536,3 @@ TEST_F(TestImageReplayer, NextTag)
 
   stop();
 }
-
-class ImageReplayer : public rbd::mirror::ImageReplayer<> {
-public:
-  ImageReplayer(rbd::mirror::Threads *threads,
-               rbd::mirror::RadosRef local, rbd::mirror::RadosRef remote,
-               const std::string &local_mirror_uuid,
-                const std::string &remote_mirror_uuid,
-                int64_t local_pool_id,
-               int64_t remote_pool_id, const std::string &remote_image_id,
-                const std::string &global_image_id)
-    : rbd::mirror::ImageReplayer<>(threads, local, remote, local_mirror_uuid,
-                                  remote_mirror_uuid, local_pool_id,
-                                   remote_pool_id, remote_image_id,
-                                   global_image_id)
-    {}
-
-  void set_error(const std::string &state, int r) {
-    m_errors[state] = r;
-  }
-
-  int get_error(const std::string &state) const {
-    std::map<std::string, int>::const_iterator i = m_errors.find(state);
-    return i == m_errors.end() ? 0 : i->second;
-  }
-
-protected:
-  virtual void on_stop_journal_replay_shut_down_finish(int r) {
-    ASSERT_EQ(0, r);
-    rbd::mirror::ImageReplayer<>::on_stop_journal_replay_shut_down_finish(
-      get_error("on_stop_journal_replay_shut_down"));
-  }
-
-  virtual void on_stop_local_image_close_finish(int r) {
-    ASSERT_EQ(0, r);
-    rbd::mirror::ImageReplayer<>::on_stop_local_image_close_finish(
-      get_error("on_stop_local_image_close"));
-  }
-
-private:
-  std::map<std::string, int> m_errors;
-};
-
-#define TEST_ON_START_ERROR(state) \
-TEST_F(TestImageReplayer, Error_on_start_##state)                      \
-{                                                                      \
-  create_replayer<ImageReplayer>();                                    \
-  reinterpret_cast<ImageReplayer *>(m_replayer)->                      \
-    set_error("on_start_" #state, -1);                                 \
-  rbd::mirror::ImageReplayer<>::BootstrapParams                                \
-    bootstap_params(m_image_name);                                     \
-  C_SaferCond cond;                                                    \
-  m_replayer->start(&cond, &bootstap_params);                          \
-  ASSERT_EQ(-1, cond.wait());                                          \
-}
-
-#define TEST_ON_STOP_ERROR(state) \
-TEST_F(TestImageReplayer, Error_on_stop_##state)                       \
-{                                                                      \
-  create_replayer<ImageReplayer>();                                    \
-  reinterpret_cast<ImageReplayer *>(m_replayer)->                      \
-    set_error("on_stop_" #state, -1);                                  \
-  rbd::mirror::ImageReplayer<>::BootstrapParams                                \
-    bootstap_params(m_image_name);                                     \
-  start(&bootstap_params);                                             \
-  /* TODO: investigate: without wait below I observe: */               \
-  /* librbd/journal/Replay.cc: 70: FAILED assert(m_op_events.empty()) */\
-  wait_for_replay_complete();                                          \
-  C_SaferCond cond;                                                    \
-  m_replayer->stop(&cond);                                             \
-  ASSERT_EQ(0, cond.wait());                                           \
-}
-
-TEST_ON_STOP_ERROR(journal_replay_shut_down);
-TEST_ON_STOP_ERROR(no_error);
-
index 392eeecd836da7a19d542b432353ce3b4177fe90..54b82285dd551cdf8d0725b70ae33d6c4d9440e3 100644 (file)
@@ -334,7 +334,7 @@ void ImageReplayer<I>::start(Context *on_finish,
   if (r < 0) {
     derr << "error opening ioctx for remote pool " << m_remote_pool_id
         << ": " << cpp_strerror(r) << dendl;
-    on_start_fail_start(r, "error opening remote pool");
+    on_start_fail(r, "error opening remote pool");
     return;
   }
 
@@ -346,7 +346,7 @@ void ImageReplayer<I>::start(Context *on_finish,
   if (r < 0) {
     derr << "error opening ioctx for local pool " << m_local_pool_id
          << ": " << cpp_strerror(r) << dendl;
-    on_start_fail_start(r, "error opening local pool");
+    on_start_fail(r, "error opening local pool");
     return;
   }
 
@@ -402,10 +402,10 @@ void ImageReplayer<I>::handle_bootstrap(int r) {
 
   if (r == -EREMOTEIO) {
     dout(5) << "remote image is non-primary or local image is primary" << dendl;
-    on_start_fail_start(0, "remote image is non-primary or local image is primary");
+    on_start_fail(0, "remote image is non-primary or local image is primary");
     return;
   } else if (r < 0) {
-    on_start_fail_start(r, "error bootstrapping replay");
+    on_start_fail(r, "error bootstrapping replay");
     return;
   } else if (on_start_interrupted()) {
     return;
@@ -449,7 +449,7 @@ void ImageReplayer<I>::handle_init_remote_journaler(int r) {
 
   if (r < 0) {
     derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
-    on_start_fail_start(r, "error initializing remote journal");
+    on_start_fail(r, "error initializing remote journal");
     return;
   } else if (on_start_interrupted()) {
     return;
@@ -466,7 +466,7 @@ void ImageReplayer<I>::start_replay() {
   if (r < 0) {
     derr << "error starting external replay on local image "
         <<  m_local_image_id << ": " << cpp_strerror(r) << dendl;
-    on_start_fail_start(r, "error starting replay on local image");
+    on_start_fail(r, "error starting replay on local image");
     return;
   }
 
@@ -503,74 +503,29 @@ void ImageReplayer<I>::start_replay() {
 }
 
 template <typename I>
-void ImageReplayer<I>::on_start_fail_start(int r, const std::string &desc)
+void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
 {
   dout(20) << "r=" << r << dendl;
+  Context *ctx = new FunctionContext([this, r, desc](int _r) {
+      Context *on_start_finish(nullptr);
+      {
+        Mutex::Locker locker(m_lock);
+        m_state = STATE_STOPPING;
+        if (r < 0 && r != -EINTR) {
+          derr << "start failed: " << cpp_strerror(r) << dendl;
+        } else {
+          dout(20) << "start interrupted" << dendl;
+        }
+        std::swap(m_on_start_finish, on_start_finish);
+      }
 
-  FunctionContext *ctx = new FunctionContext(
-    [this, r, desc](int r1) {
-      assert(r1 == 0);
       set_state_description(r, desc);
-      on_start_fail_finish(r);
+      update_mirror_image_status(false, boost::none);
+      shut_down(r, on_start_finish);
     });
-
   m_threads->work_queue->queue(ctx, 0);
 }
 
-template <typename I>
-void ImageReplayer<I>::on_start_fail_finish(int r)
-{
-  dout(20) << "r=" << r << dendl;
-
-  {
-    Mutex::Locker locker(m_lock);
-    m_state = STATE_STOPPING;
-  }
-  update_mirror_image_status(false, boost::none);
-
-  if (m_remote_journaler) {
-    if (m_remote_journaler->is_initialized()) {
-      m_remote_journaler->shut_down();
-    }
-    delete m_remote_journaler;
-    m_remote_journaler = nullptr;
-  }
-
-  if (m_local_replay) {
-    shut_down_journal_replay(true);
-    m_local_image_ctx->journal->stop_external_replay();
-    m_local_replay = nullptr;
-  }
-
-  if (m_replay_handler) {
-    delete m_replay_handler;
-    m_replay_handler = nullptr;
-  }
-
-  if (m_local_image_ctx) {
-    // TODO: switch to async close via CloseImageRequest
-    m_local_image_ctx->state->close();
-    m_local_image_ctx = nullptr;
-  }
-
-  Context *on_start_finish(nullptr);
-  Context *on_stop_finish(nullptr);
-  {
-    Mutex::Locker locker(m_lock);
-    if (r < 0 && r != -EINTR) {
-      derr << "start failed: " << cpp_strerror(r) << dendl;
-    } else {
-      dout(20) << "start interrupted" << dendl;
-    }
-
-    std::swap(m_on_start_finish, on_start_finish);
-    std::swap(m_on_stop_finish, on_stop_finish);
-  }
-
-  update_mirror_image_status(true, STATE_STOPPED);
-  handle_stop(r, on_start_finish, on_stop_finish);
-}
-
 template <typename I>
 bool ImageReplayer<I>::on_start_interrupted()
 {
@@ -580,7 +535,7 @@ bool ImageReplayer<I>::on_start_interrupted()
     return false;
   }
 
-  on_start_fail_start(-EINTR);
+  on_start_fail(-EINTR);
   return true;
 }
 
@@ -621,98 +576,29 @@ void ImageReplayer<I>::stop(Context *on_finish, bool manual)
   }
 
   if (shut_down_replay) {
-    on_stop_journal_replay_shut_down_start();
+    on_stop_journal_replay();
   } else if (on_finish != nullptr) {
     on_finish->complete(0);
   }
 }
 
 template <typename I>
-void ImageReplayer<I>::on_stop_journal_replay_shut_down_start()
+void ImageReplayer<I>::on_stop_journal_replay()
 {
   dout(20) << "enter" << dendl;
 
-  FunctionContext *ctx = new FunctionContext(
-    [this](int r) {
-      on_stop_journal_replay_shut_down_finish(r);
-    });
-
   {
     Mutex::Locker locker(m_lock);
-
-    // as we complete in-flight records, we might receive multiple stop requests
     if (m_state != STATE_REPLAYING) {
+      // might be invoked multiple times while stopping
       return;
     }
     m_state = STATE_STOPPING;
-    m_local_replay->shut_down(false, ctx);
   }
 
   set_state_description(0, "");
   update_mirror_image_status(false, boost::none);
-}
-
-template <typename I>
-void ImageReplayer<I>::on_stop_journal_replay_shut_down_finish(int r)
-{
-  dout(20) << "r=" << r << dendl;
-  if (r < 0) {
-    derr << "error flushing journal replay: " << cpp_strerror(r) << dendl;
-  }
-
-  {
-    Mutex::Locker locker(m_lock);
-    assert(m_state == STATE_STOPPING);
-    m_local_image_ctx->journal->stop_external_replay();
-    m_local_replay = nullptr;
-    m_replay_entry = ReplayEntry();
-    m_replay_tag_valid = false;
-  }
-
-  on_stop_local_image_close_start();
-}
-
-template <typename I>
-void ImageReplayer<I>::on_stop_local_image_close_start()
-{
-  dout(20) << "enter" << dendl;
-
-  // close and delete the image (from outside the image's thread context)
-  Context *ctx = create_context_callback<
-    ImageReplayer, &ImageReplayer<I>::on_stop_local_image_close_finish>(this);
-  CloseImageRequest<I> *request = CloseImageRequest<I>::create(
-    &m_local_image_ctx, m_threads->work_queue, false, ctx);
-  request->send();
-}
-
-template <typename I>
-void ImageReplayer<I>::on_stop_local_image_close_finish(int r)
-{
-  dout(20) << "r=" << r << dendl;
-  if (r < 0) {
-    derr << "error closing local image: " << cpp_strerror(r) << dendl;
-  }
-
-  delete m_replay_status_formatter;
-  m_replay_status_formatter = nullptr;
-
-  m_remote_journaler->stop_replay();
-  m_remote_journaler->shut_down();
-  delete m_remote_journaler;
-  m_remote_journaler = nullptr;
-
-  delete m_replay_handler;
-  m_replay_handler = nullptr;
-
-  Context *on_finish(nullptr);
-  {
-    Mutex::Locker locker(m_lock);
-    assert(m_state == STATE_STOPPING);
-    std::swap(m_on_stop_finish, on_finish);
-  }
-
-  update_mirror_image_status(true, STATE_STOPPED);
-  handle_stop(r, nullptr, on_finish);
+  shut_down(0, nullptr);
 }
 
 template <typename I>
@@ -842,7 +728,7 @@ bool ImageReplayer<I>::on_replay_interrupted()
   }
 
   if (shut_down) {
-    on_stop_journal_replay_shut_down_start();
+    on_stop_journal_replay();
   }
   return shut_down;
 }
@@ -1023,17 +909,6 @@ void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry& replay_entry
   m_remote_journaler->committed(replay_entry);
 }
 
-template <typename I>
-void ImageReplayer<I>::shut_down_journal_replay(bool cancel_ops)
-{
-  C_SaferCond cond;
-  m_local_replay->shut_down(cancel_ops, &cond);
-  int r = cond.wait();
-  if (r < 0) {
-    derr << "error flushing journal replay: " << cpp_strerror(r) << dendl;
-  }
-}
-
 template <typename I>
 bool ImageReplayer<I>::update_mirror_image_status(bool force,
                                                   const OptionalState &state) {
@@ -1253,9 +1128,67 @@ void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
 }
 
 template <typename I>
-void ImageReplayer<I>::handle_stop(int r, Context *on_start, Context *on_stop) {
+void ImageReplayer<I>::shut_down(int r, Context *on_start) {
+  dout(20) << "r=" << r << dendl;
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_state == STATE_STOPPING);
+  }
+
+  // chain the shut down sequence (reverse order)
+  Context *ctx = new FunctionContext(
+    [this, r, on_start](int _r) {
+      update_mirror_image_status(true, STATE_STOPPED);
+      handle_shut_down(r, on_start);
+    });
+  if (m_local_image_ctx) {
+    ctx = new FunctionContext([this, ctx](int r) {
+      CloseImageRequest<I> *request = CloseImageRequest<I>::create(
+        &m_local_image_ctx, m_threads->work_queue, false, ctx);
+      request->send();
+    });
+  }
+  if (m_local_replay != nullptr) {
+    ctx = new FunctionContext([this, ctx](int r) {
+        if (r < 0) {
+          derr << "error flushing journal replay: " << cpp_strerror(r) << dendl;
+        }
+        m_local_image_ctx->journal->stop_external_replay();
+        m_local_replay = nullptr;
+        ctx->complete(0);
+      });
+    ctx = new FunctionContext([this, ctx](int r) {
+        m_local_replay->shut_down(true, ctx);
+      });
+  }
+  if (m_remote_journaler != nullptr) {
+    ctx = new FunctionContext([this, ctx](int r) {
+        delete m_remote_journaler;
+        m_remote_journaler = nullptr;
+        ctx->complete(0);
+      });
+    ctx = new FunctionContext([this, ctx](int r) {
+        m_remote_journaler->shut_down(ctx);
+      });
+  }
+  if (m_replay_handler != nullptr) {
+    ctx = new FunctionContext([this, ctx](int r) {
+        delete m_replay_handler;
+        m_replay_handler = nullptr;
+        ctx->complete(0);
+      });
+    ctx = new FunctionContext([this, ctx](int r) {
+        m_remote_journaler->stop_replay(ctx);
+      });
+  }
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_shut_down(int r, Context *on_start) {
   reschedule_update_status_task(-1);
 
+  Context *on_stop = nullptr;
   {
     Mutex::Locker locker(m_lock);
 
@@ -1265,13 +1198,15 @@ void ImageReplayer<I>::handle_stop(int r, Context *on_start, Context *on_stop) {
       dout(20) << "waiting for in-flight status update" << dendl;
       assert(m_on_update_status_finish == nullptr);
       m_on_update_status_finish = new FunctionContext(
-        [this, r, on_start, on_stop](int r) {
-          handle_stop(r, on_start, on_stop);
+        [this, r, on_start](int r) {
+          handle_shut_down(r, on_start);
         });
       return;
     }
 
+    std::swap(on_stop, m_on_stop_finish);
     m_stop_requested = false;
+    assert(m_state == STATE_STOPPING);
     m_state = STATE_STOPPED;
     m_state_desc.clear();
     m_last_r = 0;
@@ -1281,6 +1216,9 @@ void ImageReplayer<I>::handle_stop(int r, Context *on_start, Context *on_stop) {
   m_local_ioctx.close();
   m_remote_ioctx.close();
 
+  delete m_replay_status_formatter;
+  m_replay_status_formatter = nullptr;
+
   if (on_start != nullptr) {
     dout(20) << "on start finish complete, r=" << r << dendl;
     on_start->complete(r);
index 280218350776ee781d738d22135d55152e7129c8..82c9c19b7777f9ec814dcdc6c629337708350c43 100644 (file)
@@ -176,14 +176,10 @@ protected:
    * @endverbatim
    */
 
-  virtual void on_start_fail_start(int r, const std::string &desc = "");
-  virtual void on_start_fail_finish(int r);
+  virtual void on_start_fail(int r, const std::string &desc = "");
   virtual bool on_start_interrupted();
 
-  virtual void on_stop_journal_replay_shut_down_start();
-  virtual void on_stop_journal_replay_shut_down_finish(int r);
-  virtual void on_stop_local_image_close_start();
-  virtual void on_stop_local_image_close_finish(int r);
+  virtual void on_stop_journal_replay();
 
   virtual void on_flush_local_replay_flush_start(Context *on_flush);
   virtual void on_flush_local_replay_flush_finish(Context *on_flush, int r);
@@ -280,8 +276,6 @@ private:
     return !is_stopped_() && m_state != STATE_STOPPING && !m_stop_requested;
   }
 
-  void shut_down_journal_replay(bool cancel_ops);
-
   bool update_mirror_image_status(bool force, const OptionalState &state);
   bool start_mirror_image_status_update(bool force, bool restarting);
   void finish_mirror_image_status_update();
@@ -290,7 +284,8 @@ private:
   void handle_mirror_status_update(int r);
   void reschedule_update_status_task(int new_interval = 0);
 
-  void handle_stop(int r, Context *on_start, Context *on_stop);
+  void shut_down(int r, Context *on_start);
+  void handle_shut_down(int r, Context *on_start);
 
   void bootstrap();
   void handle_bootstrap(int r);