librados::IoCtx index_ctx; // context for new bucket
/* handle on going bucket resharding */
- int r = reshard->block_while_resharding(bucket_info.bucket.oid);
+ BucketIndexLockGuard guard(cct, this, bucket_info.bucket.bucket_id, bucket_info.bucket.oid,
+ reshard_pool_ctx);
+
+ int r = reshard->block_while_resharding(bucket_info.bucket.oid, guard);
if (r < 0) {
return r;
}
string dir_oid = dir_oid_prefix;
r = open_bucket_index_ctx(bucket_info, index_ctx);
if (r < 0) {
- reshard->unlock_bucket_index(bucket_info.bucket.oid);
return r;
}
map<int, string> bucket_objs;
get_bucket_index_objects(dir_oid, num_shards, bucket_objs);
- r = CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
-
- reshard->unlock_bucket_index(bucket_info.bucket.oid);
-
- return r;
+ return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
}
void RGWRados::create_bucket_id(string *bucket_id)
map<int, struct rgw_cls_check_index_ret> bucket_objs_ret;
/* handle on going bucket resharding */
- int ret = reshard->block_while_resharding(bucket_info.bucket.oid);
+ BucketIndexLockGuard guard(cct, this, bucket_info.bucket.bucket_id, bucket_info.bucket.oid,
+ reshard_pool_ctx);
+ int ret = reshard->block_while_resharding(bucket_info.bucket.oid, guard);
if (ret < 0) {
return ret;
}
ret = open_bucket_index(bucket_info, index_ctx, oids, bucket_objs_ret);
if (ret < 0) {
- reshard->unlock_bucket_index(bucket_info.bucket.oid);
return ret;
}
ret = CLSRGWIssueBucketCheck(index_ctx, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)();
if (ret < 0) {
- reshard->unlock_bucket_index(bucket_info.bucket.oid);
return ret;
}
map<int, string> bucket_objs;
/* handle on going bucket resharding */
- int r = reshard->block_while_resharding(bucket_info.bucket.oid);
+ BucketIndexLockGuard guard(cct, this, bucket_info.bucket.bucket_id, bucket_info.bucket.oid,
+ reshard_pool_ctx);
+ int r = reshard->block_while_resharding(bucket_info.bucket.oid, guard);
if (r < 0) {
return r;
}
r = open_bucket_index(bucket_info, index_ctx, bucket_objs);
if (r < 0) {
- reshard->unlock_bucket_index(bucket_info.bucket.oid);
return r;
}
- r = CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
-
- reshard->unlock_bucket_index(bucket_info.bucket.oid);
-
- return r;
+ return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
}
-
int RGWRados::defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj)
{
RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
BucketShard *bs;
/* handle on going bucket resharding */
- int ret = store->reshard->block_while_resharding(target->get_bucket().oid);
+ BucketIndexLockGuard guard(store->ctx(), store, target->get_bucket().bucket_id, target->get_bucket().oid,
+ store->reshard_pool_ctx);
+ int ret = store->reshard->block_while_resharding(target->get_bucket().oid, guard);
if (ret < 0) {
return ret;
}
ret = get_bucket_shard(&bs);
if (ret < 0) {
ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
- store->reshard->unlock_bucket_index(target->get_bucket().oid);
return ret;
}
int r = store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags, zones_trace);
if (r < 0) {
- store->reshard->unlock_bucket_index(target->get_bucket().oid);
return r;
}
prepared = true;
- store->reshard->unlock_bucket_index(target->get_bucket().oid);
return 0;
}
BucketShard *bs;
/* handle on going bucket resharding */
- int ret = store->reshard->block_while_resharding(target->get_bucket().oid);
+ BucketIndexLockGuard guard(store->ctx(), store, target->get_bucket().bucket_id, target->get_bucket().oid,
+ store->reshard_pool_ctx);
+ int ret = store->reshard->block_while_resharding(target->get_bucket().oid, guard);
if (ret < 0) {
return ret;
}
ret = get_bucket_shard(&bs);
if (ret < 0) {
ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
- store->reshard->unlock_bucket_index(target->get_bucket().oid);
return ret;
}
lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
}
- store->reshard->unlock_bucket_index(target->get_bucket().oid);
-
return ret;
}
BucketShard *bs;
/* handle on going bucket resharding */
- int ret = store->reshard->block_while_resharding(target->get_bucket().oid);
+ BucketIndexLockGuard guard(store->ctx(), store, target->get_bucket().bucket_id, target->get_bucket().oid,
+ store->reshard_pool_ctx);
+ int ret = store->reshard->block_while_resharding(target->get_bucket().oid, guard);
if (ret < 0) {
return ret;
}
ret = get_bucket_shard(&bs);
if (ret < 0) {
ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
- store->reshard->unlock_bucket_index(target->get_bucket().oid);
return ret;
}
lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
}
- store->reshard->unlock_bucket_index(target->get_bucket().oid);
return ret;
}
BucketShard *bs;
/* handle on going bucket resharding */
- int ret = store->reshard->block_while_resharding(target->get_bucket().oid);
+ BucketIndexLockGuard guard(store->ctx(), store, target->get_bucket().bucket_id, target->get_bucket().oid,
+ store->reshard_pool_ctx);
+ int ret = store->reshard->block_while_resharding(target->get_bucket().oid, guard);
if (ret < 0) {
return ret;
}
ret = get_bucket_shard(&bs);
if (ret < 0) {
ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
- store->reshard->unlock_bucket_index(target->get_bucket().oid);
return ret;
}
lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
}
- store->reshard->unlock_bucket_index(target->get_bucket().oid);
return ret;
}
}
/* handle on going bucket resharding */
- r = reshard->block_while_resharding(bucket_info.bucket.oid);
+ BucketIndexLockGuard guard(cct, this, bucket_info.bucket.bucket_id, bucket_info.bucket.oid,
+ reshard_pool_ctx);
+ r = reshard->block_while_resharding(bucket_info.bucket.oid, guard);
if (r < 0) {
return r;
}
r = bs.init(obj_instance.bucket, obj_instance);
if (r < 0) {
ldout(cct, 5) << "bs.init() returned ret=" << r << dendl;
- reshard->unlock_bucket_index(bucket_info.bucket.oid);
return r;
}
}
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
- r = cls_rgw_bucket_link_olh(bs.index_ctx, bs.bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
- unmod_since, high_precision_time,
- get_zone().log_data, zones_trace);
-
- reshard->unlock_bucket_index(bucket_info.bucket.oid);
- return r;
+ return cls_rgw_bucket_link_olh(bs.index_ctx, bs.bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
+ unmod_since, high_precision_time,
+ get_zone().log_data, zones_trace);
}
void RGWRados::bucket_index_guard_olh_op(RGWObjState& olh_state, ObjectOperation& op)
map<int, string> bucket_objs;
/* handle on going bucket resharding */
- int r = reshard->block_while_resharding(bucket_info.bucket.oid);
+ BucketIndexLockGuard guard(cct, this, bucket_info.bucket.bucket_id, bucket_info.bucket.oid,
+ reshard_pool_ctx);
+ int r = reshard->block_while_resharding(bucket_info.bucket.oid, guard);
if (r < 0) {
return r;
}
r = open_bucket_index(bucket_info, index_ctx, bucket_objs, shard_id);
if (r < 0) {
- reshard->unlock_bucket_index(bucket_info.bucket.oid);
return r;
}
r = start_marker_mgr.from_string(start_marker, shard_id);
if (r < 0) {
- reshard->unlock_bucket_index(bucket_info.bucket.oid);
return r;
}
r = end_marker_mgr.from_string(end_marker, shard_id);
if (r < 0) {
- reshard->unlock_bucket_index(bucket_info.bucket.oid);
return r;
}
- r = CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs,
- cct->_conf->rgw_bucket_index_max_aio)();
+ return CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs,
+ cct->_conf->rgw_bucket_index_max_aio)();
- reshard->unlock_bucket_index(bucket_info.bucket.oid);
return r;
}
return ret;
}
+std::string create_bucket_index_lock_name(const string& bucket_instance_id) {
+ return bucket_instance_lock_name + "." + bucket_instance_id;
+}
+
int RGWReshard::set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
{
- rados::cls::lock::Lock l(reshard_lock_name);
+ rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id));
if (entry.new_instance_id.empty()) {
ldout(cct, 0) << "RGWReshard::" << __func__ << " missing new bucket instance id" << dendl;
int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
{
- rados::cls::lock::Lock l(bucket_instance_lock_name);
+ rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id));
int ret = l.lock_exclusive(&store->reshard_pool_ctx, bucket_instance_oid);
if (ret == -EBUSY) {
return 0;
}
-
const int num_retries = 10;
const int default_reshard_sleep_duration = 30;
-int RGWReshard::block_while_resharding(const string& bucket_instance_oid)
+int RGWReshard::block_while_resharding(const string& bucket_instance_oid,
+ BucketIndexLockGuard& guard)
{
int ret = 0;
cls_rgw_bucket_instance_entry entry;
bool resharding = false;
for (int i=0; i< num_retries;i++) {
- ret = lock_bucket_index_shared(bucket_instance_oid);
+ ret = guard.lock();
if (ret < 0) {
return ret;
}
}
if (resharding) {
- ret = unlock_bucket_index(bucket_instance_oid);
+ /* clear resharding uses the same lock */
+ ret = guard.unlock();
if (ret < 0) {
return ret;
}
}
}
ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
- ret = unlock_bucket_index(bucket_instance_oid);
- if (ret < 0) {
+ return -EAGAIN;
+}
+
+BucketIndexLockGuard::BucketIndexLockGuard(CephContext* _cct, RGWRados* _store,
+ const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) :
+ cct(_cct),store(_store),
+ l(create_bucket_index_lock_name(bucket_instance_id)),
+ oid(_oid), io_ctx(_io_ctx),locked(false)
+{
+}
+
+int BucketIndexLockGuard::lock()
+{
+ if (!locked) {
+ int ret = l.lock_shared(&store->reshard_pool_ctx, oid);
+ if (ret == -EBUSY) {
+ ldout(cct,0) << "RGWReshardLog::add failed to acquire lock on " << oid << dendl;
+ return 0;
+ }
+ if (ret < 0) {
+ return ret;
+ }
+ locked = true;
return ret;
+ } else {
+ ldout(cct,0) << " % alread lock" << oid << dendl;
+ return -EBUSY;
}
- return -EAGAIN;
+}
+
+int BucketIndexLockGuard::unlock()
+{
+ if (locked) {
+ int ret = l.unlock(&io_ctx, oid);
+ if (ret <0) {
+ ldout(cct, 0) << "failed to unlock " << oid << dendl;
+ } else {
+ locked = false;
+ }
+ return ret;
+ }
+ return 0;
+}
+
+BucketIndexLockGuard::~BucketIndexLockGuard()
+{
+ unlock();
}