]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: in replay status store number of entries behind master
authorMykola Golub <mgolub@mirantis.com>
Thu, 21 Apr 2016 06:52:47 +0000 (09:52 +0300)
committerAbhishek Varshney <abhishek.varshney@flipkart.com>
Mon, 2 May 2016 06:35:58 +0000 (12:05 +0530)
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
(cherry picked from commit e4229d1d4b163f1003e68fef1db63f796393ba23)

src/test/rbd_mirror/test_mock_ImageReplayer.cc
src/tools/Makefile-client.am
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h
src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc [new file with mode: 0644]
src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h [new file with mode: 0644]

index cf19c6747d47341ec58a171f80b1d3134c83f789..3233b48df723ed52ca30812270e4fe69438a970d 100644 (file)
@@ -125,9 +125,28 @@ struct OpenLocalImageRequest<librbd::MockImageReplayerImageCtx> {
   MOCK_METHOD0(send, void());
 };
 
+template<>
+struct ReplayStatusFormatter<librbd::MockImageReplayerImageCtx> {
+  static ReplayStatusFormatter* s_instance;
+
+  static ReplayStatusFormatter* create(::journal::MockJournalerProxy *journaler,
+                                      const std::string &mirror_uuid) {
+    assert(s_instance != nullptr);
+    return s_instance;
+  }
+
+  ReplayStatusFormatter() {
+    assert(s_instance == nullptr);
+    s_instance = this;
+  }
+
+  MOCK_METHOD2(get_or_send_update, bool(std::string *description, Context *on_finish));
+};
+
 BootstrapRequest<librbd::MockImageReplayerImageCtx>* BootstrapRequest<librbd::MockImageReplayerImageCtx>::s_instance = nullptr;
 CloseImageRequest<librbd::MockImageReplayerImageCtx>* CloseImageRequest<librbd::MockImageReplayerImageCtx>::s_instance = nullptr;
 OpenLocalImageRequest<librbd::MockImageReplayerImageCtx>* OpenLocalImageRequest<librbd::MockImageReplayerImageCtx>::s_instance = nullptr;
+ReplayStatusFormatter<librbd::MockImageReplayerImageCtx>* ReplayStatusFormatter<librbd::MockImageReplayerImageCtx>::s_instance = nullptr;
 
 } // namespace image_replayer
 } // namespace mirror
index 68614e2232959e87d5f45663bd0032de200c3d66..a1b2e95675d728d6f6040e6b5a9a3ec27dc3718d 100644 (file)
@@ -100,6 +100,7 @@ librbd_mirror_internal_la_SOURCES = \
        tools/rbd_mirror/image_replayer/BootstrapRequest.cc \
        tools/rbd_mirror/image_replayer/CloseImageRequest.cc \
        tools/rbd_mirror/image_replayer/OpenLocalImageRequest.cc \
+       tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc \
        tools/rbd_mirror/image_sync/ImageCopyRequest.cc \
        tools/rbd_mirror/image_sync/ObjectCopyRequest.cc \
        tools/rbd_mirror/image_sync/SnapshotCopyRequest.cc \
@@ -119,6 +120,7 @@ noinst_HEADERS += \
        tools/rbd_mirror/image_replayer/BootstrapRequest.h \
        tools/rbd_mirror/image_replayer/CloseImageRequest.h \
        tools/rbd_mirror/image_replayer/OpenLocalImageRequest.h \
+       tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h \
        tools/rbd_mirror/image_sync/ImageCopyRequest.h \
        tools/rbd_mirror/image_sync/ObjectCopyRequest.h \
        tools/rbd_mirror/image_sync/SnapshotCopyRequest.h \
index 698eefb7fb65a0933a7aeaee186cef7939f2b9ad..65dde98c0d764d864901ccda929bd6e4a0d6fea1 100644 (file)
@@ -23,6 +23,7 @@
 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
 #include "tools/rbd_mirror/image_replayer/OpenLocalImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
 
 #define dout_subsys ceph_subsys_rbd_mirror
 #undef dout_prefix
