rgw_bucket_index_marker_info marker_info;
BucketIndexShardsManager marker_mgr;
+ rgw::bucket_sync::GenHandle gen_state;
+
public:
RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
const rgw_bucket_shard& source_bs,
const RGWSyncTraceNodeRef& _tn_parent,
std::optional<uint64_t> gen,
- ceph::real_time* progress);
+ ceph::real_time* progress,
+ rgw::bucket_sync::GenHandle gen_state);
int operate(const DoutPrefixProvider *dpp) override;
};
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw::bucket_sync::ShardHandle state; // cached bucket-shard state
+ rgw::bucket_sync::GenHandle gen_state; // cached bucket state
rgw_data_sync_obligation obligation; // input obligation
std::optional<rgw_data_sync_obligation> complete; // obligation to complete
uint32_t obligation_counter = 0;
int sync_status = 0;
public:
RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw::bucket_sync::ShardHandle state,
+ rgw::bucket_sync::GenHandle gen_state,
rgw_data_sync_obligation _obligation,
RGWDataSyncShardMarkerTrack *_marker_tracker,
const rgw_raw_obj& error_repo,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
const RGWSyncTraceNodeRef& _tn_parent)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- state(std::move(state)), obligation(std::move(_obligation)),
+ state(std::move(state)), gen_state(std::move(gen_state)), obligation(std::move(_obligation)),
marker_tracker(_marker_tracker), error_repo(error_repo),
lease_cr(std::move(lease_cr)) {
set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation;
yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr,
state->key.first, tn,
state->obligation->gen,
- &progress));
+ &progress, gen_state));
if (retcode < 0) {
break;
}
ceph::real_time timestamp,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
boost::intrusive_ptr<rgw::bucket_sync::ShardCache> bucket_shard_cache,
+ boost::intrusive_ptr<rgw::bucket_sync::GenCache> bucket_gen_cache,
RGWDataSyncShardMarkerTrack* marker_tracker,
rgw_raw_obj error_repo,
RGWSyncTraceNodeRef& tn,
bool retry) {
auto state = bucket_shard_cache->get(src, gen);
+ auto gen_state = bucket_gen_cache->get(src.bucket.get_key(), gen);
auto obligation = rgw_data_sync_obligation{src, gen, marker, timestamp, retry};
- return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation),
+ return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(gen_state), std::move(obligation),
&*marker_tracker, error_repo,
lease_cr.get(), tn);
}
ceph::real_time timestamp;
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
boost::intrusive_ptr<rgw::bucket_sync::ShardCache> bucket_shard_cache;
+ boost::intrusive_ptr<rgw::bucket_sync::GenCache> bucket_gen_cache;
RGWDataSyncShardMarkerTrack* marker_tracker;
RGWSyncTraceNodeRef tn;
rgw_bucket_index_marker_info remote_info;
const std::string& _key, const rgw_data_sync_status& _sync_status, const rgw_raw_obj& _error_repo,
ceph::real_time _timestamp, boost::intrusive_ptr<const RGWContinuousLeaseCR> _lease_cr,
boost::intrusive_ptr<rgw::bucket_sync::ShardCache> _bucket_shard_cache,
+ boost::intrusive_ptr<rgw::bucket_sync::GenCache> _bucket_gen_cache,
RGWDataSyncShardMarkerTrack* _marker_tracker,
RGWSyncTraceNodeRef& _tn)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), pool(_pool), source_bs(_source_bs), key(_key),
sync_status(_sync_status), error_repo(_error_repo), timestamp(_timestamp), lease_cr(std::move(_lease_cr)),
- bucket_shard_cache(_bucket_shard_cache), marker_tracker(_marker_tracker), tn(_tn) {
+ bucket_shard_cache(_bucket_shard_cache), bucket_gen_cache(_bucket_gen_cache), marker_tracker(_marker_tracker), tn(_tn) {
error_inject = (sync_env->cct->_conf->rgw_sync_data_full_inject_err_probability > 0);
}
timestamp), sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt);
} else {
shard_cr = data_sync_single_entry(sc, source_bs, each->gen, key, timestamp,
- lease_cr, bucket_shard_cache, nullptr, error_repo, tn, false);
+ lease_cr, bucket_shard_cache, bucket_gen_cache, nullptr, error_repo, tn, false);
tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen));
if (first_shard) {
first_shard = false;
const rgw_data_sync_status& sync_status;
RGWObjVersionTracker& objv;
boost::intrusive_ptr<rgw::bucket_sync::ShardCache> bucket_shard_cache;
+ boost::intrusive_ptr<rgw::bucket_sync::GenCache> bucket_gen_cache;
std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;
RGWRadosGetOmapValsCR::ResultPtr omapvals;
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
const rgw_data_sync_status& sync_status,
RGWObjVersionTracker& objv,
- const boost::intrusive_ptr<rgw::bucket_sync::ShardCache>& bucket_shard_cache)
+ const boost::intrusive_ptr<rgw::bucket_sync::ShardCache>& bucket_shard_cache,
+ const boost::intrusive_ptr<rgw::bucket_sync::GenCache>& bucket_gen_cache)
: RGWCoroutine(_sc->cct), sc(_sc), pool(pool), shard_id(shard_id),
sync_marker(sync_marker), tn(tn), status_oid(status_oid),
error_repo(error_repo), lease_cr(std::move(lease_cr)),
sync_status(sync_status), objv(objv),
- bucket_shard_cache(bucket_shard_cache) {}
+ bucket_shard_cache(bucket_shard_cache), bucket_gen_cache(bucket_gen_cache) {}
};
class RGWDataFullSyncShardCR : public RGWDataBaseSyncShardCR {
const string& status_oid, const rgw_raw_obj& error_repo,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
const rgw_data_sync_status& sync_status, RGWObjVersionTracker& objv,
- const boost::intrusive_ptr<rgw::bucket_sync::ShardCache>& bucket_shard_cache)
+ const boost::intrusive_ptr<rgw::bucket_sync::ShardCache>& bucket_shard_cache,
+ const boost::intrusive_ptr<rgw::bucket_sync::GenCache>& bucket_gen_cache)
: RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
status_oid, error_repo, std::move(lease_cr),
- sync_status, objv, bucket_shard_cache) {}
+ sync_status, objv, bucket_shard_cache, bucket_gen_cache) {}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
yield_spawn_window(new RGWDataFullSyncSingleEntryCR(
sc, pool, source_bs, iter->first, sync_status,
error_repo, entry_timestamp, lease_cr,
- bucket_shard_cache, &*marker_tracker, tn),
+ bucket_shard_cache, bucket_gen_cache, &*marker_tracker, tn),
sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
std::nullopt);
}
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
const rgw_data_sync_status& sync_status, RGWObjVersionTracker& objv,
const boost::intrusive_ptr<rgw::bucket_sync::ShardCache>& bucket_shard_cache,
+ const boost::intrusive_ptr<rgw::bucket_sync::GenCache>& bucket_gen_cache,
ceph::mutex& inc_lock,
bc::flat_set<rgw_data_notify_entry>& modified_shards)
: RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
status_oid, error_repo, std::move(lease_cr),
- sync_status, objv, bucket_shard_cache),
+ sync_status, objv, bucket_shard_cache, bucket_gen_cache),
inc_lock(inc_lock), modified_shards(modified_shards) {}
int operate(const DoutPrefixProvider *dpp) override {
<< modified_iter->key));
spawn(data_sync_single_entry(sc, source_bs, modified_iter->gen, {},
ceph::real_time{}, lease_cr,
- bucket_shard_cache, &*marker_tracker,
+ bucket_shard_cache, bucket_gen_cache, &*marker_tracker,
error_repo, tn, false), false);
}
<< " timestamp=" << entry_timestamp));
spawn(data_sync_single_entry(sc, source_bs, gen, "",
entry_timestamp, lease_cr,
- bucket_shard_cache, &*marker_tracker,
+ bucket_shard_cache, bucket_gen_cache, &*marker_tracker,
error_repo, tn, true), false);
}
}
} else {
tn->log(1, SSTR("incremental sync on " << log_iter->entry.key << "shard: " << shard_id << "on gen " << log_iter->entry.gen));
yield_spawn_window(data_sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id,
- log_iter->log_timestamp, lease_cr,bucket_shard_cache,
+ log_iter->log_timestamp, lease_cr,bucket_shard_cache, bucket_gen_cache,
&*marker_tracker, error_repo, tn, false),
sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
[&](uint64_t stack_id, int ret) {
// target number of entries to cache before recycling idle ones
static constexpr size_t target_cache_size = 256;
- boost::intrusive_ptr<rgw::bucket_sync::ShardCache> bucket_shard_cache {
- rgw::bucket_sync::ShardCache::create(target_cache_size) };
+ boost::intrusive_ptr<rgw::bucket_sync::ShardCache> bucket_shard_cache{
+ rgw::bucket_sync::ShardCache::create(target_cache_size)};
+ boost::intrusive_ptr<rgw::bucket_sync::GenCache> bucket_gen_cache{
+ rgw::bucket_sync::GenCache::create(target_cache_size)};
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
sync_marker, tn,
status_oid, error_repo,
lease_cr, sync_status,
- objv, bucket_shard_cache));
+ objv, bucket_shard_cache, bucket_gen_cache));
if (retcode < 0) {
if (retcode != -EBUSY) {
tn->log(10, SSTR("full sync failed (retcode=" << retcode << ")"));
sync_marker, tn,
status_oid, error_repo,
lease_cr, sync_status,
- objv, bucket_shard_cache,
+ objv, bucket_shard_cache, bucket_gen_cache,
inc_lock, modified_shards));
if (retcode < 0) {
if (retcode != -EBUSY) {
std::optional<uint64_t> gen,
const RGWSyncTraceNodeRef& tn,
ceph::real_time* progress,
+ ceph::coarse_mono_time& last_future_generation_recovery,
bool no_lease = false);
RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
const rgw_bucket_shard& source_bs,
const RGWSyncTraceNodeRef& _tn_parent,
std::optional<uint64_t> gen,
- ceph::real_time* progress)
+ ceph::real_time* progress,
+ rgw::bucket_sync::GenHandle gen_state)
: RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env),
lease_cr(std::move(lease_cr)),
tn(sync_env->sync_tracer->add_node(
_tn_parent, "bucket_sync_sources",
SSTR( "source=" << source_bs << ":source_zone=" << sc->source_zone))),
progress(progress),
- gen(gen)
+ gen(gen),
+ gen_state(std::move(gen_state))
{
sync_pair.source_bs = source_bs;
}
yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair,
gen, tn, &*cur_shard_progress,
- false),
+ gen_state->last_future_generation_recovery, false),
sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window),
[&](uint64_t stack_id, int ret) {
if (ret < 0) {
rgw_bucket_shard source_bs;
rgw_pool pool;
uint64_t current_gen = 0;
+ // In general operation, this is a reference to a pinned entry in `bucket_gen_cache`.
+ ceph::coarse_mono_time& last_future_generation_recovery;
RGWSyncTraceNodeRef tn;
+ static constexpr std::chrono::seconds throttle_future_recovery = std::chrono::hours(1);
+
public:
RGWSyncBucketCR(RGWDataSyncCtx *_sc,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
- const rgw_bucket_sync_pair_info& _sync_pair,
+ const rgw_bucket_sync_pair_info &_sync_pair,
std::optional<uint64_t> gen,
const RGWSyncTraceNodeRef& _tn_parent,
ceph::real_time* progress,
+ ceph::coarse_mono_time& last_future_generation_recovery,
bool no_lease = false)
: RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env),
data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
sync_pair.source_bs.bucket,
sync_pair.dest_bucket)),
- no_lease(no_lease),
+ no_lease(no_lease), last_future_generation_recovery(last_future_generation_recovery),
tn(env->sync_tracer->add_node(_tn_parent, "bucket",
SSTR(bucket_str{_sync_pair.dest_bucket} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
}
std::optional<uint64_t> gen,
const RGWSyncTraceNodeRef& tn,
ceph::real_time* progress,
+ ceph::coarse_mono_time& last_future_generation_recovery,
bool no_lease)
{
return new RGWSyncBucketCR(sc, std::move(lease), sync_pair,
- gen, tn, progress, no_lease);
+ gen, tn, progress, last_future_generation_recovery, no_lease);
}
#define RELEASE_LOCK(cr) \
if (*gen > current_gen) {
/* In case the data log entry is missing for previous gen, it may
* not be marked complete and the sync can get stuck. To avoid it,
- * may be we can add this (shardid, gen) to error repo to force
- * sync and mark that shard as completed.
+ * we add an error repo entry for every shard in the previous generation.
*/
pool = sc->env->svc->zone->get_zone_params().log_pool;
- if ((static_cast<std::size_t>(source_bs.shard_id) < bucket_status.shards_done_with_gen.size()) &&
- !bucket_status.shards_done_with_gen[source_bs.shard_id]) {
+ if ((ceph::coarse_mono_clock::now() - last_future_generation_recovery) > throttle_future_recovery) {
+ last_future_generation_recovery = ceph::coarse_mono_clock::now();
// use the error repo and sync status timestamp from the datalog shard corresponding to source_bs
- error_repo = datalog_oid_for_error_repo(sc, sc->env->driver,
- pool, source_bs);
- yield call(rgw::error_repo::write_cr(sc->env->driver->getRados()->get_rados_handle(), error_repo,
- rgw::error_repo::encode_key(source_bs, current_gen),
- ceph::real_clock::zero()));
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to log prev gen entry (bucket=" << source_bs.bucket << ", shard_id=" << source_bs.shard_id << ", gen=" << current_gen << " in error repo: retcode=" << retcode));
- } else {
- tn->log(20, SSTR("logged prev gen entry (bucket=" << source_bs.bucket << ", shard_id=" << source_bs.shard_id << ", gen=" << current_gen << " in error repo: retcode=" << retcode));
+ for (source_bs.shard_id = 0;
+ source_bs.shard_id < std::ssize(bucket_status.shards_done_with_gen);
+ ++source_bs.shard_id) {
+ error_repo = datalog_oid_for_error_repo(sc, sc->env->driver,
+ pool, source_bs);
+ tn->log(10, SSTR("writing shard_id " << source_bs.shard_id << " of gen " << current_gen << " to error repo for retry"));
+ yield_spawn_window(
+ rgw::error_repo::write_cr(
+ sc->env->driver->getRados()->get_rados_handle(),
+ error_repo,
+ rgw::error_repo::encode_key(source_bs, current_gen),
+ ceph::real_clock::zero()),
+ sc->lcc.adj_concurrency(
+ cct->_conf->rgw_data_sync_spawn_window),
+ [&](uint64_t stack_id, int ret) {
+ if (ret < 0) {
+ retcode = ret;
+ }
+ return 0;
+ });
}
+ drain_all_cb([&](uint64_t stack_id, int ret) {
+ if (ret < 0) {
+ tn->log(10,
+ SSTR("writing to error repo returned error: " << ret));
+ }
+ return ret;
+ });
}
- retcode = -EAGAIN;
- tn->log(10, SSTR("ERROR: requested sync of future generation "
- << *gen << " > " << current_gen
- << ", returning " << retcode << " for later retry"));
- return set_cr_error(retcode);
+ retcode = -EAGAIN;
+ tn->log(10, SSTR("ERROR: requested sync of future generation "
+ << *gen << " > " << current_gen
+ << ", returning " << retcode << " for later retry"));
+ return set_cr_error(retcode);
} else if (*gen < current_gen) {
tn->log(10, SSTR("WARNING: requested sync of past generation "
<< *gen << " < " << current_gen
ceph::real_time prev_progress;
ceph::real_time progress;
-public:
+ ceph::coarse_mono_time& last_future_generation_recovery;
- ShardCR(RGWDataSyncCtx& sc, const rgw_bucket_sync_pair_info& pair,
- const uint64_t gen)
- : RGWCoroutine(sc.cct), sc(sc), pair(pair), gen(gen) {}
+public:
+ ShardCR(RGWDataSyncCtx &sc, const rgw_bucket_sync_pair_info &pair,
+ const uint64_t gen,
+ ceph::coarse_mono_time &last_future_generation_recovery)
+ : RGWCoroutine(sc.cct), sc(sc), pair(pair), gen(gen),
+ last_future_generation_recovery( last_future_generation_recovery) {}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
yield call(sync_bucket_shard_cr(&sc, nullptr, pair, gen,
sc.env->sync_tracer->root_node,
&progress,
+ last_future_generation_recovery,
true /* no_lease: bucket sync run skips
lock acquisition so it is never
blocked by a background sync process*/));
std::vector<rgw_bucket_sync_pair_info> pairs;
decltype(pairs)::const_iterator iter;
+ // A manual sync runs only when explicitly requested, so there is no need
+ // for a cache entry; a local timestamp is kept instead.
+ ceph::coarse_mono_time last_future_generation_recovery = ceph::coarse_mono_clock::zero();
+
public:
GenCR(RGWDataSyncCtx& sc, const rgw_bucket& source, const rgw_bucket& dest,
const uint64_t gen, const uint64_t shards,
if (iter == pairs.cend()) {
return false;
}
- spawn(new ShardCR(sc, *iter, gen), false);
+ spawn(new ShardCR(sc, *iter, gen, last_future_generation_recovery), false);
++iter;
return true;
}