From 5d283074750dc6bd458877bd42921037b5bb7f4b Mon Sep 17 00:00:00 2001
From: "J. Eric Ivancich" <ivancich@redhat.com>
Date: Mon, 19 Jul 2021 14:23:42 -0400
Subject: [PATCH] rgw: allow CLSRGWConcurrentIO to handle "advancing" retries

When doing an asynchronous/concurrent bucket index operation against
multiple bucket index shards, a special error code is set aside to
indicate that an "advancing" retry of a/some shard(s) is necessary. In
that case another asynchronous call is made on the indicated shard(s)
from the client (i.e., CLSRGWConcurrentIO). It is up to the subclass
of CLSRGWConcurrentIO to handle the retry such that it "advances" and
simply doesn't get stuck, looping forever.

The retry functionality only works when the "need_multiple_rounds"
functionality is not in use.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
---
 src/cls/rgw/cls_rgw_client.cc | 99 ++++++++++++++++++++++++++++++++---
 src/cls/rgw/cls_rgw_client.h  | 62 +++++-----------------
 src/cls/rgw/cls_rgw_const.h   |  5 ++
 3 files changed, 112 insertions(+), 54 deletions(-)

diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc
index a93b28bd5deac..2f1a9164f33b3 100644
--- a/src/cls/rgw/cls_rgw_client.cc
+++ b/src/cls/rgw/cls_rgw_client.cc
@@ -21,6 +21,67 @@ using namespace librados;
 const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
 const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
 
+
+int CLSRGWConcurrentIO::operator()() {
+  int ret = 0;
+  iter = objs_container.begin();
+  for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
+    ret = issue_op(iter->first, iter->second);
+    if (ret < 0)
+      break;
+  }
+
+  int num_completions = 0, r = 0;
+  std::map<int, std::string> completed_objs;
+  std::map<int, std::string> retry_objs;
+  while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r,
+                                      need_multiple_rounds() ? &completed_objs : nullptr,
+                                      !need_multiple_rounds() ? &retry_objs : nullptr)) {
+    if (r >= 0 && ret >= 0) {
+      for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
+        int issue_ret = issue_op(iter->first, iter->second);
+        if (issue_ret < 0) {
+          ret = issue_ret;
+          break;
+        }
+      }
+    } else if (ret >= 0) {
+      ret = r;
+    }
+
+    // if we're at the end with this round, see if another round is needed
+    if (iter == objs_container.end()) {
+      if (need_multiple_rounds() && !completed_objs.empty()) {
+        // For those objects which need another round, use them to reset
+        // the container
+        reset_container(completed_objs);
+        iter = objs_container.begin();
+      } else if (! need_multiple_rounds() && !retry_objs.empty()) {
+        reset_container(retry_objs);
+        iter = objs_container.begin();
+      }
+
+      // re-issue ops if container was reset above (i.e., iter !=
+      // objs_container.end()); if it was not reset above (i.e., iter
+      // == objs_container.end()) the loop will exit immediately
+      // without iterating
+      for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
+        int issue_ret = issue_op(iter->first, iter->second);
+        if (issue_ret < 0) {
+          ret = issue_ret;
+          break;
+        }
+      }
+    }
+  }
+
+  if (ret < 0) {
+    cleanup();
+  }
+  return ret;
+} // CLSRGWConcurrentIO::operator()()
+
+
 /**
  * This class represents the bucket index object operation callback context.
  */
@@ -33,7 +94,9 @@ public:
   ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { ceph_assert(data); }
   ~ClsBucketIndexOpCtx() override {}
   void handle_completion(int r, bufferlist& outbl) override {
-    if (r >= 0) {
+    // if successful, or we're asked for a retry, copy result into
+    // destination (*data)
+    if (r >= 0 || r == RGWBIAdvanceAndRetryError) {
       try {
         auto iter = outbl.cbegin();
         decode((*data), iter);
@@ -67,7 +130,11 @@ void BucketIndexAioManager::do_completion(const int request_id) {
 }
 
 bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
-    int *num_completions, int *ret_code, map<int, string> *objs) {
+                                                 int *num_completions,
+                                                 int *ret_code,
+                                                 std::map<int, std::string> *completed_objs,
+                                                 std::map<int, std::string> *retry_objs)
+{
   std::unique_lock locker{lock};
   if (pendings.empty() && completions.empty()) {
     return false;
@@ -82,18 +149,38 @@ bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
   auto iter = completions.begin();
   for (; iter != completions.end(); ++iter) {
     int r = iter->second->get_return_value();
-    if (objs && r == 0) { /* update list of successfully completed objs */
+
+    // see if we may need to copy completions or retries
+    if (completed_objs || retry_objs) {
       auto liter = completion_objs.find(iter->first);
       if (liter != completion_objs.end()) {
-        (*objs)[liter->second.shard_id] = liter->second.oid;
+        if (completed_objs && r == 0) { /* update list of successfully completed objs */
+          (*completed_objs)[liter->second.shard_id] = liter->second.oid;
+        }
+
+        if (r == RGWBIAdvanceAndRetryError) {
+          r = 0;
+          if (retry_objs) {
+            (*retry_objs)[liter->second.shard_id] = liter->second.oid;
+          }
+        }
+      } else {
+        // NB: should we log an error here; currently no logging
+        // context to use
       }
     }
-    if (ret_code && (r < 0 && r != valid_ret_code))
+
+    if (ret_code && (r < 0 && r != valid_ret_code)) {
       (*ret_code) = r;
+    }
+
     iter->second->release();
   }
-  if (num_completions)
+
+  if (num_completions) {
     (*num_completions) = completions.size();
+  }
+
   completions.clear();
 
   return true;
diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h
index bcabd51391ff0..44905671c1372 100644
--- a/src/cls/rgw/cls_rgw_client.h
+++ b/src/cls/rgw/cls_rgw_client.h
@@ -107,8 +107,11 @@ public:
    *
    * Return false if there is no pending AIO, true otherwise.
    */
-  bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code,
-      std::map<int, std::string> *objs);
+  bool wait_for_completions(int valid_ret_code,
+                            int *num_completions = nullptr,
+                            int *ret_code = nullptr,
+                            std::map<int, std::string> *completed_objs = nullptr,
+                            std::map<int, std::string> *retry_objs = nullptr);
 
   /**
    * Do aio read operation.
@@ -262,8 +265,12 @@ void cls_rgw_bucket_init_index(librados::ObjectWriteOperation& o);
 
 class CLSRGWConcurrentIO {
 protected:
   librados::IoCtx& io_ctx;
+
+  // map of shard # to oid; the shards that are remaining to be processed
   std::map<int, std::string>& objs_container;
+  // iterator to work through objs_container
   std::map<int, std::string>::iterator iter;
+
   uint32_t max_aio;
   BucketIndexAioManager manager;
@@ -289,51 +296,9 @@ public:
 
   virtual ~CLSRGWConcurrentIO() {}
 
-  int operator()() {
-    int ret = 0;
-    iter = objs_container.begin();
-    for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
-      ret = issue_op(iter->first, iter->second);
-      if (ret < 0)
-        break;
-    }
+  int operator()();
+}; // class CLSRGWConcurrentIO
 
-    int num_completions = 0, r = 0;
-    std::map<int, std::string> objs;
-    std::map<int, std::string> *pobjs = (need_multiple_rounds() ? &objs : NULL);
-    while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, pobjs)) {
-      if (r >= 0 && ret >= 0) {
-        for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
-          int issue_ret = issue_op(iter->first, iter->second);
-          if (issue_ret < 0) {
-            ret = issue_ret;
-            break;
-          }
-        }
-      } else if (ret >= 0) {
-        ret = r;
-      }
-      if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) {
-        // For those objects which need another round, use them to reset
-        // the container
-        reset_container(objs);
-        iter = objs_container.begin();
-        for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
-          int issue_ret = issue_op(iter->first, iter->second);
-          if (issue_ret < 0) {
-            ret = issue_ret;
-            break;
-          }
-        }
-      }
-    }
-
-    if (ret < 0) {
-      cleanup();
-    }
-    return ret;
-  }
-};
 
 class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
@@ -341,8 +306,9 @@ protected:
   int valid_ret_code() override { return -EEXIST; }
   void cleanup() override;
 public:
-  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, std::map<int, std::string>& _bucket_objs,
-                             uint32_t _max_aio) :
+  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc,
+                             std::map<int, std::string>& _bucket_objs,
+                             uint32_t _max_aio) :
     CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
 };
 
diff --git a/src/cls/rgw/cls_rgw_const.h b/src/cls/rgw/cls_rgw_const.h
index 63f7517334488..6015872f8be81 100644
--- a/src/cls/rgw/cls_rgw_const.h
+++ b/src/cls/rgw/cls_rgw_const.h
@@ -5,6 +5,11 @@
 
 #define RGW_CLASS "rgw"
 
+/* Special error code returned by cls bucket list operation if it was
+ * unable to skip past enough not visible entries to return any
+ * entries in the call. */
+constexpr int RGWBIAdvanceAndRetryError = -EFBIG;
+
 /* bucket index */
 #define RGW_BUCKET_INIT_INDEX "bucket_init_index"
 
-- 
2.39.5