From: Matt Benjamin Date: Mon, 14 Feb 2022 21:39:27 +0000 (-0500) Subject: rgwlc: remove explicit lc shard resets at start-of-run X-Git-Tag: v18.0.0~1240^2~6 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=b3fdd8894cbb9023a004536a90c8a555a310782a;p=ceph.git rgwlc: remove explicit lc shard resets at start-of-run This is an alternative solution to the (newly exposed) lifecycle shard starvation problem reported by Jeegen Chen. There was always an starvation condition implied by the reset of lc shard head at the start of processing. The introduction of "stale sessions" in parallel lifecycle changes made it more visible, in particular when rgw_lc_debug_interval was set to a small value and many buckets had lifecycle policy. My hypothesis in this change is that lifecycle processing for each lc shard should /always/ continue through the full set of eligible buckets for the shard, regardless of how many processing cycles might be required to do so. In general, restarting at the first eligible bucket on each reschedule invites starvation when processing "gets behind", so just avoid it. Fixes: https://tracker.ceph.com/issues/49446 Signed-off-by: Matt Benjamin (cherry picked from commit 6e2ae13adced6b3dbb2fe16b547a30e9d68dfa06) rgwlc: add a wraparound to continued shard processing If the full set of buckets for a given lc shard couldn't be processed in the prior cycle, processing will start with a non-empty marker. Note the initial marker position, then when the end of shard is reached, allow processing to wrap around to the logical beginning of the shard and proceeding through the initial marker. Signed-off-by: Matt Benjamin Please enter the commit message for your changes. Lines starting (cherry picked from commit 0b8f683d3cf444cc68fd30c3f179b9aa0ea08e7c) don't report clearing incorrectly Signed-off-by: Matt Benjamin --- diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index fc98082fabd76..b7805e65f47d9 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -273,30 +273,6 @@ void RGWLC::finalize() delete[] obj_names; } -bool RGWLC::if_already_run_today(time_t start_date) -{ - struct tm bdt; - time_t begin_of_day; - utime_t now = ceph_clock_now(); - localtime_r(&start_date, &bdt); - - if (cct->_conf->rgw_lc_debug_interval > 0) { - if (now - start_date < cct->_conf->rgw_lc_debug_interval) - return true; - else - return false; - } - - bdt.tm_hour = 0; - bdt.tm_min = 0; - bdt.tm_sec = 0; - begin_of_day = mktime(&bdt); - if (now - begin_of_day < 24*60*60) - return true; - else - return false; -} - static inline std::ostream& operator<<(std::ostream &os, rgw::sal::Lifecycle::LCEntry& ent) { os << "ix << dendl; - int ret = 0; - rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name, - obj_names[index], - std::string()); + rgw::sal::LCSerializer* lock = + sal_lc->get_serializer(lc_index_lock_name, lc_shard, std::string()); + do { utime_t now = ceph_clock_now(); - //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS - rgw::sal::Lifecycle::LCEntry entry; - if (max_lock_secs <= 0) - return -EAGAIN; + utime_t lock_for_s(max_lock_secs, 0); - utime_t time(max_lock_secs, 0); - ret = lock->try_lock(this, time, null_yield); + ret = lock->try_lock(this, lock_for_s, null_yield); if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */ ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on " - << obj_names[index] << ", sleep 5, try again" << dendl; + << lc_shard << ", sleep 5, try again" << dendl; sleep(5); continue; // XXXX really retry forever? } - if (ret < 0) + if (ret < 0) { return 0; + } - rgw::sal::Lifecycle::LCHead head; - ret = sal_lc->get_head(obj_names[index], head); + /* preamble: find an inital bucket/marker */ + ret = sal_lc->get_head(lc_shard, head); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head " - << obj_names[index] << ", ret=" << ret << dendl; + << lc_shard << ", ret=" << ret << dendl; goto exit; } - ret = sal_lc->get_entry(obj_names[index], head.marker, entry); - if (ret >= 0) { - if (entry.status == lc_processing) { - if (expired_session(entry.start_time)) { - ldpp_dout(this, 5) << "RGWLC::process(): STALE lc session found for: " << entry - << " index: " << index << " worker ix: " << worker->ix - << " (clearing)" - << dendl; - } else { - ldpp_dout(this, 5) << "RGWLC::process(): ACTIVE entry: " << entry - << " index: " << index << " worker ix: " << worker->ix - << dendl; - goto exit; - } + /* if there is nothing at head, try to reinitialize head.marker with the + * entry in the queue */ + if (head.marker.empty()) { + vector entries; + int ret = sal_lc->list_entries(lc_shard, head.marker, 1, entries); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() sal_lc->list_entries(lc_shard, head.marker, 1, " + << "entries) returned error ret==" << ret << dendl; + goto exit; } - } - - if(!if_already_run_today(head.start_date) || - once) { + entry = entries.front(); + head.marker = entry.bucket; head.start_date = now; - head.marker.clear(); - ret = bucket_lc_prepare(index, worker); - if (ret < 0) { - ldpp_dout(this, 0) << "RGWLC::process() failed to update lc object " - << obj_names[index] - << ", ret=" << ret + } else { + ldpp_dout(this, 0) << "RGWLC::process() head.marker !empty() at START for shard==" + << lc_shard << " head last stored at " + << rgw_to_asctime(utime_t(time_t(head.start_date), 0)) << dendl; - goto exit; + + /* fetches the entry pointed to by head.bucket */ + ret = sal_lc->get_entry(lc_shard, head.marker, entry); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() sal_lc->get_entry(lc_shard, head.marker, entry) " + << "returned error ret==" << ret << dendl; + goto exit; } } - ret = sal_lc->get_next_entry(obj_names[index], head.marker, entry); - if (ret < 0) { - ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry " - << obj_names[index] << dendl; + if (! entry.bucket.empty()) { + if (entry.status == lc_processing) { + if (expired_session(entry.start_time)) { + ldpp_dout(this, 5) + << "RGWLC::process(): STALE lc session found for: " << entry + << " index: " << index << " worker ix: " << worker->ix + << " (clearing)" << dendl; + } else { + ldpp_dout(this, 5) + << "RGWLC::process(): ACTIVE entry: " << entry + << " index: " << index << " worker ix: " << worker->ix << dendl; + goto exit; + } + } + } else { + ldpp_dout(this, 0) << "RGWLC::process() entry.bucket.empty() == true at START 1" + << " (this is impossible, but stop now)" + << dendl; goto exit; } - /* termination condition (eof) */ - if (entry.bucket.empty()) - goto exit; - + /* When there are no more entries to process, entry will be + * equivalent to an empty marker and so the following resets the + * processing for the shard automatically when processing is + * finished for the shard */ ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry << " index: " << index << " worker ix: " << worker->ix << dendl; entry.status = lc_processing; - ret = sal_lc->set_entry(obj_names[index], entry); + entry.start_time = now; + + ret = sal_lc->set_entry(lc_shard, entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry " - << obj_names[index] << entry.bucket << entry.status << dendl; + << lc_shard << entry.bucket << entry.status << dendl; goto exit; } - head.marker = entry.bucket; - ret = sal_lc->put_head(obj_names[index], head); + ret = sal_lc->get_next_entry(lc_shard, entry.bucket, next_entry); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry " + << lc_shard << dendl; + goto exit; + } + + /* save the next position */ + head.marker = next_entry.bucket; + head.start_date = now; + + ret = sal_lc->put_head(lc_shard, head); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to put head " - << obj_names[index] + << lc_shard << dendl; goto exit; } @@ -2227,7 +2228,22 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, lock->unlock(); ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once); bucket_lc_post(index, max_lock_secs, entry, ret, worker); - } while(1 && !once); + + /* done with this shard */ + if (head.marker.empty()) { + ldpp_dout(this, 5) << + "RGWLC::process() cycle finished lc_shard=" + << lc_shard + << dendl; + ret = sal_lc->put_head(lc_shard, head); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() failed to put head " + << lc_shard + << dendl; + } + goto exit; + } + } while(1 && !once && !going_down()); delete lock; return 0; diff --git a/src/rgw/rgw_lc.h b/src/rgw/rgw_lc.h index f79dd23ae7c56..87d44ac69a645 100644 --- a/src/rgw/rgw_lc.h +++ b/src/rgw/rgw_lc.h @@ -521,7 +521,6 @@ public: int process(int index, int max_lock_secs, LCWorker* worker, bool once); int process_bucket(int index, int max_lock_secs, LCWorker* worker, const std::string& bucket_entry_marker, bool once); - bool if_already_run_today(time_t start_date); bool expired_session(time_t started); time_t thread_stop_at(); int list_lc_progress(std::string& marker, uint32_t max_entries,