From: Yehuda Sadeh Date: Fri, 19 Sep 2014 21:55:12 +0000 (-0700) Subject: cls_rgw, rgw: create base class for common bucket shard operations X-Git-Tag: v0.92~12^2~32 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=04441f28789094c6259115f8a9dc38f9e2b44e1a;p=ceph.git cls_rgw, rgw: create base class for common bucket shard operations Instead of copy pasting the same code all over again, create a base class for the needed concurrent IO operations. Signed-off-by: Yehuda Sadeh --- diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index ed937bc52eb0..f67b914aca8c 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -103,46 +103,6 @@ static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx, return r; } -int cls_rgw_bucket_index_init_op(librados::IoCtx& io_ctx, - const vector& bucket_objs, uint32_t max_aio) -{ - int ret = 0; - vector::const_iterator iter = bucket_objs.begin(); - BucketIndexAioManager manager; - // Issue *max_aio* requests, all subsequent requests are issued upon - // pending request finishing - for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) { - ret = issue_bucket_index_init_op(io_ctx, *iter, &manager); - if (ret < 0) - break; - } - - int num_completions, r = 0; - while (manager.wait_for_completions(-EEXIST, &num_completions, &r)) { - if (r >= 0 && ret >= 0) { - for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) { - int issue_ret = issue_bucket_index_init_op(io_ctx, *iter, &manager); - if(issue_ret < 0) { - ret = issue_ret; - break; - } - } - } else if (ret >= 0) { - ret = r; - } - } - - if (ret < 0) { - // Do best effort removal - vector::const_iterator citer = bucket_objs.begin(); - for(; citer != iter; ++citer) { - io_ctx.remove(*citer); - } - } - - return ret; -} - static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx, const string& oid, uint64_t timeout, BucketIndexAioManager *manager) { bufferlist in; @@ -160,14 +120,12 @@ static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx, return r; } -int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx, const vector& bucket_objs, - uint64_t tag_timeout, uint32_t max_aio) -{ +int CLSRGWConcurrentIO::operator()() { int ret = 0; vector::const_iterator iter = bucket_objs.begin(); BucketIndexAioManager manager; for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) { - ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager); + ret = issue_op(*iter); if (ret < 0) break; } @@ -176,7 +134,7 @@ int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx, const vector while (manager.wait_for_completions(0, &num_completions, &r)) { if (r >= 0 && ret >= 0) { for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) { - int issue_ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager); + int issue_ret = issue_op(*iter); if(issue_ret < 0) { ret = issue_ret; break; @@ -186,9 +144,32 @@ int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx, const vector ret = r; } } + + if (ret < 0) { + cleanup(); + } return ret; } +int CLSRGWIssueBucketIndexInit::issue_op(const string& obj) +{ + issued_objs.push_back(obj); + return issue_bucket_index_init_op(io_ctx, obj, &manager); +} + +void CLSRGWIssueBucketIndexInit::cleanup() +{ + // Do best effort removal + for (vector::iterator iter = issued_objs.begin(); iter != issued_objs.end(); ++iter) { + io_ctx.remove(*iter); + } +} + +int CLSRGWIssueSetTagTimeout::issue_op(const string& obj) +{ + return issue_bucket_set_tag_timeout_op(io_ctx, obj, tag_timeout, &manager); +} + void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag, string& name, string& locator, bool log_op) { diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index b28425e1fba6..e60589d7995d 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -111,20 +111,46 @@ public: /* bucket index */ void cls_rgw_bucket_init(librados::ObjectWriteOperation& o); -/** - * Init bucket index objects. - * - * io_ctx - IO context for rados. - * bucket_objs - a lit of bucket index objects. - * max_io - the maximum number of AIO (for throttling). - * - * Reutrn 0 on success, a failure code otherwise. - */ -int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx, - const vector& bucket_objs, uint32_t max_aio); +class CLSRGWConcurrentIO { +protected: + librados::IoCtx& io_ctx; + vector& bucket_objs; + uint32_t max_aio; + BucketIndexAioManager manager; + + virtual int issue_op(const string& obj) = 0; + + virtual void cleanup() {} + virtual int valid_ret_code() { return 0; } + +public: + CLSRGWConcurrentIO(librados::IoCtx& ioc, vector& _bucket_objs, + uint32_t _max_aio) : io_ctx(ioc), bucket_objs(_bucket_objs), max_aio(_max_aio) {} + virtual ~CLSRGWConcurrentIO() {} -int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx, - const vector& bucket_objs, uint64_t tag_timeout, uint32_t max_aio); + int operator()(); +}; + +class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO { + vector issued_objs; +protected: + int issue_op(const string& obj); + int valid_ret_code() { return -EEXIST; } + void cleanup(); +public: + CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, vector& _bucket_objs, + uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {} +}; + +class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO { + uint64_t tag_timeout; +protected: + int issue_op(const string& obj); +public: + CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, vector& _bucket_objs, + uint32_t _max_aio, uint64_t _tag_timeout) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), + tag_timeout(_tag_timeout) {} +}; void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag, string& name, string& locator, bool log_op); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index a5afa49b916e..7b1ea8afc0f1 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2380,7 +2380,7 @@ int RGWRados::init_bucket_index(rgw_bucket& bucket) vector bucket_objs; get_bucket_index_objects(dir_oid, bucket_index_max_shards, bucket_objs); - return cls_rgw_bucket_index_init_op(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio); + return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); } /** @@ -6249,7 +6249,7 @@ int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeou if (r < 0) return r; - return cls_rgw_bucket_set_tag_timeout(index_ctx, bucket_objs, timeout, cct->_conf->rgw_bucket_index_max_aio); + return CLSRGWIssueSetTagTimeout(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)(); } int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix,