From d66e8f646ab8db8fff86bc626a8a13bda4a315a9 Mon Sep 17 00:00:00 2001 From: Mykola Golub Date: Sun, 20 Mar 2016 10:02:18 +0200 Subject: [PATCH] rbd-mirror: async flush for ImageReplayer Signed-off-by: Mykola Golub --- src/test/rbd_mirror/test_ImageReplayer.cc | 4 +- src/tools/rbd_mirror/ImageReplayer.cc | 134 +++++++++++++++++++--- src/tools/rbd_mirror/ImageReplayer.h | 32 +++--- 3 files changed, 137 insertions(+), 33 deletions(-) diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc index b042f57350728..d31d9317af209 100644 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@ -256,7 +256,9 @@ public: for (int i = 0; i < 100; i++) { printf("m_replayer->flush()\n"); - m_replayer->flush(); + C_SaferCond cond; + m_replayer->flush(&cond); + ASSERT_EQ(0, cond.wait()); get_commit_positions(&master_position, &mirror_position); if (master_position == mirror_position) { break; diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 25afe93189d9d..2daf3c590da01 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -101,7 +101,9 @@ public: explicit FlushCommand(ImageReplayer *replayer) : replayer(replayer) {} bool call(Formatter *f, stringstream *ss) { - int r = replayer->flush(); + C_SaferCond cond; + replayer->flush(&cond); + int r = cond.wait(); if (r < 0) { *ss << "flush: " << cpp_strerror(r); return false; @@ -625,6 +627,21 @@ void ImageReplayer::stop(Context *on_finish) on_finish->complete(0); }); + m_on_finish = ctx; + } + } else if (m_state == STATE_FLUSHING_REPLAY) { + dout(20) << "interrupting flush" << dendl; + + if (on_finish) { + Context *on_flush_finish = m_on_finish; + FunctionContext *ctx = new FunctionContext( + [this, on_flush_finish, on_finish](int r) { + if (on_flush_finish) { + on_flush_finish->complete(r); + } + on_finish->complete(0); + }); + m_on_finish = ctx; } } else { @@ -736,47 +753,126 @@ void ImageReplayer::handle_replay_ready() m_local_replay->process(&it, on_ready, on_commit); } -int ImageReplayer::flush() +void ImageReplayer::flush(Context *on_finish) { - // TODO: provide async method - dout(20) << "enter" << dendl; + bool start_flush = false; + { Mutex::Locker locker(m_lock); - if (m_state != STATE_REPLAYING) { - return 0; + if (m_state == STATE_REPLAYING) { + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + + m_state = STATE_FLUSHING_REPLAY; + + start_flush = true; } + } - m_state = STATE_FLUSHING_REPLAY; + if (start_flush) { + on_flush_local_replay_flush_start(); + } else if (on_finish) { + on_finish->complete(0); } +} + +void ImageReplayer::on_flush_local_replay_flush_start() +{ + dout(20) << "enter" << dendl; + + FunctionContext *ctx = new FunctionContext( + [this](int r) { + on_flush_local_replay_flush_finish(r); + }); + + m_local_replay->flush(ctx); +} + +void ImageReplayer::on_flush_local_replay_flush_finish(int r) +{ + dout(20) << "r=" << r << dendl; - C_SaferCond replay_flush_ctx; - m_local_replay->flush(&replay_flush_ctx); - int r = replay_flush_ctx.wait(); if (r < 0) { derr << "error flushing local replay: " << cpp_strerror(r) << dendl; } - C_SaferCond journaler_flush_ctx; - m_remote_journaler->flush_commit_position(&journaler_flush_ctx); - int r1 = journaler_flush_ctx.wait(); - if (r1 < 0) { + if (on_flush_interrupted()) { + return; + } + + on_flush_flush_commit_position_start(r); +} + +void ImageReplayer::on_flush_flush_commit_position_start(int last_r) +{ + + FunctionContext *ctx = new FunctionContext( + [this, last_r](int r) { + on_flush_flush_commit_position_finish(last_r, r); + }); + + m_remote_journaler->flush_commit_position(ctx); +} + +void ImageReplayer::on_flush_flush_commit_position_finish(int last_r, int r) +{ + if (r < 0) { derr << "error flushing remote journal commit position: " - << cpp_strerror(r1) << dendl; + << cpp_strerror(r) << dendl; + } else { + r = last_r; } + Context *on_finish(nullptr); + { Mutex::Locker locker(m_lock); - assert(m_state == STATE_FLUSHING_REPLAY); + if (m_state == STATE_STOPPING) { + r = -EINTR; + } else { + assert(m_state == STATE_FLUSHING_REPLAY); - m_state = STATE_REPLAYING; + m_state = STATE_REPLAYING; + } + std::swap(m_on_finish, on_finish); + } + + dout(20) << "flush complete, r=" << r << dendl; + + if (on_finish) { + dout(20) << "on finish complete, r=" << r << dendl; + on_finish->complete(r); + } +} + +bool ImageReplayer::on_flush_interrupted() +{ + Context *on_finish(nullptr); + + { + Mutex::Locker locker(m_lock); + + if (m_state == STATE_FLUSHING_REPLAY) { + return false; + } + + assert(m_state == STATE_STOPPING); + + std::swap(m_on_finish, on_finish); } - dout(20) << "done" << dendl; + dout(20) << "flush interrupted" << dendl; - return r < 0 ? r : r1; + if (on_finish) { + int r = -EINTR; + dout(20) << "on finish complete, r=" << r << dendl; + on_finish->complete(r); + } + + return true; } void ImageReplayer::handle_replay_process_ready(int r) diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 9dc26e1bf1934..19a87c5262be9 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -84,7 +84,7 @@ public: void start(Context *on_finish = nullptr, const BootstrapParams *bootstrap_params = nullptr); void stop(Context *on_finish = nullptr); - int flush(); + void flush(Context *on_finish = nullptr); virtual void handle_replay_ready(); virtual void handle_replay_process_ready(int r); @@ -107,7 +107,7 @@ protected: * | (sync required) * * |\-----\ * * | | * - * | v * + * | v (error) * * | BOOTSTRAP_IMAGE * * * * * * * * * * * * | | * * | v * @@ -117,21 +117,21 @@ protected: * REMOTE_JOURNALER_INIT * * * * * * * * * * * * | * * v (error) * - * LOCAL_IMAGE_OPEN (skip if not * + * LOCAL_IMAGE_OPEN (skip if not * * * * * * * * | needed * * v (error) * * WAIT_FOR_LOCAL_JOURNAL_READY * * * * * * * * * | - * v - * - * | - * v - * - * | - * v - * JOURNAL_REPLAY_SHUT_DOWN - * | - * v + * v-----------------------------------------------\ + * --------------> | + * | | | + * v v | + * LOCAL_REPLAY_FLUSH | + * | | | + * v v | + * JOURNAL_REPLAY_SHUT_DOWN FLUSH_COMMIT_POSITION | + * | | | + * v \-------------------/ * LOCAL_IMAGE_CLOSE * | * v @@ -164,6 +164,12 @@ protected: virtual void on_stop_local_image_close_start(); virtual void on_stop_local_image_close_finish(int r); + virtual void on_flush_local_replay_flush_start(); + virtual void on_flush_local_replay_flush_finish(int r); + virtual void on_flush_flush_commit_position_start(int last_r); + virtual void on_flush_flush_commit_position_finish(int last_r, int r); + virtual bool on_flush_interrupted(); + void close_local_image(Context *on_finish); // for tests private: -- 2.39.5