]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix data loss after DB recovery by not allowing flush/compaction to be scheduled...
authorsdong <siying.d@fb.com>
Thu, 16 Jul 2015 02:58:28 +0000 (19:58 -0700)
committersdong <siying.d@fb.com>
Thu, 16 Jul 2015 21:57:50 +0000 (14:57 -0700)
Summary:
Previous run may leave some SST files with higher file numbers than manifest indicates.
Compaction or flush may start to run while DB::Open() is still going on. SST file garbage collection may happen interleaving with compaction or flush, and overwrite files generated by compaction of flushes after they are generated. This might cause data loss. This possibility of interleaving is recently introduced.
Fix it by not allowing compaction or flush to be scheduled before DB::Open() finishes.

Test Plan: Add a unit test. This verification will have a chance to fail without the fix but doesn't fix without the fix.

Reviewers: kradhakrishnan, anthony, yhchiang, IslamAbdelRahman, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D42399

db/db_impl.cc
db/fault_injection_test.cc

index 66eac6ad07085cd8ff273d148f9d69a6b2d136c0..857024c38960bfae2bc732744553ab923085b142 100644 (file)
@@ -2056,6 +2056,10 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
 
 void DBImpl::MaybeScheduleFlushOrCompaction() {
   mutex_.AssertHeld();
+  if (!opened_successfully_) {
+    // Compaction may introduce data race to DB open
+    return;
+  }
   if (bg_work_gate_closed_) {
     // gate closed for background work
     return;
@@ -2599,6 +2603,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
 
     mutex_.Unlock();
     status = compaction_job.Run();
+    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
     mutex_.Lock();
 
     compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
@@ -4355,11 +4360,14 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
       }
     }
   }
