#include "ObjectCopyRequest.h"
#include "include/stringify.h"
#include "common/errno.h"
+#include "common/Timer.h"
#include "journal/Journaler.h"
#include "librbd/Utils.h"
#include "tools/rbd_mirror/ProgressContext.h"
m_client_meta(client_meta), m_sync_point(sync_point),
m_progress_ctx(progress_ctx),
m_lock(unique_lock_name("ImageCopyRequest::m_lock", this)),
+ m_updating_sync_point(false), m_update_sync_ctx(nullptr),
+ m_update_sync_point_interval(g_ceph_context->_conf->rbd_mirror_sync_point_update_age),
m_client_meta_copy(*client_meta) {
assert(!m_client_meta_copy.sync_points.empty());
}
}
}
complete = (m_current_ops == 0);
+
+ if (!complete) {
+ m_update_sync_ctx = new FunctionContext([this](int r) {
+ this->send_update_sync_point();
+ });
+ }
}
+
+ {
+ Mutex::Locker timer_locker(*m_timer_lock);
+ if (m_update_sync_ctx) {
+ m_timer->add_event_after(m_update_sync_point_interval,
+ m_update_sync_ctx);
+ }
+ }
+
if (complete) {
send_flush_sync_point();
}
update_progress("COPY_OBJECT " + stringify(percent) + "%", false);
if (complete) {
+ bool do_flush = true;
+ {
+ Mutex::Locker timer_locker(*m_timer_lock);
+ Mutex::Locker locker(m_lock);
+ if (!m_updating_sync_point) {
+ if (m_update_sync_ctx != nullptr) {
+ m_timer->cancel_event(m_update_sync_ctx);
+ m_update_sync_ctx = nullptr;
+ }
+ } else {
+ do_flush = false;
+ }
+ }
+
+ if (do_flush) {
+ send_flush_sync_point();
+ }
+ }
+}
+
+template <typename I>
+void ImageCopyRequest<I>::send_update_sync_point() {
+ Mutex::Locker l(m_lock);
+
+ m_update_sync_ctx = nullptr;
+
+ if (m_canceled || m_ret_val < 0 || m_current_ops == 0) {
+ return;
+ }
+
+ if (m_sync_point->object_number &&
+ (m_object_no-1) == m_sync_point->object_number.get()) {
+ // update sync point did not progress since last sync
+ return;
+ }
+
+ m_updating_sync_point = true;
+
+ m_client_meta_copy = *m_client_meta;
+ m_sync_point->object_number = m_object_no - 1;
+
+ CephContext *cct = m_local_image_ctx->cct;
+ ldout(cct, 20) << ": sync_point=" << *m_sync_point << dendl;
+
+ bufferlist client_data_bl;
+ librbd::journal::ClientData client_data(*m_client_meta);
+ ::encode(client_data, client_data_bl);
+
+ Context *ctx = create_context_callback<
+ ImageCopyRequest<I>, &ImageCopyRequest<I>::handle_update_sync_point>(
+ this);
+ m_journaler->update_client(client_data_bl, ctx);
+}
+
+template <typename I>
+void ImageCopyRequest<I>::handle_update_sync_point(int r) {
+ CephContext *cct = m_local_image_ctx->cct;
+ ldout(cct, 20) << ": r=" << r << dendl;
+
+ if (r < 0) {
+ *m_client_meta = m_client_meta_copy;
+ lderr(cct) << ": failed to update client data: " << cpp_strerror(r)
+ << dendl;
+ }
+
+ bool complete;
+ {
+ Mutex::Locker l(m_lock);
+ m_updating_sync_point = false;
+
+ complete = m_current_ops == 0 || m_canceled || m_ret_val < 0;
+
+ if (!complete) {
+ m_update_sync_ctx = new FunctionContext([this](int r) {
+ this->send_update_sync_point();
+ });
+ }
+ }
+
+ if (!complete) {
+ Mutex::Locker timer_lock(*m_timer_lock);
+ if (m_update_sync_ctx) {
+ m_timer->add_event_after(m_update_sync_point_interval,
+ m_update_sync_ctx);
+ }
+ } else {
send_flush_sync_point();
}
}