]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kv/KeyValueDB: New utility function util_divide_key_range
authorAdam Kupczyk <akupczyk@ibm.com>
Tue, 1 Jul 2025 11:30:59 +0000 (11:30 +0000)
committerAdam Kupczyk <akupczyk@ibm.com>
Mon, 1 Jun 2026 16:14:02 +0000 (16:14 +0000)
New function splits provided range into smaller chunks.
Declared in KeyValueDB, but implemented only for RocksDBStore.
Useful for splitting large datasets for multiple threads to
iterate in parallel.

Signed-off-by: Adam Kupczyk <akupczyk@ibm.com>
src/kv/KeyValueDB.h
src/kv/RocksDBStore.cc
src/kv/RocksDBStore.h

index 0583107cf5032e6ab227f95200f3841fac0ed0c2..2d861eda82d1d0b60f50c90a4be650b7bdb3ae7b 100644 (file)
@@ -485,7 +485,25 @@ protected:
   /// List of matching prefixes/ColumnFamilies and merge operators
   std::vector<std::pair<std::string,
                        std::shared_ptr<MergeOperator> > > merge_ops;
-
+public:
+  struct keyrange_t {
+    std::string first_key;    // is in the range if exists
+    std::string upper_bound;  // is not in the range even if exists
+  };
+  /// splits a key range into multiple chunks of more or less equal size
+  virtual void util_divide_key_range(
+    const std::string& prefix,        // table to operate on
+    const std::string& starting_key,  // included if exists
+    const std::string& guardrail_key, // excluded if exists
+    uint64_t chunk_count,             // desired chunk count, but fewer can happen
+    uint64_t min_chunk_size,          // do not produce chunk smaller than this
+    float accepted_variance,          // +/- fluctuation of produced chunk size,
+                                      // smaller value requires more work, use 0.1 ?
+    std::vector<keyrange_t>& chunks){ // out: chunks
+      chunks.clear();
+      chunks.emplace_back(starting_key, guardrail_key);
+      return;
+    }
 };
 
 #endif
index a51968b7036aea9829b0f3a8fb85bb15fd9b148a..8f82ba4d164baae8ef0428ac21fb2c17944663f3 100644 (file)
@@ -3648,3 +3648,184 @@ bool RocksDBStore::get_sharding(std::string& sharding) {
   }
   return result;
 }
