RGWObjVersionTracker& objv;
sync_deltas::SyncDeltaCountersManager sync_delta_counters_manager;
+ // timestamp of remote's most recent log entry. initialized only for data sync
+ ceph::real_time last_updated;
+
public:
RGWDataSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
const string& _marker_oid,
{"shard_id", std::to_string(shard_id)}});
}
- RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override {
+ bool start(const std::string& pos, int index_pos, const real_time& timestamp, const real_time& last_update = {}) {
+ if (last_updated < last_update) {
+ last_updated = last_update;
+ }
+ return RGWSyncShardMarkerTrack::start(pos, index_pos, timestamp);
+ }
+
+ RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
sync_marker.marker = new_marker;
sync_marker.pos = index_pos;
sync_marker.timestamp = timestamp;
// last_update is only modified during incremental sync we only want to
// report deltas for incremental sync
real_time zero_time;
- if (last_update != zero_time) {
- auto delta = last_update - timestamp;
+ if (last_updated != zero_time) {
+ auto delta = last_updated - timestamp;
sync_delta_counters_manager.tset(sync_deltas::l_rgw_datalog_sync_delta, delta);
}
tn->log(1, SSTR("failed to parse bucket shard: "
<< log_iter->entry.key));
marker_tracker->try_update_high_marker(log_iter->log_id, 0,
- log_iter->log_timestamp, last_update);
+ log_iter->log_timestamp);
continue;
}
if (!marker_tracker->start(log_iter->log_id, 0,
{}
- RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override {
+ RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
sync_status.full.position = new_marker;
sync_status.full.count = index_pos;
const rgw_raw_obj& get_obj() const { return obj; }
- RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override {
+ RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
sync_marker.position = new_marker;
sync_marker.timestamp = timestamp;
struct marker_entry {
uint64_t pos;
real_time timestamp;
- real_time last_update;
marker_entry() : pos(0) {}
- marker_entry(uint64_t _p, const real_time& _ts, const real_time& _lu = {}) : pos(_p), timestamp(_ts), last_update(_lu) {}
+ marker_entry(uint64_t _p, const real_time& _ts) : pos(_p), timestamp(_ts) {}
};
typename std::map<T, marker_entry> pending;
protected:
typename std::set<K> need_retry_set;
- virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) = 0;
+ virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0;
virtual RGWOrderCallCR *allocate_order_control_cr() = 0;
virtual void handle_finish(const T& marker) { }
}
}
- bool start(const T& pos, int index_pos, const real_time& timestamp, const real_time& last_update = {}) {
+ bool start(const T& pos, int index_pos, const real_time& timestamp) {
if (pending.find(pos) != pending.end()) {
return false;
}
- pending[pos] = marker_entry(index_pos, timestamp, last_update);
+ pending[pos] = marker_entry(index_pos, timestamp);
return true;
}
- void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp, const real_time& last_update = {}) {
- finish_markers[pos] = marker_entry(index_pos, timestamp, last_update);
+ void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp) {
+ finish_markers[pos] = marker_entry(index_pos, timestamp);
}
RGWCoroutine *finish(const T& pos) {
--i;
const T& high_marker = i->first;
marker_entry& high_entry = i->second;
- RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp, high_entry.last_update));
+ RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp));
finish_markers.erase(finish_markers.begin(), last);
return cr;
}