@@ -50,8 +51,7 @@ template <typename I>
 struct ReplayHandler : public ::journal::ReplayHandler {
   ImageReplayer<I> *replayer;
   ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
-
-  virtual void get() {}
+virtual void get() {}
   virtual void put() {}
 
   virtual void handle_entries_available() {
@@ -202,6 +202,7 @@ ImageReplayer<I>::ImageReplayer(Threads *threads, RadosRef local, RadosRef remot
 template <typename I>
 ImageReplayer<I>::~ImageReplayer()
 {
+  assert(m_replay_status_formatter == nullptr);
   assert(m_local_image_ctx == nullptr);
   assert(m_local_replay == nullptr);
   assert(m_remote_journaler == nullptr);
@@ -268,7 +269,6 @@ void ImageReplayer<I>::start(Context *on_finish,
                                     &m_threads->timer_lock, m_remote_ioctx,
                                     m_remote_image_id, m_local_mirror_uuid,
                                      commit_interval);
-
   bootstrap();
 }
 
@@ -379,6 +379,8 @@ void ImageReplayer<I>::start_replay() {
     std::swap(m_on_start_finish, on_finish);
   }
 
+  m_replay_status_formatter =
+    ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
   update_mirror_image_status();
   reschedule_update_status_task(30);
 
@@ -584,6 +586,9 @@ void ImageReplayer<I>::on_stop_local_image_close_finish(int r)
 
   m_local_ioctx.close();
 
+  delete m_replay_status_formatter;
+  m_replay_status_formatter = nullptr;
+
   m_remote_journaler->stop_replay();
   m_remote_journaler->shut_down();
   delete m_remote_journaler;
@@ -924,16 +929,24 @@ void ImageReplayer<I>::shut_down_journal_replay(bool cancel_ops)
 }
 
 template <typename I>
-void ImageReplayer<I>::update_mirror_image_status(bool final)
+void ImageReplayer<I>::update_mirror_image_status(bool final,
+                                                 State expected_state)
 {
-  dout(20) << dendl;
+  dout(20) << "final=" << final << ", expected_state=" << expected_state
+          << dendl;
 
   cls::rbd::MirrorImageStatus status;
 
   {
     Mutex::Locker locker(m_lock);
 
+    assert(!final || !is_running_());
+
     if (!final) {
+      if (expected_state != STATE_UNKNOWN && expected_state != m_state) {
+       dout(20) << "state changed" << dendl;
+       return;
+      }
       if (m_update_status_comp) {
        dout(20) << "already sending update" << dendl;
        m_update_status_pending = true;
@@ -956,7 +969,7 @@ void ImageReplayer<I>::update_mirror_image_status(bool final)
          if (comp) {
            comp->release();
          }
-         if (pending && r == 0) {
+         if (pending && r == 0 && is_running_()) {
            update_mirror_image_status();
          }
        });
@@ -987,7 +1000,6 @@ void ImageReplayer<I>::update_mirror_image_status(bool final)
       break;
     case STATE_REPLAYING:
       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
-      status.description = get_replay_status_description();
       break;
     case STATE_STOPPING:
       if (m_local_image_ctx) {
@@ -1010,6 +1022,28 @@ void ImageReplayer<I>::update_mirror_image_status(bool final)
     }
   }
 
+  if (status.state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING) {
+    Context *on_req_finish = new FunctionContext(
+      [this](int r) {
+       if (r == 0) {
+         librados::AioCompletion *comp = nullptr;
+         {
+           Mutex::Locker locker(m_lock);
+           std::swap(m_update_status_comp, comp);
+         }
+         if (comp) {
+           comp->release();
+         }
+         update_mirror_image_status(false, STATE_REPLAYING);
+       }
+      });
+    std::string desc;
+    if (!m_replay_status_formatter->get_or_send_update(&desc, on_req_finish)) {
+      return;
+    }
+    status.description = "replaying, " + desc;
+  }
+
   dout(20) << "status=" << status << dendl;
 
   librados::ObjectWriteOperation op;
@@ -1081,46 +1115,6 @@ void ImageReplayer<I>::start_update_status_task()
   m_threads->work_queue->queue(ctx, 0);
 }
 
-template <typename I>
-std::string ImageReplayer<I>::get_replay_status_description() {
-  assert(m_lock.is_locked());
-  assert(m_state == STATE_REPLAYING);
-
-  std::stringstream ss;
-  ss << "replaying";
-
-  cls::journal::Client master;
-  int r = m_remote_journaler->get_cached_client(
-    librbd::Journal<>::IMAGE_CLIENT_ID, &master);
-  if (r == 0) {
-    ss << ", master_position=";
-    cls::journal::ObjectPositions &object_positions =
-      master.commit_position.object_positions;
-
-    if (object_positions.begin() != object_positions.end()) {
-      ss << *(object_positions.begin());
-    } else {
-      ss << "[]";
-    }
-  }
-
-  cls::journal::Client mirror;
-  r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &mirror);
-  if (r == 0) {
-    ss << ", mirror_position=";
-    cls::journal::ObjectPositions &object_positions =
-      mirror.commit_position.object_positions;
-
-    if (object_positions.begin() != object_positions.end()) {
-      ss << *(object_positions.begin());
-    } else {
-      ss << "[]";
-    }
-  }
-
-  return ss.str();
-}
-
 template <typename I>
 std::string ImageReplayer<I>::to_string(const State state) {
   switch (state) {
index 01cbf85bdbad03e88af6c2344a46173a0d9b9158..9cff9bba44638ec2c05010bfa6010cfecf8e5d70 100644 (file)
@@ -40,6 +40,8 @@ namespace mirror {
 
 struct Threads;
 
+namespace image_replayer { template <typename> class ReplayStatusFormatter; }
+
 /**
  * Replays changes from a remote cluster for a single image.
  */
@@ -49,6 +51,7 @@ public:
   typedef typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry ReplayEntry;
 
   enum State {
+    STATE_UNKNOWN,
     STATE_UNINITIALIZED,
     STATE_STARTING,
     STATE_REPLAYING,
@@ -208,6 +211,8 @@ private:
   int m_last_r = 0;
   std::string m_state_desc;
   BootstrapProgressContext m_progress_cxt;
+  image_replayer::ReplayStatusFormatter<ImageCtxT> *m_replay_status_formatter =
+    nullptr;
   librados::IoCtx m_local_ioctx, m_remote_ioctx;
   ImageCtxT *m_local_image_ctx = nullptr;
   librbd::journal::Replay<ImageCtxT> *m_local_replay = nullptr;
@@ -254,12 +259,11 @@ private:
 
   void shut_down_journal_replay(bool cancel_ops);
 
-  void update_mirror_image_status(bool final = false);
+  void update_mirror_image_status(bool final = false,
+                                 State expected_state = STATE_UNKNOWN);
   void reschedule_update_status_task(int new_interval = 0);
   void start_update_status_task();
 
-  std::string get_replay_status_description();
-
   void bootstrap();
   void handle_bootstrap(int r);
 
diff --git a/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc b/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc
new file mode 100644 (file)
index 0000000..303fb58
--- /dev/null
@@ -0,0 +1,240 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ReplayStatusFormatter.h"
+#include "common/debug.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "journal/Journaler.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
+#include "librbd/Utils.h"
+
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::image_replayer::ReplayStatusFormatter: " \
+    << this << " " << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+
+using librbd::util::unique_lock_name;
+
+template <typename I>
+ReplayStatusFormatter<I>::ReplayStatusFormatter(Journaler *journaler,
+                                               const std::string &mirror_uuid)
+  : m_journaler(journaler),
+    m_mirror_uuid(mirror_uuid),
+    m_lock(unique_lock_name("ReplayStatusFormatter::m_lock", this)) {
+}
+
+template <typename I>
+bool ReplayStatusFormatter<I>::get_or_send_update(std::string *description,
+                                                 Context *on_finish) {
+  dout(20) << dendl;
+
+  bool in_progress = false;
+  {
+    Mutex::Locker locker(m_lock);
+    if (m_on_finish) {
+      in_progress = true;
+    } else {
+      m_on_finish = on_finish;
+    }
+  }
+
+  if (in_progress) {
+    dout(10) << "previous request is still in progress, ignoring" << dendl;
+    on_finish->complete(-EAGAIN);
+    return false;
+  }
+
+  m_master_position = cls::journal::ObjectPosition();
+  m_mirror_position = cls::journal::ObjectPosition();
+
+  cls::journal::Client master_client, mirror_client;
+  int r;
+
+  r = m_journaler->get_cached_client(librbd::Journal<>::IMAGE_CLIENT_ID,
+                                     &master_client);
+  if (r < 0) {
+    derr << "error retrieving registered master client: "
+        << cpp_strerror(r) << dendl;
+  } else {
+    r = m_journaler->get_cached_client(m_mirror_uuid, &mirror_client);
+    if (r < 0) {
+      derr << "error retrieving registered mirror client: "
+          << cpp_strerror(r) << dendl;
+    }
+  }
+
+  if (!master_client.commit_position.object_positions.empty()) {
+    m_master_position =
+      *(master_client.commit_position.object_positions.begin());
+  }
+
+  if (!mirror_client.commit_position.object_positions.empty()) {
+    m_mirror_position =
+      *(mirror_client.commit_position.object_positions.begin());
+  }
+
+  if (!calculate_behind_master_or_send_update()) {
+    dout(20) << "need to update tag cache" << dendl;
+    return false;
+  }
+
+  format(description);
+
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_on_finish == on_finish);
+    m_on_finish = nullptr;
+  }
+
+  on_finish->complete(-EEXIST);
+  return true;
+}
+
+template <typename I>
+bool ReplayStatusFormatter<I>::calculate_behind_master_or_send_update() {
+  dout(20) << "m_master_position=" << m_master_position
+          << ", m_mirror_position=" << m_mirror_position << dendl;
+
+  m_entries_behind_master = 0;
+
+  if (m_master_position == cls::journal::ObjectPosition()) {
+    return true;
+  }
+
+  cls::journal::ObjectPosition master = m_master_position;
+  uint64_t mirror_tag_tid = m_mirror_position.tag_tid;
+
+  while (master.tag_tid != mirror_tag_tid) {
+    auto tag_it = m_tag_cache.find(master.tag_tid);
+    if (tag_it == m_tag_cache.end()) {
+      send_update_tag_cache(master.tag_tid, mirror_tag_tid);
+      return false;
+    }
+    librbd::journal::TagData &tag_data = tag_it->second;
+    m_entries_behind_master += master.entry_tid;
+    master = cls::journal::ObjectPosition(0, tag_data.predecessor_tag_tid,
+                                         tag_data.predecessor_entry_tid);
+  }
+  m_entries_behind_master += master.entry_tid - m_mirror_position.entry_tid;
+
+  dout(20) << "clearing tags not needed any more (below mirror position)"
+          << dendl;
+
+  uint64_t tag_tid = mirror_tag_tid;
+  size_t old_size = m_tag_cache.size();
+  while (tag_tid != 0) {
+    auto tag_it = m_tag_cache.find(tag_tid);
+    if (tag_it == m_tag_cache.end()) {
+      break;
+    }
+    librbd::journal::TagData &tag_data = tag_it->second;
+
+    dout(20) << "erasing tag " <<  tag_data << "for tag_tid " << tag_tid
+            << dendl;
+
+    tag_tid = tag_data.predecessor_tag_tid;
+    m_tag_cache.erase(tag_it);
+  }
+
+  dout(20) << old_size - m_tag_cache.size() << " entries cleared" << dendl;
+
+  return true;
+}
+
+template <typename I>
+void ReplayStatusFormatter<I>::send_update_tag_cache(uint64_t master_tag_tid,
+                                                    uint64_t mirror_tag_tid) {
+
+  dout(20) << "master_tag_tid=" << master_tag_tid << ", mirror_tag_tid="
+          << mirror_tag_tid << dendl;
+
+  if (master_tag_tid == mirror_tag_tid) {
+    Context *on_finish = nullptr;
+    {
+      Mutex::Locker locker(m_lock);
+      std::swap(m_on_finish, on_finish);
+    }
+
+    assert(on_finish);
+    on_finish->complete(0);
+    return;
+  }
+
+  FunctionContext *ctx = new FunctionContext(
+    [this, master_tag_tid, mirror_tag_tid](int r) {
+      handle_update_tag_cache(master_tag_tid, mirror_tag_tid, r);
+    });
+  m_journaler->get_tag(master_tag_tid, &m_tag, ctx);
+}
+
+template <typename I>
+void ReplayStatusFormatter<I>::handle_update_tag_cache(uint64_t master_tag_tid,
+                                                      uint64_t mirror_tag_tid,
+                                                      int r) {
+  librbd::journal::TagData tag_data;
+
+  if (r < 0) {
+    derr << "error retrieving tag " << master_tag_tid << ": " << cpp_strerror(r)
+        << dendl;
+  } else {
+    dout(20) << "retrieved tag " << master_tag_tid << ": " << m_tag << dendl;
+
+    bufferlist::iterator it = m_tag.data.begin();
+    try {
+      ::decode(tag_data, it);
+    } catch (const buffer::error &err) {
+      derr << "error decoding tag " << master_tag_tid << ": " << err.what()
+          << dendl;
+    }
+  }
+
+  if (tag_data.predecessor_tag_tid == 0) {
+    // We failed. Don't consider this fatal, just terminate retrieving.
+    dout(20) << "making fake tag" << dendl;
+    tag_data.predecessor_tag_tid = mirror_tag_tid;
+  }
+
+  dout(20) << "decoded tag " << master_tag_tid << ": " << tag_data << dendl;
+
+  m_tag_cache.insert(std::make_pair(master_tag_tid, tag_data));
+  send_update_tag_cache(tag_data.predecessor_tag_tid, mirror_tag_tid);
+}
+
+template <typename I>
+void ReplayStatusFormatter<I>::format(std::string *description) {
+
+  dout(20) << "m_master_position=" << m_master_position
+          << ", m_mirror_position=" << m_mirror_position
+          << ", m_entries_behind_master=" << m_entries_behind_master << dendl;
+
+  std::stringstream ss;
+  ss << "master_position=";
+  if (m_master_position == cls::journal::ObjectPosition()) {
+    ss << "[]";
+  } else {
+    ss << m_master_position;
+  }
+  ss << ", mirror_position=";
+  if (m_mirror_position == cls::journal::ObjectPosition()) {
+    ss << "[]";
+  } else {
+    ss << m_mirror_position;
+  }
+  ss << ", entries_behind_master="
+     << (m_entries_behind_master > 0 ? m_entries_behind_master : 0);
+
+  *description = ss.str();
+}
+
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+template class
+rbd::mirror::image_replayer::ReplayStatusFormatter<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h b/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h
new file mode 100644 (file)
index 0000000..00d7a05
--- /dev/null
@@ -0,0 +1,56 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
+#define RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
+
+#include "include/Context.h"
+#include "common/Mutex.h"
+#include "cls/journal/cls_journal_types.h"
+#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
+
+namespace journal { class Journaler; }
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class ReplayStatusFormatter {
+public:
+  typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
+
+  static ReplayStatusFormatter* create(Journaler *journaler,
+                                      const std::string &mirror_uuid) {
+    return new ReplayStatusFormatter(journaler, mirror_uuid);
+  }
+
+  ReplayStatusFormatter(Journaler *journaler, const std::string &mirror_uuid);
+
+  bool get_or_send_update(std::string *description, Context *on_finish);
+
+private:
+  Journaler *m_journaler;
+  std::string m_mirror_uuid;
+  Mutex m_lock;
+  Context *m_on_finish = nullptr;
+  cls::journal::ObjectPosition m_master_position;
+  cls::journal::ObjectPosition m_mirror_position;
+  int m_entries_behind_master = 0;
+  cls::journal::Tag m_tag;
+  std::map<uint64_t, librbd::journal::TagData> m_tag_cache;
+
+  bool calculate_behind_master_or_send_update();
+  void send_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid);
+  void handle_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid,
+                              int r);
+  void format(std::string *description);
+};
+
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+#endif // RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H