+
+// Find a key that is lexicographically between low and high.
+// Try to select "midpoint".
+// If high is a direct successor to low, return "".
+static string key_between(const string& low, const string& high)
+{
+  string result;
+  ceph_assert(low.compare(high) < 0);
+  size_t same = 0;
+  while (same < std::min(low.length(), high.length()) &&
+         low[same] == high[same]) {
+    same++;
+  }
+  if (same == low.length()) {
+    //
+    size_t i = same;
+    while (i < high.length() && high[i] == '\0') {
+      i++;
+    }
+    if (i == high.length()) {
+      // special case that "high"="len00..000"; halfway formula does not work
+      if (i == same + 1) {
+        // just "high" = "len0", no key in-between
+        return string();
+      } else {
+        // add half zeros
+        result = low;
+        result.append(string((same + 1 - i) / 2, '\0'));
+        return result;
+      }
+    }
+  }
+  const std::string &shorter = low.length() < high.length() ? low : high;
+  const std::string &longer = low.length() < high.length() ? high : low;
+
+  result = shorter;
+  result.resize(longer.length() + 1);
+  uint16_t carry = 0;
+  // "+"
+  for (size_t i = longer.length() - 1; i + 1 > same; i--) {
+    uint8_t a = i < shorter.length() ? shorter[i] : 0;
+    uint8_t b = longer[i];
+    uint16_t v = ((uint16_t)a + (uint16_t)b + carry);
+    carry = v >> 8;
+    result[i + 1] = v;
+  }
+  result[same] = carry;
+  //  ">>1"
+  for (size_t i = same; i < longer.length(); i++) {
+    uint16_t v =
+        ((uint16_t)(uint8_t)result[i] << 8) | (uint16_t)(uint8_t)result[i + 1];
+    result[i] = v >> 1;
+  }
+  result[longer.length()] = (uint8_t)result[longer.length()] >> 7;
+  return result;
+};
+
+void RocksDBStore::util_divide_key_range(
+  const string& prefix,
+  const string& starting_key,   //included if exists
+  const string& guardrail_key,  //excluded if exists
+  uint64_t chunk_count,
+  uint64_t min_chunk_size,
+  float accepted_variance,
+  vector<keyrange_t>& chunks)
+{
+  dout(10) << __func__ << " chunks=" << chunk_count
+    << " start=" << pretty_binary_string(starting_key)
+    << " end=" << pretty_binary_string(guardrail_key) << dendl;
+  chunks.clear();
+  map<uint64_t, string> cs_map;
+  string key_from, key_to;
+  auto db_it = get_iterator(prefix);
+  db_it->lower_bound(starting_key);
+  if (!db_it) return; //empty range
+  key_from = db_it->key();
+  db_it->lower_bound(guardrail_key);
+  if (!db_it->valid()) {
+    db_it->seek_to_last();
+    ceph_assert(db_it->valid());
+    key_to = db_it->key();
+    key_to.push_back('\0');
+  } else {
+    key_to = db_it->key();
+    if (key_to <= key_from) return;
+  }
+  uint64_t full_size = estimate_range_size(prefix, key_from, key_to);
+  cs_map[0] = key_from;
+  cs_map[full_size] = key_to;
+  uint64_t target_chunk_size = full_size / chunk_count;
+  uint64_t chunk_min = target_chunk_size * (1 - accepted_variance);
+  uint64_t chunk_max = target_chunk_size * (1 + accepted_variance);
+  if (full_size <= chunk_max) {
+    chunks.emplace_back(key_from, key_to);
+    return;
+  }
+  if (target_chunk_size < chunk_min) {
+    chunk_count = std::min<uint64_t>(chunk_count, full_size / chunk_min + 1);
+    target_chunk_size = full_size / chunk_count;
+    chunk_min = target_chunk_size * (1 - accepted_variance);
+    chunk_max = target_chunk_size * (1 + accepted_variance);
+  }
+  dout(10) << __func__ << " chunks=" << chunk_count
+    << " key_from=" << pretty_binary_string(key_from)
+    << " key_to=" << pretty_binary_string(key_to) << dendl;
+
+  ceph_assert(chunk_count >= 2);
+  dout(20) << "target_chunk_size=" << target_chunk_size
+    << " chunk_min=" << chunk_min << " chunk_max=" << chunk_max << dendl;
+  // Algorithm idea:
+  // Have a mapping MAP with elements: estimated_db_size[key_from .. x] -> x.
+  // Keep bisecting [key_from .. key_to].
+  // When MAP[last] - MAP[current_candidate] is in acceptable range for chunk size
+  //   emit the range and move forward.
+  auto it = cs_map.begin();
+  uint64_t cs_anchor = it->first;
+  string cs_key = it->second;
+  uint32_t bisect_actions = 0;
+  while(true) {
+    ceph_assert(it != cs_map.end());
+    it++;
+    ceph_assert(cs_map.end() != it);
+    dout(20) << "trying   " << it->first << " " << pretty_binary_string(it->second) << dendl;
+    if (it->first - cs_anchor > chunk_max) {
+      if (bisect_actions == 100) {
+        chunks.clear();
+        chunks.emplace_back(key_from, key_to);
+        return;
+      }
+      bisect_actions++;
+      // it is too far, need to roll back and divide
+      auto key_e = it->second;
+      it--;
+      auto key_b = it->second;
+      auto imagined_key = key_between(key_b, key_e);
+      dout(20) << pretty_binary_string(key_b) << "..." << pretty_binary_string(key_e)
+        << "-> midpoint=" << pretty_binary_string(imagined_key) << dendl;
+      ceph_assert( key_b < imagined_key);
+      ceph_assert(imagined_key < key_e);
+      db_it->upper_bound(imagined_key);
+      ceph_assert(db_it->valid());
+      auto real_key = db_it->key();
+      if (real_key == key_e) {
+        db_it->prev();
+        real_key = db_it->key();
+      }
+      uint64_t cs_size = estimate_range_size(prefix, key_from, real_key);
+      if (cs_size > it->first) {
+        cs_map[cs_size] = real_key;
+        dout(20) << "newpoint " << cs_size << " " << pretty_binary_string(real_key) << dendl;
+        ceph_assert(key_b < real_key);
+        ceph_assert(real_key <= key_e);
+        continue;
+      } else {
+        // this seems to be limit for precision, no reason to attempt biseciton further
+        // fell out and emit region
+      }
+    } else if (it->first - cs_anchor < chunk_min) {
+      continue;
+    }
+
+    bisect_actions = 0;
+    // we are satisfied enough
+    chunks.emplace_back(cs_key, it->second);
+    dout(10) << "produced chunk size=" << it->first - cs_anchor << " "
+      << pretty_binary_string(cs_key) << " " << pretty_binary_string(it->second) << dendl;
+    if (chunks.size() == chunk_count - 1) {
+      chunks.emplace_back(it->second, key_to);
+      dout(10) << "produced chunk size=" << full_size - it->first << " "
+        << pretty_binary_string(it->second) << " " << pretty_binary_string(key_to) << dendl;
+      break;
+    }
+    cs_anchor = it->first;
+    cs_key = it->second;
+    target_chunk_size = (full_size - cs_anchor) / (chunk_count - chunks.size());
+    chunk_min = target_chunk_size * (1 - accepted_variance);
+    chunk_max = target_chunk_size * (1 + accepted_variance);
+    dout(20) << "target_chunk_size=" << target_chunk_size
+      << " chunk_min=" << chunk_min << " chunk_max=" << chunk_max << dendl;
+  }
+}
index a32cc71b0c8d846de44244116d07db4882aedc5f..e0ea9e07225e6e99f1b0cedbce925e149bca748d 100644 (file)
@@ -566,7 +566,15 @@ public:
   };
   int reshard(const std::string& new_sharding, const resharding_ctrl* ctrl = nullptr);
   bool get_sharding(std::string& sharding);
-
+  void util_divide_key_range(
+    const std::string& prefix,        // table to operate on
+    const std::string& starting_key,  // included if exists
+    const std::string& guardrail_key, // excluded if exists
+    uint64_t chunk_count,             // desired chunk count, but fewer can happen
+    uint64_t min_chunk_size,          // do not produce chunk smaller than this
+    float accepted_variance,          // +/- fluctuation of produced chunk size,
+                                      // smaller value requires more work, use 0.1 ?
+    std::vector<keyrange_t>& chunks) override;
 };
 
 #endif