}
};
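+// A datalog entry that carries no generation can't be mapped to specific
+// bucket index log shards, so this coroutine writes a full-sync retry
+// obligation for every shard of every remote generation to the error repo.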
+class RGWFullSyncErrorRepoCR: public RGWCoroutine {
+  RGWDataSyncCtx *sc;
+  RGWDataSyncEnv *sync_env;
+  rgw_bucket_shard source_bs;
+  RGWSyncTraceNodeRef tn;
+  rgw_bucket_index_marker_info remote_info;
+  rgw_pool pool;
+  RGWDataChangesLog *datalog_changes{nullptr};
+  // loop state is kept in members so it survives the yields below;
+  // locals in a stackless coroutine do not
+  std::vector<std::pair<int, int>>::const_iterator each;
+  int sid = 0;
+  rgw_bucket_shard bs;
+  rgw_raw_obj error_repo;
+
+  // map a bucket shard to its datalog shard's retry object
+  rgw_raw_obj datalog_oid_for_error_repo(const rgw_bucket_shard& bs) {
+    int datalog_shard = datalog_changes->choose_oid(bs);
+    string oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, datalog_shard);
+    return rgw_raw_obj(pool, oid + ".retry");
+  }
+
+public:
+  RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, RGWSyncTraceNodeRef& _tn)
+    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
+      source_bs(_source_bs),
+      pool(sync_env->svc->zone->get_zone_params().log_pool),
+      datalog_changes(sync_env->store->svc()->datalog_rados) {
+    tn = sync_env->sync_tracer->add_node(_tn, "error_repo", SSTR(bucket_shard_str(source_bs)));
+  }
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
+      // retcode is only valid once the child coroutine completes, so the
+      // check has to come after the yield, not inside it
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+      for (each = remote_info.gen_numshards.cbegin();
+           each != remote_info.gen_numshards.cend(); ++each) {
+        for (sid = 0; sid < each->second; sid++) {
+          bs = rgw_bucket_shard(source_bs.bucket, sid);
+          error_repo = datalog_oid_for_error_repo(bs);
+          tn->log(10, SSTR("writing shard_id " << sid << " of gen " << each->first << " to error repo for retry"));
+          yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                                               rgw::error_repo::encode_key(bs, each->first),
+                                               ceph::real_time{}));
+          if (retcode < 0) {
+            tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
+          }
+        }
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+
#define DATA_SYNC_MAX_ERR_ENTRIES 10
class RGWDataSyncShardCR : public RGWCoroutine {
RGWSyncTraceNodeRef tn;
rgw_bucket_shard source_bs;
+ std::optional<uint64_t> gen;
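+  // decoded from each error-repo key below; a member rather than a loop
+  // local so its value survives the yield in the retry loop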
// target number of entries to cache before recycling idle ones
static constexpr size_t target_cache_size = 256;
&bs.bucket, &bs.shard_id);
}
- rgw_raw_obj datalog_oid_for_error_repo(rgw_bucket_shard& bs) {
- auto shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
- auto datalog_shard = (ceph_str_hash_linux(bs.bucket.name.data(), bs.bucket.name.size()) +
- shard_shift) % cct->_conf->rgw_data_log_num_shards;
- string oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, datalog_shard);
- return rgw_raw_obj(pool, oid + ".retry");
- }
-
RGWCoroutine* sync_single_entry(const rgw_bucket_shard& src,
std::optional<uint64_t> gen,
const std::string& marker,
for (; iter != error_entries.end(); ++iter) {
error_marker = iter->first;
entry_timestamp = rgw::error_repo::decode_value(iter->second);
- std::optional<uint64_t> gen;
retcode = rgw::error_repo::decode_key(iter->first, source_bs, gen);
if (retcode == -EINVAL) {
// backward compatibility for string keys that don't encode a gen
error_marker, entry_timestamp), false);
continue;
}
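+      // a key that decodes without a generation doesn't tell us which
+      // shard or generation failed, so retry everything: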
+      if (!gen) {
+        // write all full sync obligations for the bucket to error repo
+        yield call(new RGWFullSyncErrorRepoCR(sc, source_bs, tn));
+        if (retcode < 0) {
+          tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << retcode));
+        }
+      }
tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));
spawn(sync_single_entry(source_bs, gen, "", entry_timestamp, true), false);
}
marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
continue;
}
- if (!log_iter->entry.gen) {
- yield {
- rgw_bucket_index_marker_info remote_info;
- BucketIndexShardsManager remote_markers;
- retcode = rgw_read_remote_bilog_info(sync_env->dpp, sc->conn, source_bs.bucket,
- remote_info, remote_markers, null_yield);
-
- if (retcode < 0) {
- tn->log(1, SSTR(" rgw_read_remote_bilog_info failed with retcode=" << retcode));
- return retcode;
- }
- for (const auto& each : remote_info.gen_numshards) {
- for (int sid = 0; sid < each.second; sid++) {
- rgw_bucket_shard bs(source_bs.bucket, sid);
- error_repo = datalog_oid_for_error_repo(bs);
- tn->log(10, SSTR("writing shard_id " << sid << "of gen" << each.first << " to error repo for retry"));
- call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
- rgw::error_repo::encode_key(bs, each.first),
- ceph::real_time{}));
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
- }
- }
- }
- }
- }
if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
} else {
oldest_gen = logs.front().gen;
latest_gen = logs.back().gen;
- std::vector<std::pair<int, int>> gen_numshards;
- for (auto gen = logs.front().gen; gen <= logs.back().gen; gen++) {
- auto log = std::find_if(logs.begin(), logs.end(), rgw::matches_gen(gen));
- if (log == logs.end()) {
- ldpp_dout(s, 5) << "ERROR: no log layout with gen=" << gen << dendl;
- op_ret = -ENOENT;
- }
- const auto& num_shards = log->layout.in_index.layout.num_shards;
- gen_numshards.push_back(std::make_pair(gen, num_shards));
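+    // expose one (gen, num_shards) pair per bucket index log generation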
+    for (const auto& log : logs) {
+      const uint32_t num_shards = log.layout.in_index.layout.num_shards;
+      gen_numshards.push_back(std::make_pair(log.gen, num_shards));
}
}
encode_json("syncstopped", syncstopped, s->formatter);
encode_json("oldest_gen", oldest_gen, s->formatter);
encode_json("latest_gen", latest_gen, s->formatter);
+  // TODO: uncomment once encode/decode JSON support for std::pair is added
+  //encode_json("gen_numshards", gen_numshards, s->formatter);
s->formatter->close_section();
flusher.flush();