"global image id"));
}
+ void expect_schedule_image_delete(MockImageDeleter& mock_image_deleter,
+ const std::string& global_image_id,
+ bool ignore_orphan) {
+ EXPECT_CALL(mock_image_deleter,
+ schedule_image_delete(_, _, global_image_id, ignore_orphan));
+ }
+
bufferlist encode_tag_data(const librbd::journal::TagData &tag_data) {
bufferlist bl;
::encode(tag_data, bl);
"remote mirror uuid", 0);
expect_send(mock_prepare_remote_image_request, "remote mirror uuid",
"", -ENOENT);
+ expect_schedule_image_delete(mock_image_deleter, "global image id", false);
create_image_replayer(mock_threads, mock_image_deleter);
ASSERT_EQ(0, stop_ctx.wait());
}
+
} // namespace mirror
} // namespace rbd
MOCK_METHOD0(is_stopped, bool());
MOCK_METHOD0(is_blacklisted, bool());
+ MOCK_CONST_METHOD0(is_finished, bool());
+ MOCK_METHOD1(set_finished, void(bool));
+
MOCK_CONST_METHOD0(get_health_state, image_replayer::HealthState());
};
C_SaferCond on_acquire;
EXPECT_CALL(mock_image_replayer, add_peer("peer_uuid", _));
+ EXPECT_CALL(mock_image_replayer, set_finished(false));
EXPECT_CALL(mock_image_replayer, is_stopped()).WillOnce(Return(true));
EXPECT_CALL(mock_image_replayer, is_blacklisted()).WillOnce(Return(false));
+ EXPECT_CALL(mock_image_replayer, is_finished()).WillOnce(Return(false));
EXPECT_CALL(mock_image_replayer, start(nullptr, false));
expect_work_queue(mock_threads);
delete timer_ctx;
}
+TEST_F(TestMockInstanceReplayer, RemoveFinishedImage) {
+ MockThreads mock_threads(m_threads);
+ MockServiceDaemon mock_service_daemon;
+ MockImageDeleter mock_image_deleter;
+ MockInstanceWatcher mock_instance_watcher;
+ MockImageReplayer mock_image_replayer;
+ MockInstanceReplayer instance_replayer(
+ &mock_threads, &mock_service_daemon, &mock_image_deleter,
+ rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
+ "local_mirror_uuid", m_local_io_ctx.get_id());
+ std::string global_image_id("global_image_id");
+
+ EXPECT_CALL(mock_image_replayer, get_global_image_id())
+ .WillRepeatedly(ReturnRef(global_image_id));
+
+ InSequence seq;
+ expect_work_queue(mock_threads);
+ Context *timer_ctx1 = nullptr;
+ expect_add_event_after(mock_threads, &timer_ctx1);
+ instance_replayer.init();
+ instance_replayer.add_peer("peer_uuid", m_remote_io_ctx);
+
+ // Acquire
+
+ C_SaferCond on_acquire;
+ EXPECT_CALL(mock_image_replayer, add_peer("peer_uuid", _));
+ EXPECT_CALL(mock_image_replayer, set_finished(false));
+ EXPECT_CALL(mock_image_replayer, is_stopped()).WillOnce(Return(true));
+ EXPECT_CALL(mock_image_replayer, is_blacklisted()).WillOnce(Return(false));
+ EXPECT_CALL(mock_image_replayer, is_finished()).WillOnce(Return(false));
+ EXPECT_CALL(mock_image_replayer, start(nullptr, false));
+ expect_work_queue(mock_threads);
+
+ instance_replayer.acquire_image(&mock_instance_watcher, global_image_id,
+ &on_acquire);
+ ASSERT_EQ(0, on_acquire.wait());
+
+ // periodic start timer
+ Context *timer_ctx2 = nullptr;
+ expect_add_event_after(mock_threads, &timer_ctx2);
+
+ Context *start_image_replayers_ctx = nullptr;
+ EXPECT_CALL(*mock_threads.work_queue, queue(_, 0))
+ .WillOnce(Invoke([&start_image_replayers_ctx](Context *ctx, int r) {
+ start_image_replayers_ctx = ctx;
+ }));
+
+ ASSERT_TRUE(timer_ctx1 != nullptr);
+ {
+ Mutex::Locker timer_locker(mock_threads.timer_lock);
+ timer_ctx1->complete(0);
+ }
+
+ // remove finished image replayer
+ EXPECT_CALL(mock_image_replayer, get_health_state()).WillOnce(
+ Return(image_replayer::HEALTH_STATE_OK));
+ EXPECT_CALL(mock_image_replayer, is_stopped()).WillOnce(Return(true));
+ EXPECT_CALL(mock_image_replayer, is_blacklisted()).WillOnce(Return(false));
+ EXPECT_CALL(mock_image_replayer, is_finished()).WillOnce(Return(true));
+ EXPECT_CALL(mock_image_replayer, destroy());
+ EXPECT_CALL(mock_service_daemon,add_or_update_attribute(_, _, _)).Times(3);
+
+ ASSERT_TRUE(start_image_replayers_ctx != nullptr);
+ start_image_replayers_ctx->complete(0);
+
+ // shut down
+ expect_work_queue(mock_threads);
+ expect_cancel_event(mock_threads, true);
+ expect_work_queue(mock_threads);
+ instance_replayer.shut_down();
+ ASSERT_TRUE(timer_ctx2 != nullptr);
+ delete timer_ctx2;
+}
} // namespace mirror
} // namespace rbd
m_last_r = 0;
m_state_desc.clear();
m_manual_stop = false;
+ m_delete_requested = false;
if (on_finish != nullptr) {
assert(m_on_start_finish == nullptr);
void ImageReplayer<I>::prepare_local_image() {
dout(20) << dendl;
+ m_local_image_id = "";
Context *ctx = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
auto req = PrepareLocalImageRequest<I>::create(
template <typename I>
void ImageReplayer<I>::prepare_remote_image() {
dout(20) << dendl;
- if (m_peers.empty()) {
- on_start_fail(-EREMOTEIO, "waiting for primary remote image");
- return;
- }
- // TODO bootstrap will need to support multiple remote images
+ // TODO need to support multiple remote images
+ assert(!m_peers.empty());
m_remote_image = {*m_peers.begin()};
Context *ctx = create_context_callback<
if (r == -ENOENT) {
dout(20) << "remote image does not exist" << dendl;
+ // TODO need to support multiple remote images
if (!m_local_image_id.empty() &&
m_local_image_tag_owner == m_remote_image.mirror_uuid) {
// local image exists and is non-primary and linked to the missing
// remote image
- // TODO schedule image deletion
+ m_delete_requested = true;
on_start_fail(0, "remote image no longer exists");
} else {
on_start_fail(-ENOENT, "remote image does not exist");
return;
}
- if (m_resync_requested) {
+ bool delete_requested = false;
+ if (m_delete_requested && !m_local_image_id.empty()) {
+ assert(m_remote_image.image_id.empty());
+ dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
+ delete_requested = true;
+ }
+ if (delete_requested || m_resync_requested) {
m_image_deleter->schedule_image_delete(m_local,
m_local_pool_id,
m_global_image_id,
- true);
+ m_resync_requested);
m_resync_requested = false;
+ } else if (m_last_r == -ENOENT &&
+ m_local_image_id.empty() && m_remote_image.image_id.empty()) {
+ dout(0) << "mirror image no longer exists" << dendl;
+ m_finished = true;
}
}
std::string get_name() { Mutex::Locker l(m_lock); return m_name; };
void set_state_description(int r, const std::string &desc);
+ // TODO temporary until policy handles release of image replayers
+ inline bool is_finished() const {
+ Mutex::Locker locker(m_lock);
+ return m_finished;
+ }
+ inline void set_finished(bool finished) {
+ Mutex::Locker locker(m_lock);
+ m_finished = finished;
+ }
+
inline bool is_blacklisted() const {
Mutex::Locker locker(m_lock);
return (m_last_r == -EBLACKLISTED);
std::string m_local_image_id;
std::string m_global_image_id;
std::string m_name;
+
mutable Mutex m_lock;
State m_state = STATE_STOPPED;
std::string m_state_desc;
int m_last_r = 0;
BootstrapProgressContext m_progress_cxt;
+
+ bool m_finished = false;
+ bool m_delete_requested = false;
bool m_resync_requested = false;
+
image_replayer::EventPreprocessor<ImageCtxT> *m_event_preprocessor = nullptr;
image_replayer::ReplayStatusFormatter<ImageCtxT> *m_replay_status_formatter =
nullptr;
image_replayer->add_peer(peer.peer_uuid, peer.io_ctx);
}
- start_image_replayer(it->second);
+ auto& image_replayer = it->second;
+ // TODO temporary until policy integrated
+ image_replayer->set_finished(false);
+
+ start_image_replayer(image_replayer);
m_threads->work_queue->queue(on_finish, 0);
}
dout(20) << "global_image_id=" << global_image_id << ", "
<< "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
- // TODO
+ Mutex::Locker locker(m_lock);
+ assert(m_on_shut_down == nullptr);
+
+ auto it = m_image_replayers.find(global_image_id);
+ if (it != m_image_replayers.end()) {
+ // TODO only a single peer is currently supported, therefore
+ // we can just interrupt the current image replayer and
+ // it will eventually detect that the peer image is missing and
+ // determine if a delete propagation is required.
+ auto image_replayer = it->second;
+ image_replayer->restart();
+ }
m_threads->work_queue->queue(on_finish, 0);
}
} else if (image_replayer->is_blacklisted()) {
derr << "blacklisted detected during image replay" << dendl;
return;
+ } else if (image_replayer->is_finished()) {
+ // TODO temporary until policy integrated
+ dout(5) << "removing image replayer for global_image_id="
+ << global_image_id << dendl;
+ m_image_replayers.erase(image_replayer->get_global_image_id());
+ image_replayer->destroy();
+ return;
}
image_replayer->start(nullptr, false);
size_t image_count = 0;
size_t warning_count = 0;
size_t error_count = 0;
- for (auto &it : m_image_replayers) {
+ for (auto it = m_image_replayers.begin();
+ it != m_image_replayers.end();) {
+ auto current_it(it);
+ ++it;
+
++image_count;
- auto health_state = it.second->get_health_state();
+ auto health_state = current_it->second->get_health_state();
if (health_state == image_replayer::HEALTH_STATE_WARNING) {
++warning_count;
} else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
++error_count;
}
- start_image_replayer(it.second);
+ start_image_replayer(current_it->second);
}
m_service_daemon->add_or_update_attribute(