]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rbd-mirror: moved replay status formatter to sub-namespace
authorJason Dillaman <dillaman@redhat.com>
Sat, 7 Dec 2019 03:03:02 +0000 (22:03 -0500)
committerJason Dillaman <dillaman@redhat.com>
Mon, 16 Dec 2019 01:03:36 +0000 (20:03 -0500)
The current implementation is tied to journal-based replaying so
move it down into a new journal sub-namespace.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/rbd_mirror/test_mock_ImageReplayer.cc
src/tools/rbd_mirror/CMakeLists.txt
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h
src/tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h
src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc [deleted file]
src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h [deleted file]
src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.cc [new file with mode: 0644]
src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h [new file with mode: 0644]

index 310e17b0c2b98311d284381bf3f21db03effab43..a47b4456e6f65ba3815d65cf1a2f29885623750e 100644 (file)
@@ -14,6 +14,7 @@
 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
 #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
 #include "test/rbd_mirror/test_mock_fixture.h"
 #include "test/journal/mock/MockJournaler.h"
 #include "test/librbd/mock/MockImageCtx.h"
@@ -165,8 +166,8 @@ struct PrepareRemoteImageRequest<librbd::MockTestImageCtx> {
                                            const std::string &global_image_id,
                                            const std::string &local_mirror_uuid,
                                            const std::string &local_image_id,
-                                           const journal::Settings &settings,
-                                           journal::CacheManagerHandler *cache_manager_handler,
+                                           const ::journal::Settings &settings,
+                                           ::journal::CacheManagerHandler *cache_manager_handler,
                                            std::string *remote_mirror_uuid,
                                            std::string *remote_image_id,
                                            ::journal::MockJournalerProxy **remote_journaler,
@@ -299,6 +300,14 @@ struct EventPreprocessor<librbd::MockTestImageCtx> {
   MOCK_METHOD2(preprocess, void(librbd::journal::EventEntry *, Context *));
 };
 
+BootstrapRequest<librbd::MockTestImageCtx>* BootstrapRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
+CloseImageRequest<librbd::MockTestImageCtx>* CloseImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
+EventPreprocessor<librbd::MockTestImageCtx>* EventPreprocessor<librbd::MockTestImageCtx>::s_instance = nullptr;
+PrepareLocalImageRequest<librbd::MockTestImageCtx>* PrepareLocalImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
+PrepareRemoteImageRequest<librbd::MockTestImageCtx>* PrepareRemoteImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
+
+namespace journal {
+
 template<>
 struct ReplayStatusFormatter<librbd::MockTestImageCtx> {
   static ReplayStatusFormatter* s_instance;
@@ -325,13 +334,9 @@ struct ReplayStatusFormatter<librbd::MockTestImageCtx> {
   MOCK_METHOD2(get_or_send_update, bool(std::string *description, Context *on_finish));
 };
 
-BootstrapRequest<librbd::MockTestImageCtx>* BootstrapRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
-CloseImageRequest<librbd::MockTestImageCtx>* CloseImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
-EventPreprocessor<librbd::MockTestImageCtx>* EventPreprocessor<librbd::MockTestImageCtx>::s_instance = nullptr;
-PrepareLocalImageRequest<librbd::MockTestImageCtx>* PrepareLocalImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
-PrepareRemoteImageRequest<librbd::MockTestImageCtx>* PrepareRemoteImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
 ReplayStatusFormatter<librbd::MockTestImageCtx>* ReplayStatusFormatter<librbd::MockTestImageCtx>::s_instance = nullptr;
 
+} // namespace journal
 } // namespace image_replayer
 } // namespace mirror
 } // namespace rbd
@@ -363,7 +368,7 @@ public:
   typedef image_replayer::EventPreprocessor<librbd::MockTestImageCtx> MockEventPreprocessor;
   typedef image_replayer::PrepareLocalImageRequest<librbd::MockTestImageCtx> MockPrepareLocalImageRequest;
   typedef image_replayer::PrepareRemoteImageRequest<librbd::MockTestImageCtx> MockPrepareRemoteImageRequest;
-  typedef image_replayer::ReplayStatusFormatter<librbd::MockTestImageCtx> MockReplayStatusFormatter;
+  typedef image_replayer::journal::ReplayStatusFormatter<librbd::MockTestImageCtx> MockReplayStatusFormatter;
   typedef librbd::journal::Replay<librbd::MockTestImageCtx> MockReplay;
   typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
   typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
