]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Allow flushes to run in parallel with manual compaction
authorIgor Canadi <icanadi@fb.com>
Mon, 18 May 2015 22:34:33 +0000 (15:34 -0700)
committerIgor Canadi <icanadi@fb.com>
Mon, 18 May 2015 22:34:33 +0000 (15:34 -0700)
Summary: As title. I spent some time thinking about it and I don't think there should be any issue with running manual compaction and flushes in parallel

Test Plan: make check works

Reviewers: rven, yhchiang, sdong

Reviewed By: yhchiang, sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D38355

db/db_impl.cc
db/db_test.cc

index 8cf3037eec8a2261675b4d805a6bddabf882f4e4..19ce9c311e1874256b45286e26de9d2734e0e424 100644 (file)
@@ -1338,6 +1338,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
       if (!s.ok()) {
         break;
       }
+      TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
+      TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
     }
   }
   if (!s.ok()) {
@@ -1865,9 +1867,6 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
   } else if (shutting_down_.load(std::memory_order_acquire)) {
     // DB is being deleted; no more background compactions
     return;
-  } else if (bg_manual_only_) {
-    // manual only
-    return;
   }
 
   while (unscheduled_flushes_ > 0 &&
@@ -1877,6 +1876,12 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
     env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
   }
 
+  if (bg_manual_only_) {
+    // only manual compactions are allowed to run. don't schedule automatic
+    // compactions
+    return;
+  }
+
   if (db_options_.max_background_flushes == 0 &&
       bg_compaction_scheduled_ < db_options_.max_background_compactions &&
       unscheduled_flushes_ > 0) {
index 94e4d6a5ed61dcc8270d53ab9e88cb4dfbb118a7..9f0bd43d2acabc6bb74eeb82083cef9ded57b5df 100644 (file)
@@ -12881,6 +12881,70 @@ TEST_F(DBTest, LargeBatchWithColumnFamilies) {
   ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
 }
 
+// Make sure that Flushes can proceed in parallel with CompactRange()
+TEST_F(DBTest, FlushesInParallelWithCompactRange) {
+  // iter == 0 -- leveled
+  // iter == 1 -- leveled, but throw in a flush between two levels compacting
+  // iter == 2 -- universal
+  for (int iter = 0; iter < 3; ++iter) {
+    printf("iter %d\n", iter);
+    Options options = CurrentOptions();
+    if (iter < 2) {
+      options.compaction_style = kCompactionStyleLevel;
+    } else {
+      options.compaction_style = kCompactionStyleUniversal;
+    }
+    options.write_buffer_size = 110 << 10;
+    options.level0_file_num_compaction_trigger = 4;
+    options.num_levels = 4;
+    options.compression = kNoCompression;
+    options.max_bytes_for_level_base = 450 << 10;
+    options.target_file_size_base = 98 << 10;
+    options.max_write_buffer_number = 2;
+
+    DestroyAndReopen(options);
+
+    Random rnd(301);
+    for (int num = 0; num < 14; num++) {
+      GenerateNewRandomFile(&rnd);
+    }
+
+    if (iter == 1) {
+    rocksdb::SyncPoint::GetInstance()->LoadDependency(
+        {{"DBImpl::RunManualCompaction()::1",
+          "DBTest::FlushesInParallelWithCompactRange:1"},
+         {"DBTest::FlushesInParallelWithCompactRange:2",
+          "DBImpl::RunManualCompaction()::2"}});
+    } else {
+      rocksdb::SyncPoint::GetInstance()->LoadDependency(
+          {{"CompactionJob::Run():Start",
+            "DBTest::FlushesInParallelWithCompactRange:1"},
+           {"DBTest::FlushesInParallelWithCompactRange:2",
+            "CompactionJob::Run():End"}});
+    }
+    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+    std::vector<std::thread> threads;
+    threads.emplace_back([&]() { Compact("a", "z"); });
+
+    TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:1");
+
+    // this has to start a flush. if flushes are blocked, this will try to
+    // create
+    // 3 memtables, and that will fail because max_write_buffer_number is 2
+    for (int num = 0; num < 3; num++) {
+      GenerateNewRandomFile(&rnd, /* nowait */ true);
+    }
+
+    TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:2");
+
+    for (auto& t : threads) {
+      t.join();
+    }
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  }
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {