]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: allow CLSRGWConcurrentIO to handle "advancing" retries
author J. Eric Ivancich <ivancich@redhat.com>
Mon, 19 Jul 2021 18:23:42 +0000 (14:23 -0400)
committer Dan van der Ster <daniel.vanderster@cern.ch>
Tue, 1 Mar 2022 10:31:01 +0000 (11:31 +0100)
When doing an asynchronous/concurrent bucket index operation against
multiple bucket index shards, a special error code is set aside to
indicate that an "advancing" retry of a/some shard(s) is necessary. In
that case another asynchronous call is made on the indicated shard(s)
from the client (i.e., CLSRGWConcurrentIO).  It is up to the subclass
of CLSRGWConcurrentIO to handle the retry such that it "advances" and
does not simply get stuck, looping forever.

The retry functionality only works when the "need_multiple_rounds"
functionality is not in use.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
(cherry picked from commit 5d283074750dc6bd458877bd42921037b5bb7f4b)

src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/cls/rgw/cls_rgw_const.h

index a93b28bd5deac3f92c33cfd3ccd2c3e3fbb22b10..2f1a9164f33b3d615b8d4ccb063e500793dea707 100644 (file)
@@ -21,6 +21,67 @@ using namespace librados;
 const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
 const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
 
+
+int CLSRGWConcurrentIO::operator()() {
+  int ret = 0;
+  iter = objs_container.begin();
+  for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
+    ret = issue_op(iter->first, iter->second);
+    if (ret < 0)
+      break;
+  }
+
+  int num_completions = 0, r = 0;
+  std::map<int, std::string> completed_objs;
+  std::map<int, std::string> retry_objs;
+  while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r,
+                                     need_multiple_rounds() ? &completed_objs : nullptr,
+                                     !need_multiple_rounds() ? &retry_objs : nullptr)) {
+    if (r >= 0 && ret >= 0) {
+      for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
+       int issue_ret = issue_op(iter->first, iter->second);
+       if (issue_ret < 0) {
+         ret = issue_ret;
+         break;
+       }
+      }
+    } else if (ret >= 0) {
+      ret = r;
+    }
+
+    // if we're at the end with this round, see if another round is needed
+    if (iter == objs_container.end()) {
+      if (need_multiple_rounds() && !completed_objs.empty()) {
+       // For those objects which need another round, use them to reset
+       // the container
+       reset_container(completed_objs);
+       iter = objs_container.begin();
+      } else if (! need_multiple_rounds() && !retry_objs.empty()) {
+       reset_container(retry_objs);
+       iter = objs_container.begin();
+      }
+
+      // re-issue ops if container was reset above (i.e., iter !=
+      // objs_container.end()); if it was not reset above (i.e., iter
+      // == objs_container.end()) the loop will exit immediately
+      // without iterating
+      for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
+       int issue_ret = issue_op(iter->first, iter->second);
+       if (issue_ret < 0) {
+         ret = issue_ret;
+         break;
+       }
+      }
+    }
+  }
+
+  if (ret < 0) {
+    cleanup();
+  }
+  return ret;
+} // CLSRGWConcurrentIO::operator()()
+
+
 /**
  * This class represents the bucket index object operation callback context.
  */
