outer_reshard_lock(_outer_reshard_lock)
{ }
-int RGWBucketReshard::set_resharding_status(const DoutPrefixProvider *dpp,
- rgw::sal::RadosStore* store,
- const RGWBucketInfo& bucket_info,
- cls_rgw_reshard_status status)
+// sets reshard status of bucket index shards for the current index layout
+static int set_resharding_status(const DoutPrefixProvider *dpp,
+ rgw::sal::RadosStore* store,
+ const RGWBucketInfo& bucket_info,
+ cls_rgw_reshard_status status)
{
cls_rgw_bucket_instance_entry instance_entry;
instance_entry.set_status(status);
return 0;
}
-// reshard lock assumes lock is held
-int RGWBucketReshard::clear_resharding(const DoutPrefixProvider *dpp,
- rgw::sal::RadosStore* store,
- const RGWBucketInfo& bucket_info)
+// initialize a target index layout, create its bucket index shard objects, and
+// write the target layout to the bucket instance metadata
+static int init_target_layout(rgw::sal::RadosStore* store,
+ RGWBucketInfo& bucket_info,
+ const ReshardFaultInjector& fault,
+ uint32_t new_num_shards,
+ const DoutPrefixProvider* dpp)
{
- int ret = clear_index_shard_reshard_status(dpp, store, bucket_info);
+ uint64_t gen = bucket_info.layout.current_index.gen + 1;
+
+ auto& target = bucket_info.layout.target_index;
+ if (target) {
+ // a previous reshard failed or stalled, and its reshard lock dropped
+ ldpp_dout(dpp, 10) << __func__ << " removing existing target index "
+ "objects from a previous reshard attempt" << dendl;
+ // delete its existing shard objects (ignore errors)
+ store->svc()->bi->clean_index(dpp, bucket_info, *target);
+ // don't reuse this same generation in the new target layout, in case
+ // something is still trying to operate on its shard objects
+ gen = target->gen + 1;
+ }
+
+ // initialize a new normal target index layout generation
+ target.emplace();
+ target->layout.type = rgw::BucketIndexType::Normal;
+ target->layout.normal.num_shards = new_num_shards;
+ target->gen = gen;
+ // update resharding state
+ bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
+
+ // initialize the new bucket index shard objects
+ int ret = store->svc()->bi->init_index(dpp, bucket_info, *target);
if (ret < 0) {
- ldpp_dout(dpp, 0) << "RGWBucketReshard::" << __func__ <<
- " ERROR: error clearing reshard status from index shard " <<
- cpp_strerror(-ret) << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to initialize "
+ " target index shard objects: " << cpp_strerror(ret) << dendl;
return ret;
}
- cls_rgw_bucket_instance_entry instance_entry;
- ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry);
+ if (ret = fault.check("set_target_layout");
+ ret == 0) { // no fault injected, write the bucket instance metadata
+ ret = store->getRados()->put_bucket_instance_info(bucket_info, false,
+ real_time(), nullptr, dpp);
+ }
+
if (ret < 0) {
- ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ <<
- " ERROR: error setting bucket resharding flag on bucket index: " <<
- cpp_strerror(-ret) << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to write "
+ "target index layout to bucket info: " << cpp_strerror(ret) << dendl;
+ // delete the target shard objects (ignore errors)
+ store->svc()->bi->clean_index(dpp, bucket_info, *target);
return ret;
}
-
return 0;
-}
-
-int RGWBucketReshard::clear_index_shard_reshard_status(const DoutPrefixProvider *dpp,
- rgw::sal::RadosStore* store,
- const RGWBucketInfo& bucket_info)
+} // init_target_layout
+
+// delete the bucket index shards associated with the target layout and remove
+// it from the bucket instance metadata
+static int revert_target_layout(rgw::sal::RadosStore* store,
+ RGWBucketInfo& bucket_info,
+ const ReshardFaultInjector& fault,
+ const DoutPrefixProvider* dpp)
{
- uint32_t num_shards = bucket_info.layout.current_index.layout.normal.num_shards;
+ auto& layout = bucket_info.layout;
+ auto prev = layout; // make a copy for cleanup
- if (num_shards < std::numeric_limits<uint32_t>::max()) {
- int ret = set_resharding_status(dpp, store, bucket_info,
- cls_rgw_reshard_status::NOT_RESHARDING);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << "RGWBucketReshard::" << __func__ <<
- " ERROR: error clearing reshard status from index shard " <<
- cpp_strerror(-ret) << dendl;
- return ret;
- }
+ // remove target index shard objects
+ int ret = store->svc()->bi->clean_index(dpp, bucket_info, *layout.target_index);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to remove "
+ "target index with: " << cpp_strerror(ret) << dendl;
+ ret = 0; // non-fatal error
}
- return 0;
-}
-
-static int set_target_layout(rgw::sal::RadosStore *store,
- int new_num_shards,
- RGWBucketInfo& bucket_info,
- const DoutPrefixProvider *dpp)
-{
- bucket_info.layout.target_index = std::nullopt; // to ensure empty target_index in case of an abruptly interrupted reshard process
- ceph_assert(!bucket_info.layout.target_index);
- bucket_info.layout.target_index.emplace();
-
- bucket_info.layout.target_index->layout.normal.num_shards = new_num_shards;
+ // clear target_index and resharding state
+ layout.target_index = std::nullopt;
+ layout.resharding = rgw::BucketReshardState::None;
- //increment generation number
- bucket_info.layout.target_index->gen = bucket_info.layout.current_index.gen;
- bucket_info.layout.target_index->gen++;
+ if (ret = fault.check("revert_target_layout");
+ ret == 0) { // no fault injected, revert the bucket instance metadata
+ ret = store->getRados()->put_bucket_instance_info(bucket_info, false,
+ real_time(), nullptr, dpp);
+ }
- int ret = static_cast<rgw::sal::RadosStore*>(store)->svc()->bi->init_index(dpp, bucket_info, *(bucket_info.layout.target_index));
if (ret < 0) {
- return ret;
- }
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to clear "
+ "target index layout in bucket info: " << cpp_strerror(ret) << dendl;
+ bucket_info.layout = std::move(prev); // restore in-memory layout
+ return ret;
+ }
return 0;
-}
+} // remove_target_layout
-int RGWBucketReshard::set_target_layout(int new_num_shards, const DoutPrefixProvider *dpp)
+static int init_reshard(rgw::sal::RadosStore* store,
+ RGWBucketInfo& bucket_info,
+ const ReshardFaultInjector& fault,
+ uint32_t new_num_shards,
+ const DoutPrefixProvider *dpp)
{
- int ret = update_bucket(rgw::BucketReshardState::InProgress, dpp);
+ int ret = init_target_layout(store, bucket_info, fault, new_num_shards, dpp);
if (ret < 0) {
- lderr(store->ctx()) << "ERROR: failed to store updated bucket instance info: " << dendl;
return ret;
}
- return ::set_target_layout(store, new_num_shards, bucket_info, dpp);
-}
-int RGWBucketReshard::cancel(const DoutPrefixProvider *dpp)
-{
- int ret = reshard_lock.lock(dpp);
+ if (ret = fault.check("block_writes");
+ ret == 0) { // no fault injected, block writes to the current index shards
+ ret = set_resharding_status(dpp, static_cast<rgw::sal::RadosStore*>(store), bucket_info,
+ cls_rgw_reshard_status::IN_PROGRESS);
+ }
+
if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to pause "
+ "writes to the current index: " << cpp_strerror(ret) << dendl;
+ // clean up the target layout (ignore errors)
+ revert_target_layout(store, bucket_info, fault, dpp);
return ret;
}
+ return 0;
+} // init_reshard
- ret = clear_resharding(dpp);
+static int cancel_reshard(rgw::sal::RadosStore* store,
+ RGWBucketInfo& bucket_info,
+ const ReshardFaultInjector& fault,
+ const DoutPrefixProvider *dpp)
+{
+ // unblock writes to the current index shard objects
+ int ret = set_resharding_status(dpp, static_cast<rgw::sal::RadosStore*>(store), bucket_info,
+ cls_rgw_reshard_status::NOT_RESHARDING);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to unblock "
+ "writes to current index objects: " << cpp_strerror(ret) << dendl;
+ ret = 0; // non-fatal error
+ }
- reshard_lock.unlock();
- return ret;
-}
+ return revert_target_layout(store, bucket_info, fault, dpp);
+} // cancel_reshard
-class BucketInfoReshardUpdate
+static int commit_reshard(rgw::sal::RadosStore* store,
+ RGWBucketInfo& bucket_info,
+ const ReshardFaultInjector& fault,
+ const DoutPrefixProvider *dpp)
{
- const DoutPrefixProvider *dpp;
- rgw::sal::RadosStore* store;
- RGWBucketInfo& bucket_info;
+ auto& layout = bucket_info.layout;
+ auto prev = layout; // make a copy for cleanup
+
+ // use the new index layout as current
+ ceph_assert(layout.target_index);
+ layout.current_index = std::move(*layout.target_index);
+ layout.target_index = std::nullopt;
+ layout.resharding = rgw::BucketReshardState::None;
+
+ int ret = fault.check("commit_target_layout");
+ if (ret == 0) { // no fault injected, write the bucket instance metadata
+ ret = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), nullptr, dpp);
+ }
- bool in_progress{false};
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to write "
+ "current index layout in bucket info: " << cpp_strerror(ret) << dendl;
- int set_status(rgw::BucketReshardState s, const DoutPrefixProvider *dpp) {
- bucket_info.layout.resharding = s;
- int ret = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), nullptr, dpp);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
- return ret;
- }
- return 0;
- }
+ bucket_info.layout = std::move(prev); // restore in-memory layout
-public:
- BucketInfoReshardUpdate(const DoutPrefixProvider *_dpp,
- rgw::sal::RadosStore *_store,
- RGWBucketInfo& _bucket_info) :
- dpp(_dpp),
- store(_store),
- bucket_info(_bucket_info)
- {}
-
- ~BucketInfoReshardUpdate() {
- if (in_progress) {
- // resharding must not have ended correctly, clean up
- int ret =
- RGWBucketReshard::clear_index_shard_reshard_status(dpp, store, bucket_info);
- if (ret < 0) {
- ldpp_dout(dpp, -1) << "Error: " << __func__ <<
- " clear_index_shard_status returned " << ret << dendl;
- }
- set_status(rgw::BucketReshardState::None, dpp);
+ // unblock writes to the current index shard objects
+ int ret2 = set_resharding_status(dpp, store, bucket_info,
+ cls_rgw_reshard_status::NOT_RESHARDING);
+ if (ret2 < 0) {
+ ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to unblock "
+ "writes to current index objects: " << cpp_strerror(ret2) << dendl;
+ // non-fatal error
}
+ return ret;
}
- int start() {
- int ret = set_status(rgw::BucketReshardState::InProgress, dpp);
- if (ret < 0) {
- return ret;
- }
- in_progress = true;
- return 0;
+ // on success, delete index shard objects from the old layout (ignore errors)
+ store->svc()->bi->clean_index(dpp, bucket_info, prev.current_index);
+
+ return 0;
+} // commit_reshard
+
+int RGWBucketReshard::clear_resharding(rgw::sal::RadosStore* store,
+ RGWBucketInfo& bucket_info,
+ const DoutPrefixProvider* dpp)
+{
+ constexpr ReshardFaultInjector no_fault;
+ return cancel_reshard(store, bucket_info, no_fault, dpp);
+}
+
+int RGWBucketReshard::cancel(const DoutPrefixProvider* dpp)
+{
+ int ret = reshard_lock.lock(dpp);
+ if (ret < 0) {
+ return ret;
}
- int complete() {
- int ret = set_status(rgw::BucketReshardState::None, dpp);
- if (ret < 0) {
- return ret;
- }
- in_progress = false;
- return 0;
+ if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress) {
+ ldpp_dout(dpp, -1) << "ERROR: bucket is not resharding" << dendl;
+ ret = -EINVAL;
+ } else {
+ ret = clear_resharding(store, bucket_info, dpp);
}
-};
+ reshard_lock.unlock();
+ return ret;
+}
RGWBucketReshardLock::RGWBucketReshardLock(rgw::sal::RadosStore* _store,
const std::string& reshard_lock_oid,
int RGWBucketReshard::do_reshard(int num_shards,
int max_entries,
- const ReshardFaultInjector& f,
bool verbose,
ostream *out,
Formatter *formatter,
return -EINVAL;
}
- // NB: destructor cleans up sharding state if reshard does not
- // complete successfully
- BucketInfoReshardUpdate bucket_info_updater(dpp, store, bucket_info);
-
- int ret = bucket_info_updater.start();
- if (ret < 0) {
- ldpp_dout(dpp, 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
- return ret;
- }
-
- int num_target_shards = bucket_info.layout.target_index->layout.normal.num_shards;
+ int num_target_shards = rgw::current_num_shards(bucket_info.layout);
BucketReshardManager target_shards_mgr(dpp, store, bucket_info, num_target_shards);
const std::string null_object_filter; // empty string since we're not filtering by object
while (is_truncated) {
entries.clear();
- ret = store->getRados()->bi_list(dpp, bucket_info, i, null_object_filter, marker, max_entries, &entries, &is_truncated);
+ int ret = store->getRados()->bi_list(dpp, bucket_info, i, null_object_filter, marker, max_entries, &entries, &is_truncated);
if (ret < 0 && ret != -ENOENT) {
derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
return ret;
return ret;
}
- if (ret = f.check("before_target_shard_entry"); ret < 0) { return ret; }
-
int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
ret = target_shards_mgr.add_entry(shard_index, entry, account,
return ret;
}
- if (ret = f.check("after_target_shard_entry"); ret < 0) { return ret; }
-
Clock::time_point now = Clock::now();
if (reshard_lock.should_renew(now)) {
// assume outer locks have timespans at least the size of ours, so
} else if (out) {
(*out) << " " << total_entries << std::endl;
}
-
- ret = target_shards_mgr.finish();
+
+ int ret = target_shards_mgr.finish();
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed to reshard" << dendl;
return -EIO;
}
-
- if (ret = f.check("before_layout_overwrite"); ret < 0) { return ret; }
-
- //overwrite current_index for the next reshard process
- bucket_info.layout.current_index = *bucket_info.layout.target_index;
- bucket_info.layout.target_index = std::nullopt; // target_layout doesn't need to exist after reshard
-
- ret = update_bucket(rgw::BucketReshardState::None, dpp);
- if (ret < 0) {
- ldpp_dout(dpp, -1) << "ERROR: failed writing bucket instance info: " << dendl;
- return ret;
- }
-
return 0;
- // NB: some error clean-up is done by ~BucketInfoReshardUpdate
} // RGWBucketReshard::do_reshard
int RGWBucketReshard::get_status(const DoutPrefixProvider *dpp, list<cls_rgw_bucket_instance_entry> *status)
return store->svc()->bi_rados->get_reshard_status(dpp, bucket_info, status);
}
-int RGWBucketReshard::update_bucket(rgw::BucketReshardState s, const DoutPrefixProvider* dpp) {
- bucket_info.layout.resharding = s;
- int ret = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), nullptr, dpp);
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
- return ret;
- }
- return 0;
-}
-
int RGWBucketReshard::execute(int num_shards,
- const ReshardFaultInjector& f,
+ const ReshardFaultInjector& fault,
int max_op_entries,
const DoutPrefixProvider *dpp,
bool verbose, ostream *out,
Formatter *formatter,
RGWReshard* reshard_log)
{
+ // take a reshard lock on the bucket
int ret = reshard_lock.lock(dpp);
if (ret < 0) {
return ret;
}
-
- ret = set_target_layout(num_shards, dpp); //modifies existing bucket
- if (ret < 0) {
- // shard state is uncertain, but this will attempt to remove them anyway
- goto error_out;
- }
+ // unlock when scope exits
+ auto unlock = make_scope_guard([this] { reshard_lock.unlock(); });
if (reshard_log) {
ret = reshard_log->update(dpp, bucket_info);
if (ret < 0) {
- goto error_out;
+ return ret;
}
}
- // keep a copy of old index layout
- prev_index = bucket_info.layout.current_index;
-
- ret = do_reshard(num_shards, max_op_entries, f, verbose, out, formatter, dpp);
+ // prepare the target index and add its layout the bucket info
+ ret = init_reshard(store, bucket_info, fault, num_shards, dpp);
if (ret < 0) {
- goto error_out;
+ return ret;
}
- // at this point we've done the main work; we'll make a best-effort
- // to clean-up but will not indicate any errors encountered
-
- reshard_lock.unlock();
-
- // resharding successful, so remove old bucket index shards; use
- // best effort and don't report out an error; the lock isn't needed
- // at this point since all we're using a best effort to remove old
- // shard objects
-
- ret = store->svc()->bi->clean_index(dpp, bucket_info, prev_index);
- if (ret < 0) {
- ldpp_dout(dpp, -1) << __func__ << "Error: " << __func__ <<
- " failed to clean up old shards; " <<
- "RGWRados::clean_bucket_index returned " << ret << dendl;
+ if (ret = fault.check("do_reshard");
+ ret == 0) { // no fault injected, do the reshard
+ ret = do_reshard(num_shards, max_op_entries,
+ verbose, out, formatter, dpp);
}
- ldpp_dout(dpp, 1) << __func__ <<
- " INFO: reshard of bucket \"" << bucket_info.bucket.name << "\" completed successfully" << dendl;
-
- return 0;
-
-error_out:
-
- reshard_lock.unlock();
-
- //TODO: Cleanup failed incomplete resharding
- // since the real problem is the issue that led to this error code
- // path, we won't touch ret and instead use another variable to
- // temporarily error codes
+ if (ret < 0) {
+ cancel_reshard(store, bucket_info, fault, dpp);
- if (bucket_info.layout.target_index != std::nullopt) {
- int ret2 = store->svc()->bi->clean_index(dpp, bucket_info, *(bucket_info.layout.target_index));
- if (ret2 < 0) {
- ldpp_dout(dpp, -1) << "Error: " << __func__ <<
- " failed to clean up shards from failed incomplete resharding; " <<
- "RGWRados::clean_bucket_index returned " << ret2 << dendl;
- }
+ ldpp_dout(dpp, 1) << __func__ << " INFO: reshard of bucket \""
+ << bucket_info.bucket.name << "\" canceled due to errors" << dendl;
+ return ret;
}
- // restore old index
- bucket_info.layout.current_index = prev_index;
- ret = update_bucket(rgw::BucketReshardState::None, dpp);
+ ret = commit_reshard(store, bucket_info, fault, dpp);
if (ret < 0) {
- lderr(store->ctx()) << "ERROR: failed to store updated bucket instance info: " << dendl;
return ret;
}
- return ret;
+ ldpp_dout(dpp, 1) << __func__ << " INFO: reshard of bucket \""
+ << bucket_info.bucket.name << "\" completed successfully" << dendl;
+ return 0;
} // execute