table/get_context.cc
table/iterator.cc
table/merging_iterator.cc
+ table/compaction_merging_iterator.cc
table/meta_blocks.cc
table/persistent_cache_helper.cc
table/plain/plain_table_bloom.cc
### Behavior changes
* Make best-efforts recovery verify SST unique ID before Version construction (#10962)
* Introduce `epoch_number` and sort L0 files by `epoch_number` instead of `largest_seqno`. `epoch_number` represents the order of a file being flushed or ingested/imported. Compaction output file will be assigned with the minimum `epoch_number` among input files'. For L0, larger `epoch_number` indicates newer L0 file.
+* Compaction output file cutting logic now considers range tombstone start keys. For example, SST partitioner now may receive ParitionRequest for range tombstone start keys.
### Bug Fixes
* Fixed a regression in iterator where range tombstones after `iterate_upper_bound` is processed.
"table/block_based/reader_common.cc",
"table/block_based/uncompression_dict_reader.cc",
"table/block_fetcher.cc",
+ "table/compaction_merging_iterator.cc",
"table/cuckoo/cuckoo_table_builder.cc",
"table/cuckoo/cuckoo_table_factory.cc",
"table/cuckoo/cuckoo_table_reader.cc",
"table/block_based/reader_common.cc",
"table/block_based/uncompression_dict_reader.cc",
"table/block_fetcher.cc",
+ "table/compaction_merging_iterator.cc",
"table/cuckoo/cuckoo_table_builder.cc",
"table/cuckoo/cuckoo_table_factory.cc",
"table/cuckoo/cuckoo_table_reader.cc",
return iter_->GetProperty(prop_name, prop);
}
+ bool IsDeleteRangeSentinelKey() const override {
+ return iter_->IsDeleteRangeSentinelKey();
+ }
+
private:
void UpdateAndCountBlobIfNeeded() {
assert(!iter_->Valid() || iter_->status().ok());
if (!iter_->Valid()) {
status_ = iter_->status();
return;
+ } else if (iter_->IsDeleteRangeSentinelKey()) {
+ // CompactionMergingIterator emits range tombstones, and range tombstone
+ // keys can be truncated at file boundaries. This means the range
+ // tombstone keys can have op_type kTypeBlobIndex.
+ // This could crash the ProcessInFlow() call below since
+ // value is empty for these keys.
+ return;
}
TEST_SYNC_POINT(
return iter_->GetProperty(prop_name, prop);
}
+ bool IsDeleteRangeSentinelKey() const override {
+ assert(valid_);
+ return iter_->IsDeleteRangeSentinelKey();
+ }
+
private:
void UpdateValid() {
assert(!iter_->Valid() || iter_->status().ok());
namespace ROCKSDB_NAMESPACE {
-const uint64_t kRangeTombstoneSentinel =
- PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);
-
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
const InternalKey& b) {
auto c = user_cmp->CompareWithoutTimestamp(a.user_key(), b.user_key());
// The file contains class Compaction, as well as some helper functions
// and data structures used by the class.
+const uint64_t kRangeTombstoneSentinel =
+ PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);
// Utility for comparing sstable boundary keys. Returns -1 if either a or b is
// null which provides the property that a==null indicates a key that is less
// than any key and b==null indicates a key that is greater than any key. Note
value_ = input_.value();
blob_value_.Reset();
iter_stats_.num_input_records++;
+ is_range_del_ = input_.IsDeleteRangeSentinelKey();
Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
if (!pik_status.ok()) {
break;
}
TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
-
+ if (is_range_del_) {
+ validity_info_.SetValid(kRangeDeletion);
+ break;
+ }
// Update input statistics
if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion ||
ikey_.type == kTypeDeletionWithTimestamp) {
ParsedInternalKey next_ikey;
AdvanceInputIter();
+ while (input_.Valid() && input_.IsDeleteRangeSentinelKey() &&
+ ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
+ .ok() &&
+ cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
+ // skip range tombstone start keys with the same user key
+ // since they are not "real" point keys.
+ AdvanceInputIter();
+ }
// Check whether the next key exists, is not corrupt, and is the same key
// as the single delete.
ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
.ok() &&
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
+ assert(!input_.IsDeleteRangeSentinelKey());
#ifndef NDEBUG
const Compaction* c =
compaction_ ? compaction_->real_compaction() : nullptr;
// Note that a deletion marker of type kTypeDeletionWithTimestamp will be
// considered to have a different user key unless the timestamp is older
// than *full_history_ts_low_.
+ //
+ // Range tombstone start keys are skipped as they are not "real" keys.
while (!IsPausingManualCompaction() && !IsShuttingDown() &&
input_.Valid() &&
(ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
.ok()) &&
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) &&
- (prev_snapshot == 0 ||
+ (prev_snapshot == 0 || input_.IsDeleteRangeSentinelKey() ||
DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) {
AdvanceInputIter();
}
void CompactionIterator::PrepareOutput() {
if (Valid()) {
- if (ikey_.type == kTypeValue) {
- ExtractLargeValueIfNeeded();
- } else if (ikey_.type == kTypeBlobIndex) {
- GarbageCollectBlobIfNeeded();
+ if (LIKELY(!is_range_del_)) {
+ if (ikey_.type == kTypeValue) {
+ ExtractLargeValueIfNeeded();
+ } else if (ikey_.type == kTypeBlobIndex) {
+ GarbageCollectBlobIfNeeded();
+ }
}
if (compaction_ != nullptr && compaction_->SupportsPerKeyPlacement()) {
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
ikey_.type != kTypeMerge && current_key_committed_ &&
!output_to_penultimate_level_ &&
- ikey_.sequence < preserve_time_min_seqno_) {
+ ikey_.sequence < preserve_time_min_seqno_ && !is_range_del_) {
if (ikey_.type == kTypeDeletion ||
(ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) {
ROCKS_LOG_FATAL(
void SeekToLast() override { assert(false); }
uint64_t num_itered() const { return num_itered_; }
+ bool IsDeleteRangeSentinelKey() const override {
+ assert(Valid());
+ return inner_iter_->IsDeleteRangeSentinelKey();
+ }
private:
InternalKeyComparator icmp_;
const Status& status() const { return status_; }
const ParsedInternalKey& ikey() const { return ikey_; }
inline bool Valid() const { return validity_info_.IsValid(); }
- const Slice& user_key() const { return current_user_key_; }
+ const Slice& user_key() const {
+ if (UNLIKELY(is_range_del_)) {
+ return ikey_.user_key;
+ }
+ return current_user_key_;
+ }
const CompactionIterationStats& iter_stats() const { return iter_stats_; }
uint64_t num_input_entry_scanned() const { return input_.num_itered(); }
// If the current key should be placed on penultimate level, only valid if
}
Status InputStatus() const { return input_.status(); }
+ bool IsDeleteRangeSentinelKey() const { return is_range_del_; }
+
private:
// Processes the input stream to find the next output
void NextFromInput();
kKeepSD = 8,
kKeepDel = 9,
kNewUserKey = 10,
+ kRangeDeletion = 11,
};
struct ValidityInfo {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return manual_compaction_canceled_.load(std::memory_order_relaxed);
}
+
+ // Stores whether the current compaction iterator output
+ // is a range tombstone start key.
+ bool is_range_del_{false};
};
inline bool CompactionIterator::DefinitelyInSnapshot(SequenceNumber seq,
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
// returns true.
-
assert(!end.has_value() || cfd->user_comparator()->Compare(
c_iter->user_key(), end.value()) < 0);
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func) {
Status s;
+ bool is_range_del = c_iter.IsDeleteRangeSentinelKey();
+ if (is_range_del && compaction_->bottommost_level()) {
+ // We don't consider range tombstone for bottommost level since:
+ // 1. there is no grandparent and hence no overlap to consider
+ // 2. range tombstone may be dropped at bottommost level.
+ return s;
+ }
const Slice& key = c_iter.key();
-
if (ShouldStopBefore(c_iter) && HasBuilder()) {
s = close_file_func(*this, c_iter.InputStatus(), key);
if (!s.ok()) {
grandparent_boundary_switched_num_ = 0;
grandparent_overlapped_bytes_ =
GetCurrentKeyGrandparentOverlappedBytes(key);
+ if (UNLIKELY(is_range_del)) {
+ // lower bound for this new output file, this is needed as the lower bound
+ // does not come from the smallest point key in this case.
+ range_tombstone_lower_bound_.DecodeFrom(key);
+ } else {
+ range_tombstone_lower_bound_.Clear();
+ }
}
// Open output file if necessary
}
}
+ // c_iter may emit range deletion keys, so update `last_key_for_partitioner_`
+ // here before returning below when `is_range_del` is true
+ if (partitioner_) {
+ last_key_for_partitioner_.assign(c_iter.user_key().data_,
+ c_iter.user_key().size_);
+ }
+
+ if (UNLIKELY(is_range_del)) {
+ return s;
+ }
+
assert(builder_ != nullptr);
const Slice& value = c_iter.value();
s = current_output().validator.Add(key, value);
s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence,
ikey.type);
- if (partitioner_) {
- last_key_for_partitioner_.assign(c_iter.user_key().data_,
- c_iter.user_key().size_);
- }
-
return s;
}
std::string smallest_user_key;
const Slice *lower_bound, *upper_bound;
bool lower_bound_from_sub_compact = false;
-
+ bool lower_bound_from_range_tombstone = false;
size_t output_size = outputs_.size();
if (output_size == 1) {
// For the first output table, include range tombstones before the min
// key but after the subcompaction boundary.
lower_bound = comp_start_user_key;
lower_bound_from_sub_compact = true;
+ } else if (range_tombstone_lower_bound_.size() > 0) {
+ assert(meta.smallest.size() == 0 ||
+ icmp.Compare(range_tombstone_lower_bound_, meta.smallest) <= 0);
+ lower_bound_guard = range_tombstone_lower_bound_.user_key();
+ lower_bound = &lower_bound_guard;
+ lower_bound_from_range_tombstone = true;
} else if (meta.smallest.size() > 0) {
// For subsequent output tables, only include range tombstones from min
// key onwards since the previous file was extended to contain range
smallest_candidate =
InternalKey(*lower_bound, tombstone.seq_, kTypeRangeDeletion);
}
+ } else if (lower_bound_from_range_tombstone) {
+ // Range tombstone keys can be truncated at file boundaries of the files
+ // that contain them.
+ //
+ // If this lower bound is from a range tombstone key that is not
+ // truncated, i.e., it was not truncated when reading from the input
+ // files, then its sequence number and `op_type` will be
+ // kMaxSequenceNumber and kTypeRangeDeletion (see
+ // TruncatedRangeDelIterator::start_key()). In this case, when this key
+ // was used as the upper bound to cut the previous compaction output
+ // file, the previous file's largest key could have the same value as
+ // this key (see the upperbound logic below). To guarantee
+ // non-overlapping ranges between output files, we use the range
+ // tombstone's actual sequence number (tombstone.seq_) for the lower
+ // bound of this file. If this range tombstone key is truncated, then
+ // the previous file's largest key will be smaller than this range
+ // tombstone key, so we can use it as the lower bound directly.
+ if (ExtractInternalKeyFooter(range_tombstone_lower_bound_.Encode()) ==
+ kRangeTombstoneSentinel) {
+ if (ts_sz) {
+ smallest_candidate =
+ InternalKey(range_tombstone_lower_bound_.user_key(),
+ tombstone.seq_, kTypeRangeDeletion, tombstone.ts_);
+ } else {
+ smallest_candidate =
+ InternalKey(range_tombstone_lower_bound_.user_key(),
+ tombstone.seq_, kTypeRangeDeletion);
+ }
+ } else {
+ assert(GetInternalKeySeqno(range_tombstone_lower_bound_.Encode()) <
+ kMaxSequenceNumber);
+ smallest_candidate = range_tombstone_lower_bound_;
+ }
} else {
smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion);
}
std::unique_ptr<SstPartitioner> partitioner_;
// A flag determines if this subcompaction has been split by the cursor
+ // for RoundRobin compaction
bool is_split_ = false;
// We also maintain the output split key for each subcompaction to avoid
// for the current output file, how many file boundaries has it crossed,
// basically number of files overlapped * 2
size_t grandparent_boundary_switched_num_ = 0;
+
+ // The smallest key of the current output file, this is set when current
+ // output file's smallest key is a range tombstone start key.
+ InternalKey range_tombstone_lower_bound_;
};
// helper struct to concatenate the last level and penultimate level outputs
// Assign range dels aggregator, for each range_del, it can only be assigned
// to one output level, for per_key_placement, it's going to be the
// penultimate level.
+ // TODO: This does not work for per_key_placement + user-defined timestamp +
+ // DeleteRange() combo. If user-defined timestamp is enabled,
+ // it is possible for a range tombstone to belong to bottommost level (
+ // seqno < earliest snapshot) without being dropped (garbage collection
+ // for user-defined timestamp).
void AssignRangeDelAggregator(
std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
if (compaction->SupportsPerKeyPlacement()) {
ASSERT_EQ(1, num_range_deletions);
}
+// Test SST partitioner cut after every single key
+class SingleKeySstPartitioner : public SstPartitioner {
+ public:
+ const char* Name() const override { return "SingleKeySstPartitioner"; }
+
+ PartitionerResult ShouldPartition(
+ const PartitionerRequest& /*request*/) override {
+ return kRequired;
+ }
+
+ bool CanDoTrivialMove(const Slice& /*smallest_user_key*/,
+ const Slice& /*largest_user_key*/) override {
+ return false;
+ }
+};
+
+class SingleKeySstPartitionerFactory : public SstPartitionerFactory {
+ public:
+ static const char* kClassName() { return "SingleKeySstPartitionerFactory"; }
+ const char* Name() const override { return kClassName(); }
+
+ std::unique_ptr<SstPartitioner> CreatePartitioner(
+ const SstPartitioner::Context& /* context */) const override {
+ return std::unique_ptr<SstPartitioner>(new SingleKeySstPartitioner());
+ }
+};
+
+TEST_F(DBRangeDelTest, LevelCompactOutputCutAtRangeTombstoneForTtlFiles) {
+ Options options = CurrentOptions();
+ options.compression = kNoCompression;
+ options.compaction_pri = kMinOverlappingRatio;
+ options.disable_auto_compactions = true;
+ options.ttl = 24 * 60 * 60; // 24 hours
+ options.target_file_size_base = 8 << 10;
+ env_->SetMockSleep();
+ options.env = env_;
+ DestroyAndReopen(options);
+
+ Random rnd(301);
+ // Fill some data so that future compactions are not bottommost level
+ // compaction, and hence they would try cut around files for ttl
+ for (int i = 5; i < 10; ++i) {
+ ASSERT_OK(Put(Key(i), rnd.RandomString(1 << 10)));
+ }
+ ASSERT_OK(Flush());
+ MoveFilesToLevel(3);
+ ASSERT_EQ("0,0,0,1", FilesPerLevel());
+
+ for (int i = 5; i < 10; ++i) {
+ ASSERT_OK(Put(Key(i), rnd.RandomString(1 << 10)));
+ }
+ ASSERT_OK(Flush());
+ MoveFilesToLevel(1);
+ ASSERT_EQ("0,1,0,1", FilesPerLevel());
+
+ env_->MockSleepForSeconds(20 * 60 * 60);
+ ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+ Key(11), Key(12)));
+ ASSERT_OK(Put(Key(0), rnd.RandomString(1 << 10)));
+ ASSERT_OK(Flush());
+ ASSERT_EQ("1,1,0,1", FilesPerLevel());
+ // L0 file is new, L1 and L3 file are old and qualified for TTL
+ env_->MockSleepForSeconds(10 * 60 * 60);
+ MoveFilesToLevel(1);
+ // L1 output should be cut into 3 files:
+ // File 0: Key(0)
+ // File 1: (qualified for TTL): Key(5) - Key(10)
+ // File 1: DeleteRange [11, 12)
+ ASSERT_EQ("0,3,0,1", FilesPerLevel());
+}
+
+TEST_F(DBRangeDelTest, CompactionEmitRangeTombstoneToSSTPartitioner) {
+ Options options = CurrentOptions();
+ auto factory = std::make_shared<SingleKeySstPartitionerFactory>();
+ options.sst_partitioner_factory = factory;
+ options.disable_auto_compactions = true;
+ DestroyAndReopen(options);
+
+ Random rnd(301);
+ // range deletion keys are not processed when compacting to bottommost level,
+ // so creating a file at older level to make the next compaction not
+ // bottommost level
+ ASSERT_OK(db_->Put(WriteOptions(), Key(4), rnd.RandomString(10)));
+ ASSERT_OK(Flush());
+ MoveFilesToLevel(5);
+
+ ASSERT_OK(db_->Put(WriteOptions(), Key(1), rnd.RandomString(10)));
+ ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2),
+ Key(5)));
+ ASSERT_OK(Flush());
+ ASSERT_EQ(1, NumTableFilesAtLevel(0));
+ MoveFilesToLevel(1);
+ // SSTPartitioner decides to cut when range tombstone start key is passed to
+ // it Note that the range tombstone [2, 5) itself span multiple keys but we
+ // are not able to partition in between yet.
+ ASSERT_EQ(2, NumTableFilesAtLevel(1));
+}
+
+TEST_F(DBRangeDelTest, OversizeCompactionGapBetweenPointKeyAndTombstone) {
+ // L2 has two files
+ // L2_0: 0, 1, 2, 3, 4. L2_1: 5, 6, 7
+ // L0 has 0, [5, 6), 8
+ // max_compaction_bytes is less than the size of L2_0 and L2_1.
+ // When compacting L0 into L1, it should split into 3 files.
+ const int kNumPerFile = 4, kNumFiles = 2;
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ options.target_file_size_base = 9 * 1024;
+ options.max_compaction_bytes = 9 * 1024;
+ DestroyAndReopen(options);
+ Random rnd(301);
+ for (int i = 0; i < kNumFiles; ++i) {
+ std::vector<std::string> values;
+ for (int j = 0; j < kNumPerFile; j++) {
+ values.push_back(rnd.RandomString(3 << 10));
+ ASSERT_OK(Put(Key(i * kNumPerFile + j), values[j]));
+ }
+ }
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ ASSERT_EQ(1, NumTableFilesAtLevel(0));
+ MoveFilesToLevel(2);
+ ASSERT_EQ(2, NumTableFilesAtLevel(2));
+ ASSERT_OK(Put(Key(0), rnd.RandomString(1 << 10)));
+ ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(5),
+ Key(6)));
+ ASSERT_OK(Put(Key(8), rnd.RandomString(1 << 10)));
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ ASSERT_EQ(1, NumTableFilesAtLevel(0));
+
+ ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
+ true /* disallow_trivial_move */));
+ ASSERT_EQ(3, NumTableFilesAtLevel(1));
+}
+
+TEST_F(DBRangeDelTest, OversizeCompactionGapBetweenTombstone) {
+ // L2 has two files
+ // L2_0: 0, 1, 2, 3, 4. L2_1: 5, 6, 7
+ // L0 has two range tombstones [0, 1), [7, 8).
+ // max_compaction_bytes is less than the size of L2_0.
+ // When compacting L0 into L1, the two range tombstones should be
+ // split into two files.
+ const int kNumPerFile = 4, kNumFiles = 2;
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ options.target_file_size_base = 9 * 1024;
+ options.max_compaction_bytes = 9 * 1024;
+ DestroyAndReopen(options);
+ Random rnd(301);
+ for (int i = 0; i < kNumFiles; ++i) {
+ std::vector<std::string> values;
+ // Write 12K (4 values, each 3K)
+ for (int j = 0; j < kNumPerFile; j++) {
+ values.push_back(rnd.RandomString(3 << 10));
+ ASSERT_OK(Put(Key(i * kNumPerFile + j), values[j]));
+ }
+ }
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ ASSERT_EQ(1, NumTableFilesAtLevel(0));
+ MoveFilesToLevel(2);
+ ASSERT_EQ(2, NumTableFilesAtLevel(2));
+ ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0),
+ Key(1)));
+ ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(7),
+ Key(8)));
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ ASSERT_EQ(1, NumTableFilesAtLevel(0));
+
+ ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
+ true /* disallow_trivial_move */));
+ // This is L0 -> L1 compaction
+ // The two range tombstones are broken up into two output files
+ // to limit compaction size.
+ ASSERT_EQ(2, NumTableFilesAtLevel(1));
+}
+
+TEST_F(DBRangeDelTest, OversizeCompactionPointKeyWithinRangetombstone) {
+ // L2 has two files
+ // L2_0: 0, 1, 2, 3, 4. L2_1: 6, 7, 8
+ // L0 has [0, 9) and point key 5
+ // max_compaction_bytes is less than the size of L2_0.
+ // When compacting L0 into L1, the compaction should cut at point key 5.
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ options.target_file_size_base = 9 * 1024;
+ options.max_compaction_bytes = 9 * 1024;
+ DestroyAndReopen(options);
+ Random rnd(301);
+ for (int i = 0; i < 9; ++i) {
+ if (i == 5) {
+ ++i;
+ }
+ ASSERT_OK(Put(Key(i), rnd.RandomString(3 << 10)));
+ }
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ ASSERT_EQ(1, NumTableFilesAtLevel(0));
+ MoveFilesToLevel(2);
+ ASSERT_EQ(2, NumTableFilesAtLevel(2));
+ ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0),
+ Key(9)));
+ ASSERT_OK(Put(Key(5), rnd.RandomString(1 << 10)));
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ ASSERT_EQ(1, NumTableFilesAtLevel(0));
+ ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
+ true /* disallow_trivial_move */));
+ ASSERT_EQ(2, NumTableFilesAtLevel(1));
+}
+
TEST_F(DBRangeDelTest, OverlappedTombstones) {
const int kNumPerFile = 4, kNumFiles = 2;
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 2 * 1024;
+ options.level_compaction_dynamic_file_size = false;
DestroyAndReopen(options);
Random rnd(301);
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 3 * 1024;
- options.max_compaction_bytes = 1024;
+ options.max_compaction_bytes = 2048;
DestroyAndReopen(options);
// L2
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 3 * 1024;
- options.max_compaction_bytes = 1024;
+ options.max_compaction_bytes = 3 * 1024;
DestroyAndReopen(options);
// L2
bool IsValuePinned() const override { return input_->IsValuePinned(); }
+ bool IsDeleteRangeSentinelKey() const override {
+ return input_->IsDeleteRangeSentinelKey();
+ }
+
private:
InternalIterator* input_;
const std::string filter_ts_;
s = Status::ShutdownInProgress();
return s;
}
+ // Skip range tombstones emitted by the compaction iterator.
+ if (iter->IsDeleteRangeSentinelKey()) {
+ continue;
+ }
ParsedInternalKey ikey;
assert(keys_.size() == merge_context_.GetNumOperands());
#include "db/table_cache.h"
#include "db/version_builder.h"
#include "db/version_edit_handler.h"
+#include "table/compaction_merging_iterator.h"
+
#if USE_COROUTINES
#include "folly/experimental/coro/BlockingWait.h"
#include "folly/experimental/coro/Collect.h"
c->num_input_levels() - 1
: c->num_input_levels());
InternalIterator** list = new InternalIterator*[space];
+ // First item in the pair is a pointer to range tombstones.
+ // Second item is a pointer to a member of a LevelIterator,
+ // that will be initialized to where CompactionMergingIterator stores
+ // pointer to its range tombstones. This is used by LevelIterator
+ // to update pointer to range tombstones as it traverse different SST files.
+ std::vector<
+ std::pair<TruncatedRangeDelIterator*, TruncatedRangeDelIterator***>>
+ range_tombstones;
size_t num = 0;
for (size_t which = 0; which < c->num_input_levels(); which++) {
if (c->input_levels(which)->num_files != 0) {
end.value(), fmd.smallest.user_key()) < 0) {
continue;
}
-
+ TruncatedRangeDelIterator* range_tombstone_iter = nullptr;
list[num++] = cfd->table_cache()->NewIterator(
read_options, file_options_compactions,
cfd->internal_comparator(), fmd, range_del_agg,
MaxFileSizeForL0MetaPin(*c->mutable_cf_options()),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr,
- /*allow_unprepared_value=*/false);
+ /*allow_unprepared_value=*/false,
+ /*range_del_iter=*/&range_tombstone_iter);
+ range_tombstones.emplace_back(range_tombstone_iter, nullptr);
}
} else {
// Create concatenating iterator for the files from this level
+ TruncatedRangeDelIterator*** tombstone_iter_ptr = nullptr;
list[num++] = new LevelIterator(
cfd->table_cache(), read_options, file_options_compactions,
cfd->internal_comparator(), c->input_levels(which),
/*no per level latency histogram=*/nullptr,
TableReaderCaller::kCompaction, /*skip_filters=*/false,
/*level=*/static_cast<int>(c->level(which)), range_del_agg,
- c->boundaries(which));
+ c->boundaries(which), false, &tombstone_iter_ptr);
+ range_tombstones.emplace_back(nullptr, tombstone_iter_ptr);
}
}
}
assert(num <= space);
- InternalIterator* result =
- NewMergingIterator(&c->column_family_data()->internal_comparator(), list,
- static_cast<int>(num));
+ InternalIterator* result = NewCompactionMergingIterator(
+ &c->column_family_data()->internal_comparator(), list,
+ static_cast<int>(num), range_tombstones);
delete[] list;
return result;
}
table/get_context.cc \
table/iterator.cc \
table/merging_iterator.cc \
+ table/compaction_merging_iterator.cc \
table/meta_blocks.cc \
table/persistent_cache_helper.cc \
table/plain/plain_table_bloom.cc \
--- /dev/null
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+//
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#include "table/compaction_merging_iterator.h"
+
+namespace ROCKSDB_NAMESPACE {
+void CompactionMergingIterator::SeekToFirst() {
+ minHeap_.clear();
+ status_ = Status::OK();
+ for (auto& child : children_) {
+ child.iter.SeekToFirst();
+ AddToMinHeapOrCheckStatus(&child);
+ }
+
+ for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
+ if (range_tombstone_iters_[i]) {
+ range_tombstone_iters_[i]->SeekToFirst();
+ InsertRangeTombstoneAtLevel(i);
+ }
+ }
+
+ FindNextVisibleKey();
+ current_ = CurrentForward();
+}
+
+void CompactionMergingIterator::Seek(const Slice& target) {
+ minHeap_.clear();
+ status_ = Status::OK();
+ for (auto& child : children_) {
+ child.iter.Seek(target);
+ AddToMinHeapOrCheckStatus(&child);
+ }
+
+ ParsedInternalKey pik;
+ ParseInternalKey(target, &pik, false /* log_err_key */)
+ .PermitUncheckedError();
+ for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
+ if (range_tombstone_iters_[i]) {
+ range_tombstone_iters_[i]->Seek(pik.user_key);
+ // For compaction, output keys should all be after seek target.
+ while (range_tombstone_iters_[i]->Valid() &&
+ comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) <
+ 0) {
+ range_tombstone_iters_[i]->Next();
+ }
+ InsertRangeTombstoneAtLevel(i);
+ }
+ }
+
+ FindNextVisibleKey();
+ current_ = CurrentForward();
+}
+
+void CompactionMergingIterator::Next() {
+ assert(Valid());
+ // For the heap modifications below to be correct, current_ must be the
+ // current top of the heap.
+ assert(current_ == CurrentForward());
+ // as the current points to the current record. move the iterator forward.
+ if (current_->type == HeapItem::ITERATOR) {
+ current_->iter.Next();
+ if (current_->iter.Valid()) {
+ // current is still valid after the Next() call above. Call
+ // replace_top() to restore the heap property. When the same child
+ // iterator yields a sequence of keys, this is cheap.
+ assert(current_->iter.status().ok());
+ minHeap_.replace_top(current_);
+ } else {
+ // current stopped being valid, remove it from the heap.
+ considerStatus(current_->iter.status());
+ minHeap_.pop();
+ }
+ } else {
+ assert(current_->type == HeapItem::DELETE_RANGE_START);
+ size_t level = current_->level;
+ assert(range_tombstone_iters_[level]);
+ range_tombstone_iters_[level]->Next();
+ if (range_tombstone_iters_[level]->Valid()) {
+ pinned_heap_item_[level].SetTombstoneForCompaction(
+ range_tombstone_iters_[level]->start_key());
+ minHeap_.replace_top(&pinned_heap_item_[level]);
+ } else {
+ minHeap_.pop();
+ }
+ }
+ FindNextVisibleKey();
+ current_ = CurrentForward();
+}
+
+void CompactionMergingIterator::FindNextVisibleKey() {
+ // IsDeleteRangeSentinelKey() here means file boundary sentinel keys.
+ while (!minHeap_.empty() && minHeap_.top()->IsDeleteRangeSentinelKey()) {
+ HeapItem* current = minHeap_.top();
+ // range tombstone start keys from the same SSTable should have been
+ // exhausted
+ assert(!range_tombstone_iters_[current->level] ||
+ !range_tombstone_iters_[current->level]->Valid());
+ // iter is a LevelIterator, and it enters a new SST file in the Next()
+ // call here.
+ current->iter.Next();
+ if (current->iter.Valid()) {
+ assert(current->iter.status().ok());
+ minHeap_.replace_top(current);
+ } else {
+ minHeap_.pop();
+ }
+ if (range_tombstone_iters_[current->level]) {
+ InsertRangeTombstoneAtLevel(current->level);
+ }
+ }
+}
+void CompactionMergingIterator::AddToMinHeapOrCheckStatus(HeapItem* child) {
+ if (child->iter.Valid()) {
+ assert(child->iter.status().ok());
+ minHeap_.push(child);
+ } else {
+ considerStatus(child->iter.status());
+ }
+}
+
+InternalIterator* NewCompactionMergingIterator(
+ const InternalKeyComparator* comparator, InternalIterator** children, int n,
+ std::vector<std::pair<TruncatedRangeDelIterator*,
+ TruncatedRangeDelIterator***>>& range_tombstone_iters,
+ Arena* arena) {
+ assert(n >= 0);
+ if (n == 0) {
+ return NewEmptyInternalIterator<Slice>(arena);
+ } else {
+ if (arena == nullptr) {
+ return new CompactionMergingIterator(comparator, children, n, false,
+ range_tombstone_iters);
+ } else {
+ auto mem = arena->AllocateAligned(sizeof(CompactionMergingIterator));
+ return new (mem) CompactionMergingIterator(comparator, children, n, true,
+ range_tombstone_iters);
+ }
+ }
+}
+} // namespace ROCKSDB_NAMESPACE
--- /dev/null
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+//
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include "db/range_del_aggregator.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/types.h"
+#include "table/merging_iterator.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class CompactionHeapItemComparator {
+ public:
+ explicit CompactionHeapItemComparator(const InternalKeyComparator* comparator)
+ : comparator_(comparator) {}
+ bool operator()(HeapItem* a, HeapItem* b) const {
+ int r = comparator_->Compare(a->key(), b->key());
+ if (r > 0) {
+ return true;
+ } else if (r < 0) {
+ return false;
+ } else {
+ // When range tombstone and point key have the same internal key,
+ // range tombstone comes first. So that when range tombstone and
+ // file's largest key are the same, the file boundary sentinel key
+ // comes after.
+ return a->type == HeapItem::ITERATOR &&
+ b->type == HeapItem::DELETE_RANGE_START;
+ }
+ }
+
+ private:
+ const InternalKeyComparator* comparator_;
+};
+
+using CompactionMinHeap = BinaryHeap<HeapItem*, CompactionHeapItemComparator>;
+/*
+ * This is a simplified version of MergingIterator and is specifically used for
+ * compaction. It merges the input `children` iterators into a sorted stream of
+ * keys. Range tombstone start keys are also emitted to prevent oversize
+ * compactions. For example, consider an L1 file with content [a, b), y, z,
+ * where [a, b) is a range tombstone and y and z are point keys. This could
+ * cause an oversize compaction as it can overlap with a wide range of key space
+ * in L2.
+ *
+ * CompactionMergingIterator emits range tombstone start keys from each LSM
+ * level's range tombstone iterator, and for each range tombstone
+ * [start,end)@seqno, the key will be start@kMaxSequenceNumber unless truncated
+ * at file boundary (see detail TruncatedRangeDelIterator::start_key()).
+ *
+ * Caller should use CompactionMergingIterator::IsDeleteRangeSentinelKey() to
+ * check if the current key is a range tombstone key.
+ * TODO(cbi): IsDeleteRangeSentinelKey() is used for two kinds of keys at
+ * different layers: file boundary and range tombstone keys. Separate them into
+ * two APIs for clarity.
+ */
+class CompactionMergingIterator : public InternalIterator {
+ public:
+ CompactionMergingIterator(
+ const InternalKeyComparator* comparator, InternalIterator** children,
+ int n, bool is_arena_mode,
+ std::vector<
+ std::pair<TruncatedRangeDelIterator*, TruncatedRangeDelIterator***>>
+ range_tombstones)
+ : is_arena_mode_(is_arena_mode),
+ comparator_(comparator),
+ current_(nullptr),
+ minHeap_(CompactionHeapItemComparator(comparator_)),
+ pinned_iters_mgr_(nullptr) {
+ children_.resize(n);
+ for (int i = 0; i < n; i++) {
+ children_[i].level = i;
+ children_[i].iter.Set(children[i]);
+ assert(children_[i].type == HeapItem::ITERATOR);
+ }
+ assert(range_tombstones.size() == static_cast<size_t>(n));
+ for (auto& p : range_tombstones) {
+ range_tombstone_iters_.push_back(p.first);
+ }
+
+ pinned_heap_item_.resize(n);
+ for (int i = 0; i < n; ++i) {
+ if (range_tombstones[i].second) {
+ // for LevelIterator
+ *range_tombstones[i].second = &range_tombstone_iters_[i];
+ }
+ pinned_heap_item_[i].level = i;
+ pinned_heap_item_[i].type = HeapItem::DELETE_RANGE_START;
+ }
+ }
+
+ void considerStatus(const Status& s) {
+ if (!s.ok() && status_.ok()) {
+ status_ = s;
+ }
+ }
+
+ ~CompactionMergingIterator() override {
+ // TODO: use unique_ptr for range_tombstone_iters_
+ for (auto child : range_tombstone_iters_) {
+ delete child;
+ }
+
+ for (auto& child : children_) {
+ child.iter.DeleteIter(is_arena_mode_);
+ }
+ status_.PermitUncheckedError();
+ }
+
+ bool Valid() const override { return current_ != nullptr && status_.ok(); }
+
+ Status status() const override { return status_; }
+
+ void SeekToFirst() override;
+
+ void Seek(const Slice& target) override;
+
+ void Next() override;
+
+ Slice key() const override {
+ assert(Valid());
+ return current_->key();
+ }
+
+ Slice value() const override {
+ assert(Valid());
+ if (LIKELY(current_->type == HeapItem::ITERATOR)) {
+ return current_->iter.value();
+ } else {
+ return dummy_tombstone_val;
+ }
+ }
+
+ // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result
+ // from current child iterator. Potentially as long as one of child iterator
+ // report out of bound is not possible, we know current key is within bound.
+ bool MayBeOutOfLowerBound() override {
+ assert(Valid());
+ return current_->type == HeapItem::DELETE_RANGE_START ||
+ current_->iter.MayBeOutOfLowerBound();
+ }
+
+ IterBoundCheck UpperBoundCheckResult() override {
+ assert(Valid());
+ return current_->type == HeapItem::DELETE_RANGE_START
+ ? IterBoundCheck::kUnknown
+ : current_->iter.UpperBoundCheckResult();
+ }
+
+ void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
+ pinned_iters_mgr_ = pinned_iters_mgr;
+ for (auto& child : children_) {
+ child.iter.SetPinnedItersMgr(pinned_iters_mgr);
+ }
+ }
+
+ bool IsDeleteRangeSentinelKey() const override {
+ assert(Valid());
+ return current_->type == HeapItem::DELETE_RANGE_START;
+ }
+
+ // Compaction uses the above subset of InternalIterator interface.
+ void SeekToLast() override { assert(false); }
+
+ void SeekForPrev(const Slice&) override { assert(false); }
+
+ void Prev() override { assert(false); }
+
+ bool NextAndGetResult(IterateResult*) override {
+ assert(false);
+ return false;
+ }
+
+ bool IsKeyPinned() const override {
+ assert(false);
+ return false;
+ }
+
+ bool IsValuePinned() const override {
+ assert(false);
+ return false;
+ }
+
+ bool PrepareValue() override {
+ assert(false);
+ return false;
+ }
+
+ private:
+ bool is_arena_mode_;
+ const InternalKeyComparator* comparator_;
+ // HeapItem for all child point iterators.
+ std::vector<HeapItem> children_;
+ // HeapItem for range tombstones. pinned_heap_item_[i] corresponds to the
+ // current range tombstone from range_tombstone_iters_[i].
+ std::vector<HeapItem> pinned_heap_item_;
+ // range_tombstone_iters_[i] contains range tombstones in the sorted run that
+ // corresponds to children_[i]. range_tombstone_iters_[i] ==
+ // nullptr means the sorted run of children_[i] does not have range
+ // tombstones (or the current SSTable does not have range tombstones in the
+ // case of LevelIterator).
+ std::vector<TruncatedRangeDelIterator*> range_tombstone_iters_;
+ // Used as value for range tombstone keys
+ std::string dummy_tombstone_val{};
+
+ // Skip file boundary sentinel keys.
+ void FindNextVisibleKey();
+
+ // top of minHeap_
+ HeapItem* current_;
+ // If any of the children have non-ok status, this is one of them.
+ Status status_;
+ CompactionMinHeap minHeap_;
+ PinnedIteratorsManager* pinned_iters_mgr_;
+ // Process a child that is not in the min heap.
+ // If valid, add to the min heap. Otherwise, check status.
+ void AddToMinHeapOrCheckStatus(HeapItem*);
+
+ HeapItem* CurrentForward() const {
+ return !minHeap_.empty() ? minHeap_.top() : nullptr;
+ }
+
+ void InsertRangeTombstoneAtLevel(size_t level) {
+ if (range_tombstone_iters_[level]->Valid()) {
+ pinned_heap_item_[level].SetTombstoneForCompaction(
+ range_tombstone_iters_[level]->start_key());
+ minHeap_.push(&pinned_heap_item_[level]);
+ }
+ }
+};
+
+InternalIterator* NewCompactionMergingIterator(
+ const InternalKeyComparator* comparator, InternalIterator** children, int n,
+ std::vector<std::pair<TruncatedRangeDelIterator*,
+ TruncatedRangeDelIterator***>>& range_tombstone_iters,
+ Arena* arena = nullptr);
+} // namespace ROCKSDB_NAMESPACE
#include "table/merging_iterator.h"
#include "db/arena_wrapped_db_iter.h"
-#include "db/dbformat.h"
-#include "db/pinned_iterators_manager.h"
-#include "memory/arena.h"
-#include "monitoring/perf_context_imp.h"
-#include "rocksdb/comparator.h"
-#include "rocksdb/iterator.h"
-#include "rocksdb/options.h"
-#include "table/internal_iterator.h"
-#include "table/iter_heap.h"
-#include "table/iterator_wrapper.h"
-#include "test_util/sync_point.h"
-#include "util/autovector.h"
-#include "util/heap.h"
-#include "util/stop_watch.h"
namespace ROCKSDB_NAMESPACE {
-// For merging iterator to process range tombstones, we treat the start and end
-// keys of a range tombstone as point keys and put them into the minHeap/maxHeap
-// used in merging iterator. Take minHeap for example, we are able to keep track
-// of currently "active" range tombstones (the ones whose start keys are popped
-// but end keys are still in the heap) in `active_`. This `active_` set of range
-// tombstones is then used to quickly determine whether the point key at heap
-// top is deleted (by heap property, the point key at heap top must be within
-// internal key range of active range tombstones).
-//
-// The HeapItem struct represents 3 types of elements in the minHeap/maxHeap:
-// point key and the start and end keys of a range tombstone.
-struct HeapItem {
- HeapItem() = default;
-
- enum Type { ITERATOR, DELETE_RANGE_START, DELETE_RANGE_END };
- IteratorWrapper iter;
- size_t level = 0;
- ParsedInternalKey parsed_ikey;
- // Will be overwritten before use, initialize here so compiler does not
- // complain.
- Type type = ITERATOR;
-
- explicit HeapItem(size_t _level, InternalIteratorBase<Slice>* _iter)
- : level(_level), type(Type::ITERATOR) {
- iter.Set(_iter);
- }
-
- void SetTombstoneKey(ParsedInternalKey&& pik) {
- // op_type is already initialized in MergingIterator::Finish().
- parsed_ikey.user_key = pik.user_key;
- parsed_ikey.sequence = pik.sequence;
- }
-
- Slice key() const {
- assert(type == ITERATOR);
- return iter.key();
- }
-
- bool IsDeleteRangeSentinelKey() const {
- if (type == Type::ITERATOR) {
- return iter.IsDeleteRangeSentinelKey();
- }
- return false;
- }
-};
-
-class MinHeapItemComparator {
- public:
- MinHeapItemComparator(const InternalKeyComparator* comparator)
- : comparator_(comparator) {}
- bool operator()(HeapItem* a, HeapItem* b) const {
- if (LIKELY(a->type == HeapItem::ITERATOR)) {
- if (LIKELY(b->type == HeapItem::ITERATOR)) {
- return comparator_->Compare(a->key(), b->key()) > 0;
- } else {
- return comparator_->Compare(a->key(), b->parsed_ikey) > 0;
- }
- } else {
- if (LIKELY(b->type == HeapItem::ITERATOR)) {
- return comparator_->Compare(a->parsed_ikey, b->key()) > 0;
- } else {
- return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) > 0;
- }
- }
- }
-
- private:
- const InternalKeyComparator* comparator_;
-};
-
class MaxHeapItemComparator {
public:
MaxHeapItemComparator(const InternalKeyComparator* comparator)
bool operator()(HeapItem* a, HeapItem* b) const {
if (LIKELY(a->type == HeapItem::ITERATOR)) {
if (LIKELY(b->type == HeapItem::ITERATOR)) {
- return comparator_->Compare(a->key(), b->key()) < 0;
+ return comparator_->Compare(a->iter.key(), b->iter.key()) < 0;
} else {
- return comparator_->Compare(a->key(), b->parsed_ikey) < 0;
+ return comparator_->Compare(a->iter.key(), b->parsed_ikey) < 0;
}
} else {
if (LIKELY(b->type == HeapItem::ITERATOR)) {
- return comparator_->Compare(a->parsed_ikey, b->key()) < 0;
+ return comparator_->Compare(a->parsed_ikey, b->iter.key()) < 0;
} else {
return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) < 0;
}
};
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
namespace {
-using MergerMinIterHeap = BinaryHeap<HeapItem*, MinHeapItemComparator>;
using MergerMaxIterHeap = BinaryHeap<HeapItem*, MaxHeapItemComparator>;
} // namespace
direction_(kForward),
comparator_(comparator),
current_(nullptr),
- minHeap_(comparator_),
+ minHeap_(MinHeapItemComparator(comparator_)),
pinned_iters_mgr_(nullptr),
iterate_upper_bound_(iterate_upper_bound) {
children_.resize(n);
if (child.iter.status() == Status::TryAgain()) {
continue;
}
- if (child.iter.Valid() && comparator_->Equal(target, child.key())) {
+ if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) {
assert(child.iter.status().ok());
child.iter.Next();
}
for (auto& child : children_) {
if (child.iter.status() == Status::TryAgain()) {
child.iter.Seek(target);
- if (child.iter.Valid() && comparator_->Equal(target, child.key())) {
+ if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) {
assert(child.iter.status().ok());
child.iter.Next();
}
if (&child.iter != current_) {
child.iter.SeekForPrev(target);
TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
- if (child.iter.Valid() && comparator_->Equal(target, child.key())) {
+ if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) {
assert(child.iter.status().ok());
child.iter.Prev();
}
#include "db/range_del_aggregator.h"
#include "rocksdb/slice.h"
#include "rocksdb/types.h"
+#include "table/iterator_wrapper.h"
namespace ROCKSDB_NAMESPACE {
range_del_iter_ptrs_;
};
+// For merging iterator to process range tombstones, we treat the start and end
+// keys of a range tombstone as point keys and put them into the minHeap/maxHeap
+// used in merging iterator. Take minHeap for example, we are able to keep track
+// of currently "active" range tombstones (the ones whose start keys are popped
+// but end keys are still in the heap) in `active_`. This `active_` set of range
+// tombstones is then used to quickly determine whether the point key at heap
+// top is deleted (by heap property, the point key at heap top must be within
+// internal key range of active range tombstones).
+//
+// The HeapItem struct represents 3 types of elements in the minHeap/maxHeap:
+// point key and the start and end keys of a range tombstone.
+struct HeapItem {
+ HeapItem() = default;
+
+ enum Type { ITERATOR, DELETE_RANGE_START, DELETE_RANGE_END };
+ IteratorWrapper iter;
+ size_t level = 0;
+ ParsedInternalKey parsed_ikey;
+ std::string range_tombstone_key;
+ // Will be overwritten before use, initialize here so compiler does not
+ // complain.
+ Type type = ITERATOR;
+
+ explicit HeapItem(size_t _level, InternalIteratorBase<Slice>* _iter)
+ : level(_level), type(Type::ITERATOR) {
+ iter.Set(_iter);
+ }
+
+ void SetTombstoneKey(ParsedInternalKey&& pik) {
+ // op_type is already initialized in MergingIterator::Finish().
+ parsed_ikey.user_key = pik.user_key;
+ parsed_ikey.sequence = pik.sequence;
+ }
+
+ void SetTombstoneForCompaction(const ParsedInternalKey&& pik) {
+ range_tombstone_key.clear();
+ AppendInternalKey(&range_tombstone_key, pik);
+ }
+
+ Slice key() const {
+ if (LIKELY(type == ITERATOR)) {
+ return iter.key();
+ }
+ return range_tombstone_key;
+ }
+
+ bool IsDeleteRangeSentinelKey() const {
+ if (LIKELY(type == ITERATOR)) {
+ return iter.IsDeleteRangeSentinelKey();
+ }
+ return false;
+ }
+};
+
+class MinHeapItemComparator {
+ public:
+ explicit MinHeapItemComparator(const InternalKeyComparator* comparator)
+ : comparator_(comparator) {}
+ bool operator()(HeapItem* a, HeapItem* b) const {
+ if (LIKELY(a->type == HeapItem::ITERATOR)) {
+ if (LIKELY(b->type == HeapItem::ITERATOR)) {
+ return comparator_->Compare(a->iter.key(), b->iter.key()) > 0;
+ } else {
+ return comparator_->Compare(a->iter.key(), b->parsed_ikey) > 0;
+ }
+ } else {
+ if (LIKELY(b->type == HeapItem::ITERATOR)) {
+ return comparator_->Compare(a->parsed_ikey, b->iter.key()) > 0;
+ } else {
+ return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) > 0;
+ }
+ }
+ }
+
+ private:
+ const InternalKeyComparator* comparator_;
+};
+
+using MergerMinIterHeap = BinaryHeap<HeapItem*, MinHeapItemComparator>;
} // namespace ROCKSDB_NAMESPACE