return true;
}
return false;
-}
+} /* allow_shard_rollover */
+
+static inline bool already_run_today(CephContext* cct, time_t start_date)
+{
+ struct tm bdt;
+ time_t begin_of_day;
+ utime_t now = ceph_clock_now();
+ localtime_r(&start_date, &bdt);
+
+ if (cct->_conf->rgw_lc_debug_interval > 0) {
+ if (now - start_date < cct->_conf->rgw_lc_debug_interval)
+ return true;
+ else
+ return false;
+ }
+
+ bdt.tm_hour = 0;
+ bdt.tm_min = 0;
+ bdt.tm_sec = 0;
+ begin_of_day = mktime(&bdt);
+ if (now - begin_of_day < 24*60*60)
+ return true;
+ else
+ return false;
+} /* already_run_today */
+
+inline int RGWLC::advance_head(const std::string& lc_shard,
+ rgw::sal::Lifecycle::LCHead& head,
+ rgw::sal::Lifecycle::LCEntry& entry,
+ time_t start_date)
+{
+ int ret{0};
+ rgw::sal::Lifecycle::LCEntry next_entry;
+
+ ret = sal_lc->get_next_entry(lc_shard, entry.bucket, next_entry);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
+ << lc_shard << dendl;
+ goto exit;
+ }
+
+ /* save the next position */
+ head.marker = next_entry.bucket;
+ head.start_date = start_date;
+
+ ret = sal_lc->put_head(lc_shard, head);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
+ << lc_shard
+ << dendl;
+ goto exit;
+ }
+exit:
+ return ret;
+} /* advance head */
int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
bool once = false)
const auto& lc_shard = obj_names[index];
rgw::sal::Lifecycle::LCHead head{};
- rgw::sal::Lifecycle::LCEntry entry, next_entry; //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
+ rgw::sal::Lifecycle::LCEntry entry; //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
ldpp_dout(this, 5) << "RGWLC::process(): ENTER: "
<< "index: " << index << " worker ix: " << worker->ix
ldpp_dout(this, 5)
<< "RGWLC::process(): ACTIVE entry: " << entry
<< " index: " << index << " worker ix: " << worker->ix << dendl;
- goto exit;
+ /* skip to next entry */
+ if (advance_head(lc_shard, head, entry, now) < 0) {
+ goto exit;
+ }
+ /* done with this shard */
+ if (head.marker.empty()) {
+ ldpp_dout(this, 5) <<
+ "RGWLC::process() cycle finished lc_shard="
+ << lc_shard
+ << dendl;
+ head.shard_rollover_date = ceph_clock_now();
+ ret = sal_lc->put_head(lc_shard, head);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
+ << lc_shard
+ << dendl;
+ }
+ goto exit;
+ }
+ continue;
}
+ } else {
+ if ((entry.status == lc_complete) &&
+ already_run_today(cct, entry.start_time)) {
+ /* skip to next entry */
+ if (advance_head(lc_shard, head, entry, now) < 0) {
+ goto exit;
+ }
+ ldpp_dout(this, 5) << "RGWLC::process() worker ix; " << worker->ix
+ << " SKIP processing for already-processed bucket " << entry.bucket
+ << dendl;
+ /* done with this shard */
+ if (head.marker.empty()) {
+ ldpp_dout(this, 5) <<
+ "RGWLC::process() cycle finished lc_shard="
+ << lc_shard
+ << dendl;
+ head.shard_rollover_date = ceph_clock_now();
+ ret = sal_lc->put_head(lc_shard, head);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
+ << lc_shard
+ << dendl;
+ }
+ goto exit;
+ }
+ continue;
+ }
}
} else {
ldpp_dout(this, 5) << "RGWLC::process() entry.bucket.empty() == true at START 1"
goto exit;
}
- ret = sal_lc->get_next_entry(lc_shard, entry.bucket, next_entry);
- if (ret < 0) {
- ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
- << lc_shard << dendl;
- goto exit;
- }
-
- /* save the next position */
- head.marker = next_entry.bucket;
- head.start_date = now;
-
- ret = sal_lc->put_head(lc_shard, head);
- if (ret < 0) {
- ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
- << lc_shard
- << dendl;
+ /* advance head for next waiter, then process */
+ if (advance_head(lc_shard, head, entry, now) < 0) {
goto exit;
}