From: Matt Benjamin Date: Thu, 17 Feb 2022 15:55:14 +0000 (-0500) Subject: rgwlc: remove bucket_lc_prepare, add backoff X-Git-Tag: v18.0.0~1240^2~5 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=c069eb7ff09b52003fa00a5cc83b1e52370032f5;p=ceph.git rgwlc: remove bucket_lc_prepare, add backoff Remove now-unused RGWLC::bucket_lc_prepare. Wrap serializer calls in RGWLC::process(int index...) with simple backoff, limited to 5 retries. In RGWLC::process(int index...), also open-coded the behavior of RGWLC::bucket_lc_prepare(...), as the lock sharing between these methods is error prone. For now, that method exists, so that it can be called from the single-bucket process. Signed-off-by: Matt Benjamin --- diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index b7805e65f47d9..022321f4c690d 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -14,6 +14,7 @@ #include #include "include/scope_guard.h" +#include "include/function2.hpp" #include "common/Formatter.h" #include "common/containers.h" #include @@ -284,41 +285,6 @@ static inline std::ostream& operator<<(std::ostream &os, rgw::sal::Lifecycle::LC return os; } -int RGWLC::bucket_lc_prepare(int index, LCWorker* worker) -{ - vector entries; - string marker; - - ldpp_dout(this, 5) << "RGWLC::bucket_lc_prepare(): PREPARE " - << "index: " << index << " worker ix: " << worker->ix - << dendl; - -#define MAX_LC_LIST_ENTRIES 100 - do { - int ret = sal_lc->list_entries(obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries); - if (ret < 0) - return ret; - - for (auto& entry : entries) { - entry.start_time = ceph_clock_now(); - entry.status = lc_uninitial; // lc_uninitial? really? - ret = sal_lc->set_entry(obj_names[index], entry); - if (ret < 0) { - ldpp_dout(this, 0) - << "RGWLC::bucket_lc_prepare() failed to set entry on " - << obj_names[index] << dendl; - return ret; - } - } - - if (! entries.empty()) { - marker = std::move(entries.back().bucket); - } - } while (!entries.empty()); - - return 0; -} - static bool obj_has_expired(const DoutPrefixProvider *dpp, CephContext *cct, ceph::real_time mtime, int days, ceph::real_time *expire_time = nullptr) { @@ -1825,6 +1791,41 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, return ret; } +class SimpleBackoff +{ + const int max_retries; + std::chrono::milliseconds sleep_ms; + int retries{0}; +public: + SimpleBackoff(int max_retries, std::chrono::milliseconds initial_sleep_ms) + : max_retries(max_retries), sleep_ms(initial_sleep_ms) + {} + SimpleBackoff(const SimpleBackoff&) = delete; + SimpleBackoff& operator=(const SimpleBackoff&) = delete; + + int get_retries() const { + return retries; + } + + void reset() { + retries = 0; + } + + bool wait_backoff(const fu2::unique_function& barrier) { + reset(); + while (retries < max_retries) { + auto r = barrier(); + if (r) { + return r; + } + std::this_thread::sleep_for(sleep_ms); + sleep_ms = std::chrono::milliseconds(sleep_ms*2*retries); + ++retries; + } + return false; + } +}; + int RGWLC::bucket_lc_post(int index, int max_lock_sec, rgw::sal::Lifecycle::LCEntry& entry, int& result, LCWorker* worker) @@ -1853,6 +1854,7 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec, return 0; ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] << dendl; + if (result == -ENOENT) { /* XXXX are we SURE the only way result could == ENOENT is when * there is no such bucket? It is currently the value returned @@ -1881,7 +1883,7 @@ clean: << obj_names[index] << dendl; return 0; } while (true); -} +} /* RGWLC::bucket_lc_post */ int RGWLC::list_lc_progress(string& marker, uint32_t max_entries, vector& progress_map, @@ -2111,21 +2113,29 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name, lc_shard, std::string()); - do { - utime_t now = ceph_clock_now(); - utime_t lock_for_s(max_lock_secs, 0); - + utime_t lock_for_s(max_lock_secs, 0); + const auto& lock_lambda = [&]() { ret = lock->try_lock(this, lock_for_s, null_yield); + if (ret == 0) { + return true; + } if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */ - ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on " - << lc_shard << ", sleep 5, try again" << dendl; - sleep(5); - continue; // XXXX really retry forever? - } - if (ret < 0) { - return 0; - } + return false; + } + return false; + }; + + SimpleBackoff shard_lock(5 /* max retries */, 50ms); + if (! shard_lock.wait_backoff(lock_lambda)) { + ldpp_dout(this, 0) << "RGWLC::process(): failed to aquire lock on " + << lc_shard << " after " << shard_lock.get_retries() + << dendl; + goto notlocked; + } + + do { + utime_t now = ceph_clock_now(); /* preamble: find an inital bucket/marker */ ret = sal_lc->get_head(lc_shard, head); @@ -2136,7 +2146,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, } /* if there is nothing at head, try to reinitialize head.marker with the - * entry in the queue */ + * first entry in the queue */ if (head.marker.empty()) { vector entries; int ret = sal_lc->list_entries(lc_shard, head.marker, 1, entries); @@ -2225,9 +2235,43 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, << " index: " << index << " worker ix: " << worker->ix << dendl; + /* drop lock so other instances can make progress while this + * bucket is being processed */ lock->unlock(); ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once); - bucket_lc_post(index, max_lock_secs, entry, ret, worker); + + /* postamble */ + //bucket_lc_post(index, max_lock_secs, entry, ret, worker); + if (! shard_lock.wait_backoff(lock_lambda)) { + ldpp_dout(this, 0) << "RGWLC::process(): failed to aquire lock on " + << lc_shard << " after " << shard_lock.get_retries() + << dendl; + goto notlocked; + } + + if (ret == -ENOENT) { + /* XXXX are we SURE the only way result could == ENOENT is when + * there is no such bucket? It is currently the value returned + * from bucket_lc_process(...) */ + ret = sal_lc->rm_entry(lc_shard, entry); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() failed to remove entry " + << lc_shard << " (nonfatal)" + << dendl; + /* not fatal, could result from a race */ + } + } else if (ret < 0) { + entry.status = lc_failed; + } else { + entry.status = lc_complete; + } + ret = sal_lc->set_entry(lc_shard, entry); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on " + << lc_shard << dendl; + /* fatal, locked */ + goto exit; + } /* done with this shard */ if (head.marker.empty()) { @@ -2245,6 +2289,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, } } while(1 && !once && !going_down()); +notlocked: delete lock; return 0; diff --git a/src/rgw/rgw_lc.h b/src/rgw/rgw_lc.h index 87d44ac69a645..fc4db3932c90e 100644 --- a/src/rgw/rgw_lc.h +++ b/src/rgw/rgw_lc.h @@ -525,7 +525,6 @@ public: time_t thread_stop_at(); int list_lc_progress(std::string& marker, uint32_t max_entries, std::vector&, int& index); - int bucket_lc_prepare(int index, LCWorker* worker); int bucket_lc_process(std::string& shard_id, LCWorker* worker, time_t stop_at, bool once); int bucket_lc_post(int index, int max_lock_sec,