librbd::MockTestImageCtx& mock_remote_image_ctx,
MockReplayerListener& mock_replayer_listener,
librbd::UpdateWatchCtx** update_watch_ctx) {
- expect_register_update_watcher(mock_remote_image_ctx, update_watch_ctx, 123,
+ expect_register_update_watcher(mock_local_image_ctx, update_watch_ctx, 123,
+ 0);
+ expect_register_update_watcher(mock_remote_image_ctx, update_watch_ctx, 234,
0);
expect_is_refresh_required(mock_local_image_ctx, false);
expect_is_refresh_required(mock_remote_image_ctx, false);
int shut_down_entry_replayer(MockReplayer& mock_replayer,
MockThreads& mock_threads,
+ librbd::MockTestImageCtx& mock_local_image_ctx,
librbd::MockTestImageCtx& mock_remote_image_ctx) {
- expect_unregister_update_watcher(mock_remote_image_ctx, 123, 0);
+ expect_unregister_update_watcher(mock_remote_image_ctx, 234, 0);
+ expect_unregister_update_watcher(mock_local_image_ctx, 123, 0);
C_SaferCond shutdown_ctx;
mock_replayer.shut_down(&shutdown_ctx);
mock_replayer_listener,
&update_watch_ctx));
ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_image_ctx,
mock_remote_image_ctx));
}
librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
// init
- expect_register_update_watcher(mock_remote_image_ctx, &update_watch_ctx, 123,
+ expect_register_update_watcher(mock_local_image_ctx, &update_watch_ctx, 123,
+ 0);
+ expect_register_update_watcher(mock_remote_image_ctx, &update_watch_ctx, 234,
0);
// sync snap1
// shut down
ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_image_ctx,
mock_remote_image_ctx));
}
ASSERT_EQ(0, wait_for_notification(2));
ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_image_ctx,
mock_remote_image_ctx));
}
ASSERT_FALSE(mock_replayer.is_replaying());
ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_image_ctx,
mock_remote_image_ctx));
}
ASSERT_FALSE(mock_replayer.is_replaying());
ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_image_ctx,
mock_remote_image_ctx));
}
-TEST_F(TestMockImageReplayerSnapshotReplayer, RegisterUpdateWatcherError) {
+TEST_F(TestMockImageReplayerSnapshotReplayer, RegisterLocalUpdateWatcherError) {
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+ librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+ MockThreads mock_threads(m_threads);
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockStateBuilder mock_state_builder(mock_local_image_ctx,
+ mock_remote_image_ctx);
+ MockReplayerListener mock_replayer_listener;
+ MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+ &m_pool_meta_cache, &mock_state_builder,
+ &mock_replayer_listener};
+ m_pool_meta_cache.set_remote_pool_meta(
+ m_remote_io_ctx.get_id(),
+ {"remote mirror uuid", "remote mirror peer uuid"});
+
+ // init
+ librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+ expect_register_update_watcher(mock_local_image_ctx, &update_watch_ctx, 123,
+ -EINVAL);
+
+ // fire init
+ C_SaferCond init_ctx;
+ mock_replayer.init(&init_ctx);
+ ASSERT_EQ(-EINVAL, init_ctx.wait());
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, RegisterRemoteUpdateWatcherError) {
librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
// init
librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
- expect_register_update_watcher(mock_remote_image_ctx, &update_watch_ctx, 123,
+ expect_register_update_watcher(mock_local_image_ctx, &update_watch_ctx, 123,
+ 0);
+ expect_register_update_watcher(mock_remote_image_ctx, &update_watch_ctx, 234,
-EINVAL);
+ expect_unregister_update_watcher(mock_local_image_ctx, 123, 0);
+
// fire init
C_SaferCond init_ctx;
mock_replayer.init(&init_ctx);
ASSERT_EQ(-EINVAL, init_ctx.wait());
}
-TEST_F(TestMockImageReplayerSnapshotReplayer, UnregisterUpdateWatcherError) {
+TEST_F(TestMockImageReplayerSnapshotReplayer, UnregisterRemoteUpdateWatcherError) {
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+ librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+ MockThreads mock_threads(m_threads);
+ expect_work_queue_repeatedly(mock_threads);
+
+ MockReplayerListener mock_replayer_listener;
+ expect_notification(mock_threads, mock_replayer_listener);
+
+ InSequence seq;
+
+ MockStateBuilder mock_state_builder(mock_local_image_ctx,
+ mock_remote_image_ctx);
+ MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+ &m_pool_meta_cache, &mock_state_builder,
+ &mock_replayer_listener};
+ m_pool_meta_cache.set_remote_pool_meta(
+ m_remote_io_ctx.get_id(),
+ {"remote mirror uuid", "remote mirror peer uuid"});
+
+ librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_local_image_ctx,
+ mock_remote_image_ctx,
+ mock_replayer_listener,
+ &update_watch_ctx));
+
+
+ // shut down
+ expect_unregister_update_watcher(mock_remote_image_ctx, 234, -EINVAL);
+ expect_unregister_update_watcher(mock_local_image_ctx, 123, 0);
+
+ C_SaferCond shutdown_ctx;
+ mock_replayer.shut_down(&shutdown_ctx);
+ ASSERT_EQ(-EINVAL, shutdown_ctx.wait());
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, UnregisterLocalUpdateWatcherError) {
librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
// shut down
- expect_unregister_update_watcher(mock_remote_image_ctx, 123, -EINVAL);
+ expect_unregister_update_watcher(mock_remote_image_ctx, 234, 0);
+ expect_unregister_update_watcher(mock_local_image_ctx, 123, -EINVAL);
C_SaferCond shutdown_ctx;
mock_replayer.shut_down(&shutdown_ctx);
ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_image_ctx,
mock_remote_image_ctx));
}
ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_image_ctx,
mock_remote_image_ctx));
}
ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_image_ctx,
mock_remote_image_ctx));
}
ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_image_ctx,
mock_remote_image_ctx));
}
ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_image_ctx,
mock_remote_image_ctx));
}
ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_image_ctx,
mock_remote_image_ctx));
}
ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_image_ctx,
mock_remote_image_ctx));
}
ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_image_ctx,
mock_remote_image_ctx));
}
}
void handle_notify() override {
- replayer->handle_remote_image_update_notify();
+ replayer->handle_image_update_notify();
}
};
ceph_assert(m_on_init_shutdown == nullptr);
m_on_init_shutdown = on_finish;
- register_update_watcher();
+ register_local_update_watcher();
}
template <typename I>
}
locker.unlock();
- unregister_update_watcher();
+ unregister_remote_update_watcher();
}
template <typename I>
dout(10) << dendl;
// reset state in case new snapshot is added while we are scanning
- m_remote_image_updated = false;
+ m_image_updated = false;
bool remote_demoted = false;
auto remote_image_ctx = m_state_builder->remote_image_ctx;
}
}
- if (m_remote_image_updated) {
+ if (m_image_updated) {
// received update notification while scanning image, restart ...
- m_remote_image_updated = false;
+ m_image_updated = false;
locker->unlock();
dout(10) << "restarting snapshot scan due to remote update notification"
}
template <typename I>
-void Replayer<I>::register_update_watcher() {
+void Replayer<I>::register_local_update_watcher() {
dout(10) << dendl;
m_update_watch_ctx = new C_UpdateWatchCtx(this);
- int r = m_state_builder->remote_image_ctx->state->register_update_watcher(
- m_update_watch_ctx, &m_update_watcher_handle);
+
+ int r = m_state_builder->local_image_ctx->state->register_update_watcher(
+ m_update_watch_ctx, &m_local_update_watcher_handle);
auto ctx = create_context_callback<
- Replayer<I>, &Replayer<I>::handle_register_update_watcher>(this);
+ Replayer<I>, &Replayer<I>::handle_register_local_update_watcher>(this);
m_threads->work_queue->queue(ctx, r);
}
template <typename I>
-void Replayer<I>::handle_register_update_watcher(int r) {
+void Replayer<I>::handle_register_local_update_watcher(int r) {
dout(10) << "r=" << r << dendl;
if (r < 0) {
- derr << "failed to register update watcher: " << cpp_strerror(r) << dendl;
- handle_replay_complete(r, "failed to register remote image update watcher");
+ derr << "failed to register local update watcher: " << cpp_strerror(r)
+ << dendl;
+ handle_replay_complete(r, "failed to register local image update watcher");
m_state = STATE_COMPLETE;
delete m_update_watch_ctx;
m_update_watch_ctx = nullptr;
- } else {
- m_state = STATE_REPLAYING;
+
+ Context* on_init = nullptr;
+ std::swap(on_init, m_on_init_shutdown);
+ on_init->complete(r);
+ return;
+ }
+
+ register_remote_update_watcher();
+}
+
+template <typename I>
+void Replayer<I>::register_remote_update_watcher() {
+ dout(10) << dendl;
+
+ int r = m_state_builder->remote_image_ctx->state->register_update_watcher(
+ m_update_watch_ctx, &m_remote_update_watcher_handle);
+ auto ctx = create_context_callback<
+ Replayer<I>, &Replayer<I>::handle_register_remote_update_watcher>(this);
+ m_threads->work_queue->queue(ctx, r);
+}
+
+template <typename I>
+void Replayer<I>::handle_register_remote_update_watcher(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ if (r < 0) {
+ derr << "failed to register remote update watcher: " << cpp_strerror(r)
+ << dendl;
+ handle_replay_complete(r, "failed to register remote image update watcher");
+ m_state = STATE_COMPLETE;
+
+ unregister_local_update_watcher();
+ return;
}
+ m_state = STATE_REPLAYING;
+
Context* on_init = nullptr;
std::swap(on_init, m_on_init_shutdown);
- on_init->complete(r);
+ on_init->complete(0);
// delay initial snapshot scan until after we have alerted
// image replayer that we have initialized in case an error
// occurs
- if (r >= 0) {
- {
- std::unique_lock locker{m_lock};
- notify_status_updated();
- }
-
- refresh_local_image();
+ {
+ std::unique_lock locker{m_lock};
+ notify_status_updated();
}
+
+ refresh_local_image();
}
template <typename I>
-void Replayer<I>::unregister_update_watcher() {
+void Replayer<I>::unregister_remote_update_watcher() {
dout(10) << dendl;
auto ctx = create_context_callback<
Replayer<I>,
- &Replayer<I>::handle_unregister_update_watcher>(this);
+ &Replayer<I>::handle_unregister_remote_update_watcher>(this);
m_state_builder->remote_image_ctx->state->unregister_update_watcher(
- m_update_watcher_handle, ctx);
+ m_remote_update_watcher_handle, ctx);
}
template <typename I>
-void Replayer<I>::handle_unregister_update_watcher(int r) {
+void Replayer<I>::handle_unregister_remote_update_watcher(int r) {
dout(10) << "r=" << r << dendl;
if (r < 0) {
- derr << "failed to unregister update watcher: " << cpp_strerror(r) << dendl;
+ derr << "failed to unregister remote update watcher: " << cpp_strerror(r)
+ << dendl;
handle_replay_complete(
r, "failed to unregister remote image update watcher");
}
+ unregister_local_update_watcher();
+}
+
+template <typename I>
+void Replayer<I>::unregister_local_update_watcher() {
+ dout(10) << dendl;
+
+ auto ctx = create_context_callback<
+ Replayer<I>,
+ &Replayer<I>::handle_unregister_local_update_watcher>(this);
+ m_state_builder->local_image_ctx->state->unregister_update_watcher(
+ m_local_update_watcher_handle, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_unregister_local_update_watcher(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ if (r < 0) {
+ derr << "failed to unregister local update watcher: " << cpp_strerror(r)
+ << dendl;
+ handle_replay_complete(
+ r, "failed to unregister local image update watcher");
+ }
+
delete m_update_watch_ctx;
m_update_watch_ctx = nullptr;
}
template <typename I>
-void Replayer<I>::handle_remote_image_update_notify() {
+void Replayer<I>::handle_image_update_notify() {
dout(10) << dendl;
std::unique_lock locker{m_lock};
if (m_state == STATE_REPLAYING) {
dout(15) << "flagging snapshot rescan required" << dendl;
- m_remote_image_updated = true;
+ m_image_updated = true;
} else if (m_state == STATE_IDLE) {
m_state = STATE_REPLAYING;
locker.unlock();
locker->unlock();
dout(10) << "resuming pending shut down" << dendl;
- unregister_update_watcher();
+ unregister_remote_update_watcher();
return true;
}
return false;
* <init>
* |
* v
- * REGISTER_UPDATE_WATCHER
+ * REGISTER_LOCAL_UPDATE_WATCHER
+ * |
+ * v
+ * REGISTER_REMOTE_UPDATE_WATCHER
* |
* v (skip if not needed)
* REFRESH_LOCAL_IMAGE <------------------------------\
* <shut down>
* |
* v
- * UNREGISTER_UPDATE_WATCHER
+ * UNREGISTER_REMOTE_UPDATE_WATCHER
+ * |
+ * v
+ * UNREGISTER_LOCAL_UPDATE_WATCHER
* |
* v
* WAIT_FOR_IN_FLIGHT_OPS
std::string m_error_description;
C_UpdateWatchCtx* m_update_watch_ctx;
- uint64_t m_update_watcher_handle = 0;
+ uint64_t m_local_update_watcher_handle = 0;
+ uint64_t m_remote_update_watcher_handle = 0;
+ bool m_image_updated = false;
AsyncOpTracker m_in_flight_op_tracker;
void unlink_peer();
void handle_unlink_peer(int r);
- void register_update_watcher();
- void handle_register_update_watcher(int r);
+ void register_local_update_watcher();
+ void handle_register_local_update_watcher(int r);
+
+ void register_remote_update_watcher();
+ void handle_register_remote_update_watcher(int r);
+
+ void unregister_remote_update_watcher();
+ void handle_unregister_remote_update_watcher(int r);
- void unregister_update_watcher();
- void handle_unregister_update_watcher(int r);
+ void unregister_local_update_watcher();
+ void handle_unregister_local_update_watcher(int r);
void wait_for_in_flight_ops();
void handle_wait_for_in_flight_ops(int r);
- void handle_remote_image_update_notify();
+ void handle_image_update_notify();
void handle_replay_complete(int r, const std::string& description);
void handle_replay_complete(std::unique_lock<ceph::mutex>* locker,