From 4247edbb983750d4a970cf81b779672a40378c38 Mon Sep 17 00:00:00 2001 From: Shilpa Jagannath Date: Mon, 13 Jun 2022 23:27:24 -0400 Subject: [PATCH] rgw/multisite: Store generation and its corresponding num shards in a struct. Call remove_cr() to remove full sync obligations after writing all shard entries into the error repo. Replace call() with spawn() and yield_spawn_window() Signed-off-by: Shilpa Jagannath --- src/rgw/rgw_data_sync.cc | 71 ++++++++++++++++++++++++++++------------ src/rgw/rgw_data_sync.h | 18 +++++++++- src/rgw/rgw_rest_log.cc | 4 +-- src/rgw/rgw_rest_log.h | 3 +- 4 files changed, 71 insertions(+), 25 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 179f59fb284c6..8d897d175c4b1 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1417,10 +1417,16 @@ class RGWFullSyncErrorRepoCR: public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket_shard source_bs; + ceph::real_time timestamp; + int *error_result; RGWSyncTraceNodeRef tn; rgw_bucket_index_marker_info remote_info; rgw_pool pool; RGWDataChangesLog *datalog_changes{nullptr}; + rgw_raw_obj error_repo; + uint32_t sid; + rgw_bucket_shard bs; + std::vector<store_gen_shards>::const_iterator each; rgw_raw_obj datalog_oid_for_error_repo(rgw_bucket_shard& bs) { int datalog_shard = datalog_changes->choose_oid(bs); @@ -1429,30 +1435,48 @@ class RGWFullSyncErrorRepoCR: public RGWCoroutine { } public: - RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, RGWSyncTraceNodeRef& _tn) + RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, + ceph::real_time& _timestamp, int *_error_result, RGWSyncTraceNodeRef& _tn) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - source_bs(_source_bs), tn(_tn) { + source_bs(_source_bs), timestamp(_timestamp), error_result(_error_result), tn(_tn) { tn = sync_env->sync_tracer->add_node(_tn, "error_repo", SSTR(bucket_shard_str(source_bs))); } int operate(const DoutPrefixProvider 
*dpp) override { reenter(this) { - yield { - call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info)); - if (retcode < 0) { - return set_cr_error(retcode); - } - for (const auto& each : remote_info.gen_numshards) { - for (uint32_t sid = 0; sid < each.second; sid++) { - rgw_bucket_shard bs(source_bs.bucket, sid); - rgw_raw_obj error_repo = datalog_oid_for_error_repo(bs); - tn->log(10, SSTR("writing shard_id " << sid << "of gen" << each.first << " to error repo for retry")); - call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo, - rgw::error_repo::encode_key(bs, each.first), - ceph::real_time{})); - } + yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info)); + if (retcode < 0) { + return set_cr_error(retcode); + } + + each = remote_info.generations.cbegin(); + for (; each != remote_info.generations.cend(); each++) { + for (sid = 0; sid < each->num_shards; sid++) { + bs.bucket = source_bs.bucket; + bs.shard_id = sid; + error_repo = datalog_oid_for_error_repo(bs); + tn->log(10, SSTR("writing shard_id " << sid << " of gen " << each->gen << " to error repo for retry")); + yield_spawn_window(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo, + rgw::error_repo::encode_key(bs, each->gen), + timestamp), cct->_conf->rgw_data_sync_spawn_window, + [&](uint64_t stack_id, int ret) { + if (ret < 0) { + tn->log(10, SSTR("writing to error repo returned error: " << ret)); + *error_result = ret; + } + return 0; + }); } } + drain_all_cb([&](uint64_t stack_id, int ret) { + if (ret < 0) { + tn->log(10, SSTR("writing to error repo returned error: " << ret)); + *error_result = ret; + } + return 0; + }); + + return set_cr_done(); } return 0; } @@ -1514,6 +1538,7 @@ class RGWDataSyncShardCR : public RGWCoroutine { rgw_bucket_shard source_bs; std::optional gen; + int error_result{0}; // target number of entries to cache before recycling idle ones static constexpr size_t target_cache_size = 256; @@ 
-1765,13 +1790,17 @@ public: } if (!gen) { // write all full sync obligations for the bucket to error repo - yield call(new RGWFullSyncErrorRepoCR(sc, source_bs, tn)); - if (retcode < 0) { - tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << retcode)); + spawn(new RGWFullSyncErrorRepoCR(sc, source_bs, entry_timestamp, &error_result, tn), false); + if (error_result == 0) { + spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo, + error_marker, entry_timestamp), false); + } else { + tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << error_result)); } + } else { + tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp)); + spawn(sync_single_entry(source_bs, gen, "", entry_timestamp, true), false); } - tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp)); - spawn(sync_single_entry(source_bs, gen, "", entry_timestamp, true), false); } if (!omapvals->more) { error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs); diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 040b65d5d0b5d..b35744206cf61 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -667,6 +667,21 @@ struct bilog_status_v2 { void decode_json(JSONObj *obj); }; +struct store_gen_shards { + uint64_t gen = 0; + uint32_t num_shards = 0; + + void dump(Formatter *f) const { + encode_json("gen", gen, f); + encode_json("num_shards", num_shards, f); + } + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("gen", gen, obj); + JSONDecoder::decode_json("num_shards", num_shards, obj); + } +}; + struct rgw_bucket_index_marker_info { std::string bucket_ver; std::string master_ver; @@ -674,7 +689,7 @@ struct rgw_bucket_index_marker_info { bool syncstopped{false}; uint64_t oldest_gen = 0; uint64_t latest_gen = 0; - std::vector<std::pair<uint64_t, uint32_t>> gen_numshards; + std::vector<store_gen_shards> generations; void 
decode_json(JSONObj *obj) { JSONDecoder::decode_json("bucket_ver", bucket_ver, obj); @@ -683,6 +698,7 @@ struct rgw_bucket_index_marker_info { JSONDecoder::decode_json("syncstopped", syncstopped, obj); JSONDecoder::decode_json("oldest_gen", oldest_gen, obj); JSONDecoder::decode_json("latest_gen", latest_gen, obj); + JSONDecoder::decode_json("generations", generations, obj); } }; diff --git a/src/rgw/rgw_rest_log.cc b/src/rgw/rgw_rest_log.cc index b3940e58de3fd..81622cab5b41c 100644 --- a/src/rgw/rgw_rest_log.cc +++ b/src/rgw/rgw_rest_log.cc @@ -568,7 +568,7 @@ void RGWOp_BILog_Info::execute(optional_yield y) { for (auto& log : logs) { uint32_t num_shards = log.layout.in_index.layout.num_shards; - gen_numshards.push_back(std::make_pair(log.gen, num_shards)); + generations.push_back({log.gen, num_shards}); } } @@ -587,7 +587,7 @@ void RGWOp_BILog_Info::send_response() { encode_json("syncstopped", syncstopped, s->formatter); encode_json("oldest_gen", oldest_gen, s->formatter); encode_json("latest_gen", latest_gen, s->formatter); - //encode_json("gen_numshards", gen_numshards, s->formatter); TODO: add supporting encode/decode for std::pair + encode_json("generations", generations, s->formatter); s->formatter->close_section(); flusher.flush(); diff --git a/src/rgw/rgw_rest_log.h b/src/rgw/rgw_rest_log.h index 9eed2526b76d4..36936f1eb4f7b 100644 --- a/src/rgw/rgw_rest_log.h +++ b/src/rgw/rgw_rest_log.h @@ -20,6 +20,7 @@ #include "rgw_rest_s3.h" #include "rgw_metadata.h" #include "rgw_mdlog.h" +#include "rgw_data_sync.h" class RGWOp_BILog_List : public RGWRESTOp { bool sent_header; @@ -53,7 +54,7 @@ class RGWOp_BILog_Info : public RGWRESTOp { bool syncstopped; uint64_t oldest_gen = 0; uint64_t latest_gen = 0; - std::vector<std::pair<uint64_t, uint32_t>> gen_numshards; + std::vector<store_gen_shards> generations; public: RGWOp_BILog_Info() : bucket_ver(), master_ver(), syncstopped(false) {} -- 2.39.5