From c7a49c9e41bf61583689388b6906d011d528190c Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 9 Feb 2021 18:00:14 -0500 Subject: [PATCH] rgw: handle older/newer generations after reading bucket sync status wait until we've read the bucket sync status and found that we're in incremental sync before we start using incremental_gen for comparison Signed-off-by: Casey Bodley --- src/rgw/rgw_data_sync.cc | 54 ++++++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 19 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index a1d2e2e9e2893..b1117567c5415 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1233,8 +1233,7 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { int num_shards{0}; int cur_shard{0}; bool again = false; - std::uint64_t syncing_gen = 0; // TODO: Fill this in from bucket sync status - std::optional entry_gen; + std::optional gen; public: RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, @@ -1242,7 +1241,7 @@ public: std::optional _target_bs, std::optional _source_bs, const RGWSyncTraceNodeRef& _tn_parent, - std::optional entry_gen, + std::optional gen, ceph::real_time* progress); int operate(const DoutPrefixProvider *dpp) override; @@ -4580,6 +4579,7 @@ std::ostream& operator<<(std::ostream& out, std::optional& bs) static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc, boost::intrusive_ptr lease, const rgw_bucket_sync_pair_info& sync_pair, + std::optional gen, const RGWSyncTraceNodeRef& tn, ceph::real_time* progress); @@ -4588,7 +4588,7 @@ RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, std::optional _target_bs, std::optional _source_bs, const RGWSyncTraceNodeRef& _tn_parent, - std::optional entry_gen, + std::optional gen, ceph::real_time* progress) : RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env), lease_cr(std::move(lease_cr)), target_bs(_target_bs), source_bs(_source_bs), @@ -4598,7 +4598,7 @@ RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone))), progress(progress), - entry_gen(entry_gen) + gen(gen) { if (target_bs) { target_bucket = target_bs->bucket; @@ -4629,15 +4629,6 @@ int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp) ldpp_dout(dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl; source_num_shards = siter->source.get_bucket_info().layout.current_index.layout.normal.num_shards; - if (entry_gen) { - if (*entry_gen > syncing_gen) { - tn->log(10, "Future generation in datalog entry. Returning error so we'll retry."); - return set_cr_error(-EAGAIN); - } else if (*entry_gen < syncing_gen) { - tn->log(10, "Future generation in datalog entry. Returning error so we'll retry."); - return 0; - } - } target_num_shards = siter->target.get_bucket_info().layout.current_index.layout.normal.num_shards; if (source_bs) { sync_pair.source_bs = *source_bs; @@ -4675,8 +4666,8 @@ int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp) cur_progress = (progress ? &shard_progress[prealloc_stack_id()] : nullptr); - yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair, tn, - cur_progress), + yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair, + gen, tn, cur_progress), BUCKET_SYNC_SPAWN_WINDOW, [&](uint64_t stack_id, int ret) { handle_complete_stack(stack_id); @@ -5049,6 +5040,7 @@ class RGWSyncBucketCR : public RGWCoroutine { boost::intrusive_ptr bucket_lease_cr; rgw_bucket_sync_pair_info sync_pair; rgw_bucket_sync_pipe sync_pipe; + std::optional gen; ceph::real_time* progress; const std::string lock_name = "bucket sync"; @@ -5064,10 +5056,12 @@ public: RGWSyncBucketCR(RGWDataSyncCtx *_sc, boost::intrusive_ptr lease_cr, const rgw_bucket_sync_pair_info& _sync_pair, + std::optional gen, const RGWSyncTraceNodeRef& _tn_parent, ceph::real_time* progress) : RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env), - data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress), + data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), + gen(gen), progress(progress), lock_duration(cct->_conf->rgw_sync_lease_period), status_obj(env->svc->zone->get_zone_params().log_pool, RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone, @@ -5083,10 +5077,12 @@ public: static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc, boost::intrusive_ptr lease, const rgw_bucket_sync_pair_info& sync_pair, + std::optional gen, const RGWSyncTraceNodeRef& tn, ceph::real_time* progress) { - return new RGWSyncBucketCR(sc, std::move(lease), sync_pair, tn, progress); + return new RGWSyncBucketCR(sc, std::move(lease), sync_pair, + gen, tn, progress); } int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) @@ -5199,6 +5195,24 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) drain_all(); bucket_lease_cr.reset(); } + + // if a specific gen was requested, compare that to the sync status + if (gen) { + const auto current_gen = bucket_status.incremental_gen; + if (*gen > current_gen) { + retcode = -EAGAIN; + tn->log(10, SSTR("requested sync of future generation " + << *gen << " > " << current_gen + << ", returning " << retcode << " for later retry")); + return set_cr_error(retcode); + } else if (*gen < current_gen) { + tn->log(10, SSTR("requested sync of past generation " + << *gen << " < " << current_gen + << ", returning success")); + return set_cr_done(); + } + } + yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair, sync_pipe, bucket_status.state, tn, progress)); @@ -5221,7 +5235,9 @@ RGWCoroutine *RGWRemoteBucketManager::run_sync_cr(int num) return nullptr; } - return sync_bucket_shard_cr(&sc, nullptr, sync_pairs[num], sync_env->sync_tracer->root_node, nullptr); + constexpr std::optional gen; // sync current gen + return sync_bucket_shard_cr(&sc, nullptr, sync_pairs[num], gen, + sync_env->sync_tracer->root_node, nullptr); } int RGWBucketPipeSyncStatusManager::init(const DoutPrefixProvider *dpp) -- 2.39.5