From b65b5b5a9f45479023db984668815591d29617d9 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 31 Mar 2020 09:23:14 -0400 Subject: [PATCH] rgw: DataSyncSingleEntry does not duplicate bucket sync Signed-off-by: Casey Bodley --- src/rgw/rgw_data_sync.cc | 62 ++++++++++++++++++++++++++-------------- 1 file changed, 41 insertions(+), 21 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index dd7e3b99a5d..f0162da1679 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1290,7 +1290,8 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw::bucket_sync::Handle state; // cached bucket-shard state - rgw_data_sync_obligation obligation; + rgw_data_sync_obligation obligation; // input obligation + std::optional complete; // obligation to complete RGWDataSyncShardMarkerTrack *marker_tracker; boost::intrusive_ptr error_repo; RGWSyncTraceNodeRef tn; @@ -1311,59 +1312,78 @@ public: int operate() override { reenter(this) { - do { - if (marker_tracker) { - marker_tracker->reset_need_retry(obligation.key); + if (state->obligation) { + // this is already syncing in another DataSyncSingleEntryCR + if (state->obligation->timestamp < obligation.timestamp) { + // cancel existing obligation and overwrite it + tn->log(10, SSTR("canceling existing obligation " << *state->obligation)); + complete = std::move(*state->obligation); + *state->obligation = std::move(obligation); + } else { + // cancel new obligation + tn->log(10, SSTR("canceling new obligation " << obligation)); + complete = std::move(obligation); } - tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{state->key})); + } else { + // start syncing a new obligation + state->obligation = obligation; - yield call(new RGWRunBucketSourcesSyncCR(sc, - std::nullopt, /* target_bs */ - state->key, - tn, &progress)); - if (retcode == 0) { - tn->log(20, SSTR("RunBucketSources progress=" << progress)); - } - } while (marker_tracker && marker_tracker->need_retry(obligation.key)); + do { + yield { + tn->log(4, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{state->key} << " " << *state->obligation)); + + call(new RGWRunBucketSourcesSyncCR(sc, + std::nullopt, /* target_bs */ + state->key, + tn, &progress)); + } + } while (marker_tracker && marker_tracker->need_retry(obligation.key)); + tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key})); + + // any new obligations will process themselves + complete = std::move(*state->obligation); + state->obligation.reset(); + } sync_status = retcode; if (sync_status == -ENOENT) { // this was added when 'tenant/' was added to datalog entries, because // preexisting tenant buckets could never sync and would stay in the // error_repo forever - tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << obligation.key)); + tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << complete->key)); sync_status = 0; } if (sync_status < 0) { // write actual sync failures for 'radosgw-admin sync error list' if (sync_status != -EBUSY && sync_status != -EAGAIN) { - yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data", obligation.key, + yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data", complete->key, -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status))); if (retcode < 0) { tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode)); } } - if (error_repo) { + if (error_repo && complete->timestamp != ceph::real_time{}) { + tn->log(10, SSTR("writing " << *complete << " to error repo for retry")); yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo->get_obj(), - obligation.key, obligation.timestamp)); + complete->key, complete->timestamp)); if (retcode < 0) { tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode)); } } - } else if (error_repo && obligation.retry) { + } else if (error_repo && complete->retry) { yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo->get_obj(), - obligation.key, obligation.timestamp)); + complete->key, complete->timestamp)); if (retcode < 0) { tn->log(0, SSTR("ERROR: failed to remove omap key from error repo (" << error_repo->get_obj() << " retcode=" << retcode)); } } /* FIXME: what do do in case of error */ - if (marker_tracker && !obligation.marker.empty()) { + if (marker_tracker && !complete->marker.empty()) { /* update marker */ - yield call(marker_tracker->finish(obligation.marker)); + yield call(marker_tracker->finish(complete->marker)); } if (sync_status == 0) { sync_status = retcode; -- 2.39.5