]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgwlc: remove explicit lc shard resets at start-of-run
authorMatt Benjamin <mbenjamin@redhat.com>
Mon, 14 Feb 2022 21:39:27 +0000 (16:39 -0500)
committerMatt Benjamin <mbenjamin@redhat.com>
Thu, 17 Mar 2022 15:07:15 +0000 (11:07 -0400)
This is an alternative solution to the (newly exposed) lifecycle
shard starvation problem reported by Jeegen Chen.

There was always a starvation condition implied by the
reset of lc shard head at the start of processing.  The introduction
of "stale sessions" in parallel lifecycle changes made it more
visible, in particular when rgw_lc_debug_interval was set to a small
value and many buckets had lifecycle policy.

My hypothesis in this change is that lifecycle processing for each
lc shard should /always/ continue through the full set of eligible
buckets for the shard, regardless of how many processing cycles might
be required to do so.  In general, restarting at the first eligible
bucket on each reschedule invites starvation when processing "gets
behind", so just avoid it.

Fixes: https://tracker.ceph.com/issues/49446
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
(cherry picked from commit 6e2ae13adced6b3dbb2fe16b547a30e9d68dfa06)

rgwlc: add a wraparound to continued shard processing

If the full set of buckets for a given lc shard couldn't be
processed in the prior cycle, processing will start with a
non-empty marker.  Note the initial marker position, then
when the end of the shard is reached, allow processing to wrap
around to the logical beginning of the shard and proceed
through the initial marker.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
Please enter the commit message for your changes. Lines starting
(cherry picked from commit 0b8f683d3cf444cc68fd30c3f179b9aa0ea08e7c)

don't report clearing incorrectly

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/rgw/rgw_lc.cc
src/rgw/rgw_lc.h

index fc98082fabd76b1c94b27783ecbfca84c92aa886..b7805e65f47d9e67ac46e4e03ecd2f2e7fc37f6d 100644 (file)
@@ -273,30 +273,6 @@ void RGWLC::finalize()
   delete[] obj_names;
 }
 
-bool RGWLC::if_already_run_today(time_t start_date)
-{
-  struct tm bdt;
-  time_t begin_of_day;
-  utime_t now = ceph_clock_now();
-  localtime_r(&start_date, &bdt);
-
-  if (cct->_conf->rgw_lc_debug_interval > 0) {
-    if (now - start_date < cct->_conf->rgw_lc_debug_interval)
-      return true;
-    else
-      return false;
-  }
-
-  bdt.tm_hour = 0;
-  bdt.tm_min = 0;
-  bdt.tm_sec = 0;
-  begin_of_day = mktime(&bdt);
-  if (now - begin_of_day < 24*60*60)
-    return true;
-  else
-    return false;
-}
-
 static inline std::ostream& operator<<(std::ostream &os, rgw::sal::Lifecycle::LCEntry& ent) {
   os << "<ent: bucket=";
   os << ent.bucket;
@@ -2122,100 +2098,125 @@ int RGWLC::process_bucket(int index, int max_lock_secs, LCWorker* worker,
 int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
                   bool once = false)
 {
+  int ret{0};
+  const auto& lc_shard = obj_names[index];
+
+  rgw::sal::Lifecycle::LCHead head{};
+  rgw::sal::Lifecycle::LCEntry entry, next_entry; //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
+
   ldpp_dout(this, 5) << "RGWLC::process(): ENTER: "
          << "index: " << index << " worker ix: " << worker->ix
          << dendl;
 
-  int ret = 0;
-  rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
-                                                       obj_names[index],
-                                                       std::string());
+  rgw::sal::LCSerializer* lock =
+    sal_lc->get_serializer(lc_index_lock_name, lc_shard, std::string());
+
   do {
     utime_t now = ceph_clock_now();
-    //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
-    rgw::sal::Lifecycle::LCEntry entry;
-    if (max_lock_secs <= 0)
-      return -EAGAIN;
+    utime_t lock_for_s(max_lock_secs, 0);
 
-    utime_t time(max_lock_secs, 0);
-    ret = lock->try_lock(this, time, null_yield);
+    ret = lock->try_lock(this, lock_for_s, null_yield);
     if (ret == -EBUSY || ret == -EEXIST) {
       /* already locked by another lc processor */
       ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
-          << obj_names[index] << ", sleep 5, try again" << dendl;
+          << lc_shard << ", sleep 5, try again" << dendl;
       sleep(5);
       continue; // XXXX really retry forever?
     }
-    if (ret < 0)
+    if (ret < 0) {
       return 0;
+    }
 
-    rgw::sal::Lifecycle::LCHead head;
-    ret = sal_lc->get_head(obj_names[index], head);
+    /* preamble: find an inital bucket/marker */
+    ret = sal_lc->get_head(lc_shard, head);
     if (ret < 0) {
       ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
-          << obj_names[index] << ", ret=" << ret << dendl;
+          << lc_shard << ", ret=" << ret << dendl;
       goto exit;
     }
 
-    ret = sal_lc->get_entry(obj_names[index], head.marker, entry);
-    if (ret >= 0) {
-      if (entry.status == lc_processing) {
-       if (expired_session(entry.start_time)) {
-         ldpp_dout(this, 5) << "RGWLC::process(): STALE lc session found for: " << entry
-                            << " index: " << index << " worker ix: " << worker->ix
-                            << " (clearing)"
-                            << dendl;
-       } else {
-         ldpp_dout(this, 5) << "RGWLC::process(): ACTIVE entry: " << entry
-                            << " index: " << index << " worker ix: " << worker->ix
-                            << dendl;
-         goto exit;
-       }
+    /* if there is nothing at head, try to reinitialize head.marker with the
+     * entry in the queue */
+    if (head.marker.empty()) {
+      vector<rgw::sal::Lifecycle::LCEntry> entries;
+      int ret = sal_lc->list_entries(lc_shard, head.marker, 1, entries);
+      if (ret < 0) {
+       ldpp_dout(this, 0) << "RGWLC::process() sal_lc->list_entries(lc_shard, head.marker, 1, "
+                          << "entries) returned error ret==" << ret << dendl;
+       goto exit;
       }
-    }
-
-    if(!if_already_run_today(head.start_date) ||
-       once) {
+      entry = entries.front();
+      head.marker = entry.bucket;
       head.start_date = now;
-      head.marker.clear();
-      ret = bucket_lc_prepare(index, worker);
-      if (ret < 0) {
-      ldpp_dout(this, 0) << "RGWLC::process() failed to update lc object "
-                        << obj_names[index]
-                        << ", ret=" << ret
+    } else {
+      ldpp_dout(this, 0) << "RGWLC::process() head.marker !empty() at START for shard=="
+                        << lc_shard << " head last stored at "
+                        << rgw_to_asctime(utime_t(time_t(head.start_date), 0))
                         << dendl;
-      goto exit;
+
+      /* fetches the entry pointed to by head.bucket */
+      ret = sal_lc->get_entry(lc_shard, head.marker, entry);
+      if (ret < 0) {
+       ldpp_dout(this, 0) << "RGWLC::process() sal_lc->get_entry(lc_shard, head.marker, entry) "
+                          << "returned error ret==" << ret << dendl;
+       goto exit;
       }
     }
 
-    ret = sal_lc->get_next_entry(obj_names[index], head.marker, entry);
-    if (ret < 0) {
-      ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
-          << obj_names[index] << dendl;
+    if (! entry.bucket.empty()) {
+      if (entry.status == lc_processing) {
+        if (expired_session(entry.start_time)) {
+          ldpp_dout(this, 5)
+              << "RGWLC::process(): STALE lc session found for: " << entry
+              << " index: " << index << " worker ix: " << worker->ix
+              << " (clearing)" << dendl;
+        } else {
+          ldpp_dout(this, 5)
+              << "RGWLC::process(): ACTIVE entry: " << entry
+              << " index: " << index << " worker ix: " << worker->ix << dendl;
+          goto exit;
+        }
+      }
+    } else {
+      ldpp_dout(this, 0) << "RGWLC::process() entry.bucket.empty() == true at START 1"
+                        << " (this is impossible, but stop now)"
+                         << dendl;
       goto exit;
     }
 
-    /* termination condition (eof) */
-    if (entry.bucket.empty())
-      goto exit;
-
+    /* When there are no more entries to process, entry will be
+     * equivalent to an empty marker and so the following resets the
+     * processing for the shard automatically when processing is
+     * finished for the shard */
     ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry
            << " index: " << index << " worker ix: " << worker->ix
            << dendl;
 
     entry.status = lc_processing;
-    ret = sal_lc->set_entry(obj_names[index], entry);
+    entry.start_time = now;
+
+    ret = sal_lc->set_entry(lc_shard, entry);
     if (ret < 0) {
       ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
-             << obj_names[index] << entry.bucket << entry.status << dendl;
+             << lc_shard << entry.bucket << entry.status << dendl;
       goto exit;
     }
 
-    head.marker = entry.bucket;
-    ret = sal_lc->put_head(obj_names[index],  head);
+    ret = sal_lc->get_next_entry(lc_shard, entry.bucket, next_entry);
+    if (ret < 0) {
+      ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
+          << lc_shard << dendl;
+      goto exit;
+    }
+
+    /* save the next position */
+    head.marker = next_entry.bucket;
+    head.start_date = now;
+
+    ret = sal_lc->put_head(lc_shard,  head);
     if (ret < 0) {
       ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
-                        << obj_names[index]
+                        << lc_shard
              << dendl;
       goto exit;
     }
@@ -2227,7 +2228,22 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
     lock->unlock();
     ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once);
     bucket_lc_post(index, max_lock_secs, entry, ret, worker);
-  } while(1 && !once);
+
+    /* done with this shard */
+    if (head.marker.empty()) {
+      ldpp_dout(this, 5) <<
+       "RGWLC::process() cycle finished lc_shard="
+                        << lc_shard
+                        << dendl;
+      ret = sal_lc->put_head(lc_shard,  head);
+      if (ret < 0) {
+       ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
+                          << lc_shard
+                          << dendl;
+      }
+      goto exit;
+    }
+  } while(1 && !once && !going_down());
 
   delete lock;
   return 0;
index f79dd23ae7c565c3a16c04d56f4cffdacfc129e2..87d44ac69a64596d47beee48aed24772b60d8f89 100644 (file)
@@ -521,7 +521,6 @@ public:
   int process(int index, int max_lock_secs, LCWorker* worker, bool once);
   int process_bucket(int index, int max_lock_secs, LCWorker* worker,
                     const std::string& bucket_entry_marker, bool once);
-  bool if_already_run_today(time_t start_date);
   bool expired_session(time_t started);
   time_t thread_stop_at();
   int list_lc_progress(std::string& marker, uint32_t max_entries,