RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
const rgw_bucket_sync_pair_info& _sync_pair,
rgw_bucket_shard_sync_info& _status,
- uint64_t latest_gen,
+ uint64_t gen,
const BucketIndexShardsManager& _marker_mgr,
RGWObjVersionTracker& objv_tracker,
bool exclusive)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
sync_pair(_sync_pair),
- sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair, latest_gen)),
+ sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair, gen)),
status(_status), objv_tracker(objv_tracker), marker_mgr(_marker_mgr), exclusive(exclusive)
{}
rgw_bucket_sync_pair_info pair;
rgw_bucket_shard_sync_info status;
RGWObjVersionTracker objv;
- const uint64_t latest_gen;
+ const uint64_t gen;
const BucketIndexShardsManager& marker_mgr;
public:
InitBucketShardStatusCR(RGWDataSyncCtx* sc,
const rgw_bucket_sync_pair_info& pair,
- uint64_t latest_gen,
+ uint64_t gen,
const BucketIndexShardsManager& marker_mgr)
- : RGWCoroutine(sc->cct), sc(sc), pair(pair), latest_gen(latest_gen), marker_mgr(marker_mgr)
+ : RGWCoroutine(sc->cct), sc(sc), pair(pair), gen(gen), marker_mgr(marker_mgr)
{}
int operate(const DoutPrefixProvider *dpp) {
reenter(this) {
// non exclusive create with empty status
objv.generate_new_write_ver(cct);
- yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, pair, status, latest_gen, marker_mgr, objv, false));
+ yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, pair, status, gen, marker_mgr, objv, false));
if (retcode < 0) {
- assert(retcode != -EEXIST && retcode != -ECANCELED);
return set_cr_error(retcode);
}
return set_cr_done();
static constexpr int max_concurrent_shards = 16;
RGWDataSyncCtx* sc;
rgw_bucket_sync_pair_info sync_pair;
- const uint64_t latest_gen;
+ const uint64_t gen;
const BucketIndexShardsManager& marker_mgr;
const int num_shards;
public:
InitBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
const rgw_bucket_sync_pair_info& sync_pair,
- uint64_t latest_gen,
+ uint64_t gen,
const BucketIndexShardsManager& marker_mgr,
int num_shards)
: RGWShardCollectCR(sc->cct, max_concurrent_shards),
- sc(sc), sync_pair(sync_pair), latest_gen(latest_gen), marker_mgr(marker_mgr), num_shards(num_shards)
+ sc(sc), sync_pair(sync_pair), gen(gen), marker_mgr(marker_mgr), num_shards(num_shards)
{}
bool spawn_next() override {
return false;
}
sync_pair.source_bs.shard_id = shard++;
- spawn(new InitBucketShardStatusCR(sc, sync_pair, latest_gen, marker_mgr), false);
+ spawn(new InitBucketShardStatusCR(sc, sync_pair, gen, marker_mgr), false);
return true;
}
};
public:
RemoveBucketShardStatusCR(RGWDataSyncCtx* sc,
- const rgw_bucket_sync_pair_info& sync_pair, uint64_t latest_gen)
+ const rgw_bucket_sync_pair_info& sync_pair, uint64_t gen)
: RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
sync_pair(sync_pair),
obj(sync_env->svc->zone->get_zone_params().log_pool,
- RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, latest_gen))
+ RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, gen))
{}
int operate(const DoutPrefixProvider *dpp) override {
RGWDataSyncCtx* const sc;
RGWDataSyncEnv* const sync_env;
rgw_bucket_sync_pair_info sync_pair;
- const uint64_t latest_gen;
+ const uint64_t gen;
const int num_shards;
int shard = 0;
public:
RemoveBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
const rgw_bucket_sync_pair_info& sync_pair,
- uint64_t latest_gen,
+ uint64_t gen,
int num_shards)
: RGWShardCollectCR(sc->cct, max_concurrent_shards),
- sc(sc), sync_env(sc->env), sync_pair(sync_pair), latest_gen(latest_gen), num_shards(num_shards)
+ sc(sc), sync_env(sc->env), sync_pair(sync_pair), gen(gen), num_shards(num_shards)
{}
bool spawn_next() override {
- if (shard >= num_shards || status < 0) { // stop spawning on any errors
+ if (shard >= num_shards) {
return false;
}
sync_pair.source_bs.shard_id = shard++;
- spawn(new RemoveBucketShardStatusCR(sc, sync_pair, latest_gen), false);
+ spawn(new RemoveBucketShardStatusCR(sc, sync_pair, gen), false);
return true;
}
};
if (check_compat) {
// try to convert existing per-shard incremental status for backward compatibility
yield call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair, num_shards, &all_incremental));
- ldout(cct, 20) << "check for 'all incremental' in compatibility mode" << dendl;
if (retcode < 0) {
return set_cr_error(retcode);
}
bucket_status.state == BucketSyncState::Stopped ||
bucket_stopped) {
// if state is Init or Stopped, we query the remote RGW for ther state
- yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.dest_bucket, &info));
+ yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.dest_bucket, &info));
if (retcode < 0) {
return set_cr_error(retcode);
}
}
}
+ // if state was incremental, remove all per-shard status objects
+ if (bucket_status.state == BucketSyncState::Incremental) {
+ yield {
+ const auto num_shards = bucket_status.shards_done_with_gen.size();
+ const auto gen = bucket_status.incremental_gen;
+ call(new RemoveBucketShardStatusCollectCR(sc, sync_pair, gen, num_shards));
+ }
+ }
+
// check if local state is "stopped"
yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
status_obj, &bucket_status, false, &objv));
return set_cr_error(retcode);
}
}
- yield {
- const int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
- call(new RemoveBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, num_shards));
- }
RELEASE_LOCK(bucket_lease_cr);
return set_cr_done();
- }
+ }
if (bucket_stopped) {
tn->log(20, SSTR("ERROR: switched from 'stop' to 'start' sync. while state is: " << bucket_status.state));
bucket_stopped = false;