respond with bs1 in their `cur_cycle`, bs1 will be decremented
thrice)
6. For each entry in the unordered map, decrement on the semaphore
- object only if the object's count is greater than 0.
+ object only if the object's count is greater than 0. Send a
+ grace period corresponding to the length of time since fetch
+ times a fudge factor.
7. If the `notify` operation errors, don't decrement anything.
* Have some task call `compress` on a regular basis (Daily? Hourly?),
to keep seldom used or deleted bucket shards from slowing down
++run;
}
+ if (ceph::mono_clock::now() - last_recovery < 6h) {
+ co_await recover(&dp, recovery_signal);
+ };
+
+
int interval = cct->_conf->rgw_data_log_window * 3 / 4;
renew_timer->expires_after(std::chrono::seconds(interval));
co_await renew_timer->async_wait(asio::use_awaitable);
asio::awaitable<void>
RGWDataChangesLog::decrement_sems(
int index,
+ ceph::mono_time fetch_time,
bc::flat_map<std::string, uint64_t>&& semcount)
{
namespace sem_set = neorados::cls::sem_set;
batch.insert(iter->first);
semcount.erase(std::move(iter));
}
+ auto grace = ((ceph::mono_clock::now() - fetch_time) * 4) / 3;
co_await rados->execute(
get_sem_set_oid(index), loc, neorados::WriteOp{}.exec(
- sem_set::decrement(std::move(batch))),
+ sem_set::decrement(std::move(batch), grace)),
asio::use_awaitable);
}
}
do {
bc::flat_map<std::string, uint64_t> semcount;
+ auto fetch_time = ceph::mono_clock::now();
// Gather entries in the shard
std::tie(semcount, cursor) = co_await read_sems(index, std::move(cursor));
// If we have none, no point doing the rest
<< "failed, skipping decrement" << dendl;
continue;
}
- co_await decrement_sems(index, std::move(semcount));
+ co_await decrement_sems(index, fetch_time, std::move(semcount));
} while (!cursor.empty());
co_return;
}
co_await group.wait();
}(dpp),
asio::use_awaitable);
+
+ std::unique_lock l(lock);
+ last_recovery = ceph::mono_clock::now();
+ l.unlock();
}
void RGWDataChangesLogInfo::dump(Formatter *f) const
std::make_shared<asio::cancellation_signal>();
std::shared_ptr<asio::cancellation_signal> recovery_signal =
std::make_shared<asio::cancellation_signal>();
+ ceph::mono_time last_recovery = ceph::mono_clock::zero();
const int num_shards;
std::string get_prefix() { return "data_log"; }
bc::flat_map<std::string, uint64_t>& semcount);
asio::awaitable<void>
decrement_sems(int index,
+ ceph::mono_time fetch_time,
bc::flat_map<std::string, uint64_t>&& semcount);
asio::awaitable<void> recover_shard(const DoutPrefixProvider* dpp, int index);
asio::awaitable<void> recover(const DoutPrefixProvider* dpp,