From: Casey Bodley Date: Tue, 31 Mar 2020 13:23:16 +0000 (-0400) Subject: rgw: DataSyncSingleEntry loops based on progress X-Git-Tag: v16.1.0~2586^2~9 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9826a115147a5787f6bd410404357beb9fd2508e;p=ceph.git rgw: DataSyncSingleEntry loops based on progress Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_bucket_sync_cache.h b/src/rgw/rgw_bucket_sync_cache.h index b781e92378ed..7d340d327645 100644 --- a/src/rgw/rgw_bucket_sync_cache.h +++ b/src/rgw/rgw_bucket_sync_cache.h @@ -26,6 +26,8 @@ struct State { rgw_bucket_shard key; // current sync obligation being processed by DataSyncSingleEntry std::optional obligation; + // incremented with each new obligation + uint32_t counter = 0; // highest timestamp applied by all sources ceph::real_time progress_timestamp; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index f0162da1679f..952625748e1d 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1292,6 +1292,7 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { rgw::bucket_sync::Handle state; // cached bucket-shard state rgw_data_sync_obligation obligation; // input obligation std::optional complete; // obligation to complete + uint32_t obligation_counter = 0; RGWDataSyncShardMarkerTrack *marker_tracker; boost::intrusive_ptr error_repo; RGWSyncTraceNodeRef tn; @@ -1319,6 +1320,7 @@ public: tn->log(10, SSTR("canceling existing obligation " << *state->obligation)); complete = std::move(*state->obligation); *state->obligation = std::move(obligation); + state->counter++; } else { // cancel new obligation tn->log(10, SSTR("canceling new obligation " << obligation)); @@ -1327,23 +1329,31 @@ public: } else { // start syncing a new obligation state->obligation = obligation; - - do { - yield { - tn->log(4, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{state->key} << " " << *state->obligation)); - - call(new RGWRunBucketSourcesSyncCR(sc, - std::nullopt, /* target_bs */ - state->key, - tn, &progress)); + obligation_counter = state->counter; + state->counter++; + + // loop until the latest obligation is satisfied, because other callers + // may update the obligation while we're syncing + while (state->progress_timestamp < state->obligation->timestamp && + obligation_counter != state->counter) { + obligation_counter = state->counter; + progress = ceph::real_time{}; + + ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key} + << ' ' << *state->obligation << dendl; + yield call(new RGWRunBucketSourcesSyncCR(sc, std::nullopt, /* target_bs */ + state->key, tn, &progress)); + if (retcode < 0) { + break; } - } while (marker_tracker && marker_tracker->need_retry(obligation.key)); - - tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key})); - + state->progress_timestamp = std::max(progress, state->progress_timestamp); + } // any new obligations will process themselves complete = std::move(*state->obligation); state->obligation.reset(); + + tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key} + << " progress=" << progress << ' ' << complete << " r=" << retcode)); } sync_status = retcode;