int num_shards{0};
int cur_shard{0};
bool again = false;
- std::uint64_t syncing_gen = 0; // TODO: Fill this in from bucket sync status
- std::optional<uint64_t> entry_gen;
+ std::optional<uint64_t> gen;
public:
RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
std::optional<rgw_bucket_shard> _target_bs,
std::optional<rgw_bucket_shard> _source_bs,
const RGWSyncTraceNodeRef& _tn_parent,
- std::optional<uint64_t> entry_gen,
+ std::optional<uint64_t> gen,
ceph::real_time* progress);
int operate(const DoutPrefixProvider *dpp) override;
static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease,
const rgw_bucket_sync_pair_info& sync_pair,
+ std::optional<uint64_t> gen,
const RGWSyncTraceNodeRef& tn,
ceph::real_time* progress);
std::optional<rgw_bucket_shard> _target_bs,
std::optional<rgw_bucket_shard> _source_bs,
const RGWSyncTraceNodeRef& _tn_parent,
- std::optional<uint64_t> entry_gen,
+ std::optional<uint64_t> gen,
ceph::real_time* progress)
: RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env),
lease_cr(std::move(lease_cr)), target_bs(_target_bs), source_bs(_source_bs),
":source_bucket=" << source_bucket.value_or(rgw_bucket()) <<
":source_zone=" << sc->source_zone))),
progress(progress),
- entry_gen(entry_gen)
+ gen(gen)
{
if (target_bs) {
target_bucket = target_bs->bucket;
ldpp_dout(dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;
source_num_shards = siter->source.get_bucket_info().layout.current_index.layout.normal.num_shards;
- if (entry_gen) {
- if (*entry_gen > syncing_gen) {
- tn->log(10, "Future generation in datalog entry. Returning error so we'll retry.");
- return set_cr_error(-EAGAIN);
- } else if (*entry_gen < syncing_gen) {
- tn->log(10, "Future generation in datalog entry. Returning error so we'll retry.");
- return 0;
- }
- }
target_num_shards = siter->target.get_bucket_info().layout.current_index.layout.normal.num_shards;
if (source_bs) {
sync_pair.source_bs = *source_bs;
cur_progress = (progress ? &shard_progress[prealloc_stack_id()] : nullptr);
- yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair, tn,
- cur_progress),
+ yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair,
+ gen, tn, cur_progress),
BUCKET_SYNC_SPAWN_WINDOW,
[&](uint64_t stack_id, int ret) {
handle_complete_stack(stack_id);
boost::intrusive_ptr<RGWContinuousLeaseCR> bucket_lease_cr;
rgw_bucket_sync_pair_info sync_pair;
rgw_bucket_sync_pipe sync_pipe;
+ std::optional<uint64_t> gen;
ceph::real_time* progress;
const std::string lock_name = "bucket sync";
RGWSyncBucketCR(RGWDataSyncCtx *_sc,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
const rgw_bucket_sync_pair_info& _sync_pair,
+ std::optional<uint64_t> gen,
const RGWSyncTraceNodeRef& _tn_parent,
ceph::real_time* progress)
: RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env),
- data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
+ data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
+ gen(gen), progress(progress),
lock_duration(cct->_conf->rgw_sync_lease_period),
status_obj(env->svc->zone->get_zone_params().log_pool,
RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease,
const rgw_bucket_sync_pair_info& sync_pair,
+ std::optional<uint64_t> gen,
const RGWSyncTraceNodeRef& tn,
ceph::real_time* progress)
{
- return new RGWSyncBucketCR(sc, std::move(lease), sync_pair, tn, progress);
+ return new RGWSyncBucketCR(sc, std::move(lease), sync_pair,
+ gen, tn, progress);
}
int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
drain_all();
bucket_lease_cr.reset();
}
+
+ // if a specific gen was requested, compare that to the sync status
+ if (gen) {
+ const auto current_gen = bucket_status.incremental_gen;
+ if (*gen > current_gen) {
+ retcode = -EAGAIN;
+ tn->log(10, SSTR("requested sync of future generation "
+ << *gen << " > " << current_gen
+ << ", returning " << retcode << " for later retry"));
+ return set_cr_error(retcode);
+ } else if (*gen < current_gen) {
+ tn->log(10, SSTR("requested sync of past generation "
+ << *gen << " < " << current_gen
+ << ", returning success"));
+ return set_cr_done();
+ }
+ }
+
yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair,
sync_pipe, bucket_status.state,
tn, progress));
return nullptr;
}
- return sync_bucket_shard_cr(&sc, nullptr, sync_pairs[num], sync_env->sync_tracer->root_node, nullptr);
+ constexpr std::optional<uint64_t> gen; // sync current gen
+ return sync_bucket_shard_cr(&sc, nullptr, sync_pairs[num], gen,
+ sync_env->sync_tracer->root_node, nullptr);
}
int RGWBucketPipeSyncStatusManager::init(const DoutPrefixProvider *dpp)