From 4f1ed965dcfc0c1627fce7b99da3a158f4617d8e Mon Sep 17 00:00:00 2001
From: Shilpa Jagannath
Date: Wed, 29 Jun 2022 10:57:47 -0400
Subject: [PATCH] rgw/multisite: move out full sync trigger handling block to a
 new cr class.

Signed-off-by: Shilpa Jagannath
---
 src/rgw/rgw_data_sync.cc | 189 +++++++++++++++++++++++----------------
 1 file changed, 113 insertions(+), 76 deletions(-)

diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 7d0a12b7d947f..8c9fe5189a406 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -1484,7 +1484,7 @@ public:
 
 RGWCoroutine* sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& src,
                                 std::optional<uint64_t> gen,
-                                const std::string& marker,
+                                const std::string marker,
                                 ceph::real_time timestamp,
                                 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                                 boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache,
@@ -1499,6 +1499,116 @@ RGWCoroutine* sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& src,
                          lease_cr.get(), tn);
 }
 
+class RGWHandleFullSyncCR : public RGWCoroutine {
+  RGWDataSyncCtx *sc;
+  RGWDataSyncEnv *sync_env;
+  rgw_bucket_shard source_bs;
+  const std::string key;
+  rgw_raw_obj error_repo;
+  ceph::real_time timestamp;
+  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
+  boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
+  std::optional<RGWDataSyncShardMarkerTrack*> marker_tracker;
+  RGWSyncTraceNodeRef tn;
+  rgw_bucket_index_marker_info remote_info;
+  uint32_t sid;
+  std::vector<store_gen_shards>::iterator each;
+
+public:
+  RGWHandleFullSyncCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
+                      const std::string& _key, rgw_raw_obj& _error_repo,
+                      ceph::real_time& _timestamp, boost::intrusive_ptr<const RGWContinuousLeaseCR> _lease_cr,
+                      boost::intrusive_ptr<rgw::bucket_sync::Cache> _bucket_shard_cache,
+                      std::optional<RGWDataSyncShardMarkerTrack*> _marker_tracker,
+                      RGWSyncTraceNodeRef& _tn)
+    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source_bs(_source_bs), key(_key),
+      error_repo(_error_repo), timestamp(_timestamp), lease_cr(_lease_cr),
+      bucket_shard_cache(_bucket_shard_cache), marker_tracker(_marker_tracker), tn(_tn) {}
+
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
+      if (retcode < 0) {
+        tn->log(10, SSTR("full sync: failed to read remote bucket info. Writing "
+                         << source_bs.shard_id << " to error repo for retry"));
+        yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                                             rgw::error_repo::encode_key(source_bs, std::nullopt),
+                                             timestamp));
+        if (retcode < 0) {
+          tn->log(0, SSTR("ERROR: failed to log " << source_bs.shard_id << " in error repo: retcode=" << retcode));
+        }
+        return set_cr_error(retcode);
+      }
+
+      //wait to sync the first shard of the oldest generation and then sync all other shards.
+      //if any of the operations fail at any time, write them into error repo for later retry.
+
+      source_bs.shard_id = 0;
+      yield call(sync_single_entry(sc, source_bs, remote_info.oldest_gen, key, timestamp,
+                                   lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false));
+      if (retcode < 0) {
+        tn->log(10, SSTR("full sync: failed to sync " << source_bs.shard_id << " of gen "
+                         << remote_info.oldest_gen << ". Writing to error repo for retry"));
+        yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                                             rgw::error_repo::encode_key(source_bs, remote_info.oldest_gen),
+                                             timestamp));
+        if (retcode < 0) {
+          tn->log(0, SSTR("ERROR: failed to write " << remote_info.oldest_gen << ":" << source_bs.shard_id
+                          << " in error repo: retcode=" << retcode));
+          return set_cr_error(retcode);
+        }
+      }
+      each = remote_info.generations.begin();
+      for (; each != remote_info.generations.end(); each++) {
+        for (sid = 0; sid < each->num_shards; sid++) {
+          source_bs.shard_id = sid;
+          tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen));
+          yield_spawn_window(sync_single_entry(sc, source_bs, each->gen, key, timestamp,
+                                               lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false),
+                             cct->_conf->rgw_data_sync_spawn_window,
+                             [&](uint64_t stack_id, int ret) {
+                               if (ret < 0) {
+                                 sid = source_bs.shard_id;
+                                 for (; sid < each->num_shards; sid++) {
+                                   source_bs.shard_id = sid;
+                                   spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                                                                   rgw::error_repo::encode_key(source_bs, each->gen),
+                                                                   timestamp), false);
+                                   if (retcode < 0) {
+                                     tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
+                                                     << sid << " to error repo: retcode=" << retcode));
+                                   }
+                                 }
+                                 auto i = std::distance(remote_info.generations.begin(), each);
+                                 for (each[i]; each != remote_info.generations.end(); each++) {
+                                   for (sid = 0; sid < each->num_shards; sid++){
+                                     spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                                                                     rgw::error_repo::encode_key(source_bs, each->gen),
+                                                                     timestamp), false);
+                                     if (retcode < 0) {
+                                       tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
+                                                       << sid << " to error repo: retcode=" << retcode));
+                                     }
+                                   }
+                                 }
+                               }
+                               return 0;
+                             });
+          drain_all_cb([&](uint64_t stack_id, int ret) {
+            if (ret < 0) {
+              tn->log(10, SSTR("a sync operation returned error: " << ret));
+            }
+            return ret;
+          });
+        }
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+
 #define DATA_SYNC_MAX_ERR_ENTRIES 10
 
 class RGWDataSyncShardCR : public RGWCoroutine {
@@ -1689,81 +1799,8 @@ public:
       if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) {
         tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
       } else {
-        yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
-        if (retcode < 0) {
-          tn->log(10, SSTR("full sync: failed to read remote bucket info. Writing "
-                           << source_bs.shard_id << " to error repo for retry"));
-          yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
-                                               rgw::error_repo::encode_key(source_bs, std::nullopt),
-                                               entry_timestamp));
-          if (retcode < 0) {
-            tn->log(0, SSTR("ERROR: failed to log " << source_bs.shard_id << " in error repo: retcode=" << retcode));
-          }
-          return set_cr_error(retcode);
-        }
-
-        //wait to sync the first shard of the oldest generation and then sync all other shards.
-        //if any of the operations fail at any time, write them into error repo for later retry.
-
-        source_bs.shard_id = 0;
-        yield call(sync_single_entry(sc, source_bs, remote_info.oldest_gen, iter->first, entry_timestamp,
-                                     lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false));
-        if (retcode < 0) {
-          tn->log(10, SSTR("full sync: failed to sync " << source_bs.shard_id << " of gen "
-                           << remote_info.oldest_gen << ". Writing to error repo for retry"));
-          yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
-                                               rgw::error_repo::encode_key(source_bs, remote_info.oldest_gen),
-                                               entry_timestamp));
-          if (retcode < 0) {
-            tn->log(0, SSTR("ERROR: failed to write " << remote_info.oldest_gen << ":" << source_bs.shard_id
-                            << " in error repo: retcode=" << retcode));
-            return set_cr_error(retcode);
-          }
-        }
-        each = remote_info.generations.begin();
-        for (; each != remote_info.generations.end(); each++) {
-          for (sid = 0; sid < each->num_shards; sid++) {
-            source_bs.shard_id = sid;
-            tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen));
-            yield_spawn_window(sync_single_entry(sc, source_bs, each->gen, iter->first, entry_timestamp,
-                                                 lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false),
-                               cct->_conf->rgw_data_sync_spawn_window,
-                               [&](uint64_t stack_id, int ret) {
-                                 if (ret < 0) {
-                                   sid = source_bs.shard_id;
-                                   for (; sid < each->num_shards; sid++) {
-                                     source_bs.shard_id = sid;
-                                     spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
-                                                                     rgw::error_repo::encode_key(source_bs, each->gen),
-                                                                     entry_timestamp), false);
-                                     if (retcode < 0) {
-                                       tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
-                                                       << sid << " to error repo: retcode=" << retcode));
-                                     }
-                                   }
-                                   auto i = std::distance(remote_info.generations.begin(), each);
-                                   for (each[i]; each != remote_info.generations.end(); each++) {
-                                     for (sid = 0; sid < each->num_shards; sid++){
-                                       spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
-                                                                       rgw::error_repo::encode_key(source_bs, each->gen),
-                                                                       entry_timestamp), false);
-                                       if (retcode < 0) {
-                                         tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
-                                                         << sid << " to error repo: retcode=" << retcode));
-                                       }
-                                     }
-                                   }
-                                 }
-                                 return 0;
-                               });
-            drain_all_cb([&](uint64_t stack_id, int ret) {
-              if (ret < 0) {
-                tn->log(10, SSTR("a sync operation returned error: " << ret));
-              }
-              return ret;
-            });
-          }
-        }
+        yield call(new RGWHandleFullSyncCR(sc, source_bs, iter->first, error_repo, entry_timestamp,
+                                           lease_cr, bucket_shard_cache, marker_tracker, tn));
       }
       sync_marker.marker = iter->first;
     }
-- 
2.39.5
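
Reviewer note (not part of the patch): the new class follows the stackless-coroutine pattern used throughout rgw_data_sync.cc, where reenter(this) resumes operate() at the last yield point, call() suspends on a child coroutine, and the child's result is surfaced through retcode. The following is a minimal sketch of that shape, assuming only the framework hooks visible in the diff above (RGWCoroutine, reenter/yield/call, retcode, set_cr_done/set_cr_error); the class name, member, and constructor below are hypothetical and for illustration only:

    #include "rgw_coroutine.h"  // Ceph tree; RGWCoroutine plus the reenter/yield macros pulled in by the rgw sources

    // Hypothetical wrapper coroutine; only the framework hooks mirror RGWHandleFullSyncCR.
    class ExampleStepCR : public RGWCoroutine {
      RGWCoroutine* child;  // sub-operation to drive, e.g. a remote read or an error-repo write
    public:
      ExampleStepCR(CephContext* cct, RGWCoroutine* _child)
        : RGWCoroutine(cct), child(_child) {}

      int operate(const DoutPrefixProvider *dpp) override {
        reenter(this) {
          yield call(child);             // suspend until the child coroutine completes
          if (retcode < 0) {             // the child's result arrives via retcode
            return set_cr_error(retcode);
          }
          return set_cr_done();          // mark this coroutine finished
        }
        return 0;
      }
    };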