}
-int RGWRados::Bucket::UpdateIndex::guard_reshard(BucketShard *pbs, std::function<int(BucketShard *)> call)
+int RGWRados::Bucket::UpdateIndex::guard_reshard(BucketShard **pbs, std::function<int(BucketShard *)> call)
{
RGWRados *store = target->get_store();
BucketShard *bs;
RGWRados *store = target->get_store();
BucketShard *bs;
- ret = guard_reshard(&bs, [&](BucketShard *bs) -> int {
+ int ret = guard_reshard(&bs, [&](BucketShard *bs) -> int {
return store->cls_obj_complete_cancel(*bs, optag, obj, bilog_flags, zones_trace);
});
return ret;
}
+int RGWRados::guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::function<int(BucketShard *)> call)
+{
+ rgw_obj obj;
+ const rgw_obj *pobj = &obj_instance;
+ int r;
+
+ for (int i = 0; i < NUM_RESHARD_RETRIES; ++i) {
+ r = bs->init(pobj->bucket, *pobj);
+ if (r < 0) {
+ ldout(cct, 5) << "bs.init() returned ret=" << r << dendl;
+ return r;
+ }
+ r = call(bs);
+ if (r != -ERR_BUSY_RESHARDING) {
+ break;
+ }
+ ldout(cct, 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
+ RGWReshard reshard(this);
+ string new_bucket_id;
+ r = reshard.block_while_resharding(bs, &new_bucket_id);
+ if (r == -ERR_BUSY_RESHARDING) {
+ continue;
+ }
+ if (r < 0) {
+ return r;
+ }
+ ldout(cct, 20) << "reshard completion identified, new_bucket_id=" << new_bucket_id << dendl;
+ i = 0; /* resharding is finished, make sure we can retry */
+
+ obj = *pobj;
+ obj.bucket.update_bucket_id(new_bucket_id);
+ pobj = &obj;
+ }
+
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+}
+
int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjState& olh_state, const rgw_obj& obj_instance,
bool delete_marker,
const string& op_tag,
return r;
}
- BucketShard bs(this);
- r = bs.init(obj_instance.bucket, obj_instance);
- if (r < 0) {
- ldout(cct, 5) << "bs.init() returned ret=" << r << dendl;
- return r;
- }
-
rgw_zone_set zones_trace;
if (_zones_trace) {
zones_trace = *_zones_trace;
zones_trace.insert(get_zone().id);
}
+ BucketShard bs(this);
+
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
- return cls_rgw_bucket_link_olh(bs.index_ctx, bs.bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
- unmod_since, high_precision_time,
- get_zone().log_data, zones_trace);
+ r = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int {
+ librados::ObjectWriteOperation op;
+ cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+ return cls_rgw_bucket_link_olh(bs->index_ctx, op,
+ bs->bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
+ unmod_since, high_precision_time,
+ get_zone().log_data, zones_trace);
+ });
+ if (r < 0) {
+ ldout(cct, 20) << "cls_rgw_bucket_link_olh() returned r=" << r << dendl;
+ return r;
+ }
+
+ return 0;
}
void RGWRados::bucket_index_guard_olh_op(RGWObjState& olh_state, ObjectOperation& op)
return r;
}
- BucketShard bs(this);
- int ret = bs.init(obj_instance.bucket, obj_instance);
- if (ret < 0) {
- ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
- return ret;
- }
-
rgw_zone_set zones_trace;
if (_zones_trace) {
zones_trace = *_zones_trace;
}
zones_trace.insert(get_zone().id);
+ BucketShard bs(this);
+
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
- ret = cls_rgw_bucket_unlink_instance(bs.index_ctx, bs.bucket_obj, key, op_tag, olh_tag, olh_epoch, get_zone().log_data, zones_trace);
- if (ret < 0) {
- return ret;
+ r = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int {
+ librados::ObjectWriteOperation op;
+ cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+ return cls_rgw_bucket_unlink_instance(bs->index_ctx, op, bs->bucket_obj, key, op_tag,
+ olh_tag, olh_epoch, get_zone().log_data, zones_trace);
+ });
+ if (r < 0) {
+ ldout(cct, 20) << "cls_rgw_bucket_link_olh() returned r=" << r << dendl;
+ return r;
}
return 0;
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
- ObjectReadOperation op;
-
- ret = cls_rgw_get_olh_log(bs.index_ctx, bs.bucket_obj, op, key, ver_marker, olh_tag, log, is_truncated);
- if (ret < 0)
+ ret = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int {
+ ObjectReadOperation op;
+ cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+ return cls_rgw_get_olh_log(bs->index_ctx, bs->bucket_obj, op,
+ key, ver_marker, olh_tag, log, is_truncated);
+ });
+ if (ret < 0) {
+ ldout(cct, 20) << "cls_rgw_get_olh_log() returned r=" << r << dendl;
return ret;
+ }
return 0;
}
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
- ObjectWriteOperation op;
-
- cls_rgw_trim_olh_log(op, key, ver, olh_tag);
-
- ret = bs.index_ctx.operate(bs.bucket_obj, &op);
- if (ret < 0)
+ ret = guard_reshard(&bs, obj_instance, [&](BucketShard *pbs) -> int {
+ ObjectWriteOperation op;
+ cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+ cls_rgw_trim_olh_log(op, key, ver, olh_tag);
+ return pbs->index_ctx.operate(pbs->bucket_obj, &op);
+ });
+ if (ret < 0) {
+ ldout(cct, 20) << "cls_rgw_trim_olh_log() returned r=" << ret << dendl;
return ret;
+ }
return 0;
}
}
BucketShard bs(this);
- int ret = bs.init(obj_instance.bucket, obj_instance);
- if (ret < 0) {
- ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
- return ret;
- }
string olh_tag(state.olh_tag.c_str(), state.olh_tag.length());
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
- ret = cls_rgw_clear_olh(bs.index_ctx, bs.bucket_obj, key, olh_tag);
+ int ret = guard_reshard(&bs, obj_instance, [&](BucketShard *pbs) -> int {
+ ObjectWriteOperation op;
+ cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+ return cls_rgw_clear_olh(pbs->index_ctx, op, pbs->bucket_obj, key, olh_tag);
+ });
if (ret < 0) {
ldout(cct, 5) << "cls_rgw_clear_olh() returned ret=" << ret << dendl;
return ret;