public:
BucketTrimShardCollectCR(const DoutPrefixProvider *dpp,
rgw::sal::RadosStore* store, const RGWBucketInfo& bucket_info,
- rgw::bucket_index_layout_generation generation,
+ const rgw::bucket_index_layout_generation& generation,
const std::vector<std::string>& markers)
: RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
dpp(dpp), store(store), bucket_info(bucket_info),
return false;
}
+/// Delete a bucket index generation, limiting the number of requests in flight.
+class BucketCleanIndexCollectCR : public RGWShardCollectCR {
+ static constexpr int MAX_CONCURRENT_SHARDS = 16;
+ const DoutPrefixProvider *dpp;
+ rgw::sal::RadosStore* const store;
+ const RGWBucketInfo& bucket_info;
+ rgw::bucket_index_layout_generation index;
+ uint32_t shard = 0;
+ const uint32_t num_shards = rgw::num_shards(index);
+
+ int handle_result(int r) override {
+ if (r == -ENOENT) { // ENOENT is not a fatal error
+ return 0;
+ }
+ if (r < 0) {
+ ldout(cct, 4) << "clean index: failed to remove shard object: " << cpp_strerror(r) << dendl;
+ }
+ return r;
+ }
+ public:
+ BucketCleanIndexCollectCR(const DoutPrefixProvider *dpp,
+ rgw::sal::RadosStore* store,
+ const RGWBucketInfo& bucket_info,
+ rgw::bucket_index_layout_generation index)
+ : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
+ dpp(dpp), store(store), bucket_info(bucket_info),
+ index(index)
+ {}
+ bool spawn_next() override {
+ if (shard < num_shards) {
+ RGWRados::BucketShard bs(store->getRados());
+ bs.init(dpp, bucket_info, index, shard);
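+ // each shard's index object is removed by its own RGWRadosRemoveOidCR;
+ // RGWShardCollectCR keeps at most MAX_CONCURRENT_SHARDS of them in flight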
+ spawn(new RGWRadosRemoveOidCR(store, std::move(bs.bucket_obj), nullptr),
+ false);
+ ++shard;
+ return true;
+ } else {
+ return false;
+ }
+ }
+};
+
/// trim the bilog of all of the given bucket instance's shards
class BucketTrimInstanceCR : public RGWCoroutine {
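+ // upper bound on ECANCELED retries when writing back bucket instance info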
+ static constexpr auto MAX_RETRIES = 25u;
rgw::sal::RadosStore* const store;
RGWHTTPManager *const http;
BucketTrimObserver *const observer;
const RGWBucketInfo *pbucket_info; //< pointer to bucket instance info to locate bucket indices
int child_ret = 0;
const DoutPrefixProvider *dpp;
-
public:
struct StatusShards {
uint64_t generation = 0;
/// The layout of the generation to trim
rgw::bucket_index_layout_generation totrim;
+ /// New bucket info and the generation to be cleaned (if any)
+ std::optional<std::pair<RGWBucketInfo,
+ rgw::bucket_index_layout_generation>> clean_info;
+ /// Number of attempts made so far to put the updated bucket info
+ unsigned retries = 0;
+
int take_min_generation() {
// Initialize the min_generation to the bucket's current
// generation, used in case we have no peers.
return 0;
}
+ /// If there is a generation below the minimum, prepare to clean it up.
+ int maybe_remove_generation() {
+ if (clean_info)
+ return 0;
+
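+ // logs.front() is the oldest log generation; anything below the
+ // generation being trimmed is safe to clean up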
+ if (pbucket_info->layout.logs.front().gen < totrim.gen) {
+ clean_info = {*pbucket_info, {}};
+ auto log = clean_info->first.layout.logs.cbegin();
+ if (log->layout.type == rgw::BucketLogType::InIndex) {
+ clean_info->second = log->layout.in_index;
+ } else {
+ ldpp_dout(dpp, 0) << "Unable to convert log of unknown type "
+ << log->layout.type
+ << " to rgw::bucket_index_layout_generation " << dendl;
+ return -EINVAL;
+ }
+
+ clean_info->first.layout.logs.erase(log);
+ }
+ return 0;
+ }
+
public:
BucketTrimInstanceCR(rgw::sal::RadosStore* store, RGWHTTPManager *http,
BucketTrimObserver *observer,
return 0;
}
-
int BucketTrimInstanceCR::operate(const DoutPrefixProvider *dpp)
{
reenter(this) {
}
}
- // initialize each shard with the maximum marker, which is only used when
- // there are no peers syncing from us
- min_markers.assign(std::max(1u, pbucket_info->layout.current_index.layout.normal.num_shards),
- RGWSyncLogTrimCR::max_marker);
-
// Determine the minimum generation
retcode = take_min_generation();
if (retcode < 0) {
ldpp_dout(dpp, 4) << "failed to find minimum generation" << dendl;
return set_cr_error(retcode);
}
- // determine the minimum marker for each shard
- retcode = take_min_status(cct, totrim.gen, peer_status.cbegin(),
- peer_status.cend(), &min_markers);
+ retcode = maybe_remove_generation();
if (retcode < 0) {
- ldpp_dout(dpp, 4) << "failed to correlate bucket sync status from peers" << dendl;
+ ldpp_dout(dpp, 4) << "error removing old generation from log: "
+ << cpp_strerror(retcode) << dendl;
return set_cr_error(retcode);
}
- // trim shards with a ShardCollectCR
- ldpp_dout(dpp, 10) << "trimming bilogs for bucket=" << pbucket_info->bucket
- << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
- set_status("trimming bilog shards");
- yield call(new BucketTrimShardCollectCR(dpp, store, *pbucket_info, totrim,
- min_markers));
- // ENODATA just means there were no keys to trim
- if (retcode == -ENODATA) {
- retcode = 0;
- }
- if (retcode < 0) {
- ldpp_dout(dpp, 4) << "failed to trim bilog shards: "
- << cpp_strerror(retcode) << dendl;
- return set_cr_error(retcode);
+ if (clean_info) {
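+ // delete the old generation's index objects before persisting the
+ // bucket info that drops its log entry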
+ yield call(new BucketCleanIndexCollectCR(dpp, store, clean_info->first,
+ clean_info->second));
+ if (retcode < 0) {
+ ldpp_dout(dpp, 0) << "failed to remove previous generation: "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+ while (clean_info && retries < MAX_RETRIES) {
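+ // write back the bucket info with the old generation's log entry erased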
+ yield call(new RGWPutBucketInstanceInfoCR(
+ store->svc()->rados->get_async_processor(),
+ store, clean_info->first, false, {},
+ nullptr, dpp));
+
+ // Raced, try again.
+ if (retcode == -ECANCELED) {
+ yield call(new RGWGetBucketInstanceInfoCR(
+ store->svc()->rados->get_async_processor(),
+ store, clean_info->first.bucket,
+ &(clean_info->first), nullptr, dpp));
+ if (retcode < 0) {
+ ldpp_dout(dpp, 0) << "failed to get bucket info: "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
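+ // the refreshed info may still carry the generation we are cleaning;
+ // if so, erase it again and retry the write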
+ if (clean_info->first.layout.logs.front().gen ==
+ clean_info->second.gen) {
+ clean_info->first.layout.logs.erase(
+ clean_info->first.layout.logs.begin());
+ ++retries;
+ continue;
+ }
+ // Raced, but someone else did what we needed to.
+ retcode = 0;
+ }
+
+ if (retcode < 0) {
+ ldpp_dout(dpp, 0) << "failed to put bucket info: "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
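+ // success: clear clean_info so the retry loop exits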
+ clean_info = std::nullopt;
+ }
+ } else {
+ // To avoid hammering the OSD too hard, either remove an old
+ // generation or trim the current one on each pass, never both.
+
+ // initialize each shard with the maximum marker, which is only used when
+ // there are no peers syncing from us
+ min_markers.assign(std::max(1u, rgw::num_shards(totrim)),
+ RGWSyncLogTrimCR::max_marker);
+
+ // determine the minimum marker for each shard across peers
+ retcode = take_min_status(cct, totrim.gen, peer_status.cbegin(),
+ peer_status.cend(), &min_markers);
+ if (retcode < 0) {
+ ldpp_dout(dpp, 4) << "failed to correlate bucket sync status from peers" << dendl;
+ return set_cr_error(retcode);
+ }
+
+ // trim shards with a ShardCollectCR
+ ldpp_dout(dpp, 10) << "trimming bilogs for bucket=" << pbucket_info->bucket
+ << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
+ set_status("trimming bilog shards");
+ yield call(new BucketTrimShardCollectCR(dpp, store, *pbucket_info, totrim,
+ min_markers));
+ // ENODATA just means there were no keys to trim
+ if (retcode == -ENODATA) {
+ retcode = 0;
+ }
+ if (retcode < 0) {
+ ldpp_dout(dpp, 4) << "failed to trim bilog shards: "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
}
observer->on_bucket_trimmed(std::move(bucket_instance));
}
/// trim each bucket instance while limiting the number of concurrent operations
class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
rgw::sal::RadosStore* const store;
RGWHTTPManager *const http;
auto log_layout = *log;
- auto r = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados
- ->log_trim(p, bucket_info, log_layout, shard_id, start_marker, end_marker);
+ auto r = store->svc()->bilog_rados->log_trim(p, bucket_info, log_layout,
+                                              shard_id, start_marker, end_marker);
if (r < 0) {
ldpp_dout(p, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< ": ERROR: bilog_rados->log_trim returned r=" << r << dendl;