]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/multisite/datalog: Decrement with grace period
authorAdam Emerson <aemerson@redhat.com>
Fri, 12 Jul 2024 22:05:20 +0000 (18:05 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Tue, 1 Apr 2025 15:10:14 +0000 (11:10 -0400)
This guards against an excess decrement in the sequence:

RGW_a: Fetch sem_set (see key 'foo')
RGW_b: run renew_entry ('foo' is no longer in `cur_cycle`)
RGW_a: notify (does not see 'foo' in response)
RGW_a: Decrements 'foo'

Signed-off-by: Adam Emerson <aemerson@redhat.com>
src/cls/sem_set/DESIGN.md
src/rgw/driver/rados/rgw_datalog.cc
src/rgw/driver/rados/rgw_datalog.h

index b1ab92a03fbf22d47ba85ab63f19f2d5851d2529..d09ad393e05ecc11fd9fcfe7578b3505c8b61dd4 100644 (file)
@@ -57,7 +57,9 @@ after a write, the data log entry might never be made.
        respond with bs1 in their `cur_cycle`, bs1 will be decremented
        thrice)
        6. For each entry in the unordered map, decrement on the semaphore
-       object only if the object's count is greater than 0.
+       object only if the object's count is greater than 0. Send a
+       grace period corresponding to the length of time since fetch
+       times a fudge factor.
     7. If the `notify` operation errors, don't decrement anything.
 * Have some task call `compress` on a regular basis (Daily? Hourly?),
   to keep seldom used or deleted bucket shards from slowing down
index 3d51fa23ae92ae0f8c8cbce9201b393d9c3cd959..b989b81c468481a2cbe5336dc2b8ac8ba7ca827c 100644 (file)
@@ -1466,6 +1466,11 @@ asio::awaitable<void> RGWDataChangesLog::renew_run(decltype(renew_signal)) {
        ++run;
       }
 
+      if (ceph::mono_clock::now() - last_recovery < 6h)  {
+       co_await recover(&dp, recovery_signal);
+      };
+
+
       int interval = cct->_conf->rgw_data_log_window * 3 / 4;
       renew_timer->expires_after(std::chrono::seconds(interval));
       co_await renew_timer->async_wait(asio::use_awaitable);
@@ -1676,6 +1681,7 @@ RGWDataChangesLog::gather_working_sets(
 asio::awaitable<void>
 RGWDataChangesLog::decrement_sems(
   int index,
+  ceph::mono_time fetch_time,
   bc::flat_map<std::string, uint64_t>&& semcount)
 {
   namespace sem_set = neorados::cls::sem_set;
@@ -1686,9 +1692,10 @@ RGWDataChangesLog::decrement_sems(
       batch.insert(iter->first);
       semcount.erase(std::move(iter));
     }
+    auto grace = ((ceph::mono_clock::now() - fetch_time) * 4) / 3;
     co_await rados->execute(
       get_sem_set_oid(index), loc, neorados::WriteOp{}.exec(
-       sem_set::decrement(std::move(batch))),
+       sem_set::decrement(std::move(batch), grace)),
       asio::use_awaitable);
   }
 }
@@ -1700,6 +1707,7 @@ RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index)
   do {
     bc::flat_map<std::string, uint64_t> semcount;
 
+    auto fetch_time = ceph::mono_clock::now();
     // Gather entries in the shard
     std::tie(semcount, cursor) = co_await read_sems(index, std::move(cursor));
     // If we have none, no point doing the rest
@@ -1727,7 +1735,7 @@ RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index)
                        << "failed, skipping decrement" << dendl;
       continue;
     }
-    co_await decrement_sems(index, std::move(semcount));
+    co_await decrement_sems(index, fetch_time, std::move(semcount));
   } while (!cursor.empty());
   co_return;
 }
@@ -1747,6 +1755,10 @@ asio::awaitable<void> RGWDataChangesLog::recover(const DoutPrefixProvider* dpp,
       co_await group.wait();
     }(dpp),
     asio::use_awaitable);
+
+  std::unique_lock l(lock);
+  last_recovery = ceph::mono_clock::now();
+  l.unlock();
 }
 
 void RGWDataChangesLogInfo::dump(Formatter *f) const
index fdd099733911573349691f1446c8ec20fb58603e..ec55b588a79c189d272cb9cff9d709388ca298f6 100644 (file)
@@ -367,6 +367,7 @@ class RGWDataChangesLog {
     std::make_shared<asio::cancellation_signal>();
   std::shared_ptr<asio::cancellation_signal> recovery_signal =
     std::make_shared<asio::cancellation_signal>();
+  ceph::mono_time last_recovery = ceph::mono_clock::zero();
 
   const int num_shards;
   std::string get_prefix() { return "data_log"; }
@@ -524,6 +525,7 @@ public:
                      bc::flat_map<std::string, uint64_t>& semcount);
   asio::awaitable<void>
   decrement_sems(int index,
+                ceph::mono_time fetch_time,
                 bc::flat_map<std::string, uint64_t>&& semcount);
   asio::awaitable<void> recover_shard(const DoutPrefixProvider* dpp, int index);
   asio::awaitable<void> recover(const DoutPrefixProvider* dpp,