return 0;
} // cancel_reshard
-static int commit_reshard(rgw::sal::RadosStore* store,
- RGWBucketInfo& bucket_info,
- std::map<std::string, bufferlist>& bucket_attrs,
- ReshardFaultInjector& fault,
- const DoutPrefixProvider *dpp)
// commit_target_layout: the single-attempt commit step split out of the former
// commit_reshard body. It adjusts the in-memory bucket layout and then makes
// exactly one attempt to persist it with put_bucket_instance_info(). The
// retry-on-ECANCELED loop that used to live here is removed below; the new
// commit_reshard wrapper loops around this function instead.
//
// Returns 0 on success, otherwise the error code produced by the fault
// injector or by put_bucket_instance_info(). This function does not restore
// the in-memory layout on failure; the caller does that from its own copy.
//
// NOTE(review): unchanged lines between diff hunks are not shown in this
// excerpt, so the statements that actually switch to the target index are not
// visible here -- confirm against the full file.
+static int commit_target_layout(rgw::sal::RadosStore* store,
+ RGWBucketInfo& bucket_info,
+ std::map<std::string, bufferlist>& bucket_attrs,
+ ReshardFaultInjector& fault,
+ const DoutPrefixProvider *dpp)
{
- static constexpr auto max_retries = 10;
auto& layout = bucket_info.layout;
- auto prev = layout; // make a copy for cleanup
// generation for the next log layout entry: 1 when no logs exist yet,
// otherwise one past the newest entry (its use falls outside the lines shown)
const auto next_log_gen = layout.logs.empty() ? 1 :
layout.logs.back().gen + 1;
- bool remove_index = true;
-
if (!store->svc()->zone->need_to_log_data()) {
// if we're not syncing data, we can drop any existing logs
layout.logs.clear();
- } else {
- const auto last_index_gen = prev.current_index.gen;
- for (const auto& log : layout.logs) {
- if (log.layout.type == rgw::BucketLogType::InIndex &&
- log.layout.in_index.gen == last_index_gen) {
- // we're storing logs in this index gen, we can't delete it yet
- remove_index = false;
- break;
- }
- }
}
// use the new index layout as current
// a non-zero result from the fault injector skips the metadata write below
int ret = fault.check("commit_target_layout");
if (ret == 0) { // no fault injected, write the bucket instance metadata
- int tries = 0;
- do {
- ret =
- store->getRados()->put_bucket_instance_info(bucket_info, false,
- real_time(),
- &bucket_attrs, dpp);
- ++tries;
- } while (ret == -ECANCELED && tries < max_retries);
// one write attempt only; -ECANCELED is returned to the caller, which retries
+ ret = store->getRados()->put_bucket_instance_info(
+ bucket_info, false, real_time(), &bucket_attrs, dpp);
+ } else if (ret == -ECANCELED) {
+ fault.clear(); // clear the fault so a retry can succeed
}
+ return ret;
+} // commit_target_layout
+
+static int commit_reshard(rgw::sal::RadosStore* store,
+ RGWBucketInfo& bucket_info,
+ std::map<std::string, bufferlist>& bucket_attrs,
+ ReshardFaultInjector& fault,
+ const DoutPrefixProvider *dpp)
+{
+ auto prev = bucket_info.layout; // make a copy for cleanup
+
+ // retry in case of racing writes to the bucket instance metadata
+ static constexpr auto max_retries = 10;
+ int tries = 0;
+ int ret = 0;
+ do {
+ ret = commit_target_layout(store, bucket_info, bucket_attrs, fault, dpp);
+ if (ret == -ECANCELED) {
+ // racing write detected, read the latest bucket info and try again
+ auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
+ int ret2 = store->getRados()->get_bucket_instance_info(
+ obj_ctx, bucket_info.bucket, bucket_info,
+ nullptr, &bucket_attrs, null_yield, dpp);
+ if (ret2 < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to read "
+ "bucket info: " << cpp_strerror(ret2) << dendl;
+ ret = ret2;
+ break;
+ }
+
+ // check that we're still in the reshard state we started in
+ if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress) {
+ ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " raced with "
+ "reshard cancel" << dendl;
+ return -ECANCELED; // whatever canceled us already did the cleanup
+ }
+ if (bucket_info.layout.current_index != prev.current_index ||
+ bucket_info.layout.target_index != prev.target_index) {
+ ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " raced with "
+ "another reshard" << dendl;
+ return -ECANCELED; // whatever canceled us already did the cleanup
+ }
+
+ prev = bucket_info.layout; // update the copy
+ }
+ ++tries;
+ } while (ret == -ECANCELED && tries < max_retries);
if (ret < 0) {
- ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to write "
- "current index layout in bucket info: " << cpp_strerror(ret) << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to commit "
+ "target index layout: " << cpp_strerror(ret) << dendl;
bucket_info.layout = std::move(prev); // restore in-memory layout
}
}
- // on success, delete index shard objects from the old layout (ignore errors)
- if (remove_index) {
+ // check whether the old index objects are still needed for bilogs
+ const auto& logs = bucket_info.layout.logs;
+ auto log = std::find_if(logs.begin(), logs.end(),
+ [&prev] (const rgw::bucket_log_layout_generation& log) {
+ return log.layout.type == rgw::BucketLogType::InIndex
+ && log.layout.in_index.gen == prev.current_index.gen;
+ });
+ if (log == logs.end()) {
+ // delete the index objects (ignore errors)
store->svc()->bi->clean_index(dpp, bucket_info, prev.current_index);
}
return 0;