]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
[3.6.fb] Fix corruption bug in compaction and Mac compilation error
authorYueh-Hsuan Chiang <yhchiang@fb.com>
Wed, 29 Oct 2014 17:43:01 +0000 (10:43 -0700)
committerYueh-Hsuan Chiang <yhchiang@fb.com>
Wed, 29 Oct 2014 17:52:58 +0000 (10:52 -0700)
Summary:
Bring compaction and VersionEdit fix https://reviews.facebook.net/D25581
into 3.6.fb.

Test Plan:
./db_test
./version_edit_test

Reviewers: igor, ljin, sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D27801

db/db_impl.cc
db/db_test.cc
db/version_edit.cc
db/version_edit.h
db/version_edit_test.cc
include/rocksdb/version.h

index 383e2f6699649b62ccdb1c4bea14baf5910281bd..1562a0da64faf06a44a30de88710ae8e0b4bf061 100644 (file)
@@ -2453,7 +2453,7 @@ void DBImpl::CleanupCompaction(CompactionState* compact, Status status) {
     compact->builder->Abandon();
     compact->builder.reset();
   } else {
-    assert(compact->outfile == nullptr);
+    assert(!status.ok() || compact->outfile == nullptr);
   }
   for (size_t i = 0; i < compact->outputs.size(); i++) {
     const CompactionState::Output& out = compact->outputs[i];
@@ -2508,6 +2508,20 @@ Status DBImpl::OpenCompactionOutputFile(
     pending_outputs_[file_number] = compact->compaction->GetOutputPathId();
     mutex_.Unlock();
   }
+  // Make the output file
+  std::string fname = TableFileName(db_options_.db_paths, file_number,
+                                    compact->compaction->GetOutputPathId());
+  Status s = env_->NewWritableFile(fname, &compact->outfile, env_options_);
+
+  if (!s.ok()) {
+    Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
+        "[%s] OpenCompactionOutputFiles for table #%" PRIu64 " "
+        "fails at NewWritableFile with status %s",
+        compact->compaction->column_family_data()->GetName().c_str(),
+        file_number, s.ToString().c_str());
+    LogFlush(db_options_.info_log);
+    return s;
+  }
   CompactionState::Output out;
   out.number = file_number;
   out.path_id = compact->compaction->GetOutputPathId();
@@ -2516,22 +2530,15 @@ Status DBImpl::OpenCompactionOutputFile(
   out.smallest_seqno = out.largest_seqno = 0;
   compact->outputs.push_back(out);
 
-  // Make the output file
-  std::string fname = TableFileName(db_options_.db_paths, file_number,
-                                    compact->compaction->GetOutputPathId());
-  Status s = env_->NewWritableFile(fname, &compact->outfile, env_options_);
+  compact->outfile->SetIOPriority(Env::IO_LOW);
+  compact->outfile->SetPreallocationBlockSize(
+      compact->compaction->OutputFilePreallocationSize(mutable_cf_options));
 
-  if (s.ok()) {
-    compact->outfile->SetIOPriority(Env::IO_LOW);
-    compact->outfile->SetPreallocationBlockSize(
-        compact->compaction->OutputFilePreallocationSize(mutable_cf_options));
-
-    ColumnFamilyData* cfd = compact->compaction->column_family_data();
-    compact->builder.reset(NewTableBuilder(
-        *cfd->ioptions(), cfd->internal_comparator(), compact->outfile.get(),
-        compact->compaction->OutputCompressionType(),
-        cfd->ioptions()->compression_opts));
-  }
+  ColumnFamilyData* cfd = compact->compaction->column_family_data();
+  compact->builder.reset(NewTableBuilder(
+      *cfd->ioptions(), cfd->internal_comparator(), compact->outfile.get(),
+      compact->compaction->OutputCompressionType(),
+      cfd->ioptions()->compression_opts));
   LogFlush(db_options_.info_log);
   return s;
 }
