]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: DataSyncSingleEntry loops based on progress
authorCasey Bodley <cbodley@redhat.com>
Tue, 31 Mar 2020 13:23:16 +0000 (09:23 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 13 Apr 2020 18:08:18 +0000 (14:08 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_bucket_sync_cache.h
src/rgw/rgw_data_sync.cc

index b781e92378edaaf811f97af7f6ae7d3fe764337b..7d340d327645628d0bd73593d18476aa23ecc9ba 100644 (file)
@@ -26,6 +26,8 @@ struct State {
   rgw_bucket_shard key;
   // current sync obligation being processed by DataSyncSingleEntry
   std::optional<rgw_data_sync_obligation> obligation;
+  // incremented with each new obligation
+  uint32_t counter = 0;
   // highest timestamp applied by all sources
   ceph::real_time progress_timestamp;
 
index f0162da1679f3a135d20c32ab266cb936daf33c7..952625748e1dfcd91e3e337416f9f24b928ed4e7 100644 (file)
@@ -1292,6 +1292,7 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   rgw::bucket_sync::Handle state; // cached bucket-shard state
   rgw_data_sync_obligation obligation; // input obligation
   std::optional<rgw_data_sync_obligation> complete; // obligation to complete
+  uint32_t obligation_counter = 0;
   RGWDataSyncShardMarkerTrack *marker_tracker;
   boost::intrusive_ptr<RGWOmapAppend> error_repo;
   RGWSyncTraceNodeRef tn;
@@ -1319,6 +1320,7 @@ public:
           tn->log(10, SSTR("canceling existing obligation " << *state->obligation));
           complete = std::move(*state->obligation);
           *state->obligation = std::move(obligation);
+          state->counter++;
         } else {
           // cancel new obligation
           tn->log(10, SSTR("canceling new obligation " << obligation));
@@ -1327,23 +1329,31 @@ public:
       } else {
         // start syncing a new obligation
         state->obligation = obligation;
-
-        do {
-          yield {
-            tn->log(4, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{state->key} << " " << *state->obligation));
-
-            call(new RGWRunBucketSourcesSyncCR(sc,
-                                               std::nullopt, /* target_bs */
-                                               state->key,
-                                               tn, &progress));
+        obligation_counter = state->counter;
+        state->counter++;
+
+        // loop until the latest obligation is satisfied, because other callers
+        // may update the obligation while we're syncing
+        while (state->progress_timestamp < state->obligation->timestamp &&
+               obligation_counter != state->counter) {
+          obligation_counter = state->counter;
+          progress = ceph::real_time{};
+
+          ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key}
+              << ' ' << *state->obligation << dendl;
+          yield call(new RGWRunBucketSourcesSyncCR(sc, std::nullopt, /* target_bs */
+                                                   state->key, tn, &progress));
+          if (retcode < 0) {
+            break;
           }
-        } while (marker_tracker && marker_tracker->need_retry(obligation.key));
-
-        tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key}));
-
+          state->progress_timestamp = std::max(progress, state->progress_timestamp);
+        }
         // any new obligations will process themselves
         complete = std::move(*state->obligation);
         state->obligation.reset();
+
+        tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key}
+                         << " progress=" << progress << ' ' << complete << " r=" << retcode));
       }
       sync_status = retcode;