SequenceNumber largest_seqno;
std::string smallest_internal_key;
std::string largest_internal_key;
- uint64_t oldest_ancester_time;
- uint64_t file_creation_time;
- uint64_t epoch_number;
+ uint64_t oldest_ancester_time = kUnknownOldestAncesterTime;
+ uint64_t file_creation_time = kUnknownFileCreationTime;
+ uint64_t epoch_number = kUnknownEpochNumber;
+ std::string file_checksum = kUnknownFileChecksum;
+ std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
uint64_t paranoid_hash;
bool marked_for_compaction;
- UniqueId64x2 unique_id;
+ UniqueId64x2 unique_id{};
CompactionServiceOutputFile() = default;
CompactionServiceOutputFile(
const std::string& name, SequenceNumber smallest, SequenceNumber largest,
std::string _smallest_internal_key, std::string _largest_internal_key,
uint64_t _oldest_ancester_time, uint64_t _file_creation_time,
- uint64_t _epoch_number, uint64_t _paranoid_hash,
+ uint64_t _epoch_number, const std::string& _file_checksum,
+ const std::string& _file_checksum_func_name, uint64_t _paranoid_hash,
bool _marked_for_compaction, UniqueId64x2 _unique_id)
: file_name(name),
smallest_seqno(smallest),
oldest_ancester_time(_oldest_ancester_time),
file_creation_time(_file_creation_time),
epoch_number(_epoch_number),
+ file_checksum(_file_checksum),
+ file_checksum_func_name(_file_checksum_func_name),
paranoid_hash(_paranoid_hash),
marked_for_compaction(_marked_for_compaction),
unique_id(std::move(_unique_id)) {}
};
result.status =
status_list.at(rnd.Uniform(static_cast<int>(status_list.size())));
+
+ std::string file_checksum = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen));
+ std::string file_checksum_func_name = "MyAwesomeChecksumGenerator";
while (!rnd.OneIn(10)) {
UniqueId64x2 id{rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX)};
result.output_files.emplace_back(
- rnd.RandomString(rnd.Uniform(kStrMaxLen)), rnd64.Uniform(UINT64_MAX),
- rnd64.Uniform(UINT64_MAX),
- rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)),
- rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)),
- rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX),
- rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX), rnd.OneIn(2), id);
+ rnd.RandomString(rnd.Uniform(kStrMaxLen)) /* file_name */,
+ rnd64.Uniform(UINT64_MAX) /* smallest_seqno */,
+ rnd64.Uniform(UINT64_MAX) /* largest_seqno */,
+ rnd.RandomBinaryString(
+ rnd.Uniform(kStrMaxLen)) /* smallest_internal_key */,
+ rnd.RandomBinaryString(
+ rnd.Uniform(kStrMaxLen)) /* largest_internal_key */,
+ rnd64.Uniform(UINT64_MAX) /* oldest_ancester_time */,
+ rnd64.Uniform(UINT64_MAX) /* file_creation_time */,
+ rnd64.Uniform(UINT64_MAX) /* epoch_number */,
+ file_checksum /* file_checksum */,
+ file_checksum_func_name /* file_checksum_func_name */,
+ rnd64.Uniform(UINT64_MAX) /* paranoid_hash */,
+ rnd.OneIn(2) /* marked_for_compaction */, id);
}
result.output_level = rnd.Uniform(10);
result.output_path = rnd.RandomString(rnd.Uniform(kStrMaxLen));
ASSERT_FALSE(deserialized_tmp.TEST_Equals(&result, &mismatch));
ASSERT_EQ(mismatch, "output_files.unique_id");
deserialized_tmp.status.PermitUncheckedError();
+
+ ASSERT_EQ(deserialized_tmp.output_files[0].file_checksum, file_checksum);
+ ASSERT_EQ(deserialized_tmp.output_files[0].file_checksum_func_name,
+ file_checksum_func_name);
}
// Test unknown field
meta.oldest_ancester_time = file.oldest_ancester_time;
meta.file_creation_time = file.file_creation_time;
meta.epoch_number = file.epoch_number;
+ meta.file_checksum = file.file_checksum;
+ meta.file_checksum_func_name = file.file_checksum_func_name;
meta.marked_for_compaction = file.marked_for_compaction;
meta.unique_id = file.unique_id;
if (status.ok()) {
status = io_s;
}
- if (status.ok()) {
- // TODO: Add verify_table()
- }
// Finish up all book-keeping to unify the subcompaction results
compact_->AggregateCompactionStats(compaction_stats_, *compaction_job_stats_);
MakeTableFileName(meta.fd.GetNumber()), meta.fd.smallest_seqno,
meta.fd.largest_seqno, meta.smallest.Encode().ToString(),
meta.largest.Encode().ToString(), meta.oldest_ancester_time,
- meta.file_creation_time, meta.epoch_number,
- output_file.validator.GetHash(), meta.marked_for_compaction,
- meta.unique_id);
+ meta.file_creation_time, meta.epoch_number, meta.file_checksum,
+ meta.file_checksum_func_name, output_file.validator.GetHash(),
+ meta.marked_for_compaction, meta.unique_id);
}
+
+ TEST_SYNC_POINT_CALLBACK("CompactionServiceCompactionJob::Run:0",
+ &compaction_result_);
+
InternalStats::CompactionStatsFull compaction_stats;
sub_compact->AggregateCompactionStats(compaction_stats);
compaction_result_->num_output_records =
{offsetof(struct CompactionServiceOutputFile, epoch_number),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
+ {"file_checksum",
+ {offsetof(struct CompactionServiceOutputFile, file_checksum),
+ OptionType::kEncodedString, OptionVerificationType::kNormal,
+ OptionTypeFlags::kNone}},
+ {"file_checksum_func_name",
+ {offsetof(struct CompactionServiceOutputFile, file_checksum_func_name),
+ OptionType::kEncodedString, OptionVerificationType::kNormal,
+ OptionTypeFlags::kNone}},
{"paranoid_hash",
{offsetof(struct CompactionServiceOutputFile, paranoid_hash),
OptionType::kUInt64T, OptionVerificationType::kNormal,
ASSERT_TRUE(result.stats.is_remote_compaction);
}
+// Verifies end-to-end corruption detection for remote compaction: the remote
+// worker's output files are whole-file corrupted after the worker finishes
+// (via the "CompactionServiceCompactionJob::Run:0" sync point, which fires on
+// the worker side with the finished CompactionServiceResult). The worker
+// still reports success; the primary is expected to detect the corruption
+// when it verifies/installs the results, failing CompactRange() with
+// Status::Corruption.
+TEST_F(CompactionServiceTest, CorruptedOutput) {
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  ReopenWithCompactionService(&options);
+  GenerateTestData();
+
+  auto my_cs = GetCompactionService();
+
+  std::string start_str = Key(15);
+  std::string end_str = Key(45);
+  Slice start(start_str);
+  Slice end(end_str);
+  uint64_t comp_num = my_cs->GetCompactionNum();
+
+  // Callback runs on the remote worker after its output files are written
+  // but before the result is handed back to the primary.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionServiceCompactionJob::Run:0", [&](void* arg) {
+        CompactionServiceResult* compaction_result =
+            *(static_cast<CompactionServiceResult**>(arg));
+        ASSERT_TRUE(compaction_result != nullptr &&
+                    !compaction_result->output_files.empty());
+        // Corrupt files here
+        for (const auto& output_file : compaction_result->output_files) {
+          std::string file_name =
+              compaction_result->output_path + "/" + output_file.file_name;
+
+          uint64_t file_size = 0;
+          Status s = options.env->GetFileSize(file_name, &file_size);
+          ASSERT_OK(s);
+          ASSERT_GT(file_size, 0);
+
+          // Corrupt the entire file so even a light-weight verification on
+          // the primary catches it.
+          ASSERT_OK(test::CorruptFile(env_, file_name, 0,
+                                      static_cast<int>(file_size),
+                                      true /* verifyChecksum */));
+        }
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  // CompactRange() should fail
+  Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
+  ASSERT_NOK(s);
+  ASSERT_TRUE(s.IsCorruption());
+
+  // The remote compaction itself did run at least once.
+  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  // On the worker side, the compaction is considered success
+  // Verification is done on the primary side
+  CompactionServiceResult result;
+  my_cs->GetResult(&result);
+  ASSERT_OK(result.status);
+  ASSERT_TRUE(result.stats.is_manual_compaction);
+  ASSERT_TRUE(result.stats.is_remote_compaction);
+}
+
+// Verifies that a tiny (1-byte) corruption of remote compaction output —
+// too small for the default light-weight verification to notice — is caught
+// if and only if paranoid_file_checks is enabled. Runs both configurations
+// in one test via the loop below; Destroy() resets the DB between runs.
+TEST_F(CompactionServiceTest, CorruptedOutputParanoidFileCheck) {
+  for (bool paranoid_file_check_enabled : {false, true}) {
+    SCOPED_TRACE("paranoid_file_check_enabled=" +
+                 std::to_string(paranoid_file_check_enabled));
+
+    Options options = CurrentOptions();
+    Destroy(options);
+    options.disable_auto_compactions = true;
+    options.paranoid_file_checks = paranoid_file_check_enabled;
+    ReopenWithCompactionService(&options);
+    GenerateTestData();
+
+    auto my_cs = GetCompactionService();
+
+    std::string start_str = Key(15);
+    std::string end_str = Key(45);
+    Slice start(start_str);
+    Slice end(end_str);
+    uint64_t comp_num = my_cs->GetCompactionNum();
+
+    // Callback fires on the remote worker with its finished result, after
+    // the output files have been written.
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+        "CompactionServiceCompactionJob::Run:0", [&](void* arg) {
+          CompactionServiceResult* compaction_result =
+              *(static_cast<CompactionServiceResult**>(arg));
+          ASSERT_TRUE(compaction_result != nullptr &&
+                      !compaction_result->output_files.empty());
+          // Corrupt files here
+          for (const auto& output_file : compaction_result->output_files) {
+            std::string file_name =
+                compaction_result->output_path + "/" + output_file.file_name;
+
+            // Corrupt very small range of bytes. This corruption is so small
+            // that this isn't caught by default light-weight check
+            ASSERT_OK(test::CorruptFile(env_, file_name, 0, 1,
+                                        false /* verifyChecksum */));
+          }
+        });
+    SyncPoint::GetInstance()->EnableProcessing();
+
+    Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
+    if (paranoid_file_check_enabled) {
+      // Paranoid check compares exact status, including message.
+      ASSERT_NOK(s);
+      ASSERT_EQ(Status::Corruption("Paranoid checksums do not match"), s);
+    } else {
+      // CompactRange() goes through if paranoid file check is not enabled
+      ASSERT_OK(s);
+    }
+
+    ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+
+    // On the worker side, the compaction is considered success
+    // Verification is done on the primary side
+    CompactionServiceResult result;
+    my_cs->GetResult(&result);
+    ASSERT_OK(result.status);
+    ASSERT_TRUE(result.stats.is_manual_compaction);
+    ASSERT_TRUE(result.stats.is_remote_compaction);
+  }
+}
+
+// Same shape as CorruptedOutput, but the remote worker's output files are
+// truncated to half their size instead of byte-corrupted. The primary must
+// detect the damage and fail CompactRange() with Status::Corruption, while
+// the worker-side result still reports success.
+TEST_F(CompactionServiceTest, TruncatedOutput) {
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  ReopenWithCompactionService(&options);
+  GenerateTestData();
+
+  auto my_cs = GetCompactionService();
+
+  std::string start_str = Key(15);
+  std::string end_str = Key(45);
+  Slice start(start_str);
+  Slice end(end_str);
+  uint64_t comp_num = my_cs->GetCompactionNum();
+
+  // Callback runs on the remote worker once its output files exist on disk.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionServiceCompactionJob::Run:0", [&](void* arg) {
+        CompactionServiceResult* compaction_result =
+            *(static_cast<CompactionServiceResult**>(arg));
+        ASSERT_TRUE(compaction_result != nullptr &&
+                    !compaction_result->output_files.empty());
+        // Truncate files here
+        for (const auto& output_file : compaction_result->output_files) {
+          std::string file_name =
+              compaction_result->output_path + "/" + output_file.file_name;
+
+          uint64_t file_size = 0;
+          Status s = options.env->GetFileSize(file_name, &file_size);
+          ASSERT_OK(s);
+          ASSERT_GT(file_size, 0);
+
+          // Cut the file in half; an SST loses its footer this way, so the
+          // primary's verification should reject it.
+          ASSERT_OK(test::TruncateFile(env_, file_name, file_size / 2));
+        }
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  // CompactRange() should fail
+  Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
+  ASSERT_NOK(s);
+  ASSERT_TRUE(s.IsCorruption());
+
+  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  // On the worker side, the compaction is considered success
+  // Verification is done on the primary side
+  CompactionServiceResult result;
+  my_cs->GetResult(&result);
+  ASSERT_OK(result.status);
+  ASSERT_TRUE(result.stats.is_manual_compaction);
+  ASSERT_TRUE(result.stats.is_remote_compaction);
+}
+
+// Verifies that the file_checksum / file_checksum_func_name fields newly
+// propagated through CompactionServiceOutputFile are correct: with the
+// CRC32c checksum factory enabled, the sync-point callback independently
+// recomputes each output file's checksum by reading the file back and
+// compares it (and the generator name) against what the worker recorded in
+// the result. Happy path — CompactRange() is expected to succeed.
+TEST_F(CompactionServiceTest, CustomFileChecksum) {
+  Options options = CurrentOptions();
+  options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
+  ReopenWithCompactionService(&options);
+  GenerateTestData();
+
+  auto my_cs = GetCompactionService();
+
+  std::string start_str = Key(15);
+  std::string end_str = Key(45);
+  Slice start(start_str);
+  Slice end(end_str);
+  uint64_t comp_num = my_cs->GetCompactionNum();
+  // Callback fires on the remote worker with the finished result, so the
+  // recorded checksums can be cross-checked against the bytes on disk.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionServiceCompactionJob::Run:0", [&](void* arg) {
+        CompactionServiceResult* compaction_result =
+            *(static_cast<CompactionServiceResult**>(arg));
+        ASSERT_TRUE(compaction_result != nullptr &&
+                    !compaction_result->output_files.empty());
+        // Validate Checksum files here
+        for (const auto& output_file : compaction_result->output_files) {
+          std::string file_name =
+              compaction_result->output_path + "/" + output_file.file_name;
+
+          // Build a fresh generator from the same factory the DB uses.
+          FileChecksumGenContext gen_context;
+          gen_context.file_name = file_name;
+          std::unique_ptr<FileChecksumGenerator> file_checksum_gen =
+              options.file_checksum_gen_factory->CreateFileChecksumGenerator(
+                  gen_context);
+
+          std::unique_ptr<SequentialFile> file_reader;
+          uint64_t file_size = 0;
+          Status s = options.env->GetFileSize(file_name, &file_size);
+          ASSERT_OK(s);
+          ASSERT_GT(file_size, 0);
+
+          s = options.env->NewSequentialFile(file_name, &file_reader,
+                                             EnvOptions());
+          ASSERT_OK(s);
+
+          // Read the whole file in one shot and feed it to the generator.
+          Slice result;
+          std::unique_ptr<char[]> scratch(new char[file_size]);
+          s = file_reader->Read(file_size, &result, scratch.get());
+          ASSERT_OK(s);
+
+          file_checksum_gen->Update(scratch.get(), result.size());
+          file_checksum_gen->Finalize();
+
+          // Verify actual checksum and the func name
+          ASSERT_EQ(file_checksum_gen->Name(),
+                    output_file.file_checksum_func_name);
+          ASSERT_EQ(file_checksum_gen->GetChecksum(),
+                    output_file.file_checksum);
+        }
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
+  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  CompactionServiceResult result;
+  my_cs->GetResult(&result);
+  ASSERT_OK(result.status);
+  ASSERT_TRUE(result.stats.is_manual_compaction);
+  ASSERT_TRUE(result.stats.is_remote_compaction);
+}
+
TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;