MOCK_METHOD1(wait_for_journal_ready, void(Context *));
MOCK_CONST_METHOD0(is_tag_owner, bool());
- MOCK_METHOD2(allocate_tag, void(const std::string &, Context *));
+ MOCK_METHOD6(allocate_tag, void(const std::string &mirror_uuid,
+ const std::string &predecessor_mirror_uuid,
+ bool predecessor_commit_valid,
+ uint64_t predecessor_tag_tid,
+ uint64_t predecessor_entry_tid,
+ Context *on_finish));
MOCK_METHOD1(open, void(Context *));
MOCK_METHOD1(close, void(Context *));
std::set<cls::journal::Client> *,
Context*));
- MOCK_METHOD1(try_pop_front, bool(MockReplayEntryProxy *));
+ MOCK_METHOD2(try_pop_front, bool(MockReplayEntryProxy *, uint64_t *));
MOCK_METHOD2(start_live_replay, void(ReplayHandler *, double));
MOCK_METHOD0(stop_replay, void());
MOCK_METHOD1(flush_commit_position, void(Context*));
MOCK_METHOD2(update_client, void(const bufferlist&, Context *on_safe));
+
+ MOCK_METHOD3(get_tag, void(uint64_t, cls::journal::Tag *, Context *));
};
struct MockJournalerProxy {
on_finish);
}
- bool try_pop_front(MockReplayEntryProxy *entry) {
- return MockJournaler::get_instance().try_pop_front(entry);
+ bool try_pop_front(MockReplayEntryProxy *entry, uint64_t *tag_tid) {
+ return MockJournaler::get_instance().try_pop_front(entry, tag_tid);
}
void start_live_replay(ReplayHandler *handler, double interval) {
MockJournaler::get_instance().start_live_replay(handler, interval);
void update_client(const bufferlist& data, Context *on_safe) {
MockJournaler::get_instance().update_client(data, on_safe);
}
+
+ void get_tag(uint64_t tag_tid, cls::journal::Tag *tag, Context *on_finish) {
+ MockJournaler::get_instance().get_tag(tag_tid, tag, on_finish);
+ }
};
std::ostream &operator<<(std::ostream &os, const MockJournalerProxy &);
}
};
- TestImageReplayer() : m_client_id("TestImageReplayer"), m_watch_handle(0)
+ TestImageReplayer() : m_watch_handle(0)
{
EXPECT_EQ("", connect_cluster_pp(m_local_cluster));
m_replayer = new ImageReplayerT(m_threads,
rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
rbd::mirror::RadosRef(new librados::Rados(m_remote_ioctx)),
- m_client_id, m_local_ioctx.get_id(), m_remote_pool_id, m_remote_image_id,
- "global image id");
+ m_local_mirror_uuid, m_remote_mirror_uuid, m_local_ioctx.get_id(),
+ m_remote_pool_id, m_remote_image_id, "global image id");
}
void start(rbd::mirror::ImageReplayer<>::BootstrapParams *bootstap_params =
cls::journal::ObjectPosition *mirror_position)
{
std::string master_client_id = "";
- std::string mirror_client_id = m_client_id;
+ std::string mirror_client_id = m_local_mirror_uuid;
C_SaferCond cond;
uint64_t minimum_set;
rbd::mirror::Threads *m_threads = nullptr;
librados::Rados m_local_cluster, m_remote_cluster;
- std::string m_client_id;
+ std::string m_local_mirror_uuid = "local mirror uuid";
+ std::string m_remote_mirror_uuid = "remote mirror uuid";
std::string m_local_pool_name, m_remote_pool_name;
librados::IoCtx m_local_ioctx, m_remote_ioctx;
std::string m_image_name;
public:
ImageReplayer(rbd::mirror::Threads *threads,
rbd::mirror::RadosRef local, rbd::mirror::RadosRef remote,
- const std::string &client_id, int64_t local_pool_id,
+ const std::string &local_mirror_uuid,
+ const std::string &remote_mirror_uuid,
+ int64_t local_pool_id,
int64_t remote_pool_id, const std::string &remote_image_id,
const std::string &global_image_id)
- : rbd::mirror::ImageReplayer<>(threads, local, remote, client_id,
- local_pool_id, remote_pool_id,
- remote_image_id, global_image_id)
+ : rbd::mirror::ImageReplayer<>(threads, local, remote, local_mirror_uuid,
+ remote_mirror_uuid, local_pool_id,
+ remote_pool_id, remote_image_id,
+ global_image_id)
{}
void set_error(const std::string &state, int r) {
template <typename I>
ImageReplayer<I>::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
- const std::string &mirror_uuid,
+ const std::string &local_mirror_uuid,
+ const std::string &remote_mirror_uuid,
int64_t local_pool_id,
int64_t remote_pool_id,
const std::string &remote_image_id,
m_threads(threads),
m_local(local),
m_remote(remote),
- m_mirror_uuid(mirror_uuid),
+ m_local_mirror_uuid(local_mirror_uuid),
+ m_remote_mirror_uuid(remote_mirror_uuid),
m_remote_pool_id(remote_pool_id),
m_local_pool_id(local_pool_id),
m_remote_image_id(remote_image_id),
m_remote_journaler = new Journaler(m_threads->work_queue,
m_threads->timer,
&m_threads->timer_lock, m_remote_ioctx,
- m_remote_image_id, m_mirror_uuid,
+ m_remote_image_id, m_local_mirror_uuid,
commit_interval);
bootstrap();
m_local_ioctx, m_remote_ioctx, &m_local_image_ctx,
m_local_image_name, m_remote_image_id, m_global_image_id,
m_threads->work_queue, m_threads->timer, &m_threads->timer_lock,
- m_mirror_uuid, m_remote_journaler, &m_client_meta, ctx);
+ m_local_mirror_uuid, m_remote_journaler, &m_client_meta, ctx);
request->send();
}
assert(m_state == STATE_STOPPING);
m_local_image_ctx->journal->stop_external_replay();
m_local_replay = nullptr;
+ m_replay_entry = ReplayEntry();
+ m_replay_tag_valid = false;
}
on_stop_local_image_close_start();
return;
}
- if (!m_remote_journaler->try_pop_front(&m_replay_entry)) {
+ if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
return;
}
- // TODO
- process_entry();
+ if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
+ process_entry();
+ return;
+ }
+
+ replay_flush();
}
template <typename I>
}
});
on_flush_local_replay_flush_start(ctx);
+ return;
}
}
void ImageReplayer<I>::replay_flush() {
dout(20) << dendl;
- // TODO
+ Context *ctx = create_context_callback<
+ ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
+ flush(ctx);
}
template <typename I>
void ImageReplayer<I>::handle_replay_flush(int r) {
dout(20) << "r=" << r << dendl;
- // TODO
+ if (r < 0) {
+ derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
+ handle_replay_complete(r);
+ return;
+ }
+
+ get_remote_tag();
}
template <typename I>
void ImageReplayer<I>::get_remote_tag() {
- dout(20) << dendl;
+ dout(20) << "tag_tid: " << m_replay_tag_tid << dendl;
- // TODO
+ Context *ctx = create_context_callback<
+ ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
+ m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
}
template <typename I>
void ImageReplayer<I>::handle_get_remote_tag(int r) {
dout(20) << "r=" << r << dendl;
- // TODO
+ if (r == 0) {
+ try {
+ bufferlist::iterator it = m_replay_tag.data.begin();
+ ::decode(m_replay_tag_data, it);
+ } catch (const buffer::error &err) {
+ r = -EBADMSG;
+ }
+ }
+
+ if (r < 0) {
+ derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
+ << cpp_strerror(r) << dendl;
+ handle_replay_complete(r);
+ return;
+ }
+
+ m_replay_tag_valid = true;
+ dout(20) << "decoded remote tag " << m_replay_tag_tid << ": "
+ << m_replay_tag_data << dendl;
+
+ allocate_local_tag();
}
template <typename I>
void ImageReplayer<I>::allocate_local_tag() {
dout(20) << dendl;
- // TODO
+ std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
+ if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID ||
+ mirror_uuid == m_local_mirror_uuid) {
+ mirror_uuid = m_remote_mirror_uuid;
+ }
+
+ std::string predecessor_mirror_uuid =
+ m_replay_tag_data.predecessor_mirror_uuid;
+ if (predecessor_mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
+ mirror_uuid = m_remote_mirror_uuid;
+ } else if (predecessor_mirror_uuid == m_local_mirror_uuid) {
+ predecessor_mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
+ }
+
+ Context *ctx = create_context_callback<
+ ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
+ m_local_image_ctx->journal->allocate_tag(
+ mirror_uuid, predecessor_mirror_uuid,
+ m_replay_tag_data.predecessor_commit_valid,
+ m_replay_tag_data.predecessor_tag_tid,
+ m_replay_tag_data.predecessor_entry_tid,
+ ctx);
}
template <typename I>
void ImageReplayer<I>::handle_allocate_local_tag(int r) {
dout(20) << "r=" << r << dendl;
- // TODO
+ if (r < 0) {
+ derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
+ handle_replay_complete(r);
+ return;
+ }
+
+ process_entry();
}
template <typename I>
};
ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
- const std::string &mirror_uuid, int64_t local_pool_id,
+ const std::string &local_mirror_uuid,
+ const std::string &remote_mirror_uuid, int64_t local_pool_id,
int64_t remote_pool_id, const std::string &remote_image_id,
const std::string &global_image_id);
virtual ~ImageReplayer();
Threads *m_threads;
RadosRef m_local, m_remote;
- std::string m_mirror_uuid;
+ std::string m_local_mirror_uuid;
+ std::string m_remote_mirror_uuid;
int64_t m_remote_pool_id, m_local_pool_id;
std::string m_remote_image_id, m_local_image_id, m_global_image_id;
std::string m_local_image_name;
librbd::journal::MirrorPeerClientMeta m_client_meta;
ReplayEntry m_replay_entry;
+ bool m_replay_tag_valid = false;
+ uint64_t m_replay_tag_tid = 0;
+ cls::journal::Tag m_replay_tag;
+ librbd::journal::TagData m_replay_tag_data;
struct C_ReplayCommitted : public Context {
ImageReplayer *replayer;
continue;
}
- std::string mirror_uuid;
- r = librbd::cls_client::mirror_uuid_get(&local_ioctx, &mirror_uuid);
+ std::string local_mirror_uuid;
+ r = librbd::cls_client::mirror_uuid_get(&local_ioctx, &local_mirror_uuid);
if (r < 0) {
- derr << "failed to retrieve mirror uuid from pool "
+ derr << "failed to retrieve local mirror uuid from pool "
<< local_ioctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
continue;
}
+ std::string remote_mirror_uuid;
+ r = librbd::cls_client::mirror_uuid_get(&remote_ioctx, &remote_mirror_uuid);
+ if (r < 0) {
+ derr << "failed to retrieve remote mirror uuid from pool "
+ << remote_ioctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
+ continue;
+ }
+
// create entry for pool if it doesn't exist
auto &pool_replayers = m_images[pool_id];
for (const auto &image_id : kv.second) {
auto it = pool_replayers.find(image_id.id);
if (it == pool_replayers.end()) {
unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>(
- m_threads, m_local, m_remote, mirror_uuid, local_ioctx.get_id(),
- pool_id, image_id.id, image_id.global_id));
+ m_threads, m_local, m_remote, local_mirror_uuid, remote_mirror_uuid,
+ local_ioctx.get_id(), pool_id, image_id.id, image_id.global_id));
it = pool_replayers.insert(
std::make_pair(image_id.id, std::move(image_replayer))).first;
}