From: Matt Benjamin Date: Tue, 15 Mar 2022 19:36:45 +0000 (-0400) Subject: rgwlc: restore already_run_today guard X-Git-Tag: v18.0.0~1240^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=0c42370d60cca6f2dab2f8ba036bebfc3d2b1b39;p=ceph-ci.git rgwlc: restore already_run_today guard On review, this constraint was correct--it does reliably prevent same-cycle re-runs when a lc threads rendezvous on a bucket. Also, for concurrent (or stale) and already processed buckets, remember to advance head past the corresponding buckets. Signed-off-by: Matt Benjamin --- diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index b5c84a58195..28f70bfe914 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -2110,7 +2110,61 @@ static inline bool allow_shard_rollover(CephContext* cct, time_t now, time_t sha return true; } return false; -} +} /* allow_shard_rollover */ + +static inline bool already_run_today(CephContext* cct, time_t start_date) +{ + struct tm bdt; + time_t begin_of_day; + utime_t now = ceph_clock_now(); + localtime_r(&start_date, &bdt); + + if (cct->_conf->rgw_lc_debug_interval > 0) { + if (now - start_date < cct->_conf->rgw_lc_debug_interval) + return true; + else + return false; + } + + bdt.tm_hour = 0; + bdt.tm_min = 0; + bdt.tm_sec = 0; + begin_of_day = mktime(&bdt); + if (now - begin_of_day < 24*60*60) + return true; + else + return false; +} /* already_run_today */ + +inline int RGWLC::advance_head(const std::string& lc_shard, + rgw::sal::Lifecycle::LCHead& head, + rgw::sal::Lifecycle::LCEntry& entry, + time_t start_date) +{ + int ret{0}; + rgw::sal::Lifecycle::LCEntry next_entry; + + ret = sal_lc->get_next_entry(lc_shard, entry.bucket, next_entry); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry " + << lc_shard << dendl; + goto exit; + } + + /* save the next position */ + head.marker = next_entry.bucket; + head.start_date = start_date; + + ret = sal_lc->put_head(lc_shard, head); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() failed to put head " + << lc_shard + << dendl; + goto exit; + } +exit: + return ret; +} /* advance head */ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, bool once = false) @@ -2119,7 +2173,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, const auto& lc_shard = obj_names[index]; rgw::sal::Lifecycle::LCHead head{}; - rgw::sal::Lifecycle::LCEntry entry, next_entry; //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS + rgw::sal::Lifecycle::LCEntry entry; //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS ldpp_dout(this, 5) << "RGWLC::process(): ENTER: " << "index: " << index << " worker ix: " << worker->ix @@ -2210,8 +2264,54 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, ldpp_dout(this, 5) << "RGWLC::process(): ACTIVE entry: " << entry << " index: " << index << " worker ix: " << worker->ix << dendl; - goto exit; + /* skip to next entry */ + if (advance_head(lc_shard, head, entry, now) < 0) { + goto exit; + } + /* done with this shard */ + if (head.marker.empty()) { + ldpp_dout(this, 5) << + "RGWLC::process() cycle finished lc_shard=" + << lc_shard + << dendl; + head.shard_rollover_date = ceph_clock_now(); + ret = sal_lc->put_head(lc_shard, head); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() failed to put head " + << lc_shard + << dendl; + } + goto exit; + } + continue; } + } else { + if ((entry.status == lc_complete) && + already_run_today(cct, entry.start_time)) { + /* skip to next entry */ + if (advance_head(lc_shard, head, entry, now) < 0) { + goto exit; + } + ldpp_dout(this, 5) << "RGWLC::process() worker ix; " << worker->ix + << " SKIP processing for already-processed bucket " << entry.bucket + << dendl; + /* done with this shard */ + if (head.marker.empty()) { + ldpp_dout(this, 5) << + "RGWLC::process() cycle finished lc_shard=" + << lc_shard + << dendl; + head.shard_rollover_date = ceph_clock_now(); + ret = sal_lc->put_head(lc_shard, head); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() failed to put head " + << lc_shard + << dendl; + } + goto exit; + } + continue; + } } } else { ldpp_dout(this, 5) << "RGWLC::process() entry.bucket.empty() == true at START 1" @@ -2239,22 +2339,8 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, goto exit; } - ret = sal_lc->get_next_entry(lc_shard, entry.bucket, next_entry); - if (ret < 0) { - ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry " - << lc_shard << dendl; - goto exit; - } - - /* save the next position */ - head.marker = next_entry.bucket; - head.start_date = now; - - ret = sal_lc->put_head(lc_shard, head); - if (ret < 0) { - ldpp_dout(this, 0) << "RGWLC::process() failed to put head " - << lc_shard - << dendl; + /* advance head for next waiter, then process */ + if (advance_head(lc_shard, head, entry, now) < 0) { goto exit; } diff --git a/src/rgw/rgw_lc.h b/src/rgw/rgw_lc.h index fc4db3932c9..79cb7bb855c 100644 --- a/src/rgw/rgw_lc.h +++ b/src/rgw/rgw_lc.h @@ -518,6 +518,10 @@ public: int process(LCWorker* worker, const std::unique_ptr& optional_bucket, bool once); + int advance_head(const std::string& lc_shard, + rgw::sal::Lifecycle::LCHead& head, + rgw::sal::Lifecycle::LCEntry& entry, + time_t start_date); int process(int index, int max_lock_secs, LCWorker* worker, bool once); int process_bucket(int index, int max_lock_secs, LCWorker* worker, const std::string& bucket_entry_marker, bool once);