#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
#include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
#include "test/rbd_mirror/test_mock_fixture.h"
#include "test/journal/mock/MockJournaler.h"
#include "test/librbd/mock/MockImageCtx.h"
const std::string &global_image_id,
const std::string &local_mirror_uuid,
const std::string &local_image_id,
- const journal::Settings &settings,
- journal::CacheManagerHandler *cache_manager_handler,
+ const ::journal::Settings &settings,
+ ::journal::CacheManagerHandler *cache_manager_handler,
std::string *remote_mirror_uuid,
std::string *remote_image_id,
::journal::MockJournalerProxy **remote_journaler,
MOCK_METHOD2(preprocess, void(librbd::journal::EventEntry *, Context *));
};
+BootstrapRequest<librbd::MockTestImageCtx>* BootstrapRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
+CloseImageRequest<librbd::MockTestImageCtx>* CloseImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
+EventPreprocessor<librbd::MockTestImageCtx>* EventPreprocessor<librbd::MockTestImageCtx>::s_instance = nullptr;
+PrepareLocalImageRequest<librbd::MockTestImageCtx>* PrepareLocalImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
+PrepareRemoteImageRequest<librbd::MockTestImageCtx>* PrepareRemoteImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
+
+namespace journal {
+
template<>
struct ReplayStatusFormatter<librbd::MockTestImageCtx> {
static ReplayStatusFormatter* s_instance;
MOCK_METHOD2(get_or_send_update, bool(std::string *description, Context *on_finish));
};
-BootstrapRequest<librbd::MockTestImageCtx>* BootstrapRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
-CloseImageRequest<librbd::MockTestImageCtx>* CloseImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
-EventPreprocessor<librbd::MockTestImageCtx>* EventPreprocessor<librbd::MockTestImageCtx>::s_instance = nullptr;
-PrepareLocalImageRequest<librbd::MockTestImageCtx>* PrepareLocalImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
-PrepareRemoteImageRequest<librbd::MockTestImageCtx>* PrepareRemoteImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
ReplayStatusFormatter<librbd::MockTestImageCtx>* ReplayStatusFormatter<librbd::MockTestImageCtx>::s_instance = nullptr;
+} // namespace journal
} // namespace image_replayer
} // namespace mirror
} // namespace rbd
typedef image_replayer::EventPreprocessor<librbd::MockTestImageCtx> MockEventPreprocessor;
typedef image_replayer::PrepareLocalImageRequest<librbd::MockTestImageCtx> MockPrepareLocalImageRequest;
typedef image_replayer::PrepareRemoteImageRequest<librbd::MockTestImageCtx> MockPrepareRemoteImageRequest;
- typedef image_replayer::ReplayStatusFormatter<librbd::MockTestImageCtx> MockReplayStatusFormatter;
+ typedef image_replayer::journal::ReplayStatusFormatter<librbd::MockTestImageCtx> MockReplayStatusFormatter;
typedef librbd::journal::Replay<librbd::MockTestImageCtx> MockReplay;
typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
image_replayer/OpenLocalImageRequest.cc
image_replayer/PrepareLocalImageRequest.cc
image_replayer/PrepareRemoteImageRequest.cc
- image_replayer/ReplayStatusFormatter.cc
image_replayer/Utils.cc
+ image_replayer/journal/ReplayStatusFormatter.cc
image_sync/SyncPointCreateRequest.cc
image_sync/SyncPointPruneRequest.cc
pool_watcher/RefreshImagesRequest.cc
#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
#include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
-#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
+#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
return;
}
- m_replay_status_formatter = image_replayer::ReplayStatusFormatter<I>::create(
- m_remote_journaler, m_local_mirror_uuid);
+ m_replay_status_formatter =
+ image_replayer::journal::ReplayStatusFormatter<I>::create(
+ m_remote_journaler, m_local_mirror_uuid);
Context *on_finish(nullptr);
{
}
dout(10) << "stop complete" << dendl;
- image_replayer::ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
+ image_replayer::journal::ReplayStatusFormatter<I>::destroy(
+ m_replay_status_formatter);
m_replay_status_formatter = nullptr;
Context *on_start = nullptr;
namespace journal {
struct CacheManagerHandler;
-
class Journaler;
class ReplayHandler;
-}
+} // namespace journal
namespace librbd {
class ImageCtx;
namespace journal { template <typename> class Replay; }
-}
+} // namespace librbd
namespace rbd {
namespace mirror {
template <typename> struct MirrorStatusUpdater;
template <typename> struct Threads;
-namespace image_replayer { template <typename> class BootstrapRequest; }
-namespace image_replayer { template <typename> class EventPreprocessor; }
-namespace image_replayer { template <typename> class ReplayStatusFormatter; }
+namespace image_replayer {
+
+template <typename> class BootstrapRequest;
+template <typename> class EventPreprocessor;
+
+namespace journal { template <typename> class ReplayStatusFormatter; }
+
+} // namespace image_replayer
/**
* Replays changes from a remote cluster for a single image.
bool m_resync_requested = false;
image_replayer::EventPreprocessor<ImageCtxT> *m_event_preprocessor = nullptr;
- image_replayer::ReplayStatusFormatter<ImageCtxT> *m_replay_status_formatter =
- nullptr;
+ image_replayer::journal::ReplayStatusFormatter<ImageCtxT>*
+ m_replay_status_formatter = nullptr;
ImageCtxT *m_local_image_ctx = nullptr;
std::string m_local_image_tag_owner;
const std::string &global_image_id,
const std::string &local_mirror_uuid,
const std::string &local_image_id,
- const journal::Settings &settings,
- journal::CacheManagerHandler *cache_manager_handler,
+ const ::journal::Settings &settings,
+ ::journal::CacheManagerHandler *cache_manager_handler,
std::string *remote_mirror_uuid,
std::string *remote_image_id,
Journaler **remote_journaler,
const std::string &global_image_id,
const std::string &local_mirror_uuid,
const std::string &local_image_id,
- const journal::Settings &journal_settings,
- journal::CacheManagerHandler *cache_manager_handler,
+ const ::journal::Settings &journal_settings,
+ ::journal::CacheManagerHandler *cache_manager_handler,
std::string *remote_mirror_uuid,
std::string *remote_image_id,
Journaler **remote_journaler,
std::string m_global_image_id;
std::string m_local_mirror_uuid;
std::string m_local_image_id;
- journal::Settings m_journal_settings;
- journal::CacheManagerHandler *m_cache_manager_handler;
+ ::journal::Settings m_journal_settings;
+ ::journal::CacheManagerHandler *m_cache_manager_handler;
std::string *m_remote_mirror_uuid;
std::string *m_remote_image_id;
Journaler **m_remote_journaler;
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "ReplayStatusFormatter.h"
-#include "common/debug.h"
-#include "common/dout.h"
-#include "common/errno.h"
-#include "journal/Journaler.h"
-#include "librbd/ImageCtx.h"
-#include "librbd/Journal.h"
-#include "librbd/Utils.h"
-
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_rbd_mirror
-#undef dout_prefix
-#define dout_prefix *_dout << "rbd::mirror::image_replayer::ReplayStatusFormatter: " \
- << this << " " << __func__ << ": "
-
-namespace rbd {
-namespace mirror {
-namespace image_replayer {
-
-using librbd::util::unique_lock_name;
-
-template <typename I>
-ReplayStatusFormatter<I>::ReplayStatusFormatter(Journaler *journaler,
- const std::string &mirror_uuid)
- : m_journaler(journaler),
- m_mirror_uuid(mirror_uuid),
- m_lock(ceph::make_mutex(unique_lock_name("ReplayStatusFormatter::m_lock", this))) {
-}
-
-template <typename I>
-bool ReplayStatusFormatter<I>::get_or_send_update(std::string *description,
- Context *on_finish) {
- dout(20) << dendl;
-
- bool in_progress = false;
- {
- std::lock_guard locker{m_lock};
- if (m_on_finish) {
- in_progress = true;
- } else {
- m_on_finish = on_finish;
- }
- }
-
- if (in_progress) {
- dout(10) << "previous request is still in progress, ignoring" << dendl;
- on_finish->complete(-EAGAIN);
- return false;
- }
-
- m_master_position = cls::journal::ObjectPosition();
- m_mirror_position = cls::journal::ObjectPosition();
-
- cls::journal::Client master_client, mirror_client;
- int r;
-
- r = m_journaler->get_cached_client(librbd::Journal<>::IMAGE_CLIENT_ID,
- &master_client);
- if (r < 0) {
- derr << "error retrieving registered master client: "
- << cpp_strerror(r) << dendl;
- } else {
- r = m_journaler->get_cached_client(m_mirror_uuid, &mirror_client);
- if (r < 0) {
- derr << "error retrieving registered mirror client: "
- << cpp_strerror(r) << dendl;
- }
- }
-
- if (!master_client.commit_position.object_positions.empty()) {
- m_master_position =
- *(master_client.commit_position.object_positions.begin());
- }
-
- if (!mirror_client.commit_position.object_positions.empty()) {
- m_mirror_position =
- *(mirror_client.commit_position.object_positions.begin());
- }
-
- if (!calculate_behind_master_or_send_update()) {
- dout(20) << "need to update tag cache" << dendl;
- return false;
- }
-
- format(description);
-
- {
- std::lock_guard locker{m_lock};
- ceph_assert(m_on_finish == on_finish);
- m_on_finish = nullptr;
- }
-
- on_finish->complete(-EEXIST);
- return true;
-}
-
-template <typename I>
-bool ReplayStatusFormatter<I>::calculate_behind_master_or_send_update() {
- dout(20) << "m_master_position=" << m_master_position
- << ", m_mirror_position=" << m_mirror_position << dendl;
-
- m_entries_behind_master = 0;
-
- if (m_master_position == cls::journal::ObjectPosition() ||
- m_master_position.tag_tid < m_mirror_position.tag_tid) {
- return true;
- }
-
- cls::journal::ObjectPosition master = m_master_position;
- uint64_t mirror_tag_tid = m_mirror_position.tag_tid;
-
- while (master.tag_tid > mirror_tag_tid) {
- auto tag_it = m_tag_cache.find(master.tag_tid);
- if (tag_it == m_tag_cache.end()) {
- send_update_tag_cache(master.tag_tid, mirror_tag_tid);
- return false;
- }
- librbd::journal::TagData &tag_data = tag_it->second;
- m_entries_behind_master += master.entry_tid;
- master = {0, tag_data.predecessor.tag_tid, tag_data.predecessor.entry_tid};
- }
- if (master.tag_tid == mirror_tag_tid &&
- master.entry_tid > m_mirror_position.entry_tid) {
- m_entries_behind_master += master.entry_tid - m_mirror_position.entry_tid;
- }
-
- dout(20) << "clearing tags not needed any more (below mirror position)"
- << dendl;
-
- uint64_t tag_tid = mirror_tag_tid;
- size_t old_size = m_tag_cache.size();
- while (tag_tid != 0) {
- auto tag_it = m_tag_cache.find(tag_tid);
- if (tag_it == m_tag_cache.end()) {
- break;
- }
- librbd::journal::TagData &tag_data = tag_it->second;
-
- dout(20) << "erasing tag " << tag_data << "for tag_tid " << tag_tid
- << dendl;
-
- tag_tid = tag_data.predecessor.tag_tid;
- m_tag_cache.erase(tag_it);
- }
-
- dout(20) << old_size - m_tag_cache.size() << " entries cleared" << dendl;
-
- return true;
-}
-
-template <typename I>
-void ReplayStatusFormatter<I>::send_update_tag_cache(uint64_t master_tag_tid,
- uint64_t mirror_tag_tid) {
- if (master_tag_tid <= mirror_tag_tid ||
- m_tag_cache.find(master_tag_tid) != m_tag_cache.end()) {
- Context *on_finish = nullptr;
- {
- std::lock_guard locker{m_lock};
- std::swap(m_on_finish, on_finish);
- }
-
- ceph_assert(on_finish);
- on_finish->complete(0);
- return;
- }
-
- dout(20) << "master_tag_tid=" << master_tag_tid << ", mirror_tag_tid="
- << mirror_tag_tid << dendl;
-
- auto ctx = new LambdaContext(
- [this, master_tag_tid, mirror_tag_tid](int r) {
- handle_update_tag_cache(master_tag_tid, mirror_tag_tid, r);
- });
- m_journaler->get_tag(master_tag_tid, &m_tag, ctx);
-}
-
-template <typename I>
-void ReplayStatusFormatter<I>::handle_update_tag_cache(uint64_t master_tag_tid,
- uint64_t mirror_tag_tid,
- int r) {
- librbd::journal::TagData tag_data;
-
- if (r < 0) {
- derr << "error retrieving tag " << master_tag_tid << ": " << cpp_strerror(r)
- << dendl;
- } else {
- dout(20) << "retrieved tag " << master_tag_tid << ": " << m_tag << dendl;
-
- auto it = m_tag.data.cbegin();
- try {
- decode(tag_data, it);
- } catch (const buffer::error &err) {
- derr << "error decoding tag " << master_tag_tid << ": " << err.what()
- << dendl;
- }
- }
-
- if (tag_data.predecessor.mirror_uuid !=
- librbd::Journal<>::LOCAL_MIRROR_UUID &&
- tag_data.predecessor.mirror_uuid !=
- librbd::Journal<>::ORPHAN_MIRROR_UUID) {
- dout(20) << "hit remote image non-primary epoch" << dendl;
- tag_data.predecessor = {};
- }
-
- dout(20) << "decoded tag " << master_tag_tid << ": " << tag_data << dendl;
-
- m_tag_cache[master_tag_tid] = tag_data;
- send_update_tag_cache(tag_data.predecessor.tag_tid, mirror_tag_tid);
-}
-
-template <typename I>
-void ReplayStatusFormatter<I>::format(std::string *description) {
-
- dout(20) << "m_master_position=" << m_master_position
- << ", m_mirror_position=" << m_mirror_position
- << ", m_entries_behind_master=" << m_entries_behind_master << dendl;
-
- std::stringstream ss;
- ss << "master_position=";
- if (m_master_position == cls::journal::ObjectPosition()) {
- ss << "[]";
- } else {
- ss << m_master_position;
- }
- ss << ", mirror_position=";
- if (m_mirror_position == cls::journal::ObjectPosition()) {
- ss << "[]";
- } else {
- ss << m_mirror_position;
- }
- ss << ", entries_behind_master="
- << (m_entries_behind_master > 0 ? m_entries_behind_master : 0);
-
- *description = ss.str();
-}
-
-} // namespace image_replayer
-} // namespace mirror
-} // namespace rbd
-
-template class
-rbd::mirror::image_replayer::ReplayStatusFormatter<librbd::ImageCtx>;
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
-#define RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
-
-#include "include/Context.h"
-#include "common/ceph_mutex.h"
-#include "cls/journal/cls_journal_types.h"
-#include "librbd/journal/Types.h"
-#include "librbd/journal/TypeTraits.h"
-
-namespace journal { class Journaler; }
-namespace librbd { class ImageCtx; }
-
-namespace rbd {
-namespace mirror {
-namespace image_replayer {
-
-template <typename ImageCtxT = librbd::ImageCtx>
-class ReplayStatusFormatter {
-public:
- typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
-
- static ReplayStatusFormatter* create(Journaler *journaler,
- const std::string &mirror_uuid) {
- return new ReplayStatusFormatter(journaler, mirror_uuid);
- }
-
- static void destroy(ReplayStatusFormatter* formatter) {
- delete formatter;
- }
-
- ReplayStatusFormatter(Journaler *journaler, const std::string &mirror_uuid);
-
- bool get_or_send_update(std::string *description, Context *on_finish);
-
-private:
- Journaler *m_journaler;
- std::string m_mirror_uuid;
- ceph::mutex m_lock;
- Context *m_on_finish = nullptr;
- cls::journal::ObjectPosition m_master_position;
- cls::journal::ObjectPosition m_mirror_position;
- int m_entries_behind_master = 0;
- cls::journal::Tag m_tag;
- std::map<uint64_t, librbd::journal::TagData> m_tag_cache;
-
- bool calculate_behind_master_or_send_update();
- void send_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid);
- void handle_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid,
- int r);
- void format(std::string *description);
-};
-
-} // namespace image_replayer
-} // namespace mirror
-} // namespace rbd
-
-#endif // RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ReplayStatusFormatter.h"
+#include "common/debug.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "journal/Journaler.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
+#include "librbd/Utils.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
+ << "ReplayStatusFormatter: " << this << " " \
+ << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+using librbd::util::unique_lock_name;
+
+template <typename I>
+ReplayStatusFormatter<I>::ReplayStatusFormatter(Journaler *journaler,
+ const std::string &mirror_uuid)
+ : m_journaler(journaler),
+ m_mirror_uuid(mirror_uuid),
+ m_lock(ceph::make_mutex(unique_lock_name("ReplayStatusFormatter::m_lock", this))) {
+}
+
+template <typename I>
+bool ReplayStatusFormatter<I>::get_or_send_update(std::string *description,
+ Context *on_finish) {
+ dout(20) << dendl;
+
+ bool in_progress = false;
+ {
+ std::lock_guard locker{m_lock};
+ if (m_on_finish) {
+ in_progress = true;
+ } else {
+ m_on_finish = on_finish;
+ }
+ }
+
+ if (in_progress) {
+ dout(10) << "previous request is still in progress, ignoring" << dendl;
+ on_finish->complete(-EAGAIN);
+ return false;
+ }
+
+ m_master_position = cls::journal::ObjectPosition();
+ m_mirror_position = cls::journal::ObjectPosition();
+
+ cls::journal::Client master_client, mirror_client;
+ int r;
+
+ r = m_journaler->get_cached_client(librbd::Journal<>::IMAGE_CLIENT_ID,
+ &master_client);
+ if (r < 0) {
+ derr << "error retrieving registered master client: "
+ << cpp_strerror(r) << dendl;
+ } else {
+ r = m_journaler->get_cached_client(m_mirror_uuid, &mirror_client);
+ if (r < 0) {
+ derr << "error retrieving registered mirror client: "
+ << cpp_strerror(r) << dendl;
+ }
+ }
+
+ if (!master_client.commit_position.object_positions.empty()) {
+ m_master_position =
+ *(master_client.commit_position.object_positions.begin());
+ }
+
+ if (!mirror_client.commit_position.object_positions.empty()) {
+ m_mirror_position =
+ *(mirror_client.commit_position.object_positions.begin());
+ }
+
+ if (!calculate_behind_master_or_send_update()) {
+ dout(20) << "need to update tag cache" << dendl;
+ return false;
+ }
+
+ format(description);
+
+ {
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_on_finish == on_finish);
+ m_on_finish = nullptr;
+ }
+
+ on_finish->complete(-EEXIST);
+ return true;
+}
+
+template <typename I>
+bool ReplayStatusFormatter<I>::calculate_behind_master_or_send_update() {
+ dout(20) << "m_master_position=" << m_master_position
+ << ", m_mirror_position=" << m_mirror_position << dendl;
+
+ m_entries_behind_master = 0;
+
+ if (m_master_position == cls::journal::ObjectPosition() ||
+ m_master_position.tag_tid < m_mirror_position.tag_tid) {
+ return true;
+ }
+
+ cls::journal::ObjectPosition master = m_master_position;
+ uint64_t mirror_tag_tid = m_mirror_position.tag_tid;
+
+ while (master.tag_tid > mirror_tag_tid) {
+ auto tag_it = m_tag_cache.find(master.tag_tid);
+ if (tag_it == m_tag_cache.end()) {
+ send_update_tag_cache(master.tag_tid, mirror_tag_tid);
+ return false;
+ }
+ librbd::journal::TagData &tag_data = tag_it->second;
+ m_entries_behind_master += master.entry_tid;
+ master = {0, tag_data.predecessor.tag_tid, tag_data.predecessor.entry_tid};
+ }
+ if (master.tag_tid == mirror_tag_tid &&
+ master.entry_tid > m_mirror_position.entry_tid) {
+ m_entries_behind_master += master.entry_tid - m_mirror_position.entry_tid;
+ }
+
+ dout(20) << "clearing tags not needed any more (below mirror position)"
+ << dendl;
+
+ uint64_t tag_tid = mirror_tag_tid;
+ size_t old_size = m_tag_cache.size();
+ while (tag_tid != 0) {
+ auto tag_it = m_tag_cache.find(tag_tid);
+ if (tag_it == m_tag_cache.end()) {
+ break;
+ }
+ librbd::journal::TagData &tag_data = tag_it->second;
+
+ dout(20) << "erasing tag " << tag_data << "for tag_tid " << tag_tid
+ << dendl;
+
+ tag_tid = tag_data.predecessor.tag_tid;
+ m_tag_cache.erase(tag_it);
+ }
+
+ dout(20) << old_size - m_tag_cache.size() << " entries cleared" << dendl;
+
+ return true;
+}
+
+template <typename I>
+void ReplayStatusFormatter<I>::send_update_tag_cache(uint64_t master_tag_tid,
+ uint64_t mirror_tag_tid) {
+ if (master_tag_tid <= mirror_tag_tid ||
+ m_tag_cache.find(master_tag_tid) != m_tag_cache.end()) {
+ Context *on_finish = nullptr;
+ {
+ std::lock_guard locker{m_lock};
+ std::swap(m_on_finish, on_finish);
+ }
+
+ ceph_assert(on_finish);
+ on_finish->complete(0);
+ return;
+ }
+
+ dout(20) << "master_tag_tid=" << master_tag_tid << ", mirror_tag_tid="
+ << mirror_tag_tid << dendl;
+
+ auto ctx = new LambdaContext(
+ [this, master_tag_tid, mirror_tag_tid](int r) {
+ handle_update_tag_cache(master_tag_tid, mirror_tag_tid, r);
+ });
+ m_journaler->get_tag(master_tag_tid, &m_tag, ctx);
+}
+
+template <typename I>
+void ReplayStatusFormatter<I>::handle_update_tag_cache(uint64_t master_tag_tid,
+ uint64_t mirror_tag_tid,
+ int r) {
+ librbd::journal::TagData tag_data;
+
+ if (r < 0) {
+ derr << "error retrieving tag " << master_tag_tid << ": " << cpp_strerror(r)
+ << dendl;
+ } else {
+ dout(20) << "retrieved tag " << master_tag_tid << ": " << m_tag << dendl;
+
+ auto it = m_tag.data.cbegin();
+ try {
+ decode(tag_data, it);
+ } catch (const buffer::error &err) {
+ derr << "error decoding tag " << master_tag_tid << ": " << err.what()
+ << dendl;
+ }
+ }
+
+ if (tag_data.predecessor.mirror_uuid !=
+ librbd::Journal<>::LOCAL_MIRROR_UUID &&
+ tag_data.predecessor.mirror_uuid !=
+ librbd::Journal<>::ORPHAN_MIRROR_UUID) {
+ dout(20) << "hit remote image non-primary epoch" << dendl;
+ tag_data.predecessor = {};
+ }
+
+ dout(20) << "decoded tag " << master_tag_tid << ": " << tag_data << dendl;
+
+ m_tag_cache[master_tag_tid] = tag_data;
+ send_update_tag_cache(tag_data.predecessor.tag_tid, mirror_tag_tid);
+}
+
+template <typename I>
+void ReplayStatusFormatter<I>::format(std::string *description) {
+
+ dout(20) << "m_master_position=" << m_master_position
+ << ", m_mirror_position=" << m_mirror_position
+ << ", m_entries_behind_master=" << m_entries_behind_master << dendl;
+
+ std::stringstream ss;
+ ss << "master_position=";
+ if (m_master_position == cls::journal::ObjectPosition()) {
+ ss << "[]";
+ } else {
+ ss << m_master_position;
+ }
+ ss << ", mirror_position=";
+ if (m_mirror_position == cls::journal::ObjectPosition()) {
+ ss << "[]";
+ } else {
+ ss << m_mirror_position;
+ }
+ ss << ", entries_behind_master="
+ << (m_entries_behind_master > 0 ? m_entries_behind_master : 0);
+
+ *description = ss.str();
+}
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::image_replayer::journal::ReplayStatusFormatter<librbd::ImageCtx>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
+#define RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
+
+#include "include/Context.h"
+#include "common/ceph_mutex.h"
+#include "cls/journal/cls_journal_types.h"
+#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
+
+namespace journal { class Journaler; }
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class ReplayStatusFormatter {
+public:
+ typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
+
+ static ReplayStatusFormatter* create(Journaler *journaler,
+ const std::string &mirror_uuid) {
+ return new ReplayStatusFormatter(journaler, mirror_uuid);
+ }
+
+ static void destroy(ReplayStatusFormatter* formatter) {
+ delete formatter;
+ }
+
+ ReplayStatusFormatter(Journaler *journaler, const std::string &mirror_uuid);
+
+ bool get_or_send_update(std::string *description, Context *on_finish);
+
+private:
+ Journaler *m_journaler;
+ std::string m_mirror_uuid;
+ ceph::mutex m_lock;
+ Context *m_on_finish = nullptr;
+ cls::journal::ObjectPosition m_master_position;
+ cls::journal::ObjectPosition m_mirror_position;
+ int m_entries_behind_master = 0;
+ cls::journal::Tag m_tag;
+ std::map<uint64_t, librbd::journal::TagData> m_tag_cache;
+
+ bool calculate_behind_master_or_send_update();
+ void send_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid);
+ void handle_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid,
+ int r);
+ void format(std::string *description);
+};
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::image_replayer::journal::ReplayStatusFormatter<librbd::ImageCtx>;
+
+#endif // RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H