From e653337f82af7532956bc09731dcdfa3fded1f64 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Mon, 22 Nov 2021 13:05:40 -0500 Subject: [PATCH] rgw/multisite: RunBucketSourcesSync no longer takes optional target RGWDataSyncSingleEntryCR is the only caller of RGWRunBucketSourcesSyncCR it always provides a source_bs, and never provides a target_bs. so remove all the complexity related to target_bs, and the idea that we'd need to sync several source bucket shards related to the target bucket we now just have the single loop over the target buckets that use the given bucket as a source Signed-off-by: Casey Bodley --- src/rgw/rgw_data_sync.cc | 108 +++++++++------------------------------ 1 file changed, 23 insertions(+), 85 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 13a93f0cebf40..8b6affd4ae87c 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1250,12 +1250,6 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; boost::intrusive_ptr lease_cr; - std::optional target_bs; - std::optional source_bs; - - std::optional target_bucket; - std::optional source_bucket; - rgw_sync_pipe_info_set pipes; rgw_sync_pipe_info_set::iterator siter; @@ -1271,12 +1265,6 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { RGWRESTConn *conn{nullptr}; rgw_zone_id last_zone; - int source_num_shards{0}; - int target_num_shards{0}; - - int num_shards{0}; - int cur_shard{0}; - bool again = false; std::optional gen; rgw_bucket_index_marker_info marker_info; BucketIndexShardsManager marker_mgr; @@ -1284,8 +1272,7 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { public: RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, boost::intrusive_ptr lease_cr, - std::optional _target_bs, - std::optional _source_bs, + const rgw_bucket_shard& source_bs, const RGWSyncTraceNodeRef& _tn_parent, std::optional gen, ceph::real_time* progress); @@ -1373,7 +1360,6 @@ public: ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key} << ' ' << *state->obligation << dendl; yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr, - std::nullopt, /* target_bs */ state->key, tn, state->obligation->gen, &progress)); @@ -4747,39 +4733,32 @@ static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc, RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, boost::intrusive_ptr lease_cr, - std::optional _target_bs, - std::optional _source_bs, + const rgw_bucket_shard& source_bs, const RGWSyncTraceNodeRef& _tn_parent, std::optional gen, ceph::real_time* progress) : RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env), - lease_cr(std::move(lease_cr)), target_bs(_target_bs), source_bs(_source_bs), + lease_cr(std::move(lease_cr)), tn(sync_env->sync_tracer->add_node( _tn_parent, "bucket_sync_sources", - SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << - ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << - ":source_zone=" << sc->source_zone))), + SSTR( "source=" << source_bs << ":source_zone=" << sc->source_zone))), progress(progress), gen(gen) { - if (target_bs) { - target_bucket = target_bs->bucket; - } - if (source_bs) { - source_bucket = source_bs->bucket; - } + sync_pair.source_bs = source_bs; } int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp) { reenter(this) { - yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn)); + yield call(new RGWGetBucketPeersCR(sync_env, std::nullopt, sc->source_zone, + sync_pair.source_bs.bucket, &pipes, tn)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, SSTR("ERROR: failed to read sync status for bucket. error: " << retcode)); return set_cr_error(retcode); } - ldpp_dout(dpp, 20) << __func__ << "(): requested source_bs=" << source_bs << " target_bs=" << target_bs << dendl; + ldpp_dout(dpp, 20) << __func__ << "(): requested source_bs=" << sync_pair.source_bs << dendl; if (pipes.empty()) { ldpp_dout(dpp, 20) << __func__ << "(): no relevant sync pipes found" << dendl; @@ -4787,66 +4766,25 @@ int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp) } for (siter = pipes.begin(); siter != pipes.end(); ++siter) { - { - ldpp_dout(dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl; - yield call(new RGWReadRemoteBucketIndexLogInfoCR( - sc, siter->source.get_bucket_info().bucket, &marker_info)); - if (retcode < 0) { - tn->log(0, SSTR("ERROR: failed to fetch markers for bucket: " - << siter->source.get_bucket_info().bucket)); - return set_cr_error(retcode); - } - retcode = marker_mgr.from_string(marker_info.max_marker, -1); - if (retcode < 0) { - ldpp_dout(dpp, 0) << "ERROR: failed to decode markers for bucket: " - << siter->source.get_bucket_info().bucket << dendl; - return set_cr_error(retcode); - } - source_num_shards = marker_mgr.get().size(); + ldpp_dout(dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl; - target_num_shards = siter->target.get_bucket_info().layout.current_index.layout.normal.num_shards; - if (source_bs) { - sync_pair.source_bs = *source_bs; - } else { - sync_pair.source_bs.bucket = siter->source.get_bucket(); - } - sync_pair.dest_bucket = siter->target.get_bucket(); + sync_pair.dest_bucket = siter->target.get_bucket(); + sync_pair.handler = siter->handler; - sync_pair.handler = siter->handler; + ldpp_dout(dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl; - if (sync_pair.source_bs.shard_id >= 0) { - num_shards = 1; - cur_shard = sync_pair.source_bs.shard_id; - } else { - num_shards = std::max(1, source_num_shards); - cur_shard = std::min(0, source_num_shards); - } - } - - ldpp_dout(dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl; + cur_progress = (progress ? &shard_progress[prealloc_stack_id()] : nullptr); - for (; num_shards > 0; --num_shards, ++cur_shard) { - /* - * use a negatvie shard_id for backward compatibility, - * this affects the crafted status oid - */ - sync_pair.source_bs.shard_id = (source_num_shards > 0 ? cur_shard : -1); - - ldpp_dout(dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl; - - cur_progress = (progress ? &shard_progress[prealloc_stack_id()] : nullptr); - - yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair, - gen, tn, cur_progress), - BUCKET_SYNC_SPAWN_WINDOW, - [&](uint64_t stack_id, int ret) { - handle_complete_stack(stack_id); - if (ret < 0) { - tn->log(10, SSTR("ERROR: a sync operation returned error: " << ret)); - } - return ret; - }); - } + yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair, + gen, tn, cur_progress), + BUCKET_SYNC_SPAWN_WINDOW, + [&](uint64_t stack_id, int ret) { + handle_complete_stack(stack_id); + if (ret < 0) { + tn->log(10, SSTR("ERROR: a sync operation returned error: " << ret)); + } + return ret; + }); } drain_all_cb([&](uint64_t stack_id, int ret) { handle_complete_stack(stack_id); -- 2.39.5