From: Yuval Lifshitz Date: Tue, 25 May 2021 18:11:25 +0000 (+0300) Subject: rgw/multisite: track shard sync status objects per generation X-Git-Tag: v18.0.0~787^2~90 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=bfc1b7ec681c7a29d3a12bf9849795fd1077c77b;p=ceph-ci.git rgw/multisite: track shard sync status objects per generation Signed-off-by: Yuval Lifshitz --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index baba80ada3e..771fe60c37f 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2515,7 +2515,7 @@ static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::Ra } std::vector status; - r = rgw_read_bucket_inc_sync_status(dpp, store, pipe, bucket_info, &source_bucket->get_info(), &status); + r = rgw_read_bucket_inc_sync_status(dpp, store, pipe, bucket_info, &source_bucket->get_info(), full_status.incremental_gen, &status); if (r < 0) { lderr(store->ctx()) << "failed to read bucket incremental sync status: " << cpp_strerror(r) << dendl; return r; @@ -8780,6 +8780,7 @@ next: cerr << "ERROR: sync.init() returned ret=" << ret << std::endl; return -ret; } + ret = sync.read_sync_status(dpp()); if (ret < 0) { cerr << "ERROR: sync.read_sync_status() returned ret=" << ret << std::endl; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 2f53390503c..4a96ae46f15 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -2759,7 +2759,7 @@ public: bool exclusive) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pair(_sync_pair), - sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair)), + sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair, _info.latest_gen)), status(_status), objv_tracker(objv_tracker), info(_info), marker_mgr(_marker_mgr), exclusive(exclusive) {} @@ -2906,9 +2906,10 @@ public: RGWReadBucketPipeSyncStatusCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& sync_pair, rgw_bucket_shard_sync_info *_status, - RGWObjVersionTracker* objv_tracker) + RGWObjVersionTracker* objv_tracker, + uint64_t gen) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)), + oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, gen)), status(_status), objv_tracker(objv_tracker) {} int operate(const DoutPrefixProvider *dpp) override; @@ -2942,7 +2943,7 @@ class CheckBucketShardStatusIsIncremental : public RGWReadBucketPipeSyncStatusCo CheckBucketShardStatusIsIncremental(RGWDataSyncCtx* sc, const rgw_bucket_sync_pair_info& sync_pair, bool* result) - : RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &status, nullptr), + : RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &status, nullptr, 0 /*no gen in compat mode*/), result(result) {} @@ -3036,7 +3037,7 @@ class InitBucketShardStatusCR : public RGWCoroutine { while (--tries) { objv.clear(); // read current status and objv - yield call(new ReadCR(sc, pair, &status, &objv)); + yield call(new ReadCR(sc, pair, &status, &objv, info.latest_gen)); if (retcode < 0) { return set_cr_error(retcode); } @@ -3375,7 +3376,7 @@ RGWCoroutine *RGWRemoteBucketManager::read_sync_status_cr(int num, rgw_bucket_sh return nullptr; } - return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status, nullptr); + return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status, nullptr, full_status.incremental_gen); } RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RadosStore* _store, @@ -5068,7 +5069,7 @@ public: : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), sync_pipe(sync_pipe), bucket_state(bucket_state), generation(generation), progress(progress), - shard_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)), + shard_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, generation)), bucket_status_obj(sc->env->svc->zone->get_zone_params().log_pool, RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone, sync_pair.source_bs.bucket, @@ -5082,7 +5083,7 @@ public: int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp) { reenter(this) { - yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker)); + yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker, generation)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); return set_cr_error(retcode); @@ -5468,13 +5469,20 @@ string RGWBucketPipeSyncStatusManager::full_status_oid(const rgw_zone_id& source } } +inline std::string generation_token(uint64_t gen) { + return (gen == 0) ? "" : (":" + std::to_string(gen)); +} + string RGWBucketPipeSyncStatusManager::inc_status_oid(const rgw_zone_id& source_zone, - const rgw_bucket_sync_pair_info& sync_pair) + const rgw_bucket_sync_pair_info& sync_pair, + uint64_t gen) { if (sync_pair.source_bs.bucket == sync_pair.dest_bucket) { - return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.source_bs.get_key(); + return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.source_bs.get_key() + + generation_token(gen); } else { - return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.dest_bucket.get_key() + ":" + sync_pair.source_bs.get_key(); + return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.dest_bucket.get_key() + ":" + sync_pair.source_bs.get_key() + + generation_token(gen); } } @@ -5523,6 +5531,8 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR { rgw::sal::RadosStore* const store; RGWDataSyncCtx *const sc; RGWDataSyncEnv *const env; + const uint64_t gen; + rgw_bucket_sync_pair_info sync_pair; using Vector = std::vector; Vector::iterator i, end; @@ -5541,9 +5551,10 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR { RGWCollectBucketSyncStatusCR(rgw::sal::RadosStore* store, RGWDataSyncCtx *sc, const RGWBucketInfo& source_bucket_info, const RGWBucketInfo& dest_bucket_info, + uint64_t gen, Vector *status) : RGWShardCollectCR(sc->cct, max_concurrent_shards), - store(store), sc(sc), env(sc->env), + store(store), sc(sc), env(sc->env), gen(gen), i(status->begin()), end(status->end()) { sync_pair.source_bs = rgw_bucket_shard(source_bucket_info.bucket, source_bucket_info.layout.current_index.layout.normal.num_shards > 0 ? 0 : -1); @@ -5554,7 +5565,7 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR { if (i == end) { return false; } - spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i, nullptr), false); + spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i, nullptr, gen), false); ++i; ++sync_pair.source_bs.shard_id; return true; @@ -5597,6 +5608,7 @@ int rgw_read_bucket_inc_sync_status(const DoutPrefixProvider *dpp, const rgw_sync_bucket_pipe& pipe, const RGWBucketInfo& dest_bucket_info, const RGWBucketInfo *psource_bucket_info, + uint64_t gen, std::vector *status) { if (!pipe.source.zone || @@ -5640,6 +5652,7 @@ int rgw_read_bucket_inc_sync_status(const DoutPrefixProvider *dpp, return crs.run(dpp, new RGWCollectBucketSyncStatusCR(store, &sc, *psource_bucket_info, dest_bucket_info, + gen, status)); } diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 538a692d5b2..e1b5b2d9cb0 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -749,7 +749,8 @@ public: const rgw_bucket& source_bucket, const rgw_bucket& dest_bucket); static std::string inc_status_oid(const rgw_zone_id& source_zone, - const rgw_bucket_sync_pair_info& bs); + const rgw_bucket_sync_pair_info& bs, + uint64_t gen); // specific source obj sync status, can be used by sync modules static std::string obj_status_oid(const rgw_bucket_sync_pipe& sync_pipe, const rgw_zone_id& source_zone, const rgw::sal::Object* obj); /* specific source obj sync status, @@ -777,6 +778,7 @@ int rgw_read_bucket_inc_sync_status(const DoutPrefixProvider *dpp, const rgw_sync_bucket_pipe& pipe, const RGWBucketInfo& dest_bucket_info, const RGWBucketInfo *psource_bucket_info, + uint64_t gen, std::vector *status); class RGWDefaultSyncModule : public RGWSyncModule { diff --git a/src/rgw/rgw_rest_log.cc b/src/rgw/rgw_rest_log.cc index 764a1420dbb..e597668dd87 100644 --- a/src/rgw/rgw_rest_log.cc +++ b/src/rgw/rgw_rest_log.cc @@ -1015,6 +1015,7 @@ void RGWOp_BILog_Status::execute(optional_yield y) pipe, bucket->get_info(), nullptr, + status.sync_status.incremental_gen, &status.inc_status); if (op_ret < 0) { ldpp_dout(this, -1) << "ERROR: rgw_read_bucket_inc_sync_status() on pipe=" << pipe << " returned ret=" << op_ret << dendl; @@ -1076,7 +1077,7 @@ void RGWOp_BILog_Status::execute(optional_yield y) } } int r = rgw_read_bucket_inc_sync_status(this, static_cast(store), - pipe, *pinfo, &bucket->get_info(), ¤t_status); + pipe, *pinfo, &bucket->get_info(), status.sync_status.incremental_gen, ¤t_status); if (r < 0) { ldpp_dout(this, -1) << "ERROR: rgw_read_bucket_inc_sync_status() on pipe=" << pipe << " returned ret=" << r << dendl; op_ret = r; diff --git a/src/rgw/rgw_sync_checkpoint.cc b/src/rgw/rgw_sync_checkpoint.cc index 9f4b4ca71a5..c9d350e73ef 100644 --- a/src/rgw/rgw_sync_checkpoint.cc +++ b/src/rgw/rgw_sync_checkpoint.cc @@ -145,7 +145,7 @@ int bucket_source_sync_checkpoint(const DoutPrefixProvider* dpp, std::vector status; status.resize(std::max(1, num_shards)); r = rgw_read_bucket_inc_sync_status(dpp, store, pipe, bucket_info, - &source_bucket_info, &status); + &source_bucket_info, full_status.incremental_gen, &status); if (r < 0) { return r; } @@ -161,7 +161,7 @@ int bucket_source_sync_checkpoint(const DoutPrefixProvider* dpp, << " remote markers: " << remote_markers << dendl; std::this_thread::sleep_until(delay_until); r = rgw_read_bucket_inc_sync_status(dpp, store, pipe, bucket_info, - &source_bucket_info, &status); + &source_bucket_info, full_status.incremental_gen, &status); if (r < 0) { return r; }