return ret;
} /* advance head */
+inline int RGWLC::check_if_shard_done(const std::string& lc_shard,
+ rgw::sal::Lifecycle::LCHead& head, int worker_ix)
+{
+ int ret{0};
+
+ if (head.get_marker().empty()) {
+ /* done with this shard */
+ ldpp_dout(this, 5) <<
+ "RGWLC::process() next_entry not found. cycle finished lc_shard="
+ << lc_shard << " worker=" << worker_ix
+ << dendl;
+ head.set_shard_rollover_date(ceph_clock_now());
+ ret = sal_lc->put_head(lc_shard, head);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
+ << lc_shard
+ << dendl;
+ }
+ ret = 1; // to mark that shard is done
+ }
+ return ret;
+}
+
+inline int RGWLC::update_head(const std::string& lc_shard,
+ rgw::sal::Lifecycle::LCHead& head,
+ rgw::sal::Lifecycle::LCEntry& entry,
+ time_t start_date, int worker_ix)
+{
+ int ret{0};
+
+ ret = advance_head(lc_shard, head, entry, start_date);
+ if (ret != 0) {
+ ldpp_dout(this, 0) << "RGWLC::update_head() failed to advance head "
+ << lc_shard
+ << dendl;
+ goto exit;
+ }
+
+ ret = check_if_shard_done(lc_shard, head, worker_ix);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "RGWLC::update_head() failed to check if shard is done "
+ << lc_shard
+ << dendl;
+ }
+
+exit:
+ return ret;
+}
+
int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
bool once = false)
{
ret = sal_lc->get_entry(lc_shard, head->get_marker(), &entry);
if (ret == -ENOENT) {
/* skip to next entry */
- std::unique_ptr<rgw::sal::Lifecycle::LCEntry> tmp_entry = sal_lc->get_entry();
- tmp_entry->set_bucket(head->get_marker());
- if (advance_head(lc_shard, *head.get(), *tmp_entry.get(), now) < 0) {
- goto exit;
- }
- /* done with this shard */
- if (head->get_marker().empty()) {
- ldpp_dout(this, 5) <<
- "RGWLC::process() next_entry not found. cycle finished lc_shard="
- << lc_shard << " worker=" << worker->ix
- << dendl;
- head->set_shard_rollover_date(ceph_clock_now());
- ret = sal_lc->put_head(lc_shard, *head.get());
- if (ret < 0) {
- ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
- << lc_shard
- << dendl;
- }
- goto exit;
- }
- continue;
+ std::unique_ptr<rgw::sal::Lifecycle::LCEntry> tmp_entry = sal_lc->get_entry();
+ tmp_entry->set_bucket(head->get_marker());
+
+ if (update_head(lc_shard, *head.get(), *tmp_entry.get(), now, worker->ix) != 0) {
+ goto exit;
+ }
+ continue;
}
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() sal_lc->get_entry(lc_shard, head.marker, entry) "
<< "RGWLC::process(): ACTIVE entry: " << entry
<< " index: " << index << " worker ix: " << worker->ix << dendl;
/* skip to next entry */
- if (advance_head(lc_shard, *head.get(), *entry.get(), now) < 0) {
- goto exit;
- }
- /* done with this shard */
- if (head->get_marker().empty()) {
- ldpp_dout(this, 5) <<
- "RGWLC::process() cycle finished lc_shard="
- << lc_shard << " worker=" << worker->ix
- << dendl;
- head->set_shard_rollover_date(ceph_clock_now());
- ret = sal_lc->put_head(lc_shard, *head.get());
- if (ret < 0) {
- ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
- << lc_shard
- << dendl;
- }
- goto exit;
+ if (update_head(lc_shard, *head.get(), *entry.get(), now, worker->ix) != 0) {
+ goto exit;
}
continue;
}
} else {
if ((entry->get_status() == lc_complete) &&
already_run_today(cct, entry->get_start_time())) {
- /* skip to next entry */
- if (advance_head(lc_shard, *head.get(), *entry.get(), now) < 0) {
- goto exit;
- }
ldpp_dout(this, 5) << "RGWLC::process() worker ix: " << worker->ix
<< " SKIP processing for already-processed bucket " << entry->get_bucket()
<< dendl;
- /* done with this shard */
- if (head->get_marker().empty()) {
- ldpp_dout(this, 5) <<
- "RGWLC::process() cycle finished lc_shard="
- << lc_shard << " worker=" << worker->ix
- << dendl;
- head->set_shard_rollover_date(ceph_clock_now());
- ret = sal_lc->put_head(lc_shard, *head.get());
- if (ret < 0) {
- ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
- << lc_shard
- << dendl;
- }
- goto exit;
- }
+ /* skip to next entry */
+ if (update_head(lc_shard, *head.get(), *entry.get(), now, worker->ix) != 0) {
+ goto exit;
+ }
continue;
}
}
}
}
- /* done with this shard */
- if (head->get_marker().empty()) {
- ldpp_dout(this, 5) <<
- "RGWLC::process() cycle finished lc_shard="
- << lc_shard << " worker=" << worker->ix
- << dendl;
- head->set_shard_rollover_date(ceph_clock_now());
- ret = sal_lc->put_head(lc_shard, *head.get());
- if (ret < 0) {
- ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
- << lc_shard
- << dendl;
- }
+ if (check_if_shard_done(lc_shard, *head.get(), worker->ix) != 0 ) {
goto exit;
}
} while(1 && !once && !going_down());