@@ -2729,7 +2736,7 @@ Status DBImpl::ProcessKeyValueCompaction(
   int64_t key_drop_obsolete = 0;
   int64_t loop_cnt = 0;
   while (input->Valid() && !shutting_down_.Acquire_Load() &&
-         !cfd->IsDropped()) {
+         !cfd->IsDropped() && status.ok()) {
     if (++loop_cnt > 1000) {
       if (key_drop_user > 0) {
         RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
index c67c45786de773cd8047b0b3c7c1b2123703df81..a3aa42bdaf2fb46a3db2f6f5cbddb8867c540a11 100644 (file)
@@ -120,6 +120,8 @@ static std::string Key(int i) {
 // Special Env used to delay background operations
 class SpecialEnv : public EnvWrapper {
  public:
+  Random rnd_;
+
   // sstable Sync() calls are blocked while this pointer is non-nullptr.
   port::AtomicPointer delay_sstable_sync_;
 
@@ -153,7 +155,13 @@ class SpecialEnv : public EnvWrapper {
 
   std::atomic<int> sync_counter_;
 
-  explicit SpecialEnv(Env* base) : EnvWrapper(base) {
+  std::atomic<uint32_t> non_writeable_rate_;
+
+  std::atomic<uint32_t> new_writable_count_;
+
+  std::atomic<uint32_t> periodic_non_writable_;
+
+  explicit SpecialEnv(Env* base) : EnvWrapper(base), rnd_(301) {
     delay_sstable_sync_.Release_Store(nullptr);
     drop_writes_.Release_Store(nullptr);
     no_space_.Release_Store(nullptr);
@@ -165,6 +173,9 @@ class SpecialEnv : public EnvWrapper {
     log_write_error_.Release_Store(nullptr);
     bytes_written_ = 0;
     sync_counter_ = 0;
+    non_writeable_rate_ = 0;
+    new_writable_count_ = 0;
+    periodic_non_writable_ = 0;
   }
 
   Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
@@ -250,8 +261,19 @@ class SpecialEnv : public EnvWrapper {
       }
     };
 
-    if (non_writable_.Acquire_Load() != nullptr) {
-      return Status::IOError("simulated write error");
+    if (non_writeable_rate_.load(std::memory_order_acquire) > 0) {
+      auto random_number = rnd_.Uniform(100);
+      if (random_number < non_writeable_rate_.load()) {
+        return Status::IOError("simulated random write error");
+      }
+    }
+
+    new_writable_count_++;
+
+    auto periodic_fail = periodic_non_writable_.load();
+    if (periodic_fail > 0 &&
+        new_writable_count_.load() % periodic_fail == 0) {
+      return Status::IOError("simulated periodic write error");
     }
 
     Status s = target()->NewWritableFile(f, r, soptions);
@@ -5864,7 +5886,7 @@ TEST(DBTest, NonWritableFileSystem) {
     options.env = env_;
     Reopen(&options);
     ASSERT_OK(Put("foo", "v1"));
-    env_->non_writable_.Release_Store(env_); // Force errors for new files
+    env_->non_writeable_rate_.store(100);  // Force errors for new files
     std::string big(100000, 'x');
     int errors = 0;
     for (int i = 0; i < 20; i++) {
@@ -5874,7 +5896,7 @@ TEST(DBTest, NonWritableFileSystem) {
       }
     }
     ASSERT_GT(errors, 0);
-    env_->non_writable_.Release_Store(nullptr);
+    env_->non_writeable_rate_.store(0);
   } while (ChangeCompactOptions());
 }
 
@@ -8589,6 +8611,142 @@ TEST(DBTest, DynamicMemtableOptions) {
   ASSERT_TRUE(SizeAtLevel(0) > k128KB + k64KB - 2 * k5KB);
 }
 
+TEST(DBTest, FileCreationRandomFailure) {
+  Options options;
+  options.env = env_;
+  options.create_if_missing = true;
+  options.write_buffer_size = 100000;  // Small write buffer
+  options.target_file_size_base = 200000;
+  options.max_bytes_for_level_base = 1000000;
+  options.max_bytes_for_level_multiplier = 2;
+
+  DestroyAndReopen(&options);
+  Random rnd(301);
+
+  const int kTestSize = kCDTKeysPerBuffer * 4096;
+  const int kTotalIteration = 100;
+  // the second half of the test involves in random failure
+  // of file creation.
+  const int kRandomFailureTest = kTotalIteration / 2;
+  std::vector<std::string> values;
+  for (int i = 0; i < kTestSize; ++i) {
+    values.push_back("NOT_FOUND");
+  }
+  for (int j = 0; j < kTotalIteration; ++j) {
+    if (j == kRandomFailureTest) {
+      env_->non_writeable_rate_.store(90);
+    }
+    for (int k = 0; k < kTestSize; ++k) {
+      // here we expect some of the Put fails.
+      std::string value = RandomString(&rnd, 100);
+      Status s = Put(Key(k), Slice(value));
+      if (s.ok()) {
+        // update the latest successful put
+        values[k] = value;
+      }
+      // But everything before we simulate the failure-test should succeed.
+      if (j < kRandomFailureTest) {
+        ASSERT_OK(s);
+      }
+    }
+  }
+
+  // If rocksdb does not do the correct job, internal assert will fail here.
+  dbfull()->TEST_WaitForFlushMemTable();
+  dbfull()->TEST_WaitForCompact();
+
+  // verify we have the latest successful update
+  for (int k = 0; k < kTestSize; ++k) {
+    auto v = Get(Key(k));
+    ASSERT_EQ(v, values[k]);
+  }
+
+  // reopen and reverify we have the latest successful update
+  env_->non_writeable_rate_.store(0);
+  Reopen(&options);
+  for (int k = 0; k < kTestSize; ++k) {
+    auto v = Get(Key(k));
+    ASSERT_EQ(v, values[k]);
+  }
+}
+
+TEST(DBTest, PartialCompactionFailure) {
+  Options options;
+  const int kKeySize = 16;
+  const int kKvSize = 1000;
+  const int kKeysPerBuffer = 100;
+  const int kNumL1Files = 5;
+  options.create_if_missing = true;
+  options.write_buffer_size = kKeysPerBuffer * kKvSize;
+  options.max_write_buffer_number = 2;
+  options.target_file_size_base =
+      options.write_buffer_size *
+      (options.max_write_buffer_number - 1);
+  options.level0_file_num_compaction_trigger = kNumL1Files;
+  options.max_bytes_for_level_base =
+      options.level0_file_num_compaction_trigger *
+      options.target_file_size_base;
+  options.max_bytes_for_level_multiplier = 2;
+  options.compression = kNoCompression;
+
+  // The number of NewWritableFiles calls required by each operation.
+  const int kNumInitialNewWritableFiles = 4;
+  const int kNumLevel0FlushNewWritableFiles =
+      options.level0_file_num_compaction_trigger * 2;
+  const int kNumLevel1NewWritableFiles =
+      options.level0_file_num_compaction_trigger + 1;
+  // This setting will make one of the file-creation fail
+  // in the first L0 -> L1 compaction while making sure
+  // all flushes succeeed.
+  env_->periodic_non_writable_ =
+      kNumInitialNewWritableFiles + kNumLevel0FlushNewWritableFiles +
+      kNumLevel1NewWritableFiles - 3;
+  options.env = env_;
+
+  DestroyAndReopen(&options);
+
+  const int kNumKeys =
+      options.level0_file_num_compaction_trigger *
+      (options.max_write_buffer_number - 1) *
+      kKeysPerBuffer * 1.0;
+
+  Random rnd(301);
+  std::vector<std::string> keys;
+  std::vector<std::string> values;
+  for (int k = 0; k < kNumKeys; ++k) {
+    keys.emplace_back(RandomString(&rnd, kKeySize));
+    values.emplace_back(RandomString(&rnd, kKvSize - kKeySize));
+    ASSERT_OK(Put(Slice(keys[k]), Slice(values[k])));
+  }
+
+  dbfull()->TEST_WaitForFlushMemTable();
+  // Make sure the number of L0 files can trigger compaction.
+  ASSERT_GE(NumTableFilesAtLevel(0),
+            options.level0_file_num_compaction_trigger);
+  auto previous_num_level0_files = NumTableFilesAtLevel(0);
+  // Expect compaction to fail here as one file will fail its
+  // creation.
+  dbfull()->TEST_WaitForCompact();
+  // Verify L0 -> L1 compaction does fail.
+  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+  // Verify all L0 files are still there.
+  ASSERT_EQ(NumTableFilesAtLevel(0), previous_num_level0_files);
+
+  // All key-values must exist after compaction fails.
+  for (int k = 0; k < kNumKeys; ++k) {
+    ASSERT_EQ(values[k], Get(keys[k]));
+  }
+
+  // Make sure RocksDB will not get into corrupted state.
+  Reopen(&options);
+
+  // Verify again after reopen.
+  for (int k = 0; k < kNumKeys; ++k) {
+    ASSERT_EQ(values[k], Get(keys[k]));
+  }
+}
+
+
 TEST(DBTest, DynamicCompactionOptions) {
   const uint64_t k64KB = 1 << 16;
   const uint64_t k128KB = 1 << 17;
@@ -8615,7 +8773,7 @@ TEST(DBTest, DynamicCompactionOptions) {
   options.max_bytes_for_level_multiplier = 4;
   DestroyAndReopen(&options);
 
-  auto gen_l0_kb = [this](int start, int size, int stride = 1) {
+  auto gen_l0_kb = [this](int start, int size, int stride) {
     Random rnd(301);
     std::vector<std::string> values;
     for (int i = 0; i < size; i++) {
@@ -8627,11 +8785,11 @@ TEST(DBTest, DynamicCompactionOptions) {
 
   // Write 3 files that have the same key range, trigger compaction and
   // result in one L1 file
-  gen_l0_kb(0, 128);
+  gen_l0_kb(0, 128, 1);
   ASSERT_EQ(NumTableFilesAtLevel(0), 1);
-  gen_l0_kb(0, 128);
+  gen_l0_kb(0, 128, 1);
   ASSERT_EQ(NumTableFilesAtLevel(0), 2);
-  gen_l0_kb(0, 128);
+  gen_l0_kb(0, 128, 1);
   dbfull()->TEST_WaitForCompact();
   ASSERT_EQ("0,1", FilesPerLevel());
   std::vector<LiveFileMetaData> metadata;
@@ -8646,9 +8804,9 @@ TEST(DBTest, DynamicCompactionOptions) {
     {"target_file_size_base", "65536"}
   }));
 
-  gen_l0_kb(0, 128);
+  gen_l0_kb(0, 128, 1);
   ASSERT_EQ("1,1", FilesPerLevel());
-  gen_l0_kb(0, 128);
+  gen_l0_kb(0, 128, 1);
   dbfull()->TEST_WaitForCompact();
   ASSERT_EQ("0,2", FilesPerLevel());
   metadata.clear();
index 4e2cf8f5b40cc4f956214dd430e98a4e9498f2b9..2c77a2712403cf043fb5825218869f24eba4f2f8 100644 (file)
@@ -64,7 +64,7 @@ void VersionEdit::Clear() {
   column_family_name_.clear();
 }
 
-void VersionEdit::EncodeTo(std::string* dst) const {
+bool VersionEdit::EncodeTo(std::string* dst) const {
   if (has_comparator_) {
     PutVarint32(dst, kComparator);
     PutLengthPrefixedSlice(dst, comparator_);
@@ -98,6 +98,9 @@ void VersionEdit::EncodeTo(std::string* dst) const {
 
   for (size_t i = 0; i < new_files_.size(); i++) {
     const FileMetaData& f = new_files_[i].second;
+    if (!f.smallest.Valid() || !f.largest.Valid()) {
+      return false;
+    }
     if (f.fd.GetPathId() == 0) {
       // Use older format to make sure user can roll back the build if they
       // don't config multiple DB paths.
@@ -131,6 +134,7 @@ void VersionEdit::EncodeTo(std::string* dst) const {
   if (is_column_family_drop_) {
     PutVarint32(dst, kColumnFamilyDrop);
   }
+  return true;
 }
 
 static bool GetInternalKey(Slice* input, InternalKey* dst) {
index db133402c95fb2f099c35711c59393bc3e164322..1cf2e1b7ca4eafc440ed8d3d99780cbf0e36e942 100644 (file)
@@ -212,7 +212,8 @@ class VersionEdit {
     is_column_family_drop_ = true;
   }
 
-  void EncodeTo(std::string* dst) const;
+  // return true on success.
+  bool EncodeTo(std::string* dst) const;
   Status DecodeFrom(const Slice& src);
 
   std::string DebugString(bool hex_key = false) const;
index 850f242c1f1bf33fa4ed55c988ceba5512c207fe..fe663c766c22c8358f0fe6425ccdd805ca7227e2 100644 (file)
@@ -44,6 +44,16 @@ TEST(VersionEditTest, EncodeDecode) {
   TestEncodeDecode(edit);
 }
 
+TEST(VersionEditTest, EncodeEmptyFile) {
+  VersionEdit edit;
+  edit.AddFile(0, 0, 0, 0,
+               InternalKey(),
+               InternalKey(),
+               0, 0);
+  std::string buffer;
+  ASSERT_TRUE(!edit.EncodeTo(&buffer));
+}
+
 TEST(VersionEditTest, ColumnFamilyTest) {
   VersionEdit edit;
   edit.SetColumnFamily(2);
index 84ece982ac74c2671dc66dcb70d7481db6f9b6af..a17dfc1d16bcef8135544e1b9519f98dfa9c0ca7 100644 (file)
@@ -6,7 +6,7 @@
 
 #define ROCKSDB_MAJOR 3
 #define ROCKSDB_MINOR 6
-#define ROCKSDB_PATCH 1
+#define ROCKSDB_PATCH 2
 
 // Do not use these. We made the mistake of declaring macros starting with
 // double underscore. Now we have to live with our choice. We'll deprecate these