@@ -33,7 +94,9 @@ public:
   ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { ceph_assert(data); }
   ~ClsBucketIndexOpCtx() override {}
   void handle_completion(int r, bufferlist& outbl) override {
-    if (r >= 0) {
+    // if successful, or we're asked for a retry, copy result into
+    // destination (*data)
+    if (r >= 0 || r == RGWBIAdvanceAndRetryError) {
       try {
         auto iter = outbl.cbegin();
         decode((*data), iter);
@@ -67,7 +130,11 @@ void BucketIndexAioManager::do_completion(const int request_id) {
 }
 
 bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
-    int *num_completions, int *ret_code, map<int, string> *objs) {
+                                                int *num_completions,
+                                                int *ret_code,
+                                                std::map<int, std::string> *completed_objs,
+                                                std::map<int, std::string> *retry_objs)
+{
   std::unique_lock locker{lock};
   if (pendings.empty() && completions.empty()) {
     return false;
@@ -82,18 +149,38 @@ bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
   auto iter = completions.begin();
   for (; iter != completions.end(); ++iter) {
     int r = iter->second->get_return_value();
-    if (objs && r == 0) { /* update list of successfully completed objs */
+
+    // see if we may need to copy completions or retries
+    if (completed_objs || retry_objs) {
       auto liter = completion_objs.find(iter->first);
       if (liter != completion_objs.end()) {
-       (*objs)[liter->second.shard_id] = liter->second.oid;
+       if (completed_objs && r == 0) { /* update list of successfully completed objs */
+         (*completed_objs)[liter->second.shard_id] = liter->second.oid;
+       }
+
+       if (r == RGWBIAdvanceAndRetryError) {
+         r = 0;
+         if (retry_objs) {
+           (*retry_objs)[liter->second.shard_id] = liter->second.oid;
+         }
+       }
+      } else {
+       // NB: should we log an error here; currently no logging
+       // context to use
       }
     }
-    if (ret_code && (r < 0 && r != valid_ret_code))
+
+    if (ret_code && (r < 0 && r != valid_ret_code)) {
       (*ret_code) = r;
+    }
+
     iter->second->release();
   }
-  if (num_completions)
+
+  if (num_completions) {
     (*num_completions) = completions.size();
+  }
+
   completions.clear();
 
   return true;
index 6cff4a2d5ba87801c92b447bd2d885eb8c3859af..7fa8e52e91cca0665d42af0e6529c097974b86bb 100644 (file)
@@ -108,8 +108,11 @@ public:
    *
    * Return false if there is no pending AIO, true otherwise.
    */
-  bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code,
-      std::map<int, std::string> *objs);
+  bool wait_for_completions(int valid_ret_code,
+                           int *num_completions = nullptr,
+                           int *ret_code = nullptr,
+                           std::map<int, std::string> *completed_objs = nullptr,
+                           std::map<int, std::string> *retry_objs = nullptr);
 
   /**
    * Do aio read operation.
@@ -263,8 +266,12 @@ void cls_rgw_bucket_init_index(librados::ObjectWriteOperation& o);
 class CLSRGWConcurrentIO {
 protected:
   librados::IoCtx& io_ctx;
+
+  // map of shard # to oid; the shards that are remaining to be processed
   std::map<int, std::string>& objs_container;
+  // iterator to work through objs_container
   std::map<int, std::string>::iterator iter;
+
   uint32_t max_aio;
   BucketIndexAioManager manager;
 
@@ -290,51 +297,9 @@ public:
   virtual ~CLSRGWConcurrentIO()
   {}
 
-  int operator()() {
-    int ret = 0;
-    iter = objs_container.begin();
-    for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
-      ret = issue_op(iter->first, iter->second);
-      if (ret < 0)
-        break;
-    }
+  int operator()();
+}; // class CLSRGWConcurrentIO
 
-    int num_completions = 0, r = 0;
-    std::map<int, std::string> objs;
-    std::map<int, std::string> *pobjs = (need_multiple_rounds() ? &objs : NULL);
-    while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, pobjs)) {
-      if (r >= 0 && ret >= 0) {
-        for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
-          int issue_ret = issue_op(iter->first, iter->second);
-          if (issue_ret < 0) {
-            ret = issue_ret;
-            break;
-          }
-        }
-      } else if (ret >= 0) {
-        ret = r;
-      }
-      if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) {
-        // For those objects which need another round, use them to reset
-        // the container
-        reset_container(objs);
-        iter = objs_container.begin();
-        for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
-          int issue_ret = issue_op(iter->first, iter->second);
-          if (issue_ret < 0) {
-            ret = issue_ret;
-            break;
-          }
-        }
-      }
-    }
-
-    if (ret < 0) {
-      cleanup();
-    }
-    return ret;
-  }
-};
 
 class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
 protected:
@@ -342,8 +307,9 @@ protected:
   int valid_ret_code() override { return -EEXIST; }
   void cleanup() override;
 public:
-  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, std::map<int, std::string>& _bucket_objs,
-                     uint32_t _max_aio) :
+  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc,
+                            std::map<int, std::string>& _bucket_objs,
+                            uint32_t _max_aio) :
     CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
 };
 
index f39919d24b60b3fc5b4f8fa073a3560eb9ae0c26..61f06eac982821d74a217775bdd7929f41af3f68 100644 (file)
@@ -6,6 +6,11 @@
 
 #define RGW_CLASS "rgw"
 
+/* Special error code returned by cls bucket list operation if it was
+ * unable to skip past enough non-visible entries to return any
+ * entries in the call. */
+constexpr int RGWBIAdvanceAndRetryError = -EFBIG;
+
 /* bucket index */
 #define RGW_BUCKET_INIT_INDEX "bucket_init_index"