}
}
+ status.shards_done_with_gen.resize(num_shards);
status.incremental_gen = info.latest_gen;
ldout(cct, 20) << "writing bucket sync state=" << status.state << dendl;
return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
}
+class RGWBucketShardIsDoneCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ RGWDataSyncEnv *sync_env;
+ rgw_bucket_sync_status bucket_status;
+ const rgw_raw_obj& bucket_status_obj;
+ const int shard_id;
+ RGWObjVersionTracker objv_tracker;
+ const next_bilog_result& next_log;
+ const uint64_t generation;
+
+public:
+ RGWBucketShardIsDoneCR(RGWDataSyncCtx *_sc, const rgw_raw_obj& _bucket_status_obj,
+ int _shard_id, const next_bilog_result& _next_log, const uint64_t _gen)
+ : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
+ bucket_status_obj(_bucket_status_obj),
+ shard_id(_shard_id), next_log(_next_log), generation(_gen) {}
+
+ int operate(const DoutPrefixProvider* dpp) override
+ {
+ reenter(this) {
+ do {
+ // read bucket sync status
+ objv_tracker.clear();
+ using ReadCR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
+ yield call(new ReadCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ bucket_status_obj, &bucket_status, false, &objv_tracker));
+ if (retcode < 0) {
+ ldpp_dout(dpp, 20) << "failed to read bucket shard status: "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+
+ if (bucket_status.state != BucketSyncState::Incremental) {
+ // exit with success to avoid stale shard being
+ // retried in error repo if we lost a race
+ ldpp_dout(dpp, 20) << "RGWBucketShardIsDoneCR found sync state = " << bucket_status.state << dendl;
+ return set_cr_done();
+ }
+
+ if (bucket_status.incremental_gen != generation) {
+ // exit with success to avoid stale shard being
+ // retried in error repo if we lost a race
+ ldpp_dout(dpp, 20) << "RGWBucketShardIsDoneCR expected gen: " << generation
+ << ", got: " << bucket_status.incremental_gen << dendl;
+ return set_cr_done();
+ }
+
+ yield {
+ // update bucket_status after a shard is done with current gen
+ auto& done = bucket_status.shards_done_with_gen;
+ done[shard_id] = true;
+
+ // increment gen if all shards are already done with current gen
+ if (std::all_of(done.begin(), done.end(),
+ [] (const bool done){return done; } )) {
+ bucket_status.incremental_gen = next_log.generation;
+ done.clear();
+ done.resize(next_log.num_shards, false);
+ }
+ using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
+ call(new WriteCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ bucket_status_obj, bucket_status, &objv_tracker, false));
+ }
+ if (retcode < 0 && retcode != -ECANCELED) {
+ ldpp_dout(dpp, 20) << "failed to write bucket sync status: " << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ } else if (retcode >= 0) {
+ return set_cr_done();
+ }
+ } while (retcode == -ECANCELED);
+ }
+ return 0;
+ }
+};
+
class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw_bucket_sync_pipe& sync_pipe;
RGWBucketSyncFlowManager::pipe_rules_ref rules;
rgw_bucket_shard& bs;
+ const rgw_raw_obj& bucket_status_obj;
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
bilog_list_result extended_result;
list<rgw_bi_log_entry> list_result;
public:
RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe,
- const std::string& status_oid,
+ const std::string& shard_status_oid,
+ const rgw_raw_obj& _bucket_status_obj,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
rgw_bucket_shard_sync_info& sync_info,
RGWSyncTraceNodeRef& _tn_parent,
ceph::real_time* stable_timestamp)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
- lease_cr(std::move(lease_cr)), sync_info(sync_info),
- zone_id(sync_env->svc->zone->get_zone().id),
+ bucket_status_obj(_bucket_status_obj), lease_cr(std::move(lease_cr)),
+ sync_info(sync_info), zone_id(sync_env->svc->zone->get_zone().id),
tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
SSTR(bucket_shard_str{bs}))),
- marker_tracker(sc, status_oid, sync_info.inc_marker, tn,
+ marker_tracker(sc, shard_status_oid, sync_info.inc_marker, tn,
objv_tracker, stable_timestamp)
{
set_description() << "bucket shard incremental sync bucket="
return 0;
});
}
+
} while (!list_result.empty() && sync_status == 0 && !syncstopped);
drain_all_cb([&](uint64_t stack_id, int ret) {
tn->log(10, SSTR("backing out with sync_status=" << sync_status));
return set_cr_error(sync_status);
}
+
+ if (!truncated && extended_result.next_log) {
+ yield call(new RGWBucketShardIsDoneCR(sc, bucket_status_obj, bs.shard_id, *extended_result.next_log, generation));
+ if (retcode < 0) {
+ ldout(cct, 20) << "failed to update bucket sync status: "
+ << cpp_strerror(retcode) << dendl;
+ drain_all();
+ return set_cr_error(retcode);
+ }
+ }
+
return set_cr_done();
}
return 0;
BucketSyncState& bucket_state;
ceph::real_time* progress;
- const std::string status_oid;
+ const std::string shard_status_oid;
+ const rgw_raw_obj bucket_status_obj;
rgw_bucket_shard_sync_info sync_status;
RGWObjVersionTracker objv_tracker;
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
sync_pipe(sync_pipe), bucket_state(bucket_state), progress(progress),
- status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)),
+ shard_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)),
+ bucket_status_obj(sc->env->svc->zone->get_zone_params().log_pool,
+ RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
+ sync_pair.source_bs.bucket,
+ sync_pair.dest_bs.bucket)),
tn(tn) {
}
}
yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
- status_oid, lease_cr,
+ shard_status_oid, bucket_status_obj, lease_cr,
sync_status, tn,
objv_tracker, progress));
if (retcode < 0) {
}
}
+ if (sync_pair.source_bs.shard_id >= bucket_status.shards_done_with_gen.size()) {
+ tn->log(1, SSTR("bucket shard " << sync_pair.source_bs << " index out of bounds"));
+ return set_cr_done(); // return success so we don't retry
+ }
+ if (bucket_status.shards_done_with_gen[sync_pair.source_bs.shard_id]) {
+ tn->log(10, SSTR("bucket shard " << sync_pair.source_bs << " of gen " <<
+ gen << " already synced."));
+ return set_cr_done();
+ }
+
yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair,
sync_pipe, bucket_status.state,
tn, progress));