From 4c9e83f7cfa199fdc66aae374c462085896edade Mon Sep 17 00:00:00 2001 From: Shilpa Jagannath Date: Tue, 7 Jun 2022 16:58:45 -0400 Subject: [PATCH] rgw/multisite: build a coroutine class to handle error repo logic Signed-off-by: Shilpa Jagannath --- src/rgw/rgw_data_sync.cc | 88 ++++++++++++++++++++++++---------------- src/rgw/rgw_data_sync.h | 2 +- src/rgw/rgw_datalog.h | 3 +- src/rgw/rgw_rest_log.cc | 13 ++---- src/rgw/rgw_rest_log.h | 1 + 5 files changed, 60 insertions(+), 47 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 49efc9c96028a..179f59fb284c6 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1413,6 +1413,51 @@ public: } }; +class RGWFullSyncErrorRepoCR: public RGWCoroutine { + RGWDataSyncCtx *sc; + RGWDataSyncEnv *sync_env; + rgw_bucket_shard source_bs; + RGWSyncTraceNodeRef tn; + rgw_bucket_index_marker_info remote_info; + rgw_pool pool; + RGWDataChangesLog *datalog_changes{nullptr}; + + rgw_raw_obj datalog_oid_for_error_repo(rgw_bucket_shard& bs) { + int datalog_shard = datalog_changes->choose_oid(bs); + string oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, datalog_shard); + return rgw_raw_obj(pool, oid + ".retry"); + } + +public: + RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, RGWSyncTraceNodeRef& _tn) + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), + source_bs(_source_bs), tn(_tn) { + tn = sync_env->sync_tracer->add_node(_tn, "error_repo", SSTR(bucket_shard_str(source_bs))); + } + + int operate(const DoutPrefixProvider *dpp) override { + reenter(this) { + yield { + call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info)); + if (retcode < 0) { + return set_cr_error(retcode); + } + for (const auto& each : remote_info.gen_numshards) { + for (uint32_t sid = 0; sid < each.second; sid++) { + rgw_bucket_shard bs(source_bs.bucket, sid); + rgw_raw_obj error_repo = datalog_oid_for_error_repo(bs); + tn->log(10, SSTR("writing shard_id " << sid << "of gen" << each.first << " to error repo for retry")); + call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo, + rgw::error_repo::encode_key(bs, each.first), + ceph::real_time{})); + } + } + } + } + return 0; + } +}; + #define DATA_SYNC_MAX_ERR_ENTRIES 10 class RGWDataSyncShardCR : public RGWCoroutine { @@ -1468,6 +1513,7 @@ class RGWDataSyncShardCR : public RGWCoroutine { RGWSyncTraceNodeRef tn; rgw_bucket_shard source_bs; + std::optional gen; // target number of entries to cache before recycling idle ones static constexpr size_t target_cache_size = 256; @@ -1478,14 +1524,6 @@ class RGWDataSyncShardCR : public RGWCoroutine { &bs.bucket, &bs.shard_id); } - rgw_raw_obj datalog_oid_for_error_repo(rgw_bucket_shard& bs) { - auto shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0); - auto datalog_shard = (ceph_str_hash_linux(bs.bucket.name.data(), bs.bucket.name.size()) + - shard_shift) % cct->_conf->rgw_data_log_num_shards; - string oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, datalog_shard); - return rgw_raw_obj(pool, oid + ".retry"); - } - RGWCoroutine* sync_single_entry(const rgw_bucket_shard& src, std::optional gen, const std::string& marker, @@ -1714,7 +1752,6 @@ public: for (; iter != error_entries.end(); ++iter) { error_marker = iter->first; entry_timestamp = rgw::error_repo::decode_value(iter->second); - std::optional gen; retcode = rgw::error_repo::decode_key(iter->first, source_bs, gen); if (retcode == -EINVAL) { // backward compatibility for string keys that don't encode a gen @@ -1726,6 +1763,13 @@ public: error_marker, entry_timestamp), false); continue; } + if (!gen) { + // write all full sync obligations for the bucket to error repo + yield call(new RGWFullSyncErrorRepoCR(sc, source_bs, tn)); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << retcode)); + } + } tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp)); spawn(sync_single_entry(source_bs, gen, "", entry_timestamp, true), false); } @@ -1758,32 +1802,6 @@ public: marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp); continue; } - if (!log_iter->entry.gen) { - yield { - rgw_bucket_index_marker_info remote_info; - BucketIndexShardsManager remote_markers; - retcode = rgw_read_remote_bilog_info(sync_env->dpp, sc->conn, source_bs.bucket, - remote_info, remote_markers, null_yield); - - if (retcode < 0) { - tn->log(1, SSTR(" rgw_read_remote_bilog_info failed with retcode=" << retcode)); - return retcode; - } - for (const auto& each : remote_info.gen_numshards) { - for (int sid = 0; sid < each.second; sid++) { - rgw_bucket_shard bs(source_bs.bucket, sid); - error_repo = datalog_oid_for_error_repo(bs); - tn->log(10, SSTR("writing shard_id " << sid << "of gen" << each.first << " to error repo for retry")); - call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo, - rgw::error_repo::encode_key(bs, each.first), - ceph::real_time{})); - if (retcode < 0) { - tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode)); - } - } - } - } - } if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) { tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?")); } else { diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 07814a7ec8736..040b65d5d0b5d 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -674,7 +674,7 @@ struct rgw_bucket_index_marker_info { bool syncstopped{false}; uint64_t oldest_gen = 0; uint64_t latest_gen = 0; - std::vector> gen_numshards; + std::vector> gen_numshards; void decode_json(JSONObj *obj) { JSONDecoder::decode_json("bucket_ver", bucket_ver, obj); diff --git a/src/rgw/rgw_datalog.h b/src/rgw/rgw_datalog.h index d54f76a3249a2..0bc4837c9c1bf 100644 --- a/src/rgw/rgw_datalog.h +++ b/src/rgw/rgw_datalog.h @@ -284,7 +284,6 @@ class RGWDataChangesLog { std::thread renew_thread; std::function bucket_filter; - int choose_oid(const rgw_bucket_shard& bs); bool going_down() const; bool filter_bucket(const DoutPrefixProvider *dpp, const rgw_bucket& bucket, optional_yield y) const; int renew_entries(const DoutPrefixProvider *dpp); @@ -296,7 +295,7 @@ public: int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, const RGWZoneParams& zoneparams, librados::Rados* lr); - + int choose_oid(const rgw_bucket_shard& bs); int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw::bucket_log_layout_generation& gen, int shard_id); int get_log_shard_id(rgw_bucket& bucket, int shard_id); diff --git a/src/rgw/rgw_rest_log.cc b/src/rgw/rgw_rest_log.cc index 500c85bac6f95..b3940e58de3fd 100644 --- a/src/rgw/rgw_rest_log.cc +++ b/src/rgw/rgw_rest_log.cc @@ -566,15 +566,9 @@ void RGWOp_BILog_Info::execute(optional_yield y) { oldest_gen = logs.front().gen; latest_gen = logs.back().gen; - std::vector> gen_numshards; - for (auto gen = logs.front().gen; gen <= logs.back().gen; gen++) { - auto log = std::find_if(logs.begin(), logs.end(), rgw::matches_gen(gen)); - if (log == logs.end()) { - ldpp_dout(s, 5) << "ERROR: no log layout with gen=" << gen << dendl; - op_ret = -ENOENT; - } - const auto& num_shards = log->layout.in_index.layout.num_shards; - gen_numshards.push_back(std::make_pair(gen, num_shards)); + for (auto& log : logs) { + uint32_t num_shards = log.layout.in_index.layout.num_shards; + gen_numshards.push_back(std::make_pair(log.gen, num_shards)); } } @@ -593,6 +587,7 @@ void RGWOp_BILog_Info::send_response() { encode_json("syncstopped", syncstopped, s->formatter); encode_json("oldest_gen", oldest_gen, s->formatter); encode_json("latest_gen", latest_gen, s->formatter); + //encode_json("gen_numshards", gen_numshards, s->formatter); TODO: add supporting encode/decode for std::pair s->formatter->close_section(); flusher.flush(); diff --git a/src/rgw/rgw_rest_log.h b/src/rgw/rgw_rest_log.h index 3232f03ca6854..9eed2526b76d4 100644 --- a/src/rgw/rgw_rest_log.h +++ b/src/rgw/rgw_rest_log.h @@ -53,6 +53,7 @@ class RGWOp_BILog_Info : public RGWRESTOp { bool syncstopped; uint64_t oldest_gen = 0; uint64_t latest_gen = 0; + std::vector> gen_numshards; public: RGWOp_BILog_Info() : bucket_ver(), master_ver(), syncstopped(false) {} -- 2.39.5