-
+  TEST_SYNC_POINT("DBImpl::Open:Opened");
+  if (s.ok()) {
+    impl->opened_successfully_ = true;
+    impl->MaybeScheduleFlushOrCompaction();
+  }
   impl->mutex_.Unlock();
 
   if (s.ok()) {
-    impl->opened_successfully_ = true;
     Log(InfoLogLevel::INFO_LEVEL, impl->db_options_.info_log, "DB pointer %p",
         impl);
     *dbptr = impl;
index 85157c8e6b9aa754927adf638e4e2537f7f5c269..da269a069983a9b6c295a082a1529a32cdb048b7 100644 (file)
@@ -25,6 +25,7 @@
 #include "util/logging.h"
 #include "util/mock_env.h"
 #include "util/mutexlock.h"
+#include "util/sync_point.h"
 #include "util/testharness.h"
 #include "util/testutil.h"
 
@@ -185,6 +186,13 @@ class FaultInjectionTestEnv : public EnvWrapper {
   Status NewWritableFile(const std::string& fname,
                          unique_ptr<WritableFile>* result,
                          const EnvOptions& soptions) override {
+    if (!IsFilesystemActive()) {
+      return Status::Corruption("Not Active");
+    }
+    // Not allow overwriting files
+    if (target()->FileExists(fname)) {
+      return Status::Corruption("File already exists.");
+    }
     Status s = target()->NewWritableFile(fname, result, soptions);
     if (s.ok()) {
       result->reset(new TestWritableFile(fname, std::move(*result), this));
@@ -201,6 +209,9 @@ class FaultInjectionTestEnv : public EnvWrapper {
   }
 
   virtual Status DeleteFile(const std::string& f) override {
+    if (!IsFilesystemActive()) {
+      return Status::Corruption("Not Active");
+    }
     Status s = EnvWrapper::DeleteFile(f);
     if (!s.ok()) {
       fprintf(stderr, "Cannot delete file %s: %s\n", f.c_str(),
@@ -215,6 +226,9 @@ class FaultInjectionTestEnv : public EnvWrapper {
 
   virtual Status RenameFile(const std::string& s,
                             const std::string& t) override {
+    if (!IsFilesystemActive()) {
+      return Status::Corruption("Not Active");
+    }
     Status ret = EnvWrapper::RenameFile(s, t);
 
     if (ret.ok()) {
@@ -373,8 +387,11 @@ TestWritableFile::~TestWritableFile() {
 }
 
 Status TestWritableFile::Append(const Slice& data) {
+  if (!env_->IsFilesystemActive()) {
+    return Status::Corruption("Not Active");
+  }
   Status s = target_->Append(data);
-  if (s.ok() && env_->IsFilesystemActive()) {
+  if (s.ok()) {
     state_.pos_ += data.size();
   }
   return s;
@@ -544,33 +561,34 @@ class FaultInjectionTest : public testing::Test {
     ASSERT_OK(s);
   }
 
-  void Build(const WriteOptions& write_options, int start_idx, int num_vals) {
+  void Build(const WriteOptions& write_options, int start_idx, int num_vals,
+             bool sequential = true) {
     std::string key_space, value_space;
     WriteBatch batch;
     for (int i = start_idx; i < start_idx + num_vals; i++) {
-      Slice key = Key(i, &key_space);
+      Slice key = Key(sequential, i, &key_space);
       batch.Clear();
       batch.Put(key, Value(i, &value_space));
       ASSERT_OK(db_->Write(write_options, &batch));
     }
   }
 
-  Status ReadValue(int i, std::string* val) const {
+  Status ReadValue(int i, std::string* val, bool sequential) const {
     std::string key_space, value_space;
-    Slice key = Key(i, &key_space);
+    Slice key = Key(sequential, i, &key_space);
     Value(i, &value_space);
     ReadOptions options;
     return db_->Get(options, key, val);
   }
 
-  Status Verify(int start_idx, int num_vals,
-                ExpectedVerifResult expected) const {
+  Status Verify(int start_idx, int num_vals, ExpectedVerifResult expected,
+                bool seqeuntial = true) const {
     std::string val;
     std::string value_space;
     Status s;
     for (int i = start_idx; i < start_idx + num_vals && s.ok(); i++) {
       Value(i, &value_space);
-      s = ReadValue(i, &val);
+      s = ReadValue(i, &val, seqeuntial);
       if (s.ok()) {
         EXPECT_EQ(value_space, val);
       }
@@ -590,9 +608,16 @@ class FaultInjectionTest : public testing::Test {
   }
 
   // Return the ith key
-  Slice Key(int i, std::string* storage) const {
+  Slice Key(bool seqeuntial, int i, std::string* storage) const {
+    int num = i;
+    if (!seqeuntial) {
+      // random transfer
+      const int m = 0x5bd1e995;
+      num *= m;
+      num ^= num << 24;
+    }
     char buf[100];
-    snprintf(buf, sizeof(buf), "%016d", i);
+    snprintf(buf, sizeof(buf), "%016d", num);
     storage->assign(buf, strlen(buf));
     return Slice(*storage);
   }
@@ -772,14 +797,14 @@ TEST_F(FaultInjectionTest, DISABLED_WriteOptionSyncTest) {
   write_options.sync = false;
 
   std::string key_space, value_space;
-  ASSERT_OK(
-      db_->Put(write_options, Key(1, &key_space), Value(1, &value_space)));
+  ASSERT_OK(db_->Put(write_options, Key(true, 1, &key_space),
+                     Value(1, &value_space)));
   FlushOptions flush_options;
   flush_options.wait = false;
   ASSERT_OK(db_->Flush(flush_options));
   write_options.sync = true;
-  ASSERT_OK(
-      db_->Put(write_options, Key(2, &key_space), Value(2, &value_space)));
+  ASSERT_OK(db_->Put(write_options, Key(true, 2, &key_space),
+                     Value(2, &value_space)));
 
   env_->SetFilesystemActive(false);
   NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
@@ -788,14 +813,59 @@ TEST_F(FaultInjectionTest, DISABLED_WriteOptionSyncTest) {
   ASSERT_OK(OpenDB());
   std::string val;
   Value(2, &value_space);
-  ASSERT_OK(ReadValue(2, &val));
+  ASSERT_OK(ReadValue(2, &val, true));
   ASSERT_EQ(value_space, val);
 
   Value(1, &value_space);
-  ASSERT_OK(ReadValue(1, &val));
+  ASSERT_OK(ReadValue(1, &val, true));
   ASSERT_EQ(value_space, val);
 }
 
+TEST_F(FaultInjectionTest, UninstalledCompaction) {
+  options_.target_file_size_base = 32 * 1024;
+  options_.write_buffer_size = 100 << 10;  // 100KB
+  options_.level0_file_num_compaction_trigger = 6;
+  options_.level0_stop_writes_trigger = 1 << 10;
+  options_.level0_slowdown_writes_trigger = 1 << 10;
+  options_.max_background_compactions = 1;
+  OpenDB();
+
+  rocksdb::SyncPoint::GetInstance()->LoadDependency({
+      {"FaultInjectionTest::FaultTest:0", "DBImpl::BGWorkCompaction"},
+      {"CompactionJob::Run():End", "FaultInjectionTest::FaultTest:1"},
+      {"FaultInjectionTest::FaultTest:2",
+       "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
+  });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  int kNumKeys = 1000;
+  Build(WriteOptions(), 0, kNumKeys, false);
+  FlushOptions flush_options;
+  flush_options.wait = true;
+  db_->Flush(flush_options);
+  ASSERT_OK(db_->Put(WriteOptions(), "", ""));
+  TEST_SYNC_POINT("FaultInjectionTest::FaultTest:0");
+  TEST_SYNC_POINT("FaultInjectionTest::FaultTest:1");
+  env_->SetFilesystemActive(false);
+  TEST_SYNC_POINT("FaultInjectionTest::FaultTest:2");
+  CloseDB();
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  ResetDBState(kResetDropUnsyncedData);
+
+  std::atomic<bool> opened(false);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::Open:Opened", [&](void* arg) { opened.store(true); });
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BGWorkCompaction",
+      [&](void* arg) { ASSERT_TRUE(opened.load()); });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_OK(OpenDB());
+  static_cast<DBImpl*>(db_)->TEST_WaitForCompact();
+  ASSERT_OK(Verify(0, kNumKeys, FaultInjectionTest::kValExpectFound, false));
+  ASSERT_OK(db_->Put(WriteOptions(), "", ""));
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {