]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: image-sync: Periodically update sync point object number
authorRicardo Dias <rdias@suse.com>
Thu, 2 Jun 2016 09:04:41 +0000 (10:04 +0100)
committerRicardo Dias <rdias@suse.com>
Mon, 27 Jun 2016 13:51:34 +0000 (14:51 +0100)
Fixes: http://tracker.ceph.com/issues/15108
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/common/config_opts.h
src/tools/rbd_mirror/image_sync/ImageCopyRequest.cc
src/tools/rbd_mirror/image_sync/ImageCopyRequest.h

index 10d6a6b983cf6e5171b07e0de5c157c8c6681a24..c1b94e22034b03651805a79c6234b07d2bd22458 100644 (file)
@@ -1234,6 +1234,11 @@ OPTION(rbd_journal_object_flush_bytes, OPT_INT, 0) // maximum number of pending
 OPTION(rbd_journal_object_flush_age, OPT_DOUBLE, 0) // maximum age (in seconds) for pending commits
 OPTION(rbd_journal_pool, OPT_STR, "") // pool for journal objects
 
+/**
+ * RBD Mirror options
+ */
+OPTION(rbd_mirror_sync_point_update_age, OPT_DOUBLE, 30) // number of seconds between each update of the image sync point object number
+
 OPTION(nss_db_path, OPT_STR, "") // path to nss db
 
 
index e1845e4b97d246939fd16a8bb79426a8f7439da7..336f114c41e4083ff1ad0b41aecedc8a0cee4879 100644 (file)
@@ -5,6 +5,7 @@
 #include "ObjectCopyRequest.h"
 #include "include/stringify.h"
 #include "common/errno.h"
+#include "common/Timer.h"
 #include "journal/Journaler.h"
 #include "librbd/Utils.h"
 #include "tools/rbd_mirror/ProgressContext.h"
@@ -36,6 +37,8 @@ ImageCopyRequest<I>::ImageCopyRequest(I *local_image_ctx, I *remote_image_ctx,
     m_client_meta(client_meta), m_sync_point(sync_point),
     m_progress_ctx(progress_ctx),
     m_lock(unique_lock_name("ImageCopyRequest::m_lock", this)),
+    m_updating_sync_point(false), m_update_sync_ctx(nullptr),
+    m_update_sync_point_interval(g_ceph_context->_conf->rbd_mirror_sync_point_update_age),
     m_client_meta_copy(*client_meta) {
   assert(!m_client_meta_copy.sync_points.empty());
 }
@@ -145,7 +148,22 @@ void ImageCopyRequest<I>::send_object_copies() {
       }
     }
     complete = (m_current_ops == 0);
+
+    if (!complete) {
+      m_update_sync_ctx = new FunctionContext([this](int r) {
+          this->send_update_sync_point();
+      });
+    }
   }
+
+  {
+    Mutex::Locker timer_locker(*m_timer_lock);
+    if (m_update_sync_ctx) {
+      m_timer->add_event_after(m_update_sync_point_interval,
+                               m_update_sync_ctx);
+    }
+  }
+
   if (complete) {
     send_flush_sync_point();
   }
@@ -204,6 +222,92 @@ void ImageCopyRequest<I>::handle_object_copy(int r) {
   update_progress("COPY_OBJECT " + stringify(percent) + "%", false);
 
   if (complete) {
+    bool do_flush = true;
+    {
+      Mutex::Locker timer_locker(*m_timer_lock);
+      Mutex::Locker locker(m_lock);
+      if (!m_updating_sync_point) {
+        if (m_update_sync_ctx != nullptr) {
+          m_timer->cancel_event(m_update_sync_ctx);
+          m_update_sync_ctx = nullptr;
+        }
+      } else {
+        do_flush = false;
+      }
+    }
+
+    if (do_flush) {
+      send_flush_sync_point();
+    }
+  }
+}
+
+template <typename I>
+void ImageCopyRequest<I>::send_update_sync_point() {
+  Mutex::Locker l(m_lock);
+
+  m_update_sync_ctx = nullptr;
+
+  if (m_canceled || m_ret_val < 0 || m_current_ops == 0) {
+    return;
+  }
+
+  if (m_sync_point->object_number &&
+      (m_object_no-1) == m_sync_point->object_number.get()) {
+    // update sync point did not progress since last sync
+    return;
+  }
+
+  m_updating_sync_point = true;
+
+  m_client_meta_copy = *m_client_meta;
+  m_sync_point->object_number = m_object_no - 1;
+
+  CephContext *cct = m_local_image_ctx->cct;
+  ldout(cct, 20) << ": sync_point=" << *m_sync_point << dendl;
+
+  bufferlist client_data_bl;
+  librbd::journal::ClientData client_data(*m_client_meta);
+  ::encode(client_data, client_data_bl);
+
+  Context *ctx = create_context_callback<
+    ImageCopyRequest<I>, &ImageCopyRequest<I>::handle_update_sync_point>(
+      this);
+  m_journaler->update_client(client_data_bl, ctx);
+}
+
+template <typename I>
+void ImageCopyRequest<I>::handle_update_sync_point(int r) {
+  CephContext *cct = m_local_image_ctx->cct;
+  ldout(cct, 20) << ": r=" << r << dendl;
+
+  if (r < 0) {
+    *m_client_meta = m_client_meta_copy;
+    lderr(cct) << ": failed to update client data: " << cpp_strerror(r)
+               << dendl;
+  }
+
+  bool complete;
+  {
+    Mutex::Locker l(m_lock);
+    m_updating_sync_point = false;
+
+    complete = m_current_ops == 0 || m_canceled || m_ret_val < 0;
+
+    if (!complete) {
+      m_update_sync_ctx = new FunctionContext([this](int r) {
+          this->send_update_sync_point();
+      });
+    }
+  }
+
+  if (!complete) {
+    Mutex::Locker timer_lock(*m_timer_lock);
+    if (m_update_sync_ctx) {
+      m_timer->add_event_after(m_update_sync_point_interval,
+                               m_update_sync_ctx);
+    }
+  } else {
     send_flush_sync_point();
   }
 }
index 118b48d80235224ce82fa531ec4c375ee811275f..85dfe9b28316f96b491ac6a234832c951d7c3efd 100644 (file)
@@ -100,6 +100,10 @@ private:
   uint64_t m_current_ops = 0;
   int m_ret_val = 0;
 
+  bool m_updating_sync_point;
+  Context *m_update_sync_ctx;
+  double m_update_sync_point_interval;
+
   MirrorPeerClientMeta m_client_meta_copy;
 
   void send_update_max_object_count();
@@ -109,6 +113,9 @@ private:
   void send_next_object_copy();
   void handle_object_copy(int r);
 
+  void send_update_sync_point();
+  void handle_update_sync_point(int r);
+
   void send_flush_sync_point();
   void handle_flush_sync_point(int r);