From: Adam Kupczyk Date: Tue, 1 Jul 2025 11:30:59 +0000 (+0000) Subject: kv/KeyValueDB: New utility function util_divide_key_range X-Git-Tag: v21.0.1~9^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=632dc7a16905a11241fb68e3606fac3e0f48073f;p=ceph.git kv/KeyValueDB: New utility function util_divide_key_range New function splits provided range into smaller chunks. Declared in KeyValueDB, but implemented only for RocksDBStore. Useful for splitting large datasets for multiple threads to iterate in parallel. Signed-off-by: Adam Kupczyk --- diff --git a/src/kv/KeyValueDB.h b/src/kv/KeyValueDB.h index 0583107cf50..2d861eda82d 100644 --- a/src/kv/KeyValueDB.h +++ b/src/kv/KeyValueDB.h @@ -485,7 +485,25 @@ protected: /// List of matching prefixes/ColumnFamilies and merge operators std::vector > > merge_ops; - +public: + struct keyrange_t { + std::string first_key; // is in the range if exists + std::string upper_bound; // is not in the range even if exists + }; + /// splits a key range into multiple chunks of more or less equal size + virtual void util_divide_key_range( + const std::string& prefix, // table to operate on + const std::string& starting_key, // included if exists + const std::string& guardrail_key, // excluded if exists + uint64_t chunk_count, // desired chunk count, but fewer can happen + uint64_t min_chunk_size, // do not produce chunk smaller than this + float accepted_variance, // +/- fluctuation of produced chunk size, + // smaller value requires more work, use 0.1 ? + std::vector& chunks){ // out: chunks + chunks.clear(); + chunks.emplace_back(starting_key, guardrail_key); + return; + } }; #endif diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc index a51968b7036..8f82ba4d164 100644 --- a/src/kv/RocksDBStore.cc +++ b/src/kv/RocksDBStore.cc @@ -3648,3 +3648,184 @@ bool RocksDBStore::get_sharding(std::string& sharding) { } return result; } + +// Find a key that is lexicographically between low and high. +// Try to select "midpoint". +// If high is a direct successor to low, return "". +static string key_between(const string& low, const string& high) +{ + string result; + ceph_assert(low.compare(high) < 0); + size_t same = 0; + while (same < std::min(low.length(), high.length()) && + low[same] == high[same]) { + same++; + } + if (same == low.length()) { + // + size_t i = same; + while (i < high.length() && high[i] == '\0') { + i++; + } + if (i == high.length()) { + // special case that "high"="len00..000"; halfway formula does not work + if (i == same + 1) { + // just "high" = "len0", no key in-between + return string(); + } else { + // add half zeros + result = low; + result.append(string((same + 1 - i) / 2, '\0')); + return result; + } + } + } + const std::string &shorter = low.length() < high.length() ? low : high; + const std::string &longer = low.length() < high.length() ? high : low; + + result = shorter; + result.resize(longer.length() + 1); + uint16_t carry = 0; + // "+" + for (size_t i = longer.length() - 1; i + 1 > same; i--) { + uint8_t a = i < shorter.length() ? shorter[i] : 0; + uint8_t b = longer[i]; + uint16_t v = ((uint16_t)a + (uint16_t)b + carry); + carry = v >> 8; + result[i + 1] = v; + } + result[same] = carry; + // ">>1" + for (size_t i = same; i < longer.length(); i++) { + uint16_t v = + ((uint16_t)(uint8_t)result[i] << 8) | (uint16_t)(uint8_t)result[i + 1]; + result[i] = v >> 1; + } + result[longer.length()] = (uint8_t)result[longer.length()] >> 7; + return result; +}; + +void RocksDBStore::util_divide_key_range( + const string& prefix, + const string& starting_key, //included if exists + const string& guardrail_key, //excluded if exists + uint64_t chunk_count, + uint64_t min_chunk_size, + float accepted_variance, + vector& chunks) +{ + dout(10) << __func__ << " chunks=" << chunk_count + << " start=" << pretty_binary_string(starting_key) + << " end=" << pretty_binary_string(guardrail_key) << dendl; + chunks.clear(); + map cs_map; + string key_from, key_to; + auto db_it = get_iterator(prefix); + db_it->lower_bound(starting_key); + if (!db_it) return; //empty range + key_from = db_it->key(); + db_it->lower_bound(guardrail_key); + if (!db_it->valid()) { + db_it->seek_to_last(); + ceph_assert(db_it->valid()); + key_to = db_it->key(); + key_to.push_back('\0'); + } else { + key_to = db_it->key(); + if (key_to <= key_from) return; + } + uint64_t full_size = estimate_range_size(prefix, key_from, key_to); + cs_map[0] = key_from; + cs_map[full_size] = key_to; + uint64_t target_chunk_size = full_size / chunk_count; + uint64_t chunk_min = target_chunk_size * (1 - accepted_variance); + uint64_t chunk_max = target_chunk_size * (1 + accepted_variance); + if (full_size <= chunk_max) { + chunks.emplace_back(key_from, key_to); + return; + } + if (target_chunk_size < chunk_min) { + chunk_count = std::min(chunk_count, full_size / chunk_min + 1); + target_chunk_size = full_size / chunk_count; + chunk_min = target_chunk_size * (1 - accepted_variance); + chunk_max = target_chunk_size * (1 + accepted_variance); + } + dout(10) << __func__ << " chunks=" << chunk_count + << " key_from=" << pretty_binary_string(key_from) + << " key_to=" << pretty_binary_string(key_to) << dendl; + + ceph_assert(chunk_count >= 2); + dout(20) << "target_chunk_size=" << target_chunk_size + << " chunk_min=" << chunk_min << " chunk_max=" << chunk_max << dendl; + // Algorithm idea: + // Have a mapping MAP with elements: estimated_db_size[key_from .. x] -> x. + // Keep bisecting [key_from .. key_to]. + // When MAP[last] - MAP[current_candidate] is in acceptable range for chunk size + // emit the range and move forward. + auto it = cs_map.begin(); + uint64_t cs_anchor = it->first; + string cs_key = it->second; + uint32_t bisect_actions = 0; + while(true) { + ceph_assert(it != cs_map.end()); + it++; + ceph_assert(cs_map.end() != it); + dout(20) << "trying " << it->first << " " << pretty_binary_string(it->second) << dendl; + if (it->first - cs_anchor > chunk_max) { + if (bisect_actions == 100) { + chunks.clear(); + chunks.emplace_back(key_from, key_to); + return; + } + bisect_actions++; + // it is too far, need to roll back and divide + auto key_e = it->second; + it--; + auto key_b = it->second; + auto imagined_key = key_between(key_b, key_e); + dout(20) << pretty_binary_string(key_b) << "..." << pretty_binary_string(key_e) + << "-> midpoint=" << pretty_binary_string(imagined_key) << dendl; + ceph_assert( key_b < imagined_key); + ceph_assert(imagined_key < key_e); + db_it->upper_bound(imagined_key); + ceph_assert(db_it->valid()); + auto real_key = db_it->key(); + if (real_key == key_e) { + db_it->prev(); + real_key = db_it->key(); + } + uint64_t cs_size = estimate_range_size(prefix, key_from, real_key); + if (cs_size > it->first) { + cs_map[cs_size] = real_key; + dout(20) << "newpoint " << cs_size << " " << pretty_binary_string(real_key) << dendl; + ceph_assert(key_b < real_key); + ceph_assert(real_key <= key_e); + continue; + } else { + // this seems to be limit for precision, no reason to attempt biseciton further + // fell out and emit region + } + } else if (it->first - cs_anchor < chunk_min) { + continue; + } + + bisect_actions = 0; + // we are satisfied enough + chunks.emplace_back(cs_key, it->second); + dout(10) << "produced chunk size=" << it->first - cs_anchor << " " + << pretty_binary_string(cs_key) << " " << pretty_binary_string(it->second) << dendl; + if (chunks.size() == chunk_count - 1) { + chunks.emplace_back(it->second, key_to); + dout(10) << "produced chunk size=" << full_size - it->first << " " + << pretty_binary_string(it->second) << " " << pretty_binary_string(key_to) << dendl; + break; + } + cs_anchor = it->first; + cs_key = it->second; + target_chunk_size = (full_size - cs_anchor) / (chunk_count - chunks.size()); + chunk_min = target_chunk_size * (1 - accepted_variance); + chunk_max = target_chunk_size * (1 + accepted_variance); + dout(20) << "target_chunk_size=" << target_chunk_size + << " chunk_min=" << chunk_min << " chunk_max=" << chunk_max << dendl; + } +} diff --git a/src/kv/RocksDBStore.h b/src/kv/RocksDBStore.h index a32cc71b0c8..e0ea9e07225 100644 --- a/src/kv/RocksDBStore.h +++ b/src/kv/RocksDBStore.h @@ -566,7 +566,15 @@ public: }; int reshard(const std::string& new_sharding, const resharding_ctrl* ctrl = nullptr); bool get_sharding(std::string& sharding); - + void util_divide_key_range( + const std::string& prefix, // table to operate on + const std::string& starting_key, // included if exists + const std::string& guardrail_key, // excluded if exists + uint64_t chunk_count, // desired chunk count, but fewer can happen + uint64_t min_chunk_size, // do not produce chunk smaller than this + float accepted_variance, // +/- fluctuation of produced chunk size, + // smaller value requires more work, use 0.1 ? + std::vector& chunks) override; }; #endif