From: Jason Dillaman Date: Mon, 15 May 2017 14:35:46 +0000 (-0400) Subject: rbd-mirror: close local image before stopping remote journaler X-Git-Tag: ses5-milestone6~8^2~5^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8ca85fc0196acb08c936df8d7db69779c9f3da17;p=ceph.git rbd-mirror: close local image before stopping remote journaler Fixes: http://tracker.ceph.com/issues/18963 Signed-off-by: Jason Dillaman --- diff --git a/src/librbd/journal/Replay.cc b/src/librbd/journal/Replay.cc index 2d408d5b8eba..d0b585922980 100644 --- a/src/librbd/journal/Replay.cc +++ b/src/librbd/journal/Replay.cc @@ -216,6 +216,7 @@ void Replay::shut_down(bool cancel_ops, Context *on_finish) { // safely commit any remaining AIO modify operations if ((m_in_flight_aio_flush + m_in_flight_aio_modify) != 0) { flush_comp = create_aio_flush_completion(nullptr); + assert(flush_comp != nullptr); } for (auto &op_event_pair : m_op_events) { @@ -240,6 +241,9 @@ void Replay::shut_down(bool cancel_ops, Context *on_finish) { } } + assert(!m_shut_down); + m_shut_down = true; + assert(m_flush_ctx == nullptr); if (m_in_flight_op_events > 0 || flush_comp != nullptr) { std::swap(m_flush_ctx, on_finish); @@ -263,6 +267,9 @@ void Replay::flush(Context *on_finish) { Mutex::Locker locker(m_lock); aio_comp = create_aio_flush_completion( util::create_async_context_callback(m_image_ctx, on_finish)); + if (aio_comp == nullptr) { + return; + } } RWLock::RLocker owner_locker(m_image_ctx.owner_lock); @@ -318,6 +325,10 @@ void Replay::handle_event(const journal::AioDiscardEvent &event, auto aio_comp = create_aio_modify_completion(on_ready, on_safe, io::AIO_TYPE_DISCARD, &flush_required); + if (aio_comp == nullptr) { + return; + } + io::ImageRequest::aio_discard(&m_image_ctx, aio_comp, event.offset, event.length, event.skip_partial_discard, {}); @@ -326,7 +337,9 @@ void Replay::handle_event(const journal::AioDiscardEvent &event, auto flush_comp = create_aio_flush_completion(nullptr); m_lock.Unlock(); - io::ImageRequest::aio_flush(&m_image_ctx, flush_comp, {}); + if (flush_comp != nullptr) { + io::ImageRequest::aio_flush(&m_image_ctx, flush_comp, {}); + } } } @@ -341,6 +354,10 @@ void Replay::handle_event(const journal::AioWriteEvent &event, auto aio_comp = create_aio_modify_completion(on_ready, on_safe, io::AIO_TYPE_WRITE, &flush_required); + if (aio_comp == nullptr) { + return; + } + io::ImageRequest::aio_write(&m_image_ctx, aio_comp, {{event.offset, event.length}}, std::move(data), 0, {}); @@ -349,7 +366,9 @@ void Replay::handle_event(const journal::AioWriteEvent &event, auto flush_comp = create_aio_flush_completion(nullptr); m_lock.Unlock(); - io::ImageRequest::aio_flush(&m_image_ctx, flush_comp, {}); + if (flush_comp != nullptr) { + io::ImageRequest::aio_flush(&m_image_ctx, flush_comp, {}); + } } } @@ -364,8 +383,10 @@ void Replay::handle_event(const journal::AioFlushEvent &event, Mutex::Locker locker(m_lock); aio_comp = create_aio_flush_completion(on_safe); } - io::ImageRequest::aio_flush(&m_image_ctx, aio_comp, {}); + if (aio_comp != nullptr) { + io::ImageRequest::aio_flush(&m_image_ctx, aio_comp, {}); + } on_ready->complete(0); } @@ -380,6 +401,10 @@ void Replay::handle_event(const journal::AioWriteSameEvent &event, auto aio_comp = create_aio_modify_completion(on_ready, on_safe, io::AIO_TYPE_WRITESAME, &flush_required); + if (aio_comp == nullptr) { + return; + } + io::ImageRequest::aio_writesame(&m_image_ctx, aio_comp, event.offset, event.length, std::move(data), 0, {}); if (flush_required) { @@ -387,7 +412,9 @@ void Replay::handle_event(const journal::AioWriteSameEvent &event, auto flush_comp = create_aio_flush_completion(nullptr); m_lock.Unlock(); - io::ImageRequest::aio_flush(&m_image_ctx, flush_comp, {}); + if (flush_comp != nullptr) { + io::ImageRequest::aio_flush(&m_image_ctx, flush_comp, {}); + } } } @@ -861,6 +888,12 @@ Context *Replay::create_op_context_callback(uint64_t op_tid, Context *on_safe, OpEvent **op_event) { CephContext *cct = m_image_ctx.cct; + if (m_shut_down) { + ldout(cct, 5) << ": ignoring event after shut down" << dendl; + on_ready->complete(0); + m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN); + return nullptr; + } assert(m_lock.is_locked()); if (m_op_events.count(op_tid) != 0) { @@ -898,7 +931,10 @@ void Replay::handle_op_complete(uint64_t op_tid, int r) { op_event = std::move(op_it->second); m_op_events.erase(op_it); - shutting_down = (m_flush_ctx != nullptr); + if (m_shut_down) { + assert(m_flush_ctx != nullptr); + shutting_down = true; + } } assert(op_event.on_start_ready == nullptr || (r < 0 && r != -ERESTART)); @@ -958,6 +994,13 @@ Replay::create_aio_modify_completion(Context *on_ready, Context *on_safe, CephContext *cct = m_image_ctx.cct; assert(m_on_aio_ready == nullptr); + if (m_shut_down) { + ldout(cct, 5) << ": ignoring event after shut down" << dendl; + on_ready->complete(0); + m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN); + return nullptr; + } + ++m_in_flight_aio_modify; m_aio_modify_unsafe_contexts.push_back(on_safe); @@ -997,6 +1040,15 @@ template io::AioCompletion *Replay::create_aio_flush_completion(Context *on_safe) { assert(m_lock.is_locked()); + CephContext *cct = m_image_ctx.cct; + if (m_shut_down) { + ldout(cct, 5) << ": ignoring event after shut down" << dendl; + if (on_safe != nullptr) { + m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN); + } + return nullptr; + } + ++m_in_flight_aio_flush; // associate all prior write/discard ops to this flush request diff --git a/src/librbd/journal/Replay.h b/src/librbd/journal/Replay.h index d4a65e089bb2..5b2219851371 100644 --- a/src/librbd/journal/Replay.h +++ b/src/librbd/journal/Replay.h @@ -126,6 +126,7 @@ private: OpEvents m_op_events; uint64_t m_in_flight_op_events = 0; + bool m_shut_down = false; Context *m_flush_ctx = nullptr; Context *m_on_aio_ready = nullptr; diff --git a/src/test/librbd/journal/test_mock_Replay.cc b/src/test/librbd/journal/test_mock_Replay.cc index e13a156ee056..14d2a6a4b79e 100644 --- a/src/test/librbd/journal/test_mock_Replay.cc +++ b/src/test/librbd/journal/test_mock_Replay.cc @@ -1592,5 +1592,69 @@ TEST_F(TestMockJournalReplay, RefreshImageBeforeOpStart) { ASSERT_EQ(0, on_finish_safe.wait()); } +TEST_F(TestMockJournalReplay, FlushEventAfterShutDown) { + REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); + + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockReplayImageCtx mock_image_ctx(*ictx); + MockJournalReplay mock_journal_replay(mock_image_ctx); + MockIoImageRequest mock_io_image_request; + expect_op_work_queue(mock_image_ctx); + + ASSERT_EQ(0, when_shut_down(mock_journal_replay, false)); + + C_SaferCond on_ready; + C_SaferCond on_safe; + when_process(mock_journal_replay, EventEntry{AioFlushEvent()}, + &on_ready, &on_safe); + ASSERT_EQ(0, on_ready.wait()); + ASSERT_EQ(-ESHUTDOWN, on_safe.wait()); +} + +TEST_F(TestMockJournalReplay, ModifyEventAfterShutDown) { + REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); + + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockReplayImageCtx mock_image_ctx(*ictx); + MockJournalReplay mock_journal_replay(mock_image_ctx); + MockIoImageRequest mock_io_image_request; + expect_op_work_queue(mock_image_ctx); + + ASSERT_EQ(0, when_shut_down(mock_journal_replay, false)); + + C_SaferCond on_ready; + C_SaferCond on_safe; + when_process(mock_journal_replay, + EventEntry{AioWriteEvent(123, 456, to_bl("test"))}, + &on_ready, &on_safe); + ASSERT_EQ(0, on_ready.wait()); + ASSERT_EQ(-ESHUTDOWN, on_safe.wait()); +} + +TEST_F(TestMockJournalReplay, OpEventAfterShutDown) { + REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); + + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockReplayImageCtx mock_image_ctx(*ictx); + MockJournalReplay mock_journal_replay(mock_image_ctx); + MockIoImageRequest mock_io_image_request; + expect_op_work_queue(mock_image_ctx); + + ASSERT_EQ(0, when_shut_down(mock_journal_replay, false)); + + C_SaferCond on_ready; + C_SaferCond on_safe; + when_process(mock_journal_replay, EventEntry{RenameEvent(123, "image")}, + &on_ready, &on_safe); + ASSERT_EQ(0, on_ready.wait()); + ASSERT_EQ(-ESHUTDOWN, on_safe.wait()); +} + } // namespace journal } // namespace librbd diff --git a/src/test/rbd_mirror/test_mock_ImageReplayer.cc b/src/test/rbd_mirror/test_mock_ImageReplayer.cc index 430edaa16d31..c845faacaf54 100644 --- a/src/test/rbd_mirror/test_mock_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_mock_ImageReplayer.cc @@ -385,6 +385,10 @@ public: .WillOnce(CompleteContext(r)); } + void expect_flush(MockReplay &mock_replay, int r) { + EXPECT_CALL(mock_replay, flush(_)).WillOnce(CompleteContext(r)); + } + void expect_shut_down(MockReplay &mock_replay, bool cancel_ops, int r) { EXPECT_CALL(mock_replay, shut_down(cancel_ops, _)) .WillOnce(WithArg<1>(CompleteContext(r))); @@ -514,17 +518,15 @@ TEST_F(TestMockImageReplayer, StartStop) { MockCloseImageRequest mock_close_local_image_request; - expect_stop_replay(mock_remote_journaler, 0); expect_shut_down(mock_local_replay, true, 0); - EXPECT_CALL(mock_local_journal, remove_listener(_)); EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_send(mock_close_local_image_request, 0); + expect_stop_replay(mock_remote_journaler, 0); EXPECT_CALL(mock_remote_journaler, remove_listener(_)); expect_shut_down(mock_remote_journaler, 0); - expect_send(mock_close_local_image_request, 0); - C_SaferCond stop_ctx; m_image_replayer->stop(&stop_ctx); ASSERT_EQ(0, stop_ctx.wait()); @@ -650,14 +652,12 @@ TEST_F(TestMockImageReplayer, StartExternalReplayError) { expect_start_external_replay(mock_local_journal, nullptr, -EINVAL); MockCloseImageRequest mock_close_local_image_request; - EXPECT_CALL(mock_local_journal, remove_listener(_)); + expect_send(mock_close_local_image_request, 0); EXPECT_CALL(mock_remote_journaler, remove_listener(_)); expect_shut_down(mock_remote_journaler, 0); - expect_send(mock_close_local_image_request, 0); - C_SaferCond start_ctx; m_image_replayer->start(&start_ctx); ASSERT_EQ(-EINVAL, start_ctx.wait()); @@ -706,17 +706,15 @@ TEST_F(TestMockImageReplayer, StopError) { MockCloseImageRequest mock_close_local_image_request; - expect_stop_replay(mock_remote_journaler, -EINVAL); expect_shut_down(mock_local_replay, true, -EINVAL); - EXPECT_CALL(mock_local_journal, remove_listener(_)); EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_send(mock_close_local_image_request, -EINVAL); + expect_stop_replay(mock_remote_journaler, -EINVAL); EXPECT_CALL(mock_remote_journaler, remove_listener(_)); expect_shut_down(mock_remote_journaler, -EINVAL); - expect_send(mock_close_local_image_request, -EINVAL); - C_SaferCond stop_ctx; m_image_replayer->stop(&stop_ctx); ASSERT_EQ(0, stop_ctx.wait()); @@ -806,18 +804,15 @@ TEST_F(TestMockImageReplayer, Replay) { // STOP MockCloseImageRequest mock_close_local_image_request; - - expect_stop_replay(mock_remote_journaler, 0); expect_shut_down(mock_local_replay, true, 0); - EXPECT_CALL(mock_local_journal, remove_listener(_)); EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_send(mock_close_local_image_request, 0); + expect_stop_replay(mock_remote_journaler, 0); EXPECT_CALL(mock_remote_journaler, remove_listener(_)); expect_shut_down(mock_remote_journaler, 0); - expect_send(mock_close_local_image_request, 0); - C_SaferCond stop_ctx; m_image_replayer->stop(&stop_ctx); ASSERT_EQ(0, stop_ctx.wait()); @@ -886,15 +881,10 @@ TEST_F(TestMockImageReplayer, DecodeError) { .WillOnce(Return(-EINVAL)); // stop on error - expect_stop_replay(mock_remote_journaler, 0); expect_shut_down(mock_local_replay, true, 0); - EXPECT_CALL(mock_local_journal, remove_listener(_)); EXPECT_CALL(mock_local_journal, stop_external_replay()); - EXPECT_CALL(mock_remote_journaler, remove_listener(_)); - expect_shut_down(mock_remote_journaler, 0); - MockCloseImageRequest mock_close_local_image_request; C_SaferCond close_ctx; EXPECT_CALL(mock_close_local_image_request, send()) @@ -904,6 +894,10 @@ TEST_F(TestMockImageReplayer, DecodeError) { close_ctx.complete(0); })); + expect_stop_replay(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + expect_shut_down(mock_remote_journaler, 0); + // fire m_image_replayer->handle_replay_ready(); ASSERT_EQ(0, close_ctx.wait()); @@ -1010,17 +1004,15 @@ TEST_F(TestMockImageReplayer, DelayedReplay) { MockCloseImageRequest mock_close_local_image_request; - expect_stop_replay(mock_remote_journaler, 0); expect_shut_down(mock_local_replay, true, 0); - EXPECT_CALL(mock_local_journal, remove_listener(_)); EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_send(mock_close_local_image_request, 0); + expect_stop_replay(mock_remote_journaler, 0); EXPECT_CALL(mock_remote_journaler, remove_listener(_)); expect_shut_down(mock_remote_journaler, 0); - expect_send(mock_close_local_image_request, 0); - C_SaferCond stop_ctx; m_image_replayer->stop(&stop_ctx); ASSERT_EQ(0, stop_ctx.wait()); diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 3276dba3b8f6..7e4211b02ee6 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -810,6 +810,17 @@ void ImageReplayer::handle_replay_ready() } m_event_replay_tracker.start_op(); + + m_lock.Lock(); + bool stopping = (m_state == STATE_STOPPING); + m_lock.Unlock(); + + if (stopping) { + dout(10) << "stopping event replay" << dendl; + m_event_replay_tracker.finish_op(); + return; + } + if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) { preprocess_entry(); return; @@ -1452,19 +1463,18 @@ void ImageReplayer::shut_down(int r) { } } + // NOTE: it's important to ensure that the local image is fully + // closed before attempting to close the remote journal in + // case the remote cluster is unreachable + // chain the shut down sequence (reverse order) Context *ctx = new FunctionContext( [this, r](int _r) { update_mirror_image_status(true, STATE_STOPPED); handle_shut_down(r); }); - if (m_local_image_ctx) { - ctx = new FunctionContext([this, ctx](int r) { - CloseImageRequest *request = CloseImageRequest::create( - &m_local_image_ctx, ctx); - request->send(); - }); - } + + // close the remote journal if (m_remote_journaler != nullptr) { ctx = new FunctionContext([this, ctx](int r) { delete m_remote_journaler; @@ -1481,6 +1491,30 @@ void ImageReplayer::shut_down(int r) { }); } } + + // stop the replay of remote journal events + if (m_replay_handler != nullptr) { + ctx = new FunctionContext([this, ctx](int r) { + delete m_replay_handler; + m_replay_handler = nullptr; + + m_event_replay_tracker.wait_for_ops(ctx); + }); + ctx = new FunctionContext([this, ctx](int r) { + m_remote_journaler->stop_replay(ctx); + }); + } + + // close the local image (release exclusive lock) + if (m_local_image_ctx) { + ctx = new FunctionContext([this, ctx](int r) { + CloseImageRequest *request = CloseImageRequest::create( + &m_local_image_ctx, ctx); + request->send(); + }); + } + + // shut down event replay into the local image if (m_local_journal != nullptr) { ctx = new FunctionContext([this, ctx](int r) { m_local_journal = nullptr; @@ -1497,32 +1531,29 @@ void ImageReplayer::shut_down(int r) { }); } ctx = new FunctionContext([this, ctx](int r) { - if (r < 0) { - derr << "error flushing journal replay: " << cpp_strerror(r) << dendl; - } - // blocks if listener notification is in-progress m_local_journal->remove_listener(m_journal_listener); - - // wait for all in-flight replayed events to complete - m_event_replay_tracker.wait_for_ops(ctx); - }); - if (m_local_replay != nullptr) { - ctx = new FunctionContext([this, ctx](int r) { - m_local_replay->shut_down(true, ctx); - }); - } - } - if (m_replay_handler != nullptr) { - ctx = new FunctionContext([this, ctx](int r) { - delete m_replay_handler; - m_replay_handler = nullptr; ctx->complete(0); }); + } + + // wait for all local in-flight replay events to complete + ctx = new FunctionContext([this, ctx](int r) { + if (r < 0) { + derr << "error shutting down journal replay: " << cpp_strerror(r) + << dendl; + } + + m_event_replay_tracker.wait_for_ops(ctx); + }); + + // flush any local in-flight replay events + if (m_local_replay != nullptr) { ctx = new FunctionContext([this, ctx](int r) { - m_remote_journaler->stop_replay(ctx); + m_local_replay->shut_down(true, ctx); }); } + m_threads->work_queue->queue(ctx, 0); }