From: Jay Huh Date: Tue, 15 Oct 2024 01:26:17 +0000 (-0700) Subject: Add file_checksum from FileChecksumGenFactory and Tests for corrupted output (#13060) X-Git-Tag: v9.7.3~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=eca4f106bb9994045e31ca463187ecf5dc211b3f;p=rocksdb.git Add file_checksum from FileChecksumGenFactory and Tests for corrupted output (#13060) Summary: - When `FileChecksumGenFactory` is set, include the `file_checksum` and `file_checksum_func_name` in the output file metadata - ~~In Remote Compaction, try opening the output files in the temporary directory to do a quick sanity check before returning the result with status.~~ - After offline discussion, we decided to rely on Primary's existing Compaction flow to sanity check the output files. If the output file is corrupted, we will still be able to catch it and not installing it even after renaming them to cf_paths. The corrupted file in the cf_path won't be added to the MANIFEST and will be purged as part of the next `PurgeObsoleteFiles()` call. - Unit Test has been added to validate above. Pull Request resolved: https://github.com/facebook/rocksdb/pull/13060 Test Plan: Unit test added ``` ./compaction_service_test --gtest_filter="*CorruptedOutput*" ./compaction_service_test --gtest_filter="*TruncatedOutput*" ./compaction_service_test --gtest_filter="*CustomFileChecksum*" ./compaction_job_test --gtest_filter="*ResultSerialization*" ``` Reviewed By: cbi42 Differential Revision: D64189645 Pulled By: jaykorean fbshipit-source-id: 6cf28720169c960c80df257806bfee3c0d177159 --- diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index dd3b53737..c379e0010 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -413,19 +413,22 @@ struct CompactionServiceOutputFile { SequenceNumber largest_seqno; std::string smallest_internal_key; std::string largest_internal_key; - uint64_t oldest_ancester_time; - uint64_t file_creation_time; - uint64_t epoch_number; + uint64_t oldest_ancester_time = kUnknownOldestAncesterTime; + uint64_t file_creation_time = kUnknownFileCreationTime; + uint64_t epoch_number = kUnknownEpochNumber; + std::string file_checksum = kUnknownFileChecksum; + std::string file_checksum_func_name = kUnknownFileChecksumFuncName; uint64_t paranoid_hash; bool marked_for_compaction; - UniqueId64x2 unique_id; + UniqueId64x2 unique_id{}; CompactionServiceOutputFile() = default; CompactionServiceOutputFile( const std::string& name, SequenceNumber smallest, SequenceNumber largest, std::string _smallest_internal_key, std::string _largest_internal_key, uint64_t _oldest_ancester_time, uint64_t _file_creation_time, - uint64_t _epoch_number, uint64_t _paranoid_hash, + uint64_t _epoch_number, const std::string& _file_checksum, + const std::string& _file_checksum_func_name, uint64_t _paranoid_hash, bool _marked_for_compaction, UniqueId64x2 _unique_id) : file_name(name), smallest_seqno(smallest), @@ -435,6 +438,8 @@ struct CompactionServiceOutputFile { oldest_ancester_time(_oldest_ancester_time), file_creation_time(_file_creation_time), epoch_number(_epoch_number), + file_checksum(_file_checksum), + file_checksum_func_name(_file_checksum_func_name), paranoid_hash(_paranoid_hash), marked_for_compaction(_marked_for_compaction), unique_id(std::move(_unique_id)) {} diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index f8a78bc9b..f9ea675ef 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -1656,15 +1656,26 @@ TEST_F(CompactionJobTest, ResultSerialization) { }; result.status = status_list.at(rnd.Uniform(static_cast(status_list.size()))); + + std::string file_checksum = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)); + std::string file_checksum_func_name = "MyAwesomeChecksumGenerator"; while (!rnd.OneIn(10)) { UniqueId64x2 id{rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX)}; result.output_files.emplace_back( - rnd.RandomString(rnd.Uniform(kStrMaxLen)), rnd64.Uniform(UINT64_MAX), - rnd64.Uniform(UINT64_MAX), - rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)), - rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)), - rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX), - rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX), rnd.OneIn(2), id); + rnd.RandomString(rnd.Uniform(kStrMaxLen)) /* file_name */, + rnd64.Uniform(UINT64_MAX) /* smallest_seqno */, + rnd64.Uniform(UINT64_MAX) /* largest_seqno */, + rnd.RandomBinaryString( + rnd.Uniform(kStrMaxLen)) /* smallest_internal_key */, + rnd.RandomBinaryString( + rnd.Uniform(kStrMaxLen)) /* largest_internal_key */, + rnd64.Uniform(UINT64_MAX) /* oldest_ancester_time */, + rnd64.Uniform(UINT64_MAX) /* file_creation_time */, + rnd64.Uniform(UINT64_MAX) /* epoch_number */, + file_checksum /* file_checksum */, + file_checksum_func_name /* file_checksum_func_name */, + rnd64.Uniform(UINT64_MAX) /* paranoid_hash */, + rnd.OneIn(2) /* marked_for_compaction */, id); } result.output_level = rnd.Uniform(10); result.output_path = rnd.RandomString(rnd.Uniform(kStrMaxLen)); @@ -1700,6 +1711,10 @@ TEST_F(CompactionJobTest, ResultSerialization) { ASSERT_FALSE(deserialized_tmp.TEST_Equals(&result, &mismatch)); ASSERT_EQ(mismatch, "output_files.unique_id"); deserialized_tmp.status.PermitUncheckedError(); + + ASSERT_EQ(deserialized_tmp.output_files[0].file_checksum, file_checksum); + ASSERT_EQ(deserialized_tmp.output_files[0].file_checksum_func_name, + file_checksum_func_name); } // Test unknown field diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index b34c7e662..e87d1a2bf 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -195,6 +195,8 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( meta.oldest_ancester_time = file.oldest_ancester_time; meta.file_creation_time = file.file_creation_time; meta.epoch_number = file.epoch_number; + meta.file_checksum = file.file_checksum; + meta.file_checksum_func_name = file.file_checksum_func_name; meta.marked_for_compaction = file.marked_for_compaction; meta.unique_id = file.unique_id; @@ -320,9 +322,6 @@ Status CompactionServiceCompactionJob::Run() { if (status.ok()) { status = io_s; } - if (status.ok()) { - // TODO: Add verify_table() - } // Finish up all book-keeping to unify the subcompaction results compact_->AggregateCompactionStats(compaction_stats_, *compaction_job_stats_); @@ -343,10 +342,14 @@ Status CompactionServiceCompactionJob::Run() { MakeTableFileName(meta.fd.GetNumber()), meta.fd.smallest_seqno, meta.fd.largest_seqno, meta.smallest.Encode().ToString(), meta.largest.Encode().ToString(), meta.oldest_ancester_time, - meta.file_creation_time, meta.epoch_number, - output_file.validator.GetHash(), meta.marked_for_compaction, - meta.unique_id); + meta.file_creation_time, meta.epoch_number, meta.file_checksum, + meta.file_checksum_func_name, output_file.validator.GetHash(), + meta.marked_for_compaction, meta.unique_id); } + + TEST_SYNC_POINT_CALLBACK("CompactionServiceCompactionJob::Run:0", + &compaction_result_); + InternalStats::CompactionStatsFull compaction_stats; sub_compact->AggregateCompactionStats(compaction_stats); compaction_result_->num_output_records = @@ -471,6 +474,14 @@ static std::unordered_map {offsetof(struct CompactionServiceOutputFile, epoch_number), OptionType::kUInt64T, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"file_checksum", + {offsetof(struct CompactionServiceOutputFile, file_checksum), + OptionType::kEncodedString, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"file_checksum_func_name", + {offsetof(struct CompactionServiceOutputFile, file_checksum_func_name), + OptionType::kEncodedString, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, {"paranoid_hash", {offsetof(struct CompactionServiceOutputFile, paranoid_hash), OptionType::kUInt64T, OptionVerificationType::kNormal, diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index bb53a4029..2d6dcf464 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -396,6 +396,249 @@ TEST_F(CompactionServiceTest, ManualCompaction) { ASSERT_TRUE(result.stats.is_remote_compaction); } +TEST_F(CompactionServiceTest, CorruptedOutput) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + ReopenWithCompactionService(&options); + GenerateTestData(); + + auto my_cs = GetCompactionService(); + + std::string start_str = Key(15); + std::string end_str = Key(45); + Slice start(start_str); + Slice end(end_str); + uint64_t comp_num = my_cs->GetCompactionNum(); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionServiceCompactionJob::Run:0", [&](void* arg) { + CompactionServiceResult* compaction_result = + *(static_cast(arg)); + ASSERT_TRUE(compaction_result != nullptr && + !compaction_result->output_files.empty()); + // Corrupt files here + for (const auto& output_file : compaction_result->output_files) { + std::string file_name = + compaction_result->output_path + "/" + output_file.file_name; + + uint64_t file_size = 0; + Status s = options.env->GetFileSize(file_name, &file_size); + ASSERT_OK(s); + ASSERT_GT(file_size, 0); + + ASSERT_OK(test::CorruptFile(env_, file_name, 0, + static_cast(file_size), + true /* verifyChecksum */)); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + // CompactRange() should fail + Status s = db_->CompactRange(CompactRangeOptions(), &start, &end); + ASSERT_NOK(s); + ASSERT_TRUE(s.IsCorruption()); + + ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + // On the worker side, the compaction is considered success + // Verification is done on the primary side + CompactionServiceResult result; + my_cs->GetResult(&result); + ASSERT_OK(result.status); + ASSERT_TRUE(result.stats.is_manual_compaction); + ASSERT_TRUE(result.stats.is_remote_compaction); +} + +TEST_F(CompactionServiceTest, CorruptedOutputParanoidFileCheck) { + for (bool paranoid_file_check_enabled : {false, true}) { + SCOPED_TRACE("paranoid_file_check_enabled=" + + std::to_string(paranoid_file_check_enabled)); + + Options options = CurrentOptions(); + Destroy(options); + options.disable_auto_compactions = true; + options.paranoid_file_checks = paranoid_file_check_enabled; + ReopenWithCompactionService(&options); + GenerateTestData(); + + auto my_cs = GetCompactionService(); + + std::string start_str = Key(15); + std::string end_str = Key(45); + Slice start(start_str); + Slice end(end_str); + uint64_t comp_num = my_cs->GetCompactionNum(); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionServiceCompactionJob::Run:0", [&](void* arg) { + CompactionServiceResult* compaction_result = + *(static_cast(arg)); + ASSERT_TRUE(compaction_result != nullptr && + !compaction_result->output_files.empty()); + // Corrupt files here + for (const auto& output_file : compaction_result->output_files) { + std::string file_name = + compaction_result->output_path + "/" + output_file.file_name; + + // Corrupt very small range of bytes. This corruption is so small + // that this isn't caught by default light-weight check + ASSERT_OK(test::CorruptFile(env_, file_name, 0, 1, + false /* verifyChecksum */)); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + Status s = db_->CompactRange(CompactRangeOptions(), &start, &end); + if (paranoid_file_check_enabled) { + ASSERT_NOK(s); + ASSERT_EQ(Status::Corruption("Paranoid checksums do not match"), s); + } else { + // CompactRange() goes through if paranoid file check is not enabled + ASSERT_OK(s); + } + + ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + // On the worker side, the compaction is considered success + // Verification is done on the primary side + CompactionServiceResult result; + my_cs->GetResult(&result); + ASSERT_OK(result.status); + ASSERT_TRUE(result.stats.is_manual_compaction); + ASSERT_TRUE(result.stats.is_remote_compaction); + } +} + +TEST_F(CompactionServiceTest, TruncatedOutput) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + ReopenWithCompactionService(&options); + GenerateTestData(); + + auto my_cs = GetCompactionService(); + + std::string start_str = Key(15); + std::string end_str = Key(45); + Slice start(start_str); + Slice end(end_str); + uint64_t comp_num = my_cs->GetCompactionNum(); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionServiceCompactionJob::Run:0", [&](void* arg) { + CompactionServiceResult* compaction_result = + *(static_cast(arg)); + ASSERT_TRUE(compaction_result != nullptr && + !compaction_result->output_files.empty()); + // Truncate files here + for (const auto& output_file : compaction_result->output_files) { + std::string file_name = + compaction_result->output_path + "/" + output_file.file_name; + + uint64_t file_size = 0; + Status s = options.env->GetFileSize(file_name, &file_size); + ASSERT_OK(s); + ASSERT_GT(file_size, 0); + + ASSERT_OK(test::TruncateFile(env_, file_name, file_size / 2)); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + // CompactRange() should fail + Status s = db_->CompactRange(CompactRangeOptions(), &start, &end); + ASSERT_NOK(s); + ASSERT_TRUE(s.IsCorruption()); + + ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + // On the worker side, the compaction is considered success + // Verification is done on the primary side + CompactionServiceResult result; + my_cs->GetResult(&result); + ASSERT_OK(result.status); + ASSERT_TRUE(result.stats.is_manual_compaction); + ASSERT_TRUE(result.stats.is_remote_compaction); +} + +TEST_F(CompactionServiceTest, CustomFileChecksum) { + Options options = CurrentOptions(); + options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory(); + ReopenWithCompactionService(&options); + GenerateTestData(); + + auto my_cs = GetCompactionService(); + + std::string start_str = Key(15); + std::string end_str = Key(45); + Slice start(start_str); + Slice end(end_str); + uint64_t comp_num = my_cs->GetCompactionNum(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionServiceCompactionJob::Run:0", [&](void* arg) { + CompactionServiceResult* compaction_result = + *(static_cast(arg)); + ASSERT_TRUE(compaction_result != nullptr && + !compaction_result->output_files.empty()); + // Validate Checksum files here + for (const auto& output_file : compaction_result->output_files) { + std::string file_name = + compaction_result->output_path + "/" + output_file.file_name; + + FileChecksumGenContext gen_context; + gen_context.file_name = file_name; + std::unique_ptr file_checksum_gen = + options.file_checksum_gen_factory->CreateFileChecksumGenerator( + gen_context); + + std::unique_ptr file_reader; + uint64_t file_size = 0; + Status s = options.env->GetFileSize(file_name, &file_size); + ASSERT_OK(s); + ASSERT_GT(file_size, 0); + + s = options.env->NewSequentialFile(file_name, &file_reader, + EnvOptions()); + ASSERT_OK(s); + + Slice result; + std::unique_ptr scratch(new char[file_size]); + s = file_reader->Read(file_size, &result, scratch.get()); + ASSERT_OK(s); + + file_checksum_gen->Update(scratch.get(), result.size()); + file_checksum_gen->Finalize(); + + // Verify actual checksum and the func name + ASSERT_EQ(file_checksum_gen->Name(), + output_file.file_checksum_func_name); + ASSERT_EQ(file_checksum_gen->GetChecksum(), + output_file.file_checksum); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end)); + ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + CompactionServiceResult result; + my_cs->GetResult(&result); + ASSERT_OK(result.status); + ASSERT_TRUE(result.stats.is_manual_compaction); + ASSERT_TRUE(result.stats.is_remote_compaction); +} + TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) { Options options = CurrentOptions(); options.disable_auto_compactions = true;