From cccf22630334246c66c95add39bfc4390aefb9e1 Mon Sep 17 00:00:00 2001 From: Shilpa Jagannath Date: Tue, 5 Jul 2022 15:54:26 -0400 Subject: [PATCH] rgw/multisite: changes to error handling. Signed-off-by: Shilpa Jagannath --- src/rgw/rgw_data_sync.cc | 127 +++++++++++++++++++++------------------ 1 file changed, 67 insertions(+), 60 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 8c9fe5189a406..f191f45d231f8 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1417,8 +1417,8 @@ class RGWFullSyncErrorRepoCR: public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket_shard source_bs; + std::string error_marker; ceph::real_time timestamp; - int *error_result; RGWSyncTraceNodeRef tn; rgw_bucket_index_marker_info remote_info; rgw_pool pool; @@ -1435,10 +1435,10 @@ class RGWFullSyncErrorRepoCR: public RGWCoroutine { } public: - RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, - ceph::real_time& _timestamp, int *_error_result, RGWSyncTraceNodeRef& _tn) + RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, std::string _error_marker, + ceph::real_time& _timestamp, RGWSyncTraceNodeRef& _tn) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - source_bs(_source_bs), timestamp(_timestamp), error_result(_error_result), tn(_tn) { + source_bs(_source_bs), error_marker(_error_marker), timestamp(_timestamp), tn(_tn) { tn = sync_env->sync_tracer->add_node(_tn, "error_repo", SSTR(bucket_shard_str(source_bs))); } @@ -1462,20 +1462,21 @@ public: [&](uint64_t stack_id, int ret) { if (ret < 0) { tn->log(10, SSTR("writing to error repo returned error: " << ret)); - *error_result = ret; } - return 0; - }); + return ret; + }); } } drain_all_cb([&](uint64_t stack_id, int ret) { if (ret < 0) { tn->log(10, SSTR("writing to error repo returned error: " << ret)); - *error_result = ret; } - return 0; + return ret; }); + spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo, + error_marker, timestamp), false); + return set_cr_done(); } return 0; @@ -1499,7 +1500,7 @@ RGWCoroutine* sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& src, lease_cr.get(), tn); } -class RGWHandleFullSyncCR : public RGWCoroutine { +class RGWDataFullSyncSingleEntryCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket_shard source_bs; @@ -1508,22 +1509,21 @@ class RGWHandleFullSyncCR : public RGWCoroutine { ceph::real_time timestamp; boost::intrusive_ptr lease_cr; boost::intrusive_ptr bucket_shard_cache; - std::optional marker_tracker; RGWSyncTraceNodeRef tn; rgw_bucket_index_marker_info remote_info; uint32_t sid; std::vector::iterator each; + uint64_t i{0}; public: - RGWHandleFullSyncCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, + RGWDataFullSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, const std::string& _key, rgw_raw_obj& _error_repo, ceph::real_time& _timestamp, boost::intrusive_ptr _lease_cr, boost::intrusive_ptr _bucket_shard_cache, - std::optional _marker_tracker, RGWSyncTraceNodeRef& _tn) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source_bs(_source_bs), key(_key), error_repo(_error_repo), timestamp(_timestamp), lease_cr(_lease_cr), - bucket_shard_cache(_bucket_shard_cache), marker_tracker(_marker_tracker), tn(_tn) {} + bucket_shard_cache(_bucket_shard_cache), tn(_tn) {} int operate(const DoutPrefixProvider *dpp) override { @@ -1546,7 +1546,7 @@ public: source_bs.shard_id = 0; yield call(sync_single_entry(sc, source_bs, remote_info.oldest_gen, key, timestamp, - lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false)); + lease_cr, bucket_shard_cache, std::nullopt, error_repo, tn, false)); if (retcode < 0) { tn->log(10, SSTR("full sync: failed to sync " << source_bs.shard_id << " of gen " << remote_info.oldest_gen << ". Writing to error repo for retry")); @@ -1562,47 +1562,59 @@ public: each = remote_info.generations.begin(); for (; each != remote_info.generations.end(); each++) { for (sid = 0; sid < each->num_shards; sid++) { + if (retcode < 0) { + sid = source_bs.shard_id; + for (; sid < each->num_shards; sid++) { + source_bs.shard_id = sid; + yield_spawn_window(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo, + rgw::error_repo::encode_key(source_bs, each->gen), + timestamp), cct->_conf->rgw_data_sync_spawn_window, + [&](uint64_t stack_id, int ret) { + if (ret < 0) { + tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":" + << sid << " to error repo: retcode=" << ret)); + } + return ret; + }); + } + i = std::distance(remote_info.generations.begin(), each); + for (each[i]; each != remote_info.generations.end(); each++) { + for (sid = 0; sid < each->num_shards; sid++){ + yield_spawn_window(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo, + rgw::error_repo::encode_key(source_bs, each->gen), + timestamp), cct->_conf->rgw_data_sync_spawn_window, + [&](uint64_t stack_id, int ret) { + if (ret < 0) { + tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":" + << sid << " to error repo: retcode=" << ret)); + } + return ret; + }); + } + } + } else { source_bs.shard_id = sid; tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen)); yield_spawn_window(sync_single_entry(sc, source_bs, each->gen, key, timestamp, - lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false), + lease_cr, bucket_shard_cache, std::nullopt, error_repo, tn, false), cct->_conf->rgw_data_sync_spawn_window, [&](uint64_t stack_id, int ret) { - if (ret < 0) { - sid = source_bs.shard_id; - for (; sid < each->num_shards; sid++) { - source_bs.shard_id = sid; - spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo, - rgw::error_repo::encode_key(source_bs, each->gen), - timestamp), false); - if (retcode < 0) { - tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":" - << sid << " to error repo: retcode=" << retcode)); - } - } - auto i = std::distance(remote_info.generations.begin(), each); - for (each[i]; each != remote_info.generations.end(); each++) { - for (sid = 0; sid < each->num_shards; sid++){ - spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo, - rgw::error_repo::encode_key(source_bs, each->gen), - timestamp), false); - if (retcode < 0) { - tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":" - << sid << " to error repo: retcode=" << retcode)); - } - } + if (ret < 0) { + retcode = ret; } - } - return 0; - }); - drain_all_cb([&](uint64_t stack_id, int ret) { - if (ret < 0) { - tn->log(10, SSTR("a sync operation returned error: " << ret)); - } - return ret; - }); + return retcode; + }); + } } } + + drain_all_cb([&](uint64_t stack_id, int ret) { + if (ret < 0) { + tn->log(10, SSTR("a sync operation returned error: " << ret)); + } + return ret; + }); + return set_cr_done(); } return 0; @@ -1665,14 +1677,10 @@ class RGWDataSyncShardCR : public RGWCoroutine { rgw_bucket_shard source_bs; std::optional gen; - int error_result{0}; // target number of entries to cache before recycling idle ones static constexpr size_t target_cache_size = 256; boost::intrusive_ptr bucket_shard_cache; - rgw_bucket_index_marker_info remote_info; - uint32_t sid; - std::vector::iterator each; int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const { return rgw_bucket_parse_bucket_key(sync_env->cct, key, @@ -1799,8 +1807,10 @@ public: if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) { tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?")); } else { - yield call(new RGWHandleFullSyncCR(sc, source_bs, iter->first, error_repo, entry_timestamp, - lease_cr, bucket_shard_cache, marker_tracker, tn)); + yield_spawn_window(new RGWDataFullSyncSingleEntryCR(sc, source_bs, iter->first, error_repo, entry_timestamp, + lease_cr, bucket_shard_cache, tn), cct->_conf->rgw_data_sync_spawn_window, std::nullopt); + + yield call(marker_tracker->finish(iter->first)); } sync_marker.marker = iter->first; } @@ -1908,12 +1918,9 @@ public: } if (!gen) { // write all full sync obligations for the bucket to error repo - spawn(new RGWFullSyncErrorRepoCR(sc, source_bs, entry_timestamp, &error_result, tn), false); - if (error_result == 0) { - spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo, - error_marker, entry_timestamp), false); - } else { - tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << error_result)); + spawn(new RGWFullSyncErrorRepoCR(sc, source_bs, error_marker, entry_timestamp, tn), false); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << retcode)); } } else { tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp)); -- 2.39.5