flush(ictx);
close_image(ictx);
- C_SaferCond ctx;
- m_replayer->resync_image(&ctx);
- ASSERT_EQ(0, ctx.wait());
+ open_local_image(&ictx);
+ librbd::Journal<>::request_resync(ictx);
+ close_image(ictx);
wait_for_stopped();
// vim: ts=8 sw=2 smarttab
#include "cls/journal/cls_journal_types.h"
-#include "librbd/journal/Replay.h"
#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
#include "tools/rbd_mirror/ImageDeleter.h"
#include "tools/rbd_mirror/ImageReplayer.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
#include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
-#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h"
-#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
+#include "tools/rbd_mirror/image_replayer/Replayer.h"
+#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
+#include "tools/rbd_mirror/image_replayer/Utils.h"
+#include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
#include "test/rbd_mirror/test_mock_fixture.h"
#include "test/journal/mock/MockJournaler.h"
#include "test/librbd/mock/MockImageCtx.h"
-#include "test/librbd/mock/MockJournal.h"
#include "test/rbd_mirror/mock/MockContextWQ.h"
#include "test/rbd_mirror/mock/MockSafeTimer.h"
namespace {
-struct MockTestJournal;
-
struct MockTestImageCtx : public MockImageCtx {
MockTestImageCtx(librbd::ImageCtx &image_ctx)
: librbd::MockImageCtx(image_ctx) {
}
- MockTestJournal *journal = nullptr;
-};
-
-struct MockTestJournal : public MockJournal {
- MOCK_METHOD2(start_external_replay, void(journal::Replay<MockTestImageCtx> **,
- Context *on_start));
- MOCK_METHOD0(stop_external_replay, void());
};
} // anonymous namespace
namespace journal {
-template<>
-struct Replay<MockTestImageCtx> {
- MOCK_METHOD2(decode, int(bufferlist::const_iterator *, EventEntry *));
- MOCK_METHOD3(process, void(const EventEntry &, Context *, Context *));
- MOCK_METHOD1(flush, void(Context*));
- MOCK_METHOD2(shut_down, void(bool, Context*));
-};
-
template <>
struct TypeTraits<MockTestImageCtx> {
typedef ::journal::MockJournalerProxy Journaler;
- typedef ::journal::MockReplayEntryProxy ReplayEntry;
};
struct MirrorPeerClientMeta;
namespace journal {
-template<>
-struct EventPreprocessor<librbd::MockTestImageCtx> {
- static EventPreprocessor *s_instance;
-
- static EventPreprocessor *create(librbd::MockTestImageCtx &local_image_ctx,
- ::journal::MockJournalerProxy &remote_journaler,
- const std::string &local_mirror_uuid,
- librbd::journal::MirrorPeerClientMeta *client_meta,
- MockContextWQ *work_queue) {
+template <>
+struct Replayer<librbd::MockTestImageCtx> : public image_replayer::Replayer {
+ static Replayer* s_instance;
+ librbd::MockTestImageCtx** local_image_ctx;
+ image_replayer::ReplayerListener* replayer_listener;
+
+ static Replayer* create(librbd::MockTestImageCtx** local_image_ctx,
+ ::journal::MockJournalerProxy* remote_journaler,
+ const std::string& local_mirror_uuid,
+ const std::string& remote_mirror_uuid,
+ image_replayer::ReplayerListener* replayer_listener,
+ Threads<librbd::MockTestImageCtx>* threads) {
ceph_assert(s_instance != nullptr);
+ ceph_assert(local_image_ctx != nullptr);
+ s_instance->local_image_ctx = local_image_ctx;
+ s_instance->replayer_listener = replayer_listener;
return s_instance;
}
- static void destroy(EventPreprocessor* processor) {
- }
-
- EventPreprocessor() {
- ceph_assert(s_instance == nullptr);
+ Replayer() {
s_instance = this;
}
- ~EventPreprocessor() {
- ceph_assert(s_instance == this);
- s_instance = nullptr;
- }
-
- MOCK_METHOD1(is_required, bool(const librbd::journal::EventEntry &));
- MOCK_METHOD2(preprocess, void(librbd::journal::EventEntry *, Context *));
-};
+ MOCK_METHOD0(destroy, void());
-template<>
-struct ReplayStatusFormatter<librbd::MockTestImageCtx> {
- static ReplayStatusFormatter* s_instance;
-
- static ReplayStatusFormatter* create(::journal::MockJournalerProxy *journaler,
- const std::string &mirror_uuid) {
- ceph_assert(s_instance != nullptr);
- return s_instance;
- }
-
- static void destroy(ReplayStatusFormatter* formatter) {
- }
-
- ReplayStatusFormatter() {
- ceph_assert(s_instance == nullptr);
- s_instance = this;
- }
+ MOCK_METHOD1(init, void(Context*));
+ MOCK_METHOD1(shut_down, void(Context*));
+ MOCK_METHOD1(flush, void(Context*));
- ~ReplayStatusFormatter() {
- ceph_assert(s_instance == this);
- s_instance = nullptr;
- }
+ MOCK_METHOD2(get_replay_status, bool(std::string*, Context*));
- MOCK_METHOD2(get_or_send_update, bool(std::string *description, Context *on_finish));
+ MOCK_CONST_METHOD0(is_replaying, bool());
+ MOCK_CONST_METHOD0(is_resync_requested, bool());
+ MOCK_CONST_METHOD0(get_error_code, int());
+ MOCK_CONST_METHOD0(get_error_description, std::string());
};
-EventPreprocessor<librbd::MockTestImageCtx>* EventPreprocessor<librbd::MockTestImageCtx>::s_instance = nullptr;
-ReplayStatusFormatter<librbd::MockTestImageCtx>* ReplayStatusFormatter<librbd::MockTestImageCtx>::s_instance = nullptr;
+Replayer<librbd::MockTestImageCtx>* Replayer<librbd::MockTestImageCtx>::s_instance = nullptr;
} // namespace journal
} // namespace image_replayer
typedef image_replayer::CloseImageRequest<librbd::MockTestImageCtx> MockCloseImageRequest;
typedef image_replayer::PrepareLocalImageRequest<librbd::MockTestImageCtx> MockPrepareLocalImageRequest;
typedef image_replayer::PrepareRemoteImageRequest<librbd::MockTestImageCtx> MockPrepareRemoteImageRequest;
- typedef image_replayer::journal::EventPreprocessor<librbd::MockTestImageCtx> MockEventPreprocessor;
- typedef image_replayer::journal::ReplayStatusFormatter<librbd::MockTestImageCtx> MockReplayStatusFormatter;
- typedef librbd::journal::Replay<librbd::MockTestImageCtx> MockReplay;
+ typedef image_replayer::journal::Replayer<librbd::MockTestImageCtx> MockJournalReplayer;
typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
}));
}
- void expect_flush_repeatedly(MockReplay& mock_replay,
- journal::MockJournaler& mock_journal) {
- EXPECT_CALL(mock_replay, flush(_))
- .WillRepeatedly(Invoke([this](Context* ctx) {
- m_threads->work_queue->queue(ctx, 0);
- }));
- EXPECT_CALL(mock_journal, flush_commit_position(_))
- .WillRepeatedly(Invoke([this](Context* ctx) {
- m_threads->work_queue->queue(ctx, 0);
- }));
- }
-
void expect_trash_move(MockImageDeleter& mock_image_deleter,
const std::string& global_image_id,
bool ignore_orphan, int r) {
return bl;
}
- void expect_get_or_send_update(
- MockReplayStatusFormatter &mock_replay_status_formatter) {
- EXPECT_CALL(mock_replay_status_formatter, get_or_send_update(_, _))
- .WillRepeatedly(DoAll(WithArg<1>(CompleteContext(-EEXIST)),
- Return(true)));
- }
-
void expect_send(MockPrepareLocalImageRequest &mock_request,
const std::string &local_image_id,
const std::string &local_image_name,
}));
}
- void expect_start_external_replay(librbd::MockTestJournal &mock_journal,
- MockReplay *mock_replay, int r) {
- EXPECT_CALL(mock_journal, start_external_replay(_, _))
- .WillOnce(DoAll(SetArgPointee<0>(mock_replay),
- WithArg<1>(CompleteContext(r))));
- }
-
- void expect_init(::journal::MockJournaler &mock_journaler, int r) {
- EXPECT_CALL(mock_journaler, init(_))
- .WillOnce(CompleteContext(r));
- }
-
- void expect_get_cached_client(::journal::MockJournaler &mock_journaler,
- int r) {
- librbd::journal::ImageClientMeta image_client_meta;
- image_client_meta.tag_class = 0;
-
- librbd::journal::ClientData client_data;
- client_data.client_meta = image_client_meta;
-
- cls::journal::Client client;
- encode(client_data, client.data);
-
- EXPECT_CALL(mock_journaler, get_cached_client("local_mirror_uuid", _))
- .WillOnce(DoAll(SetArgPointee<1>(client),
- Return(r)));
- }
-
- void expect_stop_replay(::journal::MockJournaler &mock_journaler, int r) {
- EXPECT_CALL(mock_journaler, stop_replay(_))
- .WillOnce(CompleteContext(r));
- }
-
- void expect_flush(MockReplay &mock_replay, int r) {
- EXPECT_CALL(mock_replay, flush(_)).WillOnce(CompleteContext(r));
+ void expect_init(MockJournalReplayer& mock_journal_replayer, int r) {
+ EXPECT_CALL(mock_journal_replayer, init(_))
+ .WillOnce(Invoke([this, &mock_journal_replayer, r](Context* ctx) {
+ if (r < 0) {
+ *mock_journal_replayer.local_image_ctx = nullptr;
+ }
+ m_threads->work_queue->queue(ctx, r);
+ }));
}
- void expect_shut_down(MockReplay &mock_replay, bool cancel_ops, int r) {
- EXPECT_CALL(mock_replay, shut_down(cancel_ops, _))
- .WillOnce(WithArg<1>(CompleteContext(r)));
+ void expect_shut_down(MockJournalReplayer& mock_journal_replayer, int r) {
+ EXPECT_CALL(mock_journal_replayer, shut_down(_))
+ .WillOnce(Invoke([this, &mock_journal_replayer, r](Context* ctx) {
+ *mock_journal_replayer.local_image_ctx = nullptr;
+ m_threads->work_queue->queue(ctx, r);
+ }));
+ EXPECT_CALL(mock_journal_replayer, destroy());
}
- void expect_shut_down(journal::MockJournaler &mock_journaler, int r) {
- EXPECT_CALL(mock_journaler, shut_down(_))
- .WillOnce(CompleteContext(r));
+ void expect_get_replay_status(MockJournalReplayer& mock_journal_replayer) {
+ EXPECT_CALL(mock_journal_replayer, get_replay_status(_, _))
+ .WillRepeatedly(DoAll(WithArg<1>(CompleteContext(-EEXIST)),
+ Return(true)));
}
void expect_send(MockCloseImageRequest &mock_close_image_request, int r) {
}));
}
- void expect_get_commit_tid_in_debug(
- ::journal::MockReplayEntry &mock_replay_entry) {
- // It is used in debug messages and depends on debug level
- EXPECT_CALL(mock_replay_entry, get_commit_tid())
- .Times(AtLeast(0))
- .WillRepeatedly(Return(0));
- }
-
- void expect_get_tag_tid_in_debug(librbd::MockTestJournal &mock_journal) {
- // It is used in debug messages and depends on debug level
- EXPECT_CALL(mock_journal, get_tag_tid()).Times(AtLeast(0))
- .WillRepeatedly(Return(0));
- }
-
- void expect_committed(::journal::MockReplayEntry &mock_replay_entry,
- ::journal::MockJournaler &mock_journaler, int times) {
- EXPECT_CALL(mock_replay_entry, get_data()).Times(times);
- EXPECT_CALL(mock_journaler, committed(
- MatcherCast<const ::journal::MockReplayEntryProxy&>(_)))
- .Times(times);
- }
-
- void expect_try_pop_front(::journal::MockJournaler &mock_journaler,
- uint64_t replay_tag_tid, bool entries_available) {
- EXPECT_CALL(mock_journaler, try_pop_front(_, _))
- .WillOnce(DoAll(SetArgPointee<0>(::journal::MockReplayEntryProxy()),
- SetArgPointee<1>(replay_tag_tid),
- Return(entries_available)));
- }
-
- void expect_try_pop_front_return_no_entries(
- ::journal::MockJournaler &mock_journaler, Context *on_finish) {
- EXPECT_CALL(mock_journaler, try_pop_front(_, _))
- .WillOnce(DoAll(Invoke([on_finish](::journal::MockReplayEntryProxy *e,
- uint64_t *t) {
- on_finish->complete(0);
- }),
- Return(false)));
- }
-
- void expect_get_tag(::journal::MockJournaler &mock_journaler,
- const cls::journal::Tag &tag, int r) {
- EXPECT_CALL(mock_journaler, get_tag(_, _, _))
- .WillOnce(DoAll(SetArgPointee<1>(tag),
- WithArg<2>(CompleteContext(r))));
- }
-
- void expect_allocate_tag(librbd::MockTestJournal &mock_journal, int r) {
- EXPECT_CALL(mock_journal, allocate_tag(_, _, _))
- .WillOnce(WithArg<2>(CompleteContext(r)));
- }
-
- void expect_preprocess(MockEventPreprocessor &mock_event_preprocessor,
- bool required, int r) {
- EXPECT_CALL(mock_event_preprocessor, is_required(_))
- .WillOnce(Return(required));
- if (required) {
- EXPECT_CALL(mock_event_preprocessor, preprocess(_, _))
- .WillOnce(WithArg<1>(CompleteContext(r)));
- }
- }
-
- void expect_process(MockReplay &mock_replay,
- int on_ready_r, int on_commit_r) {
- EXPECT_CALL(mock_replay, process(_, _, _))
- .WillOnce(DoAll(WithArg<1>(CompleteContext(on_ready_r)),
- WithArg<2>(CompleteContext(on_commit_r))));
- }
-
void expect_set_mirror_image_status_repeatedly() {
EXPECT_CALL(m_local_status_updater, set_mirror_image_status(_, _, _))
.WillRepeatedly(Invoke([](auto, auto, auto){}));
&m_remote_status_updater);
}
+ void wait_for_stopped() {
+ for (int i = 0; i < 10000; i++) {
+ if (m_image_replayer->is_stopped()) {
+ break;
+ }
+ usleep(1000);
+ }
+ ASSERT_TRUE(m_image_replayer->is_stopped());
+ }
+
librbd::ImageCtx *m_remote_image_ctx;
librbd::ImageCtx *m_local_image_ctx = nullptr;
MockInstanceWatcher m_instance_watcher;
create_local_image();
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
- librbd::MockTestJournal mock_local_journal;
- mock_local_image_ctx.journal = &mock_local_journal;
-
journal::MockJournaler mock_remote_journaler;
MockThreads mock_threads(m_threads);
expect_work_queue_repeatedly(mock_threads);
MockPrepareLocalImageRequest mock_prepare_local_image_request;
MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
MockBootstrapRequest mock_bootstrap_request;
- MockReplay mock_local_replay;
- MockEventPreprocessor mock_event_preprocessor;
- MockReplayStatusFormatter mock_replay_status_formatter;
+ MockJournalReplayer mock_journal_replayer;
+ expect_get_replay_status(mock_journal_replayer);
expect_set_mirror_image_status_repeatedly();
- expect_flush_repeatedly(mock_local_replay, mock_remote_journaler);
- expect_get_or_send_update(mock_replay_status_formatter);
InSequence seq;
expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
m_remote_image_ctx->id, 0);
EXPECT_CALL(mock_remote_journaler, construct());
expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0);
-
- EXPECT_CALL(mock_local_journal, add_listener(_));
-
- expect_init(mock_remote_journaler, 0);
-
- EXPECT_CALL(mock_remote_journaler, add_listener(_));
- expect_get_cached_client(mock_remote_journaler, 0);
-
- expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
-
- EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _));
+ expect_init(mock_journal_replayer, 0);
create_image_replayer(mock_threads);
m_image_replayer->get_health_state());
// STOP
-
- MockCloseImageRequest mock_close_local_image_request;
-
- expect_shut_down(mock_local_replay, true, 0);
- EXPECT_CALL(mock_local_journal, remove_listener(_));
- EXPECT_CALL(mock_local_journal, stop_external_replay());
- expect_send(mock_close_local_image_request, 0);
-
- expect_stop_replay(mock_remote_journaler, 0);
- EXPECT_CALL(mock_remote_journaler, remove_listener(_));
- expect_shut_down(mock_remote_journaler, 0);
+ expect_shut_down(mock_journal_replayer, 0);
expect_mirror_image_status_exists(false);
C_SaferCond stop_ctx;
MockImageDeleter mock_image_deleter;
MockPrepareLocalImageRequest mock_prepare_local_image_request;
MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
- MockReplayStatusFormatter mock_replay_status_formatter;
expect_set_mirror_image_status_repeatedly();
- expect_get_or_send_update(mock_replay_status_formatter);
InSequence seq;
expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
expect_send(mock_prepare_remote_image_request, "remote mirror uuid",
"remote image id", 0);
EXPECT_CALL(mock_remote_journaler, construct());
- EXPECT_CALL(mock_remote_journaler, remove_listener(_));
- expect_shut_down(mock_remote_journaler, 0);
expect_mirror_image_status_exists(false);
create_image_replayer(mock_threads);
MockPrepareLocalImageRequest mock_prepare_local_image_request;
MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
MockBootstrapRequest mock_bootstrap_request;
- MockReplayStatusFormatter mock_replay_status_formatter;
expect_set_mirror_image_status_repeatedly();
- expect_get_or_send_update(mock_replay_status_formatter);
InSequence seq;
expect_send(mock_prepare_local_image_request, "", "", "", -ENOENT);
EXPECT_CALL(mock_remote_journaler, construct());
expect_send(mock_bootstrap_request, mock_local_image_ctx, false, -EREMOTEIO);
- EXPECT_CALL(mock_remote_journaler, remove_listener(_));
- expect_shut_down(mock_remote_journaler, 0);
expect_mirror_image_status_exists(false);
create_image_replayer(mock_threads);
MockImageDeleter mock_image_deleter;
MockPrepareLocalImageRequest mock_prepare_local_image_request;
- MockReplayStatusFormatter mock_replay_status_formatter;
EXPECT_CALL(m_local_status_updater, set_mirror_image_status(_, _, _))
.WillRepeatedly(Invoke([](auto, auto, auto){}));
- expect_get_or_send_update(mock_replay_status_formatter);
InSequence seq;
expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
MockImageDeleter mock_image_deleter;
MockPrepareLocalImageRequest mock_prepare_local_image_request;
MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
- MockReplayStatusFormatter mock_replay_status_formatter;
expect_set_mirror_image_status_repeatedly();
- expect_get_or_send_update(mock_replay_status_formatter);
InSequence seq;
expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
MockImageDeleter mock_image_deleter;
MockPrepareLocalImageRequest mock_prepare_local_image_request;
MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
- MockReplayStatusFormatter mock_replay_status_formatter;
expect_set_mirror_image_status_repeatedly();
- expect_get_or_send_update(mock_replay_status_formatter);
InSequence seq;
expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
MockImageDeleter mock_image_deleter;
MockPrepareLocalImageRequest mock_prepare_local_image_request;
MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
- MockReplayStatusFormatter mock_replay_status_formatter;
expect_set_mirror_image_status_repeatedly();
- expect_get_or_send_update(mock_replay_status_formatter);
InSequence seq;
expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
MockPrepareLocalImageRequest mock_prepare_local_image_request;
MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
MockBootstrapRequest mock_bootstrap_request;
- MockReplayStatusFormatter mock_replay_status_formatter;
expect_set_mirror_image_status_repeatedly();
- expect_get_or_send_update(mock_replay_status_formatter);
InSequence seq;
expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
EXPECT_CALL(mock_remote_journaler, construct());
expect_send(mock_bootstrap_request, mock_local_image_ctx, false, -EINVAL);
- EXPECT_CALL(mock_remote_journaler, remove_listener(_));
- expect_shut_down(mock_remote_journaler, 0);
expect_mirror_image_status_exists(false);
create_image_replayer(mock_threads);
MockImageDeleter mock_image_deleter;
MockPrepareLocalImageRequest mock_prepare_local_image_request;
MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
- MockReplayStatusFormatter mock_replay_status_formatter;
expect_set_mirror_image_status_repeatedly();
- expect_get_or_send_update(mock_replay_status_formatter);
InSequence seq;
expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
m_image_replayer->stop(nullptr, true);
}));
- EXPECT_CALL(mock_remote_journaler, remove_listener(_));
- expect_shut_down(mock_remote_journaler, 0);
expect_mirror_image_status_exists(false);
create_image_replayer(mock_threads);
ASSERT_EQ(-ECANCELED, start_ctx.wait());
}
-TEST_F(TestMockImageReplayer, StartExternalReplayError) {
+TEST_F(TestMockImageReplayer, StopError) {
// START
create_local_image();
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
- librbd::MockTestJournal mock_local_journal;
- mock_local_image_ctx.journal = &mock_local_journal;
-
journal::MockJournaler mock_remote_journaler;
MockThreads mock_threads(m_threads);
expect_work_queue_repeatedly(mock_threads);
MockPrepareLocalImageRequest mock_prepare_local_image_request;
MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
MockBootstrapRequest mock_bootstrap_request;
- MockReplay mock_local_replay;
- MockEventPreprocessor mock_event_preprocessor;
- MockReplayStatusFormatter mock_replay_status_formatter;
+ MockJournalReplayer mock_journal_replayer;
+ expect_get_replay_status(mock_journal_replayer);
expect_set_mirror_image_status_repeatedly();
- expect_get_or_send_update(mock_replay_status_formatter);
InSequence seq;
expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
m_remote_image_ctx->id, 0);
EXPECT_CALL(mock_remote_journaler, construct());
expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0);
+ expect_init(mock_journal_replayer, 0);
- EXPECT_CALL(mock_local_journal, add_listener(_));
-
- expect_init(mock_remote_journaler, 0);
-
- EXPECT_CALL(mock_remote_journaler, add_listener(_));
- expect_get_cached_client(mock_remote_journaler, 0);
+ create_image_replayer(mock_threads);
- expect_start_external_replay(mock_local_journal, nullptr, -EINVAL);
+ C_SaferCond start_ctx;
+ m_image_replayer->start(&start_ctx);
+ ASSERT_EQ(0, start_ctx.wait());
- MockCloseImageRequest mock_close_local_image_request;
- EXPECT_CALL(mock_local_journal, remove_listener(_));
- expect_send(mock_close_local_image_request, 0);
+ // STOP (errors are ignored)
- EXPECT_CALL(mock_remote_journaler, remove_listener(_));
- expect_shut_down(mock_remote_journaler, 0);
+ expect_shut_down(mock_journal_replayer, -EINVAL);
expect_mirror_image_status_exists(false);
- create_image_replayer(mock_threads);
-
- C_SaferCond start_ctx;
- m_image_replayer->start(&start_ctx);
- ASSERT_EQ(-EINVAL, start_ctx.wait());
- ASSERT_EQ(image_replayer::HEALTH_STATE_ERROR,
- m_image_replayer->get_health_state());
+ C_SaferCond stop_ctx;
+ m_image_replayer->stop(&stop_ctx);
+ ASSERT_EQ(0, stop_ctx.wait());
}
-TEST_F(TestMockImageReplayer, StopError) {
- // START
-
+TEST_F(TestMockImageReplayer, ReplayerError) {
create_local_image();
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
- librbd::MockTestJournal mock_local_journal;
- mock_local_image_ctx.journal = &mock_local_journal;
-
journal::MockJournaler mock_remote_journaler;
MockThreads mock_threads(m_threads);
expect_work_queue_repeatedly(mock_threads);
MockPrepareLocalImageRequest mock_prepare_local_image_request;
MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
MockBootstrapRequest mock_bootstrap_request;
- MockReplay mock_local_replay;
- MockEventPreprocessor mock_event_preprocessor;
- MockReplayStatusFormatter mock_replay_status_formatter;
+ MockJournalReplayer mock_journal_replayer;
expect_set_mirror_image_status_repeatedly();
- expect_flush_repeatedly(mock_local_replay, mock_remote_journaler);
- expect_get_or_send_update(mock_replay_status_formatter);
InSequence seq;
expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
m_remote_image_ctx->id, 0);
EXPECT_CALL(mock_remote_journaler, construct());
expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0);
+ expect_init(mock_journal_replayer, -EINVAL);
+ EXPECT_CALL(mock_journal_replayer, get_error_description())
+ .WillOnce(Return("FAIL"));
+ EXPECT_CALL(mock_journal_replayer, destroy());
- EXPECT_CALL(mock_local_journal, add_listener(_));
-
- expect_init(mock_remote_journaler, 0);
-
- EXPECT_CALL(mock_remote_journaler, add_listener(_));
- expect_get_cached_client(mock_remote_journaler, 0);
-
- expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
-
- EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _));
-
+ expect_mirror_image_status_exists(false);
create_image_replayer(mock_threads);
C_SaferCond start_ctx;
m_image_replayer->start(&start_ctx);
- ASSERT_EQ(0, start_ctx.wait());
-
- // STOP (errors are ignored)
-
- MockCloseImageRequest mock_close_local_image_request;
-
- expect_shut_down(mock_local_replay, true, -EINVAL);
- EXPECT_CALL(mock_local_journal, remove_listener(_));
- EXPECT_CALL(mock_local_journal, stop_external_replay());
- expect_send(mock_close_local_image_request, -EINVAL);
-
- expect_stop_replay(mock_remote_journaler, -EINVAL);
- EXPECT_CALL(mock_remote_journaler, remove_listener(_));
- expect_shut_down(mock_remote_journaler, -EINVAL);
- expect_mirror_image_status_exists(false);
-
- C_SaferCond stop_ctx;
- m_image_replayer->stop(&stop_ctx);
- ASSERT_EQ(0, stop_ctx.wait());
+ ASSERT_EQ(-EINVAL, start_ctx.wait());
}
-TEST_F(TestMockImageReplayer, Replay) {
+TEST_F(TestMockImageReplayer, ReplayerResync) {
// START
-
create_local_image();
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
- librbd::MockTestJournal mock_local_journal;
- mock_local_image_ctx.journal = &mock_local_journal;
-
journal::MockJournaler mock_remote_journaler;
MockThreads mock_threads(m_threads);
expect_work_queue_repeatedly(mock_threads);
MockPrepareLocalImageRequest mock_prepare_local_image_request;
MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
MockBootstrapRequest mock_bootstrap_request;
- MockReplay mock_local_replay;
- MockEventPreprocessor mock_event_preprocessor;
- MockReplayStatusFormatter mock_replay_status_formatter;
- ::journal::MockReplayEntry mock_replay_entry;
+ MockJournalReplayer mock_journal_replayer;
+ expect_get_replay_status(mock_journal_replayer);
expect_set_mirror_image_status_repeatedly();
- expect_flush_repeatedly(mock_local_replay, mock_remote_journaler);
- expect_get_or_send_update(mock_replay_status_formatter);
- expect_get_commit_tid_in_debug(mock_replay_entry);
- expect_get_tag_tid_in_debug(mock_local_journal);
- expect_committed(mock_replay_entry, mock_remote_journaler, 2);
InSequence seq;
expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
m_remote_image_ctx->id, 0);
EXPECT_CALL(mock_remote_journaler, construct());
expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0);
-
- EXPECT_CALL(mock_local_journal, add_listener(_));
-
- expect_init(mock_remote_journaler, 0);
-
- EXPECT_CALL(mock_remote_journaler, add_listener(_));
- expect_get_cached_client(mock_remote_journaler, 0);
-
- expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
-
- EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _));
+ expect_init(mock_journal_replayer, 0);
create_image_replayer(mock_threads);
m_image_replayer->start(&start_ctx);
ASSERT_EQ(0, start_ctx.wait());
- // REPLAY
-
- cls::journal::Tag tag =
- {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
- librbd::Journal<>::LOCAL_MIRROR_UUID,
- true, 0, 0})};
-
- expect_try_pop_front(mock_remote_journaler, tag.tid, true);
-
- // replay_flush
- expect_shut_down(mock_local_replay, false, 0);
- EXPECT_CALL(mock_local_journal, stop_external_replay());
- expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
- expect_get_tag(mock_remote_journaler, tag, 0);
- expect_allocate_tag(mock_local_journal, 0);
-
- // process
- EXPECT_CALL(mock_replay_entry, get_data());
- EXPECT_CALL(mock_local_replay, decode(_, _))
- .WillOnce(Return(0));
- expect_preprocess(mock_event_preprocessor, false, 0);
- expect_process(mock_local_replay, 0, 0);
-
- // the next event with preprocess
- expect_try_pop_front(mock_remote_journaler, tag.tid, true);
- EXPECT_CALL(mock_replay_entry, get_data());
- EXPECT_CALL(mock_local_replay, decode(_, _))
- .WillOnce(Return(0));
- expect_preprocess(mock_event_preprocessor, true, 0);
- expect_process(mock_local_replay, 0, 0);
-
- // attempt to process the next event
- C_SaferCond replay_ctx;
- expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx);
-
- // fire
- m_image_replayer->handle_replay_ready();
- ASSERT_EQ(0, replay_ctx.wait());
-
- // STOP
-
- MockCloseImageRequest mock_close_local_image_request;
- expect_shut_down(mock_local_replay, true, 0);
- EXPECT_CALL(mock_local_journal, remove_listener(_));
- EXPECT_CALL(mock_local_journal, stop_external_replay());
- expect_send(mock_close_local_image_request, 0);
-
- expect_stop_replay(mock_remote_journaler, 0);
- EXPECT_CALL(mock_remote_journaler, remove_listener(_));
- expect_shut_down(mock_remote_journaler, 0);
+ // NOTIFY
+ EXPECT_CALL(mock_journal_replayer, is_resync_requested())
+ .WillOnce(Return(true));
+ expect_shut_down(mock_journal_replayer, 0);
+ expect_trash_move(mock_image_deleter, "global image id", true, 0);
expect_mirror_image_status_exists(false);
+ mock_journal_replayer.replayer_listener->handle_notification();
+ ASSERT_FALSE(m_image_replayer->is_running());
- C_SaferCond stop_ctx;
- m_image_replayer->stop(&stop_ctx);
- ASSERT_EQ(0, stop_ctx.wait());
+ wait_for_stopped();
}
-TEST_F(TestMockImageReplayer, DecodeError) {
+TEST_F(TestMockImageReplayer, ReplayerInterrupted) {
// START
-
create_local_image();
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
- librbd::MockTestJournal mock_local_journal;
- mock_local_image_ctx.journal = &mock_local_journal;
-
journal::MockJournaler mock_remote_journaler;
MockThreads mock_threads(m_threads);
expect_work_queue_repeatedly(mock_threads);
MockPrepareLocalImageRequest mock_prepare_local_image_request;
MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
MockBootstrapRequest mock_bootstrap_request;
- MockReplay mock_local_replay;
- MockEventPreprocessor mock_event_preprocessor;
- MockReplayStatusFormatter mock_replay_status_formatter;
- ::journal::MockReplayEntry mock_replay_entry;
+ MockJournalReplayer mock_journal_replayer;
+ expect_get_replay_status(mock_journal_replayer);
expect_set_mirror_image_status_repeatedly();
- expect_flush_repeatedly(mock_local_replay, mock_remote_journaler);
- expect_get_or_send_update(mock_replay_status_formatter);
- expect_get_commit_tid_in_debug(mock_replay_entry);
- expect_get_tag_tid_in_debug(mock_local_journal);
InSequence seq;
expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
m_remote_image_ctx->id, 0);
EXPECT_CALL(mock_remote_journaler, construct());
expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0);
-
- EXPECT_CALL(mock_local_journal, add_listener(_));
-
- expect_init(mock_remote_journaler, 0);
-
- EXPECT_CALL(mock_remote_journaler, add_listener(_));
- expect_get_cached_client(mock_remote_journaler, 0);
-
- expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
-
- EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _));
+ expect_init(mock_journal_replayer, 0);
create_image_replayer(mock_threads);
m_image_replayer->start(&start_ctx);
ASSERT_EQ(0, start_ctx.wait());
- // REPLAY
-
- cls::journal::Tag tag =
- {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
- librbd::Journal<>::LOCAL_MIRROR_UUID,
- true, 0, 0})};
-
- expect_try_pop_front(mock_remote_journaler, tag.tid, true);
-
- // replay_flush
- expect_shut_down(mock_local_replay, false, 0);
- EXPECT_CALL(mock_local_journal, stop_external_replay());
- expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
- expect_get_tag(mock_remote_journaler, tag, 0);
- expect_allocate_tag(mock_local_journal, 0);
-
- // process
- EXPECT_CALL(mock_replay_entry, get_data());
- EXPECT_CALL(mock_local_replay, decode(_, _))
+ // NOTIFY
+ EXPECT_CALL(mock_journal_replayer, is_resync_requested())
+ .WillOnce(Return(false));
+ EXPECT_CALL(mock_journal_replayer, is_replaying())
+ .WillOnce(Return(false));
+ EXPECT_CALL(mock_journal_replayer, get_error_description())
+ .WillOnce(Return("INVALID"));
+ EXPECT_CALL(mock_journal_replayer, get_error_code())
.WillOnce(Return(-EINVAL));
-
- // stop on error
- expect_shut_down(mock_local_replay, true, 0);
- EXPECT_CALL(mock_local_journal, remove_listener(_));
- EXPECT_CALL(mock_local_journal, stop_external_replay());
-
- MockCloseImageRequest mock_close_local_image_request;
- C_SaferCond close_ctx;
- EXPECT_CALL(mock_close_local_image_request, send())
- .WillOnce(Invoke([&mock_close_local_image_request, &close_ctx]() {
- *mock_close_local_image_request.image_ctx = nullptr;
- mock_close_local_image_request.on_finish->complete(0);
- close_ctx.complete(0);
- }));
-
- expect_stop_replay(mock_remote_journaler, 0);
- EXPECT_CALL(mock_remote_journaler, remove_listener(_));
- expect_shut_down(mock_remote_journaler, 0);
+ expect_shut_down(mock_journal_replayer, 0);
expect_mirror_image_status_exists(false);
+ mock_journal_replayer.replayer_listener->handle_notification();
+ ASSERT_FALSE(m_image_replayer->is_running());
- // fire
- m_image_replayer->handle_replay_ready();
- ASSERT_EQ(0, close_ctx.wait());
-
- while (!m_image_replayer->is_stopped()) {
- usleep(1000);
- }
+ wait_for_stopped();
}
-TEST_F(TestMockImageReplayer, DelayedReplay) {
-
+TEST_F(TestMockImageReplayer, ReplayerRenamed) {
// START
-
create_local_image();
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
- librbd::MockTestJournal mock_local_journal;
- mock_local_image_ctx.journal = &mock_local_journal;
-
journal::MockJournaler mock_remote_journaler;
MockThreads mock_threads(m_threads);
expect_work_queue_repeatedly(mock_threads);
MockPrepareLocalImageRequest mock_prepare_local_image_request;
MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
MockBootstrapRequest mock_bootstrap_request;
- MockReplay mock_local_replay;
- MockEventPreprocessor mock_event_preprocessor;
- MockReplayStatusFormatter mock_replay_status_formatter;
- ::journal::MockReplayEntry mock_replay_entry;
+ MockJournalReplayer mock_journal_replayer;
+ expect_get_replay_status(mock_journal_replayer);
expect_set_mirror_image_status_repeatedly();
- expect_flush_repeatedly(mock_local_replay, mock_remote_journaler);
- expect_get_or_send_update(mock_replay_status_formatter);
- expect_get_commit_tid_in_debug(mock_replay_entry);
- expect_get_tag_tid_in_debug(mock_local_journal);
- expect_committed(mock_replay_entry, mock_remote_journaler, 1);
InSequence seq;
expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
m_remote_image_ctx->id, 0);
EXPECT_CALL(mock_remote_journaler, construct());
expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0);
-
- EXPECT_CALL(mock_local_journal, add_listener(_));
-
- expect_init(mock_remote_journaler, 0);
-
- EXPECT_CALL(mock_remote_journaler, add_listener(_));
- expect_get_cached_client(mock_remote_journaler, 0);
-
- expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
-
- EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _));
+ expect_init(mock_journal_replayer, 0);
create_image_replayer(mock_threads);
m_image_replayer->start(&start_ctx);
ASSERT_EQ(0, start_ctx.wait());
- // REPLAY
-
- cls::journal::Tag tag =
- {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
- librbd::Journal<>::LOCAL_MIRROR_UUID,
- true, 0, 0})};
-
- expect_try_pop_front(mock_remote_journaler, tag.tid, true);
-
- // replay_flush
- expect_shut_down(mock_local_replay, false, 0);
- EXPECT_CALL(mock_local_journal, stop_external_replay());
- expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
- expect_get_tag(mock_remote_journaler, tag, 0);
- expect_allocate_tag(mock_local_journal, 0);
-
- // process with delay
- EXPECT_CALL(mock_replay_entry, get_data());
- librbd::journal::EventEntry event_entry(
- librbd::journal::AioDiscardEvent(123, 345, 0), ceph_clock_now());
- EXPECT_CALL(mock_local_replay, decode(_, _))
- .WillOnce(DoAll(SetArgPointee<1>(event_entry),
- Return(0)));
- expect_preprocess(mock_event_preprocessor, false, 0);
- expect_process(mock_local_replay, 0, 0);
-
- // attempt to process the next event
- C_SaferCond replay_ctx;
- expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx);
-
- // fire
- mock_local_image_ctx.mirroring_replay_delay = 2;
- m_image_replayer->handle_replay_ready();
- ASSERT_EQ(0, replay_ctx.wait());
-
- // add a pending (delayed) entry before stop
- expect_try_pop_front(mock_remote_journaler, tag.tid, true);
- EXPECT_CALL(mock_replay_entry, get_data());
- C_SaferCond decode_ctx;
- EXPECT_CALL(mock_local_replay, decode(_, _))
- .WillOnce(DoAll(Invoke([&decode_ctx](bufferlist::const_iterator* it,
- librbd::journal::EventEntry *e) {
- decode_ctx.complete(0);
- }),
- Return(0)));
-
- mock_local_image_ctx.mirroring_replay_delay = 10;
- m_image_replayer->handle_replay_ready();
- ASSERT_EQ(0, decode_ctx.wait());
+ // NOTIFY
+ EXPECT_CALL(mock_journal_replayer, is_resync_requested())
+ .WillOnce(Return(false));
+ EXPECT_CALL(mock_journal_replayer, is_replaying())
+ .WillOnce(Return(true));
+ mock_local_image_ctx.name = "NEW NAME";
+ mock_journal_replayer.replayer_listener->handle_notification();
// STOP
-
- MockCloseImageRequest mock_close_local_image_request;
-
- expect_shut_down(mock_local_replay, true, 0);
- EXPECT_CALL(mock_local_journal, remove_listener(_));
- EXPECT_CALL(mock_local_journal, stop_external_replay());
- expect_send(mock_close_local_image_request, 0);
-
- expect_stop_replay(mock_remote_journaler, 0);
- EXPECT_CALL(mock_remote_journaler, remove_listener(_));
- expect_shut_down(mock_remote_journaler, 0);
+ expect_shut_down(mock_journal_replayer, 0);
expect_mirror_image_status_exists(false);
C_SaferCond stop_ctx;
m_image_replayer->stop(&stop_ctx);
ASSERT_EQ(0, stop_ctx.wait());
-}
+ auto image_spec = image_replayer::util::compute_image_spec(
+ m_local_io_ctx, "NEW NAME");
+ ASSERT_EQ(image_spec, m_image_replayer->get_name());
+}
} // namespace mirror
} // namespace rbd
#include "common/WorkQueue.h"
#include "global/global_context.h"
#include "journal/Journaler.h"
-#include "journal/ReplayHandler.h"
#include "journal/Settings.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/Journal.h"
#include "librbd/Operations.h"
#include "librbd/Utils.h"
-#include "librbd/journal/Replay.h"
#include "ImageDeleter.h"
#include "ImageReplayer.h"
#include "MirrorStatusUpdater.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
#include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
#include "tools/rbd_mirror/image_replayer/Utils.h"
-#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h"
-#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
+#include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
+#include <map>
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
<< __func__ << ": "
-using std::map;
-using std::string;
-using std::unique_ptr;
-using std::shared_ptr;
-using std::vector;
-
extern PerfCounters *g_perf_counters;
namespace rbd {
namespace mirror {
-using librbd::util::create_async_context_callback;
using librbd::util::create_context_callback;
-using librbd::util::create_rados_callback;
template <typename I>
std::ostream &operator<<(std::ostream &os,
namespace {
-template <typename I>
-struct ReplayHandler : public ::journal::ReplayHandler {
- ImageReplayer<I> *replayer;
- ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
-
- void handle_entries_available() override {
- replayer->handle_replay_ready();
- }
- void handle_complete(int r) override {
- std::stringstream ss;
- if (r == -ENOMEM) {
- ss << "not enough memory in autotune cache";
- } else if (r < 0) {
- ss << "replay completed with error: " << cpp_strerror(r);
- }
- replayer->handle_replay_complete(r, ss.str());
- }
-};
-
template <typename I>
class ImageReplayerAdminSocketCommand {
public:
Commands commands;
};
-uint32_t calculate_replay_delay(const utime_t &event_time,
- int mirroring_replay_delay) {
- if (mirroring_replay_delay <= 0) {
- return 0;
- }
-
- utime_t now = ceph_clock_now();
- if (event_time + mirroring_replay_delay <= now) {
- return 0;
- }
-
- // ensure it is rounded up when converting to integer
- return (event_time + mirroring_replay_delay - now) + 1;
-}
-
} // anonymous namespace
template <typename I>
}
template <typename I>
-void ImageReplayer<I>::RemoteJournalerListener::handle_update(
- ::journal::JournalMetadata *) {
- auto ctx = new LambdaContext([this](int r) {
- replayer->handle_remote_journal_metadata_updated();
- });
- replayer->m_threads->work_queue->queue(ctx, 0);
-}
+struct ImageReplayer<I>::ReplayerListener
+ : public image_replayer::ReplayerListener {
+ ImageReplayer<I>* image_replayer;
+
+ ReplayerListener(ImageReplayer<I>* image_replayer)
+ : image_replayer(image_replayer) {
+ }
+
+ void handle_notification() override {
+ image_replayer->handle_replayer_notification();
+ }
+};
template <typename I>
ImageReplayer<I>::ImageReplayer(
m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
stringify(local_io_ctx.get_id()) + " " + global_image_id)),
m_progress_cxt(this),
- m_journal_listener(new JournalListener(this)),
- m_remote_listener(this)
+ m_replayer_listener(new ReplayerListener(this))
{
// Register asok commands using a temporary "remote_pool_name/global_image_id"
// name. When the image name becomes known on start the asok commands will be
ImageReplayer<I>::~ImageReplayer()
{
unregister_admin_socket_hook();
- ceph_assert(m_event_preprocessor == nullptr);
- ceph_assert(m_replay_status_formatter == nullptr);
ceph_assert(m_local_image_ctx == nullptr);
- ceph_assert(m_local_replay == nullptr);
- ceph_assert(m_remote_journaler == nullptr);
- ceph_assert(m_replay_handler == nullptr);
ceph_assert(m_on_start_finish == nullptr);
ceph_assert(m_on_stop_finish == nullptr);
ceph_assert(m_bootstrap_request == nullptr);
- ceph_assert(m_flush_local_replay_task == nullptr);
-
- delete m_journal_listener;
+ delete m_replayer_listener;
}
template <typename I>
template <typename I>
void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
- dout(10) << r << " " << desc << dendl;
+ dout(10) << "r=" << r << ", desc=" << desc << dendl;
std::lock_guard l{m_lock};
m_last_r = r;
on_start_fail(r, "error preparing local image for replay");
return;
} else {
+ // have the correct local image name now
reregister_admin_socket_hook();
}
- // local image doesn't exist or is non-primary
prepare_remote_image();
}
journal_settings.commit_interval = cct->_conf.get_val<double>(
"rbd_mirror_journal_commit_age");
+ ceph_assert(m_remote_journaler == nullptr);
+
Context *ctx = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
auto req = image_replayer::PrepareRemoteImageRequest<I>::create(
void ImageReplayer<I>::handle_prepare_remote_image(int r) {
dout(10) << "r=" << r << dendl;
- ceph_assert(r < 0 ? m_remote_journaler == nullptr : m_remote_journaler != nullptr);
+ ceph_assert(r < 0 ? m_remote_journaler == nullptr :
+ m_remote_journaler != nullptr);
if (r < 0 && !m_local_image_id.empty() &&
m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
// local image is primary -- fall-through
return;
}
- ceph_assert(m_local_journal == nullptr);
- {
- std::shared_lock image_locker{m_local_image_ctx->image_lock};
- if (m_local_image_ctx->journal != nullptr) {
- m_local_journal = m_local_image_ctx->journal;
- m_local_journal->add_listener(m_journal_listener);
- }
- }
-
- if (m_local_journal == nullptr) {
- on_start_fail(-EINVAL, "error accessing local journal");
- return;
- }
-
- update_mirror_image_status(false, boost::none);
- init_remote_journaler();
+ start_replay();
}
template <typename I>
-void ImageReplayer<I>::init_remote_journaler() {
+void ImageReplayer<I>::start_replay() {
dout(10) << dendl;
- Context *ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
- m_remote_journaler->init(ctx);
+ // TODO support journal + snapshot replay
+ std::unique_lock locker{m_lock};
+ ceph_assert(m_replayer == nullptr);
+ m_replayer = image_replayer::journal::Replayer<I>::create(
+ &m_local_image_ctx, m_remote_journaler, m_local_mirror_uuid,
+ m_remote_image.mirror_uuid, m_replayer_listener, m_threads);
+
+ auto ctx = create_context_callback<
+ ImageReplayer<I>, &ImageReplayer<I>::handle_start_replay>(this);
+ m_replayer->init(ctx);
}
template <typename I>
-void ImageReplayer<I>::handle_init_remote_journaler(int r) {
+void ImageReplayer<I>::handle_start_replay(int r) {
dout(10) << "r=" << r << dendl;
if (on_start_interrupted()) {
return;
} else if (r < 0) {
- derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
- on_start_fail(r, "error initializing remote journal");
- return;
- }
-
- m_remote_journaler->add_listener(&m_remote_listener);
-
- cls::journal::Client client;
- r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
- if (r < 0) {
- derr << "error retrieving remote journal client: " << cpp_strerror(r)
- << dendl;
- on_start_fail(r, "error retrieving remote journal client");
- return;
- }
-
- dout(5) << "image_id=" << m_local_image_id << ", "
- << "client_meta.image_id=" << m_client_meta.image_id << ", "
- << "client.state=" << client.state << dendl;
- if (m_client_meta.image_id == m_local_image_id &&
- client.state != cls::journal::CLIENT_STATE_CONNECTED) {
- dout(5) << "client flagged disconnected, stopping image replay" << dendl;
- if (m_local_image_ctx->config.template get_val<bool>("rbd_mirroring_resync_after_disconnect")) {
+ std::string error_description = m_replayer->get_error_description();
+ if (r == -ENOTCONN && m_replayer->is_resync_requested()) {
+ std::unique_lock locker{m_lock};
m_resync_requested = true;
- on_start_fail(-ENOTCONN, "disconnected: automatic resync");
- } else {
- on_start_fail(-ENOTCONN, "disconnected");
}
- return;
- }
-
- start_replay();
-}
-
-template <typename I>
-void ImageReplayer<I>::start_replay() {
- dout(10) << dendl;
- Context *start_ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
- m_local_journal->start_external_replay(&m_local_replay, start_ctx);
-}
+ // shut down not required if init failed
+ m_replayer->destroy();
+ m_replayer = nullptr;
-template <typename I>
-void ImageReplayer<I>::handle_start_replay(int r) {
- dout(10) << "r=" << r << dendl;
-
- if (r < 0) {
- ceph_assert(m_local_replay == nullptr);
derr << "error starting external replay on local image "
<< m_local_image_id << ": " << cpp_strerror(r) << dendl;
- on_start_fail(r, "error starting replay on local image");
+ on_start_fail(r, error_description);
return;
}
- m_replay_status_formatter =
- image_replayer::journal::ReplayStatusFormatter<I>::create(
- m_remote_journaler, m_local_mirror_uuid);
-
- Context *on_finish(nullptr);
+ Context *on_finish = nullptr;
{
- std::lock_guard locker{m_lock};
+ std::unique_lock locker{m_lock};
ceph_assert(m_state == STATE_STARTING);
m_state = STATE_REPLAYING;
std::swap(m_on_start_finish, on_finish);
}
- m_event_preprocessor = image_replayer::journal::EventPreprocessor<I>::create(
- *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
- &m_client_meta, m_threads->work_queue);
-
update_mirror_image_status(true, boost::none);
-
if (on_replay_interrupted()) {
if (on_finish != nullptr) {
on_finish->complete(r);
return;
}
- {
- CephContext *cct = static_cast<CephContext *>(m_local_io_ctx.cct());
- double poll_seconds = cct->_conf.get_val<double>(
- "rbd_mirror_journal_poll_age");
-
- std::lock_guard locker{m_lock};
- m_replay_handler = new ReplayHandler<I>(this);
- m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
-
- dout(10) << "m_remote_journaler=" << *m_remote_journaler << dendl;
- }
-
dout(10) << "start succeeded" << dendl;
if (on_finish != nullptr) {
dout(10) << "on finish complete, r=" << r << dendl;
template <typename I>
void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
{
- dout(10) << "r=" << r << dendl;
+ dout(10) << "r=" << r << ", desc=" << desc << dendl;
Context *ctx = new LambdaContext([this, r, desc](int _r) {
{
std::lock_guard locker{m_lock};
m_stop_requested = true;
m_state = STATE_STOPPING;
- cancel_flush_local_replay_task();
}
set_state_description(r, desc);
shut_down(0);
}
-template <typename I>
-void ImageReplayer<I>::handle_replay_ready()
-{
- dout(20) << dendl;
- if (on_replay_interrupted()) {
- return;
- }
-
- if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
- return;
- }
-
- m_event_replay_tracker.start_op();
-
- m_lock.lock();
- bool stopping = (m_state == STATE_STOPPING);
- m_lock.unlock();
-
- if (stopping) {
- dout(10) << "stopping event replay" << dendl;
- m_event_replay_tracker.finish_op();
- return;
- }
-
- if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
- preprocess_entry();
- return;
- }
-
- replay_flush();
-}
-
template <typename I>
void ImageReplayer<I>::restart(Context *on_finish)
{
template <typename I>
void ImageReplayer<I>::flush()
{
- dout(10) << dendl;
C_SaferCond ctx;
- flush_local_replay(&ctx);
- ctx.wait();
-
- update_mirror_image_status(false, boost::none);
-}
-
-
-template <typename I>
-void ImageReplayer<I>::schedule_flush_local_replay_task() {
- ceph_assert(ceph_mutex_is_locked(m_lock));
-
- std::lock_guard timer_locker{m_threads->timer_lock};
- if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) {
- return;
- }
-
- dout(15) << dendl;
- m_flush_local_replay_task = create_async_context_callback(
- m_threads->work_queue, create_context_callback<
- ImageReplayer<I>,
- &ImageReplayer<I>::handle_flush_local_replay_task>(this));
- m_threads->timer->add_event_after(30, m_flush_local_replay_task);
-}
-
-template <typename I>
-void ImageReplayer<I>::cancel_flush_local_replay_task() {
- ceph_assert(ceph_mutex_is_locked(m_lock));
- std::lock_guard timer_locker{m_threads->timer_lock};
- if (m_flush_local_replay_task != nullptr) {
- auto canceled = m_threads->timer->cancel_event(m_flush_local_replay_task);
- m_flush_local_replay_task = nullptr;
- ceph_assert(canceled);
- }
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_flush_local_replay_task(int) {
- dout(15) << dendl;
-
- m_in_flight_op_tracker.start_op();
- auto on_finish = new LambdaContext([this](int) {
- {
- std::lock_guard timer_locker{m_threads->timer_lock};
- m_flush_local_replay_task = nullptr;
- }
-
- update_mirror_image_status(false, boost::none);
- m_in_flight_op_tracker.finish_op();
- });
- flush_local_replay(on_finish);
-}
-
-template <typename I>
-void ImageReplayer<I>::flush_local_replay(Context* on_flush)
-{
- m_lock.lock();
- if (m_state != STATE_REPLAYING) {
- m_lock.unlock();
- on_flush->complete(0);
- return;
- }
-
- dout(15) << dendl;
- auto ctx = new LambdaContext(
- [this, on_flush](int r) {
- handle_flush_local_replay(on_flush, r);
- });
- m_local_replay->flush(ctx);
- m_lock.unlock();
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_flush_local_replay(Context* on_flush, int r)
-{
- dout(15) << "r=" << r << dendl;
- if (r < 0) {
- derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
- on_flush->complete(r);
- return;
- }
- flush_commit_position(on_flush);
-}
+ {
+ std::unique_lock locker{m_lock};
+ if (m_state != STATE_REPLAYING) {
+ return;
+ }
-template <typename I>
-void ImageReplayer<I>::flush_commit_position(Context* on_flush)
-{
- m_lock.lock();
- if (m_state != STATE_REPLAYING) {
- m_lock.unlock();
- on_flush->complete(0);
- return;
+ dout(10) << dendl;
+ ceph_assert(m_replayer != nullptr);
+ m_replayer->flush(&ctx);
}
- dout(15) << dendl;
- auto ctx = new LambdaContext(
- [this, on_flush](int r) {
- handle_flush_commit_position(on_flush, r);
- });
- m_remote_journaler->flush_commit_position(ctx);
- m_lock.unlock();
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_flush_commit_position(Context* on_flush, int r)
-{
- dout(15) << "r=" << r << dendl;
- if (r < 0) {
- derr << "error flushing remote journal commit position: "
- << cpp_strerror(r) << dendl;
+ int r = ctx.wait();
+ if (r >= 0) {
+ update_mirror_image_status(false, boost::none);
}
-
- on_flush->complete(r);
}
template <typename I>
f->close_section();
}
-template <typename I>
-void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
-{
- dout(10) << "r=" << r << dendl;
- if (r < 0) {
- derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
- }
-
- {
- std::lock_guard locker{m_lock};
- m_stop_requested = true;
- }
- on_stop_journal_replay(r, error_desc);
-}
-
-template <typename I>
-void ImageReplayer<I>::replay_flush() {
- dout(10) << dendl;
-
- bool interrupted = false;
- {
- std::lock_guard locker{m_lock};
- if (m_state != STATE_REPLAYING) {
- dout(10) << "replay interrupted" << dendl;
- interrupted = true;
- } else {
- m_state = STATE_REPLAY_FLUSHING;
- }
- }
-
- if (interrupted) {
- m_event_replay_tracker.finish_op();
- return;
- }
-
- // shut down the replay to flush all IO and ops and create a new
- // replayer to handle the new tag epoch
- Context *ctx = create_context_callback<
- ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
- ctx = new LambdaContext([this, ctx](int r) {
- m_local_image_ctx->journal->stop_external_replay();
- m_local_replay = nullptr;
-
- if (r < 0) {
- ctx->complete(r);
- return;
- }
-
- m_local_journal->start_external_replay(&m_local_replay, ctx);
- });
- m_local_replay->shut_down(false, ctx);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_replay_flush(int r) {
- dout(10) << "r=" << r << dendl;
-
- {
- std::lock_guard locker{m_lock};
- ceph_assert(m_state == STATE_REPLAY_FLUSHING);
- m_state = STATE_REPLAYING;
- }
-
- if (r < 0) {
- derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
- m_event_replay_tracker.finish_op();
- handle_replay_complete(r, "replay flush encountered an error");
- return;
- } else if (on_replay_interrupted()) {
- m_event_replay_tracker.finish_op();
- return;
- }
-
- get_remote_tag();
-}
-
-template <typename I>
-void ImageReplayer<I>::get_remote_tag() {
- dout(15) << "tag_tid: " << m_replay_tag_tid << dendl;
-
- Context *ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
- m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_get_remote_tag(int r) {
- dout(15) << "r=" << r << dendl;
-
- if (r == 0) {
- try {
- auto it = m_replay_tag.data.cbegin();
- decode(m_replay_tag_data, it);
- } catch (const buffer::error &err) {
- r = -EBADMSG;
- }
- }
-
- if (r < 0) {
- derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
- << cpp_strerror(r) << dendl;
- m_event_replay_tracker.finish_op();
- handle_replay_complete(r, "failed to retrieve remote tag");
- return;
- }
-
- m_replay_tag_valid = true;
- dout(15) << "decoded remote tag " << m_replay_tag_tid << ": "
- << m_replay_tag_data << dendl;
-
- allocate_local_tag();
-}
-
-template <typename I>
-void ImageReplayer<I>::allocate_local_tag() {
- dout(15) << dendl;
-
- std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
- if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
- mirror_uuid = m_remote_image.mirror_uuid;
- } else if (mirror_uuid == m_local_mirror_uuid) {
- mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
- } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
- // handle possible edge condition where daemon can failover and
- // the local image has already been promoted/demoted
- auto local_tag_data = m_local_journal->get_tag_data();
- if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
- (local_tag_data.predecessor.commit_valid &&
- local_tag_data.predecessor.mirror_uuid ==
- librbd::Journal<>::LOCAL_MIRROR_UUID)) {
- dout(15) << "skipping stale demotion event" << dendl;
- handle_process_entry_safe(m_replay_entry, m_replay_start_time, 0);
- handle_replay_ready();
- return;
- } else {
- dout(5) << "encountered image demotion: stopping" << dendl;
- std::lock_guard locker{m_lock};
- m_stop_requested = true;
- }
- }
-
- librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
- if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
- predecessor.mirror_uuid = m_remote_image.mirror_uuid;
- } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
- predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
- }
-
- dout(15) << "mirror_uuid=" << mirror_uuid << ", "
- << "predecessor=" << predecessor << ", "
- << "replay_tag_tid=" << m_replay_tag_tid << dendl;
- Context *ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
- m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_allocate_local_tag(int r) {
- dout(15) << "r=" << r << ", "
- << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
-
- if (r < 0) {
- derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
- m_event_replay_tracker.finish_op();
- handle_replay_complete(r, "failed to allocate journal tag");
- return;
- }
-
- preprocess_entry();
-}
-
-template <typename I>
-void ImageReplayer<I>::preprocess_entry() {
- dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
- << dendl;
-
- bufferlist data = m_replay_entry.get_data();
- auto it = data.cbegin();
- int r = m_local_replay->decode(&it, &m_event_entry);
- if (r < 0) {
- derr << "failed to decode journal event" << dendl;
- m_event_replay_tracker.finish_op();
- handle_replay_complete(r, "failed to decode journal event");
- return;
- }
-
- uint32_t delay = calculate_replay_delay(
- m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
- if (delay == 0) {
- handle_preprocess_entry_ready(0);
- return;
- }
-
- dout(20) << "delaying replay by " << delay << " sec" << dendl;
-
- std::lock_guard timer_locker{m_threads->timer_lock};
- ceph_assert(m_delayed_preprocess_task == nullptr);
- m_delayed_preprocess_task = new LambdaContext(
- [this](int r) {
- ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
- m_delayed_preprocess_task = nullptr;
- m_threads->work_queue->queue(
- create_context_callback<ImageReplayer,
- &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
- });
- m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
- dout(20) << "r=" << r << dendl;
- ceph_assert(r == 0);
-
- m_replay_start_time = ceph_clock_now();
- if (!m_event_preprocessor->is_required(m_event_entry)) {
- process_entry();
- return;
- }
-
- Context *ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
- m_event_preprocessor->preprocess(&m_event_entry, ctx);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
- dout(20) << "r=" << r << dendl;
-
- if (r < 0) {
- m_event_replay_tracker.finish_op();
-
- if (r == -ECANCELED) {
- handle_replay_complete(0, "lost exclusive lock");
- } else {
- derr << "failed to preprocess journal event" << dendl;
- handle_replay_complete(r, "failed to preprocess journal event");
- }
- return;
- }
-
- process_entry();
-}
-
-template <typename I>
-void ImageReplayer<I>::process_entry() {
- dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
- << dendl;
-
- // stop replaying events if stop has been requested
- if (on_replay_interrupted()) {
- m_event_replay_tracker.finish_op();
- return;
- }
-
- Context *on_ready = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
- Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry),
- m_replay_start_time);
-
- m_local_replay->process(m_event_entry, on_ready, on_commit);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_process_entry_ready(int r) {
- dout(20) << dendl;
- ceph_assert(r == 0);
-
- bool update_status = false;
- {
- std::shared_lock image_locker{m_local_image_ctx->image_lock};
- if (m_local_image_name != m_local_image_ctx->name) {
- m_local_image_name = m_local_image_ctx->name;
- update_status = true;
- }
- }
-
- if (update_status) {
- update_mirror_image_status(false, {});
- }
-
- // attempt to process the next event
- handle_replay_ready();
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry &replay_entry,
- const utime_t &replay_start_time,
- int r) {
- dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
- << dendl;
-
- if (r < 0) {
- derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
- handle_replay_complete(r, "failed to commit journal event");
- } else {
- ceph_assert(m_remote_journaler != nullptr);
- m_remote_journaler->committed(replay_entry);
- }
-
- auto bytes = replay_entry.get_data().length();
- auto latency = ceph_clock_now() - replay_start_time;
-
- if (g_perf_counters) {
- g_perf_counters->inc(l_rbd_mirror_replay);
- g_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
- g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
- }
-
- auto ctx = new LambdaContext(
- [this, bytes, latency](int r) {
- std::lock_guard locker{m_lock};
- schedule_flush_local_replay_task();
-
- if (m_perf_counters) {
- m_perf_counters->inc(l_rbd_mirror_replay);
- m_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
- m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
- }
-
- m_event_replay_tracker.finish_op();
- });
- m_threads->work_queue->queue(ctx, 0);
-}
-
template <typename I>
void ImageReplayer<I>::update_mirror_image_status(
bool force, const OptionalState &opt_state) {
}
break;
case STATE_REPLAYING:
- case STATE_REPLAY_FLUSHING:
status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
{
+ std::string desc;
auto on_req_finish = new LambdaContext(
[this, force](int r) {
dout(15) << "replay status ready: r=" << r << dendl;
}
});
- std::string desc;
- ceph_assert(m_replay_status_formatter != nullptr);
- if (!m_replay_status_formatter->get_or_send_update(&desc,
- on_req_finish)) {
+ ceph_assert(m_replayer != nullptr);
+ if (!m_replayer->get_replay_status(&desc, on_req_finish)) {
dout(15) << "waiting for replay status" << dendl;
return;
}
void ImageReplayer<I>::shut_down(int r) {
dout(10) << "r=" << r << dendl;
- bool canceled_delayed_preprocess_task = false;
- {
- std::lock_guard timer_locker{m_threads->timer_lock};
- if (m_delayed_preprocess_task != nullptr) {
- canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
- m_delayed_preprocess_task);
- ceph_assert(canceled_delayed_preprocess_task);
- m_delayed_preprocess_task = nullptr;
- }
- }
- if (canceled_delayed_preprocess_task) {
- // wake up sleeping replay
- m_event_replay_tracker.finish_op();
- }
-
{
std::lock_guard locker{m_lock};
ceph_assert(m_state == STATE_STOPPING);
return;
}
- // NOTE: it's important to ensure that the local image is fully
- // closed before attempting to close the remote journal in
- // case the remote cluster is unreachable
-
// chain the shut down sequence (reverse order)
Context *ctx = new LambdaContext(
[this, r](int _r) {
handle_shut_down(r);
});
- // close the remote journal
+ // destruct the remote journaler created in prepare remote
if (m_remote_journaler != nullptr) {
ctx = new LambdaContext([this, ctx](int r) {
- delete m_remote_journaler;
- m_remote_journaler = nullptr;
- ctx->complete(0);
- });
- ctx = new LambdaContext([this, ctx](int r) {
- m_remote_journaler->remove_listener(&m_remote_listener);
- m_remote_journaler->shut_down(ctx);
- });
+ delete m_remote_journaler;
+ m_remote_journaler = nullptr;
+ ctx->complete(0);
+ });
}
- // stop the replay of remote journal events
- if (m_replay_handler != nullptr) {
+ // close the local image (if we aborted after a successful bootstrap)
+ if (m_local_image_ctx != nullptr) {
ctx = new LambdaContext([this, ctx](int r) {
- delete m_replay_handler;
- m_replay_handler = nullptr;
-
- m_event_replay_tracker.wait_for_ops(ctx);
- });
+ ceph_assert(m_local_image_ctx == nullptr);
+ ctx->complete(0);
+ });
ctx = new LambdaContext([this, ctx](int r) {
- m_remote_journaler->stop_replay(ctx);
- });
- }
+ if (m_local_image_ctx == nullptr) {
+ // never opened or closed via the replayer shutdown
+ ctx->complete(0);
+ return;
+ }
- // close the local image (release exclusive lock)
- if (m_local_image_ctx) {
- ctx = new LambdaContext([this, ctx](int r) {
auto request = image_replayer::CloseImageRequest<I>::create(
&m_local_image_ctx, ctx);
request->send();
});
}
- // shut down event replay into the local image
- if (m_local_journal != nullptr) {
- ctx = new LambdaContext([this, ctx](int r) {
- m_local_journal = nullptr;
- ctx->complete(0);
- });
- if (m_local_replay != nullptr) {
- ctx = new LambdaContext([this, ctx](int r) {
- m_local_journal->stop_external_replay();
- m_local_replay = nullptr;
-
- image_replayer::journal::EventPreprocessor<I>::destroy(
- m_event_preprocessor);
- m_event_preprocessor = nullptr;
- ctx->complete(0);
- });
- }
+ // close the replayer
+ if (m_replayer != nullptr) {
ctx = new LambdaContext([this, ctx](int r) {
- // blocks if listener notification is in-progress
- m_local_journal->remove_listener(m_journal_listener);
- ctx->complete(0);
- });
- }
-
- // wait for all local in-flight replay events to complete
- ctx = new LambdaContext([this, ctx](int r) {
- if (r < 0) {
- derr << "error shutting down journal replay: " << cpp_strerror(r)
- << dendl;
- }
-
- m_event_replay_tracker.wait_for_ops(ctx);
+ m_replayer->destroy();
+ m_replayer = nullptr;
+ ctx->complete(0);
});
-
- // flush any local in-flight replay events
- if (m_local_replay != nullptr) {
ctx = new LambdaContext([this, ctx](int r) {
- m_local_replay->shut_down(true, ctx);
- });
+ m_replayer->shut_down(ctx);
+ });
}
m_threads->work_queue->queue(ctx, 0);
}
dout(10) << "stop complete" << dendl;
- image_replayer::journal::ReplayStatusFormatter<I>::destroy(
- m_replay_status_formatter);
- m_replay_status_formatter = nullptr;
-
Context *on_start = nullptr;
Context *on_stop = nullptr;
{
std::swap(on_start, m_on_start_finish);
std::swap(on_stop, m_on_stop_finish);
m_stop_requested = false;
- ceph_assert(m_delayed_preprocess_task == nullptr);
ceph_assert(m_state == STATE_STOPPING);
m_state = STATE_STOPPED;
}
}
template <typename I>
-void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
- dout(20) << dendl;
+void ImageReplayer<I>::handle_replayer_notification() {
+ dout(10) << dendl;
- cls::journal::Client client;
+ // detect a rename of the local image
+ std::string local_image_name;
{
- std::lock_guard locker{m_lock};
- if (!is_running_()) {
- return;
- }
+ std::shared_lock image_locker{m_local_image_ctx->image_lock};
+ local_image_name = m_local_image_ctx->name;
+ }
- int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
- if (r < 0) {
- derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
- return;
+ {
+ std::unique_lock locker{m_lock};
+ ceph_assert(m_state == STATE_REPLAYING);
+ ceph_assert(m_replayer != nullptr);
+
+ if (m_local_image_name != local_image_name) {
+ // will re-register with new name after next status update
+ dout(10) << "image renamed" << dendl;
+ m_local_image_name = local_image_name;
}
}
- if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
- dout(0) << "client flagged disconnected, stopping image replay" << dendl;
- stop(nullptr, false, -ENOTCONN, "disconnected");
+ // replayer cannot be shut down while notification is in-flight
+ if (m_replayer->is_resync_requested()) {
+ dout(10) << "resync requested" << dendl;
+ m_resync_requested = true;
+ on_stop_journal_replay(0, "resync requested");
+ return;
}
+
+ if (!m_replayer->is_replaying()) {
+ dout(10) << "replay interrupted" << dendl;
+ on_stop_journal_replay(m_replayer->get_error_code(),
+ m_replayer->get_error_description());
+ return;
+ }
+
+ update_mirror_image_status(false, {});
}
template <typename I>
return "Starting";
case ImageReplayer<I>::STATE_REPLAYING:
return "Replaying";
- case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
- return "ReplayFlushing";
case ImageReplayer<I>::STATE_STOPPING:
return "Stopping";
case ImageReplayer<I>::STATE_STOPPED:
return "Unknown(" + stringify(state) + ")";
}
-template <typename I>
-void ImageReplayer<I>::resync_image(Context *on_finish) {
- dout(10) << dendl;
-
- m_resync_requested = true;
- stop(on_finish);
-}
-
template <typename I>
void ImageReplayer<I>::register_admin_socket_hook() {
ImageReplayerAdminSocketHook<I> *asok_hook;
return;
}
- ceph_assert(m_perf_counters == nullptr);
-
dout(15) << "registered asok hook: " << m_image_spec << dendl;
asok_hook = new ImageReplayerAdminSocketHook<I>(
g_ceph_context, m_image_spec, this);
int r = asok_hook->register_commands();
if (r == 0) {
m_asok_hook = asok_hook;
-
- CephContext *cct = static_cast<CephContext *>(m_local_io_ctx.cct());
- auto prio = cct->_conf.get_val<int64_t>(
- "rbd_mirror_image_perf_stats_prio");
- PerfCountersBuilder plb(g_ceph_context,
- "rbd_mirror_image_" + m_image_spec,
- l_rbd_mirror_first, l_rbd_mirror_last);
- plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio);
- plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes",
- "Replayed data", "rb", prio, unit_t(UNIT_BYTES));
- plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency",
- "Replay latency", "rl", prio);
- m_perf_counters = plb.create_perf_counters();
- g_ceph_context->get_perfcounters_collection()->add(m_perf_counters);
-
return;
}
derr << "error registering admin socket commands" << dendl;
dout(15) << dendl;
AdminSocketHook *asok_hook = nullptr;
- PerfCounters *perf_counters = nullptr;
{
std::lock_guard locker{m_lock};
std::swap(asok_hook, m_asok_hook);
- std::swap(perf_counters, m_perf_counters);
}
delete asok_hook;
- if (perf_counters != nullptr) {
- g_ceph_context->get_perfcounters_collection()->remove(perf_counters);
- delete perf_counters;
- }
}
template <typename I>
#include "include/rados/librados.hpp"
#include "cls/journal/cls_journal_types.h"
#include "cls/rbd/cls_rbd_types.h"
-#include "journal/JournalMetadataListener.h"
-#include "journal/ReplayEntry.h"
#include "librbd/ImageCtx.h"
#include "librbd/journal/Types.h"
#include "librbd/journal/TypeTraits.h"
#include "ProgressContext.h"
#include "tools/rbd_mirror/Types.h"
#include "tools/rbd_mirror/image_replayer/Types.h"
-
-#include <boost/noncopyable.hpp>
#include <boost/optional.hpp>
-
-#include <set>
-#include <map>
-#include <atomic>
#include <string>
-#include <vector>
class AdminSocketHook;
-class PerfCounters;
namespace journal {
struct CacheManagerHandler;
class Journaler;
-class ReplayHandler;
} // namespace journal
namespace librbd {
class ImageCtx;
-namespace journal { template <typename> class Replay; }
} // namespace librbd
namespace image_replayer {
+class Replayer;
template <typename> class BootstrapRequest;
-namespace journal {
-
-template <typename> class EventPreprocessor;
-template <typename> class ReplayStatusFormatter;
-
-} // namespace journal
} // namespace image_replayer
/**
void restart(Context *on_finish = nullptr);
void flush();
- void resync_image(Context *on_finish=nullptr);
-
void print_status(Formatter *f);
- virtual void handle_replay_ready();
- virtual void handle_replay_complete(int r, const std::string &error_desc);
-
protected:
/**
* @verbatim
* BOOTSTRAP_IMAGE * * * * * * * * * * * * * * * * * * * *
* | *
* v (error) *
- * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * *
- * | *
- * v (error) *
* START_REPLAY * * * * * * * * * * * * * * * * * * * * * *
* |
- * | /--------------------------------------------\
- * | | |
- * v v (asok flush) |
- * REPLAYING -------------> LOCAL_REPLAY_FLUSH |
- * | \ | |
- * | | v |
- * | | FLUSH_COMMIT_POSITION |
- * | | | |
- * | | \--------------------/|
- * | | |
- * | | (entries available) |
- * | \-----------> REPLAY_READY |
- * | | |
- * | | (skip if not |
- * | v needed) (error)
- * | REPLAY_FLUSH * * * * * * * * *
- * | | | *
- * | | (skip if not | *
- * | v needed) (error) *
- * | GET_REMOTE_TAG * * * * * * * *
- * | | | *
- * | | (skip if not | *
- * | v needed) (error) *
- * | ALLOCATE_LOCAL_TAG * * * * * *
- * | | | *
- * | v (error) *
- * | PREPROCESS_ENTRY * * * * * * *
- * | | | *
- * | v (error) *
- * | PROCESS_ENTRY * * * * * * * * *
- * | | | *
- * | \---------------------/ *
- * v *
- * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * *
+ * v
+ * REPLAYING
* |
* v
* JOURNAL_REPLAY_SHUT_DOWN
private:
typedef std::set<Peer<ImageCtxT>> Peers;
- typedef typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry ReplayEntry;
enum State {
STATE_UNKNOWN,
STATE_STARTING,
STATE_REPLAYING,
- STATE_REPLAY_FLUSHING,
STATE_STOPPING,
STATE_STOPPED,
};
: io_ctx(peer.io_ctx), mirror_status_updater(peer.mirror_status_updater) {
}
};
+ struct ReplayerListener;
typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
typedef boost::optional<State> OptionalState;
typedef boost::optional<cls::rbd::MirrorImageStatusState>
OptionalMirrorImageStatusState;
- struct JournalListener : public librbd::journal::Listener {
- ImageReplayer *img_replayer;
-
- JournalListener(ImageReplayer *img_replayer)
- : img_replayer(img_replayer) {
- }
-
- void handle_close() override {
- img_replayer->on_stop_journal_replay();
- }
-
- void handle_promoted() override {
- img_replayer->on_stop_journal_replay(0, "force promoted");
- }
-
- void handle_resync() override {
- img_replayer->resync_image();
- }
- };
-
class BootstrapProgressContext : public ProgressContext {
public:
BootstrapProgressContext(ImageReplayer<ImageCtxT> *replayer) :
bool m_delete_requested = false;
bool m_resync_requested = false;
- image_replayer::journal::EventPreprocessor<ImageCtxT>*
- m_event_preprocessor = nullptr;
- image_replayer::journal::ReplayStatusFormatter<ImageCtxT>*
- m_replay_status_formatter = nullptr;
ImageCtxT *m_local_image_ctx = nullptr;
std::string m_local_image_tag_owner;
decltype(ImageCtxT::journal) m_local_journal = nullptr;
- librbd::journal::Replay<ImageCtxT> *m_local_replay = nullptr;
Journaler* m_remote_journaler = nullptr;
- ::journal::ReplayHandler *m_replay_handler = nullptr;
- librbd::journal::Listener *m_journal_listener;
+
+ image_replayer::Replayer* m_replayer = nullptr;
+ ReplayerListener* m_replayer_listener = nullptr;
Context *m_on_start_finish = nullptr;
Context *m_on_stop_finish = nullptr;
bool m_manual_stop = false;
AdminSocketHook *m_asok_hook = nullptr;
- PerfCounters *m_perf_counters = nullptr;
image_replayer::BootstrapRequest<ImageCtxT> *m_bootstrap_request = nullptr;
cls::journal::CLIENT_STATE_DISCONNECTED;
librbd::journal::MirrorPeerClientMeta m_client_meta;
- ReplayEntry m_replay_entry;
- utime_t m_replay_start_time;
- bool m_replay_tag_valid = false;
- uint64_t m_replay_tag_tid = 0;
- cls::journal::Tag m_replay_tag;
- librbd::journal::TagData m_replay_tag_data;
- librbd::journal::EventEntry m_event_entry;
- AsyncOpTracker m_event_replay_tracker;
- Context *m_delayed_preprocess_task = nullptr;
- Context* m_periodic_flush_task = nullptr;
-
AsyncOpTracker m_in_flight_op_tracker;
- Context *m_flush_local_replay_task = nullptr;
-
- struct RemoteJournalerListener : public ::journal::JournalMetadataListener {
- ImageReplayer *replayer;
-
- RemoteJournalerListener(ImageReplayer *replayer) : replayer(replayer) { }
-
- void handle_update(::journal::JournalMetadata *) override;
- } m_remote_listener;
-
- struct C_ReplayCommitted : public Context {
- ImageReplayer *replayer;
- ReplayEntry replay_entry;
- utime_t replay_start_time;
-
- C_ReplayCommitted(ImageReplayer *replayer,
- ReplayEntry &&replay_entry,
- const utime_t &replay_start_time)
- : replayer(replayer), replay_entry(std::move(replay_entry)),
- replay_start_time(replay_start_time) {
- }
- void finish(int r) override {
- replayer->handle_process_entry_safe(replay_entry, replay_start_time, r);
- }
- };
static std::string to_string(const State state);
return !is_stopped_() && m_state != STATE_STOPPING && !m_stop_requested;
}
bool is_replaying_() const {
- return (m_state == STATE_REPLAYING ||
- m_state == STATE_REPLAY_FLUSHING);
+ return (m_state == STATE_REPLAYING);
}
- void schedule_flush_local_replay_task();
- void cancel_flush_local_replay_task();
- void handle_flush_local_replay_task(int r);
-
- void flush_local_replay(Context* on_flush);
- void handle_flush_local_replay(Context* on_flush, int r);
-
- void flush_commit_position(Context* on_flush);
- void handle_flush_commit_position(Context* on_flush, int r);
-
void update_mirror_image_status(bool force, const OptionalState &state);
void set_mirror_image_status_update(bool force, const OptionalState &state);
void shut_down(int r);
void handle_shut_down(int r);
- void handle_remote_journal_metadata_updated();
void prepare_local_image();
void handle_prepare_local_image(int r);
void bootstrap();
void handle_bootstrap(int r);
- void init_remote_journaler();
- void handle_init_remote_journaler(int r);
-
void start_replay();
void handle_start_replay(int r);
- void replay_flush();
- void handle_replay_flush(int r);
-
- void get_remote_tag();
- void handle_get_remote_tag(int r);
-
- void allocate_local_tag();
- void handle_allocate_local_tag(int r);
-
- void preprocess_entry();
- void handle_preprocess_entry_ready(int r);
- void handle_preprocess_entry_safe(int r);
-
- void process_entry();
- void handle_process_entry_ready(int r);
- void handle_process_entry_safe(const ReplayEntry& replay_entry,
- const utime_t &m_replay_start_time, int r);
+ void handle_replayer_notification();
void register_admin_socket_hook();
void unregister_admin_socket_hook();
struct Replayer {
virtual ~Replayer() {}
+ virtual void destroy() = 0;
+
virtual void init(Context* on_finish) = 0;
virtual void shut_down(Context* on_finish) = 0;
template <typename I>
Replayer<I>::Replayer(
I** local_image_ctx, Journaler* remote_journaler,
- std::string local_mirror_uuid, std::string remote_mirror_uuid,
+ const std::string& local_mirror_uuid, const std::string& remote_mirror_uuid,
ReplayerListener* replayer_listener, Threads<I>* threads)
: m_local_image_ctx(local_image_ctx),
m_remote_journaler(remote_journaler),
m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
"rbd::mirror::image_replayer::journal::Replayer", this))) {
dout(10) << dendl;
+
+ {
+ std::unique_lock locker{m_lock};
+ register_perf_counters();
+ }
}
template <typename I>
Replayer<I>::~Replayer() {
dout(10) << dendl;
+
+ {
+ std::unique_lock locker{m_lock};
+ unregister_perf_counters();
+ }
+
ceph_assert(m_remote_listener == nullptr);
ceph_assert(m_local_journal_listener == nullptr);
ceph_assert(m_local_journal_replay == nullptr);
}
if (update_status) {
+ unregister_perf_counters();
+ register_perf_counters();
notify_status_updated();
}
return 0;
}
+template <typename I>
+void Replayer<I>::register_perf_counters() {
+ dout(5) << dendl;
+
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ ceph_assert(m_perf_counters == nullptr);
+
+ auto cct = static_cast<CephContext *>((*m_local_image_ctx)->cct);
+ auto prio = cct->_conf.get_val<int64_t>("rbd_mirror_image_perf_stats_prio");
+ PerfCountersBuilder plb(g_ceph_context, "rbd_mirror_image_" + m_image_spec,
+ l_rbd_mirror_first, l_rbd_mirror_last);
+ plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio);
+ plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes",
+ "Replayed data", "rb", prio, unit_t(UNIT_BYTES));
+ plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency",
+ "Replay latency", "rl", prio);
+ m_perf_counters = plb.create_perf_counters();
+ g_ceph_context->get_perfcounters_collection()->add(m_perf_counters);
+}
+
+template <typename I>
+void Replayer<I>::unregister_perf_counters() {
+ dout(5) << dendl;
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ PerfCounters *perf_counters = nullptr;
+ std::swap(perf_counters, m_perf_counters);
+
+ if (perf_counters != nullptr) {
+ g_ceph_context->get_perfcounters_collection()->remove(perf_counters);
+ delete perf_counters;
+ }
+}
+
} // namespace journal
} // namespace image_replayer
} // namespace mirror
public:
typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
+ static Replayer* create(ImageCtxT** local_image_ctx,
+ Journaler* remote_journaler,
+ const std::string& local_mirror_uuid,
+ const std::string& remote_mirror_uuid,
+ ReplayerListener* replayer_listener,
+ Threads<ImageCtxT>* threads) {
+ return new Replayer(local_image_ctx, remote_journaler, local_mirror_uuid,
+ remote_mirror_uuid, replayer_listener, threads);
+ }
+
Replayer(
ImageCtxT** local_image_ctx, Journaler* remote_journaler,
- std::string m_local_mirror_uuid, std::string m_remote_mirror_uuid,
+ const std::string& local_mirror_uuid,
+ const std::string& remote_mirror_uuid,
ReplayerListener* replayer_listener, Threads<ImageCtxT>* threads);
~Replayer();
+ void destroy() override {
+ delete this;
+ }
+
void init(Context* on_finish) override;
void shut_down(Context* on_finish) override;
librbd::journal::MirrorPeerClientMeta* remote_client_meta,
bool* resync_requested, std::string* error);
+ void register_perf_counters();
+ void unregister_perf_counters();
+
};
} // namespace journal