}
PERF_TIMER_START(write_pre_and_post_process_time);
+ if (!io_s.ok()) {
+ // Check WriteToWAL status
+ IOStatusCheck(io_s);
+ }
if (!w.CallbackFailed()) {
if (!io_s.ok()) {
assert(pre_release_cb_status.ok());
- IOStatusCheck(io_s);
} else {
WriteStatusCheck(pre_release_cb_status);
}
} else {
- assert(io_s.ok() && pre_release_cb_status.ok());
+ assert(pre_release_cb_status.ok());
}
if (need_log_sync) {
w.status = io_s;
}
- if (!w.CallbackFailed()) {
- if (!io_s.ok()) {
- IOStatusCheck(io_s);
- } else {
- WriteStatusCheck(w.status);
- }
+ if (!io_s.ok()) {
+ // Check WriteToWAL status
+ IOStatusCheck(io_s);
+ } else if (!w.CallbackFailed()) {
+ WriteStatusCheck(w.status);
}
if (need_log_sync) {
seq_inc = total_batch_cnt;
}
Status status;
- IOStatus io_s;
- io_s.PermitUncheckedError(); // Allow io_s to be uninitialized
if (!write_options.disableWAL) {
- io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
+ IOStatus io_s =
+ ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
status = io_s;
+ // last_sequence may not be set if there is an error
+ // This error checking and return is moved up to avoid using uninitialized
+ // last_sequence.
+ if (!io_s.ok()) {
+ IOStatusCheck(io_s);
+ write_thread->ExitAsBatchGroupLeader(write_group, status);
+ return status;
+ }
} else {
// Otherwise we inc seq number to do solely the seq allocation
last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
PERF_TIMER_START(write_pre_and_post_process_time);
if (!w.CallbackFailed()) {
- if (!io_s.ok()) {
- IOStatusCheck(io_s);
- } else {
- WriteStatusCheck(status);
- }
+ WriteStatusCheck(status);
}
if (status.ok()) {
size_t index = 0;
return status;
}
-WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
- WriteBatch* tmp_batch, size_t* write_with_wal,
- WriteBatch** to_be_cached_state) {
+Status DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
+ WriteBatch* tmp_batch, WriteBatch** merged_batch,
+ size_t* write_with_wal,
+ WriteBatch** to_be_cached_state) {
assert(write_with_wal != nullptr);
assert(tmp_batch != nullptr);
assert(*to_be_cached_state == nullptr);
- WriteBatch* merged_batch = nullptr;
*write_with_wal = 0;
auto* leader = write_group.leader;
assert(!leader->disable_wal); // Same holds for all in the batch group
// we simply write the first WriteBatch to WAL if the group only
// contains one batch, that batch should be written to the WAL,
// and the batch is not wanting to be truncated
- merged_batch = leader->batch;
- if (WriteBatchInternal::IsLatestPersistentState(merged_batch)) {
- *to_be_cached_state = merged_batch;
+ *merged_batch = leader->batch;
+ if (WriteBatchInternal::IsLatestPersistentState(*merged_batch)) {
+ *to_be_cached_state = *merged_batch;
}
*write_with_wal = 1;
} else {
// WAL needs all of the batches flattened into a single batch.
// We could avoid copying here with an iov-like AddRecord
// interface
- merged_batch = tmp_batch;
+ *merged_batch = tmp_batch;
for (auto writer : write_group) {
if (!writer->CallbackFailed()) {
- Status s = WriteBatchInternal::Append(merged_batch, writer->batch,
+ Status s = WriteBatchInternal::Append(*merged_batch, writer->batch,
/*WAL_only*/ true);
- // Always returns Status::OK.
- assert(s.ok());
+ if (!s.ok()) {
+ tmp_batch->Clear();
+ return s;
+ }
if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
// We only need to cache the last of such write batch
*to_be_cached_state = writer->batch;
}
}
}
- return merged_batch;
+ // return merged_batch;
+ return Status::OK();
}
// When two_write_queues_ is disabled, this function is called from the only
assert(log_size != nullptr);
Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
+ TEST_SYNC_POINT_CALLBACK("DBImpl::WriteToWAL:log_entry", &log_entry);
+ auto s = merged_batch.VerifyChecksum();
+ if (!s.ok()) {
+ return status_to_io_status(std::move(s));
+ }
*log_size = log_entry.size();
// When two_write_queues_ WriteToWAL has to be protected from concurretn calls
// from the two queues anyway and log_write_mutex_ is already held. Otherwise
// Same holds for all in the batch group
size_t write_with_wal = 0;
WriteBatch* to_be_cached_state = nullptr;
- WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
- &write_with_wal, &to_be_cached_state);
+ WriteBatch* merged_batch;
+ io_s = status_to_io_status(MergeBatch(write_group, &tmp_batch_, &merged_batch,
+ &write_with_wal, &to_be_cached_state));
+ if (UNLIKELY(!io_s.ok())) {
+ return io_s;
+ }
+
if (merged_batch == write_group.leader->batch) {
write_group.leader->log_used = logfile_number_;
} else if (write_with_wal > 1) {
WriteBatch tmp_batch;
size_t write_with_wal = 0;
WriteBatch* to_be_cached_state = nullptr;
- WriteBatch* merged_batch =
- MergeBatch(write_group, &tmp_batch, &write_with_wal, &to_be_cached_state);
+ WriteBatch* merged_batch;
+ io_s = status_to_io_status(MergeBatch(write_group, &tmp_batch, &merged_batch,
+ &write_with_wal, &to_be_cached_state));
+ if (UNLIKELY(!io_s.ok())) {
+ return io_s;
+ }
// We need to lock log_write_mutex_ since logs_ and alive_log_files might be
// pushed back concurrently
return static_cast<WriteBatchOpType>(static_cast<T>(lhs) + rhs);
}
+std::pair<WriteBatch, Status> GetWriteBatch(ColumnFamilyHandle* cf_handle,
+ WriteBatchOpType op_type) {
+ Status s;
+ WriteBatch wb(0 /* reserved_bytes */, 0 /* max_bytes */,
+ 8 /* protection_bytes_per_entry */, 0 /* default_cf_ts_sz */);
+ switch (op_type) {
+ case WriteBatchOpType::kPut:
+ s = wb.Put(cf_handle, "key", "val");
+ break;
+ case WriteBatchOpType::kDelete:
+ s = wb.Delete(cf_handle, "key");
+ break;
+ case WriteBatchOpType::kSingleDelete:
+ s = wb.SingleDelete(cf_handle, "key");
+ break;
+ case WriteBatchOpType::kDeleteRange:
+ s = wb.DeleteRange(cf_handle, "begin", "end");
+ break;
+ case WriteBatchOpType::kMerge:
+ s = wb.Merge(cf_handle, "key", "val");
+ break;
+ case WriteBatchOpType::kBlobIndex: {
+ // TODO(ajkr): use public API once available.
+ uint32_t cf_id;
+ if (cf_handle == nullptr) {
+ cf_id = 0;
+ } else {
+ cf_id = cf_handle->GetID();
+ }
+
+ std::string blob_index;
+ BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 9876543210,
+ "val");
+
+ s = WriteBatchInternal::PutBlobIndex(&wb, cf_id, "key", blob_index);
+ break;
+ }
+ case WriteBatchOpType::kNum:
+ assert(false);
+ }
+ return {std::move(wb), std::move(s)};
+}
+
class DbKvChecksumTest
: public DBTestBase,
public ::testing::WithParamInterface<std::tuple<WriteBatchOpType, char>> {
corrupt_byte_addend_ = std::get<1>(GetParam());
}
- std::pair<WriteBatch, Status> GetWriteBatch(ColumnFamilyHandle* cf_handle) {
- Status s;
- WriteBatch wb(0 /* reserved_bytes */, 0 /* max_bytes */,
- 8 /* protection_bytes_per_entry */, 0 /* default_cf_ts_sz */);
- switch (op_type_) {
- case WriteBatchOpType::kPut:
- s = wb.Put(cf_handle, "key", "val");
- break;
- case WriteBatchOpType::kDelete:
- s = wb.Delete(cf_handle, "key");
- break;
- case WriteBatchOpType::kSingleDelete:
- s = wb.SingleDelete(cf_handle, "key");
- break;
- case WriteBatchOpType::kDeleteRange:
- s = wb.DeleteRange(cf_handle, "begin", "end");
- break;
- case WriteBatchOpType::kMerge:
- s = wb.Merge(cf_handle, "key", "val");
- break;
- case WriteBatchOpType::kBlobIndex: {
- // TODO(ajkr): use public API once available.
- uint32_t cf_id;
- if (cf_handle == nullptr) {
- cf_id = 0;
- } else {
- cf_id = cf_handle->GetID();
- }
-
- std::string blob_index;
- BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 9876543210,
- "val");
-
- s = WriteBatchInternal::PutBlobIndex(&wb, cf_id, "key", blob_index);
- break;
- }
- case WriteBatchOpType::kNum:
- assert(false);
- }
- return {std::move(wb), std::move(s)};
- }
-
void CorruptNextByteCallBack(void* arg) {
Slice encoded = *static_cast<Slice*>(arg);
if (entry_len_ == std::numeric_limits<size_t>::max()) {
size_t entry_len_ = std::numeric_limits<size_t>::max();
};
-std::string GetTestNameSuffix(
- ::testing::TestParamInfo<std::tuple<WriteBatchOpType, char>> info) {
- std::ostringstream oss;
- switch (std::get<0>(info.param)) {
+std::string GetOpTypeString(const WriteBatchOpType& op_type) {
+ switch (op_type) {
case WriteBatchOpType::kPut:
- oss << "Put";
- break;
+ return "Put";
case WriteBatchOpType::kDelete:
- oss << "Delete";
- break;
+ return "Delete";
case WriteBatchOpType::kSingleDelete:
- oss << "SingleDelete";
- break;
+ return "SingleDelete";
case WriteBatchOpType::kDeleteRange:
- oss << "DeleteRange";
+ return "DeleteRange";
break;
case WriteBatchOpType::kMerge:
- oss << "Merge";
+ return "Merge";
break;
case WriteBatchOpType::kBlobIndex:
- oss << "BlobIndex";
+ return "BlobIndex";
break;
case WriteBatchOpType::kNum:
assert(false);
}
- oss << "Add"
- << static_cast<int>(static_cast<unsigned char>(std::get<1>(info.param)));
- return oss.str();
+ assert(false);
+ return "";
}
INSTANTIATE_TEST_CASE_P(
::testing::Combine(::testing::Range(static_cast<WriteBatchOpType>(0),
WriteBatchOpType::kNum),
::testing::Values(2, 103, 251)),
- GetTestNameSuffix);
+ [](const testing::TestParamInfo<std::tuple<WriteBatchOpType, char>>& args) {
+ std::ostringstream oss;
+ oss << GetOpTypeString(std::get<0>(args.param)) << "Add"
+ << static_cast<int>(
+ static_cast<unsigned char>(std::get<1>(args.param)));
+ return oss.str();
+ });
TEST_P(DbKvChecksumTest, MemTableAddCorrupted) {
// This test repeatedly attempts to write `WriteBatch`es containing a single
Reopen(options);
SyncPoint::GetInstance()->EnableProcessing();
- auto batch_and_status = GetWriteBatch(nullptr /* cf_handle */);
+ auto batch_and_status = GetWriteBatch(nullptr /* cf_handle */, op_type_);
ASSERT_OK(batch_and_status.second);
ASSERT_TRUE(
db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());
SyncPoint::GetInstance()->DisableProcessing();
+
+ // In case the above callback is not invoked, this test will run
+ // numeric_limits<size_t>::max() times until it reports an error (or will
+ // exhaust disk space). Added this assert to report error early.
+ ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max());
}
}
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
SyncPoint::GetInstance()->EnableProcessing();
- auto batch_and_status = GetWriteBatch(handles_[1]);
+ auto batch_and_status = GetWriteBatch(handles_[1], op_type_);
+ ASSERT_OK(batch_and_status.second);
+ ASSERT_TRUE(
+ db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());
+ SyncPoint::GetInstance()->DisableProcessing();
+
+ // In case the above callback is not invoked, this test will run
+ // numeric_limits<size_t>::max() times until it reports an error (or will
+ // exhaust disk space). Added this assert to report error early.
+ ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max());
+ }
+}
+
+TEST_P(DbKvChecksumTest, NoCorruptionCase) {
+ // If this test fails, we may have found a piece of malfunctioned hardware
+ auto batch_and_status = GetWriteBatch(nullptr, op_type_);
+ ASSERT_OK(batch_and_status.second);
+ ASSERT_OK(batch_and_status.first.VerifyChecksum());
+}
+
+TEST_P(DbKvChecksumTest, WriteToWALCorrupted) {
+ // This test repeatedly attempts to write `WriteBatch`es containing a single
+ // entry of type `op_type_`. Each attempt has one byte corrupted by adding
+ // `corrupt_byte_addend_` to its original value. The test repeats until an
+ // attempt has been made on each byte in the encoded write batch. All attempts
+ // are expected to fail with `Status::Corruption`
+ Options options = CurrentOptions();
+ if (op_type_ == WriteBatchOpType::kMerge) {
+ options.merge_operator = MergeOperators::CreateStringAppendOperator();
+ }
+ SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::WriteToWAL:log_entry",
+ std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
+ std::placeholders::_1));
+ // First 8 bytes are for sequence number which is not protected in write batch
+ corrupt_byte_offset_ = 8;
+
+ while (MoreBytesToCorrupt()) {
+ // Corrupted write batch leads to read-only mode, so we have to
+ // reopen for every attempt.
+ Reopen(options);
+ auto log_size_pre_write = dbfull()->TEST_total_log_size();
+
+ SyncPoint::GetInstance()->EnableProcessing();
+ auto batch_and_status = GetWriteBatch(nullptr /* cf_handle */, op_type_);
+ ASSERT_OK(batch_and_status.second);
+ ASSERT_TRUE(
+ db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());
+ // Confirm that nothing was written to WAL
+ ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size());
+ ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
+ SyncPoint::GetInstance()->DisableProcessing();
+
+ // In case the above callback is not invoked, this test will run
+ // numeric_limits<size_t>::max() times until it reports an error (or will
+ // exhaust disk space). Added this assert to report error early.
+ ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max());
+ }
+}
+
+TEST_P(DbKvChecksumTest, WriteToWALWithColumnFamilyCorrupted) {
+ // This test repeatedly attempts to write `WriteBatch`es containing a single
+ // entry of type `op_type_`. Each attempt has one byte corrupted by adding
+ // `corrupt_byte_addend_` to its original value. The test repeats until an
+ // attempt has been made on each byte in the encoded write batch. All attempts
+ // are expected to fail with `Status::Corruption`
+ Options options = CurrentOptions();
+ if (op_type_ == WriteBatchOpType::kMerge) {
+ options.merge_operator = MergeOperators::CreateStringAppendOperator();
+ }
+ CreateAndReopenWithCF({"pikachu"}, options);
+ SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::WriteToWAL:log_entry",
+ std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
+ std::placeholders::_1));
+ // First 8 bytes are for sequence number which is not protected in write batch
+ corrupt_byte_offset_ = 8;
+
+ while (MoreBytesToCorrupt()) {
+ // Corrupted write batch leads to read-only mode, so we have to
+ // reopen for every attempt.
+ ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
+ auto log_size_pre_write = dbfull()->TEST_total_log_size();
+
+ SyncPoint::GetInstance()->EnableProcessing();
+ auto batch_and_status = GetWriteBatch(handles_[1], op_type_);
ASSERT_OK(batch_and_status.second);
ASSERT_TRUE(
db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());
+ // Confirm that nothing was written to WAL
+ ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size());
+ ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
SyncPoint::GetInstance()->DisableProcessing();
+
+ // In case the above callback is not invoked, this test will run
+ // numeric_limits<size_t>::max() times until it reports an error (or will
+ // exhaust disk space). Added this assert to report error early.
+ ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max());
+ }
+}
+
+class DbKvChecksumTestMergedBatch
+ : public DBTestBase,
+ public ::testing::WithParamInterface<
+ std::tuple<WriteBatchOpType, WriteBatchOpType, char>> {
+ public:
+ DbKvChecksumTestMergedBatch()
+ : DBTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) {
+ op_type1_ = std::get<0>(GetParam());
+ op_type2_ = std::get<1>(GetParam());
+ corrupt_byte_addend_ = std::get<2>(GetParam());
}
+
+ protected:
+ WriteBatchOpType op_type1_;
+ WriteBatchOpType op_type2_;
+ char corrupt_byte_addend_;
+};
+
+void CorruptWriteBatch(Slice* content, size_t offset,
+ char corrupt_byte_addend) {
+ ASSERT_TRUE(offset < content->size());
+ char* buf = const_cast<char*>(content->data());
+ buf[offset] += corrupt_byte_addend;
+}
+
+TEST_P(DbKvChecksumTestMergedBatch, NoCorruptionCase) {
+ // Veirfy write batch checksum after write batch append
+ auto batch1 = GetWriteBatch(nullptr /* cf_handle */, op_type1_);
+ ASSERT_OK(batch1.second);
+ auto batch2 = GetWriteBatch(nullptr /* cf_handle */, op_type2_);
+ ASSERT_OK(batch2.second);
+ ASSERT_OK(WriteBatchInternal::Append(&batch1.first, &batch2.first));
+ ASSERT_OK(batch1.first.VerifyChecksum());
}
+TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) {
+ // This test has two writers repeatedly attempt to write `WriteBatch`es
+ // containing a single entry of type op_type1_ and op_type2_ respectively. The
+ // leader of the write group writes the batch containinng the entry of type
+ // op_type1_. One byte of the pre-merged write batches is corrupted by adding
+ // `corrupt_byte_addend_` to the batch's original value during each attempt.
+ // The test repeats until an attempt has been made on each byte in both
+ // pre-merged write batches. All attempts are expected to fail with
+ // `Status::Corruption`.
+ Options options = CurrentOptions();
+ if (op_type1_ == WriteBatchOpType::kMerge ||
+ op_type2_ == WriteBatchOpType::kMerge) {
+ options.merge_operator = MergeOperators::CreateStringAppendOperator();
+ }
+
+ auto leader_batch_and_status =
+ GetWriteBatch(nullptr /* cf_handle */, op_type1_);
+ ASSERT_OK(leader_batch_and_status.second);
+ auto follower_batch_and_status =
+ GetWriteBatch(nullptr /* cf_handle */, op_type2_);
+ size_t leader_batch_size = leader_batch_and_status.first.GetDataSize();
+ size_t total_bytes =
+ leader_batch_size + follower_batch_and_status.first.GetDataSize();
+ // First 8 bytes are for sequence number which is not protected in write batch
+ size_t corrupt_byte_offset = 8;
+
+ std::atomic<bool> follower_joined{false};
+ std::atomic<int> leader_count{0};
+ port::Thread follower_thread;
+ // This callback should only be called by the leader thread
+ SyncPoint::GetInstance()->SetCallBack(
+ "WriteThread::JoinBatchGroup:Wait2", [&](void* arg_leader) {
+ auto* leader = reinterpret_cast<WriteThread::Writer*>(arg_leader);
+ ASSERT_EQ(leader->state, WriteThread::STATE_GROUP_LEADER);
+
+ // This callback should only be called by the follower thread
+ SyncPoint::GetInstance()->SetCallBack(
+ "WriteThread::JoinBatchGroup:Wait", [&](void* arg_follower) {
+ auto* follower =
+ reinterpret_cast<WriteThread::Writer*>(arg_follower);
+ // The leader thread will wait on this bool and hence wait until
+ // this writer joins the write group
+ ASSERT_NE(follower->state, WriteThread::STATE_GROUP_LEADER);
+ if (corrupt_byte_offset >= leader_batch_size) {
+ Slice batch_content = follower->batch->Data();
+ CorruptWriteBatch(&batch_content,
+ corrupt_byte_offset - leader_batch_size,
+ corrupt_byte_addend_);
+ }
+ // Leader busy waits on this flag
+ follower_joined = true;
+ // So the follower does not enter the outer callback at
+ // WriteThread::JoinBatchGroup:Wait2
+ SyncPoint::GetInstance()->DisableProcessing();
+ });
+
+ // Start the other writer thread which will join the write group as
+ // follower
+ follower_thread = port::Thread([&]() {
+ follower_batch_and_status =
+ GetWriteBatch(nullptr /* cf_handle */, op_type2_);
+ ASSERT_OK(follower_batch_and_status.second);
+ ASSERT_TRUE(
+ db_->Write(WriteOptions(), &follower_batch_and_status.first)
+ .IsCorruption());
+ });
+
+ ASSERT_EQ(leader->batch->GetDataSize(), leader_batch_size);
+ if (corrupt_byte_offset < leader_batch_size) {
+ Slice batch_content = leader->batch->Data();
+ CorruptWriteBatch(&batch_content, corrupt_byte_offset,
+ corrupt_byte_addend_);
+ }
+ leader_count++;
+ while (!follower_joined) {
+ // busy waiting
+ }
+ });
+ while (corrupt_byte_offset < total_bytes) {
+ // Reopen DB since it failed WAL write which lead to read-only mode
+ Reopen(options);
+ SyncPoint::GetInstance()->EnableProcessing();
+ auto log_size_pre_write = dbfull()->TEST_total_log_size();
+ leader_batch_and_status = GetWriteBatch(nullptr /* cf_handle */, op_type1_);
+ ASSERT_OK(leader_batch_and_status.second);
+ ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first)
+ .IsCorruption());
+ follower_thread.join();
+ // Prevent leader thread from entering this callback
+ SyncPoint::GetInstance()->ClearCallBack("WriteThread::JoinBatchGroup:Wait");
+ ASSERT_EQ(1, leader_count);
+ // Nothing should have been written to WAL
+ ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size());
+ ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
+
+ corrupt_byte_offset++;
+ if (corrupt_byte_offset == leader_batch_size) {
+ // skip over the sequence number part of follower's write batch
+ corrupt_byte_offset += 8;
+ }
+ follower_joined = false;
+ leader_count = 0;
+ }
+ SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) {
+ // This test has two writers repeatedly attempt to write `WriteBatch`es
+ // containing a single entry of type op_type1_ and op_type2_ respectively. The
+ // leader of the write group writes the batch containinng the entry of type
+ // op_type1_. One byte of the pre-merged write batches is corrupted by adding
+ // `corrupt_byte_addend_` to the batch's original value during each attempt.
+ // The test repeats until an attempt has been made on each byte in both
+ // pre-merged write batches. All attempts are expected to fail with
+ // `Status::Corruption`.
+ Options options = CurrentOptions();
+ if (op_type1_ == WriteBatchOpType::kMerge ||
+ op_type2_ == WriteBatchOpType::kMerge) {
+ options.merge_operator = MergeOperators::CreateStringAppendOperator();
+ }
+ CreateAndReopenWithCF({"ramen"}, options);
+
+ auto leader_batch_and_status = GetWriteBatch(handles_[1], op_type1_);
+ ASSERT_OK(leader_batch_and_status.second);
+ auto follower_batch_and_status = GetWriteBatch(handles_[1], op_type2_);
+ size_t leader_batch_size = leader_batch_and_status.first.GetDataSize();
+ size_t total_bytes =
+ leader_batch_size + follower_batch_and_status.first.GetDataSize();
+ // First 8 bytes are for sequence number which is not protected in write batch
+ size_t corrupt_byte_offset = 8;
+
+ std::atomic<bool> follower_joined{false};
+ std::atomic<int> leader_count{0};
+ port::Thread follower_thread;
+ // This callback should only be called by the leader thread
+ SyncPoint::GetInstance()->SetCallBack(
+ "WriteThread::JoinBatchGroup:Wait2", [&](void* arg_leader) {
+ auto* leader = reinterpret_cast<WriteThread::Writer*>(arg_leader);
+ ASSERT_EQ(leader->state, WriteThread::STATE_GROUP_LEADER);
+
+ // This callback should only be called by the follower thread
+ SyncPoint::GetInstance()->SetCallBack(
+ "WriteThread::JoinBatchGroup:Wait", [&](void* arg_follower) {
+ auto* follower =
+ reinterpret_cast<WriteThread::Writer*>(arg_follower);
+ // The leader thread will wait on this bool and hence wait until
+ // this writer joins the write group
+ ASSERT_NE(follower->state, WriteThread::STATE_GROUP_LEADER);
+ if (corrupt_byte_offset >= leader_batch_size) {
+ Slice batch_content =
+ WriteBatchInternal::Contents(follower->batch);
+ CorruptWriteBatch(&batch_content,
+ corrupt_byte_offset - leader_batch_size,
+ corrupt_byte_addend_);
+ }
+ follower_joined = true;
+ // So the follower does not enter the outer callback at
+ // WriteThread::JoinBatchGroup:Wait2
+ SyncPoint::GetInstance()->DisableProcessing();
+ });
+
+ // Start the other writer thread which will join the write group as
+ // follower
+ follower_thread = port::Thread([&]() {
+ follower_batch_and_status = GetWriteBatch(handles_[1], op_type2_);
+ ASSERT_OK(follower_batch_and_status.second);
+ ASSERT_TRUE(
+ db_->Write(WriteOptions(), &follower_batch_and_status.first)
+ .IsCorruption());
+ });
+
+ ASSERT_EQ(leader->batch->GetDataSize(), leader_batch_size);
+ if (corrupt_byte_offset < leader_batch_size) {
+ Slice batch_content = WriteBatchInternal::Contents(leader->batch);
+ CorruptWriteBatch(&batch_content, corrupt_byte_offset,
+ corrupt_byte_addend_);
+ }
+ leader_count++;
+ while (!follower_joined) {
+ // busy waiting
+ }
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+ while (corrupt_byte_offset < total_bytes) {
+ // Reopen DB since it failed WAL write which lead to read-only mode
+ ReopenWithColumnFamilies({kDefaultColumnFamilyName, "ramen"}, options);
+ SyncPoint::GetInstance()->EnableProcessing();
+ auto log_size_pre_write = dbfull()->TEST_total_log_size();
+ leader_batch_and_status = GetWriteBatch(handles_[1], op_type1_);
+ ASSERT_OK(leader_batch_and_status.second);
+ ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first)
+ .IsCorruption());
+ follower_thread.join();
+ // Prevent leader thread from entering this callback
+ SyncPoint::GetInstance()->ClearCallBack("WriteThread::JoinBatchGroup:Wait");
+
+ ASSERT_EQ(1, leader_count);
+ // Nothing should have been written to WAL
+ ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size());
+ ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
+
+ corrupt_byte_offset++;
+ if (corrupt_byte_offset == leader_batch_size) {
+ // skip over the sequence number part of follower's write batch
+ corrupt_byte_offset += 8;
+ }
+ follower_joined = false;
+ leader_count = 0;
+ }
+ SyncPoint::GetInstance()->DisableProcessing();
+}
+
+INSTANTIATE_TEST_CASE_P(
+ DbKvChecksumTestMergedBatch, DbKvChecksumTestMergedBatch,
+ ::testing::Combine(::testing::Range(static_cast<WriteBatchOpType>(0),
+ WriteBatchOpType::kNum),
+ ::testing::Range(static_cast<WriteBatchOpType>(0),
+ WriteBatchOpType::kNum),
+ ::testing::Values(2, 103, 251)),
+ [](const testing::TestParamInfo<
+ std::tuple<WriteBatchOpType, WriteBatchOpType, char>>& args) {
+ std::ostringstream oss;
+ oss << GetOpTypeString(std::get<0>(args.param))
+ << GetOpTypeString(std::get<1>(args.param)) << "Add"
+ << static_cast<int>(
+ static_cast<unsigned char>(std::get<2>(args.param)));
+ return oss.str();
+ });
+
+// TODO: add test for transactions
+// TODO: add test for corrupted write batch with WAL disabled
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {