}
Reader::Reader(std::shared_ptr<Logger> info_log,
- unique_ptr<SequentialFileReader>&& _file,
- Reporter* reporter, bool checksum, uint64_t initial_offset,
- uint64_t log_num)
+ unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
+ bool checksum, uint64_t initial_offset, uint64_t log_num)
: info_log_(info_log),
file_(std::move(_file)),
reporter_(reporter),
last_record_offset_(0),
end_of_buffer_offset_(0),
initial_offset_(initial_offset),
- log_number_(log_num) {}
+ log_number_(log_num),
+ recycled_(false) {}
Reader::~Reader() {
delete[] backing_store_;
break;
case kBadRecordLen:
+ if (recycled_ &&
+ wal_recovery_mode ==
+ WALRecoveryMode::kTolerateCorruptedTailRecords) {
+ scratch->clear();
+ return false;
+ }
ReportCorruption(drop_size, "bad record length");
if (in_fragmented_record) {
ReportCorruption(scratch->size(), "error in middle of record");
break;
case kBadRecordChecksum:
+ if (recycled_ &&
+ wal_recovery_mode ==
+ WALRecoveryMode::kTolerateCorruptedTailRecords) {
+ scratch->clear();
+ return false;
+ }
ReportCorruption(drop_size, "checksum mismatch");
if (in_fragmented_record) {
ReportCorruption(scratch->size(), "error in middle of record");
const uint32_t length = a | (b << 8);
int header_size = kHeaderSize;
if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
+ if (end_of_buffer_offset_ - buffer_.size() == 0) {
+ recycled_ = true;
+ }
header_size = kRecyclableHeaderSize;
// We need enough for the larger header
if (buffer_.size() < (size_t)kRecyclableHeaderSize) {
Write("foo");
// Least significant size byte is stored in header[4].
IncrementByte(4, 1);
- ASSERT_EQ("foo", Read());
- ASSERT_EQ(kBlockSize, DroppedBytes());
- ASSERT_EQ("OK", MatchError("bad record length"));
+ if (!GetParam()) {
+ ASSERT_EQ("foo", Read());
+ ASSERT_EQ(kBlockSize, DroppedBytes());
+ ASSERT_EQ("OK", MatchError("bad record length"));
+ } else {
+ ASSERT_EQ("EOF", Read());
+ }
}
TEST_P(LogTest, BadLengthAtEndIsIgnored) {
Write("foooooo");
IncrementByte(0, 14);
ASSERT_EQ("EOF", Read());
- ASSERT_EQ(14U + 4 * !!GetParam(), DroppedBytes());
- ASSERT_EQ("OK", MatchError("checksum mismatch"));
+ if (!GetParam()) {
+ ASSERT_EQ(14U, DroppedBytes());
+ ASSERT_EQ("OK", MatchError("checksum mismatch"));
+ } else {
+ ASSERT_EQ(0U, DroppedBytes());
+ ASSERT_EQ("", ReportMessage());
+ }
}
TEST_P(LogTest, UnexpectedMiddleType) {
SetByte(offset, 'x');
}
- ASSERT_EQ("correct", Read());
- ASSERT_EQ("EOF", Read());
- size_t dropped = DroppedBytes();
- ASSERT_LE(dropped, 2 * kBlockSize + 100);
- ASSERT_GE(dropped, 2 * kBlockSize);
+ if (!GetParam()) {
+ ASSERT_EQ("correct", Read());
+ ASSERT_EQ("EOF", Read());
+ size_t dropped = DroppedBytes();
+ ASSERT_LE(dropped, 2 * kBlockSize + 100);
+ ASSERT_GE(dropped, 2 * kBlockSize);
+ } else {
+ ASSERT_EQ("EOF", Read());
+ }
}
TEST_P(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }