});
}
+BtreeOMapManager::omap_rm_keys_ret
+BtreeOMapManager::omap_rm_keys(
+ omap_root_t &omap_root,
+ Transaction &t,
+ std::set<std::string>& keys)
+{
+ auto type = omap_root.get_type();
+ return trans_intr::do_for_each(
+ keys.begin(),
+ keys.end(),
+ [&t, &omap_root, type, this](auto &p)
+ {
+ LOG_PREFIX(BtreeOMapManager::omap_rm_keys);
+ DEBUGT("{} remove key={} ...", t, type, p);
+ return omap_rm_key(omap_root, t, p);
+ });
+}
+
BtreeOMapManager::omap_rm_key_range_ret
BtreeOMapManager::omap_rm_key_range(
omap_root_t &omap_root,
co_return;
}
+static inline bool add_decimal_string(std::string& s, size_t add)
+{
+ size_t carry = add;
+ for (int i = (int)s.size() - 1; i >= 0 && carry > 0; --i) {
+ char c = s[i];
+ if (c < '0' || c > '9')
+ continue;
+ int sum = (c - '0') + (int)(carry % 10);
+ carry /= 10;
+ s[i] = char('0' + (sum % 10));
+ carry += (size_t)(sum / 10);
+ }
+ return carry == 0;
+}
+
+bool is_continuous_fixed_width(const std::set<std::string>& keys)
+{
+ const auto& first = *keys.begin();
+ const auto& last = *keys.rbegin();
+
+ auto dot1 = first.find('.');
+ auto dot2 = last.find('.');
+ if (dot1 == std::string::npos || dot2 == std::string::npos) return false;
+
+ std::string seq = first.substr(dot1 + 1);
+ if (!add_decimal_string(seq, keys.size() - 1)) return false;
+
+ if (seq == last.substr(dot2 + 1)) {
+ // First we check equality (same epoch),
+ // then we increment it by 1 to check the rollover case.
+ std::string_view before_dot2 = std::string_view(last).substr(0, dot2);
+ std::string before_dot1 = first.substr(0, dot1);
+ if (before_dot1 == before_dot2) {
+ return true;
+ }
+ if (!add_decimal_string(before_dot1, 1)) return false;
+ if (before_dot1 == before_dot2) {
+ return true;
+ }
+ }
+ return false;
+}
+
+LogManager::omap_rm_keys_ret
+LogManager::omap_rm_keys(
+ omap_root_t& log_root,
+ Transaction& t,
+ std::set<std::string>& keys)
+{
+ LOG_PREFIX(LogManager::omap_rm_keys);
+ DEBUGT("key size={}", t, keys.size());
+ assert(log_root.get_type() == omap_type_t::LOG);
+
+ std::set<std::string> dup_keys;
+ auto begin = keys.lower_bound("dup_");
+ auto end = keys.lower_bound("dup`");
+ while (begin != end) {
+ auto nh = keys.extract(begin++);
+ dup_keys.insert(std::move(nh));
+ }
+
+
+ // Deletion of pg_log_entry_t entries is performed by omap_rm_keys using a set.
+ // For example, omap_rm_keys might be called with a set containing
+ // pg_log_entry_t entries ranging from 0011.0001 to 0011.0010.
+ // In this case, calling omap_rm_key individually for each entry is inefficient,
+ // because each call triggers a traversal of the entire list.
+ auto remove_key_set = [&](auto& key_set, laddr_t addr) -> omap_rm_key_ret {
+ if (key_set.empty())
+ co_return;
+
+ bool continuous = is_continuous_fixed_width(key_set);
+ if (continuous) {
+ // fast path
+ co_await remove_kvs(
+ t, addr,
+ *key_set.begin(),
+ *key_set.rbegin(),
+ nullptr);
+ } else {
+ for (auto& p : key_set) {
+ co_await remove_kv(t, log_root.addr, p, nullptr);
+ }
+ }
+ };
+ co_await remove_key_set(keys, log_root.addr);
+ co_await remove_key_set(dup_keys,
+ co_await get_dup_addr_from_root(t, log_root.addr));
+ co_return;
+}
+
LogManager::omap_rm_key_range_ret
LogManager::omap_rm_key_range(
omap_root_t &log_root,
std::move(manager),
std::move(root),
std::move(keys),
- [&t, &onode, type]
+ [&t, &onode]
(auto &manager, auto &root, auto &keys)
{
- return trans_intr::do_for_each(
- keys.begin(),
- keys.end(),
- [&manager, &t, &root, FNAME, type](auto &p)
- {
- DEBUGT("{} remove key={} ...", t, type, p);
- return manager.omap_rm_key(root, t, p);
- }).si_then([&t, &root, &onode] {
+ return manager.omap_rm_keys(root, t, keys
+ ).si_then([&t, &root, &onode] {
if (root.must_update()) {
omaptree_update_root(t, root, onode);
}