From: Adam C. Emerson
Date: Fri, 14 May 2021 19:44:01 +0000 (-0400)
Subject: rgw: BucketTrimInstanceCR knows about generations
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=6a36cb11ea6786177433cd63aff6f3e8d447bb18;p=ceph.git

rgw: BucketTrimInstanceCR knows about generations

Fetch the current generation from remote peers and trim the minimum
marker on the minimum generation.

Signed-off-by: Adam C. Emerson
---
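As an illustration of the approach (not part of the commit): a minimal,
self-contained sketch of the trim-target selection, using plain
standard-library types. PeerStatus, the sample marker strings, and the
"9999" sentinel are simplified stand-ins for StatusShards,
rgw_bucket_shard_sync_info markers, and RGWSyncLogTrimCR::max_marker;
the authoritative implementation is take_min_generation() and
take_min_status() in the diff below.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// PeerStatus stands in for BucketTrimInstanceCR::StatusShards.
struct PeerStatus {
  std::uint64_t generation = 0;      // peer's incremental-sync generation
  std::vector<std::string> markers;  // per-shard incremental sync markers
};

int main() {
  // Assumed non-empty; the real code falls back to the bucket's own
  // current generation when there are no peers.
  std::vector<PeerStatus> peers = {
    {1, {"0005", "0009"}},  // peer still syncing generation 1
    {2, {"0042", "0037"}},  // peer already on generation 2
  };

  // Trim on the minimum generation reached by any peer.
  const auto min_gen = std::min_element(
      peers.begin(), peers.end(),
      [](const PeerStatus& l, const PeerStatus& r) {
        return l.generation < r.generation;
      })->generation;

  // Within that generation, trim each shard to the minimum marker.
  // Peers past min_gen have fully consumed it, so they don't
  // constrain the markers ("don't get a say in the matter").
  std::vector<std::string> min_markers(2, "9999");  // max-marker sentinel
  for (const auto& peer : peers) {
    if (peer.generation > min_gen) {
      continue;
    }
    for (std::size_t i = 0; i < min_markers.size(); ++i) {
      // zero-padded markers compare correctly as strings
      min_markers[i] = std::min(min_markers[i], peer.markers[i]);
    }
  }

  // Prints: trim generation 1 up to markers 0005,0009
  std::cout << "trim generation " << min_gen << " up to markers "
            << min_markers[0] << "," << min_markers[1] << "\n";
}

Taking the minimum generation first is what makes the marker
comparison meaningful: markers are only comparable within a single
generation, so only peers still on min_gen may constrain the result.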
diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc
index 1c18d3ea1aa24..65ac4e0aa55c2 100644
--- a/src/rgw/rgw_cr_rados.cc
+++ b/src/rgw/rgw_cr_rados.cc
@@ -604,17 +604,19 @@ int RGWAsyncGetBucketInstanceInfo::_send_request(const DoutPrefixProvider *dpp)
   return 0;
 }
 
-RGWRadosBILogTrimCR::RGWRadosBILogTrimCR(const DoutPrefixProvider *dpp,
-                                         rgw::sal::RadosStore* store,
-                                         const RGWBucketInfo& bucket_info,
-                                         int shard_id,
-                                         const std::string& start_marker,
-                                         const std::string& end_marker)
+RGWRadosBILogTrimCR::RGWRadosBILogTrimCR(
+  const DoutPrefixProvider *dpp,
+  rgw::sal::RadosStore* store,
+  const RGWBucketInfo& bucket_info,
+  int shard_id,
+  const rgw::bucket_index_layout_generation& generation,
+  const std::string& start_marker,
+  const std::string& end_marker)
   : RGWSimpleCoroutine(store->ctx()), bs(store->getRados()),
     start_marker(BucketIndexShardsManager::get_shard_marker(start_marker)),
     end_marker(BucketIndexShardsManager::get_shard_marker(end_marker))
 {
-  bs.init(dpp, bucket_info, bucket_info.layout.current_index, shard_id);
+  bs.init(dpp, bucket_info, generation, shard_id);
 }
 
 int RGWRadosBILogTrimCR::send_request(const DoutPrefixProvider *dpp)
diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h
index 8dc5a9d17d270..f83080d53e7bb 100644
--- a/src/rgw/rgw_cr_rados.h
+++ b/src/rgw/rgw_cr_rados.h
@@ -929,7 +929,9 @@ class RGWRadosBILogTrimCR : public RGWSimpleCoroutine {
 public:
   RGWRadosBILogTrimCR(const DoutPrefixProvider *dpp,
                       rgw::sal::RadosStore* store, const RGWBucketInfo& bucket_info,
-                      int shard_id, const std::string& start_marker,
+                      int shard_id,
+                      const rgw::bucket_index_layout_generation& generation,
+                      const std::string& start_marker,
                       const std::string& end_marker);
 
   int send_request(const DoutPrefixProvider *dpp) override;
diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h
index e0e1b12abe5d7..a0be6e2dd2fe3 100644
--- a/src/rgw/rgw_rest_conn.h
+++ b/src/rgw/rgw_rest_conn.h
@@ -16,8 +16,8 @@ namespace rgw { namespace sal {
 
 class RGWSI_Zone;
 
-template <class T>
-static int parse_decode_json(T& t, bufferlist& bl)
+template <class T>
+inline int parse_decode_json(T& t, bufferlist& bl)
 {
   JSONParser p;
   if (!p.parse(bl.c_str(), bl.length())) {
diff --git a/src/rgw/rgw_trim_bilog.cc b/src/rgw/rgw_trim_bilog.cc
index 9508d9d029831..7848d979dc331 100644
--- a/src/rgw/rgw_trim_bilog.cc
+++ b/src/rgw/rgw_trim_bilog.cc
@@ -353,32 +353,6 @@ struct BucketTrimObserver {
   virtual bool trimmed_recently(const std::string_view& bucket_instance) = 0;
 };
 
-/// populate the status with the minimum stable marker of each shard
-template <typename Iter>
-int take_min_status(CephContext *cct, Iter first, Iter last,
-                    std::vector<std::string> *status)
-{
-  for (auto peer = first; peer != last; ++peer) {
-    if (peer->size() != status->size()) {
-      // all peers must agree on the number of shards
-      return -EINVAL;
-    }
-    auto m = status->begin();
-    for (auto& shard : *peer) {
-      auto& marker = *m++;
-      // if no sync has started, we can safely trim everything
-      if (shard.state == rgw_bucket_shard_sync_info::StateInit) {
-        continue;
-      }
-      // always take the first marker, or any later marker that's smaller
-      if (peer == first || marker > shard.inc_marker.position) {
-        marker = std::move(shard.inc_marker.position);
-      }
-    }
-  }
-  return 0;
-}
-
 /// trim each bilog shard to the given marker, while limiting the number of
 /// concurrent requests
 class BucketTrimShardCollectCR : public RGWShardCollectCR {
@@ -386,6 +360,7 @@ class BucketTrimShardCollectCR : public RGWShardCollectCR {
   const DoutPrefixProvider *dpp;
   rgw::sal::RadosStore* const store;
   const RGWBucketInfo& bucket_info;
+  rgw::bucket_index_layout_generation generation;
   const std::vector<std::string>& markers; //< shard markers to trim
   size_t i{0}; //< index of current shard marker
 
@@ -401,9 +376,11 @@ class BucketTrimShardCollectCR : public RGWShardCollectCR {
  public:
   BucketTrimShardCollectCR(const DoutPrefixProvider *dpp,
                            rgw::sal::RadosStore* store, const RGWBucketInfo& bucket_info,
+                           rgw::bucket_index_layout_generation generation,
                            const std::vector<std::string>& markers)
     : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
-      dpp(dpp), store(store), bucket_info(bucket_info), markers(markers)
+      dpp(dpp), store(store), bucket_info(bucket_info),
+      generation(generation), markers(markers)
   {}
   bool spawn_next() override;
 };
@@ -419,7 +396,7 @@ bool BucketTrimShardCollectCR::spawn_next()
   ldpp_dout(dpp, 10) << "trimming bilog shard " << shard_id
       << " of " << bucket_info.bucket << " at marker " << marker << dendl;
   spawn(new RGWRadosBILogTrimCR(dpp, store, bucket_info, shard_id,
-                                std::string{}, marker),
+                                generation, std::string{}, marker),
         false);
   return true;
 }
@@ -437,15 +414,60 @@ class BucketTrimInstanceCR : public RGWCoroutine {
   std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
   rgw_bucket bucket;
   const std::string& zone_id; //< my zone id
-  RGWBucketInfo _bucket_info;
+  RGWBucketInfo _bucket_info;
   const RGWBucketInfo *pbucket_info; //< pointer to bucket instance info to locate bucket indices
   int child_ret = 0;
   const DoutPrefixProvider *dpp;
-  using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
+public:
+  struct StatusShards {
+    uint64_t generation = 0;
+    std::vector<rgw_bucket_shard_sync_info> shards;
+  };
+private:
   std::vector<StatusShards> peer_status; //< sync status for each peer
   std::vector<std::string> min_markers; //< min marker per shard
 
+  /// The layout of the generation to trim
+  rgw::bucket_index_layout_generation totrim;
+
+  int take_min_generation() {
+    // Initialize the min_generation to the bucket's current
+    // generation, used in case we have no peers.
+    auto min_generation = pbucket_info->layout.logs.back().gen;
+
+    // Determine the minimum generation
+    if (auto m = std::min_element(peer_status.begin(),
+                                  peer_status.end(),
+                                  [](const StatusShards& l,
+                                     const StatusShards& r) {
+                                    return l.generation < r.generation;
+                                  }); m != peer_status.end()) {
+      min_generation = m->generation;
+    }
+
+    auto& logs = pbucket_info->layout.logs;
+    auto log = std::find_if(logs.begin(), logs.end(),
+                            rgw::matches_gen(min_generation));
+    if (log == logs.end()) {
+      ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << "ERROR: No log layout for min_generation="
+                        << min_generation << dendl;
+      return -ENOENT;
+    }
+
+    if (log->layout.type == rgw::BucketLogType::InIndex) {
+      totrim = log->layout.in_index;
+    } else {
+      ldpp_dout(dpp, 0) << "Unable to convert log of unknown type "
+                        << log->layout.type
+                        << " to rgw::bucket_index_layout_generation " << dendl;
+      return -EINVAL;
+    }
+
+    return 0;
+  }
+
 public:
   BucketTrimInstanceCR(rgw::sal::RadosStore* store, RGWHTTPManager *http,
                        BucketTrimObserver *observer,
@@ -463,6 +485,69 @@ class BucketTrimInstanceCR : public RGWCoroutine {
   int operate(const DoutPrefixProvider *dpp) override;
 };
 
+namespace {
+/// populate the status with the minimum stable marker of each shard
+int take_min_status(
+  CephContext *cct,
+  const uint64_t min_generation,
+  std::vector<BucketTrimInstanceCR::StatusShards>::const_iterator first,
+  std::vector<BucketTrimInstanceCR::StatusShards>::const_iterator last,
+  std::vector<std::string> *status) {
+  for (auto peer = first; peer != last; ++peer) {
+    if (peer->shards.size() != status->size()) {
+      // all peers must agree on the number of shards
+      return -EINVAL;
+    }
+
+    auto m = status->begin();
+    for (auto& shard : peer->shards) {
+      auto& marker = *m++;
+      // Peers on later generations don't get a say in the matter
+      if (peer->generation > min_generation) {
+        continue;
+      }
+      // if no sync has started, we can safely trim everything
+      if (shard.state == rgw_bucket_shard_sync_info::StateInit) {
+        continue;
+      }
+      // always take the first marker, or any later marker that's smaller
+      if (peer == first || marker > shard.inc_marker.position) {
+        marker = std::move(shard.inc_marker.position);
+      }
+    }
+  }
+  return 0;
+}
+}
+
+template<>
+inline int parse_decode_json<BucketTrimInstanceCR::StatusShards>(
+  BucketTrimInstanceCR::StatusShards& s, bufferlist& bl)
+{
+  JSONParser p;
+  if (!p.parse(bl.c_str(), bl.length())) {
+    return -EINVAL;
+  }
+
+  try {
+    bilog_status_v2 v;
+    decode_json_obj(v, &p);
+    s.generation = v.sync_status.incremental_gen;
+    s.shards = std::move(v.inc_status);
+  } catch (JSONDecoder::err& e) {
+    try {
+      // Fall back if we're talking to an old node that can't give v2
+      // output.
+      s.generation = 0;
+      decode_json_obj(s.shards, &p);
+    } catch (JSONDecoder::err& e) {
+      return -EINVAL;
+    }
+  }
+  return 0;
+}
+
+
 int BucketTrimInstanceCR::operate(const DoutPrefixProvider *dpp)
 {
   reenter(this) {
@@ -521,6 +606,7 @@ int BucketTrimInstanceCR::operate(const DoutPrefixProvider *dpp)
       { "bucket", bucket_instance.c_str() }, /* equal to source-bucket when `options==merge` and source-bucket param is not provided */
       { "source-zone", zone_id.c_str() },
+      { "version", "2" },
       { nullptr, nullptr }
     };
 
@@ -529,7 +615,8 @@ int BucketTrimInstanceCR::operate(const DoutPrefixProvider *dpp)
         ldpp_dout(dpp, 0) << "WARNING: no connection to zone " << zid << ", can't trim bucket: " << bucket << dendl;
         return set_cr_error(-ECANCELED);
       }
-      using StatusCR = RGWReadRESTResourceCR<StatusShards>;
+
+      using StatusCR = RGWReadRESTResourceCR<StatusShards>;
       spawn(new StatusCR(cct, ziter->second, http, "/admin/log/", params, &*p),
             false);
       ++p;
@@ -550,9 +637,15 @@ int BucketTrimInstanceCR::operate(const DoutPrefixProvider *dpp)
     min_markers.assign(std::max(1u, pbucket_info->layout.current_index.layout.normal.num_shards),
                        RGWSyncLogTrimCR::max_marker);
 
+    // Determine the minimum generation
+    retcode = take_min_generation();
+    if (retcode < 0) {
+      ldpp_dout(dpp, 4) << "failed to find minimum generation" << dendl;
+      return set_cr_error(retcode);
+    }
     // determine the minimum marker for each shard
-    retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
-                              &min_markers);
+    retcode = take_min_status(cct, totrim.gen, peer_status.cbegin(),
+                              peer_status.cend(), &min_markers);
     if (retcode < 0) {
       ldpp_dout(dpp, 4) << "failed to correlate bucket sync status from peers" << dendl;
       return set_cr_error(retcode);
@@ -562,7 +655,8 @@ int BucketTrimInstanceCR::operate(const DoutPrefixProvider *dpp)
     ldpp_dout(dpp, 10) << "trimming bilogs for bucket=" << pbucket_info->bucket
        << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
     set_status("trimming bilog shards");
-    yield call(new BucketTrimShardCollectCR(dpp, store, *pbucket_info, min_markers));
+    yield call(new BucketTrimShardCollectCR(dpp, store, *pbucket_info, totrim,
+                                            min_markers));
     // ENODATA just means there were no keys to trim
     if (retcode == -ENODATA) {
       retcode = 0;
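A closing note on the compatibility fallback above: the coroutine now
requests "version=2" status from peers, but an old peer will still
answer with a bare v1 shard list, which is treated as generation 0.
Below is a self-contained sketch of that decode-then-fall-back shape
(not part of the commit). decode_v2() and decode_v1() are hypothetical
stand-ins for decode_json_obj() on bilog_status_v2 and on the plain
shard vector, and the toy "v2:" prefix merely simulates the decode
failure path that JSONDecoder::err signals in the real code.

#include <cerrno>
#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

// Status stands in for BucketTrimInstanceCR::StatusShards.
struct Status {
  std::uint64_t generation = 0;
  std::vector<std::string> shards;
};

// Pretend v2 bodies start with "v2:"; anything else fails to decode.
Status decode_v2(const std::string& body) {
  if (body.rfind("v2:", 0) != 0) throw std::runtime_error("not v2");
  return {2, {body.substr(3)}};
}

// Pretend any body decodes as a v1 shard list.
Status decode_v1(const std::string& body) {
  return {0, {body}};
}

int parse_status(const std::string& body, Status& s) {
  try {
    s = decode_v2(body);  // peer honored "version=2"
  } catch (const std::exception&) {
    try {
      // Fall back for old peers that only produce v1 output; they
      // are by definition still on generation 0.
      s = decode_v1(body);
    } catch (const std::exception&) {
      return -EINVAL;     // neither format parsed
    }
  }
  return 0;
}

int main() {
  Status s;
  parse_status("v2:0042", s);
  std::cout << s.generation << " " << s.shards[0] << "\n";  // 2 0042
  parse_status("0017", s);
  std::cout << s.generation << " " << s.shards[0] << "\n";  // 0 0017
}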