delete[] obj_names;
}
-bool RGWLC::if_already_run_today(time_t start_date)
-{
- struct tm bdt;
- time_t begin_of_day;
- utime_t now = ceph_clock_now();
- localtime_r(&start_date, &bdt);
-
- if (cct->_conf->rgw_lc_debug_interval > 0) {
- if (now - start_date < cct->_conf->rgw_lc_debug_interval)
- return true;
- else
- return false;
- }
-
- bdt.tm_hour = 0;
- bdt.tm_min = 0;
- bdt.tm_sec = 0;
- begin_of_day = mktime(&bdt);
- if (now - begin_of_day < 24*60*60)
- return true;
- else
- return false;
-}
-
static inline std::ostream& operator<<(std::ostream &os, rgw::sal::Lifecycle::LCEntry& ent) {
os << "<ent: bucket=";
os << ent.bucket;
int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
bool once = false)
{
+ int ret{0};
+ const auto& lc_shard = obj_names[index];
+
+ rgw::sal::Lifecycle::LCHead head{};
+ rgw::sal::Lifecycle::LCEntry entry, next_entry; //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
+
ldpp_dout(this, 5) << "RGWLC::process(): ENTER: "
<< "index: " << index << " worker ix: " << worker->ix
<< dendl;
- int ret = 0;
- rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
- obj_names[index],
- std::string());
+ rgw::sal::LCSerializer* lock =
+ sal_lc->get_serializer(lc_index_lock_name, lc_shard, std::string());
+
do {
utime_t now = ceph_clock_now();
- //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
- rgw::sal::Lifecycle::LCEntry entry;
- if (max_lock_secs <= 0)
- return -EAGAIN;
+ utime_t lock_for_s(max_lock_secs, 0);
- utime_t time(max_lock_secs, 0);
- ret = lock->try_lock(this, time, null_yield);
+ ret = lock->try_lock(this, lock_for_s, null_yield);
if (ret == -EBUSY || ret == -EEXIST) {
/* already locked by another lc processor */
ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
- << obj_names[index] << ", sleep 5, try again" << dendl;
+ << lc_shard << ", sleep 5, try again" << dendl;
sleep(5);
continue; // XXXX really retry forever?
}
- if (ret < 0)
+ if (ret < 0) {
return 0;
+ }
- rgw::sal::Lifecycle::LCHead head;
- ret = sal_lc->get_head(obj_names[index], head);
+ /* preamble: find an inital bucket/marker */
+ ret = sal_lc->get_head(lc_shard, head);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
- << obj_names[index] << ", ret=" << ret << dendl;
+ << lc_shard << ", ret=" << ret << dendl;
goto exit;
}
- ret = sal_lc->get_entry(obj_names[index], head.marker, entry);
- if (ret >= 0) {
- if (entry.status == lc_processing) {
- if (expired_session(entry.start_time)) {
- ldpp_dout(this, 5) << "RGWLC::process(): STALE lc session found for: " << entry
- << " index: " << index << " worker ix: " << worker->ix
- << " (clearing)"
- << dendl;
- } else {
- ldpp_dout(this, 5) << "RGWLC::process(): ACTIVE entry: " << entry
- << " index: " << index << " worker ix: " << worker->ix
- << dendl;
- goto exit;
- }
+ /* if there is nothing at head, try to reinitialize head.marker with the
+ * entry in the queue */
+ if (head.marker.empty()) {
+ vector<rgw::sal::Lifecycle::LCEntry> entries;
+ int ret = sal_lc->list_entries(lc_shard, head.marker, 1, entries);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "RGWLC::process() sal_lc->list_entries(lc_shard, head.marker, 1, "
+ << "entries) returned error ret==" << ret << dendl;
+ goto exit;
}
- }
-
- if(!if_already_run_today(head.start_date) ||
- once) {
+ entry = entries.front();
+ head.marker = entry.bucket;
head.start_date = now;
- head.marker.clear();
- ret = bucket_lc_prepare(index, worker);
- if (ret < 0) {
- ldpp_dout(this, 0) << "RGWLC::process() failed to update lc object "
- << obj_names[index]
- << ", ret=" << ret
+ } else {
+ ldpp_dout(this, 0) << "RGWLC::process() head.marker !empty() at START for shard=="
+ << lc_shard << " head last stored at "
+ << rgw_to_asctime(utime_t(time_t(head.start_date), 0))
<< dendl;
- goto exit;
+
+ /* fetches the entry pointed to by head.bucket */
+ ret = sal_lc->get_entry(lc_shard, head.marker, entry);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "RGWLC::process() sal_lc->get_entry(lc_shard, head.marker, entry) "
+ << "returned error ret==" << ret << dendl;
+ goto exit;
}
}
- ret = sal_lc->get_next_entry(obj_names[index], head.marker, entry);
- if (ret < 0) {
- ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
- << obj_names[index] << dendl;
+ if (! entry.bucket.empty()) {
+ if (entry.status == lc_processing) {
+ if (expired_session(entry.start_time)) {
+ ldpp_dout(this, 5)
+ << "RGWLC::process(): STALE lc session found for: " << entry
+ << " index: " << index << " worker ix: " << worker->ix
+ << " (clearing)" << dendl;
+ } else {
+ ldpp_dout(this, 5)
+ << "RGWLC::process(): ACTIVE entry: " << entry
+ << " index: " << index << " worker ix: " << worker->ix << dendl;
+ goto exit;
+ }
+ }
+ } else {
+ ldpp_dout(this, 0) << "RGWLC::process() entry.bucket.empty() == true at START 1"
+ << " (this is impossible, but stop now)"
+ << dendl;
goto exit;
}
- /* termination condition (eof) */
- if (entry.bucket.empty())
- goto exit;
-
+ /* When there are no more entries to process, entry will be
+ * equivalent to an empty marker and so the following resets the
+ * processing for the shard automatically when processing is
+ * finished for the shard */
ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry
<< " index: " << index << " worker ix: " << worker->ix
<< dendl;
entry.status = lc_processing;
- ret = sal_lc->set_entry(obj_names[index], entry);
+ entry.start_time = now;
+
+ ret = sal_lc->set_entry(lc_shard, entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
- << obj_names[index] << entry.bucket << entry.status << dendl;
+ << lc_shard << entry.bucket << entry.status << dendl;
goto exit;
}
- head.marker = entry.bucket;
- ret = sal_lc->put_head(obj_names[index], head);
+ ret = sal_lc->get_next_entry(lc_shard, entry.bucket, next_entry);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
+ << lc_shard << dendl;
+ goto exit;
+ }
+
+ /* save the next position */
+ head.marker = next_entry.bucket;
+ head.start_date = now;
+
+ ret = sal_lc->put_head(lc_shard, head);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
- << obj_names[index]
+ << lc_shard
<< dendl;
goto exit;
}
lock->unlock();
ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once);
bucket_lc_post(index, max_lock_secs, entry, ret, worker);
- } while(1 && !once);
+
+ /* done with this shard */
+ if (head.marker.empty()) {
+ ldpp_dout(this, 5) <<
+ "RGWLC::process() cycle finished lc_shard="
+ << lc_shard
+ << dendl;
+ ret = sal_lc->put_head(lc_shard, head);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
+ << lc_shard
+ << dendl;
+ }
+ goto exit;
+ }
+ } while(1 && !once && !going_down());
delete lock;
return 0;