RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw_bucket_shard source_bs;
+ ceph::real_time timestamp;
+ int *error_result;
RGWSyncTraceNodeRef tn;
rgw_bucket_index_marker_info remote_info;
rgw_pool pool;
RGWDataChangesLog *datalog_changes{nullptr};
+ rgw_raw_obj error_repo;
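+ // loop variables are class members rather than stack locals: reenter()
+ // re-enters operate() after every yield, so locals would not survive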
+ uint32_t sid;
+ rgw_bucket_shard bs;
+ std::vector<store_gen_shards>::const_iterator each;
rgw_raw_obj datalog_oid_for_error_repo(rgw_bucket_shard& bs) {
  // map the shard to its datalog object; retry entries live in the
  // matching ".retry" error-repo object
  int datalog_shard = datalog_changes->choose_oid(bs);
  std::string oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, datalog_shard);
  return rgw_raw_obj(pool, oid + ".retry");
}
public:
- RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, RGWSyncTraceNodeRef& _tn)
+ RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
+ ceph::real_time& _timestamp, int *_error_result, RGWSyncTraceNodeRef& _tn)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- source_bs(_source_bs), tn(_tn) {
+ source_bs(_source_bs), timestamp(_timestamp), error_result(_error_result), tn(_tn) {
tn = sync_env->sync_tracer->add_node(_tn, "error_repo", SSTR(bucket_shard_str(source_bs)));
}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
- yield {
- call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
- for (const auto& each : remote_info.gen_numshards) {
- for (uint32_t sid = 0; sid < each.second; sid++) {
- rgw_bucket_shard bs(source_bs.bucket, sid);
- rgw_raw_obj error_repo = datalog_oid_for_error_repo(bs);
- tn->log(10, SSTR("writing shard_id " << sid << "of gen" << each.first << " to error repo for retry"));
- call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
- rgw::error_repo::encode_key(bs, each.first),
- ceph::real_time{}));
- }
+ yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
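+ // enqueue a retry entry for every shard of every generation the
+ // remote reported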
+ each = remote_info.generations.cbegin();
+ for (; each != remote_info.generations.cend(); ++each) {
+ for (sid = 0; sid < each->num_shards; sid++) {
+ bs.bucket = source_bs.bucket;
+ bs.shard_id = sid;
+ error_repo = datalog_oid_for_error_repo(bs);
+ tn->log(10, SSTR("writing shard_id " << sid << " of gen " << each->gen << " to error repo for retry"));
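+ // yield_spawn_window() throttles the concurrent writes, yielding once
+ // rgw_data_sync_spawn_window stacks are outstanding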
+ yield_spawn_window(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+ rgw::error_repo::encode_key(bs, each->gen),
+ timestamp), cct->_conf->rgw_data_sync_spawn_window,
+ [&](uint64_t stack_id, int ret) {
+ if (ret < 0) {
+ tn->log(10, SSTR("writing to error repo returned error: " << ret));
+ *error_result = ret;
+ }
+ return 0;
+ });
}
}
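+ // drain the writes still in flight and surface any failure to the caller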
+ drain_all_cb([&](uint64_t stack_id, int ret) {
+ if (ret < 0) {
+ tn->log(10, SSTR("writing to error repo returned error: " << ret));
+ *error_result = ret;
+ }
+ return 0;
+ });
+
+ return set_cr_done();
}
return 0;
}
rgw_bucket_shard source_bs;
std::optional<uint64_t> gen;
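+ // receives any error from RGWFullSyncErrorRepoCR's writes (0 = all succeeded)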
+ int error_result{0};
// target number of entries to cache before recycling idle ones
static constexpr size_t target_cache_size = 256;
}
if (!gen) {
// write all full sync obligations for the bucket to error repo
- yield call(new RGWFullSyncErrorRepoCR(sc, source_bs, tn));
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << retcode));
+ // wait for the per-shard writes to complete so error_result is valid
+ // before deciding whether to drop the original error entry
+ yield call(new RGWFullSyncErrorRepoCR(sc, source_bs, entry_timestamp, &error_result, tn));
+ if (retcode < 0 && error_result == 0) {
+ error_result = retcode;
+ }
+ if (error_result == 0) {
+ spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
+ error_marker, entry_timestamp), false);
+ } else {
+ tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << error_result));
}
+ } else {
+ tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));
+ spawn(sync_single_entry(source_bs, gen, "", entry_timestamp, true), false);
}
- tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));
- spawn(sync_single_entry(source_bs, gen, "", entry_timestamp, true), false);
}
if (!omapvals->more) {
error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
void decode_json(JSONObj *obj);
};
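+// shard count for a single datalog generation, as reported in the
+// remote bucket index log info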
+struct store_gen_shards {
+ uint64_t gen = 0;
+ uint32_t num_shards = 0;
+
+ void dump(Formatter *f) const {
+ encode_json("gen", gen, f);
+ encode_json("num_shards", num_shards, f);
+ }
+
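+ // accepts the same shape dump() emits, e.g. {"gen": 3, "num_shards": 11}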
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("gen", gen, obj);
+ JSONDecoder::decode_json("num_shards", num_shards, obj);
+ }
+};
+
struct rgw_bucket_index_marker_info {
std::string bucket_ver;
std::string master_ver;
bool syncstopped{false};
uint64_t oldest_gen = 0;
uint64_t latest_gen = 0;
- std::vector<std::pair<uint64_t, uint32_t>> gen_numshards;
+ std::vector<store_gen_shards> generations;
void decode_json(JSONObj *obj) {
JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
JSONDecoder::decode_json("syncstopped", syncstopped, obj);
JSONDecoder::decode_json("oldest_gen", oldest_gen, obj);
JSONDecoder::decode_json("latest_gen", latest_gen, obj);
+ JSONDecoder::decode_json("generations", generations, obj);
}
};