]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: allow CLSRGWConcurrentIO to handle "advancing" retries
authorJ. Eric Ivancich <ivancich@redhat.com>
Mon, 19 Jul 2021 18:23:42 +0000 (14:23 -0400)
committerDan van der Ster <daniel.vanderster@cern.ch>
Fri, 18 Feb 2022 21:26:59 +0000 (22:26 +0100)
When doing an asynchronous/concurrent bucket index operation against
multiple bucket index shards, a special error code is set aside to
indicate that an "advancing" retry of a/some shard(s) is necessary. In
that case another asynchronous call is made on the indicated shard(s)
from the client (i.e., CLSRGWConcurrentIO).  It is up to the subclass
of CLSRGWConcurrentIO to handle the retry such that it "advances" and
simply doesn't get stuck, looping forever.

The retry functionality only works when the "need_multiple_rounds"
functionality is not in use.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
(cherry picked from commit 5d283074750dc6bd458877bd42921037b5bb7f4b)

Conflicts:
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h

Resolved by taking the patch version -- all cases of auto type and std::

src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/cls/rgw/cls_rgw_const.h

index 414ecbad345eaca306452fa8c91a8917ddf4c46b..380afee40c9f2a2832395fc218208eee968c5b95 100644 (file)
@@ -13,6 +13,67 @@ using namespace librados;
 const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
 const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
 
+
+int CLSRGWConcurrentIO::operator()() {
+  int ret = 0;
+  iter = objs_container.begin();
+  for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
+    ret = issue_op(iter->first, iter->second);
+    if (ret < 0)
+      break;
+  }
+
+  int num_completions = 0, r = 0;
+  std::map<int, std::string> completed_objs;
+  std::map<int, std::string> retry_objs;
+  while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r,
+                                     need_multiple_rounds() ? &completed_objs : nullptr,
+                                     !need_multiple_rounds() ? &retry_objs : nullptr)) {
+    if (r >= 0 && ret >= 0) {
+      for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
+       int issue_ret = issue_op(iter->first, iter->second);
+       if (issue_ret < 0) {
+         ret = issue_ret;
+         break;
+       }
+      }
+    } else if (ret >= 0) {
+      ret = r;
+    }
+
+    // if we're at the end with this round, see if another round is needed
+    if (iter == objs_container.end()) {
+      if (need_multiple_rounds() && !completed_objs.empty()) {
+       // For those objects which need another round, use them to reset
+       // the container
+       reset_container(completed_objs);
+       iter = objs_container.begin();
+      } else if (! need_multiple_rounds() && !retry_objs.empty()) {
+       reset_container(retry_objs);
+       iter = objs_container.begin();
+      }
+
+      // re-issue ops if container was reset above (i.e., iter !=
+      // objs_container.end()); if it was not reset above (i.e., iter
+      // == objs_container.end()) the loop will exit immediately
+      // without iterating
+      for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
+       int issue_ret = issue_op(iter->first, iter->second);
+       if (issue_ret < 0) {
+         ret = issue_ret;
+         break;
+       }
+      }
+    }
+  }
+
+  if (ret < 0) {
+    cleanup();
+  }
+  return ret;
+} // CLSRGWConcurrintIO::operator()()
+
+
 /**
  * This class represents the bucket index object operation callback context.
  */
