]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: allow CLSRGWConcurrentIO to handle "advancing" retries
authorJ. Eric Ivancich <ivancich@redhat.com>
Mon, 19 Jul 2021 18:23:42 +0000 (14:23 -0400)
committerJ. Eric Ivancich <ivancich@redhat.com>
Wed, 4 Aug 2021 04:44:06 +0000 (00:44 -0400)
When doing an asynchronous/concurrent bucket index operation against
multiple bucket index shards, a special error code is set aside to
indicate that an "advancing" retry of a/some shard(s) is necessary. In
that case another asynchronous call is made on the indicated shard(s)
from the client (i.e., CLSRGWConcurrentIO).  It is up to the subclass
of CLSRGWConcurrentIO to handle the retry such that it "advances" and
simply doesn't get stuck, looping forever.

The retry functionality only works when the "need_multiple_rounds"
functionality is not in use.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/cls/rgw/cls_rgw_const.h

index a93b28bd5deac3f92c33cfd3ccd2c3e3fbb22b10..2f1a9164f33b3d615b8d4ccb063e500793dea707 100644 (file)
@@ -21,6 +21,67 @@ using namespace librados;
 const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
 const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
 
+
+int CLSRGWConcurrentIO::operator()() {
+  int ret = 0;
+  iter = objs_container.begin();
+  for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
+    ret = issue_op(iter->first, iter->second);
+    if (ret < 0)
+      break;
+  }
+
+  int num_completions = 0, r = 0;
+  std::map<int, std::string> completed_objs;
+  std::map<int, std::string> retry_objs;
+  while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r,
+                                     need_multiple_rounds() ? &completed_objs : nullptr,
+                                     !need_multiple_rounds() ? &retry_objs : nullptr)) {
+    if (r >= 0 && ret >= 0) {
+      for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
+       int issue_ret = issue_op(iter->first, iter->second);
+       if (issue_ret < 0) {
+         ret = issue_ret;
+         break;
+       }
+      }
+    } else if (ret >= 0) {
+      ret = r;
+    }
+
+    // if we're at the end with this round, see if another round is needed
+    if (iter == objs_container.end()) {
+      if (need_multiple_rounds() && !completed_objs.empty()) {
+       // For those objects which need another round, use them to reset
+       // the container
+       reset_container(completed_objs);
+       iter = objs_container.begin();
+      } else if (! need_multiple_rounds() && !retry_objs.empty()) {
+       reset_container(retry_objs);
+       iter = objs_container.begin();
+      }
+
+      // re-issue ops if container was reset above (i.e., iter !=
+      // objs_container.end()); if it was not reset above (i.e., iter
+      // == objs_container.end()) the loop will exit immediately
+      // without iterating
+      for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
+       int issue_ret = issue_op(iter->first, iter->second);
+       if (issue_ret < 0) {
+         ret = issue_ret;
+         break;
+       }
+      }
+    }
+  }
+
+  if (ret < 0) {
+    cleanup();
+  }
+  return ret;
+} // CLSRGWConcurrintIO::operator()()
+
+
 /**
  * This class represents the bucket index object operation callback context.
  */
@@ -33,7 +94,9 @@ public:
   ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { ceph_assert(data); }
   ~ClsBucketIndexOpCtx() override {}
   void handle_completion(int r, bufferlist& outbl) override {
-    if (r >= 0) {
+    // if successful, or we're asked for a retry, copy result into
+    // destination (*data)
+    if (r >= 0 || r == RGWBIAdvanceAndRetryError) {
       try {
         auto iter = outbl.cbegin();
         decode((*data), iter);
@@ -67,7 +130,11 @@ void BucketIndexAioManager::do_completion(const int request_id) {
 }
 
 bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
-    int *num_completions, int *ret_code, map<int, string> *objs) {
+                                                int *num_completions,
+                                                int *ret_code,
+                                                std::map<int, std::string> *completed_objs,
+                                                std::map<int, std::string> *retry_objs)
+{
   std::unique_lock locker{lock};
   if (pendings.empty() && completions.empty()) {
     return false;
@@ -82,18 +149,38 @@ bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
   auto iter = completions.begin();
   for (; iter != completions.end(); ++iter) {
     int r = iter->second->get_return_value();
-    if (objs && r == 0) { /* update list of successfully completed objs */
+
+    // see if we may need to copy completions or retries
+    if (completed_objs || retry_objs) {
       auto liter = completion_objs.find(iter->first);
       if (liter != completion_objs.end()) {
-       (*objs)[liter->second.shard_id] = liter->second.oid;
+       if (completed_objs && r == 0) { /* update list of successfully completed objs */
+         (*completed_objs)[liter->second.shard_id] = liter->second.oid;
+       }
+
+       if (r == RGWBIAdvanceAndRetryError) {
+         r = 0;
+         if (retry_objs) {
+           (*retry_objs)[liter->second.shard_id] = liter->second.oid;
+         }
+       }
+      } else {
+       // NB: should we log an error here; currently no logging
+       // context to use
       }
     }
-    if (ret_code && (r < 0 && r != valid_ret_code))
+
+    if (ret_code && (r < 0 && r != valid_ret_code)) {
       (*ret_code) = r;
+    }
+
     iter->second->release();
   }
-  if (num_completions)
+
+  if (num_completions) {
     (*num_completions) = completions.size();
+  }
+
   completions.clear();
 
   return true;
index bcabd51391ff070d8b4f705523b9b14028b80ba9..44905671c1372400a6442e33a0e977ebac2f064b 100644 (file)
@@ -107,8 +107,11 @@ public:
    *
    * Return false if there is no pending AIO, true otherwise.
    */
-  bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code,
-      std::map<int, std::string> *objs);
+  bool wait_for_completions(int valid_ret_code,
+                           int *num_completions = nullptr,
+                           int *ret_code = nullptr,
+                           std::map<int, std::string> *completed_objs = nullptr,
+                           std::map<int, std::string> *retry_objs = nullptr);
 
   /**
    * Do aio read operation.
@@ -262,8 +265,12 @@ void cls_rgw_bucket_init_index(librados::ObjectWriteOperation& o);
 class CLSRGWConcurrentIO {
 protected:
   librados::IoCtx& io_ctx;
+
+  // map of shard # to oid; the shards that are remaining to be processed
   std::map<int, std::string>& objs_container;
+  // iterator to work through objs_container
   std::map<int, std::string>::iterator iter;
+
   uint32_t max_aio;
   BucketIndexAioManager manager;
 
@@ -289,51 +296,9 @@ public:
   virtual ~CLSRGWConcurrentIO()
   {}
 
-  int operator()() {
-    int ret = 0;
-    iter = objs_container.begin();
-    for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
-      ret = issue_op(iter->first, iter->second);
-      if (ret < 0)
-        break;
-    }
+  int operator()();
+}; // class CLSRGWConcurrentIO
 
-    int num_completions = 0, r = 0;
-    std::map<int, std::string> objs;
-    std::map<int, std::string> *pobjs = (need_multiple_rounds() ? &objs : NULL);
-    while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, pobjs)) {
-      if (r >= 0 && ret >= 0) {
-        for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
-          int issue_ret = issue_op(iter->first, iter->second);
-          if (issue_ret < 0) {
-            ret = issue_ret;
-            break;
-          }
-        }
-      } else if (ret >= 0) {
-        ret = r;
-      }
-      if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) {
-        // For those objects which need another round, use them to reset
-        // the container
-        reset_container(objs);
-        iter = objs_container.begin();
-        for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
-          int issue_ret = issue_op(iter->first, iter->second);
-          if (issue_ret < 0) {
-            ret = issue_ret;
-            break;
-          }
-        }
-      }
-    }
-
-    if (ret < 0) {
-      cleanup();
-    }
-    return ret;
-  }
-};
 
 class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
 protected:
@@ -341,8 +306,9 @@ protected:
   int valid_ret_code() override { return -EEXIST; }
   void cleanup() override;
 public:
-  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, std::map<int, std::string>& _bucket_objs,
-                     uint32_t _max_aio) :
+  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc,
+                            std::map<int, std::string>& _bucket_objs,
+                            uint32_t _max_aio) :
     CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
 };
 
index 63f7517334488cc2e1d8f2b717f87f75a2a15e08..6015872f8be811f6f891dc701610f950fdbecc96 100644 (file)
@@ -5,6 +5,11 @@
 
 #define RGW_CLASS "rgw"
 
+/* Special error code returned by cls bucket list operation if it was
+ * unable to skip past enough not visibile entries to return any
+ * entries in the call. */
+constexpr int RGWBIAdvanceAndRetryError = -EFBIG;
+
 /* bucket index */
 #define RGW_BUCKET_INIT_INDEX "bucket_init_index"