virtual bool trimmed_recently(const std::string_view& bucket_instance) = 0;
};
-/// populate the status with the minimum stable marker of each shard
-template <typename Iter>
-int take_min_status(CephContext *cct, Iter first, Iter last,
- std::vector<std::string> *status)
-{
- for (auto peer = first; peer != last; ++peer) {
- if (peer->size() != status->size()) {
- // all peers must agree on the number of shards
- return -EINVAL;
- }
- auto m = status->begin();
- for (auto& shard : *peer) {
- auto& marker = *m++;
- // if no sync has started, we can safely trim everything
- if (shard.state == rgw_bucket_shard_sync_info::StateInit) {
- continue;
- }
- // always take the first marker, or any later marker that's smaller
- if (peer == first || marker > shard.inc_marker.position) {
- marker = std::move(shard.inc_marker.position);
- }
- }
- }
- return 0;
-}
-
/// trim each bilog shard to the given marker, while limiting the number of
/// concurrent requests
class BucketTrimShardCollectCR : public RGWShardCollectCR {
const DoutPrefixProvider *dpp;
rgw::sal::RadosStore* const store;
const RGWBucketInfo& bucket_info;
+ rgw::bucket_index_layout_generation generation; //< index layout of the log generation to trim
const std::vector<std::string>& markers; //< shard markers to trim
size_t i{0}; //< index of current shard marker
public:
BucketTrimShardCollectCR(const DoutPrefixProvider *dpp,
rgw::sal::RadosStore* store, const RGWBucketInfo& bucket_info,
+ rgw::bucket_index_layout_generation generation,
const std::vector<std::string>& markers)
: RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
- dpp(dpp), store(store), bucket_info(bucket_info), markers(markers)
+ dpp(dpp), store(store), bucket_info(bucket_info),
+ generation(generation), markers(markers)
{}
bool spawn_next() override;
};
ldpp_dout(dpp, 10) << "trimming bilog shard " << shard_id
<< " of " << bucket_info.bucket << " at marker " << marker << dendl;
spawn(new RGWRadosBILogTrimCR(dpp, store, bucket_info, shard_id,
- std::string{}, marker),
+ generation, std::string{}, marker),
false);
return true;
}
std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
rgw_bucket bucket;
const std::string& zone_id; //< my zone id
- RGWBucketInfo _bucket_info;
+ RGWBucketInfo _bucket_info;
const RGWBucketInfo *pbucket_info; //< pointer to bucket instance info to locate bucket indices
int child_ret = 0;
const DoutPrefixProvider *dpp;
- using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
+public:
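+ /// a peer's incremental sync generation and its per-shard sync status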
+ struct StatusShards {
+ uint64_t generation = 0;
+ std::vector<rgw_bucket_shard_sync_info> shards;
+ };
+private:
std::vector<StatusShards> peer_status; //< sync status for each peer
std::vector<std::string> min_markers; //< min marker per shard
+ /// The layout of the generation to trim
+ rgw::bucket_index_layout_generation totrim;
+
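+ /// Resolve the generation to trim: the lowest incremental generation
+ /// reported by any peer (or the bucket's current generation if there
+ /// are no peers), mapped to its bucket index log layout.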
+ int take_min_generation() {
+ // Initialize the min_generation to the bucket's current
+ // generation, used in case we have no peers.
+ auto min_generation = pbucket_info->layout.logs.back().gen;
+
+ // Determine the minimum generation
+ if (auto m = std::min_element(peer_status.begin(),
+ peer_status.end(),
+ [](const StatusShards& l,
+ const StatusShards& r) {
+ return l.generation < r.generation;
+ }); m != peer_status.end()) {
+ min_generation = m->generation;
+ }
+
+ auto& logs = pbucket_info->layout.logs;
+ auto log = std::find_if(logs.begin(), logs.end(),
+ rgw::matches_gen(min_generation));
+ if (log == logs.end()) {
+ ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << ": ERROR: No log layout for min_generation="
+ << min_generation << dendl;
+ return -ENOENT;
+ }
+
+ if (log->layout.type == rgw::BucketLogType::InIndex) {
+ totrim = log->layout.in_index;
+ } else {
+ ldpp_dout(dpp, 0) << "Unable to convert log of unknown type "
+ << log->layout.type
+ << " to rgw::bucket_index_layout_generation " << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+ }
+
public:
BucketTrimInstanceCR(rgw::sal::RadosStore* store, RGWHTTPManager *http,
BucketTrimObserver *observer,
int operate(const DoutPrefixProvider *dpp) override;
};
+namespace {
+/// populate the status with the minimum stable marker of each shard
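+/// Expects *status to be sized to the shard count and seeded with the
+/// maximum marker; peers that have advanced past min_generation are ignored.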
+int take_min_status(
+ CephContext *cct,
+ const uint64_t min_generation,
+ std::vector<BucketTrimInstanceCR::StatusShards>::const_iterator first,
+ std::vector<BucketTrimInstanceCR::StatusShards>::const_iterator last,
+ std::vector<std::string> *status) {
+ for (auto peer = first; peer != last; ++peer) {
+ if (peer->shards.size() != status->size()) {
+ // all peers must agree on the number of shards
+ return -EINVAL;
+ }
+
+ auto m = status->begin();
+ for (auto& shard : peer->shards) {
+ auto& marker = *m++;
+ // Peers on later generations don't get a say in the matter
+ if (peer->generation > min_generation) {
+ continue;
+ }
+ // if no sync has started, we can safely trim everything
+ if (shard.state == rgw_bucket_shard_sync_info::StateInit) {
+ continue;
+ }
+ // always take the first marker, or any later marker that's smaller
+ if (peer == first || marker > shard.inc_marker.position) {
+ marker = shard.inc_marker.position; // const through a const_iterator, so this is a copy
+ }
+ }
+ }
+ return 0;
+}
+} // anonymous namespace
+
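+/// Decode a peer's bucket sync status response. Accepts both the v2 format
+/// (an object carrying the incremental generation alongside the per-shard
+/// status) and the legacy format (a bare array of shard status, implying
+/// generation 0) from peers that predate generations.
+///
+/// Rough shape for illustration (field names assumed to follow
+/// bilog_status_v2's JSON encoding):
+///   v2: {"sync_status": {"incremental_gen": N, ...}, "inc_status": [...]}
+///   v1: [...]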
+template<>
+inline int parse_decode_json<BucketTrimInstanceCR::StatusShards>(
+ BucketTrimInstanceCR::StatusShards& s, bufferlist& bl)
+{
+ JSONParser p;
+ if (!p.parse(bl.c_str(), bl.length())) {
+ return -EINVAL;
+ }
+
+ try {
+ bilog_status_v2 v;
+ decode_json_obj(v, &p);
+ s.generation = v.sync_status.incremental_gen;
+ s.shards = std::move(v.inc_status);
+ } catch (JSONDecoder::err& e) {
+ try {
+ // Fall back if we're talking to an old node that can't give v2
+ // output.
+ s.generation = 0;
+ decode_json_obj(s.shards, &p);
+ } catch (JSONDecoder::err& e) {
+ return -EINVAL;
+ }
+ }
+ return 0;
+}
+
int BucketTrimInstanceCR::operate(const DoutPrefixProvider *dpp)
{
reenter(this) {
{ "bucket", bucket_instance.c_str() }, /* equal to source-bucket when `options==merge` and source-bucket
param is not provided */
{ "source-zone", zone_id.c_str() },
+ { "version", "2" },
{ nullptr, nullptr }
};
ldpp_dout(dpp, 0) << "WARNING: no connection to zone " << zid << ", can't trim bucket: " << bucket << dendl;
return set_cr_error(-ECANCELED);
}
- using StatusCR = RGWReadRESTResourceCR<StatusShards>;
+
+ using StatusCR = RGWReadRESTResourceCR<StatusShards>;
spawn(new StatusCR(cct, ziter->second, http, "/admin/log/", params, &*p),
false);
++p;
min_markers.assign(std::max(1u, pbucket_info->layout.current_index.layout.normal.num_shards),
RGWSyncLogTrimCR::max_marker);
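+ // each marker starts at the maximum so any peer's marker can only lower it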
+ // Determine the minimum generation
+ retcode = take_min_generation();
+ if (retcode < 0) {
+ ldpp_dout(dpp, 4) << "failed to find minimum generation" << dendl;
+ return set_cr_error(retcode);
+ }
// determine the minimum marker for each shard
- retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
- &min_markers);
+ retcode = take_min_status(cct, totrim.gen, peer_status.cbegin(),
+ peer_status.cend(), &min_markers);
if (retcode < 0) {
ldpp_dout(dpp, 4) << "failed to correlate bucket sync status from peers" << dendl;
return set_cr_error(retcode);
ldpp_dout(dpp, 10) << "trimming bilogs for bucket=" << pbucket_info->bucket
<< " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
set_status("trimming bilog shards");
- yield call(new BucketTrimShardCollectCR(dpp, store, *pbucket_info, min_markers));
+ yield call(new BucketTrimShardCollectCR(dpp, store, *pbucket_info, totrim,
+ min_markers));
// ENODATA just means there were no keys to trim
if (retcode == -ENODATA) {
retcode = 0;