From 7a34a049ae183e965ef5d1fc609aef14128dbe79 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 9 May 2017 13:41:30 -0700 Subject: [PATCH] rgw: split per-bucket resharding logic out from general resharding Signed-off-by: Yehuda Sadeh --- src/cls/rgw/cls_rgw_client.cc | 18 +++++++ src/cls/rgw/cls_rgw_client.h | 10 ++++ src/rgw/rgw_rados.cc | 13 +++++ src/rgw/rgw_rados.h | 2 + src/rgw/rgw_reshard.cc | 93 ++++++++++++++++++++++------------- src/rgw/rgw_reshard.h | 20 +++++++- 6 files changed, 122 insertions(+), 34 deletions(-) diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index f67701ec27874..6e6bdc801ea71 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -894,3 +894,21 @@ void cls_rgw_guard_bucket_resharding(librados::ObjectWriteOperation& op, int ret ::encode(call, in); op.exec("rgw", "guard_bucket_resharding", in); } + +static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid, + const cls_rgw_bucket_instance_entry& entry, + BucketIndexAioManager *manager) { + bufferlist in; + struct cls_rgw_set_bucket_resharding_op call; + call.entry = entry; + ::encode(call, in); + librados::ObjectWriteOperation op; + op.exec("rgw", "set_bucket_resharding", in); + return manager->aio_operate(io_ctx, oid, &op); +} + +int CLSRGWIssueSetBucketResharding::issue_op(int shard_id, const string& oid) +{ + return issue_set_bucket_resharding(io_ctx, oid, entry, &manager); +} + diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 8472cd0b59d12..8db31c4135f90 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -456,6 +456,16 @@ public: CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(dir_headers) {} }; +class CLSRGWIssueSetBucketResharding : public CLSRGWConcurrentIO { + cls_rgw_bucket_instance_entry entry; +protected: + int issue_op(int shard_id, const string& oid) override; +public: + CLSRGWIssueSetBucketResharding(librados::IoCtx& ioc, map& _bucket_objs, + const cls_rgw_bucket_instance_entry& entry, + uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {} +}; + int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx); void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index e835079f3466c..003c1a0db7c9a 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -8250,6 +8250,19 @@ int RGWRados::bucket_rebuild_index(RGWBucketInfo& bucket_info) return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); } +int RGWRados::bucket_set_reshard(RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry) +{ + librados::IoCtx index_ctx; + map bucket_objs; + + int r = open_bucket_index(bucket_info, index_ctx, bucket_objs); + if (r < 0) { + return r; + } + + return CLSRGWIssueSetBucketResharding(index_ctx, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)(); +} + int RGWRados::defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj) { RGWObjectCtx *rctx = static_cast(ctx); diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index daf7be2388898..b79cc209cbc3b 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -2181,6 +2181,7 @@ class RGWRados friend class RGWStateLog; friend class RGWReplicaLogger; friend class RGWReshard; + friend class RGWBucketReshard; friend class BucketIndexLockGuard; /** Open the pool used as root for this gateway */ @@ -3427,6 +3428,7 @@ public: map *existing_stats, map *calculated_stats); int bucket_rebuild_index(RGWBucketInfo& bucket_info); + int bucket_set_reshard(RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry); int remove_objs_from_index(RGWBucketInfo& bucket_info, list& oid_list); int move_rados_obj(librados::IoCtx& src_ioctx, const string& src_oid, const string& src_locator, diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc index 389d1dcca940d..7c2fb7a0f7ffe 100644 --- a/src/rgw/rgw_reshard.cc +++ b/src/rgw/rgw_reshard.cc @@ -14,6 +14,65 @@ const string reshard_oid = "reshard"; const string reshard_lock_name = "reshard_process"; const string bucket_instance_lock_name = "bucket_instance_lock"; +RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info) : + store(_store), bucket_info(_bucket_info), + reshard_lock(reshard_lock_name) { + const rgw_bucket& b = bucket_info.bucket; + reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id; +} + +int RGWBucketReshard::lock_bucket() +{ +#warning set timeout for guard lock + + int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid); + if (ret < 0) { + ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << " ret=" << ret << dendl; + return ret; + } + return 0; +} + +void RGWBucketReshard::unlock_bucket() +{ + int ret = reshard_lock.unlock(&store->reshard_pool_ctx, reshard_oid); + if (ret < 0) { + ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl; + } +} + +int RGWBucketReshard::init_resharding(const cls_rgw_reshard_entry& entry) +{ + if (entry.new_instance_id.empty()) { + ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl; + return -EINVAL; + } + + cls_rgw_bucket_instance_entry instance_entry; + instance_entry.new_bucket_instance_id = entry.new_instance_id; + + int ret = store->bucket_set_reshard(bucket_info, instance_entry); + if (ret < 0) { + ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: " + << cpp_strerror(-ret) << dendl; + return ret; + } + return 0; +} + +int RGWBucketReshard::clear_resharding() +{ + cls_rgw_bucket_instance_entry instance_entry; + + int ret = store->bucket_set_reshard(bucket_info, instance_entry); + if (ret < 0) { + ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: " + << cpp_strerror(-ret) << dendl; + return ret; + } + return 0; +} + RGWReshard::RGWReshard(CephContext *_cct, RGWRados* _store):cct(_cct), store(_store), instance_lock(bucket_instance_lock_name) { @@ -104,38 +163,6 @@ std::string create_bucket_index_lock_name(const string& bucket_instance_id) { return bucket_instance_lock_name + "." + bucket_instance_id; } -int RGWReshard::set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry) -{ - rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id)); - - if (entry.new_instance_id.empty()) { - ldout(cct, 0) << "RGWReshard::" << __func__ << " missing new bucket instance id" << dendl; - return -EEXIST; - } - - int ret = l.lock_exclusive(&store->reshard_pool_ctx, bucket_instance_oid); - if (ret == -EBUSY) { - ldout(cct, 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl; - return 0; - } - if (ret < 0) - return ret; - - cls_rgw_bucket_instance_entry instance_entry; - instance_entry.new_bucket_instance_id = entry.new_instance_id; - - ret = cls_rgw_set_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid, instance_entry); - if (ret < 0) { - ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: cls_rgw_set_bucket_resharding: " - << cpp_strerror(-ret) << dendl; - goto done; - } - -done: - l.unlock(&store->reshard_pool_ctx, bucket_instance_oid); - return ret; -} - int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry) { rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id)); @@ -150,7 +177,7 @@ int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_r entry.new_instance_id.clear(); - ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid); + ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid); l.unlock(&store->reshard_pool_ctx, bucket_instance_oid); return ret; diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h index 205a7688c1461..8f8a46df77849 100644 --- a/src/rgw/rgw_reshard.h +++ b/src/rgw/rgw_reshard.h @@ -33,6 +33,25 @@ protected: int unlock(); }; + +class RGWBucketReshard { + RGWRados *store; + RGWBucketInfo bucket_info; + + string reshard_oid; + rados::cls::lock::Lock reshard_lock; + + int lock_bucket(); + void unlock_bucket(); + int init_resharding(const cls_rgw_reshard_entry& entry); + int clear_resharding(); +public: + RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info); + + int reshard(); + int abort_reshard(); +}; + class RGWReshard { CephContext *cct; RGWRados *store; @@ -49,7 +68,6 @@ class RGWReshard { int get(cls_rgw_reshard_entry& entry); int remove(cls_rgw_reshard_entry& entry); int list(string& marker, uint32_t max, list& entries, bool& is_truncated); - int set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry); int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry); /* if succefull, keeps the bucket index locked. It will be unlocked -- 2.39.5