RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw_bucket_shard source_bs;
+ rgw_raw_obj error_repo;
std::string error_marker;
ceph::real_time timestamp;
RGWSyncTraceNodeRef tn;
rgw_bucket_index_marker_info remote_info;
rgw_pool pool;
- rgw_raw_obj error_repo;
uint32_t sid;
rgw_bucket_shard bs;
std::vector<store_gen_shards>::const_iterator each;
public:
- RGWDataIncrementalSyncFullObligationCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, std::string _error_marker,
- ceph::real_time& _timestamp, RGWSyncTraceNodeRef& _tn)
- : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- source_bs(_source_bs), error_marker(_error_marker), timestamp(_timestamp), tn(_tn) {
- tn = sync_env->sync_tracer->add_node(_tn, "error_repo", SSTR(bucket_shard_str(source_bs)));
- }
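+  // error_repo, error_marker and timestamp identify the obligation entry so it
+  // can be removed from the error repo once everything succeeds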
+ RGWDataIncrementalSyncFullObligationCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
+ const rgw_raw_obj& error_repo, const std::string& _error_marker,
+ ceph::real_time& _timestamp, RGWSyncTraceNodeRef& _tn)
+ : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source_bs(_source_bs),
+ error_repo(error_repo), error_marker(_error_marker), timestamp(_timestamp),
+ tn(sync_env->sync_tracer->add_node(_tn, "error_repo", SSTR(bucket_shard_str(source_bs))))
+ {}
  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      // ... (earlier steps of operate() elided) ...
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
+ // once everything succeeds, remove the full sync obligation from the error repo
+ yield call(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
+ error_marker, timestamp));
return set_cr_done();
}
    return 0;
  }
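
// elsewhere in the diff: the incremental sync path that replays entries read
// back from the error repo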
tn->log(10, SSTR("gen is " << gen));
if (!gen) {
// write all full sync obligations for the bucket to error repo
- spawn(new RGWDataIncrementalSyncFullObligationCR(
- sc, source_bs,error_marker, entry_timestamp, tn), false);
+ spawn(new RGWDataIncrementalSyncFullObligationCR(sc, source_bs,
+ error_repo, error_marker, entry_timestamp, tn), false);
} else {
tn->log(20, SSTR("handle error entry key="
<< to_string(source_bs, gen)