]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgwlc: remove bucket_lc_prepare, add backoff
authorMatt Benjamin <mbenjamin@redhat.com>
Thu, 17 Feb 2022 15:55:14 +0000 (10:55 -0500)
committerMatt Benjamin <mbenjamin@redhat.com>
Thu, 17 Mar 2022 15:07:24 +0000 (11:07 -0400)
Remove now-unused RGWLC::bucket_lc_prepare.  Wrap serializer calls
in RGWLC::process(int index...) with simple backoff, limited to 5
retries.

In RGWLC::process(int index...), also open-coded the behavior of
RGWLC::bucket_lc_prepare(...), as the lock sharing between these
methods is error prone.  For now, that method exists, so that it can
be called from the single-bucket process.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/rgw/rgw_lc.cc
src/rgw/rgw_lc.h

index b7805e65f47d9e67ac46e4e03ecd2f2e7fc37f6d..022321f4c690d288cde0ab880db460c0f2b972e2 100644 (file)
@@ -14,6 +14,7 @@
 #include <boost/variant.hpp>
 
 #include "include/scope_guard.h"
+#include "include/function2.hpp"
 #include "common/Formatter.h"
 #include "common/containers.h"
 #include <common/errno.h>
@@ -284,41 +285,6 @@ static inline std::ostream& operator<<(std::ostream &os, rgw::sal::Lifecycle::LC
     return os;
 }
 
-int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
-{
-  vector<rgw::sal::Lifecycle::LCEntry> entries;
-  string marker;
-
-  ldpp_dout(this, 5) << "RGWLC::bucket_lc_prepare(): PREPARE "
-         << "index: " << index << " worker ix: " << worker->ix
-         << dendl;
-
-#define MAX_LC_LIST_ENTRIES 100
-  do {
-    int ret = sal_lc->list_entries(obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
-    if (ret < 0)
-      return ret;
-
-    for (auto& entry : entries) {
-      entry.start_time = ceph_clock_now();
-      entry.status = lc_uninitial; // lc_uninitial? really?
-      ret = sal_lc->set_entry(obj_names[index], entry);
-      if (ret < 0) {
-        ldpp_dout(this, 0)
-         << "RGWLC::bucket_lc_prepare() failed to set entry on "
-         << obj_names[index] << dendl;
-        return ret;
-      }
-    }
-
-    if (! entries.empty()) {
-      marker = std::move(entries.back().bucket);
-    }
-  } while (!entries.empty());
-
-  return 0;
-}
-
 static bool obj_has_expired(const DoutPrefixProvider *dpp, CephContext *cct, ceph::real_time mtime, int days,
                            ceph::real_time *expire_time = nullptr)
 {
@@ -1825,6 +1791,41 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
   return ret;
 }
 
+class SimpleBackoff
+{
+  const int max_retries;
+  std::chrono::milliseconds sleep_ms;
+  int retries{0};
+public:
+  SimpleBackoff(int max_retries, std::chrono::milliseconds initial_sleep_ms)
+    : max_retries(max_retries), sleep_ms(initial_sleep_ms)
+    {}
+  SimpleBackoff(const SimpleBackoff&) = delete;
+  SimpleBackoff& operator=(const SimpleBackoff&) = delete;
+
+  int get_retries() const {
+    return retries;
+  }
+
+  void reset() {
+    retries = 0;
+  }
+
+  bool wait_backoff(const fu2::unique_function<bool(void) const>& barrier) {
+    reset();
+    while (retries < max_retries) {
+      auto r = barrier();
+      if (r) {
+       return r;
+      }
+      std::this_thread::sleep_for(sleep_ms);
+      sleep_ms = std::chrono::milliseconds(sleep_ms*2*retries);
+      ++retries;
+    }
+    return false;
+  }
+};
+
 int RGWLC::bucket_lc_post(int index, int max_lock_sec,
                          rgw::sal::Lifecycle::LCEntry& entry, int& result,
                          LCWorker* worker)
@@ -1853,6 +1854,7 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec,
       return 0;
     ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index]
                        << dendl;
+
     if (result ==  -ENOENT) {
       /* XXXX are we SURE the only way result could == ENOENT is when
        * there is no such bucket?  It is currently the value returned
@@ -1881,7 +1883,7 @@ clean:
                        << obj_names[index] << dendl;
     return 0;
   } while (true);
-}
+} /* RGWLC::bucket_lc_post */
 
 int RGWLC::list_lc_progress(string& marker, uint32_t max_entries,
                            vector<rgw::sal::Lifecycle::LCEntry>& progress_map,
@@ -2111,21 +2113,29 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
   rgw::sal::LCSerializer* lock =
     sal_lc->get_serializer(lc_index_lock_name, lc_shard, std::string());
 
-  do {
-    utime_t now = ceph_clock_now();
-    utime_t lock_for_s(max_lock_secs, 0);
-
+  utime_t lock_for_s(max_lock_secs, 0);
+  const auto& lock_lambda = [&]() {
     ret = lock->try_lock(this, lock_for_s, null_yield);
+    if (ret == 0) {
+      return true;
+    }
     if (ret == -EBUSY || ret == -EEXIST) {
       /* already locked by another lc processor */
-      ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
-          << lc_shard << ", sleep 5, try again" << dendl;
-      sleep(5);
-      continue; // XXXX really retry forever?
-    }
-    if (ret < 0) {
-      return 0;
-    }
+      return false;
+      }
+    return false;
+  };
+
+  SimpleBackoff shard_lock(5 /* max retries */, 50ms);
+  if (! shard_lock.wait_backoff(lock_lambda)) {
+    ldpp_dout(this, 0) << "RGWLC::process(): failed to aquire lock on "
+                      << lc_shard << " after " << shard_lock.get_retries()
+                      << dendl;
+    goto notlocked;
+  }
+
+  do {
+    utime_t now = ceph_clock_now();
 
     /* preamble: find an inital bucket/marker */
     ret = sal_lc->get_head(lc_shard, head);
@@ -2136,7 +2146,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
     }
 
     /* if there is nothing at head, try to reinitialize head.marker with the
-     * entry in the queue */
+     * first entry in the queue */
     if (head.marker.empty()) {
       vector<rgw::sal::Lifecycle::LCEntry> entries;
       int ret = sal_lc->list_entries(lc_shard, head.marker, 1, entries);
@@ -2225,9 +2235,43 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
            << " index: " << index << " worker ix: " << worker->ix
            << dendl;
 
+    /* drop lock so other instances can make progress while this
+     * bucket is being processed */
     lock->unlock();
     ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once);
-    bucket_lc_post(index, max_lock_secs, entry, ret, worker);
+
+    /* postamble */
+    //bucket_lc_post(index, max_lock_secs, entry, ret, worker);
+    if (! shard_lock.wait_backoff(lock_lambda)) {
+      ldpp_dout(this, 0) << "RGWLC::process(): failed to aquire lock on "
+                        << lc_shard << " after " << shard_lock.get_retries()
+                        << dendl;
+      goto notlocked;
+    }
+
+    if (ret == -ENOENT) {
+      /* XXXX are we SURE the only way result could == ENOENT is when
+       * there is no such bucket?  It is currently the value returned
+       * from bucket_lc_process(...) */
+      ret = sal_lc->rm_entry(lc_shard,  entry);
+      if (ret < 0) {
+        ldpp_dout(this, 0) << "RGWLC::process() failed to remove entry "
+                          << lc_shard << " (nonfatal)"
+                          << dendl;
+       /* not fatal, could result from a race */
+      }
+    } else if (ret < 0) {
+      entry.status = lc_failed;
+    } else {
+      entry.status = lc_complete;
+    }
+    ret = sal_lc->set_entry(lc_shard,  entry);
+    if (ret < 0) {
+      ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
+          << lc_shard << dendl;
+      /* fatal, locked */
+      goto exit;
+    }
 
     /* done with this shard */
     if (head.marker.empty()) {
@@ -2245,6 +2289,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
     }
   } while(1 && !once && !going_down());
 
+notlocked:
   delete lock;
   return 0;
 
index 87d44ac69a64596d47beee48aed24772b60d8f89..fc4db3932c90ed0d1f3cb6e11830e1755ceaa2da 100644 (file)
@@ -525,7 +525,6 @@ public:
   time_t thread_stop_at();
   int list_lc_progress(std::string& marker, uint32_t max_entries,
                       std::vector<rgw::sal::Lifecycle::LCEntry>&, int& index);
-  int bucket_lc_prepare(int index, LCWorker* worker);
   int bucket_lc_process(std::string& shard_id, LCWorker* worker, time_t stop_at,
                        bool once);
   int bucket_lc_post(int index, int max_lock_sec,