r = store->guard_reshard(this, &bs, c->obj, bucket_info,
[&](RGWRados::BucketShard *bs) -> int {
librados::ObjectWriteOperation o;
+ o.assert_exists(); // bucket index shard must exist
cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs,
c->log_op, c->bilog_op, &c->zones_trace);
return r;
}
- return CLSRGWIssueSetBucketResharding(index_pool.ioctx(), bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)();
+ r = CLSRGWIssueSetBucketResharding(index_pool.ioctx(), bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)();
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
+ ": unable to issue set bucket resharding, r=" << r << " (" <<
+ cpp_strerror(-r) << ")" << dendl;
+ }
+
+ return r;
}
int RGWRados::defer_gc(const DoutPrefixProvider *dpp, void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj, optional_yield y)
ldpp_dout(dpp, 5) << "failed to get BucketShard object: ret=" << ret << dendl;
return ret;
}
+
r = call(bs);
- if (r != -ERR_BUSY_RESHARDING) {
+ if (r != -ERR_BUSY_RESHARDING && r != -ENOENT) {
break;
}
- ldpp_dout(dpp, 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
- string new_bucket_id;
- r = store->block_while_resharding(bs, &new_bucket_id,
- target->bucket_info, null_yield, dpp);
- if (r == -ERR_BUSY_RESHARDING) {
- continue;
- }
- if (r < 0) {
- return r;
+
+ std::string new_bucket_id;
+
+ // different logic depending whether resharding completed or is
+ // underway
+
+ if (r == -ENOENT) { // case where resharding must have completed
+ ldpp_dout(dpp, 0) <<
+ "NOTICE: resharding operation recently completed, invalidating "
+ "old BucketInfo" << dendl;
+
+ r = store->fetch_new_bucket_id(target->bucket_info,
+ nullptr,
+ new_bucket_id, dpp);
+ if (r == -ENOENT) {
+ // apparently this op raced with a bucket deletion
+ ldpp_dout(dpp, 10) << "WARNING: " << __func__ <<
+ " unable to fetch bucket_id, apparently due to race "
+ "with deletion of bucket: " <<
+ target->bucket_info.bucket.get_key() << dendl;
+ return -ERR_NO_SUCH_BUCKET;
+ } else if (r < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
+ " unable to refresh stale bucket_id after reshard; r=" <<
+ r << dendl;
+ return r;
+ }
+ } else { // must have been resharding at the time
+ ldpp_dout(dpp, 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
+
+ r = store->block_while_resharding(bs, &new_bucket_id,
+ target->bucket_info, null_yield, dpp);
+ if (r == -ERR_BUSY_RESHARDING) {
+ continue;
+ }
+ if (r < 0) {
+ return r;
+ }
+
+ ldpp_dout(dpp, 20) << "reshard completion identified, new_bucket_id=" << new_bucket_id << dendl;
+ i = 0; /* resharding is finished, make sure we can retry */
}
- ldpp_dout(dpp, 20) << "reshard completion identified, new_bucket_id=" << new_bucket_id << dendl;
- i = 0; /* resharding is finished, make sure we can retry */
+
+ // common portion -- finished resharding either way
+
r = target->update_bucket_id(new_bucket_id, dpp);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: update_bucket_id() new_bucket_id=" << new_bucket_id << " returned r=" << r << dendl;
return r;
}
+
invalidate_bs();
} // for loop
return 0;
}
+
+// Refresh the bucket info underlying @curr_bucket_info and report the
+// (possibly changed) bucket id -- used after a reshard is detected or
+// suspected, to pick up the bucket-instance id of the new index.
+//
+// @param curr_bucket_info  bucket info to refresh from (not modified)
+// @param save_bucket_info  if non-null, receives the refreshed
+//                          RGWBucketInfo; if null the refreshed copy is
+//                          kept only locally and discarded after the id
+//                          is extracted
+// @param new_bucket_id     out: bucket id from the refreshed info; left
+//                          untouched on error
+// @param dpp               log prefix provider
+// @return 0 on success, otherwise the negative error code from
+//         try_refresh_bucket_info() (e.g. -ENOENT if the bucket was
+//         deleted -- callers above translate that case)
+int RGWRados::fetch_new_bucket_id(
+  const RGWBucketInfo& curr_bucket_info,
+  RGWBucketInfo* save_bucket_info, // nullptr -> no save
+  std::string& new_bucket_id,
+  const DoutPrefixProvider* dpp)
+{
+  RGWBucketInfo local_bucket_info; // use if save_bucket_info is null
+  RGWBucketInfo* bip = save_bucket_info ? save_bucket_info : &local_bucket_info;
+  *bip = curr_bucket_info; // copy; refresh mutates in place
+
+  int ret = try_refresh_bucket_info(*bip, nullptr, dpp);
+  if (ret < 0) {
+    return ret;
+  }
+
+  new_bucket_id = bip->bucket.bucket_id;
+  return 0;
+} // fetch_new_bucket_id
+
+
int RGWRados::block_while_resharding(RGWRados::BucketShard *bs,
- string *new_bucket_id,
+ std::string *new_bucket_id,
const RGWBucketInfo& bucket_info,
optional_yield y,
const DoutPrefixProvider *dpp)
int ret = 0;
cls_rgw_bucket_instance_entry entry;
- // since we want to run this recovery code from two distinct places,
- // let's just put it in a lambda so we can easily re-use; if the
- // lambda successfully fetches a new bucket id, it sets
- // new_bucket_id and returns 0, otherwise it returns a negative
- // error code
- auto fetch_new_bucket_id =
- [this, &bucket_info, dpp](const std::string& log_tag,
- std::string* new_bucket_id) -> int {
- RGWBucketInfo fresh_bucket_info = bucket_info;
- int ret = try_refresh_bucket_info(fresh_bucket_info, nullptr, dpp);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << __func__ <<
- " ERROR: failed to refresh bucket info after reshard at " <<
- log_tag << ": " << cpp_strerror(-ret) << dendl;
- return ret;
- }
- *new_bucket_id = fresh_bucket_info.bucket.bucket_id;
- return 0;
- };
-
constexpr int num_retries = 10;
for (int i = 1; i <= num_retries; i++) { // nb: 1-based for loop
auto& ref = bs->bucket_obj.get_ref();
ret = cls_rgw_get_bucket_resharding(ref.pool.ioctx(), ref.obj.oid, &entry);
if (ret == -ENOENT) {
- return fetch_new_bucket_id("get_bucket_resharding_failed", new_bucket_id);
+ ret = fetch_new_bucket_id(bucket_info, nullptr, *new_bucket_id, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
+ " failed to refresh bucket info after reshard when get bucket "
+ "resharding failed, error: " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
} else if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ <<
" ERROR: failed to get bucket resharding : " << cpp_strerror(-ret) <<
}
if (!entry.resharding_in_progress()) {
- return fetch_new_bucket_id("get_bucket_resharding_succeeded",
- new_bucket_id);
+ ret = fetch_new_bucket_id(bucket_info, nullptr, *new_bucket_id, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
+ " failed to refresh bucket info after reshard when get bucket "
+ "resharding succeeded, error: " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
}
ldpp_dout(dpp, 20) << "NOTICE: reshard still in progress; " <<
std::string bucket_id = b.get_key();
RGWBucketReshardLock reshard_lock(this->store, bucket_info, true);
ret = reshard_lock.lock(dpp);
- if (ret < 0) {
+ if (ret == -ENOENT) {
+ continue;
+ } else if (ret < 0) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ <<
": failed to take reshard lock for bucket " <<
bucket_id << "; expected if resharding underway" << dendl;
ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ <<
": was able to take reshard lock for bucket " <<
bucket_id << dendl;
+
ret = RGWBucketReshard::clear_resharding(dpp, this->store, bucket_info);
- if (ret < 0) {
- reshard_lock.unlock();
+ reshard_lock.unlock();
+
+ if (ret == -ENOENT) {
+ ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ <<
+ " INFO: no need to reset reshard flags; old shards apparently"
+ " removed after successful resharding of bucket " <<
+ bucket_id << dendl;
+ continue;
+ } else if (ret < 0) {
ldpp_dout(dpp, 0) << __PRETTY_FUNCTION__ <<
" ERROR: failed to clear resharding flags for bucket " <<
- bucket_id << dendl;
+ bucket_id << ", " << cpp_strerror(-ret) << dendl;
} else {
- reshard_lock.unlock();
ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ <<
": apparently successfully cleared resharding flags for "
"bucket " << bucket_id << dendl;
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
auto& ref = bs->bucket_obj.get_ref();
librados::ObjectWriteOperation op;
+ op.assert_exists(); // bucket index shard must exist
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
cls_rgw_bucket_link_olh(op, key, olh_state.olh_tag,
delete_marker, op_tag, meta, olh_epoch,
[&](BucketShard *bs) -> int {
auto& ref = bs->bucket_obj.get_ref();
librados::ObjectWriteOperation op;
+ op.assert_exists(); // bucket index shard must exist
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
cls_rgw_bucket_unlink_instance(op, key, op_tag,
olh_tag, olh_epoch, svc.zone->get_zone().log_data, zones_trace);
ret = guard_reshard(dpp, &bs, obj_instance, bucket_info,
[&](BucketShard *pbs) -> int {
ObjectWriteOperation op;
+ op.assert_exists(); // bucket index shard must exist
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
cls_rgw_trim_olh_log(op, key, ver, olh_tag);
return pbs->bucket_obj.operate(dpp, &op, null_yield);
int ret = guard_reshard(dpp, &bs, obj_instance, bucket_info,
[&](BucketShard *pbs) -> int {
ObjectWriteOperation op;
+ op.assert_exists(); // bucket index shard must exist
auto& ref = pbs->bucket_obj.get_ref();
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
cls_rgw_clear_olh(op, key, olh_tag);
zones_trace.insert(svc.zone->get_zone().id, bs.bucket.get_key());
ObjectWriteOperation o;
+ o.assert_exists(); // bucket index shard must exist
+
cls_rgw_obj_key key(obj.key.get_index_key_name(), obj.key.instance);
cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
cls_rgw_bucket_prepare_op(o, op, tag, key, obj.key.get_loc(), svc.zone->get_zone().log_data, bilog_flags, zones_trace);
list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *_zones_trace)
{
ObjectWriteOperation o;
+ o.assert_exists(); // bucket index shard must exist
+
rgw_bucket_dir_entry_meta dir_meta;
dir_meta = ent.meta;
dir_meta.category = category;