]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/sync: track last_update timestamp per-shard instead of per-entry 61418/head
authorCasey Bodley <cbodley@redhat.com>
Tue, 21 May 2024 15:03:04 +0000 (11:03 -0400)
committerCasey Bodley <cbodley@redhat.com>
Wed, 29 Jan 2025 17:12:21 +0000 (12:12 -0500)
this way, store_entry() always uses the latest value from the remote
instead of the value from when we fetched the given datalog entry

since this last_update timestamp only applies to data incremental sync,
all related logic was moved out of the base class into RGWDataSyncShardMarkerTrack

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/driver/rados/rgw_data_sync.cc
src/rgw/driver/rados/rgw_sync.cc
src/rgw/driver/rados/rgw_sync.h

index 5972b3bd55ee5ade78bd276c734f9fd883e88478..ebb2f0c68ddd061c071a3bffe42ae2e66fc04b60 100644 (file)
@@ -1116,6 +1116,9 @@ class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, strin
   RGWObjVersionTracker& objv;
   sync_deltas::SyncDeltaCountersManager sync_delta_counters_manager;
 
+  // timestamp of remote's most recent log entry. initialized only for data sync
+  ceph::real_time last_updated;
+
 public:
   RGWDataSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
                          const string& _marker_oid,
@@ -1138,7 +1141,14 @@ public:
         {"shard_id", std::to_string(shard_id)}});
   }
 
-  RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override {
+  bool start(const std::string& pos, int index_pos, const real_time& timestamp, const real_time& last_update = {}) {
+    if (last_updated < last_update) {
+      last_updated = last_update;
+    }
+    return RGWSyncShardMarkerTrack::start(pos, index_pos, timestamp);
+  }
+
+  RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
     sync_marker.marker = new_marker;
     sync_marker.pos = index_pos;
     sync_marker.timestamp = timestamp;
@@ -1147,8 +1157,8 @@ public:
     // last_update is only modified during incremental sync we only want to
     // report deltas for incremental sync
     real_time zero_time;
-    if (last_update != zero_time) {
-      auto delta = last_update - timestamp;
+    if (last_updated != zero_time) {
+      auto delta = last_updated - timestamp;
       sync_delta_counters_manager.tset(sync_deltas::l_rgw_datalog_sync_delta, delta);
     }
 
@@ -2128,7 +2138,7 @@ public:
             tn->log(1, SSTR("failed to parse bucket shard: "
                            << log_iter->entry.key));
             marker_tracker->try_update_high_marker(log_iter->log_id, 0,
-                                                  log_iter->log_timestamp, last_update);
+                                                  log_iter->log_timestamp);
             continue;
           }
           if (!marker_tracker->start(log_iter->log_id, 0,
@@ -4252,7 +4262,7 @@ public:
   {}
 
 
-  RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override {
+  RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
     sync_status.full.position = new_marker;
     sync_status.full.count = index_pos;
 
@@ -4353,7 +4363,7 @@ public:
 
   const rgw_raw_obj& get_obj() const { return obj; }
 
-  RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override {
+  RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
     sync_marker.position = new_marker;
     sync_marker.timestamp = timestamp;
 
index aeaeb52188c1e3ab5a1500edb8677dfe3a40e7e4..bd730dfd6c25996ddc0986969dea65b97cdd112b 100644 (file)
@@ -1256,7 +1256,7 @@ public:
                                                                 sync_marker(_marker),
                                                                 tn(_tn){}
 
-  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override {
+  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
     sync_marker.marker = new_marker;
     if (index_pos > 0) {
       sync_marker.pos = index_pos;
index 942ea444efe115f9a77d61a559c9d8134ca415cc..f0ee28056afa73e902418f1bb82178a376c8e31c 100644 (file)
@@ -343,10 +343,9 @@ class RGWSyncShardMarkerTrack {
   struct marker_entry {
     uint64_t pos;
     real_time timestamp;
-    real_time last_update;
 
     marker_entry() : pos(0) {}
-    marker_entry(uint64_t _p, const real_time& _ts, const real_time& _lu = {}) : pos(_p), timestamp(_ts), last_update(_lu) {}
+    marker_entry(uint64_t _p, const real_time& _ts) : pos(_p), timestamp(_ts) {}
   };
   typename std::map<T, marker_entry> pending;
 
@@ -360,7 +359,7 @@ class RGWSyncShardMarkerTrack {
 protected:
   typename std::set<K> need_retry_set;
 
-  virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) = 0;
+  virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0;
   virtual RGWOrderCallCR *allocate_order_control_cr() = 0;
   virtual void handle_finish(const T& marker) { }
 
@@ -372,16 +371,16 @@ public:
     }
   }
 
-  bool start(const T& pos, int index_pos, const real_time& timestamp, const real_time& last_update = {}) {
+  bool start(const T& pos, int index_pos, const real_time& timestamp) {
     if (pending.find(pos) != pending.end()) {
       return false;
     }
-    pending[pos] = marker_entry(index_pos, timestamp, last_update);
+    pending[pos] = marker_entry(index_pos, timestamp);
     return true;
   }
 
-  void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp, const real_time& last_update = {}) {
-    finish_markers[pos] = marker_entry(index_pos, timestamp, last_update);
+  void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp) {
+    finish_markers[pos] = marker_entry(index_pos, timestamp);
   }
 
   RGWCoroutine *finish(const T& pos) {
@@ -437,7 +436,7 @@ public:
     --i;
     const T& high_marker = i->first;
     marker_entry& high_entry = i->second;
-    RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp, high_entry.last_update));
+    RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp));
     finish_markers.erase(finish_markers.begin(), last);
     return cr;
   }