]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
[wal changes 1/3] fixed unbounded wal growth in some workloads
authorMike Kolupaev <kolmike@fb.com>
Thu, 2 Jul 2015 21:27:00 +0000 (14:27 -0700)
committeragiardullo <agiardullo@fb.com>
Tue, 7 Jul 2015 19:33:12 +0000 (12:33 -0700)
Summary:
This fixes the following scenario we've hit:
 - we reached max_total_wal_size, created a new wal and scheduled flushing all memtables corresponding to the old one,
 - before the last of these flushes started its column family was dropped; the last background flush call was a no-op; no one removed the old wal from alive_logs_,
 - hours have passed and no flushes happened even though lots of data was written; data is written to different column families, compactions are disabled; old column families are dropped before memtable grows big enough to trigger a flush; the old wal still sits in alive_logs_ preventing max_total_wal_size limit from kicking in,
 - a few more hours pass and we run out disk space because of one huge .log file.

Test Plan: `make check`; backported the new test, checked that it fails without this diff

Reviewers: igor

Reviewed By: igor

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D40893
(cherry picked from commit 218487d8dc45c4cb03dbb80bd4d7031b131b9f25)

db/db_impl.cc
db/db_impl.h
db/db_impl_debug.cc
db/db_test.cc
db/version_set.h

index e7465e0f99edbc2376877036cd7b71fcada1c59e..c599f1ef298c505f593e159605ec75da58d5a907 100644 (file)
@@ -527,6 +527,24 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
   versions_->GetObsoleteFiles(&job_context->sst_delete_files,
                               job_context->min_pending_output);
 
+  uint64_t min_log_number = versions_->MinLogNumber();
+  if (!alive_log_files_.empty()) {
+    // find newly obsoleted log files
+    while (alive_log_files_.begin()->number < min_log_number) {
+      auto& earliest = *alive_log_files_.begin();
+      job_context->log_delete_files.push_back(earliest.number);
+      total_log_size_ -= earliest.size;
+      alive_log_files_.pop_front();
+      // Current log should always stay alive since it can't have
+      // number < MinLogNumber().
+      assert(alive_log_files_.size());
+    }
+  }
+
+  // We're just cleaning up for DB::Write().
+  job_context->logs_to_free = logs_to_free_;
+  logs_to_free_.clear();
+
   // store the current filenum, lognum, etc
   job_context->manifest_file_number = versions_->manifest_file_number();
   job_context->pending_manifest_file_number =
@@ -1309,17 +1327,6 @@ Status DBImpl::FlushMemTableToOutputFile(
     VersionStorageInfo::LevelSummaryStorage tmp;
     LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
                 cfd->current()->storage_info()->LevelSummary(&tmp));
