From: Adam C. Emerson Date: Thu, 2 Apr 2026 19:43:27 +0000 (-0400) Subject: rgw/multi: Reliably recover from future generation sync X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e28e7544b73b32fe3fe4f395f6ee3af342064b89;p=ceph.git rgw/multi: Reliably recover from future generation sync Since we can't map shards between generations, if we get a sync entry for a future shard, synthesize an entry on the current generation for all shards. We use the bucket cache to keep track of when we've done this so we don't give ourselves unbounded work when we get bursts of future generation shards. Fixes: https://tracker.ceph.com/issues/75786 Signed-off-by: Adam C. Emerson --- diff --git a/src/rgw/driver/rados/rgw_data_sync.cc b/src/rgw/driver/rados/rgw_data_sync.cc index abe1e611ada..058a30cac54 100644 --- a/src/rgw/driver/rados/rgw_data_sync.cc +++ b/src/rgw/driver/rados/rgw_data_sync.cc @@ -1412,13 +1412,16 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { rgw_bucket_index_marker_info marker_info; BucketIndexShardsManager marker_mgr; + rgw::bucket_sync::GenHandle gen_state; + public: RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, boost::intrusive_ptr lease_cr, const rgw_bucket_shard& source_bs, const RGWSyncTraceNodeRef& _tn_parent, std::optional gen, - ceph::real_time* progress); + ceph::real_time* progress, + rgw::bucket_sync::GenHandle gen_state); int operate(const DoutPrefixProvider *dpp) override; }; @@ -1427,6 +1430,7 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw::bucket_sync::ShardHandle state; // cached bucket-shard state + rgw::bucket_sync::GenHandle gen_state; // cached bucket state rgw_data_sync_obligation obligation; // input obligation std::optional complete; // obligation to complete uint32_t obligation_counter = 0; @@ -1439,13 +1443,14 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { int sync_status = 0; public: RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw::bucket_sync::ShardHandle state, + rgw::bucket_sync::GenHandle gen_state, rgw_data_sync_obligation _obligation, RGWDataSyncShardMarkerTrack *_marker_tracker, const rgw_raw_obj& error_repo, boost::intrusive_ptr lease_cr, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - state(std::move(state)), obligation(std::move(_obligation)), + state(std::move(state)), gen_state(std::move(gen_state)), obligation(std::move(_obligation)), marker_tracker(_marker_tracker), error_repo(error_repo), lease_cr(std::move(lease_cr)) { set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation; @@ -1487,7 +1492,7 @@ public: yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr, state->key.first, tn, state->obligation->gen, - &progress)); + &progress, gen_state)); if (retcode < 0) { break; } @@ -1642,13 +1647,15 @@ RGWCoroutine* data_sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& ceph::real_time timestamp, boost::intrusive_ptr lease_cr, boost::intrusive_ptr bucket_shard_cache, + boost::intrusive_ptr bucket_gen_cache, RGWDataSyncShardMarkerTrack* marker_tracker, rgw_raw_obj error_repo, RGWSyncTraceNodeRef& tn, bool retry) { auto state = bucket_shard_cache->get(src, gen); + auto gen_state = bucket_gen_cache->get(src.bucket.get_key(), gen); auto obligation = rgw_data_sync_obligation{src, gen, marker, timestamp, retry}; - return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation), + return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(gen_state), std::move(obligation), &*marker_tracker, error_repo, lease_cr.get(), tn); } @@ -1675,6 +1682,7 @@ class RGWDataFullSyncSingleEntryCR : public RGWCoroutine { ceph::real_time timestamp; boost::intrusive_ptr lease_cr; boost::intrusive_ptr bucket_shard_cache; + boost::intrusive_ptr bucket_gen_cache; RGWDataSyncShardMarkerTrack* marker_tracker; RGWSyncTraceNodeRef tn; rgw_bucket_index_marker_info remote_info; @@ -1690,11 +1698,12 @@ public: const std::string& _key, const rgw_data_sync_status& _sync_status, const rgw_raw_obj& _error_repo, ceph::real_time _timestamp, boost::intrusive_ptr _lease_cr, boost::intrusive_ptr _bucket_shard_cache, + boost::intrusive_ptr _bucket_gen_cache, RGWDataSyncShardMarkerTrack* _marker_tracker, RGWSyncTraceNodeRef& _tn) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), pool(_pool), source_bs(_source_bs), key(_key), sync_status(_sync_status), error_repo(_error_repo), timestamp(_timestamp), lease_cr(std::move(_lease_cr)), - bucket_shard_cache(_bucket_shard_cache), marker_tracker(_marker_tracker), tn(_tn) { + bucket_shard_cache(_bucket_shard_cache), bucket_gen_cache(_bucket_gen_cache), marker_tracker(_marker_tracker), tn(_tn) { error_inject = (sync_env->cct->_conf->rgw_sync_data_full_inject_err_probability > 0); } @@ -1740,7 +1749,7 @@ public: timestamp), sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt); } else { shard_cr = data_sync_single_entry(sc, source_bs, each->gen, key, timestamp, - lease_cr, bucket_shard_cache, nullptr, error_repo, tn, false); + lease_cr, bucket_shard_cache, bucket_gen_cache, nullptr, error_repo, tn, false); tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen)); if (first_shard) { first_shard = false; @@ -1792,6 +1801,7 @@ protected: const rgw_data_sync_status& sync_status; RGWObjVersionTracker& objv; boost::intrusive_ptr bucket_shard_cache; + boost::intrusive_ptr bucket_gen_cache; std::optional marker_tracker; RGWRadosGetOmapValsCR::ResultPtr omapvals; @@ -1818,12 +1828,13 @@ protected: boost::intrusive_ptr lease_cr, const rgw_data_sync_status& sync_status, RGWObjVersionTracker& objv, - const boost::intrusive_ptr& bucket_shard_cache) + const boost::intrusive_ptr& bucket_shard_cache, + const boost::intrusive_ptr& bucket_gen_cache) : RGWCoroutine(_sc->cct), sc(_sc), pool(pool), shard_id(shard_id), sync_marker(sync_marker), tn(tn), status_oid(status_oid), error_repo(error_repo), lease_cr(std::move(lease_cr)), sync_status(sync_status), objv(objv), - bucket_shard_cache(bucket_shard_cache) {} + bucket_shard_cache(bucket_shard_cache), bucket_gen_cache(bucket_gen_cache) {} }; class RGWDataFullSyncShardCR : public RGWDataBaseSyncShardCR { @@ -1846,10 +1857,11 @@ public: const string& status_oid, const rgw_raw_obj& error_repo, boost::intrusive_ptr lease_cr, const rgw_data_sync_status& sync_status, RGWObjVersionTracker& objv, - const boost::intrusive_ptr& bucket_shard_cache) + const boost::intrusive_ptr& bucket_shard_cache, + const boost::intrusive_ptr& bucket_gen_cache) : RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn, status_oid, error_repo, std::move(lease_cr), - sync_status, objv, bucket_shard_cache) {} + sync_status, objv, bucket_shard_cache, bucket_gen_cache) {} int operate(const DoutPrefixProvider *dpp) override { reenter(this) { @@ -1905,7 +1917,7 @@ public: yield_spawn_window(new RGWDataFullSyncSingleEntryCR( sc, pool, source_bs, iter->first, sync_status, error_repo, entry_timestamp, lease_cr, - bucket_shard_cache, &*marker_tracker, tn), + bucket_shard_cache, bucket_gen_cache, &*marker_tracker, tn), sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt); } @@ -2000,11 +2012,12 @@ public: boost::intrusive_ptr lease_cr, const rgw_data_sync_status& sync_status, RGWObjVersionTracker& objv, const boost::intrusive_ptr& bucket_shard_cache, + const boost::intrusive_ptr& bucket_gen_cache, ceph::mutex& inc_lock, bc::flat_set& modified_shards) : RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn, status_oid, error_repo, std::move(lease_cr), - sync_status, objv, bucket_shard_cache), + sync_status, objv, bucket_shard_cache, bucket_gen_cache), inc_lock(inc_lock), modified_shards(modified_shards) {} int operate(const DoutPrefixProvider *dpp) override { @@ -2052,7 +2065,7 @@ public: << modified_iter->key)); spawn(data_sync_single_entry(sc, source_bs, modified_iter->gen, {}, ceph::real_time{}, lease_cr, - bucket_shard_cache, &*marker_tracker, + bucket_shard_cache, bucket_gen_cache, &*marker_tracker, error_repo, tn, false), false); } @@ -2098,7 +2111,7 @@ public: << " timestamp=" << entry_timestamp)); spawn(data_sync_single_entry(sc, source_bs, gen, "", entry_timestamp, lease_cr, - bucket_shard_cache, &*marker_tracker, + bucket_shard_cache, bucket_gen_cache, &*marker_tracker, error_repo, tn, true), false); } } @@ -2152,7 +2165,7 @@ public: } else { tn->log(1, SSTR("incremental sync on " << log_iter->entry.key << "shard: " << shard_id << "on gen " << log_iter->entry.gen)); yield_spawn_window(data_sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id, - log_iter->log_timestamp, lease_cr,bucket_shard_cache, + log_iter->log_timestamp, lease_cr,bucket_shard_cache, bucket_gen_cache, &*marker_tracker, error_repo, tn, false), sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), [&](uint64_t stack_id, int ret) { @@ -2221,8 +2234,10 @@ class RGWDataSyncShardCR : public RGWCoroutine { // target number of entries to cache before recycling idle ones static constexpr size_t target_cache_size = 256; - boost::intrusive_ptr bucket_shard_cache { - rgw::bucket_sync::ShardCache::create(target_cache_size) }; + boost::intrusive_ptr bucket_shard_cache{ + rgw::bucket_sync::ShardCache::create(target_cache_size)}; + boost::intrusive_ptr bucket_gen_cache{ + rgw::bucket_sync::GenCache::create(target_cache_size)}; boost::intrusive_ptr lease_cr; boost::intrusive_ptr lease_stack; @@ -2290,7 +2305,7 @@ public: sync_marker, tn, status_oid, error_repo, lease_cr, sync_status, - objv, bucket_shard_cache)); + objv, bucket_shard_cache, bucket_gen_cache)); if (retcode < 0) { if (retcode != -EBUSY) { tn->log(10, SSTR("full sync failed (retcode=" << retcode << ")")); @@ -2304,7 +2319,7 @@ public: sync_marker, tn, status_oid, error_repo, lease_cr, sync_status, - objv, bucket_shard_cache, + objv, bucket_shard_cache, bucket_gen_cache, inc_lock, modified_shards)); if (retcode < 0) { if (retcode != -EBUSY) { @@ -5326,6 +5341,7 @@ static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc, std::optional gen, const RGWSyncTraceNodeRef& tn, ceph::real_time* progress, + ceph::coarse_mono_time& last_future_generation_recovery, bool no_lease = false); RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, @@ -5333,14 +5349,16 @@ RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& source_bs, const RGWSyncTraceNodeRef& _tn_parent, std::optional gen, - ceph::real_time* progress) + ceph::real_time* progress, + rgw::bucket_sync::GenHandle gen_state) : RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env), lease_cr(std::move(lease_cr)), tn(sync_env->sync_tracer->add_node( _tn_parent, "bucket_sync_sources", SSTR( "source=" << source_bs << ":source_zone=" << sc->source_zone))), progress(progress), - gen(gen) + gen(gen), + gen_state(std::move(gen_state)) { sync_pair.source_bs = source_bs; } @@ -5375,7 +5393,7 @@ int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp) yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair, gen, tn, &*cur_shard_progress, - false), + gen_state->last_future_generation_recovery, false), sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window), [&](uint64_t stack_id, int ret) { if (ret < 0) { @@ -5774,16 +5792,21 @@ class RGWSyncBucketCR : public RGWCoroutine { rgw_bucket_shard source_bs; rgw_pool pool; uint64_t current_gen = 0; + // In general operation, a reference to a pinned entry in `bucket_gen_cache` + ceph::coarse_mono_time& last_future_generation_recovery; RGWSyncTraceNodeRef tn; + static constexpr std::chrono::seconds throttle_future_recovery = std::chrono::hours(1); + public: RGWSyncBucketCR(RGWDataSyncCtx *_sc, boost::intrusive_ptr lease_cr, - const rgw_bucket_sync_pair_info& _sync_pair, + const rgw_bucket_sync_pair_info &_sync_pair, std::optional gen, const RGWSyncTraceNodeRef& _tn_parent, ceph::real_time* progress, + ceph::coarse_mono_time& last_future_generation_recovery, bool no_lease = false) : RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env), data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), @@ -5793,7 +5816,7 @@ public: RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone, sync_pair.source_bs.bucket, sync_pair.dest_bucket)), - no_lease(no_lease), + no_lease(no_lease), last_future_generation_recovery(last_future_generation_recovery), tn(env->sync_tracer->add_node(_tn_parent, "bucket", SSTR(bucket_str{_sync_pair.dest_bucket} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) { } @@ -5807,10 +5830,11 @@ static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc, std::optional gen, const RGWSyncTraceNodeRef& tn, ceph::real_time* progress, + ceph::coarse_mono_time& last_future_generation_recovery, bool no_lease) { return new RGWSyncBucketCR(sc, std::move(lease), sync_pair, - gen, tn, progress, no_lease); + gen, tn, progress, last_future_generation_recovery, no_lease); } #define RELEASE_LOCK(cr) \ @@ -6009,29 +6033,46 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) if (*gen > current_gen) { /* In case the data log entry is missing for previous gen, it may * not be marked complete and the sync can get stuck. To avoid it, - * may be we can add this (shardid, gen) to error repo to force - * sync and mark that shard as completed. + * may be we can add an entry for every shard in the previous generation. */ pool = sc->env->svc->zone->get_zone_params().log_pool; - if ((static_cast(source_bs.shard_id) < bucket_status.shards_done_with_gen.size()) && - !bucket_status.shards_done_with_gen[source_bs.shard_id]) { + if ((ceph::coarse_mono_clock::now() - last_future_generation_recovery) > throttle_future_recovery) { + last_future_generation_recovery = ceph::coarse_mono_clock::now(); // use the error repo and sync status timestamp from the datalog shard corresponding to source_bs - error_repo = datalog_oid_for_error_repo(sc, sc->env->driver, - pool, source_bs); - yield call(rgw::error_repo::write_cr(sc->env->driver->getRados()->get_rados_handle(), error_repo, - rgw::error_repo::encode_key(source_bs, current_gen), - ceph::real_clock::zero())); - if (retcode < 0) { - tn->log(0, SSTR("ERROR: failed to log prev gen entry (bucket=" << source_bs.bucket << ", shard_id=" << source_bs.shard_id << ", gen=" << current_gen << " in error repo: retcode=" << retcode)); - } else { - tn->log(20, SSTR("logged prev gen entry (bucket=" << source_bs.bucket << ", shard_id=" << source_bs.shard_id << ", gen=" << current_gen << " in error repo: retcode=" << retcode)); + for (source_bs.shard_id = 0; + source_bs.shard_id < std::ssize(bucket_status.shards_done_with_gen); + ++source_bs.shard_id) { + error_repo = datalog_oid_for_error_repo(sc, sc->env->driver, + pool, source_bs); + tn->log(10, SSTR("writing shard_id " << source_bs.shard_id << " of gen " << current_gen << " to error repo for retry")); + yield_spawn_window( + rgw::error_repo::write_cr( + sc->env->driver->getRados()->get_rados_handle(), + error_repo, + rgw::error_repo::encode_key(source_bs, current_gen), + ceph::real_clock::zero()), + sc->lcc.adj_concurrency( + cct->_conf->rgw_data_sync_spawn_window), + [&](uint64_t stack_id, int ret) { + if (ret < 0) { + retcode = ret; + } + return 0; + }); } + drain_all_cb([&](uint64_t stack_id, int ret) { + if (ret < 0) { + tn->log(10, + SSTR("writing to error repo returned error: " << ret)); + } + return ret; + }); } - retcode = -EAGAIN; - tn->log(10, SSTR("ERROR: requested sync of future generation " - << *gen << " > " << current_gen - << ", returning " << retcode << " for later retry")); - return set_cr_error(retcode); + retcode = -EAGAIN; + tn->log(10, SSTR("ERROR: requested sync of future generation " + << *gen << " > " << current_gen + << ", returning " << retcode << " for later retry")); + return set_cr_error(retcode); } else if (*gen < current_gen) { tn->log(10, SSTR("WARNING: requested sync of past generation " << *gen << " < " << current_gen @@ -6274,11 +6315,14 @@ class ShardCR : public RGWCoroutine { ceph::real_time prev_progress; ceph::real_time progress; -public: + ceph::coarse_mono_time& last_future_generation_recovery; - ShardCR(RGWDataSyncCtx& sc, const rgw_bucket_sync_pair_info& pair, - const uint64_t gen) - : RGWCoroutine(sc.cct), sc(sc), pair(pair), gen(gen) {} +public: + ShardCR(RGWDataSyncCtx &sc, const rgw_bucket_sync_pair_info &pair, + const uint64_t gen, + ceph::coarse_mono_time &last_future_generation_recovery) + : RGWCoroutine(sc.cct), sc(sc), pair(pair), gen(gen), + last_future_generation_recovery( last_future_generation_recovery) {} int operate(const DoutPrefixProvider *dpp) override { reenter(this) { @@ -6295,6 +6339,7 @@ public: yield call(sync_bucket_shard_cr(&sc, nullptr, pair, gen, sc.env->sync_tracer->root_node, &progress, + last_future_generation_recovery, true /* no_lease: bucket sync run skips lock acquisition so it is never blocked by a background sync process*/)); @@ -6341,6 +6386,10 @@ class GenCR : public RGWShardCollectCR { std::vector pairs; decltype(pairs)::const_iterator iter; + // We do a manual sync when we're told to do a manual sync. No need + // for a cache. + ceph::coarse_mono_time last_future_generation_recovery = ceph::coarse_mono_clock::zero(); + public: GenCR(RGWDataSyncCtx& sc, const rgw_bucket& source, const rgw_bucket& dest, const uint64_t gen, const uint64_t shards, @@ -6363,7 +6412,7 @@ public: if (iter == pairs.cend()) { return false; } - spawn(new ShardCR(sc, *iter, gen), false); + spawn(new ShardCR(sc, *iter, gen, last_future_generation_recovery), false); ++iter; return true; }