index d38353bcf1788f358aee737d272119fb35a216b7..736dbe9246cb1b5aecbb9c9c026710051ff8001b 100644 (file)
@@ -41,8 +41,8 @@ set(rbd_mirror_internal
   image_replayer/OpenLocalImageRequest.cc
   image_replayer/PrepareLocalImageRequest.cc
   image_replayer/PrepareRemoteImageRequest.cc
-  image_replayer/ReplayStatusFormatter.cc
   image_replayer/Utils.cc
+  image_replayer/journal/ReplayStatusFormatter.cc
   image_sync/SyncPointCreateRequest.cc
   image_sync/SyncPointPruneRequest.cc
   pool_watcher/RefreshImagesRequest.cc
index e08e4dde00acd8ee21322f27c5496bcb5aa3b18a..9933dd7509b17423842f59c7972c8474d6ad5d41 100644 (file)
@@ -30,7 +30,7 @@
 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
 #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
-#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
+#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror
@@ -621,8 +621,9 @@ void ImageReplayer<I>::handle_start_replay(int r) {
     return;
   }
 
-  m_replay_status_formatter = image_replayer::ReplayStatusFormatter<I>::create(
-    m_remote_journaler, m_local_mirror_uuid);
+  m_replay_status_formatter =
+    image_replayer::journal::ReplayStatusFormatter<I>::create(
+      m_remote_journaler, m_local_mirror_uuid);
 
   Context *on_finish(nullptr);
   {
@@ -1643,7 +1644,8 @@ void ImageReplayer<I>::handle_shut_down(int r) {
   }
 
   dout(10) << "stop complete" << dendl;
-  image_replayer::ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
+  image_replayer::journal::ReplayStatusFormatter<I>::destroy(
+    m_replay_status_formatter);
   m_replay_status_formatter = nullptr;
 
   Context *on_start = nullptr;
index 166723b3a0d7c4a66ce1cfe698640082ac2d81c8..29613c87cac6dd540639e2b1596a608bba1b1692 100644 (file)
@@ -34,18 +34,17 @@ class PerfCounters;
 namespace journal {
 
 struct CacheManagerHandler;
-
 class Journaler;
 class ReplayHandler;
 
-}
+} // namespace journal
 
 namespace librbd {
 
 class ImageCtx;
 namespace journal { template <typename> class Replay; }
 
-}
+} // namespace librbd
 
 namespace rbd {
 namespace mirror {
@@ -54,9 +53,14 @@ template <typename> struct InstanceWatcher;
 template <typename> struct MirrorStatusUpdater;
 template <typename> struct Threads;
 
-namespace image_replayer { template <typename> class BootstrapRequest; }
-namespace image_replayer { template <typename> class EventPreprocessor; }
-namespace image_replayer { template <typename> class ReplayStatusFormatter; }
+namespace image_replayer {
+
+template <typename> class BootstrapRequest;
+template <typename> class EventPreprocessor;
+
+namespace journal { template <typename> class ReplayStatusFormatter; }
+
+} // namespace image_replayer
 
 /**
  * Replays changes from a remote cluster for a single image.
@@ -308,8 +312,8 @@ private:
   bool m_resync_requested = false;
 
   image_replayer::EventPreprocessor<ImageCtxT> *m_event_preprocessor = nullptr;
-  image_replayer::ReplayStatusFormatter<ImageCtxT> *m_replay_status_formatter =
-    nullptr;
+  image_replayer::journal::ReplayStatusFormatter<ImageCtxT>*
+    m_replay_status_formatter = nullptr;
   ImageCtxT *m_local_image_ctx = nullptr;
   std::string m_local_image_tag_owner;
 
index e6bd76cbfb9dfd67afe2cdc65db51d9c0d9eb3e1..8746430f31eabb3cb19bfae1ee3a5cb634ad3439 100644 (file)
@@ -39,8 +39,8 @@ public:
                                            const std::string &global_image_id,
                                            const std::string &local_mirror_uuid,
                                            const std::string &local_image_id,
-                                           const journal::Settings &settings,
-                                           journal::CacheManagerHandler *cache_manager_handler,
+                                           const ::journal::Settings &settings,
+                                           ::journal::CacheManagerHandler *cache_manager_handler,
                                            std::string *remote_mirror_uuid,
                                            std::string *remote_image_id,
                                            Journaler **remote_journaler,
@@ -61,8 +61,8 @@ public:
                             const std::string &global_image_id,
                             const std::string &local_mirror_uuid,
                             const std::string &local_image_id,
-                            const journal::Settings &journal_settings,
-                            journal::CacheManagerHandler *cache_manager_handler,
+                            const ::journal::Settings &journal_settings,
+                            ::journal::CacheManagerHandler *cache_manager_handler,
                             std::string *remote_mirror_uuid,
                             std::string *remote_image_id,
                             Journaler **remote_journaler,
@@ -111,8 +111,8 @@ private:
   std::string m_global_image_id;
   std::string m_local_mirror_uuid;
   std::string m_local_image_id;
-  journal::Settings m_journal_settings;
-  journal::CacheManagerHandler *m_cache_manager_handler;
+  ::journal::Settings m_journal_settings;
+  ::journal::CacheManagerHandler *m_cache_manager_handler;
   std::string *m_remote_mirror_uuid;
   std::string *m_remote_image_id;
   Journaler **m_remote_journaler;
diff --git a/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc b/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc
deleted file mode 100644 (file)
index 9471901..0000000
+++ /dev/null
@@ -1,246 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "ReplayStatusFormatter.h"
-#include "common/debug.h"
-#include "common/dout.h"
-#include "common/errno.h"
-#include "journal/Journaler.h"
-#include "librbd/ImageCtx.h"
-#include "librbd/Journal.h"
-#include "librbd/Utils.h"
-
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_rbd_mirror
-#undef dout_prefix
-#define dout_prefix *_dout << "rbd::mirror::image_replayer::ReplayStatusFormatter: " \
-    << this << " " << __func__ << ": "
-
-namespace rbd {
-namespace mirror {
-namespace image_replayer {
-
-using librbd::util::unique_lock_name;
-
-template <typename I>
-ReplayStatusFormatter<I>::ReplayStatusFormatter(Journaler *journaler,
-                                               const std::string &mirror_uuid)
-  : m_journaler(journaler),
-    m_mirror_uuid(mirror_uuid),
-    m_lock(ceph::make_mutex(unique_lock_name("ReplayStatusFormatter::m_lock", this))) {
-}
-
-template <typename I>
-bool ReplayStatusFormatter<I>::get_or_send_update(std::string *description,
-                                                 Context *on_finish) {
-  dout(20) << dendl;
-
-  bool in_progress = false;
-  {
-    std::lock_guard locker{m_lock};
-    if (m_on_finish) {
-      in_progress = true;
-    } else {
-      m_on_finish = on_finish;
-    }
-  }
-
-  if (in_progress) {
-    dout(10) << "previous request is still in progress, ignoring" << dendl;
-    on_finish->complete(-EAGAIN);
-    return false;
-  }
-
-  m_master_position = cls::journal::ObjectPosition();
-  m_mirror_position = cls::journal::ObjectPosition();
-
-  cls::journal::Client master_client, mirror_client;
-  int r;
-
-  r = m_journaler->get_cached_client(librbd::Journal<>::IMAGE_CLIENT_ID,
-                                     &master_client);
-  if (r < 0) {
-    derr << "error retrieving registered master client: "
-        << cpp_strerror(r) << dendl;
-  } else {
-    r = m_journaler->get_cached_client(m_mirror_uuid, &mirror_client);
-    if (r < 0) {
-      derr << "error retrieving registered mirror client: "
-          << cpp_strerror(r) << dendl;
-    }
-  }
-
-  if (!master_client.commit_position.object_positions.empty()) {
-    m_master_position =
-      *(master_client.commit_position.object_positions.begin());
-  }
-
-  if (!mirror_client.commit_position.object_positions.empty()) {
-    m_mirror_position =
-      *(mirror_client.commit_position.object_positions.begin());
-  }
-
-  if (!calculate_behind_master_or_send_update()) {
-    dout(20) << "need to update tag cache" << dendl;
-    return false;
-  }
-
-  format(description);
-
-  {
-    std::lock_guard locker{m_lock};
-    ceph_assert(m_on_finish == on_finish);
-    m_on_finish = nullptr;
-  }
-
-  on_finish->complete(-EEXIST);
-  return true;
-}
-
-template <typename I>
-bool ReplayStatusFormatter<I>::calculate_behind_master_or_send_update() {
-  dout(20) << "m_master_position=" << m_master_position
-          << ", m_mirror_position=" << m_mirror_position << dendl;
-
-  m_entries_behind_master = 0;
-
-  if (m_master_position == cls::journal::ObjectPosition() ||
-      m_master_position.tag_tid < m_mirror_position.tag_tid) {
-    return true;
-  }
-
-  cls::journal::ObjectPosition master = m_master_position;
-  uint64_t mirror_tag_tid = m_mirror_position.tag_tid;
-
-  while (master.tag_tid > mirror_tag_tid) {
-    auto tag_it = m_tag_cache.find(master.tag_tid);
-    if (tag_it == m_tag_cache.end()) {
-      send_update_tag_cache(master.tag_tid, mirror_tag_tid);
-      return false;
-    }
-    librbd::journal::TagData &tag_data = tag_it->second;
-    m_entries_behind_master += master.entry_tid;
-    master = {0, tag_data.predecessor.tag_tid, tag_data.predecessor.entry_tid};
-  }
-  if (master.tag_tid == mirror_tag_tid &&
-      master.entry_tid > m_mirror_position.entry_tid) {
-    m_entries_behind_master += master.entry_tid - m_mirror_position.entry_tid;
-  }
-
-  dout(20) << "clearing tags not needed any more (below mirror position)"
-          << dendl;
-
-  uint64_t tag_tid = mirror_tag_tid;
-  size_t old_size = m_tag_cache.size();
-  while (tag_tid != 0) {
-    auto tag_it = m_tag_cache.find(tag_tid);
-    if (tag_it == m_tag_cache.end()) {
-      break;
-    }
-    librbd::journal::TagData &tag_data = tag_it->second;
-
-    dout(20) << "erasing tag " <<  tag_data << "for tag_tid " << tag_tid
-            << dendl;
-
-    tag_tid = tag_data.predecessor.tag_tid;
-    m_tag_cache.erase(tag_it);
-  }
-
-  dout(20) << old_size - m_tag_cache.size() << " entries cleared" << dendl;
-
-  return true;
-}
-
-template <typename I>
-void ReplayStatusFormatter<I>::send_update_tag_cache(uint64_t master_tag_tid,
-                                                    uint64_t mirror_tag_tid) {
-  if (master_tag_tid <= mirror_tag_tid ||
-      m_tag_cache.find(master_tag_tid) != m_tag_cache.end()) {
-    Context *on_finish = nullptr;
-    {
-      std::lock_guard locker{m_lock};
-      std::swap(m_on_finish, on_finish);
-    }
-
-    ceph_assert(on_finish);
-    on_finish->complete(0);
-    return;
-  }
-
-  dout(20) << "master_tag_tid=" << master_tag_tid << ", mirror_tag_tid="
-          << mirror_tag_tid << dendl;
-
-  auto ctx = new LambdaContext(
-    [this, master_tag_tid, mirror_tag_tid](int r) {
-      handle_update_tag_cache(master_tag_tid, mirror_tag_tid, r);
-    });
-  m_journaler->get_tag(master_tag_tid, &m_tag, ctx);
-}
-
-template <typename I>
-void ReplayStatusFormatter<I>::handle_update_tag_cache(uint64_t master_tag_tid,
-                                                      uint64_t mirror_tag_tid,
-                                                      int r) {
-  librbd::journal::TagData tag_data;
-
-  if (r < 0) {
-    derr << "error retrieving tag " << master_tag_tid << ": " << cpp_strerror(r)
-        << dendl;
-  } else {
-    dout(20) << "retrieved tag " << master_tag_tid << ": " << m_tag << dendl;
-
-    auto it = m_tag.data.cbegin();
-    try {
-      decode(tag_data, it);
-    } catch (const buffer::error &err) {
-      derr << "error decoding tag " << master_tag_tid << ": " << err.what()
-          << dendl;
-    }
-  }
-
-  if (tag_data.predecessor.mirror_uuid !=
-        librbd::Journal<>::LOCAL_MIRROR_UUID &&
-      tag_data.predecessor.mirror_uuid !=
-        librbd::Journal<>::ORPHAN_MIRROR_UUID) {
-    dout(20) << "hit remote image non-primary epoch" << dendl;
-    tag_data.predecessor = {};
-  }
-
-  dout(20) << "decoded tag " << master_tag_tid << ": " << tag_data << dendl;
-
-  m_tag_cache[master_tag_tid] = tag_data;
-  send_update_tag_cache(tag_data.predecessor.tag_tid, mirror_tag_tid);
-}
-
-template <typename I>
-void ReplayStatusFormatter<I>::format(std::string *description) {
-
-  dout(20) << "m_master_position=" << m_master_position
-          << ", m_mirror_position=" << m_mirror_position
-          << ", m_entries_behind_master=" << m_entries_behind_master << dendl;
-
-  std::stringstream ss;
-  ss << "master_position=";
-  if (m_master_position == cls::journal::ObjectPosition()) {
-    ss << "[]";
-  } else {
-    ss << m_master_position;
-  }
-  ss << ", mirror_position=";
-  if (m_mirror_position == cls::journal::ObjectPosition()) {
-    ss << "[]";
-  } else {
-    ss << m_mirror_position;
-  }
-  ss << ", entries_behind_master="
-     << (m_entries_behind_master > 0 ? m_entries_behind_master : 0);
-
-  *description = ss.str();
-}
-
-} // namespace image_replayer
-} // namespace mirror
-} // namespace rbd
-
-template class
-rbd::mirror::image_replayer::ReplayStatusFormatter<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h b/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h
deleted file mode 100644 (file)
index f1e9bf5..0000000
+++ /dev/null
@@ -1,60 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
-#define RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
-
-#include "include/Context.h"
-#include "common/ceph_mutex.h"
-#include "cls/journal/cls_journal_types.h"
-#include "librbd/journal/Types.h"
-#include "librbd/journal/TypeTraits.h"
-
-namespace journal { class Journaler; }
-namespace librbd { class ImageCtx; }
-
-namespace rbd {
-namespace mirror {
-namespace image_replayer {
-
-template <typename ImageCtxT = librbd::ImageCtx>
-class ReplayStatusFormatter {
-public:
-  typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
-
-  static ReplayStatusFormatter* create(Journaler *journaler,
-                                      const std::string &mirror_uuid) {
-    return new ReplayStatusFormatter(journaler, mirror_uuid);
-  }
-
-  static void destroy(ReplayStatusFormatter* formatter) {
-    delete formatter;
-  }
-
-  ReplayStatusFormatter(Journaler *journaler, const std::string &mirror_uuid);
-
-  bool get_or_send_update(std::string *description, Context *on_finish);
-
-private:
-  Journaler *m_journaler;
-  std::string m_mirror_uuid;
-  ceph::mutex m_lock;
-  Context *m_on_finish = nullptr;
-  cls::journal::ObjectPosition m_master_position;
-  cls::journal::ObjectPosition m_mirror_position;
-  int m_entries_behind_master = 0;
-  cls::journal::Tag m_tag;
-  std::map<uint64_t, librbd::journal::TagData> m_tag_cache;
-
-  bool calculate_behind_master_or_send_update();
-  void send_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid);
-  void handle_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid,
-                              int r);
-  void format(std::string *description);
-};
-
-} // namespace image_replayer
-} // namespace mirror
-} // namespace rbd
-
-#endif // RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
diff --git a/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.cc b/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.cc
new file mode 100644 (file)
index 0000000..79b4b36
--- /dev/null
@@ -0,0 +1,248 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ReplayStatusFormatter.h"
+#include "common/debug.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "journal/Journaler.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
+#include "librbd/Utils.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
+                           << "ReplayStatusFormatter: " << this << " " \
+                           << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+using librbd::util::unique_lock_name;
+
+template <typename I>
+ReplayStatusFormatter<I>::ReplayStatusFormatter(Journaler *journaler,
+                                               const std::string &mirror_uuid)
+  : m_journaler(journaler),
+    m_mirror_uuid(mirror_uuid),
+    m_lock(ceph::make_mutex(unique_lock_name("ReplayStatusFormatter::m_lock", this))) {
+}
+
+template <typename I>
+bool ReplayStatusFormatter<I>::get_or_send_update(std::string *description,
+                                                 Context *on_finish) {
+  dout(20) << dendl;
+
+  bool in_progress = false;
+  {
+    std::lock_guard locker{m_lock};
+    if (m_on_finish) {
+      in_progress = true;
+    } else {
+      m_on_finish = on_finish;
+    }
+  }
+
+  if (in_progress) {
+    dout(10) << "previous request is still in progress, ignoring" << dendl;
+    on_finish->complete(-EAGAIN);
+    return false;
+  }
+
+  m_master_position = cls::journal::ObjectPosition();
+  m_mirror_position = cls::journal::ObjectPosition();
+
+  cls::journal::Client master_client, mirror_client;
+  int r;
+
+  r = m_journaler->get_cached_client(librbd::Journal<>::IMAGE_CLIENT_ID,
+                                     &master_client);
+  if (r < 0) {
+    derr << "error retrieving registered master client: "
+        << cpp_strerror(r) << dendl;
+  } else {
+    r = m_journaler->get_cached_client(m_mirror_uuid, &mirror_client);
+    if (r < 0) {
+      derr << "error retrieving registered mirror client: "
+          << cpp_strerror(r) << dendl;
+    }
+  }
+
+  if (!master_client.commit_position.object_positions.empty()) {
+    m_master_position =
+      *(master_client.commit_position.object_positions.begin());
+  }
+
+  if (!mirror_client.commit_position.object_positions.empty()) {
+    m_mirror_position =
+      *(mirror_client.commit_position.object_positions.begin());
+  }
+
+  if (!calculate_behind_master_or_send_update()) {
+    dout(20) << "need to update tag cache" << dendl;
+    return false;
+  }
+
+  format(description);
+
+  {
+    std::lock_guard locker{m_lock};
+    ceph_assert(m_on_finish == on_finish);
+    m_on_finish = nullptr;
+  }
+
+  on_finish->complete(-EEXIST);
+  return true;
+}
+
+template <typename I>
+bool ReplayStatusFormatter<I>::calculate_behind_master_or_send_update() {
+  dout(20) << "m_master_position=" << m_master_position
+          << ", m_mirror_position=" << m_mirror_position << dendl;
+
+  m_entries_behind_master = 0;
+
+  if (m_master_position == cls::journal::ObjectPosition() ||
+      m_master_position.tag_tid < m_mirror_position.tag_tid) {
+    return true;
+  }
+
+  cls::journal::ObjectPosition master = m_master_position;
+  uint64_t mirror_tag_tid = m_mirror_position.tag_tid;
+
+  while (master.tag_tid > mirror_tag_tid) {
+    auto tag_it = m_tag_cache.find(master.tag_tid);
+    if (tag_it == m_tag_cache.end()) {
+      send_update_tag_cache(master.tag_tid, mirror_tag_tid);
+      return false;
+    }
+    librbd::journal::TagData &tag_data = tag_it->second;
+    m_entries_behind_master += master.entry_tid;
+    master = {0, tag_data.predecessor.tag_tid, tag_data.predecessor.entry_tid};
+  }
+  if (master.tag_tid == mirror_tag_tid &&
+      master.entry_tid > m_mirror_position.entry_tid) {
+    m_entries_behind_master += master.entry_tid - m_mirror_position.entry_tid;
+  }
+
+  dout(20) << "clearing tags not needed any more (below mirror position)"
+          << dendl;
+
+  uint64_t tag_tid = mirror_tag_tid;
+  size_t old_size = m_tag_cache.size();
+  while (tag_tid != 0) {
+    auto tag_it = m_tag_cache.find(tag_tid);
+    if (tag_it == m_tag_cache.end()) {
+      break;
+    }
+    librbd::journal::TagData &tag_data = tag_it->second;
+
+    dout(20) << "erasing tag " <<  tag_data << "for tag_tid " << tag_tid
+            << dendl;
+
+    tag_tid = tag_data.predecessor.tag_tid;
+    m_tag_cache.erase(tag_it);
+  }
+
+  dout(20) << old_size - m_tag_cache.size() << " entries cleared" << dendl;
+
+  return true;
+}
+
+template <typename I>
+void ReplayStatusFormatter<I>::send_update_tag_cache(uint64_t master_tag_tid,
+                                                    uint64_t mirror_tag_tid) {
+  if (master_tag_tid <= mirror_tag_tid ||
+      m_tag_cache.find(master_tag_tid) != m_tag_cache.end()) {
+    Context *on_finish = nullptr;
+    {
+      std::lock_guard locker{m_lock};
+      std::swap(m_on_finish, on_finish);
+    }
+
+    ceph_assert(on_finish);
+    on_finish->complete(0);
+    return;
+  }
+
+  dout(20) << "master_tag_tid=" << master_tag_tid << ", mirror_tag_tid="
+          << mirror_tag_tid << dendl;
+
+  auto ctx = new LambdaContext(
+    [this, master_tag_tid, mirror_tag_tid](int r) {
+      handle_update_tag_cache(master_tag_tid, mirror_tag_tid, r);
+    });
+  m_journaler->get_tag(master_tag_tid, &m_tag, ctx);
+}
+
+template <typename I>
+void ReplayStatusFormatter<I>::handle_update_tag_cache(uint64_t master_tag_tid,
+                                                      uint64_t mirror_tag_tid,
+                                                      int r) {
+  librbd::journal::TagData tag_data;
+
+  if (r < 0) {
+    derr << "error retrieving tag " << master_tag_tid << ": " << cpp_strerror(r)
+        << dendl;
+  } else {
+    dout(20) << "retrieved tag " << master_tag_tid << ": " << m_tag << dendl;
+
+    auto it = m_tag.data.cbegin();
+    try {
+      decode(tag_data, it);
+    } catch (const buffer::error &err) {
+      derr << "error decoding tag " << master_tag_tid << ": " << err.what()
+          << dendl;
+    }
+  }
+
+  if (tag_data.predecessor.mirror_uuid !=
+        librbd::Journal<>::LOCAL_MIRROR_UUID &&
+      tag_data.predecessor.mirror_uuid !=
+        librbd::Journal<>::ORPHAN_MIRROR_UUID) {
+    dout(20) << "hit remote image non-primary epoch" << dendl;
+    tag_data.predecessor = {};
+  }
+
+  dout(20) << "decoded tag " << master_tag_tid << ": " << tag_data << dendl;
+
+  m_tag_cache[master_tag_tid] = tag_data;
+  send_update_tag_cache(tag_data.predecessor.tag_tid, mirror_tag_tid);
+}
+
+template <typename I>
+void ReplayStatusFormatter<I>::format(std::string *description) {
+
+  dout(20) << "m_master_position=" << m_master_position
+          << ", m_mirror_position=" << m_mirror_position
+          << ", m_entries_behind_master=" << m_entries_behind_master << dendl;
+
+  std::stringstream ss;
+  ss << "master_position=";
+  if (m_master_position == cls::journal::ObjectPosition()) {
+    ss << "[]";
+  } else {
+    ss << m_master_position;
+  }
+  ss << ", mirror_position=";
+  if (m_mirror_position == cls::journal::ObjectPosition()) {
+    ss << "[]";
+  } else {
+    ss << m_mirror_position;
+  }
+  ss << ", entries_behind_master="
+     << (m_entries_behind_master > 0 ? m_entries_behind_master : 0);
+
+  *description = ss.str();
+}
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::image_replayer::journal::ReplayStatusFormatter<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h b/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h
new file mode 100644 (file)
index 0000000..5ba000a
--- /dev/null
@@ -0,0 +1,64 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
+#define RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
+
+#include "include/Context.h"
+#include "common/ceph_mutex.h"
+#include "cls/journal/cls_journal_types.h"
+#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
+
+namespace journal { class Journaler; }
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class ReplayStatusFormatter {
+public:
+  typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
+
+  static ReplayStatusFormatter* create(Journaler *journaler,
+                                      const std::string &mirror_uuid) {
+    return new ReplayStatusFormatter(journaler, mirror_uuid);
+  }
+
+  static void destroy(ReplayStatusFormatter* formatter) {
+    delete formatter;
+  }
+
+  ReplayStatusFormatter(Journaler *journaler, const std::string &mirror_uuid);
+
+  bool get_or_send_update(std::string *description, Context *on_finish);
+
+private:
+  Journaler *m_journaler;
+  std::string m_mirror_uuid;
+  ceph::mutex m_lock;
+  Context *m_on_finish = nullptr;
+  cls::journal::ObjectPosition m_master_position;
+  cls::journal::ObjectPosition m_mirror_position;
+  int m_entries_behind_master = 0;
+  cls::journal::Tag m_tag;
+  std::map<uint64_t, librbd::journal::TagData> m_tag_cache;
+
+  bool calculate_behind_master_or_send_update();
+  void send_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid);
+  void handle_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid,
+                              int r);
+  void format(std::string *description);
+};
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::image_replayer::journal::ReplayStatusFormatter<librbd::ImageCtx>;
+
+#endif // RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H