void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
{
auto cf = db->get_cf_handle(prefix);
- if (cf) {
- string endprefix("\xff\xff\xff\xff"); // FIXME: this is cheating...
- bat.DeleteRange(cf, string(), endprefix);
- } else {
- string endprefix = prefix;
- endprefix.push_back('\x01');
- bat.DeleteRange(db->default_cf,
- combine_strings(prefix, string()),
- combine_strings(endprefix, string()));
+ uint64_t cnt = db->delete_range_threshold;
+ bat.SetSavePoint();
+ auto it = db->get_iterator(prefix);
+ for (it->seek_to_first(); it->valid(); it->next()) {
+ if (!cnt) {
+ bat.RollbackToSavePoint();
+ if (cf) {
+ string endprefix = "\xff\xff\xff\xff"; // FIXME: this is cheating...
+ bat.DeleteRange(cf, string(), endprefix);
+ } else {
+ string endprefix = prefix;
+ endprefix.push_back('\x01');
+ bat.DeleteRange(db->default_cf,
+ combine_strings(prefix, string()),
+ combine_strings(endprefix, string()));
+ }
+ return;
+ }
+ if (cf) {
+ bat.Delete(cf, rocksdb::Slice(it->key()));
+ } else {
+ bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
+ }
+ --cnt;
}
+ bat.PopSavePoint();
}
void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
const string &end)
{
auto cf = db->get_cf_handle(prefix);
- if (cf) {
- bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
- } else {
- bat.DeleteRange(
- db->default_cf,
- rocksdb::Slice(combine_strings(prefix, start)),
- rocksdb::Slice(combine_strings(prefix, end)));
+
+ uint64_t cnt = db->delete_range_threshold;
+ auto it = db->get_iterator(prefix);
+ bat.SetSavePoint();
+ it->lower_bound(start);
+ while (it->valid()) {
+ if (it->key() >= end) {
+ break;
+ }
+ if (!cnt) {
+ bat.RollbackToSavePoint();
+ if (cf) {
+ bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
+ } else {
+ bat.DeleteRange(db->default_cf,
+ rocksdb::Slice(combine_strings(prefix, start)),
+ rocksdb::Slice(combine_strings(prefix, end)));
+ }
+ return;
+ }
+ if (cf) {
+ bat.Delete(cf, rocksdb::Slice(it->key()));
+ } else {
+ bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
+ }
+ it->next();
+ --cnt;
}
+ bat.PopSavePoint();
}
void RocksDBStore::RocksDBTransactionImpl::merge(