int RGWBucketReshard::set_resharding_status(const DoutPrefixProvider *dpp,
rgw::sal::RadosStore* store,
const RGWBucketInfo& bucket_info,
- const string& new_instance_id,
+ const string& instance_id,
int32_t num_shards,
- cls_rgw_reshard_status status)
+ cls_rgw_reshard_status status)
{
- if (new_instance_id.empty()) {
- ldpp_dout(dpp, 0) << __func__ << " missing new bucket instance id" << dendl;
- return -EINVAL;
- }
-
cls_rgw_bucket_instance_entry instance_entry;
- instance_entry.set_status(new_instance_id, num_shards, status);
+ instance_entry.set_status(instance_id, num_shards, status);
int ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry);
if (ret < 0) {
static int create_new_bucket_instance(rgw::sal::RadosStore* store,
int new_num_shards,
- const RGWBucketInfo& bucket_info,
+ RGWBucketInfo& bucket_info,
map<string, bufferlist>& attrs,
RGWBucketInfo& new_bucket_info,
const DoutPrefixProvider *dpp)
store->getRados()->create_bucket_id(&new_bucket_info.bucket.bucket_id);
- new_bucket_info.layout.current_index.layout.normal.num_shards = new_num_shards;
- new_bucket_info.objv_tracker.clear();
+ bucket_info.layout.resharding = rgw::BucketReshardState::None;
new_bucket_info.new_bucket_instance_id.clear();
new_bucket_info.reshard_status = cls_rgw_reshard_status::NOT_RESHARDING;
bool in_progress{false};
- int set_status(cls_rgw_reshard_status s, const DoutPrefixProvider *dpp) {
- bucket_info.reshard_status = s;
+ int set_status(rgw::BucketReshardState s, const DoutPrefixProvider *dpp) {
+ bucket_info.layout.resharding = s;
int ret = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs, dpp);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
BucketInfoReshardUpdate(const DoutPrefixProvider *_dpp,
rgw::sal::RadosStore* _store,
RGWBucketInfo& _bucket_info,
- map<string, bufferlist>& _bucket_attrs,
- const string& new_bucket_id) :
+ map<string, bufferlist>& _bucket_attrs) :
dpp(_dpp),
store(_store),
bucket_info(_bucket_info),
bucket_attrs(_bucket_attrs)
- {
- bucket_info.new_bucket_instance_id = new_bucket_id;
- }
+ {}
~BucketInfoReshardUpdate() {
if (in_progress) {
bucket_info.new_bucket_instance_id.clear();
// clears new_bucket_instance as well
- set_status(cls_rgw_reshard_status::NOT_RESHARDING, dpp);
+ set_status(rgw::BucketReshardState::None, dpp);
}
}
int start() {
- int ret = set_status(cls_rgw_reshard_status::IN_PROGRESS, dpp);
+ int ret = set_status(rgw::BucketReshardState::InProgress, dpp);
if (ret < 0) {
return ret;
}
}
int complete() {
- int ret = set_status(cls_rgw_reshard_status::DONE, dpp);
+ int ret = set_status(rgw::BucketReshardState::None, dpp);
if (ret < 0) {
return ret;
}
int RGWBucketReshard::do_reshard(int num_shards,
- RGWBucketInfo& new_bucket_info,
int max_entries,
bool verbose,
ostream *out,
const DoutPrefixProvider *dpp)
{
if (out) {
- const rgw_bucket& bucket = bucket_info.bucket;
- (*out) << "tenant: " << bucket.tenant << std::endl;
- (*out) << "bucket name: " << bucket.name << std::endl;
- (*out) << "old bucket instance id: " << bucket.bucket_id <<
- std::endl;
- (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id <<
- std::endl;
+ (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
+ (*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
}
/* update bucket info -- in progress*/
// NB: destructor cleans up sharding state if reshard does not
// complete successfully
- BucketInfoReshardUpdate bucket_info_updater(dpp, store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id);
+ BucketInfoReshardUpdate bucket_info_updater(dpp, store, bucket_info, bucket_attrs);
int ret = bucket_info_updater.start();
if (ret < 0) {
return ret;
}
- int num_target_shards = (new_bucket_info.layout.current_index.layout.normal.num_shards > 0 ? new_bucket_info.layout.current_index.layout.normal.num_shards : 1);
+ int num_target_shards = bucket_info.layout.target_index->layout.normal.num_shards;
- BucketReshardManager target_shards_mgr(dpp, store, new_bucket_info, num_target_shards);
+ BucketReshardManager target_shards_mgr(dpp, store, bucket_info, num_target_shards);
bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr);
rgw_bucket_category_stats stats;
bool account = entry.get_info(&cls_key, &category, &stats);
rgw_obj_key key(cls_key);
- rgw_obj obj(new_bucket_info.bucket, key);
+ rgw_obj obj(bucket_info.bucket, key);
RGWMPObj mp;
if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
// place the multipart .meta object on the same shard as its head object
obj.index_hash_source = mp.get_key();
}
- int ret = store->getRados()->get_target_shard_id(new_bucket_info.layout.current_index.layout.normal, obj.get_hash_object(), &target_shard_id);
+ int ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal, obj.get_hash_object(), &target_shard_id);
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
return ret;
return -EIO;
}
- ret = store->ctl()->bucket->link_bucket(new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time, null_yield, dpp);
- if (ret < 0) {
- ldpp_dout(dpp, -1) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
- return ret;
- }
-
- ret = bucket_info_updater.complete();
+ //overwrite current_index for the next reshard process
+ bucket_info.layout.current_index = *bucket_info.layout.target_index;
+ bucket_info.layout.resharding = rgw::BucketReshardState::None;
+ ret = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs, dpp);
if (ret < 0) {
- ldpp_dout(dpp, 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
- /* don't error out, reshard process succeeded */
+ ldpp_dout(dpp, -1) << "ERROR: failed writing bucket instance info: " << dendl;
+ return ret;
}
return 0;
RGWBucketInfo new_bucket_info;
ret = create_new_bucket_instance(num_shards, new_bucket_info, dpp);
if (ret < 0) {
- // shard state is uncertain, but this will attempt to remove them anyway
+ return ret;
goto error_out;
}
if (reshard_log) {
- ret = reshard_log->update(dpp, bucket_info, new_bucket_info);
+ ret = reshard_log->update(dpp, bucket_info);
if (ret < 0) {
+ return ret;
goto error_out;
}
}
// set resharding status of current bucket_info & shards with
// information about planned resharding
- ret = set_resharding_status(dpp, new_bucket_info.bucket.bucket_id,
+ ret = set_resharding_status(dpp, bucket_info.bucket.bucket_id,
num_shards, cls_rgw_reshard_status::IN_PROGRESS);
if (ret < 0) {
+ return ret;
goto error_out;
}
ret = do_reshard(num_shards,
- new_bucket_info,
max_op_entries,
verbose, out, formatter, dpp);
if (ret < 0) {
reshard_lock.unlock();
- // resharding successful, so remove old bucket index shards; use
- // best effort and don't report out an error; the lock isn't needed
- // at this point since all we're using a best effor to to remove old
- // shard objects
- ret = store->svc()->bi->clean_index(dpp, bucket_info);
- if (ret < 0) {
- ldpp_dout(dpp, -1) << "Error: " << __func__ <<
- " failed to clean up old shards; " <<
- "RGWRados::clean_bucket_index returned " << ret << dendl;
- }
+ // resharding successful, so remove old bucket index shards; use
+ // best effort and don't report out an error; the lock isn't needed
+ // at this point since all we're doing is a best effort to remove old
+ // shard objects
- ret = store->ctl()->bucket->remove_bucket_instance_info(bucket_info.bucket,
- bucket_info, null_yield, dpp);
- if (ret < 0) {
- ldpp_dout(dpp, -1) << "Error: " << __func__ <<
- " failed to clean old bucket info object \"" <<
- bucket_info.bucket.get_key() <<
- "\"created after successful resharding with error " << ret << dendl;
+ ret = store->svc()->bi->clean_index(dpp, bucket_info);
+ if (ret < 0) {
+ lderr(store->ctx()) << "Error: " << __func__ <<
+ " failed to clean up old shards; " <<
+ "RGWRados::clean_bucket_index returned " << ret << dendl;
}
ldpp_dout(dpp, 1) << __func__ <<
- " INFO: reshard of bucket \"" << bucket_info.bucket.name << "\" from \"" <<
- bucket_info.bucket.get_key() << "\" to \"" <<
- new_bucket_info.bucket.get_key() << "\" completed successfully" << dendl;
+ " INFO: reshard of bucket \"" << bucket_info.bucket.name << "\" completed successfully" << dendl;
return 0;
reshard_lock.unlock();
+
+ //TODO: Cleanup failed incomplete resharding
// since the real problem is the issue that led to this error code
// path, we won't touch ret and instead use another variable to
// temporarily error codes
- int ret2 = store->svc()->bi->clean_index(dpp, new_bucket_info);
+ int ret2 = store->svc()->bi->clean_index(dpp, bucket_info);
if (ret2 < 0) {
ldpp_dout(dpp, -1) << "Error: " << __func__ <<
" failed to clean up shards from failed incomplete resharding; " <<
"RGWRados::clean_bucket_index returned " << ret2 << dendl;
}
- ret2 = store->ctl()->bucket->remove_bucket_instance_info(new_bucket_info.bucket,
- new_bucket_info,
- null_yield, dpp);
- if (ret2 < 0) {
- ldpp_dout(dpp, -1) << "Error: " << __func__ <<
- " failed to clean bucket info object \"" <<
- new_bucket_info.bucket.get_key() <<
- "\"created during incomplete resharding with error " << ret2 << dendl;
- }
-
return ret;
} // execute
int RGWReshard::add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry)
{
- if (!store->svc()->zone->can_reshard()) {
- ldpp_dout(dpp, 20) << __func__ << " Resharding is disabled" << dendl;
- return 0;
- }
-
string logshard_oid;
get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
return 0;
}
-int RGWReshard::update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
+int RGWReshard::update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info)
{
cls_rgw_reshard_entry entry;
entry.bucket_name = bucket_info.bucket.name;
return ret;
}
- entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id;
-
ret = add(dpp, entry);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
}
for(auto& entry: entries) { // logshard entries
- if(entry.new_instance_id.empty()) {
ldpp_dout(dpp, 20) << __func__ << " resharding " <<
entry.bucket_name << dendl;
goto finished_entry;
}
+ {
RGWBucketReshard br(store, bucket_info, attrs, nullptr);
ret = br.execute(entry.new_num_shards, max_entries, dpp, false, nullptr,
nullptr, this);
entry.bucket_name << " from resharding queue: " <<
cpp_strerror(-ret) << dendl;
return ret;
+ }
}
- } // if new instance id is empty
finished_entry:
int RGWReshard::process_all_logshards(const DoutPrefixProvider *dpp)
{
- if (!store->svc()->zone->can_reshard()) {
- ldpp_dout(dpp, 20) << __func__ << " Resharding is disabled" << dendl;
- return 0;
- }
int ret = 0;
for (int i = 0; i < num_logshards; i++) {