ProgressContext(Replayer* replayer) : replayer(replayer) {
}
- int update_progress(uint64_t offset, uint64_t total) override {
- replayer->handle_copy_image_progress(offset, total);
+ int update_progress(uint64_t object_number, uint64_t object_count) override {
+ replayer->handle_copy_image_progress(object_number, object_count);
return 0;
}
};
}
template <typename I>
-void Replayer<I>::handle_copy_image_progress(uint64_t offset, uint64_t total) {
- dout(10) << "offset=" << offset << ", total=" << total << dendl;
+void Replayer<I>::handle_copy_image_progress(uint64_t object_number,
+ uint64_t object_count) {
+ dout(10) << "object_number=" << object_number << ", "
+ << "object_count=" << object_count << dendl;
- // TODO
+ std::unique_lock locker{m_lock};
+ m_local_mirror_snap_ns.last_copied_object_number = object_number;
+
+ update_non_primary_snapshot(false);
}
template <typename I>
return;
}
+ std::unique_lock locker{m_lock};
update_non_primary_snapshot(true);
}
template <typename I>
void Replayer<I>::update_non_primary_snapshot(bool complete) {
- dout(10) << dendl;
-
- if (complete) {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ if (!complete) {
+ // disallow two in-flight updates if this isn't the completion of the sync
+ if (m_updating_sync_point) {
+ return;
+ }
+ m_updating_sync_point = true;
+ } else {
m_local_mirror_snap_ns.complete = true;
}
+ dout(10) << dendl;
+
librados::ObjectWriteOperation op;
librbd::cls_client::mirror_image_snapshot_set_copy_progress(
&op, m_local_snap_id_end, m_local_mirror_snap_ns.complete,
m_local_mirror_snap_ns.last_copied_object_number);
- auto ctx = new LambdaContext([this, complete](int r) {
+ auto ctx = new C_TrackedOp(this, new LambdaContext([this, complete](int r) {
handle_update_non_primary_snapshot(complete, r);
- });
+ }));
auto aio_comp = create_rados_callback(ctx);
int r = m_state_builder->local_image_ctx->md_ctx.aio_operate(
m_state_builder->local_image_ctx->header_oid, aio_comp, &op);
if (r < 0) {
derr << "failed to update local snapshot progress: " << cpp_strerror(r)
<< dendl;
- handle_replay_complete(r, "failed to update local snapshot progress");
+ if (complete) {
+ // only fail if this was the final update
+ handle_replay_complete(r, "failed to update local snapshot progress");
+ return;
+ }
+ }
+
+ if (!complete) {
+ // periodic sync-point update -- do not advance state machine
+ std::unique_lock locker{m_lock};
+
+ ceph_assert(m_updating_sync_point);
+ m_updating_sync_point = false;
return;
}
ProgressContext* m_progress_ctx = nullptr;
bool m_remote_image_updated = false;
+ bool m_updating_sync_point = false;
void refresh_local_image();
void handle_refresh_local_image(int r);
void copy_image();
void handle_copy_image(int r);
- void handle_copy_image_progress(uint64_t offset, uint64_t total);
+ void handle_copy_image_progress(uint64_t object_number,
+ uint64_t object_count);
void apply_image_state();
void handle_apply_image_state(int r);