}
void expect_mirror_status_update(
- const MirrorImageSiteStatuses& mirror_image_site_statuses, int r) {
+ const MirrorImageSiteStatuses& mirror_image_site_statuses,
+ const std::string& site_name, const std::string& fsid, int r) {
EXPECT_CALL(*m_mock_local_io_ctx, aio_operate(_, _, _, _, _))
.WillOnce(Invoke([this](auto&&... args) {
int r = m_mock_local_io_ctx->do_aio_operate(decltype(args)(args)...);
return r;
}));
- for (auto& [global_image_id, mirror_image_status] :
+ if (!site_name.empty()) {
+ // status updates to remote site include ping
+ bufferlist in_bl;
+ encode(site_name, in_bl);
+ encode(fsid, in_bl);
+ encode(static_cast<uint8_t>(cls::rbd::MIRROR_PEER_DIRECTION_TX), in_bl);
+ EXPECT_CALL(*m_mock_local_io_ctx,
+ exec(RBD_MIRRORING, _, StrEq("rbd"),
+ StrEq("mirror_peer_ping"), ContentsEqual(in_bl), _, _))
+ .WillOnce(Return(0));
+ }
+
+ for (auto [global_image_id, mirror_image_status] :
mirror_image_site_statuses) {
+ mirror_image_status.fsid = fsid;
expect_mirror_status_update(global_image_id, mirror_image_status, r);
if (r < 0) {
break;
TEST_F(TestMockMirrorStatusUpdater, InitShutDown) {
MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
- m_mock_threads);
+ m_mock_threads, "");
MockMirrorStatusWatcher* mock_mirror_status_watcher =
new MockMirrorStatusWatcher();
TEST_F(TestMockMirrorStatusUpdater, InitStatusWatcherError) {
MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
- m_mock_threads);
+ m_mock_threads, "");
MockMirrorStatusWatcher* mock_mirror_status_watcher =
new MockMirrorStatusWatcher();
TEST_F(TestMockMirrorStatusUpdater, ShutDownStatusWatcherError) {
MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
- m_mock_threads);
+ m_mock_threads, "");
MockMirrorStatusWatcher* mock_mirror_status_watcher =
new MockMirrorStatusWatcher();
TEST_F(TestMockMirrorStatusUpdater, SmallBatch) {
MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
- m_mock_threads);
+ m_mock_threads, "");
MockMirrorStatusWatcher* mock_mirror_status_watcher =
new MockMirrorStatusWatcher();
Context* update_task = nullptr;
fire_timer_event(&timer_event, &update_task);
- expect_mirror_status_update(mirror_image_site_statuses, 0);
+ expect_mirror_status_update(mirror_image_site_statuses, "", "", 0);
update_task->complete(0);
shut_down_mirror_status_updater(mock_mirror_status_updater,
TEST_F(TestMockMirrorStatusUpdater, LargeBatch) {
MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
- m_mock_threads);
+ m_mock_threads, "");
MockMirrorStatusWatcher* mock_mirror_status_watcher =
new MockMirrorStatusWatcher();
Context* update_task = nullptr;
fire_timer_event(&timer_event, &update_task);
- expect_mirror_status_update(mirror_image_site_statuses_1, 0);
- expect_mirror_status_update(mirror_image_site_statuses_2, 0);
+ expect_mirror_status_update(mirror_image_site_statuses_1, "", "", 0);
+ expect_mirror_status_update(mirror_image_site_statuses_2, "", "", 0);
update_task->complete(0);
shut_down_mirror_status_updater(mock_mirror_status_updater,
TEST_F(TestMockMirrorStatusUpdater, OverwriteStatus) {
MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
- m_mock_threads);
+ m_mock_threads, "");
MockMirrorStatusWatcher* mock_mirror_status_watcher =
new MockMirrorStatusWatcher();
expect_mirror_status_update(
{{"1", cls::rbd::MirrorImageSiteStatus{
- "", cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING, "description"}}}, 0);
+ "", cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING, "description"}}},
+ "", "", 0);
update_task->complete(0);
shut_down_mirror_status_updater(mock_mirror_status_updater,
TEST_F(TestMockMirrorStatusUpdater, OverwriteStatusInFlight) {
MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
- m_mock_threads);
+ m_mock_threads, "");
MockMirrorStatusWatcher* mock_mirror_status_watcher =
new MockMirrorStatusWatcher();
expect_work_queue(false);
expect_mirror_status_update(
{{"1", cls::rbd::MirrorImageSiteStatus{
- "", cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING, "description"}}}, 0);
+ "", cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING, "description"}}},
+ "", "", 0);
update_task->complete(0);
TEST_F(TestMockMirrorStatusUpdater, ImmediateUpdate) {
MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
- m_mock_threads);
+ m_mock_threads, "");
MockMirrorStatusWatcher* mock_mirror_status_watcher =
new MockMirrorStatusWatcher();
*mock_mirror_status_watcher, &timer_event);
expect_work_queue(false);
- expect_mirror_status_update({{"1", cls::rbd::MirrorImageSiteStatus{}}}, 0);
+ expect_mirror_status_update({{"1", cls::rbd::MirrorImageSiteStatus{}}},
+ "", "", 0);
mock_mirror_status_updater.set_mirror_image_status("1", {}, true);
shut_down_mirror_status_updater(mock_mirror_status_updater,
TEST_F(TestMockMirrorStatusUpdater, RemoveIdleStatus) {
MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
- m_mock_threads);
+ m_mock_threads, "");
MockMirrorStatusWatcher* mock_mirror_status_watcher =
new MockMirrorStatusWatcher();
TEST_F(TestMockMirrorStatusUpdater, RemoveInFlightStatus) {
MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
- m_mock_threads);
+ m_mock_threads, "");
MockMirrorStatusWatcher* mock_mirror_status_watcher =
new MockMirrorStatusWatcher();
TEST_F(TestMockMirrorStatusUpdater, ShutDownWhileUpdating) {
MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
- m_mock_threads);
+ m_mock_threads, "");
MockMirrorStatusWatcher* mock_mirror_status_watcher =
new MockMirrorStatusWatcher();
ASSERT_EQ(0, on_shutdown.wait());
}
+TEST_F(TestMockMirrorStatusUpdater, MirrorPeerSitePing) {
+ std::string fsid;
+ ASSERT_EQ(0, _rados->cluster_fsid(&fsid));
+
+ MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
+ m_mock_threads, "siteA");
+ MockMirrorStatusWatcher* mock_mirror_status_watcher =
+ new MockMirrorStatusWatcher();
+
+ InSequence seq;
+
+ Context* timer_event = nullptr;
+ init_mirror_status_updater(mock_mirror_status_updater,
+ *mock_mirror_status_watcher, &timer_event);
+
+ MirrorImageSiteStatuses mirror_image_site_statuses;
+ for (auto i = 0; i < 100; ++i) {
+ auto pair = mirror_image_site_statuses.emplace(
+ stringify(i), cls::rbd::MirrorImageSiteStatus{});
+ mock_mirror_status_updater.set_mirror_image_status(pair.first->first,
+ pair.first->second,
+ false);
+ }
+
+ Context* update_task = nullptr;
+ fire_timer_event(&timer_event, &update_task);
+
+ expect_mirror_status_update(mirror_image_site_statuses, "siteA", fsid, 0);
+ update_task->complete(0);
+
+ shut_down_mirror_status_updater(mock_mirror_status_updater,
+ *mock_mirror_status_watcher);
+}
+
} // namespace mirror
} // namespace rbd
template <typename I>
MirrorStatusUpdater<I>::MirrorStatusUpdater(
- librados::IoCtx& io_ctx, Threads<I> *threads)
- : m_io_ctx(io_ctx), m_threads(threads),
+ librados::IoCtx& io_ctx, Threads<I> *threads,
+ const std::string& site_name)
+ : m_io_ctx(io_ctx), m_threads(threads), m_site_name(site_name),
m_lock(ceph::make_mutex("rbd::mirror::MirrorStatusUpdater " +
stringify(m_io_ctx.get_id()))) {
- dout(10) << "pool_id=" << m_io_ctx.get_id() << dendl;
+ dout(10) << "site_name=" << site_name << ", "
+ << "pool_id=" << m_io_ctx.get_id() << dendl;
}
template <typename I>
void MirrorStatusUpdater<I>::init(Context* on_finish) {
dout(10) << dendl;
+ if (!m_site_name.empty()) {
+ librados::Rados rados(m_io_ctx);
+ int r = rados.cluster_fsid(&m_fsid);
+ if (r < 0) {
+ derr << "failed to retrieve fsid: " << cpp_strerror(r) << dendl;
+ m_threads->work_queue->queue(on_finish, r);
+ return;
+ }
+ }
+
ceph_assert(!m_initialized);
m_initialized = true;
librados::ObjectWriteOperation op;
uint32_t op_count = 0;
+ if (!m_site_name.empty()) {
+ // updates to remote sites should include local site name
+ // to ensure status includes this peer
+ librbd::cls_client::mirror_peer_ping(&op, m_site_name, m_fsid);
+ }
+
while (it != updating_global_image_ids.end() &&
op_count < MAX_UPDATES_PER_OP) {
auto& global_image_id = *it;
continue;
}
+ status_it->second.fsid = m_fsid;
librbd::cls_client::mirror_image_status_set(&op, global_image_id,
status_it->second);
++op_count;