From: Adam C. Emerson
Date: Sat, 14 May 2022 05:11:57 +0000 (-0400)
Subject: rgw: bucket sync run walks over generations
X-Git-Tag: v18.0.0~787^2~10
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=97cfd37b0fc485ab937b45e5f489ce4c35d939f6;p=ceph.git

rgw: bucket sync run walks over generations

This should make the troubleshooting use case of bucket sync init/run
usable with multisite reshard. It also fixes a few issues with the
original bucket sync run code by spawning multiple shards at a time and
retrying retryable errors.

Signed-off-by: Adam C. Emerson
---

diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 40d25a7a4c26..2794d80c2e4b 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -5508,52 +5508,248 @@ RGWBucketPipeSyncStatusManager::read_sync_status(
   return sync_status;
 }
 
-int RGWBucketPipeSyncStatusManager::run(const DoutPrefixProvider *dpp)
-{
-  list<RGWCoroutinesStack *> stacks;
+namespace rgw::bucket_sync_run {
+// Retry-loop over calls to sync_bucket_shard_cr
+class ShardCR : public RGWCoroutine {
+  static constexpr auto allowed_retries = 10u;
 
-  struct bk {
-    std::vector<rgw_bucket_sync_pair_info> pairs;
-    uint64_t latest_gen;
-  };
-  std::vector<bk> bookkeeping;
+  RGWDataSyncCtx& sc;
+  const rgw_bucket_sync_pair_info& pair;
+  const uint64_t gen;
+  unsigned retries = 0;
 
-  for (auto& s : sources) {
-    auto stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
-    uint64_t num_shards, latest_gen;
-    auto ret = remote_info(dpp, s, nullptr, &latest_gen, &num_shards);
-    if (!ret) {
-      ldpp_dout(this, 5) << "Unable to get remote info: "
-                         << ret << dendl;
-      return ret;
+  ceph::real_time prev_progress;
+  ceph::real_time progress;
+
+public:
+
+  ShardCR(RGWDataSyncCtx& sc, const rgw_bucket_sync_pair_info& pair,
+          const uint64_t gen)
+    : RGWCoroutine(sc.cct), sc(sc), pair(pair), gen(gen) {}
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      // Since all errors except ECANCELED are considered retryable, retry
+      // on errors so long as we're making progress.
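+      // retcode starts at a nonzero sentinel (-EDOM) so the loop body runs
+      // at least once; a successful shard sync sets retcode to zero and
+      // ends the loop.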
+      for (retries = 0u, retcode = -EDOM;
+           (retries < allowed_retries) && (retcode != 0);
+           ++retries) {
+        ldpp_dout(dpp, 5) << "ShardCR: syncing bucket shard on: "
+                          << "zone=" << sc.source_zone
+                          << ", bucket=" << pair.source_bs.bucket.name
+                          << ", shard=" << pair.source_bs.shard_id
+                          << ", gen=" << gen
+                          << dendl;
+        yield call(sync_bucket_shard_cr(&sc, nullptr, pair, gen,
+                                        sc.env->sync_tracer->root_node,
+                                        &progress));
+
+        if (retcode == -ECANCELED) {
+          ldpp_dout(dpp, -1) << "ERROR: Got -ECANCELED for "
+                             << pair.source_bs << dendl;
+          drain_all();
+          return set_cr_error(retcode);
+        } else if (retcode < 0) {
+          ldpp_dout(dpp, 5) << "WARNING: Got error, retcode=" << retcode << " for "
+                            << pair.source_bs << " on retry "
+                            << retries + 1 << " of " << allowed_retries
+                            << " allowed" << dendl;
+          // Reset the retry counter if we made any progress
+          if (progress != prev_progress) {
+            retries = 0;
+          }
+          prev_progress = progress;
+        }
+      }
+      if (retcode < 0) {
+        ldpp_dout(dpp, -1) << "ERROR: Exhausted retries for "
+                           << pair.source_bs << " retcode="
+                           << retcode << dendl;
+        drain_all();
+        return set_cr_error(retcode);
+      }
+
+      drain_all();
+      return set_cr_done();
     }
-    bookkeeping.emplace_back();
-    auto& cur = bookkeeping.back();
-    cur.latest_gen = latest_gen;
-    cur.pairs.resize(num_shards);
-    for (auto shard = 0u; shard < num_shards; ++shard) {
-      auto& pair = cur.pairs[shard];
-      pair.handler = s.handler;
-      pair.source_bs.bucket = s.info.bucket;
-      pair.dest_bucket = s.dest;
+    return 0;
+  }
+};
+
+// Loop over calls to ShardCR with limited concurrency
+class GenCR : public RGWShardCollectCR {
+  static constexpr auto MAX_CONCURRENT_SHARDS = 64;
+
+  RGWDataSyncCtx& sc;
+  const uint64_t gen;
+
+  std::vector<rgw_bucket_sync_pair_info> pairs;
+  decltype(pairs)::const_iterator iter;
+
+public:
+  GenCR(RGWDataSyncCtx& sc, const rgw_bucket& source, const rgw_bucket& dest,
+        const uint64_t gen, const uint64_t shards,
+        const RGWBucketSyncFlowManager::pipe_handler& handler)
+    : RGWShardCollectCR(sc.cct, MAX_CONCURRENT_SHARDS),
+      sc(sc), gen(gen) {
+    pairs.resize(shards);
+    for (auto shard = 0u; shard < shards; ++shard) {
+      auto& pair = pairs[shard];
+      pair.handler = handler;
+      pair.source_bs.bucket = source;
+      pair.dest_bucket = dest;
       pair.source_bs.shard_id = shard;
-      stack->call(sync_bucket_shard_cr(&s.sc, nullptr, pair,
-                                       cur.latest_gen,
-                                       sync_env.sync_tracer->root_node,
-                                       nullptr));
+    }
+    iter = pairs.cbegin();
+    assert(pairs.size() == shards);
+  }
+
+  virtual bool spawn_next() override {
+    if (iter == pairs.cend()) {
+      return false;
     }
-    stacks.push_back(stack);
+    spawn(new ShardCR(sc, *iter, gen), false);
+    ++iter;
+    return true;
   }
 
-  int ret = cr_mgr.run(dpp, stacks);
-  if (ret < 0) {
-    ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
-                       << bucket_str{dest_bucket} << dendl;
-    return ret;
+  int handle_result(int r) override {
+    if (r < 0) {
+      ldpp_dout(sc.env->dpp, 4) << "ERROR: Error syncing shard: "
+                                << cpp_strerror(r) << dendl;
+    }
+    return r;
   }
+};
 
-  return 0;
+// Read sync status, loop over calls to GenCR
+class SourceCR : public RGWCoroutine {
+  RGWDataSyncCtx& sc;
+  const RGWBucketInfo& info;
+  const rgw_bucket& dest;
+  const RGWBucketSyncFlowManager::pipe_handler& handler;
+  const rgw_raw_obj status_obj{
+    sc.env->svc->zone->get_zone_params().log_pool,
+    RGWBucketPipeSyncStatusManager::full_status_oid(sc.source_zone, info.bucket,
+                                                    dest)};
+
+  BucketSyncState state = BucketSyncState::Incremental;
+  uint64_t gen = 0;
+  uint64_t num_shards = 0;
+  rgw_bucket_sync_status status;
+
+public:
+
+  SourceCR(RGWDataSyncCtx& sc,
+           const RGWBucketInfo& info,
+           const rgw_bucket& dest,
+           const RGWBucketSyncFlowManager::pipe_handler& handler)
+    : RGWCoroutine(sc.cct), sc(sc), info(info), dest(dest), handler(handler) {}
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      // Get the source's status. In incremental sync, this gives us
+      // the next generation to run and its shard count.
+      yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_status>(
+                   dpp, sc.env->async_rados, sc.env->svc->sysobj,
+                   status_obj, &status));
+      if (retcode < 0) {
+        ldpp_dout(dpp, -1) << "ERROR: Unable to fetch status for zone="
+                           << sc.source_zone << " retcode="
+                           << retcode << dendl;
+        drain_all();
+        return set_cr_error(retcode);
+      }
+
+      if (status.state == BucketSyncState::Stopped) {
+        // Nothing to do.
+        ldpp_dout(dpp, 0) << "SourceCR: Bucket is in state Stopped, returning."
+                          << dendl;
+        drain_all();
+        return set_cr_done();
+      }
+
+      do {
+        state = status.state;
+        gen = status.incremental_gen;
+        num_shards = status.shards_done_with_gen.size();
+
+        ldpp_dout(dpp, 5) << "SourceCR: "
+                          << "state=" << state
+                          << ", gen=" << gen
+                          << ", num_shards=" << num_shards
+                          << dendl;
+
+        // Special case to handle full sync. Since full sync no longer
+        // uses shards and has no generations, we sync shard zero, but
+        // use the current generation so a following incremental sync
+        // can carry on.
+        if (state != BucketSyncState::Incremental) {
+          ldpp_dout(dpp, 1) << "SourceCR: Calling GenCR with "
+                            << "gen=" << gen
+                            << ", num_shards=" << 1
+                            << dendl;
+          yield call(new GenCR(sc, info.bucket, dest, gen, 1, handler));
+        } else {
+          ldpp_dout(dpp, 1) << "SourceCR: Calling GenCR with "
+                            << "gen=" << gen
+                            << ", num_shards=" << num_shards
+                            << dendl;
+          yield call(new GenCR(sc, info.bucket, dest, gen, num_shards,
+                               handler));
+        }
+        if (retcode < 0) {
+          ldpp_dout(dpp, -1) << "ERROR: Giving up syncing from "
+                             << sc.source_zone << " retcode="
+                             << retcode << dendl;
+          drain_all();
+          return set_cr_error(retcode);
+        }
+
+        yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_status>(
+                     dpp, sc.env->async_rados, sc.env->svc->sysobj,
+                     status_obj, &status));
+        if (retcode < 0) {
+          ldpp_dout(dpp, -1) << "ERROR: Unable to fetch status for zone="
+                             << sc.source_zone << " retcode="
+                             << retcode << dendl;
+          drain_all();
+          return set_cr_error(retcode);
+        }
+        // Repeat until we have done an incremental run and the
+        // generation remains unchanged.
+        ldpp_dout(dpp, 5) << "SourceCR: "
+                          << "state=" << state
+                          << ", gen=" << gen
+                          << ", num_shards=" << num_shards
+                          << ", status.state=" << status.state
+                          << ", status.incremental_gen=" << status.incremental_gen
+                          << ", status.shards_done_with_gen.size()=" << status.shards_done_with_gen.size()
+                          << dendl;
+      } while (state != BucketSyncState::Incremental ||
+               gen != status.incremental_gen);
+      drain_all();
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+} // namespace rgw::bucket_sync_run
+
+int RGWBucketPipeSyncStatusManager::run(const DoutPrefixProvider *dpp)
+{
+  list<RGWCoroutinesStack *> stacks;
+  for (auto& source : sources) {
+    auto stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
+    stack->call(new rgw::bucket_sync_run::SourceCR(source.sc, source.info,
+                                                   source.dest, source.handler));
+    stacks.push_back(stack);
+  }
+  auto ret = cr_mgr.run(dpp, stacks);
+  if (ret < 0) {
+    ldpp_dout(this, 0) << "ERROR: Sync unsuccessful on bucket "
+                       << bucket_str{dest_bucket} << dendl;
+  }
+  return ret;
 }
 
 unsigned RGWBucketPipeSyncStatusManager::get_subsys() const