void expect_add_event(MockThreads &mock_threads) {
EXPECT_CALL(*mock_threads.timer, add_event_after(_,_))
.WillOnce(DoAll(WithArg<1>(Invoke([this](Context *ctx) {
+ auto wrapped_ctx = new FunctionContext([this, ctx](int r) {
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ ctx->complete(r);
+ });
+ m_threads->work_queue->queue(wrapped_ctx, 0);
+ })), ReturnArg<1>()));
+ }
+
+ void expect_rebalance_event(MockThreads &mock_threads) {
+ EXPECT_CALL(*mock_threads.timer, add_event_after(_,_))
+ .WillOnce(DoAll(WithArg<1>(Invoke([this](Context *ctx) {
+ // disable rebalance so as to not reschedule it again
+ CephContext *cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
+ cct->_conf->set_val("rbd_mirror_image_policy_rebalance_timeout", "0");
+
auto wrapped_ctx = new FunctionContext([this, ctx](int r) {
Mutex::Locker timer_locker(m_threads->timer_lock);
ctx->complete(r);
ASSERT_EQ(0, when_shut_down(mock_image_map.get()));
}
+TEST_F(TestMockImageMap, RebalanceImageMap) {
+ MockThreads mock_threads(m_threads);
+ expect_work_queue(mock_threads);
+
+ InSequence seq;
+
+ MockLoadRequest mock_load_request;
+ expect_load_request(mock_load_request, 0);
+
+ MockListener mock_listener(this);
+
+ std::unique_ptr<MockImageMap> mock_image_map{
+ MockImageMap::create(m_local_io_ctx, &mock_threads, m_local_instance_id,
+ mock_listener)};
+
+ C_SaferCond cond;
+ mock_image_map->init(&cond);
+ ASSERT_EQ(0, cond.wait());
+
+ std::set<std::string> global_image_ids{
+ "global id 1", "global id 2", "global id 3", "global id 4", "global id 5",
+ "global id 6", "global id 7", "global id 8", "global id 9", "global id 10",
+ };
+ std::set<std::string> global_image_ids_ack(global_image_ids);
+
+ // UPDATE_MAPPING+ACQUIRE
+ expect_add_event(mock_threads);
+ MockUpdateRequest mock_update_request;
+ expect_update_request(mock_update_request, 0);
+ expect_add_event(mock_threads);
+ std::map<std::string, Context*> peer_ack_ctxs;
+ listener_acquire_images(mock_listener, global_image_ids,
+ &peer_ack_ctxs);
+
+ // initial image list
+ mock_image_map->update_images("", std::move(global_image_ids), {});
+
+ ASSERT_TRUE(wait_for_map_update(1));
+ ASSERT_TRUE(wait_for_listener_notify(global_image_ids_ack.size()));
+
+ // remote peer ACKs image acquire request
+ remote_peer_ack_nowait(mock_image_map.get(), global_image_ids_ack, 0,
+ &peer_ack_ctxs);
+ wait_for_scheduled_task();
+
+ mock_image_map->update_instances_added({m_local_instance_id});
+
+ std::set<std::string> shuffled_global_image_ids;
+
+ // RELEASE+UPDATE_MAPPING+ACQUIRE
+ expect_add_event(mock_threads);
+ expect_listener_images_unmapped(mock_listener, 5, &shuffled_global_image_ids,
+ &peer_ack_ctxs);
+
+ mock_image_map->update_instances_added({"9876"});
+
+ wait_for_scheduled_task();
+ ASSERT_TRUE(wait_for_listener_notify(shuffled_global_image_ids.size()));
+
+ update_map_and_acquire(mock_threads, mock_update_request,
+ mock_listener, shuffled_global_image_ids, 0,
+ &peer_ack_ctxs);
+ remote_peer_ack_listener_wait(mock_image_map.get(), shuffled_global_image_ids,
+ 0, &peer_ack_ctxs);
+
+ // completion shuffle action for now (re)mapped images
+ remote_peer_ack_nowait(mock_image_map.get(), shuffled_global_image_ids, 0,
+ &peer_ack_ctxs);
+
+ wait_for_scheduled_task();
+
+ // remove all shuffled images -- make way for rebalance
+ std::set<std::string> shuffled_global_image_ids_ack(shuffled_global_image_ids);
+
+ // RELEASE+REMOVE_MAPPING
+ expect_add_event(mock_threads);
+ listener_release_images(mock_listener, shuffled_global_image_ids,
+ &peer_ack_ctxs);
+ update_map_request(mock_threads, mock_update_request, shuffled_global_image_ids,
+ 0);
+
+ mock_image_map->update_images("", {}, std::move(shuffled_global_image_ids));
+ ASSERT_TRUE(wait_for_listener_notify(shuffled_global_image_ids_ack.size()));
+
+ remote_peer_ack_wait(mock_image_map.get(), shuffled_global_image_ids_ack, 0,
+ &peer_ack_ctxs);
+ wait_for_scheduled_task();
+
+ shuffled_global_image_ids.clear();
+ shuffled_global_image_ids_ack.clear();
+
+ std::set<std::string> new_global_image_ids = {
+ "global id 11"
+ };
+ std::set<std::string> new_global_image_ids_ack(new_global_image_ids);
+
+ expect_add_event(mock_threads);
+ expect_update_request(mock_update_request, 0);
+ expect_add_event(mock_threads);
+ listener_acquire_images(mock_listener, new_global_image_ids, &peer_ack_ctxs);
+
+ expect_rebalance_event(mock_threads); // rebalance task
+ expect_add_event(mock_threads); // update task scheduled by
+ // rebalance task
+ expect_listener_images_unmapped(mock_listener, 2, &shuffled_global_image_ids,
+ &peer_ack_ctxs);
+
+ mock_image_map->update_images("", std::move(new_global_image_ids), {});
+
+ ASSERT_TRUE(wait_for_map_update(1));
+ ASSERT_TRUE(wait_for_listener_notify(new_global_image_ids_ack.size()));
+
+ // set rebalance interval
+ CephContext *cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
+ cct->_conf->set_val("rbd_mirror_image_policy_rebalance_timeout", "5");
+ remote_peer_ack_nowait(mock_image_map.get(), new_global_image_ids_ack, 0,
+ &peer_ack_ctxs);
+
+ wait_for_scheduled_task();
+ ASSERT_TRUE(wait_for_listener_notify(shuffled_global_image_ids.size()));
+
+ update_map_and_acquire(mock_threads, mock_update_request,
+ mock_listener, shuffled_global_image_ids, 0,
+ &peer_ack_ctxs);
+ remote_peer_ack_listener_wait(mock_image_map.get(), shuffled_global_image_ids,
+ 0, &peer_ack_ctxs);
+
+ // completion shuffle action for now (re)mapped images
+ remote_peer_ack_nowait(mock_image_map.get(), shuffled_global_image_ids, 0,
+ &peer_ack_ctxs);
+
+ wait_for_scheduled_task();
+ ASSERT_EQ(0, when_shut_down(mock_image_map.get()));
+}
+
} // namespace mirror
} // namespace rbd