From: Yehuda Sadeh Date: Fri, 19 Sep 2014 22:34:54 +0000 (-0700) Subject: cls_rgw, rgw: switch different ops to new concurrent infrastructure X-Git-Tag: v0.92~12^2~30 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=47665b23df9aed88a2d91586873ffee61139fccf;p=ceph.git cls_rgw, rgw: switch different ops to new concurrent infrastructure Make all the relevant ops use the CLSRGWConcurrentIO infrastructure, which simplifies things. Signed-off-by: Yehuda Sadeh --- diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index b1876a0478f7..e253474d7beb 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -193,35 +193,9 @@ static bool issue_bucket_list_op(librados::IoCtx& io_ctx, return r; } -int cls_rgw_list_op(IoCtx& io_ctx, const string& start_obj, - const string& filter_prefix, uint32_t num_entries, - map& list_results, uint32_t max_aio) +int CLSRGWIssueBucketList::issue_op() { - int ret = 0; - BucketIndexAioManager manager; - map::iterator iter = list_results.begin(); - for (; iter != list_results.end() && max_aio-- > 0; ++iter) { - ret = issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second); - if (ret < 0) - break; - } - - int num_completions, r = 0; - while (manager.wait_for_completions(0, &num_completions, &r)) { - if (r >= 0 && ret >= 0) { - for (int i = 0; i < num_completions && iter != list_results.end(); ++i, ++iter) { - int issue_ret = issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second); - if (issue_ret < 0) { - ret = issue_ret; - break; - } - } - } else if (ret >= 0) { - ret = r; - } - } - - return ret; + return issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second); } static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager, @@ -239,33 +213,9 @@ static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, Bucket return r; } -int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, - map& bucket_objs_ret, uint32_t max_aio) +int CLSRGWIssueBucketCheck::issue_op() { - int ret = 0; - BucketIndexAioManager manager; - map::iterator iter = bucket_objs_ret.begin(); - for (; iter != bucket_objs_ret.end() && max_aio-- > 0; ++iter) { - ret = issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second); - if (ret < 0) - break; - } - - int num_completions, r = 0; - while (manager.wait_for_completions(0, &num_completions, &r)) { - if (r >= 0 && ret >= 0) { - for (int i = 0; i < num_completions && iter != bucket_objs_ret.end(); ++i, ++iter) { - int issue_ret = issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second); - if (issue_ret < 0) { - ret = issue_ret; - break; - } - } - } else if (ret >= 0) { - ret = r; - } - } - return ret; + return issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second); } static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid, @@ -282,33 +232,9 @@ static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid, return r; } -int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, const vector& bucket_objs, - uint32_t max_aio) +int CLSRGWIssueBucketRebuild::issue_op() { - int ret = 0; - BucketIndexAioManager manager; - vector::const_iterator iter = bucket_objs.begin(); - for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) { - ret = issue_bucket_rebuild_index_op(io_ctx, *iter, &manager); - if (ret < 0) - break; - } - - int num_completions, r = 0; - while (manager.wait_for_completions(0, &num_completions, &r)) { - if (r >= 0 && ret >= 0) { - for (int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) { - int issue_ret = issue_bucket_rebuild_index_op(io_ctx, *iter, &manager); - if (issue_ret < 0) { - ret = issue_ret; - break; - } - } - } else if (ret >= 0) { - ret = r; - } - } - return ret; + return issue_bucket_rebuild_index_op(io_ctx, *iter, &manager); } void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates) @@ -322,33 +248,9 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates) o.exec("rgw", "dir_suggest_changes", updates); } -int cls_rgw_get_dir_header(IoCtx& io_ctx, map& dir_headers, - uint32_t max_aio) +int CLSRGWIssueGetDirHeader::issue_op() { - int ret = 0; - BucketIndexAioManager manager; - map::iterator iter = dir_headers.begin(); - for (; iter != dir_headers.end() && max_aio-- > 0; ++iter) { - ret = issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second); - if (ret < 0) - break; - } - - int num_completions, r = 0; - while (manager.wait_for_completions(0, &num_completions, &r)) { - if (r >= 0 && ret >= 0) { - for (int i = 0; i < num_completions && iter != dir_headers.end(); ++i, ++iter) { - int issue_ret = issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second); - if (issue_ret < 0) { - ret = issue_ret; - break; - } - } - } else if (ret >= 0) { - ret = r; - } - } - return ret; + return issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second); } class GetDirHeaderCompletion : public ObjectOperationCompletion { diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 19c1ab1e2214..ebd005fba335 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -4,6 +4,7 @@ #include "include/types.h" #include "include/rados/librados.hpp" #include "cls_rgw_types.h" +#include "cls_rgw_ops.h" #include "common/RefCountedObj.h" /* @@ -116,7 +117,7 @@ class CLSRGWConcurrentIO { protected: librados::IoCtx& io_ctx; T& objs_container; - typename T::const_iterator iter; + typename T::iterator iter; uint32_t max_aio; BucketIndexAioManager manager; @@ -126,7 +127,7 @@ protected: virtual int valid_ret_code() { return 0; } public: - CLSRGWConcurrentIO(librados::IoCtx& ioc, vector& _objs_container, + CLSRGWConcurrentIO(librados::IoCtx& ioc, T& _objs_container, uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {} virtual ~CLSRGWConcurrentIO() {} @@ -168,7 +169,8 @@ protected: void cleanup(); public: CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, vector& _bucket_objs, - uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {} + uint32_t _max_aio) : + CLSRGWConcurrentIO >(ioc, _bucket_objs, _max_aio) {} }; class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO > { @@ -177,8 +179,9 @@ protected: int issue_op(); public: CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, vector& _bucket_objs, - uint32_t _max_aio, uint64_t _tag_timeout) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), - tag_timeout(_tag_timeout) {} + uint32_t _max_aio, uint64_t _tag_timeout) : + CLSRGWConcurrentIO >(ioc, _bucket_objs, _max_aio), + tag_timeout(_tag_timeout) {} }; void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag, @@ -204,10 +207,21 @@ void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, RGWModifyOp o * * Return 0 on success, a failure code otherwise. */ -int cls_rgw_list_op(librados::IoCtx& io_ctx, const string & start_obj, - const string& filter_prefix, uint32_t num_entries, - map& list_results, - uint32_t max_aio); + +class CLSRGWIssueBucketList : public CLSRGWConcurrentIO > { + string start_obj; + string filter_prefix; + uint32_t num_entries; +protected: + int issue_op(); +public: + CLSRGWIssueBucketList(librados::IoCtx& io_ctx, const string& _start_obj, + const string& _filter_prefix, uint32_t _num_entries, + map& list_results, + uint32_t max_aio) : + CLSRGWConcurrentIO > (io_ctx, list_results, max_aio), + start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries) {} +}; /** * Check the bucket index. @@ -218,11 +232,32 @@ int cls_rgw_list_op(librados::IoCtx& io_ctx, const string & start_obj, * * Return 0 on success, a failure code otherwise. */ -int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, - map& bucket_objs_ret, uint32_t max_aio); -int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, const vector& bucket_objs, - uint32_t max_aio); - +class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO > { +protected: + int issue_op(); +public: + CLSRGWIssueBucketCheck(librados::IoCtx& ioc, map& bucket_objs_ret, + uint32_t _max_aio) : + CLSRGWConcurrentIO >(ioc, bucket_objs_ret, _max_aio) {} +}; + +class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO > { +protected: + int issue_op(); +public: + CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, vector& bucket_objs, + uint32_t max_aio) : CLSRGWConcurrentIO >(io_ctx, bucket_objs, max_aio) {} +}; + +class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO > { +protected: + int issue_op(); +public: + CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, map& dir_headers, + uint32_t max_aio) : + CLSRGWConcurrentIO >(io_ctx, dir_headers, max_aio) {} +}; + int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, map& dir_headers, uint32_t max_aio); int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 7b1ea8afc0f1..605b3ffee685 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -3876,7 +3876,7 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket, if (ret < 0) return ret; - ret = cls_rgw_bucket_check_index_op(index_ctx, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio); + ret = CLSRGWIssueBucketCheck(index_ctx, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)(); if (ret < 0) return ret; @@ -3898,7 +3898,7 @@ int RGWRados::bucket_rebuild_index(rgw_bucket& bucket) if (r < 0) return r; - return cls_rgw_bucket_rebuild_index_op(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio); + return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); } @@ -6266,7 +6266,7 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const str if (r < 0) return r; - r = cls_rgw_list_op(index_ctx, start, prefix, num_entries, list_results, cct->_conf->rgw_bucket_index_max_aio); + r = CLSRGWIssueBucketList(index_ctx, start, prefix, num_entries, list_results, cct->_conf->rgw_bucket_index_max_aio)(); if (r < 0) return r; @@ -6558,7 +6558,7 @@ int RGWRados::cls_bucket_head(rgw_bucket& bucket, map_conf->rgw_bucket_index_max_aio); + r = CLSRGWIssueGetDirHeader(index_ctx, list_results, cct->_conf->rgw_bucket_index_max_aio)(); if (r < 0) return r;