From 61542c14d34c4192f1468e2d7865fe913cce21b8 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Wed, 25 May 2016 02:31:11 -0400 Subject: [PATCH] rbd-mirror: image replay now uses asynchronous journal shutdown Signed-off-by: Jason Dillaman (cherry picked from commit a1b0a1b1b9a237ae363a52c7b4038b19e87052a5) --- src/test/rbd_mirror/test_ImageReplayer.cc | 75 ------- src/tools/rbd_mirror/ImageReplayer.cc | 248 ++++++++-------------- src/tools/rbd_mirror/ImageReplayer.h | 13 +- 3 files changed, 97 insertions(+), 239 deletions(-) diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc index db854beca035..58ed3654920d 100644 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@ -536,78 +536,3 @@ TEST_F(TestImageReplayer, NextTag) stop(); } - -class ImageReplayer : public rbd::mirror::ImageReplayer<> { -public: - ImageReplayer(rbd::mirror::Threads *threads, - rbd::mirror::RadosRef local, rbd::mirror::RadosRef remote, - const std::string &local_mirror_uuid, - const std::string &remote_mirror_uuid, - int64_t local_pool_id, - int64_t remote_pool_id, const std::string &remote_image_id, - const std::string &global_image_id) - : rbd::mirror::ImageReplayer<>(threads, local, remote, local_mirror_uuid, - remote_mirror_uuid, local_pool_id, - remote_pool_id, remote_image_id, - global_image_id) - {} - - void set_error(const std::string &state, int r) { - m_errors[state] = r; - } - - int get_error(const std::string &state) const { - std::map::const_iterator i = m_errors.find(state); - return i == m_errors.end() ? 0 : i->second; - } - -protected: - virtual void on_stop_journal_replay_shut_down_finish(int r) { - ASSERT_EQ(0, r); - rbd::mirror::ImageReplayer<>::on_stop_journal_replay_shut_down_finish( - get_error("on_stop_journal_replay_shut_down")); - } - - virtual void on_stop_local_image_close_finish(int r) { - ASSERT_EQ(0, r); - rbd::mirror::ImageReplayer<>::on_stop_local_image_close_finish( - get_error("on_stop_local_image_close")); - } - -private: - std::map m_errors; -}; - -#define TEST_ON_START_ERROR(state) \ -TEST_F(TestImageReplayer, Error_on_start_##state) \ -{ \ - create_replayer(); \ - reinterpret_cast(m_replayer)-> \ - set_error("on_start_" #state, -1); \ - rbd::mirror::ImageReplayer<>::BootstrapParams \ - bootstap_params(m_image_name); \ - C_SaferCond cond; \ - m_replayer->start(&cond, &bootstap_params); \ - ASSERT_EQ(-1, cond.wait()); \ -} - -#define TEST_ON_STOP_ERROR(state) \ -TEST_F(TestImageReplayer, Error_on_stop_##state) \ -{ \ - create_replayer(); \ - reinterpret_cast(m_replayer)-> \ - set_error("on_stop_" #state, -1); \ - rbd::mirror::ImageReplayer<>::BootstrapParams \ - bootstap_params(m_image_name); \ - start(&bootstap_params); \ - /* TODO: investigate: without wait below I observe: */ \ - /* librbd/journal/Replay.cc: 70: FAILED assert(m_op_events.empty()) */\ - wait_for_replay_complete(); \ - C_SaferCond cond; \ - m_replayer->stop(&cond); \ - ASSERT_EQ(0, cond.wait()); \ -} - -TEST_ON_STOP_ERROR(journal_replay_shut_down); -TEST_ON_STOP_ERROR(no_error); - diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index d4735fb30fee..4e88fb2b7446 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -334,7 +334,7 @@ void ImageReplayer::start(Context *on_finish, if (r < 0) { derr << "error opening ioctx for remote pool " << m_remote_pool_id << ": " << cpp_strerror(r) << dendl; - on_start_fail_start(r, "error opening remote pool"); + on_start_fail(r, "error opening remote pool"); return; } @@ -346,7 +346,7 @@ void ImageReplayer::start(Context *on_finish, if (r < 0) { derr << "error opening ioctx for local pool " << m_local_pool_id << ": " << cpp_strerror(r) << dendl; - on_start_fail_start(r, "error opening local pool"); + on_start_fail(r, "error opening local pool"); return; } @@ -402,10 +402,10 @@ void ImageReplayer::handle_bootstrap(int r) { if (r == -EREMOTEIO) { dout(5) << "remote image is non-primary or local image is primary" << dendl; - on_start_fail_start(0, "remote image is non-primary or local image is primary"); + on_start_fail(0, "remote image is non-primary or local image is primary"); return; } else if (r < 0) { - on_start_fail_start(r, "error bootstrapping replay"); + on_start_fail(r, "error bootstrapping replay"); return; } else if (on_start_interrupted()) { return; @@ -449,7 +449,7 @@ void ImageReplayer::handle_init_remote_journaler(int r) { if (r < 0) { derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl; - on_start_fail_start(r, "error initializing remote journal"); + on_start_fail(r, "error initializing remote journal"); return; } else if (on_start_interrupted()) { return; @@ -466,7 +466,7 @@ void ImageReplayer::start_replay() { if (r < 0) { derr << "error starting external replay on local image " << m_local_image_id << ": " << cpp_strerror(r) << dendl; - on_start_fail_start(r, "error starting replay on local image"); + on_start_fail(r, "error starting replay on local image"); return; } @@ -504,74 +504,29 @@ void ImageReplayer::start_replay() { } template -void ImageReplayer::on_start_fail_start(int r, const std::string &desc) +void ImageReplayer::on_start_fail(int r, const std::string &desc) { dout(20) << "r=" << r << dendl; + Context *ctx = new FunctionContext([this, r, desc](int _r) { + Context *on_start_finish(nullptr); + { + Mutex::Locker locker(m_lock); + m_state = STATE_STOPPING; + if (r < 0 && r != -EINTR) { + derr << "start failed: " << cpp_strerror(r) << dendl; + } else { + dout(20) << "start interrupted" << dendl; + } + std::swap(m_on_start_finish, on_start_finish); + } - FunctionContext *ctx = new FunctionContext( - [this, r, desc](int r1) { - assert(r1 == 0); set_state_description(r, desc); - on_start_fail_finish(r); + update_mirror_image_status(false, boost::none); + shut_down(r, on_start_finish); }); - m_threads->work_queue->queue(ctx, 0); } -template -void ImageReplayer::on_start_fail_finish(int r) -{ - dout(20) << "r=" << r << dendl; - - { - Mutex::Locker locker(m_lock); - m_state = STATE_STOPPING; - } - update_mirror_image_status(false, boost::none); - - if (m_remote_journaler) { - if (m_remote_journaler->is_initialized()) { - m_remote_journaler->shut_down(); - } - delete m_remote_journaler; - m_remote_journaler = nullptr; - } - - if (m_local_replay) { - shut_down_journal_replay(true); - m_local_image_ctx->journal->stop_external_replay(); - m_local_replay = nullptr; - } - - if (m_replay_handler) { - delete m_replay_handler; - m_replay_handler = nullptr; - } - - if (m_local_image_ctx) { - // TODO: switch to async close via CloseImageRequest - m_local_image_ctx->state->close(); - m_local_image_ctx = nullptr; - } - - Context *on_start_finish(nullptr); - Context *on_stop_finish(nullptr); - { - Mutex::Locker locker(m_lock); - if (r < 0 && r != -EINTR) { - derr << "start failed: " << cpp_strerror(r) << dendl; - } else { - dout(20) << "start interrupted" << dendl; - } - - std::swap(m_on_start_finish, on_start_finish); - std::swap(m_on_stop_finish, on_stop_finish); - } - - update_mirror_image_status(true, STATE_STOPPED); - handle_stop(r, on_start_finish, on_stop_finish); -} - template bool ImageReplayer::on_start_interrupted() { @@ -581,7 +536,7 @@ bool ImageReplayer::on_start_interrupted() return false; } - on_start_fail_start(-EINTR); + on_start_fail(-EINTR); return true; } @@ -622,98 +577,29 @@ void ImageReplayer::stop(Context *on_finish, bool manual) } if (shut_down_replay) { - on_stop_journal_replay_shut_down_start(); + on_stop_journal_replay(); } else if (on_finish != nullptr) { on_finish->complete(0); } } template -void ImageReplayer::on_stop_journal_replay_shut_down_start() +void ImageReplayer::on_stop_journal_replay() { dout(20) << "enter" << dendl; - FunctionContext *ctx = new FunctionContext( - [this](int r) { - on_stop_journal_replay_shut_down_finish(r); - }); - { Mutex::Locker locker(m_lock); - - // as we complete in-flight records, we might receive multiple stop requests if (m_state != STATE_REPLAYING) { + // might be invoked multiple times while stopping return; } m_state = STATE_STOPPING; - m_local_replay->shut_down(false, ctx); } set_state_description(0, ""); update_mirror_image_status(false, boost::none); -} - -template -void ImageReplayer::on_stop_journal_replay_shut_down_finish(int r) -{ - dout(20) << "r=" << r << dendl; - if (r < 0) { - derr << "error flushing journal replay: " << cpp_strerror(r) << dendl; - } - - { - Mutex::Locker locker(m_lock); - assert(m_state == STATE_STOPPING); - m_local_image_ctx->journal->stop_external_replay(); - m_local_replay = nullptr; - m_replay_entry = ReplayEntry(); - m_replay_tag_valid = false; - } - - on_stop_local_image_close_start(); -} - -template -void ImageReplayer::on_stop_local_image_close_start() -{ - dout(20) << "enter" << dendl; - - // close and delete the image (from outside the image's thread context) - Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::on_stop_local_image_close_finish>(this); - CloseImageRequest *request = CloseImageRequest::create( - &m_local_image_ctx, m_threads->work_queue, false, ctx); - request->send(); -} - -template -void ImageReplayer::on_stop_local_image_close_finish(int r) -{ - dout(20) << "r=" << r << dendl; - if (r < 0) { - derr << "error closing local image: " << cpp_strerror(r) << dendl; - } - - delete m_replay_status_formatter; - m_replay_status_formatter = nullptr; - - m_remote_journaler->stop_replay(); - m_remote_journaler->shut_down(); - delete m_remote_journaler; - m_remote_journaler = nullptr; - - delete m_replay_handler; - m_replay_handler = nullptr; - - Context *on_finish(nullptr); - { - Mutex::Locker locker(m_lock); - assert(m_state == STATE_STOPPING); - std::swap(m_on_stop_finish, on_finish); - } - - update_mirror_image_status(true, STATE_STOPPED); - handle_stop(r, nullptr, on_finish); + shut_down(0, nullptr); } template @@ -843,7 +729,7 @@ bool ImageReplayer::on_replay_interrupted() } if (shut_down) { - on_stop_journal_replay_shut_down_start(); + on_stop_journal_replay(); } return shut_down; } @@ -1024,17 +910,6 @@ void ImageReplayer::handle_process_entry_safe(const ReplayEntry& replay_entry m_remote_journaler->committed(replay_entry); } -template -void ImageReplayer::shut_down_journal_replay(bool cancel_ops) -{ - C_SaferCond cond; - m_local_replay->shut_down(cancel_ops, &cond); - int r = cond.wait(); - if (r < 0) { - derr << "error flushing journal replay: " << cpp_strerror(r) << dendl; - } -} - template bool ImageReplayer::update_mirror_image_status(bool force, const OptionalState &state) { @@ -1254,9 +1129,67 @@ void ImageReplayer::reschedule_update_status_task(int new_interval) { } template -void ImageReplayer::handle_stop(int r, Context *on_start, Context *on_stop) { +void ImageReplayer::shut_down(int r, Context *on_start) { + dout(20) << "r=" << r << dendl; + { + Mutex::Locker locker(m_lock); + assert(m_state == STATE_STOPPING); + } + + // chain the shut down sequence (reverse order) + Context *ctx = new FunctionContext( + [this, r, on_start](int _r) { + update_mirror_image_status(true, STATE_STOPPED); + handle_shut_down(r, on_start); + }); + if (m_local_image_ctx) { + ctx = new FunctionContext([this, ctx](int r) { + CloseImageRequest *request = CloseImageRequest::create( + &m_local_image_ctx, m_threads->work_queue, false, ctx); + request->send(); + }); + } + if (m_local_replay != nullptr) { + ctx = new FunctionContext([this, ctx](int r) { + if (r < 0) { + derr << "error flushing journal replay: " << cpp_strerror(r) << dendl; + } + m_local_image_ctx->journal->stop_external_replay(); + m_local_replay = nullptr; + ctx->complete(0); + }); + ctx = new FunctionContext([this, ctx](int r) { + m_local_replay->shut_down(true, ctx); + }); + } + if (m_remote_journaler != nullptr) { + ctx = new FunctionContext([this, ctx](int r) { + delete m_remote_journaler; + m_remote_journaler = nullptr; + ctx->complete(0); + }); + ctx = new FunctionContext([this, ctx](int r) { + m_remote_journaler->shut_down(ctx); + }); + } + if (m_replay_handler != nullptr) { + ctx = new FunctionContext([this, ctx](int r) { + delete m_replay_handler; + m_replay_handler = nullptr; + ctx->complete(0); + }); + ctx = new FunctionContext([this, ctx](int r) { + m_remote_journaler->stop_replay(ctx); + }); + } + m_threads->work_queue->queue(ctx, 0); +} + +template +void ImageReplayer::handle_shut_down(int r, Context *on_start) { reschedule_update_status_task(-1); + Context *on_stop = nullptr; { Mutex::Locker locker(m_lock); @@ -1266,13 +1199,15 @@ void ImageReplayer::handle_stop(int r, Context *on_start, Context *on_stop) { dout(20) << "waiting for in-flight status update" << dendl; assert(m_on_update_status_finish == nullptr); m_on_update_status_finish = new FunctionContext( - [this, r, on_start, on_stop](int r) { - handle_stop(r, on_start, on_stop); + [this, r, on_start](int r) { + handle_shut_down(r, on_start); }); return; } + std::swap(on_stop, m_on_stop_finish); m_stop_requested = false; + assert(m_state == STATE_STOPPING); m_state = STATE_STOPPED; m_state_desc.clear(); m_last_r = 0; @@ -1282,6 +1217,9 @@ void ImageReplayer::handle_stop(int r, Context *on_start, Context *on_stop) { m_local_ioctx.close(); m_remote_ioctx.close(); + delete m_replay_status_formatter; + m_replay_status_formatter = nullptr; + if (on_start != nullptr) { dout(20) << "on start finish complete, r=" << r << dendl; on_start->complete(r); diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 280218350776..82c9c19b7777 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -176,14 +176,10 @@ protected: * @endverbatim */ - virtual void on_start_fail_start(int r, const std::string &desc = ""); - virtual void on_start_fail_finish(int r); + virtual void on_start_fail(int r, const std::string &desc = ""); virtual bool on_start_interrupted(); - virtual void on_stop_journal_replay_shut_down_start(); - virtual void on_stop_journal_replay_shut_down_finish(int r); - virtual void on_stop_local_image_close_start(); - virtual void on_stop_local_image_close_finish(int r); + virtual void on_stop_journal_replay(); virtual void on_flush_local_replay_flush_start(Context *on_flush); virtual void on_flush_local_replay_flush_finish(Context *on_flush, int r); @@ -280,8 +276,6 @@ private: return !is_stopped_() && m_state != STATE_STOPPING && !m_stop_requested; } - void shut_down_journal_replay(bool cancel_ops); - bool update_mirror_image_status(bool force, const OptionalState &state); bool start_mirror_image_status_update(bool force, bool restarting); void finish_mirror_image_status_update(); @@ -290,7 +284,8 @@ private: void handle_mirror_status_update(int r); void reschedule_update_status_task(int new_interval = 0); - void handle_stop(int r, Context *on_start, Context *on_stop); + void shut_down(int r, Context *on_start); + void handle_shut_down(int r, Context *on_start); void bootstrap(); void handle_bootstrap(int r); -- 2.47.3