return 0;
}
+ void expect_local_journal_add_listener(
+ librbd::MockTestJournal& mock_local_journal,
+ librbd::journal::Listener** local_journal_listener) {
+ EXPECT_CALL(mock_local_journal, add_listener(_))
+ .WillOnce(SaveArg<0>(local_journal_listener));
+ expect_is_tag_owner(mock_local_journal, false);
+ expect_is_resync_requested(mock_local_journal, 0, false);
+ }
+
int init_entry_replayer(MockReplayer& mock_replayer,
MockThreads& mock_threads,
MockReplayerListener& mock_replayer_listener,
{librbd::journal::MirrorPeerClientMeta{}}, 0);
expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
0);
- EXPECT_CALL(mock_local_journal, add_listener(_))
- .WillOnce(SaveArg<0>(local_journal_listener));
- expect_is_tag_owner(mock_local_journal, false);
- expect_is_resync_requested(mock_local_journal, 0, false);
+ expect_local_journal_add_listener(mock_local_journal,
+ local_journal_listener);
EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _))
.WillOnce(SaveArg<0>(remote_replay_handler));
expect_notification(mock_threads, mock_replayer_listener);
mock_local_journal_replay));
}
-TEST_F(TestMockImageReplayerJournalReplayer, InitNoLocalJournal) {
+TEST_F(TestMockImageReplayerJournalReplayer, InitRemoteJournalerError) {
librbd::MockTestJournal mock_local_journal;
librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
mock_local_journal};
::journal::MockJournaler mock_remote_journaler;
MockReplayerListener mock_replayer_listener;
MockThreads mock_threads{m_threads};
-
- mock_local_image_ctx.journal = nullptr;
MockStateBuilder mock_state_builder(mock_local_image_ctx,
mock_remote_journaler,
{});
InSequence seq;
+ expect_init(mock_remote_journaler, -EINVAL);
MockCloseImageRequest mock_close_image_request;
expect_send(mock_close_image_request, 0);
ASSERT_EQ(-EINVAL, init_ctx.wait());
}
-TEST_F(TestMockImageReplayerJournalReplayer, InitRemoteJournalerError) {
+TEST_F(TestMockImageReplayerJournalReplayer, InitRemoteJournalerGetClientError) {
librbd::MockTestJournal mock_local_journal;
librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
mock_local_journal};
InSequence seq;
- expect_init(mock_remote_journaler, -EINVAL);
+ expect_init(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, add_listener(_));
+ expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {},
+ {librbd::journal::MirrorPeerClientMeta{}}, -EINVAL);
MockCloseImageRequest mock_close_image_request;
expect_send(mock_close_image_request, 0);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
C_SaferCond init_ctx;
mock_replayer.init(&init_ctx);
ASSERT_EQ(-EINVAL, init_ctx.wait());
}
-TEST_F(TestMockImageReplayerJournalReplayer, InitRemoteJournalerGetClientError) {
+TEST_F(TestMockImageReplayerJournalReplayer, InitNoLocalJournal) {
librbd::MockTestJournal mock_local_journal;
librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
mock_local_journal};
::journal::MockJournaler mock_remote_journaler;
MockReplayerListener mock_replayer_listener;
MockThreads mock_threads{m_threads};
+
+ mock_local_image_ctx.journal = nullptr;
MockStateBuilder mock_state_builder(mock_local_image_ctx,
mock_remote_journaler,
{});
expect_work_queue_repeatedly(mock_threads);
InSequence seq;
-
expect_init(mock_remote_journaler, 0);
EXPECT_CALL(mock_remote_journaler, add_listener(_));
expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {},
- {librbd::journal::MirrorPeerClientMeta{}}, -EINVAL);
+ {librbd::journal::MirrorPeerClientMeta{}}, 0);
+
MockCloseImageRequest mock_close_image_request;
expect_send(mock_close_image_request, 0);
EXPECT_CALL(mock_remote_journaler, remove_listener(_));
// replay_flush
expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
0);
+ expect_local_journal_add_listener(mock_local_journal,
+ &local_journal_listener);
expect_get_tag(mock_remote_journaler, tag, 0);
expect_allocate_tag(mock_local_journal, 0);
// replay_flush
expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
0);
+ expect_local_journal_add_listener(mock_local_journal,
+ &local_journal_listener);
expect_get_tag(mock_remote_journaler, tag, 0);
expect_allocate_tag(mock_local_journal, 0);
// replay_flush
expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
0);
+ expect_local_journal_add_listener(mock_local_journal,
+ &local_journal_listener);
expect_get_tag(mock_remote_journaler, tag, 0);
expect_allocate_tag(mock_local_journal, 0);
expect_try_pop_front(mock_remote_journaler, 1, true);
expect_shut_down(mock_local_journal_replay, false, -EINVAL);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
expect_notification(mock_threads, mock_replayer_listener);
remote_replay_handler->handle_entries_available();
wait_for_notification();
ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
- EXPECT_CALL(mock_local_journal, remove_listener(_));
MockCloseImageRequest mock_close_image_request;
expect_send(mock_close_image_request, 0);
expect_stop_replay(mock_remote_journaler, 0);
expect_try_pop_front(mock_remote_journaler, 1, true);
expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
expect_start_external_replay(mock_local_journal, nullptr, -EINVAL);
expect_notification(mock_threads, mock_replayer_listener);
wait_for_notification();
ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
- EXPECT_CALL(mock_local_journal, remove_listener(_));
MockCloseImageRequest mock_close_image_request;
expect_send(mock_close_image_request, 0);
expect_stop_replay(mock_remote_journaler, 0);
true, 0, 0})};
expect_try_pop_front(mock_remote_journaler, tag.tid, true);
expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
0);
+ expect_local_journal_add_listener(mock_local_journal,
+ &local_journal_listener);
expect_get_tag(mock_remote_journaler, tag, -EINVAL);
expect_notification(mock_threads, mock_replayer_listener);
remote_replay_handler->handle_entries_available();
expect_try_pop_front(mock_remote_journaler, tag.tid, true);
expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
0);
+ expect_local_journal_add_listener(mock_local_journal,
+ &local_journal_listener);
expect_get_tag(mock_remote_journaler, tag, 0);
expect_get_tag_data(mock_local_journal, {});
expect_allocate_tag(mock_local_journal, 0);
expect_try_pop_front(mock_remote_journaler, tag.tid, true);
expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
0);
+ expect_local_journal_add_listener(mock_local_journal,
+ &local_journal_listener);
expect_get_tag(mock_remote_journaler, tag, 0);
expect_allocate_tag(mock_local_journal, -EINVAL);
expect_notification(mock_threads, mock_replayer_listener);
expect_try_pop_front(mock_remote_journaler, tag.tid, true);
expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
0);
+ expect_local_journal_add_listener(mock_local_journal,
+ &local_journal_listener);
expect_get_tag(mock_remote_journaler, tag, 0);
expect_allocate_tag(mock_local_journal, 0);
EXPECT_CALL(mock_replay_entry, get_data());
expect_try_pop_front(mock_remote_journaler, tag.tid, true);
expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
0);
+ expect_local_journal_add_listener(mock_local_journal,
+ &local_journal_listener);
expect_get_tag(mock_remote_journaler, tag, 0);
expect_allocate_tag(mock_local_journal, 0);
EXPECT_CALL(mock_replay_entry, get_data());
expect_try_pop_front(mock_remote_journaler, tag.tid, true);
expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
0);
+ expect_local_journal_add_listener(mock_local_journal,
+ &local_journal_listener);
expect_get_tag(mock_remote_journaler, tag, 0);
expect_allocate_tag(mock_local_journal, 0);
EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
void Replayer<I>::init(Context* on_finish) {
dout(10) << dendl;
- ceph_assert(m_local_journal == nullptr);
{
auto local_image_ctx = m_state_builder->local_image_ctx;
std::shared_lock image_locker{local_image_ctx->image_lock};
m_image_spec = util::compute_image_spec(local_image_ctx->md_ctx,
local_image_ctx->name);
- m_local_journal = local_image_ctx->journal;
}
ceph_assert(m_on_init_shutdown == nullptr);
m_on_init_shutdown = on_finish;
- if (m_local_journal == nullptr) {
- std::unique_lock locker{m_lock};
- m_state = STATE_COMPLETE;
- m_state_builder->remote_journaler = nullptr;
-
- handle_replay_complete(locker, -EINVAL, "error accessing local journal");
- close_local_image();
- return;
- }
-
init_remote_journaler();
}
return;
}
- start_external_replay();
+ start_external_replay(locker);
}
template <typename I>
-void Replayer<I>::start_external_replay() {
+void Replayer<I>::start_external_replay(std::unique_lock<ceph::mutex>& locker) {
dout(10) << dendl;
+ auto local_image_ctx = m_state_builder->local_image_ctx;
+ std::shared_lock local_image_locker{local_image_ctx->image_lock};
+
+ ceph_assert(m_local_journal == nullptr);
+ m_local_journal = local_image_ctx->journal;
+ if (m_local_journal == nullptr) {
+ local_image_locker.unlock();
+
+ derr << "local image journal closed" << dendl;
+ handle_replay_complete(locker, -EINVAL, "error accessing local journal");
+ close_local_image();
+ return;
+ }
+
+ // safe to hold pointer to journal after external playback starts
Context *start_ctx = create_context_callback<
Replayer, &Replayer<I>::handle_start_external_replay>(this);
m_local_journal->start_external_replay(&m_local_journal_replay, start_ctx);
m_state = STATE_REPLAYING;
+ // check for resync/promotion state after adding listener
+ if (!add_local_journal_listener(locker)) {
+ return;
+ }
+
+ // start remote journal replay
+ m_event_preprocessor = EventPreprocessor<I>::create(
+ *m_state_builder->local_image_ctx, *m_state_builder->remote_journaler,
+ m_local_mirror_uuid, &m_state_builder->remote_client_meta,
+ m_threads->work_queue);
+ m_replay_status_formatter = ReplayStatusFormatter<I>::create(
+ m_state_builder->remote_journaler, m_local_mirror_uuid);
+
+ auto cct = static_cast<CephContext *>(m_state_builder->local_image_ctx->cct);
+ double poll_seconds = cct->_conf.get_val<double>(
+ "rbd_mirror_journal_poll_age");
+ m_remote_replay_handler = new RemoteReplayHandler(this);
+ m_state_builder->remote_journaler->start_live_replay(m_remote_replay_handler,
+ poll_seconds);
+
+ notify_status_updated();
+}
+
+template <typename I>
+bool Replayer<I>::add_local_journal_listener(
+ std::unique_lock<ceph::mutex>& locker) {
+ dout(10) << dendl;
+
// listen for promotion and resync requests against local journal
+ ceph_assert(m_local_journal_listener == nullptr);
m_local_journal_listener = new LocalJournalListener(this);
m_local_journal->add_listener(m_local_journal_listener);
if (m_local_journal->is_tag_owner()) {
dout(10) << "local image force-promoted" << dendl;
handle_replay_complete(locker, 0, "force promoted");
- return;
+ return false;
}
bool resync_requested = false;
- r = m_local_journal->is_resync_requested(&resync_requested);
+ int r = m_local_journal->is_resync_requested(&resync_requested);
if (r < 0) {
dout(10) << "failed to determine resync state: " << cpp_strerror(r)
<< dendl;
handle_replay_complete(locker, r, "error parsing resync state");
- return;
+ return false;
} else if (resync_requested) {
dout(10) << "local image resync requested" << dendl;
handle_replay_complete(locker, 0, "resync requested");
- return;
+ return false;
}
- // start remote journal replay
- m_event_preprocessor = EventPreprocessor<I>::create(
- *m_state_builder->local_image_ctx, *m_state_builder->remote_journaler,
- m_local_mirror_uuid, &m_state_builder->remote_client_meta,
- m_threads->work_queue);
- m_replay_status_formatter = ReplayStatusFormatter<I>::create(
- m_state_builder->remote_journaler, m_local_mirror_uuid);
-
- auto cct = static_cast<CephContext *>(m_state_builder->local_image_ctx->cct);
- double poll_seconds = cct->_conf.get_val<double>(
- "rbd_mirror_journal_poll_age");
- m_remote_replay_handler = new RemoteReplayHandler(this);
- m_state_builder->remote_journaler->start_live_replay(m_remote_replay_handler,
- poll_seconds);
-
- notify_status_updated();
+ return true;
}
template <typename I>
template <typename I>
void Replayer<I>::handle_replay_flush_shut_down(int r) {
- {
- std::unique_lock locker{m_lock};
- ceph_assert(m_local_journal != nullptr);
- m_local_journal->stop_external_replay();
- m_local_journal_replay = nullptr;
- }
-
+ std::unique_lock locker{m_lock};
dout(10) << "r=" << r << dendl;
+
+ ceph_assert(m_local_journal != nullptr);
+ ceph_assert(m_local_journal_listener != nullptr);
+
+ // blocks if listener notification is in-progress
+ m_local_journal->remove_listener(m_local_journal_listener);
+ delete m_local_journal_listener;
+ m_local_journal_listener = nullptr;
+
+ m_local_journal->stop_external_replay();
+ m_local_journal_replay = nullptr;
+ m_local_journal.reset();
+
if (r < 0) {
+ locker.unlock();
+
handle_replay_flush(r);
return;
}
+ // journal might have been closed now that we stopped external replay
+ auto local_image_ctx = m_state_builder->local_image_ctx;
+ std::shared_lock local_image_locker{local_image_ctx->image_lock};
+ m_local_journal = local_image_ctx->journal;
+ if (m_local_journal == nullptr) {
+ local_image_locker.unlock();
+ locker.unlock();
+
+ derr << "local image journal closed" << dendl;
+ handle_replay_flush(-EINVAL);
+ return;
+ }
+
auto ctx = create_context_callback<
Replayer<I>, &Replayer<I>::handle_replay_flush>(this);
m_local_journal->start_external_replay(&m_local_journal_replay, ctx);
template <typename I>
void Replayer<I>::handle_replay_flush(int r) {
+ std::unique_lock locker{m_lock};
dout(10) << "r=" << r << dendl;
if (r < 0) {
derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
- handle_replay_complete(r, "replay flush encountered an error");
+ handle_replay_complete(locker, r, "replay flush encountered an error");
+ m_event_replay_tracker.finish_op();
+ return;
+ } else if (is_replay_complete(locker)) {
m_event_replay_tracker.finish_op();
return;
- } else if (is_replay_complete()) {
+ }
+
+ // check for resync/promotion state after adding listener
+ if (!add_local_journal_listener(locker)) {
m_event_replay_tracker.finish_op();
return;
}
+ locker.unlock();
get_remote_tag();
}