if (!s.ok()) {
break;
}
+ TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
+ TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
}
}
if (!s.ok()) {
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
return;
- } else if (bg_manual_only_) {
- // manual only
- return;
}
while (unscheduled_flushes_ > 0 &&
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
}
+ if (bg_manual_only_) {
+ // only manual compactions are allowed to run. don't schedule automatic
+ // compactions
+ return;
+ }
+
if (db_options_.max_background_flushes == 0 &&
bg_compaction_scheduled_ < db_options_.max_background_compactions &&
unscheduled_flushes_ > 0) {
ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
}
+// Make sure that Flushes can proceed in parallel with CompactRange()
+TEST_F(DBTest, FlushesInParallelWithCompactRange) {
+ // iter == 0 -- leveled
+ // iter == 1 -- leveled, but throw in a flush between two levels compacting
+ // iter == 2 -- universal
+ for (int iter = 0; iter < 3; ++iter) {
+ printf("iter %d\n", iter);
+ Options options = CurrentOptions();
+ if (iter < 2) {
+ options.compaction_style = kCompactionStyleLevel;
+ } else {
+ options.compaction_style = kCompactionStyleUniversal;
+ }
+ options.write_buffer_size = 110 << 10;
+ options.level0_file_num_compaction_trigger = 4;
+ options.num_levels = 4;
+ options.compression = kNoCompression;
+ options.max_bytes_for_level_base = 450 << 10;
+ options.target_file_size_base = 98 << 10;
+ options.max_write_buffer_number = 2;
+
+ DestroyAndReopen(options);
+
+ Random rnd(301);
+ for (int num = 0; num < 14; num++) {
+ GenerateNewRandomFile(&rnd);
+ }
+
+ if (iter == 1) {
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::RunManualCompaction()::1",
+ "DBTest::FlushesInParallelWithCompactRange:1"},
+ {"DBTest::FlushesInParallelWithCompactRange:2",
+ "DBImpl::RunManualCompaction()::2"}});
+ } else {
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ {{"CompactionJob::Run():Start",
+ "DBTest::FlushesInParallelWithCompactRange:1"},
+ {"DBTest::FlushesInParallelWithCompactRange:2",
+ "CompactionJob::Run():End"}});
+ }
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ std::vector<std::thread> threads;
+ threads.emplace_back([&]() { Compact("a", "z"); });
+
+ TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:1");
+
+ // this has to start a flush. if flushes are blocked, this will try to
+ // create
+ // 3 memtables, and that will fail because max_write_buffer_number is 2
+ for (int num = 0; num < 3; num++) {
+ GenerateNewRandomFile(&rnd, /* nowait */ true);
+ }
+
+ TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:2");
+
+ for (auto& t : threads) {
+ t.join();
+ }
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ }
+}
+
} // namespace rocksdb
int main(int argc, char** argv) {