vstorage->GetOverlappingInputs(output_level, &smallest, &largest,
&output_level_inputs->files, *parent_index,
parent_index);
- if (!output_level_inputs->empty()) {
- ExpandWhileOverlapping(cf_name, vstorage, output_level_inputs);
- }
-
if (FilesInCompaction(output_level_inputs->files)) {
return false;
}
+ if (!output_level_inputs->empty()) {
+ if (!ExpandWhileOverlapping(cf_name, vstorage, output_level_inputs)) {
+ return false;
+ }
+ }
// See if we can further grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up. We also choose NOT
!vstorage->HasOverlappingUserKey(&expanded0.files, input_level)) {
InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit);
- std::vector<FileMetaData*> expanded1;
+ CompactionInputFiles expanded1;
+ expanded1.level = output_level;
vstorage->GetOverlappingInputs(output_level, &new_start, &new_limit,
- &expanded1, *parent_index, parent_index);
- if (expanded1.size() == output_level_inputs->size() &&
- !FilesInCompaction(expanded1)) {
+ &expanded1.files, *parent_index,
+ parent_index);
+ assert(!expanded1.empty());
+ if (!FilesInCompaction(expanded1.files) &&
+ ExpandWhileOverlapping(cf_name, vstorage, &expanded1) &&
+ expanded1.size() == output_level_inputs->size()) {
Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
"[%s] Expanding@%d %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt "(%" PRIu64
"+%" PRIu64 " bytes) to %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt
smallest = new_start;
largest = new_limit;
inputs->files = expanded0.files;
- output_level_inputs->files = expanded1;
+ output_level_inputs->files = expanded1.files;
}
}
}
vstorage_->GenerateLevelFilesBrief();
vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);
vstorage_->GenerateLevel0NonOverlapping();
+ vstorage_->ComputeFilesMarkedForCompaction();
vstorage_->SetFinalized();
}
};
ASSERT_EQ(7U, compaction->input(1, 0)->fd.GetNumber());
}
+TEST_F(CompactionPickerTest, OverlappingUserKeys5) {
+ NewVersionStorage(6, kCompactionStyleLevel);
+ // Overlapping user keys on same level and output level
+ Add(1, 1U, "200", "400", 1000000000U);
+ Add(1, 2U, "400", "500", 1U, 0, 0);
+ Add(2, 3U, "000", "100", 1U);
+ Add(2, 4U, "100", "600", 1U, 0, 0);
+ Add(2, 5U, "600", "700", 1U, 0, 0);
+
+ vstorage_->LevelFiles(2)[2]->being_compacted = true;
+
+ UpdateVersionStorageInfo();
+
+ std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
+ cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+ ASSERT_TRUE(compaction.get() == nullptr);
+}
+
+TEST_F(CompactionPickerTest, OverlappingUserKeys6) {
+ NewVersionStorage(6, kCompactionStyleLevel);
+ // Overlapping user keys on same level and output level
+ Add(1, 1U, "200", "400", 1U, 0, 0);
+ Add(1, 2U, "401", "500", 1U, 0, 0);
+ Add(2, 3U, "000", "100", 1U);
+ Add(2, 4U, "100", "300", 1U, 0, 0);
+ Add(2, 5U, "305", "450", 1U, 0, 0);
+ Add(2, 6U, "460", "600", 1U, 0, 0);
+ Add(2, 7U, "600", "700", 1U, 0, 0);
+
+ vstorage_->LevelFiles(1)[0]->marked_for_compaction = true;
+ vstorage_->LevelFiles(1)[1]->marked_for_compaction = true;
+
+ UpdateVersionStorageInfo();
+
+ std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
+ cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+ ASSERT_TRUE(compaction.get() != nullptr);
+ ASSERT_EQ(2U, compaction->num_input_levels());
+ ASSERT_EQ(1U, compaction->num_input_files(0));
+ ASSERT_EQ(3U, compaction->num_input_files(1));
+}
+
+TEST_F(CompactionPickerTest, OverlappingUserKeys7) {
+ NewVersionStorage(6, kCompactionStyleLevel);
+ mutable_cf_options_.max_compaction_bytes = 100000000000u;
+ // Overlapping user keys on same level and output level
+ Add(1, 1U, "200", "400", 1U, 0, 0);
+ Add(1, 2U, "401", "500", 1000000000U, 0, 0);
+ Add(2, 3U, "100", "250", 1U);
+ Add(2, 4U, "300", "600", 1U, 0, 0);
+ Add(2, 5U, "600", "800", 1U, 0, 0);
+
+ UpdateVersionStorageInfo();
+
+ std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
+ cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+ ASSERT_TRUE(compaction.get() != nullptr);
+ ASSERT_EQ(2U, compaction->num_input_levels());
+ ASSERT_GE(1U, compaction->num_input_files(0));
+ ASSERT_GE(2U, compaction->num_input_files(1));
+ // File 5 has to be included in the compaction
+ ASSERT_EQ(5U, compaction->inputs(1)->back()->fd.GetNumber());
+}
+
TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri1) {
NewVersionStorage(6, kCompactionStyleLevel);
mutable_cf_options_.level0_file_num_compaction_trigger = 2;