From a8e622c31b8498bdfaafa305162b08b1ae5defde Mon Sep 17 00:00:00 2001 From: Shilpa Jagannath Date: Tue, 2 Aug 2022 15:22:13 -0400 Subject: [PATCH] rgw/multisite: wait for outstanding data_sync_single_entry()s to complete before spawning shards of next generation Signed-off-by: Shilpa Jagannath --- src/rgw/rgw_data_sync.cc | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index f6cfb41749ff9..d75f55f1a5186 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1341,7 +1341,8 @@ public: progress = ceph::real_time{}; ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key.first} - << ' ' << *state->obligation << dendl; + << ' ' << *state->obligation << "progress timestamp " << state->progress_timestamp + << "progress " << progress << dendl; yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr, state->key.first, tn, state->obligation->gen, @@ -1530,13 +1531,13 @@ class RGWDataFullSyncSingleEntryCR : public RGWCoroutine { bool error_inject; public: - RGWDataFullSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw_pool& pool, const rgw_bucket_shard& _source_bs, + RGWDataFullSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw_pool& _pool, const rgw_bucket_shard& _source_bs, const std::string& _key, const rgw_data_sync_status& sync_status, const rgw_raw_obj& _error_repo, ceph::real_time _timestamp, boost::intrusive_ptr _lease_cr, boost::intrusive_ptr _bucket_shard_cache, RGWDataSyncShardMarkerTrack* _marker_tracker, RGWSyncTraceNodeRef& _tn) - : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source_bs(_source_bs), key(_key), + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), pool(_pool), source_bs(_source_bs), key(_key), error_repo(_error_repo), timestamp(_timestamp), lease_cr(_lease_cr), bucket_shard_cache(_bucket_shard_cache), marker_tracker(_marker_tracker), tn(_tn) { error_inject = (sync_env->cct->_conf->rgw_sync_data_full_inject_err_probability > 0); @@ -1600,6 +1601,12 @@ public: } } } + drain_all_cb([&](uint64_t stack_id, int ret) { + if (ret < 0) { + retcode = ret; + } + return retcode; + }); } drain_all_cb([&](uint64_t stack_id, int ret) { @@ -1956,6 +1963,7 @@ public: if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) { tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?")); } else { + tn->log(1, SSTR("incremental sync on " << log_iter->entry.key << "shard: " << shard_id << "on gen " << log_iter->entry.gen)); yield_spawn_window(data_sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id, log_iter->log_timestamp, lease_cr,bucket_shard_cache, &*marker_tracker, error_repo, tn, false), @@ -4373,6 +4381,7 @@ public: done.clear(); done.resize(next_log.num_shards, false); } + ldpp_dout(dpp, 20) << "bucket status incremental gen is " << bucket_status.incremental_gen << dendl; using WriteCR = RGWSimpleRadosWriteCR; call(new WriteCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, bucket_status_obj, bucket_status, &objv_tracker, false)); -- 2.39.5