ldout(store->ctx(), 20) << __func__ << "(): handling completion for key=" << c->key << dendl;
RGWRados::BucketShard bs(store);
+ RGWBucketInfo bucket_info;
- int r = bs.init(c->obj.bucket, c->obj);
+ int r = bs.init(c->obj.bucket, c->obj, &bucket_info);
if (r < 0) {
ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to initialize BucketShard, obj=" << c->obj << " r=" << r << dendl;
/* not much to do */
continue;
}
- r = store->guard_reshard(&bs, c->obj, [&](RGWRados::BucketShard *bs) -> int {
- librados::ObjectWriteOperation o;
- cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
- cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs,
- c->log_op, c->bilog_op, &c->zones_trace);
-
- return bs->index_ctx.operate(bs->bucket_obj, &o);
+ r = store->guard_reshard(&bs, c->obj, bucket_info,
+ [&](RGWRados::BucketShard *bs) -> int {
+ librados::ObjectWriteOperation o;
+ cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
+ cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs,
+ c->log_op, c->bilog_op, &c->zones_trace);
+ return bs->index_ctx.operate(bs->bucket_obj, &o);
});
if (r < 0) {
ldout(cct, 0) << "ERROR: " << __func__ << "(): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl;
return 0;
}
-int RGWRados::BucketShard::init(const rgw_bucket& _bucket, const rgw_obj& obj)
+int RGWRados::BucketShard::init(const rgw_bucket& _bucket,
+ const rgw_obj& obj,
+ RGWBucketInfo* bucket_info_out)
{
bucket = _bucket;
RGWObjectCtx obj_ctx(store);
RGWBucketInfo bucket_info;
- int ret = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
+ RGWBucketInfo* bucket_info_p =
+ bucket_info_out ? bucket_info_out : &bucket_info;
+
+ int ret = store->get_bucket_instance_info(obj_ctx, bucket, *bucket_info_p, NULL, NULL);
if (ret < 0) {
return ret;
}
- ret = store->open_bucket_index_shard(bucket_info, index_ctx, obj.get_hash_object(), &bucket_obj, &shard_id);
+ ret = store->open_bucket_index_shard(*bucket_info_p, index_ctx, obj.get_hash_object(), &bucket_obj, &shard_id);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWRados::BucketShard::init(const rgw_bucket& _bucket, int sid)
+int RGWRados::BucketShard::init(const rgw_bucket& _bucket,
+ int sid,
+ RGWBucketInfo* bucket_info_out)
{
bucket = _bucket;
shard_id = sid;
RGWObjectCtx obj_ctx(store);
RGWBucketInfo bucket_info;
- int ret = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
+ RGWBucketInfo* bucket_info_p =
+ bucket_info_out ? bucket_info_out : &bucket_info;
+ int ret = store->get_bucket_instance_info(obj_ctx, bucket, *bucket_info_p, NULL, NULL);
if (ret < 0) {
return ret;
}
- ret = store->open_bucket_index_shard(bucket_info, index_ctx, shard_id, &bucket_obj);
+ ret = store->open_bucket_index_shard(*bucket_info_p, index_ctx, shard_id, &bucket_obj);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
return ret;
return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
}
-int RGWRados::bucket_set_reshard(RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry)
+int RGWRados::bucket_set_reshard(const RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry)
{
librados::IoCtx index_ctx;
map<int, string> bucket_objs;
}
ldout(store->ctx(), 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
string new_bucket_id;
- r = store->block_while_resharding(bs, &new_bucket_id);
+ r = store->block_while_resharding(bs, &new_bucket_id, target->bucket_info);
if (r == -ERR_BUSY_RESHARDING) {
continue;
}
}
}
- int r = guard_reshard(nullptr, [&](BucketShard *bs) -> int {
- return store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags, zones_trace);
- });
+ int r = guard_reshard(nullptr, [&](BucketShard *bs) -> int {
+ return store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags, zones_trace);
+ });
if (r < 0) {
return r;
RGWRados *store = target->get_store();
BucketShard *bs;
- int ret = guard_reshard(&bs, [&](BucketShard *bs) -> int {
- return store->cls_obj_complete_cancel(*bs, optag, obj, bilog_flags, zones_trace);
- });
+ int ret = guard_reshard(&bs, [&](BucketShard *bs) -> int {
+ return store->cls_obj_complete_cancel(*bs, optag, obj, bilog_flags, zones_trace);
+ });
/*
* need to update data log anyhow, so that whoever follows needs to update its internal markers
return ret;
}
-int RGWRados::guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::function<int(BucketShard *)> call)
+int RGWRados::guard_reshard(BucketShard *bs,
+ const rgw_obj& obj_instance,
+ const RGWBucketInfo& bucket_info,
+ std::function<int(BucketShard *)> call)
{
rgw_obj obj;
const rgw_obj *pobj = &obj_instance;
int r;
for (int i = 0; i < NUM_RESHARD_RETRIES; ++i) {
- r = bs->init(pobj->bucket, *pobj);
+ r = bs->init(pobj->bucket, *pobj, nullptr /* no RGWBucketInfo */);
if (r < 0) {
ldout(cct, 5) << "bs.init() returned ret=" << r << dendl;
return r;
}
ldout(cct, 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
string new_bucket_id;
- r = block_while_resharding(bs, &new_bucket_id);
+ r = block_while_resharding(bs, &new_bucket_id, bucket_info);
if (r == -ERR_BUSY_RESHARDING) {
continue;
}
return 0;
}
-int RGWRados::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id)
+int RGWRados::block_while_resharding(RGWRados::BucketShard *bs,
+ string *new_bucket_id,
+ const RGWBucketInfo& bucket_info)
{
std::shared_ptr<RGWReshardWait> waiter = reshard_wait;
- return waiter->block_while_resharding(bs, new_bucket_id);
+ return waiter->block_while_resharding(bs, new_bucket_id, bucket_info);
}
int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjState& olh_state, const rgw_obj& obj_instance,
BucketShard bs(this);
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
- r = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int {
- librados::ObjectWriteOperation op;
- cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
- return cls_rgw_bucket_link_olh(bs->index_ctx, op,
- bs->bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
- unmod_since, high_precision_time,
- get_zone().log_data, zones_trace);
+ r = guard_reshard(&bs, obj_instance, bucket_info,
+ [&](BucketShard *bs) -> int {
+ librados::ObjectWriteOperation op;
+ cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+ return cls_rgw_bucket_link_olh(bs->index_ctx, op,
+ bs->bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
+ unmod_since, high_precision_time,
+ get_zone().log_data, zones_trace);
});
if (r < 0) {
ldout(cct, 20) << "cls_rgw_bucket_link_olh() returned r=" << r << dendl;
BucketShard bs(this);
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
- r = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int {
- librados::ObjectWriteOperation op;
- cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
- return cls_rgw_bucket_unlink_instance(bs->index_ctx, op, bs->bucket_obj, key, op_tag,
- olh_tag, olh_epoch, get_zone().log_data, zones_trace);
+ r = guard_reshard(&bs, obj_instance, bucket_info,
+ [&](BucketShard *bs) -> int {
+ librados::ObjectWriteOperation op;
+ cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+ return cls_rgw_bucket_unlink_instance(bs->index_ctx, op, bs->bucket_obj, key, op_tag,
+ olh_tag, olh_epoch, get_zone().log_data, zones_trace);
});
if (r < 0) {
ldout(cct, 20) << "cls_rgw_bucket_link_olh() returned r=" << r << dendl;
}
BucketShard bs(this);
- int ret = bs.init(obj_instance.bucket, obj_instance);
+ int ret =
+ bs.init(obj_instance.bucket, obj_instance, nullptr /* no RGWBucketInfo */);
if (ret < 0) {
ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
return ret;
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
- ret = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int {
- ObjectReadOperation op;
- cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
- return cls_rgw_get_olh_log(bs->index_ctx, bs->bucket_obj, op,
- key, ver_marker, olh_tag, log, is_truncated);
- });
+ ret = guard_reshard(&bs, obj_instance, bucket_info,
+ [&](BucketShard *bs) -> int {
+ ObjectReadOperation op;
+ cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+ return cls_rgw_get_olh_log(bs->index_ctx, bs->bucket_obj, op,
+ key, ver_marker, olh_tag, log, is_truncated);
+ });
if (ret < 0) {
ldout(cct, 20) << "cls_rgw_get_olh_log() returned r=" << r << dendl;
return ret;
}
BucketShard bs(this);
- int ret = bs.init(obj_instance.bucket, obj_instance);
+ int ret =
+ bs.init(obj_instance.bucket, obj_instance, nullptr /* no RGWBucketInfo */);
if (ret < 0) {
ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
return ret;
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
- ret = guard_reshard(&bs, obj_instance, [&](BucketShard *pbs) -> int {
- ObjectWriteOperation op;
- cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
- cls_rgw_trim_olh_log(op, key, ver, olh_tag);
- return pbs->index_ctx.operate(pbs->bucket_obj, &op);
+ ret = guard_reshard(&bs, obj_instance, bucket_info,
+ [&](BucketShard *pbs) -> int {
+ ObjectWriteOperation op;
+ cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+ cls_rgw_trim_olh_log(op, key, ver, olh_tag);
+ return pbs->index_ctx.operate(pbs->bucket_obj, &op);
});
if (ret < 0) {
ldout(cct, 20) << "cls_rgw_trim_olh_log() returned r=" << ret << dendl;
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
- int ret = guard_reshard(&bs, obj_instance, [&](BucketShard *pbs) -> int {
- ObjectWriteOperation op;
- cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
- return cls_rgw_clear_olh(pbs->index_ctx, op, pbs->bucket_obj, key, olh_tag);
+ int ret = guard_reshard(&bs, obj_instance, bucket_info,
+ [&](BucketShard *pbs) -> int {
+ ObjectWriteOperation op;
+ cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+ return cls_rgw_clear_olh(pbs->index_ctx, op, pbs->bucket_obj, key, olh_tag);
});
if (ret < 0) {
ldout(cct, 5) << "cls_rgw_clear_olh() returned ret=" << ret << dendl;
int RGWRados::bi_get(rgw_bucket& bucket, rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry)
{
BucketShard bs(this);
- int ret = bs.init(bucket, obj);
+ int ret = bs.init(bucket, obj, nullptr /* no RGWBucketInfo */);
if (ret < 0) {
ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
return ret;
int RGWRados::bi_put(rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry)
{
BucketShard bs(this);
- int ret = bs.init(bucket, obj);
+ int ret = bs.init(bucket, obj, nullptr /* no RGWBucketInfo */);
if (ret < 0) {
ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
return ret;
{
rgw_obj obj(bucket, obj_name);
BucketShard bs(this);
- int ret = bs.init(bucket, obj);
+ int ret = bs.init(bucket, obj, nullptr /* no RGWBucketInfo */);
if (ret < 0) {
ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
return ret;
int RGWRados::bi_list(rgw_bucket& bucket, int shard_id, const string& filter_obj, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated)
{
BucketShard bs(this);
- int ret = bs.init(bucket, shard_id);
+ int ret = bs.init(bucket, shard_id, nullptr /* no RGWBucketInfo */);
if (ret < 0) {
ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
return ret;
aio_completions(_completions)
{
num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
- bs.init(bucket_info.bucket, num_shard);
+ bs.init(bucket_info.bucket, num_shard, nullptr /* no RGWBucketInfo */);
}
int get_num_shard() {
{ }
int RGWBucketReshard::set_resharding_status(RGWRados* store,
- RGWBucketInfo& bucket_info,
+ const RGWBucketInfo& bucket_info,
const string& new_instance_id,
int32_t num_shards,
cls_rgw_reshard_status status)
}
// reshard lock assumes lock is held
-int RGWBucketReshard::clear_resharding()
+int RGWBucketReshard::clear_resharding(RGWRados* store,
+ const RGWBucketInfo& bucket_info)
{
- int ret = clear_index_shard_reshard_status();
+ int ret = clear_index_shard_reshard_status(store, bucket_info);
if (ret < 0) {
ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ <<
" ERROR: error clearing reshard status from index shard " <<
}
int RGWBucketReshard::clear_index_shard_reshard_status(RGWRados* store,
- RGWBucketInfo& bucket_info)
+ const RGWBucketInfo& bucket_info)
{
uint32_t num_shards = bucket_info.num_shards;
return 0;
}
-int RGWReshardWait::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id)
+int RGWReshardWait::block_while_resharding(RGWRados::BucketShard *bs,
+ string *new_bucket_id,
+ const RGWBucketInfo& bucket_info)
{
int ret = 0;
cls_rgw_bucket_instance_entry entry;
return 0;
}
ldout(store->ctx(), 20) << "NOTICE: reshard still in progress; " << (i < num_retries - 1 ? "retrying" : "too many retries") << dendl;
- /* needed to unlock as clear resharding uses the same lock */
if (i == num_retries - 1) {
break;
}
+ // If bucket is erroneously marked as resharding (e.g., crash or
+ // other error) then fix it. If we can take the bucket reshard
+ // lock then it means no other resharding should be taking place,
+ // and we're free to clear the flags.
+ {
+ // since we expect to do this rarely, we'll do our work in a
+ // block and erase our work after each try
+
+ RGWObjectCtx obj_ctx(bs->store);
+ const rgw_bucket& b = bs->bucket;
+ std::string bucket_id = b.get_key();
+ RGWBucketReshardLock reshard_lock(bs->store, bucket_info, true);
+ ret = reshard_lock.lock();
+ if (ret < 0) {
+ ldout(store->ctx(), 20) << __func__ <<
+ " INFO: failed to take reshard lock for bucket " <<
+ bucket_id << "; expected if resharding underway" << dendl;
+ } else {
+ ldout(store->ctx(), 10) << __func__ <<
+ " INFO: was able to take reshard lock for bucket " <<
+ bucket_id << dendl;
+ ret = RGWBucketReshard::clear_resharding(bs->store, bucket_info);
+ if (ret < 0) {
+ reshard_lock.unlock();
+ ldout(store->ctx(), 0) << __func__ <<
+ " ERROR: failed to clear resharding flags for bucket " <<
+ bucket_id << dendl;
+ } else {
+ reshard_lock.unlock();
+ ldout(store->ctx(), 5) << __func__ <<
+ " INFO: apparently successfully cleared resharding flags for "
+ "bucket " << bucket_id << dendl;
+ continue; // if we apparently succeed immediately test again
+ } // if clear resharding succeeded
+ } // if taking of lock succeeded
+ } // block to encapsulate recovery from incomplete reshard
+
ret = do_wait();
if (ret < 0) {
ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;