From 6b1c4a0bc263a3deef0638b8958dae95e1c6b462 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 5 Dec 2014 14:10:50 -0800 Subject: [PATCH] cls_rgw: clean up CLSRGWConcurrentIO Class is no longer a template, and keeps a map of oids by shard_id. Call issue_op() using both shard_id and oids. Shard id is used for mapping the results in the derived classes. Signed-off-by: Yehuda Sadeh --- src/cls/rgw/cls_rgw_client.cc | 52 +++++++-------- src/cls/rgw/cls_rgw_client.h | 120 +++++++++++++++++++--------------- 2 files changed, 92 insertions(+), 80 deletions(-) diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index c0c5d7c7731e7..51f1d0fbae2fb 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -115,22 +115,22 @@ static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx, return manager->aio_operate(io_ctx, oid, &op); } -int CLSRGWIssueBucketIndexInit::issue_op() +int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid) { - return issue_bucket_index_init_op(io_ctx, *iter, &manager); + return issue_bucket_index_init_op(io_ctx, oid, &manager); } void CLSRGWIssueBucketIndexInit::cleanup() { // Do best effort removal - for (vector::iterator citer = objs_container.begin(); citer != iter; ++citer) { - io_ctx.remove(*citer); + for (map::iterator citer = objs_container.begin(); citer != iter; ++citer) { + io_ctx.remove(citer->second); } } -int CLSRGWIssueSetTagTimeout::issue_op() +int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid) { - return issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager); + return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager); } void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag, @@ -182,17 +182,17 @@ static bool issue_bucket_list_op(librados::IoCtx& io_ctx, return manager->aio_operate(io_ctx, oid, &op); } -int CLSRGWIssueBucketList::issue_op() +int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid) { - return issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second); + return issue_bucket_list_op(io_ctx, oid, start_obj, filter_prefix, num_entries, &manager, &result[shard_id]); } -static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, - const string& oid, BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager, +static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id, + BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager, struct cls_rgw_bi_log_list_ret *pdata) { bufferlist in; cls_rgw_bi_log_list_op call; - call.marker = marker_mgr.get(oid, ""); + call.marker = marker_mgr.get(shard_id, ""); call.max = max; ::encode(call, in); @@ -201,27 +201,27 @@ static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, return manager->aio_operate(io_ctx, oid, &op); } -int CLSRGWIssueBILogList::issue_op() +int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid) { - return issue_bi_log_list_op(io_ctx, iter->first, marker_mgr, max, &manager, &iter->second); + return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]); } -static bool issue_bi_log_trim(librados::IoCtx& io_ctx, - string& oid, BucketIndexShardsManager& start_marker_mgr, - BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) { +static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id, + BucketIndexShardsManager& start_marker_mgr, + BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) { bufferlist in; cls_rgw_bi_log_trim_op call; - call.start_marker = start_marker_mgr.get(oid, ""); - call.end_marker = end_marker_mgr.get(oid, ""); + call.start_marker = start_marker_mgr.get(shard_id, ""); + call.end_marker = end_marker_mgr.get(shard_id, ""); ::encode(call, in); ObjectWriteOperation op; op.exec("rgw", "bi_log_trim", in); return manager->aio_operate(io_ctx, oid, &op); } -int CLSRGWIssueBILogTrim::issue_op() +int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid) { - return issue_bi_log_trim(io_ctx, *iter, start_marker_mgr, end_marker_mgr, &manager); + return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager); } static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager, @@ -233,9 +233,9 @@ static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, Bucket return manager->aio_operate(io_ctx, oid, &op); } -int CLSRGWIssueBucketCheck::issue_op() +int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid) { - return issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second); + return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]); } static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid, @@ -246,9 +246,9 @@ static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid, return manager->aio_operate(io_ctx, oid, &op); } -int CLSRGWIssueBucketRebuild::issue_op() +int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid) { - return issue_bucket_rebuild_index_op(io_ctx, *iter, &manager); + return issue_bucket_rebuild_index_op(io_ctx, oid, &manager); } void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates) @@ -262,9 +262,9 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates) o.exec("rgw", "dir_suggest_changes", updates); } -int CLSRGWIssueGetDirHeader::issue_op() +int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid) { - return issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second); + return issue_bucket_list_op(io_ctx, oid, "", "", 0, &manager, &result[shard_id]); } class GetDirHeaderCompletion : public ObjectOperationCompletion { diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 14a702cc42962..cd6e852baaac5 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -129,17 +129,17 @@ public: class BucketIndexShardsManager { private: // Per shard setting manager, for example, marker. - map value_by_shards; + map value_by_shards; public: const static string KEY_VALUE_SEPARATOR; const static string SHARDS_SEPARATOR; - void add(const string& shard, const string& value) { + void add(int shard, const string& value) { value_by_shards[shard] = value; } - const string& get(const string& shard, const string& default_value) { - map::iterator iter = value_by_shards.find(shard); + const string& get(int shard, const string& default_value) { + map::iterator iter = value_by_shards.find(shard); return (iter == value_by_shards.end() ? default_value : iter->second); } @@ -149,7 +149,7 @@ public: void to_string(string *out) const { if (out) { - map::const_iterator iter = value_by_shards.begin(); + map::const_iterator iter = value_by_shards.begin(); // No shards if (value_by_shards.size() == 1) { *out = iter->second; @@ -159,7 +159,9 @@ public: // Not the first item, append a separator first out->append(SHARDS_SEPARATOR); } - out->append(iter->first); + char buf[16]; + snprintf(buf, sizeof(buf), "%d", iter->first); + out->append(buf); out->append(KEY_VALUE_SEPARATOR); out->append(iter->second); } @@ -167,20 +169,26 @@ public: } } - int from_string(const string& composed_marker, bool has_shards, const string& oid) { + int from_string(const string& composed_marker, bool has_shards) { value_by_shards.clear(); if (!has_shards) { - add(oid, composed_marker); + add(0, composed_marker); } else { list shards; get_str_list(composed_marker, SHARDS_SEPARATOR.c_str(), shards); list::const_iterator iter = shards.begin(); for (; iter != shards.end(); ++iter) { size_t pos = iter->find(KEY_VALUE_SEPARATOR); - if (pos == string::npos) + if (pos == string::npos) { return -EINVAL; - string name = iter->substr(0, pos); - value_by_shards[name] = iter->substr(pos + 1, iter->length() - pos - 1); + } + string shard_str = iter->substr(0, pos); + string err; + int shard = (int)strict_strtol(shard_str.c_str(), 10, &err); + if (!err.empty()) { + return -EINVAL; + } + value_by_shards[shard] = iter->substr(pos + 1); } } return 0; @@ -190,16 +198,15 @@ public: /* bucket index */ void cls_rgw_bucket_init(librados::ObjectWriteOperation& o); -template class CLSRGWConcurrentIO { protected: librados::IoCtx& io_ctx; - T& objs_container; - typename T::iterator iter; + map& objs_container; + map::iterator iter; uint32_t max_aio; BucketIndexAioManager manager; - virtual int issue_op() = 0; + virtual int issue_op(int shard_id, const string& oid) = 0; virtual void cleanup() {} virtual int valid_ret_code() { return 0; } @@ -211,7 +218,7 @@ protected: virtual void reset_container(vector& objs) {} public: - CLSRGWConcurrentIO(librados::IoCtx& ioc, T& _objs_container, + CLSRGWConcurrentIO(librados::IoCtx& ioc, map& _objs_container, uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {} virtual ~CLSRGWConcurrentIO() {} @@ -219,7 +226,7 @@ public: int ret = 0; iter = objs_container.begin(); for (; iter != objs_container.end() && max_aio-- > 0; ++iter) { - ret = issue_op(); + ret = issue_op(iter->first, iter->second); if (ret < 0) break; } @@ -229,7 +236,7 @@ public: while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, &objs)) { if (r >= 0 && ret >= 0) { for(int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) { - int issue_ret = issue_op(); + int issue_ret = issue_op(iter->first, iter->second); if(issue_ret < 0) { ret = issue_ret; break; @@ -252,26 +259,25 @@ public: } }; -class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO > { +class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO { protected: - int issue_op(); + int issue_op(int shard_id, const string& oid); int valid_ret_code() { return -EEXIST; } void cleanup(); public: - CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, vector& _bucket_objs, + CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, map& _bucket_objs, uint32_t _max_aio) : - CLSRGWConcurrentIO >(ioc, _bucket_objs, _max_aio) {} + CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {} }; -class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO > { +class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO { uint64_t tag_timeout; protected: - int issue_op(); + int issue_op(int shard_id, const string& oid); public: - CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, vector& _bucket_objs, + CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, map& _bucket_objs, uint32_t _max_aio, uint64_t _tag_timeout) : - CLSRGWConcurrentIO >(ioc, _bucket_objs, _max_aio), - tag_timeout(_tag_timeout) {} + CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), tag_timeout(_tag_timeout) {} }; void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag, @@ -298,51 +304,55 @@ void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, RGWModifyOp o * Return 0 on success, a failure code otherwise. */ -class CLSRGWIssueBucketList : public CLSRGWConcurrentIO > { +class CLSRGWIssueBucketList : public CLSRGWConcurrentIO { string start_obj; string filter_prefix; uint32_t num_entries; + map& result; protected: - int issue_op(); + int issue_op(int shard_id, const string& oid); public: CLSRGWIssueBucketList(librados::IoCtx& io_ctx, const string& _start_obj, const string& _filter_prefix, uint32_t _num_entries, - map& list_results, + map& oids, + map& list_results, uint32_t max_aio) : - CLSRGWConcurrentIO > (io_ctx, list_results, max_aio), - start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries) {} + CLSRGWConcurrentIO(io_ctx, oids, max_aio), + start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries), result(list_results) {} }; -class CLSRGWIssueBILogList : public CLSRGWConcurrentIO > { +class CLSRGWIssueBILogList : public CLSRGWConcurrentIO { + map& result; BucketIndexShardsManager& marker_mgr; uint32_t max; protected: - int issue_op(); + int issue_op(int shard_id, const string& oid); public: CLSRGWIssueBILogList(librados::IoCtx& io_ctx, BucketIndexShardsManager& _marker_mgr, uint32_t _max, - map& bi_log_lists, uint32_t max_aio) : - CLSRGWConcurrentIO >(io_ctx, bi_log_lists, max_aio), + map& oids, + map& bi_log_lists, uint32_t max_aio) : + CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(bi_log_lists), marker_mgr(_marker_mgr), max(_max) {} }; -class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO > { +class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO { BucketIndexShardsManager& start_marker_mgr; BucketIndexShardsManager& end_marker_mgr; protected: - int issue_op(); + int issue_op(int shard_id, const string& oid); // Trim until -ENODATA is returned. int valid_ret_code() { return -ENODATA; } bool need_multiple_rounds() { return true; } - void add_object(const string& oid) { objs_container.push_back(oid); } - void reset_container(vector& objs) { + void add_object(int shard, const string& oid) { objs_container[shard] = oid; } + void reset_container(map& objs) { objs_container.swap(objs); iter = objs_container.begin(); objs.clear(); } public: CLSRGWIssueBILogTrim(librados::IoCtx& io_ctx, BucketIndexShardsManager& _start_marker_mgr, - BucketIndexShardsManager& _end_marker_mgr, vector& _bucket_objs, uint32_t max_aio) : - CLSRGWConcurrentIO >(io_ctx, _bucket_objs, max_aio), + BucketIndexShardsManager& _end_marker_mgr, map& _bucket_objs, uint32_t max_aio) : + CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio), start_marker_mgr(_start_marker_mgr), end_marker_mgr(_end_marker_mgr) {} }; @@ -355,30 +365,32 @@ public: * * Return 0 on success, a failure code otherwise. */ -class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO > { +class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO /* >*/ { + map& result; protected: - int issue_op(); + int issue_op(int shard_id, const string& oid); public: - CLSRGWIssueBucketCheck(librados::IoCtx& ioc, map& bucket_objs_ret, + CLSRGWIssueBucketCheck(librados::IoCtx& ioc, map& oids, map& bucket_objs_ret, uint32_t _max_aio) : - CLSRGWConcurrentIO >(ioc, bucket_objs_ret, _max_aio) {} + CLSRGWConcurrentIO(ioc, oids, _max_aio), result(bucket_objs_ret) {} }; -class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO > { +class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO { protected: - int issue_op(); + int issue_op(int shard_id, const string& oid); public: - CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, vector& bucket_objs, - uint32_t max_aio) : CLSRGWConcurrentIO >(io_ctx, bucket_objs, max_aio) {} + CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, map& bucket_objs, + uint32_t max_aio) : CLSRGWConcurrentIO(io_ctx, bucket_objs, max_aio) {} }; -class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO > { +class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO { + map& result; protected: - int issue_op(); + int issue_op(int shard_id, const string& oid); public: - CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, map& dir_headers, + CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, map& oids, map& dir_headers, uint32_t max_aio) : - CLSRGWConcurrentIO >(io_ctx, dir_headers, max_aio) {} + CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(dir_headers) {} }; int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, map& dir_headers, -- 2.47.3