From 10bd6e73c4a57c2d631e6c01b1dcedd6a47904ca Mon Sep 17 00:00:00 2001 From: Shilpa Jagannath Date: Mon, 27 Jun 2022 16:55:00 -0400 Subject: [PATCH] rgw/multisite: full_sync changes. Signed-off-by: Shilpa Jagannath --- src/rgw/rgw_data_sync.cc | 82 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 77 insertions(+), 5 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index a63558ed0dfeb..392cd2ed66797 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1543,6 +1543,9 @@ class RGWDataSyncShardCR : public RGWCoroutine { // target number of entries to cache before recycling idle ones static constexpr size_t target_cache_size = 256; boost::intrusive_ptr bucket_shard_cache; + rgw_bucket_index_marker_info remote_info; + uint32_t sid; + std::vector::iterator each; int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const { return rgw_bucket_parse_bucket_key(sync_env->cct, key, @@ -1679,11 +1682,80 @@ public: if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) { tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?")); } else { - // fetch remote and write locally - yield_spawn_window(sync_single_entry(source_bs, std::nullopt, iter->first, - entry_timestamp, false), - cct->_conf->rgw_data_sync_spawn_window, std::nullopt); - } + yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info)); + if (retcode < 0) { + tn->log(10, SSTR("full sync: failed to read remote bucket info. Writing " + << source_bs.shard_id << " to error repo for retry")); + yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo, + rgw::error_repo::encode_key(source_bs, std::nullopt), + entry_timestamp)); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: failed to log " << source_bs.shard_id << " in error repo: retcode=" << retcode)); + } + return set_cr_error(retcode); + } + + //wait to sync the first shard of the oldest generation and then sync all other shards. + //if any of the operations fail at any time, write them into error repo for later retry. + + source_bs.shard_id = 0; + yield call(sync_single_entry(source_bs, remote_info.oldest_gen, iter->first, entry_timestamp, false)); + if (retcode < 0) { + tn->log(10, SSTR("full sync: failed to sync " << source_bs.shard_id << " of gen " + << remote_info.oldest_gen << ". Writing to error repo for retry")); + yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo, + rgw::error_repo::encode_key(source_bs, remote_info.oldest_gen), + entry_timestamp)); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: failed to write " << remote_info.oldest_gen << ":" << source_bs.shard_id + << " in error repo: retcode=" << retcode)); + return set_cr_error(retcode); + } + } + each = remote_info.generations.begin(); + for (; each != remote_info.generations.end(); each++) { + for (sid = 0; sid < each->num_shards; sid++) { + source_bs.shard_id = sid; + tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen)); + yield_spawn_window(sync_single_entry(source_bs, each->gen, iter->first, entry_timestamp, false), + cct->_conf->rgw_data_sync_spawn_window, + [&](uint64_t stack_id, int ret) { + if (ret < 0) { + sid = source_bs.shard_id; + for (; sid < each->num_shards; sid++) { + source_bs.shard_id = sid; + spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo, + rgw::error_repo::encode_key(source_bs, each->gen), + entry_timestamp), false); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":" + << sid << " to error repo: retcode=" << retcode)); + } + } + auto i = std::distance(remote_info.generations.begin(), each); + for (each[i]; each != remote_info.generations.end(); each++) { + for (sid = 0; sid < each->num_shards; sid++){ + spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo, + rgw::error_repo::encode_key(source_bs, each->gen), + entry_timestamp), false); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":" + << sid << " to error repo: retcode=" << retcode)); + } + } + } + } + return 0; + }); + drain_all_cb([&](uint64_t stack_id, int ret) { + if (ret < 0) { + tn->log(10, SSTR("a sync operation returned error: " << ret)); + } + return ret; + }); + } + } + } sync_marker.marker = iter->first; } } while (omapvals->more); -- 2.39.5