From: J. Eric Ivancich Date: Mon, 19 Jul 2021 18:23:42 +0000 (-0400) Subject: rgw: allow CLSRGWConcurrentIO to handle "advancing" retries X-Git-Tag: v15.2.17~95^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5b4416671a3a9f1fbef0d20d081d05e41851d926;p=ceph.git rgw: allow CLSRGWConcurrentIO to handle "advancing" retries When doing an asynchronous/concurrent bucket index operation against multiple bucket index shards, a special error code is set aside to indicate that an "advancing" retry of a/some shard(s) is necessary. In that case another asynchronous call is made on the indicated shard(s) from the client (i.e., CLSRGWConcurrentIO). It is up to the subclass of CLSRGWConcurrentIO to handle the retry such that it "advances" and simply doesn't get stuck, looping forever. The retry functionality only works when the "need_multiple_rounds" functionality is not in use. Signed-off-by: J. Eric Ivancich (cherry picked from commit 5d283074750dc6bd458877bd42921037b5bb7f4b) Conflicts: src/cls/rgw/cls_rgw_client.cc src/cls/rgw/cls_rgw_client.h Resolved by taking the patch version -- all cases of auto type and std:: --- diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 414ecbad345..380afee40c9 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -13,6 +13,67 @@ using namespace librados; const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#"; const string BucketIndexShardsManager::SHARDS_SEPARATOR = ","; + +int CLSRGWConcurrentIO::operator()() { + int ret = 0; + iter = objs_container.begin(); + for (; iter != objs_container.end() && max_aio-- > 0; ++iter) { + ret = issue_op(iter->first, iter->second); + if (ret < 0) + break; + } + + int num_completions = 0, r = 0; + std::map completed_objs; + std::map retry_objs; + while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, + need_multiple_rounds() ? &completed_objs : nullptr, + !need_multiple_rounds() ? 
&retry_objs : nullptr)) { + if (r >= 0 && ret >= 0) { + for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) { + int issue_ret = issue_op(iter->first, iter->second); + if (issue_ret < 0) { + ret = issue_ret; + break; + } + } + } else if (ret >= 0) { + ret = r; + } + + // if we're at the end with this round, see if another round is needed + if (iter == objs_container.end()) { + if (need_multiple_rounds() && !completed_objs.empty()) { + // For those objects which need another round, use them to reset + // the container + reset_container(completed_objs); + iter = objs_container.begin(); + } else if (! need_multiple_rounds() && !retry_objs.empty()) { + reset_container(retry_objs); + iter = objs_container.begin(); + } + + // re-issue ops if container was reset above (i.e., iter != + // objs_container.end()); if it was not reset above (i.e., iter + // == objs_container.end()) the loop will exit immediately + // without iterating + for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) { + int issue_ret = issue_op(iter->first, iter->second); + if (issue_ret < 0) { + ret = issue_ret; + break; + } + } + } + } + + if (ret < 0) { + cleanup(); + } + return ret; +} // CLSRGWConcurrentIO::operator()() + + /** * This class represents the bucket index object operation callback context. 
*/ @@ -25,7 +86,9 @@ public: ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { ceph_assert(data); } ~ClsBucketIndexOpCtx() override {} void handle_completion(int r, bufferlist& outbl) override { - if (r >= 0) { + // if successful, or we're asked for a retry, copy result into + // destination (*data) + if (r >= 0 || r == RGWBIAdvanceAndRetryError) { try { auto iter = outbl.cbegin(); decode((*data), iter); @@ -59,7 +122,11 @@ void BucketIndexAioManager::do_completion(const int request_id) { } bool BucketIndexAioManager::wait_for_completions(int valid_ret_code, - int *num_completions, int *ret_code, map *objs) { + int *num_completions, + int *ret_code, + std::map *completed_objs, + std::map *retry_objs) +{ std::unique_lock locker{lock}; if (pendings.empty() && completions.empty()) { return false; @@ -74,18 +141,38 @@ bool BucketIndexAioManager::wait_for_completions(int valid_ret_code, map::iterator iter = completions.begin(); for (; iter != completions.end(); ++iter) { int r = iter->second->get_return_value(); - if (objs && r == 0) { /* update list of successfully completed objs */ - map::iterator liter = completion_objs.find(iter->first); + + // see if we may need to copy completions or retries + if (completed_objs || retry_objs) { + auto liter = completion_objs.find(iter->first); if (liter != completion_objs.end()) { - (*objs)[liter->second.shard_id] = liter->second.oid; + if (completed_objs && r == 0) { /* update list of successfully completed objs */ + (*completed_objs)[liter->second.shard_id] = liter->second.oid; + } + + if (r == RGWBIAdvanceAndRetryError) { + r = 0; + if (retry_objs) { + (*retry_objs)[liter->second.shard_id] = liter->second.oid; + } + } + } else { + // NB: should we log an error here; currently no logging + // context to use } } - if (ret_code && (r < 0 && r != valid_ret_code)) + + if (ret_code && (r < 0 && r != valid_ret_code)) { (*ret_code) = r; + } + iter->second->release(); } - if (num_completions) + + if 
(num_completions) { (*num_completions) = completions.size(); + } + completions.clear(); return true; diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 43225b9dbe6..cfaab7c389d 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -108,8 +108,11 @@ public: * * Return false if there is no pending AIO, true otherwise. */ - bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code, - map *objs); + bool wait_for_completions(int valid_ret_code, + int *num_completions = nullptr, + int *ret_code = nullptr, + std::map *completed_objs = nullptr, + std::map *retry_objs = nullptr); /** * Do aio read operation. @@ -261,8 +264,12 @@ void cls_rgw_bucket_init_index(librados::ObjectWriteOperation& o); class CLSRGWConcurrentIO { protected: librados::IoCtx& io_ctx; - map& objs_container; - map::iterator iter; + + // map of shard # to oid; the shards that are remaining to be processed + std::map& objs_container; + // iterator to work through objs_container + std::map::iterator iter; + uint32_t max_aio; BucketIndexAioManager manager; @@ -288,51 +295,9 @@ public: virtual ~CLSRGWConcurrentIO() {} - int operator()() { - int ret = 0; - iter = objs_container.begin(); - for (; iter != objs_container.end() && max_aio-- > 0; ++iter) { - ret = issue_op(iter->first, iter->second); - if (ret < 0) - break; - } + int operator()(); +}; // class CLSRGWConcurrentIO - int num_completions = 0, r = 0; - map objs; - map *pobjs = (need_multiple_rounds() ? 
&objs : NULL); - while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, pobjs)) { - if (r >= 0 && ret >= 0) { - for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) { - int issue_ret = issue_op(iter->first, iter->second); - if (issue_ret < 0) { - ret = issue_ret; - break; - } - } - } else if (ret >= 0) { - ret = r; - } - if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) { - // For those objects which need another round, use them to reset - // the container - reset_container(objs); - iter = objs_container.begin(); - for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) { - int issue_ret = issue_op(iter->first, iter->second); - if (issue_ret < 0) { - ret = issue_ret; - break; - } - } - } - } - - if (ret < 0) { - cleanup(); - } - return ret; - } -}; class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO { protected: @@ -340,8 +305,9 @@ protected: int valid_ret_code() override { return -EEXIST; } void cleanup() override; public: - CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, map& _bucket_objs, - uint32_t _max_aio) : + CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, + std::map& _bucket_objs, + uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {} }; diff --git a/src/cls/rgw/cls_rgw_const.h b/src/cls/rgw/cls_rgw_const.h index 5957d2ffbfd..ad0b9e70961 100644 --- a/src/cls/rgw/cls_rgw_const.h +++ b/src/cls/rgw/cls_rgw_const.h @@ -6,6 +6,11 @@ #define RGW_CLASS "rgw" +/* Special error code returned by cls bucket list operation if it was + * unable to skip past enough non-visible entries to return any + * entries in the call. */ +constexpr int RGWBIAdvanceAndRetryError = -EFBIG; + /* bucket index */ #define RGW_BUCKET_INIT_INDEX "bucket_init_index"