int num_shards{0};
int cur_shard{0};
bool again = false;
+ std::uint64_t syncing_gen = 0; // TODO: Fill this in from bucket sync status
+ std::optional<uint64_t> entry_gen;
public:
RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
std::optional<rgw_bucket_shard> _target_bs,
std::optional<rgw_bucket_shard> _source_bs,
const RGWSyncTraceNodeRef& _tn_parent,
+ std::optional<uint64_t> entry_gen,
ceph::real_time* progress);
int operate(const DoutPrefixProvider *dpp) override;
<< ' ' << *state->obligation << dendl;
yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr,
std::nullopt, /* target_bs */
- state->key, tn, &progress));
+ state->key, tn,
+ state->obligation->gen,
+ &progress));
if (retcode < 0) {
break;
}
std::optional<rgw_bucket_shard> _target_bs,
std::optional<rgw_bucket_shard> _source_bs,
const RGWSyncTraceNodeRef& _tn_parent,
+ std::optional<uint64_t> entry_gen,
ceph::real_time* progress)
: RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env),
lease_cr(std::move(lease_cr)), target_bs(_target_bs), source_bs(_source_bs),
- tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
- SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone))),
- progress(progress)
+ tn(sync_env->sync_tracer->add_node(
+ _tn_parent, "bucket_sync_sources",
+ SSTR( "target=" << target_bucket.value_or(rgw_bucket()) <<
+ ":source_bucket=" << source_bucket.value_or(rgw_bucket()) <<
+ ":source_zone=" << sc->source_zone))),
+ progress(progress),
+ entry_gen(entry_gen)
{
if (target_bs) {
target_bucket = target_bs->bucket;
ldpp_dout(dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;
source_num_shards = siter->source.get_bucket_info().layout.current_index.layout.normal.num_shards;
- target_num_shards = siter->target.get_bucket_info().layout.current_index.layout.normal.num_shards;
+ if (entry_gen) {
+ if (*entry_gen > syncing_gen) {
+ tn->log(10, "Future generation in datalog entry. Returning error so we'll retry.");
+ return set_cr_error(-EAGAIN);
+ } else if (*entry_gen < syncing_gen) {
+ tn->log(10, "Future generation in datalog entry. Returning error so we'll retry.");
+ return 0;
+ }
+ }
+ target_num_shards = siter->target.get_bucket_info().layout.current_index.layout.normal.num_shards;
if (source_bs) {
sync_pair.source_bs = *source_bs;
} else {