@@ -25,7 +86,9 @@ public:
   ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { ceph_assert(data); }
   ~ClsBucketIndexOpCtx() override {}
   void handle_completion(int r, bufferlist& outbl) override {
-    if (r >= 0) {
+    // if successful, or we're asked for a retry, copy result into
+    // destination (*data)
+    if (r >= 0 || r == RGWBIAdvanceAndRetryError) {
       try {
         auto iter = outbl.cbegin();
         decode((*data), iter);
@@ -59,7 +122,11 @@ void BucketIndexAioManager::do_completion(const int request_id) {
 }
 
 bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
-    int *num_completions, int *ret_code, map<int, string> *objs) {
+                                                int *num_completions,
+                                                int *ret_code,
+                                                std::map<int, std::string> *completed_objs,
+                                                std::map<int, std::string> *retry_objs)
+{
   std::unique_lock locker{lock};
   if (pendings.empty() && completions.empty()) {
     return false;
@@ -74,18 +141,38 @@ bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
   map<int, librados::AioCompletion*>::iterator iter = completions.begin();
   for (; iter != completions.end(); ++iter) {
     int r = iter->second->get_return_value();
-    if (objs && r == 0) { /* update list of successfully completed objs */
-      map<int, string>::iterator liter = completion_objs.find(iter->first);
+
+    // see if we may need to copy completions or retries
+    if (completed_objs || retry_objs) {
+      auto liter = completion_objs.find(iter->first);
       if (liter != completion_objs.end()) {
-       (*objs)[liter->second.shard_id] = liter->second.oid;
+       if (completed_objs && r == 0) { /* update list of successfully completed objs */
+         (*completed_objs)[liter->second.shard_id] = liter->second.oid;
+       }
+
+       if (r == RGWBIAdvanceAndRetryError) {
+         r = 0;
+         if (retry_objs) {
+           (*retry_objs)[liter->second.shard_id] = liter->second.oid;
+         }
+       }
+      } else {
+       // NB: should we log an error here; currently no logging
+       // context to use
       }
     }
-    if (ret_code && (r < 0 && r != valid_ret_code))
+
+    if (ret_code && (r < 0 && r != valid_ret_code)) {
       (*ret_code) = r;
+    }
+
     iter->second->release();
   }
-  if (num_completions)
+
+  if (num_completions) {
     (*num_completions) = completions.size();
+  }
+
   completions.clear();
 
   return true;
index 43225b9dbe61be5ca20309a7712b6bade9153685..cfaab7c389d4262c39b1426200dd51d9f1e435e9 100644 (file)
@@ -108,8 +108,11 @@ public:
    *
    * Return false if there is no pending AIO, true otherwise.
    */
-  bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code,
-      map<int, string> *objs);
+  bool wait_for_completions(int valid_ret_code,
+                           int *num_completions = nullptr,
+                           int *ret_code = nullptr,
+                           std::map<int, std::string> *completed_objs = nullptr,
+                           std::map<int, std::string> *retry_objs = nullptr);
 
   /**
    * Do aio read operation.
@@ -261,8 +264,12 @@ void cls_rgw_bucket_init_index(librados::ObjectWriteOperation& o);
 class CLSRGWConcurrentIO {
 protected:
   librados::IoCtx& io_ctx;
-  map<int, string>& objs_container;
-  map<int, string>::iterator iter;
+
+  // map of shard # to oid; the shards that are remaining to be processed
+  std::map<int, std::string>& objs_container;
+  // iterator to work through objs_container
+  std::map<int, std::string>::iterator iter;
+
   uint32_t max_aio;
   BucketIndexAioManager manager;
 
@@ -288,51 +295,9 @@ public:
   virtual ~CLSRGWConcurrentIO()
   {}
 
-  int operator()() {
-    int ret = 0;
-    iter = objs_container.begin();
-    for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
-      ret = issue_op(iter->first, iter->second);
-      if (ret < 0)
-        break;
-    }
+  int operator()();
+}; // class CLSRGWConcurrentIO
 
-    int num_completions = 0, r = 0;
-    map<int, string> objs;
-    map<int, string> *pobjs = (need_multiple_rounds() ? &objs : NULL);
-    while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, pobjs)) {
-      if (r >= 0 && ret >= 0) {
-        for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
-          int issue_ret = issue_op(iter->first, iter->second);
-          if (issue_ret < 0) {
-            ret = issue_ret;
-            break;
-          }
-        }
-      } else if (ret >= 0) {
-        ret = r;
-      }
-      if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) {
-        // For those objects which need another round, use them to reset
-        // the container
-        reset_container(objs);
-        iter = objs_container.begin();
-        for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
-          int issue_ret = issue_op(iter->first, iter->second);
-          if (issue_ret < 0) {
-            ret = issue_ret;
-            break;
-          }
-        }
-      }
-    }
-
-    if (ret < 0) {
-      cleanup();
-    }
-    return ret;
-  }
-};
 
 class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
 protected:
@@ -340,8 +305,9 @@ protected:
   int valid_ret_code() override { return -EEXIST; }
   void cleanup() override;
 public:
-  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
-                     uint32_t _max_aio) :
+  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc,
+                            std::map<int, std::string>& _bucket_objs,
+                            uint32_t _max_aio) :
     CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
 };
 
index 5957d2ffbfd7c8a55f3efdd6241b4e1d2c588c08..ad0b9e709612d8ea8305fca8a3bd4600b1b429d8 100644 (file)
@@ -6,6 +6,11 @@
 
 #define RGW_CLASS "rgw"
 
+/* Special error code returned by cls bucket list operation if it was
+ * unable to skip past enough not visibile entries to return any
+ * entries in the call. */
+constexpr int RGWBIAdvanceAndRetryError = -EFBIG;
+
 /* bucket index */
 #define RGW_BUCKET_INIT_INDEX "bucket_init_index"