VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
uint32_t output_path_id) {
assert(input_files.size());
+ // This compaction output should not overlap with a running compaction as
+ // `SanitizeCompactionInputFiles` should've checked earlier and db mutex
+ // shouldn't have been released since.
+ assert(!FilesRangeOverlapWithCompaction(input_files, output_level));
- // TODO(rven ): we might be able to run concurrent level 0 compaction
- // if the key ranges of the two compactions do not overlap, but for now
- // we do not allow it.
- if ((input_files[0].level == 0) && !level0_compactions_in_progress_.empty()) {
- return nullptr;
- }
- // This compaction output could overlap with a running compaction
- if (FilesRangeOverlapWithCompaction(input_files, output_level)) {
- return nullptr;
- }
auto c =
new Compaction(vstorage, ioptions_, mutable_cf_options, input_files,
output_level, compact_options.output_file_size_limit,
mutable_cf_options.max_compaction_bytes, output_path_id,
compact_options.compression, /* grandparents */ {}, true);
-
- // If it's level 0 compaction, make sure we don't execute any other level 0
- // compactions in parallel
RegisterCompaction(c);
return c;
}
// Takes a list of CompactionInputFiles and returns a (manual) Compaction
// object.
+ //
+ // Caller must provide a set of input files that has been passed through
+ // `SanitizeCompactionInputFiles` earlier. The lock should not be released
+ // between that call and this one.
Compaction* CompactFiles(const CompactionOptions& compact_options,
const std::vector<CompactionInputFiles>& input_files,
int output_level, VersionStorageInfo* vstorage,
options.statistics->getTickerCount(COMPACTION_KEY_DROP_OBSOLETE));
}
+TEST_F(DBCompactionTest, CompactFilesPendingL0Bug) {
+ // https://www.facebook.com/groups/rocksdb.dev/permalink/1389452781153232/
+ // CompactFiles() had a bug where it failed to pick a compaction when an L0
+ // compaction existed, but marked it as scheduled anyways. It'd never be
+ // unmarked as scheduled, so future compactions or DB close could hang.
+ const int kNumL0Files = 5;
+ Options options = CurrentOptions();
+ options.level0_file_num_compaction_trigger = kNumL0Files - 1;
+ options.max_background_compactions = 2;
+ DestroyAndReopen(options);
+
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ {{"LevelCompactionPicker::PickCompaction:Return",
+ "DBCompactionTest::CompactFilesPendingL0Bug:Picked"},
+ {"DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted",
+ "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}});
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ auto schedule_multi_compaction_token =
+ dbfull()->TEST_write_controler().GetCompactionPressureToken();
+
+ // Files 0-3 will be included in an L0->L1 compaction.
+ //
+ // File 4 will be included in a call to CompactFiles() while the first
+ // compaction is running.
+ for (int i = 0; i < kNumL0Files - 1; ++i) {
+ ASSERT_OK(Put(Key(0), "val")); // sentinel to prevent trivial move
+ ASSERT_OK(Put(Key(i + 1), "val"));
+ ASSERT_OK(Flush());
+ }
+ TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:Picked");
+ // file 4 flushed after 0-3 picked
+ ASSERT_OK(Put(Key(kNumL0Files), "val"));
+ ASSERT_OK(Flush());
+
+ // previously DB close would hang forever as this situation caused scheduled
+ // compactions count to never decrement to zero.
+ ColumnFamilyMetaData cf_meta;
+ dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
+ ASSERT_EQ(kNumL0Files, cf_meta.levels[0].files.size());
+ std::vector<std::string> input_filenames;
+ input_filenames.push_back(cf_meta.levels[0].files.front().name);
+ ASSERT_OK(dbfull()
+ ->CompactFiles(CompactionOptions(), input_filenames,
+ 0 /* output_level */));
+ TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted");
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
TEST_F(DBCompactionTest, CompactFilesOverlapInL0Bug) {
// Regression test for bug of not pulling in L0 files that overlap the user-
// specified input files in time- and key-ranges.
c.reset(cfd->compaction_picker()->CompactFiles(
compact_options, input_files, output_level, version->storage_info(),
*cfd->GetLatestMutableCFOptions(), output_path_id));
- if (!c) {
- return Status::Aborted("Another Level 0 compaction is running");
- }
+ // we already sanitized the set of input files and checked for conflicts
+ // without releasing the lock, so we're guaranteed a compaction can be formed.
+ assert(c != nullptr);
+
c->SetInputVersion(version);
// deletion compaction currently not allowed in CompactFiles.
assert(!c->deletion_compaction());