From: Yueh-Hsuan Chiang Date: Wed, 29 Oct 2014 17:43:01 +0000 (-0700) Subject: [3.6.fb] Fix corruption bug in compaction and Mac compilation error X-Git-Tag: rocksdb-3.6.2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ca41f994ddc099c2cd74c8128e7f2244ab45dbd7;p=rocksdb.git [3.6.fb] Fix corruption bug in compaction and Mac compilation error Summary: Bring compaction and VersionEdit fix https://reviews.facebook.net/D25581 into 3.6.fb. Test Plan: ./db_test ./version_edit_test Reviewers: igor, ljin, sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D27801 --- diff --git a/db/db_impl.cc b/db/db_impl.cc index 383e2f66..1562a0da 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2453,7 +2453,7 @@ void DBImpl::CleanupCompaction(CompactionState* compact, Status status) { compact->builder->Abandon(); compact->builder.reset(); } else { - assert(compact->outfile == nullptr); + assert(!status.ok() || compact->outfile == nullptr); } for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; @@ -2508,6 +2508,20 @@ Status DBImpl::OpenCompactionOutputFile( pending_outputs_[file_number] = compact->compaction->GetOutputPathId(); mutex_.Unlock(); } + // Make the output file + std::string fname = TableFileName(db_options_.db_paths, file_number, + compact->compaction->GetOutputPathId()); + Status s = env_->NewWritableFile(fname, &compact->outfile, env_options_); + + if (!s.ok()) { + Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, + "[%s] OpenCompactionOutputFiles for table #%" PRIu64 " " + "fails at NewWritableFile with status %s", + compact->compaction->column_family_data()->GetName().c_str(), + file_number, s.ToString().c_str()); + LogFlush(db_options_.info_log); + return s; + } CompactionState::Output out; out.number = file_number; out.path_id = compact->compaction->GetOutputPathId(); @@ -2516,22 +2530,15 @@ Status DBImpl::OpenCompactionOutputFile( 
out.smallest_seqno = out.largest_seqno = 0; compact->outputs.push_back(out); - // Make the output file - std::string fname = TableFileName(db_options_.db_paths, file_number, - compact->compaction->GetOutputPathId()); - Status s = env_->NewWritableFile(fname, &compact->outfile, env_options_); + compact->outfile->SetIOPriority(Env::IO_LOW); + compact->outfile->SetPreallocationBlockSize( + compact->compaction->OutputFilePreallocationSize(mutable_cf_options)); - if (s.ok()) { - compact->outfile->SetIOPriority(Env::IO_LOW); - compact->outfile->SetPreallocationBlockSize( - compact->compaction->OutputFilePreallocationSize(mutable_cf_options)); - - ColumnFamilyData* cfd = compact->compaction->column_family_data(); - compact->builder.reset(NewTableBuilder( - *cfd->ioptions(), cfd->internal_comparator(), compact->outfile.get(), - compact->compaction->OutputCompressionType(), - cfd->ioptions()->compression_opts)); - } + ColumnFamilyData* cfd = compact->compaction->column_family_data(); + compact->builder.reset(NewTableBuilder( + *cfd->ioptions(), cfd->internal_comparator(), compact->outfile.get(), + compact->compaction->OutputCompressionType(), + cfd->ioptions()->compression_opts)); LogFlush(db_options_.info_log); return s; } @@ -2729,7 +2736,7 @@ Status DBImpl::ProcessKeyValueCompaction( int64_t key_drop_obsolete = 0; int64_t loop_cnt = 0; while (input->Valid() && !shutting_down_.Acquire_Load() && - !cfd->IsDropped()) { + !cfd->IsDropped() && status.ok()) { if (++loop_cnt > 1000) { if (key_drop_user > 0) { RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user); diff --git a/db/db_test.cc b/db/db_test.cc index c67c4578..a3aa42bd 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -120,6 +120,8 @@ static std::string Key(int i) { // Special Env used to delay background operations class SpecialEnv : public EnvWrapper { public: + Random rnd_; + // sstable Sync() calls are blocked while this pointer is non-nullptr. 
port::AtomicPointer delay_sstable_sync_; @@ -153,7 +155,13 @@ class SpecialEnv : public EnvWrapper { std::atomic sync_counter_; - explicit SpecialEnv(Env* base) : EnvWrapper(base) { + std::atomic non_writeable_rate_; + + std::atomic new_writable_count_; + + std::atomic periodic_non_writable_; + + explicit SpecialEnv(Env* base) : EnvWrapper(base), rnd_(301) { delay_sstable_sync_.Release_Store(nullptr); drop_writes_.Release_Store(nullptr); no_space_.Release_Store(nullptr); @@ -165,6 +173,9 @@ class SpecialEnv : public EnvWrapper { log_write_error_.Release_Store(nullptr); bytes_written_ = 0; sync_counter_ = 0; + non_writeable_rate_ = 0; + new_writable_count_ = 0; + periodic_non_writable_ = 0; } Status NewWritableFile(const std::string& f, unique_ptr* r, @@ -250,8 +261,19 @@ class SpecialEnv : public EnvWrapper { } }; - if (non_writable_.Acquire_Load() != nullptr) { - return Status::IOError("simulated write error"); + if (non_writeable_rate_.load(std::memory_order_acquire) > 0) { + auto random_number = rnd_.Uniform(100); + if (random_number < non_writeable_rate_.load()) { + return Status::IOError("simulated random write error"); + } + } + + new_writable_count_++; + + auto periodic_fail = periodic_non_writable_.load(); + if (periodic_fail > 0 && + new_writable_count_.load() % periodic_fail == 0) { + return Status::IOError("simulated periodic write error"); } Status s = target()->NewWritableFile(f, r, soptions); @@ -5864,7 +5886,7 @@ TEST(DBTest, NonWritableFileSystem) { options.env = env_; Reopen(&options); ASSERT_OK(Put("foo", "v1")); - env_->non_writable_.Release_Store(env_); // Force errors for new files + env_->non_writeable_rate_.store(100); // Force errors for new files std::string big(100000, 'x'); int errors = 0; for (int i = 0; i < 20; i++) { @@ -5874,7 +5896,7 @@ TEST(DBTest, NonWritableFileSystem) { } } ASSERT_GT(errors, 0); - env_->non_writable_.Release_Store(nullptr); + env_->non_writeable_rate_.store(0); } while (ChangeCompactOptions()); } @@ -8589,6 
+8611,142 @@ TEST(DBTest, DynamicMemtableOptions) { ASSERT_TRUE(SizeAtLevel(0) > k128KB + k64KB - 2 * k5KB); } +TEST(DBTest, FileCreationRandomFailure) { + Options options; + options.env = env_; + options.create_if_missing = true; + options.write_buffer_size = 100000; // Small write buffer + options.target_file_size_base = 200000; + options.max_bytes_for_level_base = 1000000; + options.max_bytes_for_level_multiplier = 2; + + DestroyAndReopen(&options); + Random rnd(301); + + const int kTestSize = kCDTKeysPerBuffer * 4096; + const int kTotalIteration = 100; + // the second half of the test involves random failure + // of file creation. + const int kRandomFailureTest = kTotalIteration / 2; + std::vector<std::string> values; + for (int i = 0; i < kTestSize; ++i) { + values.push_back("NOT_FOUND"); + } + for (int j = 0; j < kTotalIteration; ++j) { + if (j == kRandomFailureTest) { + env_->non_writeable_rate_.store(90); + } + for (int k = 0; k < kTestSize; ++k) { + // here we expect some of the Puts fail. + std::string value = RandomString(&rnd, 100); + Status s = Put(Key(k), Slice(value)); + if (s.ok()) { + // update the latest successful put + values[k] = value; + } + // But everything before we simulate the failure-test should succeed. + if (j < kRandomFailureTest) { + ASSERT_OK(s); + } + } + } + + // If rocksdb does not do the correct job, internal assert will fail here. 
+ dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + + // verify we have the latest successful update + for (int k = 0; k < kTestSize; ++k) { + auto v = Get(Key(k)); + ASSERT_EQ(v, values[k]); + } + + // reopen and reverify we have the latest successful update + env_->non_writeable_rate_.store(0); + Reopen(&options); + for (int k = 0; k < kTestSize; ++k) { + auto v = Get(Key(k)); + ASSERT_EQ(v, values[k]); + } +} + +TEST(DBTest, PartialCompactionFailure) { + Options options; + const int kKeySize = 16; + const int kKvSize = 1000; + const int kKeysPerBuffer = 100; + const int kNumL1Files = 5; + options.create_if_missing = true; + options.write_buffer_size = kKeysPerBuffer * kKvSize; + options.max_write_buffer_number = 2; + options.target_file_size_base = + options.write_buffer_size * + (options.max_write_buffer_number - 1); + options.level0_file_num_compaction_trigger = kNumL1Files; + options.max_bytes_for_level_base = + options.level0_file_num_compaction_trigger * + options.target_file_size_base; + options.max_bytes_for_level_multiplier = 2; + options.compression = kNoCompression; + + // The number of NewWritableFiles calls required by each operation. + const int kNumInitialNewWritableFiles = 4; + const int kNumLevel0FlushNewWritableFiles = + options.level0_file_num_compaction_trigger * 2; + const int kNumLevel1NewWritableFiles = + options.level0_file_num_compaction_trigger + 1; + // This setting will make one of the file creations fail + // in the first L0 -> L1 compaction while making sure + // all flushes succeed. 
+ env_->periodic_non_writable_ = + kNumInitialNewWritableFiles + kNumLevel0FlushNewWritableFiles + + kNumLevel1NewWritableFiles - 3; + options.env = env_; + + DestroyAndReopen(&options); + + const int kNumKeys = + options.level0_file_num_compaction_trigger * + (options.max_write_buffer_number - 1) * + kKeysPerBuffer * 1.0; + + Random rnd(301); + std::vector<std::string> keys; + std::vector<std::string> values; + for (int k = 0; k < kNumKeys; ++k) { + keys.emplace_back(RandomString(&rnd, kKeySize)); + values.emplace_back(RandomString(&rnd, kKvSize - kKeySize)); + ASSERT_OK(Put(Slice(keys[k]), Slice(values[k]))); + } + + dbfull()->TEST_WaitForFlushMemTable(); + // Make sure the number of L0 files can trigger compaction. + ASSERT_GE(NumTableFilesAtLevel(0), + options.level0_file_num_compaction_trigger); + auto previous_num_level0_files = NumTableFilesAtLevel(0); + // Expect compaction to fail here as one file will fail its + // creation. + dbfull()->TEST_WaitForCompact(); + // Verify L0 -> L1 compaction does fail. + ASSERT_EQ(NumTableFilesAtLevel(1), 0); + // Verify all L0 files are still there. + ASSERT_EQ(NumTableFilesAtLevel(0), previous_num_level0_files); + + // All key-values must exist after compaction fails. + for (int k = 0; k < kNumKeys; ++k) { + ASSERT_EQ(values[k], Get(keys[k])); + } + + // Make sure RocksDB will not get into corrupted state. + Reopen(&options); + + // Verify again after reopen. 
+ for (int k = 0; k < kNumKeys; ++k) { + ASSERT_EQ(values[k], Get(keys[k])); + } +} + + TEST(DBTest, DynamicCompactionOptions) { const uint64_t k64KB = 1 << 16; const uint64_t k128KB = 1 << 17; @@ -8615,7 +8773,7 @@ TEST(DBTest, DynamicCompactionOptions) { options.max_bytes_for_level_multiplier = 4; DestroyAndReopen(&options); - auto gen_l0_kb = [this](int start, int size, int stride = 1) { + auto gen_l0_kb = [this](int start, int size, int stride) { Random rnd(301); std::vector values; for (int i = 0; i < size; i++) { @@ -8627,11 +8785,11 @@ TEST(DBTest, DynamicCompactionOptions) { // Write 3 files that have the same key range, trigger compaction and // result in one L1 file - gen_l0_kb(0, 128); + gen_l0_kb(0, 128, 1); ASSERT_EQ(NumTableFilesAtLevel(0), 1); - gen_l0_kb(0, 128); + gen_l0_kb(0, 128, 1); ASSERT_EQ(NumTableFilesAtLevel(0), 2); - gen_l0_kb(0, 128); + gen_l0_kb(0, 128, 1); dbfull()->TEST_WaitForCompact(); ASSERT_EQ("0,1", FilesPerLevel()); std::vector metadata; @@ -8646,9 +8804,9 @@ TEST(DBTest, DynamicCompactionOptions) { {"target_file_size_base", "65536"} })); - gen_l0_kb(0, 128); + gen_l0_kb(0, 128, 1); ASSERT_EQ("1,1", FilesPerLevel()); - gen_l0_kb(0, 128); + gen_l0_kb(0, 128, 1); dbfull()->TEST_WaitForCompact(); ASSERT_EQ("0,2", FilesPerLevel()); metadata.clear(); diff --git a/db/version_edit.cc b/db/version_edit.cc index 4e2cf8f5..2c77a271 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -64,7 +64,7 @@ void VersionEdit::Clear() { column_family_name_.clear(); } -void VersionEdit::EncodeTo(std::string* dst) const { +bool VersionEdit::EncodeTo(std::string* dst) const { if (has_comparator_) { PutVarint32(dst, kComparator); PutLengthPrefixedSlice(dst, comparator_); @@ -98,6 +98,9 @@ void VersionEdit::EncodeTo(std::string* dst) const { for (size_t i = 0; i < new_files_.size(); i++) { const FileMetaData& f = new_files_[i].second; + if (!f.smallest.Valid() || !f.largest.Valid()) { + return false; + } if (f.fd.GetPathId() == 0) { // Use older 
format to make sure user can roll back the build if they // don't config multiple DB paths. @@ -131,6 +134,7 @@ void VersionEdit::EncodeTo(std::string* dst) const { if (is_column_family_drop_) { PutVarint32(dst, kColumnFamilyDrop); } + return true; } static bool GetInternalKey(Slice* input, InternalKey* dst) { diff --git a/db/version_edit.h b/db/version_edit.h index db133402..1cf2e1b7 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -212,7 +212,8 @@ class VersionEdit { is_column_family_drop_ = true; } - void EncodeTo(std::string* dst) const; + // return true on success. + bool EncodeTo(std::string* dst) const; Status DecodeFrom(const Slice& src); std::string DebugString(bool hex_key = false) const; diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index 850f242c..fe663c76 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -44,6 +44,16 @@ TEST(VersionEditTest, EncodeDecode) { TestEncodeDecode(edit); } +TEST(VersionEditTest, EncodeEmptyFile) { + VersionEdit edit; + edit.AddFile(0, 0, 0, 0, + InternalKey(), + InternalKey(), + 0, 0); + std::string buffer; + ASSERT_TRUE(!edit.EncodeTo(&buffer)); +} + TEST(VersionEditTest, ColumnFamilyTest) { VersionEdit edit; edit.SetColumnFamily(2); diff --git a/include/rocksdb/version.h b/include/rocksdb/version.h index 84ece982..a17dfc1d 100644 --- a/include/rocksdb/version.h +++ b/include/rocksdb/version.h @@ -6,7 +6,7 @@ #define ROCKSDB_MAJOR 3 #define ROCKSDB_MINOR 6 -#define ROCKSDB_PATCH 1 +#define ROCKSDB_PATCH 2 // Do not use these. We made the mistake of declaring macros starting with // double underscore. Now we have to live with our choice. We'll deprecate these