]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: async flush for ImageReplayer
authorMykola Golub <mgolub@mirantis.com>
Sun, 20 Mar 2016 08:02:18 +0000 (10:02 +0200)
committerMykola Golub <mgolub@mirantis.com>
Mon, 21 Mar 2016 19:44:26 +0000 (21:44 +0200)
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
src/test/rbd_mirror/test_ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h

index b042f5735072830e5ef71c9485472a7a381a19c7..d31d9317af20978503c6cf6bcfb187d5e553816f 100644 (file)
@@ -256,7 +256,9 @@ public:
 
     for (int i = 0; i < 100; i++) {
       printf("m_replayer->flush()\n");
-      m_replayer->flush();
+      C_SaferCond cond;
+      m_replayer->flush(&cond);
+      ASSERT_EQ(0, cond.wait());
       get_commit_positions(&master_position, &mirror_position);
       if (master_position == mirror_position) {
        break;
index 25afe93189d9d8978a6d453620860ac32718eae4..2daf3c590da01545d0d444a78757978a0552f68b 100644 (file)
@@ -101,7 +101,9 @@ public:
   explicit FlushCommand(ImageReplayer *replayer) : replayer(replayer) {}
 
   bool call(Formatter *f, stringstream *ss) {
-    int r = replayer->flush();
+    C_SaferCond cond;
+    replayer->flush(&cond);
+    int r = cond.wait();
     if (r < 0) {
       *ss << "flush: " << cpp_strerror(r);
       return false;
@@ -625,6 +627,21 @@ void ImageReplayer::stop(Context *on_finish)
          on_finish->complete(0);
        });
 
+      m_on_finish = ctx;
+    }
+  } else if (m_state == STATE_FLUSHING_REPLAY) {
+    dout(20) << "interrupting flush" << dendl;
+
+    if (on_finish) {
+      Context *on_flush_finish = m_on_finish;
+      FunctionContext *ctx = new FunctionContext(
+       [this, on_flush_finish, on_finish](int r) {
+         if (on_flush_finish) {
+           on_flush_finish->complete(r);
+         }
+         on_finish->complete(0);
+       });
+
       m_on_finish = ctx;
     }
   } else {
@@ -736,47 +753,126 @@ void ImageReplayer::handle_replay_ready()
   m_local_replay->process(&it, on_ready, on_commit);
 }
 
-int ImageReplayer::flush()
+void ImageReplayer::flush(Context *on_finish)
 {
-  // TODO: provide async method
-
   dout(20) << "enter" << dendl;
 
+  bool start_flush = false;
+
   {
     Mutex::Locker locker(m_lock);
 
-    if (m_state != STATE_REPLAYING) {
-      return 0;
+    if (m_state == STATE_REPLAYING) {
+      assert(m_on_finish == nullptr);
+      m_on_finish = on_finish;
+
+      m_state = STATE_FLUSHING_REPLAY;
+
+      start_flush = true;
     }
+  }
 
-    m_state = STATE_FLUSHING_REPLAY;
+  if (start_flush) {
+    on_flush_local_replay_flush_start();
+  } else if (on_finish) {
+    on_finish->complete(0);
   }
+}
+
+void ImageReplayer::on_flush_local_replay_flush_start()
+{
+  dout(20) << "enter" << dendl;
+
+  FunctionContext *ctx = new FunctionContext(
+    [this](int r) {
+      on_flush_local_replay_flush_finish(r);
+    });
+
+  m_local_replay->flush(ctx);
+}
+
+void ImageReplayer::on_flush_local_replay_flush_finish(int r)
+{
+  dout(20) << "r=" << r << dendl;
 
-  C_SaferCond replay_flush_ctx;
-  m_local_replay->flush(&replay_flush_ctx);
-  int r = replay_flush_ctx.wait();
   if (r < 0) {
     derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
   }
 
-  C_SaferCond journaler_flush_ctx;
-  m_remote_journaler->flush_commit_position(&journaler_flush_ctx);
-  int r1 = journaler_flush_ctx.wait();
-  if (r1 < 0) {
+  if (on_flush_interrupted()) {
+    return;
+  }
+
+  on_flush_flush_commit_position_start(r);
+}
+
+void ImageReplayer::on_flush_flush_commit_position_start(int last_r)
+{
+
+  FunctionContext *ctx = new FunctionContext(
+    [this, last_r](int r) {
+      on_flush_flush_commit_position_finish(last_r, r);
+    });
+
+  m_remote_journaler->flush_commit_position(ctx);
+}
+
+void ImageReplayer::on_flush_flush_commit_position_finish(int last_r, int r)
+{
+  if (r < 0) {
     derr << "error flushing remote journal commit position: "
-        << cpp_strerror(r1) << dendl;
+        << cpp_strerror(r) << dendl;
+  } else {
+    r = last_r;
   }
 
+  Context *on_finish(nullptr);
+
   {
     Mutex::Locker locker(m_lock);
-    assert(m_state == STATE_FLUSHING_REPLAY);
+    if (m_state == STATE_STOPPING) {
+      r = -EINTR;
+    } else {
+      assert(m_state == STATE_FLUSHING_REPLAY);
 
-    m_state = STATE_REPLAYING;
+      m_state = STATE_REPLAYING;
+    }
+    std::swap(m_on_finish, on_finish);
+  }
+
+  dout(20) << "flush complete, r=" << r << dendl;
+
+  if (on_finish) {
+    dout(20) << "on finish complete, r=" << r << dendl;
+    on_finish->complete(r);
+  }
+}
+
+bool ImageReplayer::on_flush_interrupted()
+{
+  Context *on_finish(nullptr);
+
+  {
+    Mutex::Locker locker(m_lock);
+
+    if (m_state == STATE_FLUSHING_REPLAY) {
+      return false;
+    }
+
+    assert(m_state == STATE_STOPPING);
+
+    std::swap(m_on_finish, on_finish);
   }
 
-  dout(20) << "done" << dendl;
+  dout(20) << "flush interrupted" << dendl;
 
-  return r < 0 ? r : r1;
+  if (on_finish) {
+    int r = -EINTR;
+    dout(20) << "on finish complete, r=" << r << dendl;
+    on_finish->complete(r);
+  }
+
+  return true;
 }
 
 void ImageReplayer::handle_replay_process_ready(int r)
index 9dc26e1bf1934e97e2f6d40a2e4cef35f147ba2f..19a87c5262be9d282fdcbc2ef96a2ce67a4a7796 100644 (file)
@@ -84,7 +84,7 @@ public:
   void start(Context *on_finish = nullptr,
             const BootstrapParams *bootstrap_params = nullptr);
   void stop(Context *on_finish = nullptr);
-  int flush();
+  void flush(Context *on_finish = nullptr);
 
   virtual void handle_replay_ready();
   virtual void handle_replay_process_ready(int r);
@@ -107,7 +107,7 @@ protected:
    *    | (sync required)                       *
    *    |\-----\                                *
    *    |      |                                *
-   *    |      v                                *
+   *    |      v                        (error) *
    *    |   BOOTSTRAP_IMAGE * * * * * * * * * * *
    *    |      |                                *
    *    |      v                                *
@@ -117,21 +117,21 @@ protected:
    * REMOTE_JOURNALER_INIT  * * * * * * * * * * *
    *    |                                       *
    *    v                               (error) *
-   * LOCAL_IMAGE_OPEN (skip if not              *
+   * LOCAL_IMAGE_OPEN (skip if not  * * * * * * *
    *    |              needed                   *
    *    v                               (error) *
    * WAIT_FOR_LOCAL_JOURNAL_READY * * * * * * * *
    *    |
-   *    v
-   * <replaying>
-   *    |
-   *    v
-   * <stopping>
-   *    |
-   *    v
-   * JOURNAL_REPLAY_SHUT_DOWN
-   *    |
-   *    v
+   *    v-----------------------------------------------\
+   * <replaying> --------------> <flushing_replay>      |
+   *    |                           |                   |
+   *    v                           v                   |
+   * <stopping>                  LOCAL_REPLAY_FLUSH     |
+   *    |                           |                   |
+   *    v                           v                   |
+   * JOURNAL_REPLAY_SHUT_DOWN    FLUSH_COMMIT_POSITION  |
+   *    |                           |                   |
+   *    v                           \-------------------/
    * LOCAL_IMAGE_CLOSE
    *    |
    *    v
@@ -164,6 +164,12 @@ protected:
   virtual void on_stop_local_image_close_start();
   virtual void on_stop_local_image_close_finish(int r);
 
+  virtual void on_flush_local_replay_flush_start();
+  virtual void on_flush_local_replay_flush_finish(int r);
+  virtual void on_flush_flush_commit_position_start(int last_r);
+  virtual void on_flush_flush_commit_position_finish(int last_r, int r);
+  virtual bool on_flush_interrupted();
+
   void close_local_image(Context *on_finish); // for tests
 
 private: