]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgwlc: restore already_run_today guard
authorMatt Benjamin <mbenjamin@redhat.com>
Tue, 15 Mar 2022 19:36:45 +0000 (15:36 -0400)
committerMatt Benjamin <mbenjamin@redhat.com>
Thu, 17 Mar 2022 15:08:12 +0000 (11:08 -0400)
On review, this constraint was correct--it does reliably prevent
same-cycle re-runs when a lc threads rendezvous on a bucket.

Also, for concurrent (or stale) and already processed buckets,
remember to advance head past the corresponding buckets.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/rgw/rgw_lc.cc
src/rgw/rgw_lc.h

index b5c84a58195ee222c018d6b4932a15254cff4ded..28f70bfe914f8229125985fb84b75f98c68fb97e 100644 (file)
@@ -2110,7 +2110,61 @@ static inline bool allow_shard_rollover(CephContext* cct, time_t now, time_t sha
     return true;
   }
   return false;
-}
+} /* allow_shard_rollover */
+
+static inline bool already_run_today(CephContext* cct, time_t start_date)
+{
+  struct tm bdt;
+  time_t begin_of_day;
+  utime_t now = ceph_clock_now();
+  localtime_r(&start_date, &bdt);
+
+  if (cct->_conf->rgw_lc_debug_interval > 0) {
+    if (now - start_date < cct->_conf->rgw_lc_debug_interval)
+      return true;
+    else
+      return false;
+  }
+
+  bdt.tm_hour = 0;
+  bdt.tm_min = 0;
+  bdt.tm_sec = 0;
+  begin_of_day = mktime(&bdt);
+  if (now - begin_of_day < 24*60*60)
+    return true;
+  else
+    return false;
+} /* already_run_today */
+
+inline int RGWLC::advance_head(const std::string& lc_shard,
+                              rgw::sal::Lifecycle::LCHead& head,
+                              rgw::sal::Lifecycle::LCEntry& entry,
+                              time_t start_date)
+{
+  int ret{0};
+  rgw::sal::Lifecycle::LCEntry next_entry;
+
+  ret = sal_lc->get_next_entry(lc_shard, entry.bucket, next_entry);
+  if (ret < 0) {
+    ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
+                      << lc_shard << dendl;
+    goto exit;
+  }
+
+  /* save the next position */
+  head.marker = next_entry.bucket;
+  head.start_date = start_date;
+
+  ret = sal_lc->put_head(lc_shard, head);
+  if (ret < 0) {
+    ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
+                      << lc_shard
+                      << dendl;
+    goto exit;
+  }
+exit:
+  return ret;
+} /* advance head */
 
 int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
                   bool once = false)
@@ -2119,7 +2173,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
   const auto& lc_shard = obj_names[index];
 
   rgw::sal::Lifecycle::LCHead head{};
-  rgw::sal::Lifecycle::LCEntry entry, next_entry; //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
+  rgw::sal::Lifecycle::LCEntry entry; //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
 
   ldpp_dout(this, 5) << "RGWLC::process(): ENTER: "
          << "index: " << index << " worker ix: " << worker->ix
@@ -2210,8 +2264,54 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
           ldpp_dout(this, 5)
               << "RGWLC::process(): ACTIVE entry: " << entry
               << " index: " << index << " worker ix: " << worker->ix << dendl;
-          goto exit;
+         /* skip to next entry */
+         if (advance_head(lc_shard, head, entry, now) < 0) {
+           goto exit;
+         }
+             /* done with this shard */
+         if (head.marker.empty()) {
+           ldpp_dout(this, 5) <<
+             "RGWLC::process() cycle finished lc_shard="
+                              << lc_shard
+                              << dendl;
+           head.shard_rollover_date = ceph_clock_now();
+           ret = sal_lc->put_head(lc_shard,  head);
+           if (ret < 0) {
+             ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
+                                << lc_shard
+                                << dendl;
+           }
+           goto exit;
+         }
+          continue;
         }
+      } else {
+       if ((entry.status == lc_complete) &&
+           already_run_today(cct, entry.start_time)) {
+         /* skip to next entry */
+         if (advance_head(lc_shard, head, entry, now) < 0) {
+           goto exit;
+         }
+         ldpp_dout(this, 5) << "RGWLC::process() worker ix; " << worker->ix
+                            << " SKIP processing for already-processed bucket " << entry.bucket
+                            << dendl;
+         /* done with this shard */
+         if (head.marker.empty()) {
+           ldpp_dout(this, 5) <<
+             "RGWLC::process() cycle finished lc_shard="
+                              << lc_shard
+                              << dendl;
+           head.shard_rollover_date = ceph_clock_now();
+           ret = sal_lc->put_head(lc_shard,  head);
+           if (ret < 0) {
+             ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
+                                << lc_shard
+                                << dendl;
+           }
+           goto exit;
+         }
+         continue;
+       }
       }
     } else {
       ldpp_dout(this, 5) << "RGWLC::process() entry.bucket.empty() == true at START 1"
@@ -2239,22 +2339,8 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
       goto exit;
     }
 
-    ret = sal_lc->get_next_entry(lc_shard, entry.bucket, next_entry);
-    if (ret < 0) {
-      ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
-          << lc_shard << dendl;
-      goto exit;
-    }
-
-    /* save the next position */
-    head.marker = next_entry.bucket;
-    head.start_date = now;
-
-    ret = sal_lc->put_head(lc_shard,  head);
-    if (ret < 0) {
-      ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
-                        << lc_shard
-             << dendl;
+    /* advance head for next waiter, then process */
+    if (advance_head(lc_shard, head, entry, now) < 0) {
       goto exit;
     }
 
index fc4db3932c90ed0d1f3cb6e11830e1755ceaa2da..79cb7bb855ca35377ea36286691f886a3a93371f 100644 (file)
@@ -518,6 +518,10 @@ public:
   int process(LCWorker* worker,
              const std::unique_ptr<rgw::sal::Bucket>& optional_bucket,
              bool once);
+  int advance_head(const std::string& lc_shard,
+                  rgw::sal::Lifecycle::LCHead& head,
+                  rgw::sal::Lifecycle::LCEntry& entry,
+                  time_t start_date);
   int process(int index, int max_lock_secs, LCWorker* worker, bool once);
   int process_bucket(int index, int max_lock_secs, LCWorker* worker,
                     const std::string& bucket_entry_marker, bool once);