bool exclusive)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
sync_pair(_sync_pair),
- sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair)),
+ sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair, _info.latest_gen)),
status(_status), objv_tracker(objv_tracker), info(_info), marker_mgr(_marker_mgr), exclusive(exclusive)
{}
RGWReadBucketPipeSyncStatusCoroutine(RGWDataSyncCtx *_sc,
const rgw_bucket_sync_pair_info& sync_pair,
rgw_bucket_shard_sync_info *_status,
- RGWObjVersionTracker* objv_tracker)
+ RGWObjVersionTracker* objv_tracker,
+ uint64_t gen)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)),
+ oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, gen)),
status(_status), objv_tracker(objv_tracker)
{}
int operate(const DoutPrefixProvider *dpp) override;
CheckBucketShardStatusIsIncremental(RGWDataSyncCtx* sc,
const rgw_bucket_sync_pair_info& sync_pair,
bool* result)
- : RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &status, nullptr),
+ : RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &status, nullptr, 0 /*no gen in compat mode*/),
result(result)
{}
while (--tries) {
objv.clear();
// read current status and objv
- yield call(new ReadCR(sc, pair, &status, &objv));
+ yield call(new ReadCR(sc, pair, &status, &objv, info.latest_gen));
if (retcode < 0) {
return set_cr_error(retcode);
}
return nullptr;
}
- return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status, nullptr);
+ return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status, nullptr, full_status.incremental_gen);
}
RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RadosStore* _store,
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
sync_pipe(sync_pipe), bucket_state(bucket_state), generation(generation), progress(progress),
- shard_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)),
+ shard_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, generation)),
bucket_status_obj(sc->env->svc->zone->get_zone_params().log_pool,
RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
sync_pair.source_bs.bucket,
int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp)
{
reenter(this) {
- yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker));
+ yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker, generation));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, "ERROR: failed to read sync status for bucket");
return set_cr_error(retcode);
}
}
+inline std::string generation_token(uint64_t gen) {
+ return (gen == 0) ? "" : (":" + std::to_string(gen));
+}
+
string RGWBucketPipeSyncStatusManager::inc_status_oid(const rgw_zone_id& source_zone,
- const rgw_bucket_sync_pair_info& sync_pair)
+ const rgw_bucket_sync_pair_info& sync_pair,
+ uint64_t gen)
{
if (sync_pair.source_bs.bucket == sync_pair.dest_bucket) {
- return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.source_bs.get_key();
+ return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.source_bs.get_key() +
+ generation_token(gen);
} else {
- return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.dest_bucket.get_key() + ":" + sync_pair.source_bs.get_key();
+ return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.dest_bucket.get_key() + ":" + sync_pair.source_bs.get_key() +
+ generation_token(gen);
}
}
rgw::sal::RadosStore* const store;
RGWDataSyncCtx *const sc;
RGWDataSyncEnv *const env;
+ const uint64_t gen;
+
rgw_bucket_sync_pair_info sync_pair;
using Vector = std::vector<rgw_bucket_shard_sync_info>;
Vector::iterator i, end;
RGWCollectBucketSyncStatusCR(rgw::sal::RadosStore* store, RGWDataSyncCtx *sc,
const RGWBucketInfo& source_bucket_info,
const RGWBucketInfo& dest_bucket_info,
+ uint64_t gen,
Vector *status)
: RGWShardCollectCR(sc->cct, max_concurrent_shards),
- store(store), sc(sc), env(sc->env),
+ store(store), sc(sc), env(sc->env), gen(gen),
i(status->begin()), end(status->end())
{
sync_pair.source_bs = rgw_bucket_shard(source_bucket_info.bucket, source_bucket_info.layout.current_index.layout.normal.num_shards > 0 ? 0 : -1);
if (i == end) {
return false;
}
- spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i, nullptr), false);
+ spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i, nullptr, gen), false);
++i;
++sync_pair.source_bs.shard_id;
return true;
const rgw_sync_bucket_pipe& pipe,
const RGWBucketInfo& dest_bucket_info,
const RGWBucketInfo *psource_bucket_info,
+ uint64_t gen,
std::vector<rgw_bucket_shard_sync_info> *status)
{
if (!pipe.source.zone ||
return crs.run(dpp, new RGWCollectBucketSyncStatusCR(store, &sc,
*psource_bucket_info,
dest_bucket_info,
+ gen,
status));
}