From: Orit Wasserman Date: Mon, 1 May 2017 06:24:19 +0000 (+0300) Subject: rgw: add guard class for bucket index lock X-Git-Tag: v12.1.0~276^2~68 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f384708196a07098cb96fe0a6203da1082074447;p=ceph-ci.git rgw: add guard class for bucket index lock Signed-off-by: Orit Wasserman --- diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 4b0774eebbd..fde3a0a3149 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -5391,7 +5391,10 @@ int RGWRados::init_bucket_index(RGWBucketInfo& bucket_info, int num_shards) librados::IoCtx index_ctx; // context for new bucket /* handle on going bucket resharding */ - int r = reshard->block_while_resharding(bucket_info.bucket.oid); + BucketIndexLockGuard guard(cct, this, bucket_info.bucket.bucket_id, bucket_info.bucket.oid, + reshard_pool_ctx); + + int r = reshard->block_while_resharding(bucket_info.bucket.oid, guard); if (r < 0) { return r; } @@ -5399,7 +5402,6 @@ int RGWRados::init_bucket_index(RGWBucketInfo& bucket_info, int num_shards) string dir_oid = dir_oid_prefix; r = open_bucket_index_ctx(bucket_info, index_ctx); if (r < 0) { - reshard->unlock_bucket_index(bucket_info.bucket.oid); return r; } @@ -5408,11 +5410,7 @@ int RGWRados::init_bucket_index(RGWBucketInfo& bucket_info, int num_shards) map bucket_objs; get_bucket_index_objects(dir_oid, num_shards, bucket_objs); - r = CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); - - reshard->unlock_bucket_index(bucket_info.bucket.oid); - - return r; + return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); } void RGWRados::create_bucket_id(string *bucket_id) @@ -8229,20 +8227,20 @@ int RGWRados::bucket_check_index(RGWBucketInfo& bucket_info, map bucket_objs_ret; /* handle on going bucket resharding */ - int ret = reshard->block_while_resharding(bucket_info.bucket.oid); + BucketIndexLockGuard guard(cct, this, bucket_info.bucket.bucket_id, bucket_info.bucket.oid, + reshard_pool_ctx); + int ret = reshard->block_while_resharding(bucket_info.bucket.oid, guard); if (ret < 0) { return ret; } ret = open_bucket_index(bucket_info, index_ctx, oids, bucket_objs_ret); if (ret < 0) { - reshard->unlock_bucket_index(bucket_info.bucket.oid); return ret; } ret = CLSRGWIssueBucketCheck(index_ctx, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)(); if (ret < 0) { - reshard->unlock_bucket_index(bucket_info.bucket.oid); return ret; } @@ -8262,25 +8260,21 @@ int RGWRados::bucket_rebuild_index(RGWBucketInfo& bucket_info) map bucket_objs; /* handle on going bucket resharding */ - int r = reshard->block_while_resharding(bucket_info.bucket.oid); + BucketIndexLockGuard guard(cct, this, bucket_info.bucket.bucket_id, bucket_info.bucket.oid, + reshard_pool_ctx); + int r = reshard->block_while_resharding(bucket_info.bucket.oid, guard); if (r < 0) { return r; } r = open_bucket_index(bucket_info, index_ctx, bucket_objs); if (r < 0) { - reshard->unlock_bucket_index(bucket_info.bucket.oid); return r; } - r = CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); - - reshard->unlock_bucket_index(bucket_info.bucket.oid); - - return r; + return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); } - int RGWRados::defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj) { RGWObjectCtx *rctx = static_cast(ctx); @@ -9542,7 +9536,9 @@ int RGWRados::Bucket::UpdateIndex::prepare(RGWModifyOp op, const string *write_t BucketShard *bs; /* handle on going bucket resharding */ - int ret = store->reshard->block_while_resharding(target->get_bucket().oid); + BucketIndexLockGuard guard(store->ctx(), store, target->get_bucket().bucket_id, target->get_bucket().oid, + store->reshard_pool_ctx); + int ret = store->reshard->block_while_resharding(target->get_bucket().oid, guard); if (ret < 0) { return ret; } @@ -9550,7 +9546,6 @@ int RGWRados::Bucket::UpdateIndex::prepare(RGWModifyOp op, const string *write_t ret = get_bucket_shard(&bs); if (ret < 0) { ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl; - store->reshard->unlock_bucket_index(target->get_bucket().oid); return ret; } @@ -9564,12 +9559,10 @@ int RGWRados::Bucket::UpdateIndex::prepare(RGWModifyOp op, const string *write_t int r = store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags, zones_trace); if (r < 0) { - store->reshard->unlock_bucket_index(target->get_bucket().oid); return r; } prepared = true; - store->reshard->unlock_bucket_index(target->get_bucket().oid); return 0; } @@ -9588,7 +9581,9 @@ int RGWRados::Bucket::UpdateIndex::complete(int64_t poolid, uint64_t epoch, BucketShard *bs; /* handle on going bucket resharding */ - int ret = store->reshard->block_while_resharding(target->get_bucket().oid); + BucketIndexLockGuard guard(store->ctx(), store, target->get_bucket().bucket_id, target->get_bucket().oid, + store->reshard_pool_ctx); + int ret = store->reshard->block_while_resharding(target->get_bucket().oid, guard); if (ret < 0) { return ret; } @@ -9596,7 +9591,6 @@ int RGWRados::Bucket::UpdateIndex::complete(int64_t poolid, uint64_t epoch, ret = get_bucket_shard(&bs); if (ret < 0) { ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl; - store->reshard->unlock_bucket_index(target->get_bucket().oid); return ret; } @@ -9627,8 +9621,6 @@ int RGWRados::Bucket::UpdateIndex::complete(int64_t poolid, uint64_t epoch, lderr(store->ctx()) << "ERROR: failed writing data log" << dendl; } - store->reshard->unlock_bucket_index(target->get_bucket().oid); - return ret; } @@ -9643,7 +9635,9 @@ int RGWRados::Bucket::UpdateIndex::complete_del(int64_t poolid, uint64_t epoch, BucketShard *bs; /* handle on going bucket resharding */ - int ret = store->reshard->block_while_resharding(target->get_bucket().oid); + BucketIndexLockGuard guard(store->ctx(), store, target->get_bucket().bucket_id, target->get_bucket().oid, + store->reshard_pool_ctx); + int ret = store->reshard->block_while_resharding(target->get_bucket().oid, guard); if (ret < 0) { return ret; } @@ -9651,7 +9645,6 @@ int RGWRados::Bucket::UpdateIndex::complete_del(int64_t poolid, uint64_t epoch, ret = get_bucket_shard(&bs); if (ret < 0) { ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl; - store->reshard->unlock_bucket_index(target->get_bucket().oid); return ret; } @@ -9662,7 +9655,6 @@ int RGWRados::Bucket::UpdateIndex::complete_del(int64_t poolid, uint64_t epoch, lderr(store->ctx()) << "ERROR: failed writing data log" << dendl; } - store->reshard->unlock_bucket_index(target->get_bucket().oid); return ret; } @@ -9676,7 +9668,9 @@ int RGWRados::Bucket::UpdateIndex::cancel() BucketShard *bs; /* handle on going bucket resharding */ - int ret = store->reshard->block_while_resharding(target->get_bucket().oid); + BucketIndexLockGuard guard(store->ctx(), store, target->get_bucket().bucket_id, target->get_bucket().oid, + store->reshard_pool_ctx); + int ret = store->reshard->block_while_resharding(target->get_bucket().oid, guard); if (ret < 0) { return ret; } @@ -9684,7 +9678,6 @@ int RGWRados::Bucket::UpdateIndex::cancel() ret = get_bucket_shard(&bs); if (ret < 0) { ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl; - store->reshard->unlock_bucket_index(target->get_bucket().oid); return ret; } @@ -9700,7 +9693,6 @@ int RGWRados::Bucket::UpdateIndex::cancel() lderr(store->ctx()) << "ERROR: failed writing data log" << dendl; } - store->reshard->unlock_bucket_index(target->get_bucket().oid); return ret; } @@ -10501,7 +10493,9 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat } /* handle on going bucket resharding */ - r = reshard->block_while_resharding(bucket_info.bucket.oid); + BucketIndexLockGuard guard(cct, this, bucket_info.bucket.bucket_id, bucket_info.bucket.oid, + reshard_pool_ctx); + r = reshard->block_while_resharding(bucket_info.bucket.oid, guard); if (r < 0) { return r; } @@ -10510,7 +10504,6 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat r = bs.init(obj_instance.bucket, obj_instance); if (r < 0) { ldout(cct, 5) << "bs.init() returned ret=" << r << dendl; - reshard->unlock_bucket_index(bucket_info.bucket.oid); return r; } @@ -10523,12 +10516,9 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat } cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance); - r = cls_rgw_bucket_link_olh(bs.index_ctx, bs.bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch, - unmod_since, high_precision_time, - get_zone().log_data, zones_trace); - - reshard->unlock_bucket_index(bucket_info.bucket.oid); - return r; + return cls_rgw_bucket_link_olh(bs.index_ctx, bs.bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch, + unmod_since, high_precision_time, + get_zone().log_data, zones_trace); } void RGWRados::bucket_index_guard_olh_op(RGWObjState& olh_state, ObjectOperation& op) @@ -11949,7 +11939,9 @@ int RGWRados::trim_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id, stri map bucket_objs; /* handle on going bucket resharding */ - int r = reshard->block_while_resharding(bucket_info.bucket.oid); + BucketIndexLockGuard guard(cct, this, bucket_info.bucket.bucket_id, bucket_info.bucket.oid, + reshard_pool_ctx); + int r = reshard->block_while_resharding(bucket_info.bucket.oid, guard); if (r < 0) { return r; } @@ -11959,26 +11951,22 @@ int RGWRados::trim_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id, stri r = open_bucket_index(bucket_info, index_ctx, bucket_objs, shard_id); if (r < 0) { - reshard->unlock_bucket_index(bucket_info.bucket.oid); return r; } r = start_marker_mgr.from_string(start_marker, shard_id); if (r < 0) { - reshard->unlock_bucket_index(bucket_info.bucket.oid); return r; } r = end_marker_mgr.from_string(end_marker, shard_id); if (r < 0) { - reshard->unlock_bucket_index(bucket_info.bucket.oid); return r; } - r = CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs, - cct->_conf->rgw_bucket_index_max_aio)(); + return CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs, + cct->_conf->rgw_bucket_index_max_aio)(); - reshard->unlock_bucket_index(bucket_info.bucket.oid); return r; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 0f1e990a21c..daf7be23888 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -2181,6 +2181,7 @@ class RGWRados friend class RGWStateLog; friend class RGWReplicaLogger; friend class RGWReshard; + friend class BucketIndexLockGuard; /** Open the pool used as root for this gateway */ int open_root_pool_ctx(); diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc index 896d9b46205..1ba40cd4320 100644 --- a/src/rgw/rgw_reshard.cc +++ b/src/rgw/rgw_reshard.cc @@ -100,9 +100,13 @@ int RGWReshard::remove(cls_rgw_reshard_entry& entry) return ret; } +std::string create_bucket_index_lock_name(const string& bucket_instance_id) { + return bucket_instance_lock_name + "." + bucket_instance_id; +} + int RGWReshard::set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry) { - rados::cls::lock::Lock l(reshard_lock_name); + rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id)); if (entry.new_instance_id.empty()) { ldout(cct, 0) << "RGWReshard::" << __func__ << " missing new bucket instance id" << dendl; @@ -155,7 +159,7 @@ done: int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry) { - rados::cls::lock::Lock l(bucket_instance_lock_name); + rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id)); int ret = l.lock_exclusive(&store->reshard_pool_ctx, bucket_instance_oid); if (ret == -EBUSY) { @@ -214,18 +218,18 @@ int RGWReshard::unlock_bucket_index(const string& oid) return 0; } - const int num_retries = 10; const int default_reshard_sleep_duration = 30; -int RGWReshard::block_while_resharding(const string& bucket_instance_oid) +int RGWReshard::block_while_resharding(const string& bucket_instance_oid, + BucketIndexLockGuard& guard) { int ret = 0; cls_rgw_bucket_instance_entry entry; bool resharding = false; for (int i=0; i< num_retries;i++) { - ret = lock_bucket_index_shared(bucket_instance_oid); + ret = guard.lock(); if (ret < 0) { return ret; } @@ -239,7 +243,8 @@ int RGWReshard::block_while_resharding(const string& bucket_instance_oid) } if (resharding) { - ret = unlock_bucket_index(bucket_instance_oid); + /* clear resharding uses the same lock */ + ret = guard.unlock(); if (ret < 0) { return ret; } @@ -249,9 +254,51 @@ int RGWReshard::block_while_resharding(const string& bucket_instance_oid) } } ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: bucket is still resharding, please retry" << dendl; - ret = unlock_bucket_index(bucket_instance_oid); - if (ret < 0) { + return -EAGAIN; +} + +BucketIndexLockGuard::BucketIndexLockGuard(CephContext* _cct, RGWRados* _store, + const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) : + cct(_cct),store(_store), + l(create_bucket_index_lock_name(bucket_instance_id)), + oid(_oid), io_ctx(_io_ctx),locked(false) +{ +} + +int BucketIndexLockGuard::lock() +{ + if (!locked) { + int ret = l.lock_shared(&store->reshard_pool_ctx, oid); + if (ret == -EBUSY) { + ldout(cct,0) << "RGWReshardLog::add failed to acquire lock on " << oid << dendl; + return 0; + } + if (ret < 0) { + return ret; + } + locked = true; return ret; + } else { + ldout(cct,0) << " % alread lock" << oid << dendl; + return -EBUSY; } - return -EAGAIN; +} + +int BucketIndexLockGuard::unlock() +{ + if (locked) { + int ret = l.unlock(&io_ctx, oid); + if (ret <0) { + ldout(cct, 0) << "failed to unlock " << oid << dendl; + } else { + locked = false; + } + return ret; + } + return 0; +} + +BucketIndexLockGuard::~BucketIndexLockGuard() +{ + unlock(); } diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h index 8ddb91109c0..205a7688c14 100644 --- a/src/rgw/rgw_reshard.h +++ b/src/rgw/rgw_reshard.h @@ -12,8 +12,26 @@ class CephContext; class RGWRados; -#define dout_context g_ceph_context -#define dout_subsys ceph_subsys_rgw + +/* gets a locked lock , release it when exiting context */ +class BucketIndexLockGuard +{ + CephContext *cct; + RGWRados *store; + rados::cls::lock::Lock l; + string oid; + librados::IoCtx io_ctx; + bool locked; +public: + BucketIndexLockGuard(CephContext* cct, RGWRados* store, const string& bucket_instance_id, + const string& oid, const librados::IoCtx& io_ctx); + /* unlocks the lock */ + ~BucketIndexLockGuard(); +protected: + friend class RGWReshard; + int lock(); + int unlock(); +}; class RGWReshard { CephContext *cct; @@ -22,6 +40,9 @@ class RGWReshard { int max_jobs; rados::cls::lock::Lock instance_lock; + int lock_bucket_index_shared(const string& oid); + int unlock_bucket_index(const string& oid); + public: RGWReshard(CephContext* cct, RGWRados* _store); int add(cls_rgw_reshard_entry& entry); @@ -30,6 +51,11 @@ class RGWReshard { int list(string& marker, uint32_t max, list& entries, bool& is_truncated); int set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry); int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry); + /* + if succefull, keeps the bucket index locked. It will be unlocked + in the guard dtor. + */ + int block_while_resharding(const string& bucket_instance_oid, BucketIndexLockGuard& guard); }; #endif