#include <boost/variant.hpp>
#include "include/scope_guard.h"
+#include "include/function2.hpp"
#include "common/Formatter.h"
#include "common/containers.h"
#include <common/errno.h>
return os;
}
-int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
-{
- vector<rgw::sal::Lifecycle::LCEntry> entries;
- string marker;
-
- ldpp_dout(this, 5) << "RGWLC::bucket_lc_prepare(): PREPARE "
- << "index: " << index << " worker ix: " << worker->ix
- << dendl;
-
-#define MAX_LC_LIST_ENTRIES 100
- do {
- int ret = sal_lc->list_entries(obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
- if (ret < 0)
- return ret;
-
- for (auto& entry : entries) {
- entry.start_time = ceph_clock_now();
- entry.status = lc_uninitial; // lc_uninitial? really?
- ret = sal_lc->set_entry(obj_names[index], entry);
- if (ret < 0) {
- ldpp_dout(this, 0)
- << "RGWLC::bucket_lc_prepare() failed to set entry on "
- << obj_names[index] << dendl;
- return ret;
- }
- }
-
- if (! entries.empty()) {
- marker = std::move(entries.back().bucket);
- }
- } while (!entries.empty());
-
- return 0;
-}
-
static bool obj_has_expired(const DoutPrefixProvider *dpp, CephContext *cct, ceph::real_time mtime, int days,
ceph::real_time *expire_time = nullptr)
{
return ret;
}
+class SimpleBackoff
+{
+ const int max_retries;
+ std::chrono::milliseconds sleep_ms;
+ int retries{0};
+public:
+ SimpleBackoff(int max_retries, std::chrono::milliseconds initial_sleep_ms)
+ : max_retries(max_retries), sleep_ms(initial_sleep_ms)
+ {}
+ SimpleBackoff(const SimpleBackoff&) = delete;
+ SimpleBackoff& operator=(const SimpleBackoff&) = delete;
+
+ int get_retries() const {
+ return retries;
+ }
+
+ void reset() {
+ retries = 0;
+ }
+
+ bool wait_backoff(const fu2::unique_function<bool(void) const>& barrier) {
+ reset();
+ while (retries < max_retries) {
+ auto r = barrier();
+ if (r) {
+ return r;
+ }
+ std::this_thread::sleep_for(sleep_ms);
+ sleep_ms = std::chrono::milliseconds(sleep_ms*2*retries);
+ ++retries;
+ }
+ return false;
+ }
+};
+
int RGWLC::bucket_lc_post(int index, int max_lock_sec,
rgw::sal::Lifecycle::LCEntry& entry, int& result,
LCWorker* worker)
return 0;
ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index]
<< dendl;
+
if (result == -ENOENT) {
/* XXXX are we SURE the only way result could == ENOENT is when
* there is no such bucket? It is currently the value returned
<< obj_names[index] << dendl;
return 0;
} while (true);
-}
+} /* RGWLC::bucket_lc_post */
int RGWLC::list_lc_progress(string& marker, uint32_t max_entries,
vector<rgw::sal::Lifecycle::LCEntry>& progress_map,
rgw::sal::LCSerializer* lock =
sal_lc->get_serializer(lc_index_lock_name, lc_shard, std::string());
- do {
- utime_t now = ceph_clock_now();
- utime_t lock_for_s(max_lock_secs, 0);
-
+ utime_t lock_for_s(max_lock_secs, 0);
+ const auto& lock_lambda = [&]() {
ret = lock->try_lock(this, lock_for_s, null_yield);
+ if (ret == 0) {
+ return true;
+ }
if (ret == -EBUSY || ret == -EEXIST) {
/* already locked by another lc processor */
- ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
- << lc_shard << ", sleep 5, try again" << dendl;
- sleep(5);
- continue; // XXXX really retry forever?
- }
- if (ret < 0) {
- return 0;
- }
+ return false;
+ }
+ return false;
+ };
+
+ SimpleBackoff shard_lock(5 /* max retries */, 50ms);
+ if (! shard_lock.wait_backoff(lock_lambda)) {
+ ldpp_dout(this, 0) << "RGWLC::process(): failed to aquire lock on "
+ << lc_shard << " after " << shard_lock.get_retries()
+ << dendl;
+ goto notlocked;
+ }
+
+ do {
+ utime_t now = ceph_clock_now();
/* preamble: find an inital bucket/marker */
ret = sal_lc->get_head(lc_shard, head);
}
/* if there is nothing at head, try to reinitialize head.marker with the
- * entry in the queue */
+ * first entry in the queue */
if (head.marker.empty()) {
vector<rgw::sal::Lifecycle::LCEntry> entries;
int ret = sal_lc->list_entries(lc_shard, head.marker, 1, entries);
<< " index: " << index << " worker ix: " << worker->ix
<< dendl;
+ /* drop lock so other instances can make progress while this
+ * bucket is being processed */
lock->unlock();
ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once);
- bucket_lc_post(index, max_lock_secs, entry, ret, worker);
+
+ /* postamble */
+ //bucket_lc_post(index, max_lock_secs, entry, ret, worker);
+ if (! shard_lock.wait_backoff(lock_lambda)) {
+ ldpp_dout(this, 0) << "RGWLC::process(): failed to aquire lock on "
+ << lc_shard << " after " << shard_lock.get_retries()
+ << dendl;
+ goto notlocked;
+ }
+
+ if (ret == -ENOENT) {
+ /* XXXX are we SURE the only way result could == ENOENT is when
+ * there is no such bucket? It is currently the value returned
+ * from bucket_lc_process(...) */
+ ret = sal_lc->rm_entry(lc_shard, entry);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "RGWLC::process() failed to remove entry "
+ << lc_shard << " (nonfatal)"
+ << dendl;
+ /* not fatal, could result from a race */
+ }
+ } else if (ret < 0) {
+ entry.status = lc_failed;
+ } else {
+ entry.status = lc_complete;
+ }
+ ret = sal_lc->set_entry(lc_shard, entry);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
+ << lc_shard << dendl;
+ /* fatal, locked */
+ goto exit;
+ }
/* done with this shard */
if (head.marker.empty()) {
}
} while(1 && !once && !going_down());
+notlocked:
delete lock;
return 0;