return sync_status;
}
-int RGWBucketPipeSyncStatusManager::run(const DoutPrefixProvider *dpp)
-{
- list<RGWCoroutinesStack *> stacks;
+namespace rgw::bucket_sync_run {
+// Retry loop over calls to sync_bucket_shard_cr for one bucket shard.
+//
+// Each attempt runs sync_bucket_shard_cr for `pair` at generation `gen`.
+// The coroutine completes successfully on the first attempt that leaves
+// retcode at 0, fails immediately on -ECANCELED, and otherwise keeps
+// retrying until the retry counter reaches allowed_retries. The counter
+// is reset whenever a failed attempt changed the `progress` value handed
+// to sync_bucket_shard_cr.
+class ShardCR : public RGWCoroutine {
+  // Upper bound on the retry counter checked by the loop in operate().
+  static constexpr auto allowed_retries = 10u;
- struct bk {
- std::vector<rgw_bucket_sync_pair_info> pairs;
- uint64_t latest_gen;
- };
- std::vector<bk> bookkeeping;
+  RGWDataSyncCtx& sc;
+  // Stored by reference: the object passed to the constructor must
+  // outlive this coroutine.
+  const rgw_bucket_sync_pair_info& pair;
+  const uint64_t gen;
+  unsigned retries = 0;
- for (auto& s : sources) {
- auto stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
- uint64_t num_shards, latest_gen;
- auto ret = remote_info(dpp, s, nullptr, &latest_gen, &num_shards);
- if (!ret) {
- ldpp_dout(this, 5) << "Unable to get remote info: "
- << ret << dendl;
- return ret;
+  // `progress` is handed to each sync_bucket_shard_cr call;
+  // `prev_progress` remembers its value after the previous failed
+  // attempt so the two can be compared.
+  ceph::real_time prev_progress;
+  ceph::real_time progress;
+
+public:
+
+  ShardCR(RGWDataSyncCtx& sc, const rgw_bucket_sync_pair_info& pair,
+          const uint64_t gen)
+    : RGWCoroutine(sc.cct), sc(sc), pair(pair), gen(gen) {}
+
+  // Runs the retry loop. Finishes with set_cr_done() on success and with
+  // set_cr_error(retcode) on -ECANCELED or when the retries run out.
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      // Since all errors (except ECANCELED) are considered retryable,
+      // retry other errors so long as we're making progress.
+      // retcode is seeded with a nonzero value (-EDOM) so that the loop
+      // condition admits the first attempt.
+      for (retries = 0u, retcode = -EDOM;
+           (retries < allowed_retries) && (retcode != 0);
+           ++retries) {
+        ldpp_dout(dpp, 5) << "ShardCR: syncing bucket shard on: "
+                          << "zone=" << sc.source_zone
+                          << ", bucket=" << pair.source_bs.bucket.name
+                          << ", shard=" << pair.source_bs.shard_id
+                          << ", gen=" << gen
+                          << dendl;
+        yield call(sync_bucket_shard_cr(&sc, nullptr, pair, gen,
+                                        sc.env->sync_tracer->root_node,
+                                        &progress));
+
+        if (retcode == -ECANCELED) {
+          ldpp_dout(dpp, -1) << "ERROR: Got -ECANCELED for "
+                             << pair.source_bs << dendl;
+          drain_all();
+          return set_cr_error(retcode);
+        } else if (retcode < 0) {
+          // The separating space before "on retry" keeps the shard
+          // identifier from running into the following word.
+          ldpp_dout(dpp, 5) << "WARNING: Got error, retcode=" << retcode << " for "
+                            << pair.source_bs << " on retry "
+                            << retries + 1 << " of " << allowed_retries
+                            << " allowed" << dendl;
+          // Reset the retry counter if we made any progress.
+          // NOTE(review): the loop's ++retries still runs afterwards, so
+          // the next attempt is reported as retry 2.
+          if (progress != prev_progress) {
+            retries = 0;
+          }
+          prev_progress = progress;
+        }
+      }
+      if (retcode < 0) {
+        ldpp_dout(dpp, -1) << "ERROR: Exhausted retries for "
+                           << pair.source_bs << " retcode="
+                           << retcode << dendl;
+        drain_all();
+        return set_cr_error(retcode);
+      }
+
+      drain_all();
+      return set_cr_done();
 }
- bookkeeping.emplace_back();
- auto& cur = bookkeeping.back();
- cur.latest_gen = latest_gen;
- cur.pairs.resize(num_shards);
- for (auto shard = 0u; shard < num_shards; ++shard) {
- auto& pair = cur.pairs[shard];
- pair.handler = s.handler;
- pair.source_bs.bucket = s.info.bucket;
- pair.dest_bucket = s.dest;
+    return 0;
+  }
+};
+
+// Loop over calls to ShardCR with limited concurrency: one ShardCR per shard of generation `gen`, handed out one at a time through spawn_next().
+class GenCR : public RGWShardCollectCR {
+ static constexpr auto MAX_CONCURRENT_SHARDS = 64; // forwarded to the RGWShardCollectCR constructor; presumably its cap on concurrently running children (confirm in the base class)
+
+ RGWDataSyncCtx& sc; // passed to every ShardCR; also supplies the dpp used for logging in handle_result()
+ const uint64_t gen; // generation passed to every ShardCR
+
+ std::vector<rgw_bucket_sync_pair_info> pairs; // one entry per shard, filled once in the constructor; ShardCR stores a reference to its entry, so this must not be resized afterwards
+ decltype(pairs)::const_iterator iter; // next pair that spawn_next() will hand out
+
+public:
+ GenCR(RGWDataSyncCtx& sc, const rgw_bucket& source, const rgw_bucket& dest, // builds one sync pair per shard id in [0, shards), all sharing source, dest and handler
+ const uint64_t gen, const uint64_t shards,
+ const RGWBucketSyncFlowManager::pipe_handler& handler)
+ : RGWShardCollectCR(sc.cct, MAX_CONCURRENT_SHARDS),
+ sc(sc), gen(gen) {
+ pairs.resize(shards); // size the vector once up front; iter and the references taken later rely on it not growing
+ for (auto shard = 0u; shard < shards; ++shard) {
+ auto& pair = pairs[shard];
+ pair.handler = handler;
+ pair.source_bs.bucket = source;
+ pair.dest_bucket = dest;
 pair.source_bs.shard_id = shard;
- stack->call(sync_bucket_shard_cr(&s.sc, nullptr, pair,
- cur.latest_gen,
- sync_env.sync_tracer->root_node,
- nullptr));
+ }
+ iter = pairs.cbegin(); // start handing out pairs from shard 0
+ assert(pairs.size() == shards); // sanity check on the resize above
+ }
+ virtual bool spawn_next() override { // spawns a ShardCR for the next pair; returns false once every pair has been handed out
+ if (iter == pairs.cend()) {
+ return false;
 }
- stacks.push_back(stack);
+ spawn(new ShardCR(sc, *iter, gen), false); // ShardCR keeps a reference to *iter, which lives in `pairs`
+ ++iter;
+ return true;
 }
- int ret = cr_mgr.run(dpp, stacks);
- if (ret < 0) {
- ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
- << bucket_str{dest_bucket} << dendl;
- return ret;
+ int handle_result(int r) override { // logs a failed shard result and returns r unchanged
+ if (r < 0) {
+ ldpp_dout(sc.env->dpp, 4) << "ERROR: Error syncing shard: "
+ << cpp_strerror(r) << dendl;
+ }
+ return r;
 }
+};
- return 0;
+// Read sync status, loop over calls to GenCR.
+//
+// Reads the bucket sync status object for one source, then runs GenCR
+// for the generation recorded there, re-reading the status after every
+// pass. The loop ends once a pass ran in Incremental state and the
+// re-read status still reports the same generation.
+class SourceCR : public RGWCoroutine {
+  RGWDataSyncCtx& sc;
+  const RGWBucketInfo& info;
+  const rgw_bucket& dest;
+  const RGWBucketSyncFlowManager::pipe_handler& handler;
+  // Status object in the zone's log pool, named by full_status_oid().
+  const rgw_raw_obj status_obj;
+
+  // Copy of the relevant `status` fields taken at the top of each pass.
+  BucketSyncState pass_state = BucketSyncState::Incremental;
+  uint64_t pass_gen = 0;
+  uint64_t pass_shards = 0;
+  // Most recently read sync status.
+  rgw_bucket_sync_status status;
+
+  // Shard count handed to GenCR for the current pass: every shard in
+  // Incremental state, a single shard otherwise.
+  uint64_t shards_this_pass() const {
+    return (pass_state == BucketSyncState::Incremental) ? pass_shards : 1;
+  }
+
+public:
+
+  SourceCR(RGWDataSyncCtx& sc, const RGWBucketInfo& info,
+           const rgw_bucket& dest,
+           const RGWBucketSyncFlowManager::pipe_handler& handler)
+    : RGWCoroutine(sc.cct), sc(sc), info(info), dest(dest), handler(handler),
+      status_obj{sc.env->svc->zone->get_zone_params().log_pool,
+                 RGWBucketPipeSyncStatusManager::full_status_oid(
+                   sc.source_zone, info.bucket, dest)} {}
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      // Fetch the source's status first. For incremental sync it names
+      // the generation and shard count that have to be run next.
+      yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_status>(
+                   dpp, sc.env->async_rados, sc.env->svc->sysobj,
+                   status_obj, &status));
+      if (retcode < 0) {
+        ldpp_dout(dpp, -1) << "ERROR: Unable to fetch status for zone="
+                           << sc.source_zone << " retcode="
+                           << retcode << dendl;
+        drain_all();
+        return set_cr_error(retcode);
+      }
+
+      // A stopped bucket has nothing to sync.
+      if (status.state == BucketSyncState::Stopped) {
+        ldpp_dout(dpp, 0) << "SourceCR: Bucket is in state Stopped, returning."
+                          << dendl;
+        drain_all();
+        return set_cr_done();
+      }
+
+      for (;;) {
+        pass_state = status.state;
+        pass_gen = status.incremental_gen;
+        pass_shards = status.shards_done_with_gen.size();
+
+        ldpp_dout(dpp, 5) << "SourceCR: state=" << pass_state
+                          << ", gen=" << pass_gen
+                          << ", num_shards=" << pass_shards
+                          << dendl;
+
+        // Full sync no longer uses shards and has no generations, so it
+        // is run as shard zero only, but with the current generation so
+        // that a following incremental sync can carry on from it.
+        ldpp_dout(dpp, 1) << "SourceCR: Calling GenCR with gen=" << pass_gen
+                          << ", num_shards=" << shards_this_pass()
+                          << dendl;
+        yield call(new GenCR(sc, info.bucket, dest, pass_gen,
+                             shards_this_pass(), handler));
+        if (retcode < 0) {
+          ldpp_dout(dpp, -1) << "ERROR: Giving up syncing from "
+                             << sc.source_zone << " retcode="
+                             << retcode << dendl;
+          drain_all();
+          return set_cr_error(retcode);
+        }
+
+        // Re-read the status to see what state the pass left behind.
+        yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_status>(
+                     dpp, sc.env->async_rados, sc.env->svc->sysobj,
+                     status_obj, &status));
+        if (retcode < 0) {
+          ldpp_dout(dpp, -1) << "ERROR: Unable to fetch status for zone="
+                             << sc.source_zone << " retcode="
+                             << retcode << dendl;
+          drain_all();
+          return set_cr_error(retcode);
+        }
+
+        ldpp_dout(dpp, 5) << "SourceCR: state=" << pass_state
+                          << ", gen=" << pass_gen
+                          << ", num_shards=" << pass_shards
+                          << ", status.state=" << status.state
+                          << ", status.incremental_gen=" << status.incremental_gen
+                          << ", status.shards_done_with_gen.size()=" << status.shards_done_with_gen.size()
+                          << dendl;
+
+        // Stop only after an incremental pass that left the generation
+        // unchanged; any other outcome means another pass is needed.
+        if (pass_state == BucketSyncState::Incremental &&
+            pass_gen == status.incremental_gen) {
+          break;
+        }
+      }
+      drain_all();
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+} // namespace rgw::bucket_sync_run
+
+// Runs one SourceCR per configured source, each on its own coroutine
+// stack, and returns the result of running all stacks through cr_mgr.
+int RGWBucketPipeSyncStatusManager::run(const DoutPrefixProvider *dpp)
+{
+  list<RGWCoroutinesStack *> stacks;
+  for (auto& src : sources) {
+    auto cr_stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
+    cr_stack->call(new rgw::bucket_sync_run::SourceCR(src.sc, src.info,
+                                                      src.dest, src.handler));
+    stacks.push_back(cr_stack);
+  }
+
+  const auto rc = cr_mgr.run(dpp, stacks);
+  if (rc >= 0) {
+    return rc;
+  }
+  // Report the failure, then hand the error code back to the caller.
+  ldpp_dout(this, 0) << "ERROR: Sync unsuccessful on bucket "
+                     << bucket_str{dest_bucket} << dendl;
+  return rc;
 }
unsigned RGWBucketPipeSyncStatusManager::get_subsys() const