rgw::bucket_sync::Handle state; // cached bucket-shard state
rgw_data_sync_obligation obligation; // input obligation
std::optional<rgw_data_sync_obligation> complete; // obligation to complete
+ uint32_t obligation_counter = 0;
RGWDataSyncShardMarkerTrack *marker_tracker;
boost::intrusive_ptr<RGWOmapAppend> error_repo;
RGWSyncTraceNodeRef tn;
tn->log(10, SSTR("canceling existing obligation " << *state->obligation));
complete = std::move(*state->obligation);
*state->obligation = std::move(obligation);
+ state->counter++;
} else {
// cancel new obligation
tn->log(10, SSTR("canceling new obligation " << obligation));
} else {
// start syncing a new obligation
state->obligation = obligation;
-
- do {
- yield {
- tn->log(4, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{state->key} << " " << *state->obligation));
-
- call(new RGWRunBucketSourcesSyncCR(sc,
- std::nullopt, /* target_bs */
- state->key,
- tn, &progress));
+ obligation_counter = state->counter;
+ state->counter++;
+
+ // loop until the latest obligation is satisfied, because other callers
+ // may update the obligation while we're syncing
+ while (state->progress_timestamp < state->obligation->timestamp &&
+ obligation_counter != state->counter) {
+ obligation_counter = state->counter;
+ progress = ceph::real_time{};
+
+ ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key}
+ << ' ' << *state->obligation << dendl;
+ yield call(new RGWRunBucketSourcesSyncCR(sc, std::nullopt, /* target_bs */
+ state->key, tn, &progress));
+ if (retcode < 0) {
+ break;
}
- } while (marker_tracker && marker_tracker->need_retry(obligation.key));
-
- tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key}));
-
+ state->progress_timestamp = std::max(progress, state->progress_timestamp);
+ }
// any new obligations will process themselves
complete = std::move(*state->obligation);
state->obligation.reset();
+
+ tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key}
+ << " progress=" << progress << ' ' << complete << " r=" << retcode));
}
sync_status = retcode;