-
-    if (disable_delete_obsolete_files_ == 0) {
-      // add to deletion state
-      while (alive_log_files_.size() &&
-             alive_log_files_.begin()->number < versions_->MinLogNumber()) {
-        const auto& earliest = *alive_log_files_.begin();
-        job_context->log_delete_files.push_back(earliest.number);
-        total_log_size_ -= earliest.size;
-        alive_log_files_.pop_front();
-      }
-    }
   }
 
   if (!s.ok() && !s.IsShutdownInProgress() && db_options_.paranoid_checks &&
@@ -2145,7 +2152,9 @@ void DBImpl::RecordFlushIOStats() {
 
 void DBImpl::BGWorkFlush(void* db) {
   IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
+  TEST_SYNC_POINT("DBImpl::BGWorkFlush");
   reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
+  TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
 }
 
 void DBImpl::BGWorkCompaction(void* db) {
@@ -2238,10 +2247,6 @@ void DBImpl::BackgroundCallFlush() {
 
     ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
 
-    // We're just cleaning up for DB::Write()
-    job_context.logs_to_free = logs_to_free_;
-    logs_to_free_.clear();
-
     // If flush failed, we want to delete all temporary files that we might have
     // created. Thus, we force full scan in FindObsoleteFiles()
     FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
@@ -2308,10 +2313,6 @@ void DBImpl::BackgroundCallCompaction() {
 
     ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
 
-    // We're just cleaning up for DB::Write()
-    job_context.logs_to_free = logs_to_free_;
-    logs_to_free_.clear();
-
     // If compaction failed, we want to delete all temporary files that we might
     // have created (they might not be all recorded in job_context in case of a
     // failure). Thus, we force full scan in FindObsoleteFiles()
index a649b2baa24db32377e85fbaca77ea1806057921..dd37dd03160dd683634e47c5c6a5949c9f4e5613 100644 (file)
@@ -290,6 +290,8 @@ class DBImpl : public DB {
 
   size_t TEST_LogsToFreeSize();
 
+  uint64_t TEST_LogfileNumber();
+
 #endif  // ROCKSDB_LITE
 
   // Returns the list of live files in 'live' and the list
index 35703cf1aa71c6db786782a0e6101ce2ae63a03f..66177ed7af3c0cea4859eb981047c3a71008914c 100644 (file)
@@ -148,5 +148,10 @@ size_t DBImpl::TEST_LogsToFreeSize() {
   return logs_to_free_.size();
 }
 
+uint64_t DBImpl::TEST_LogfileNumber() {
+  InstrumentedMutexLock l(&mutex_);
+  return logfile_number_;
+}
+
 }  // namespace rocksdb
 #endif  // ROCKSDB_LITE
index c5d2a9b64bb365e1988c6724f26ad88a1cd7e9f3..ddf7de9a48755b01306989118706ebaeced2f2d7 100644 (file)
@@ -8540,7 +8540,6 @@ TEST_F(DBTest, TransactionLogIterator) {
   } while (ChangeCompactOptions());
 }
 
-#ifndef NDEBUG // sync point is not included with DNDEBUG build
 TEST_F(DBTest, TransactionLogIteratorRace) {
   static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
   static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {
@@ -8595,7 +8594,6 @@ TEST_F(DBTest, TransactionLogIteratorRace) {
     } while (ChangeCompactOptions());
   }
 }
-#endif
 
 TEST_F(DBTest, TransactionLogIteratorStallAtLastRecord) {
   do {
@@ -14136,6 +14134,40 @@ TEST_F(DBTest, PrevAfterMerge) {
   ASSERT_EQ("1", it->key().ToString());
 }
 
+TEST_F(DBTest, DeletingOldWalAfterDrop) {
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      { { "Test:AllowFlushes", "DBImpl::BGWorkFlush" },
+        { "DBImpl::BGWorkFlush:done", "Test:WaitForFlush"} });
+  rocksdb::SyncPoint::GetInstance()->ClearTrace();
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  Options options = CurrentOptions();
+  options.max_total_wal_size = 8192;
+  options.compression = kNoCompression;
+  options.write_buffer_size = 1 << 20;
+  options.level0_file_num_compaction_trigger = (1<<30);
+  options.level0_slowdown_writes_trigger = (1<<30);
+  options.level0_stop_writes_trigger = (1<<30);
+  options.disable_auto_compactions = true;
+  DestroyAndReopen(options);
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  CreateColumnFamilies({"cf1", "cf2"}, options);
+  ASSERT_OK(Put(0, "key1", DummyString(8192)));
+  ASSERT_OK(Put(0, "key2", DummyString(8192)));
+  // the oldest wal should now be getting_flushed
+  ASSERT_OK(db_->DropColumnFamily(handles_[0]));
+  // all flushes should now do nothing because their CF is dropped
+  TEST_SYNC_POINT("Test:AllowFlushes");
+  TEST_SYNC_POINT("Test:WaitForFlush");
+  uint64_t lognum1 = dbfull()->TEST_LogfileNumber();
+  ASSERT_OK(Put(1, "key3", DummyString(8192)));
+  ASSERT_OK(Put(1, "key4", DummyString(8192)));
+  // new wal should have been created
+  uint64_t lognum2 = dbfull()->TEST_LogfileNumber();
+  EXPECT_GT(lognum2, lognum1);
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
index 9ee6aeaa92e0497a9b7b65670c87b1b2b119c1f0..778e537f52b276849814e41a0bef822434f701fc 100644 (file)
@@ -612,7 +612,9 @@ class VersionSet {
   uint64_t MinLogNumber() const {
     uint64_t min_log_num = std::numeric_limits<uint64_t>::max();
     for (auto cfd : *column_family_set_) {
-      if (min_log_num > cfd->GetLogNumber()) {
+      // It's safe to ignore dropped column families here:
+      // cfd->IsDropped() becomes true after the drop is persisted in MANIFEST.
+      if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) {
         min_log_num = cfd->GetLogNumber();
       }
     }