From 61fee0ef048289599992dc68db2baeb5fb92b087 Mon Sep 17 00:00:00 2001 From: Adam Emerson Date: Fri, 12 Jul 2024 18:05:20 -0400 Subject: [PATCH] rgw/multisite/datalog: Decrement with grace period This guards against an excess decrement in the sequence: RGW_a: Fetch sem_set (see key 'foo') RGW_b: run renew_entry ('foo' is no longer in `cur_cycle`) RGW_a: notify (does not see 'foo' in response) RGW_a: Decrements 'foo' Signed-off-by: Adam Emerson --- src/cls/sem_set/DESIGN.md | 4 +++- src/rgw/driver/rados/rgw_datalog.cc | 16 ++++++++++++++-- src/rgw/driver/rados/rgw_datalog.h | 2 ++ 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/cls/sem_set/DESIGN.md b/src/cls/sem_set/DESIGN.md index b1ab92a03fb..d09ad393e05 100644 --- a/src/cls/sem_set/DESIGN.md +++ b/src/cls/sem_set/DESIGN.md @@ -57,7 +57,9 @@ after a write, the data log entry might never be made. respond with bs1 in their `cur_cycle`, bs1 will be decremented thrice) 6. For each entry in the unordered map, decrement on the semaphore - object only if the object's count is greater than 0. + object only if the object's count is greater than 0. Send a + grace period corresponding to the length of time since fetch + times a fudge factor. 7. If the `notify` operation errors, don't decrement anything. * Have some task call `compress` on a regular basis (Daily? Hourly?), to keep seldom used or deleted bucket shards from slowing down diff --git a/src/rgw/driver/rados/rgw_datalog.cc b/src/rgw/driver/rados/rgw_datalog.cc index 3d51fa23ae9..b989b81c468 100644 --- a/src/rgw/driver/rados/rgw_datalog.cc +++ b/src/rgw/driver/rados/rgw_datalog.cc @@ -1466,6 +1466,11 @@ asio::awaitable RGWDataChangesLog::renew_run(decltype(renew_signal)) { ++run; } + if (ceph::mono_clock::now() - last_recovery < 6h) { + co_await recover(&dp, recovery_signal); + }; + + int interval = cct->_conf->rgw_data_log_window * 3 / 4; renew_timer->expires_after(std::chrono::seconds(interval)); co_await renew_timer->async_wait(asio::use_awaitable); @@ -1676,6 +1681,7 @@ RGWDataChangesLog::gather_working_sets( asio::awaitable RGWDataChangesLog::decrement_sems( int index, + ceph::mono_time fetch_time, bc::flat_map&& semcount) { namespace sem_set = neorados::cls::sem_set; @@ -1686,9 +1692,10 @@ RGWDataChangesLog::decrement_sems( batch.insert(iter->first); semcount.erase(std::move(iter)); } + auto grace = ((ceph::mono_clock::now() - fetch_time) * 4) / 3; co_await rados->execute( get_sem_set_oid(index), loc, neorados::WriteOp{}.exec( - sem_set::decrement(std::move(batch))), + sem_set::decrement(std::move(batch), grace)), asio::use_awaitable); } } @@ -1700,6 +1707,7 @@ RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index) do { bc::flat_map semcount; + auto fetch_time = ceph::mono_clock::now(); // Gather entries in the shard std::tie(semcount, cursor) = co_await read_sems(index, std::move(cursor)); // If we have none, no point doing the rest @@ -1727,7 +1735,7 @@ RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index) << "failed, skipping decrement" << dendl; continue; } - co_await decrement_sems(index, std::move(semcount)); + co_await decrement_sems(index, fetch_time, std::move(semcount)); } while (!cursor.empty()); co_return; } @@ -1747,6 +1755,10 @@ asio::awaitable RGWDataChangesLog::recover(const DoutPrefixProvider* dpp, co_await group.wait(); }(dpp), asio::use_awaitable); + + std::unique_lock l(lock); + last_recovery = ceph::mono_clock::now(); + l.unlock(); } void RGWDataChangesLogInfo::dump(Formatter *f) const diff --git a/src/rgw/driver/rados/rgw_datalog.h b/src/rgw/driver/rados/rgw_datalog.h index fdd09973391..ec55b588a79 100644 --- a/src/rgw/driver/rados/rgw_datalog.h +++ b/src/rgw/driver/rados/rgw_datalog.h @@ -367,6 +367,7 @@ class RGWDataChangesLog { std::make_shared(); std::shared_ptr recovery_signal = std::make_shared(); + ceph::mono_time last_recovery = ceph::mono_clock::zero(); const int num_shards; std::string get_prefix() { return "data_log"; } @@ -524,6 +525,7 @@ public: bc::flat_map& semcount); asio::awaitable decrement_sems(int index, + ceph::mono_time fetch_time, bc::flat_map&& semcount); asio::awaitable recover_shard(const DoutPrefixProvider* dpp, int index); asio::awaitable recover(const DoutPrefixProvider* dpp, -- 2.39.5