db_options_.wal_recovery_mode, !continue_replay_log);
// Determine if we should tolerate incomplete records at the tail end of the
- // log
- bool report_eof_inconsistency;
- if (db_options_.wal_recovery_mode ==
- WALRecoveryMode::kAbsoluteConsistency) {
- // in clean shutdown we don't expect any error in the log files
- report_eof_inconsistency = true;
- } else {
- // for other modes ignore only incomplete records in the last log file
- // which is presumably due to write in progress during restart
- report_eof_inconsistency = false;
-
- // TODO krad: Evaluate if we need to move to a more strict mode where we
- // restrict the inconsistency to only the last log
- }
-
// Read all the records and add to a memtable
std::string scratch;
Slice record;
}
}
- while (continue_replay_log &&
- reader.ReadRecord(&record, &scratch, report_eof_inconsistency) &&
- status.ok()) {
+ while (
+ continue_replay_log &&
+ reader.ReadRecord(&record, &scratch, db_options_.wal_recovery_mode) &&
+ status.ok()) {
if (record.size() < 12) {
reporter.Corruption(record.size(),
Status::Corruption("log record too small"));
return true;
}
+// For kAbsoluteConsistency, on clean shutdown we don't expect any error
+// in the log files. For other modes, we can ignore only incomplete records
+// in the last log file, which are presumably due to a write in progress
+// during restart (or from log recycling).
+//
+// TODO krad: Evaluate if we need to move to a more strict mode where we
+// restrict the inconsistency to only the last log
bool Reader::ReadRecord(Slice* record, std::string* scratch,
- const bool report_eof_inconsistency) {
+ WALRecoveryMode wal_recovery_mode) {
if (last_record_offset_ < initial_offset_) {
if (!SkipToInitialBlock()) {
return false;
while (true) {
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
const unsigned int record_type =
- ReadPhysicalRecord(&fragment, report_eof_inconsistency);
+ ReadPhysicalRecord(&fragment, wal_recovery_mode);
switch (record_type) {
case kFullType:
if (in_fragmented_record && !scratch->empty()) {
case kEof:
if (in_fragmented_record) {
- if (report_eof_inconsistency) {
+ if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
+ // in clean shutdown we don't expect any error in the log files
ReportCorruption(scratch->size(), "error reading trailing data");
}
// This can be caused by the writer dying immediately after
}
unsigned int Reader::ReadPhysicalRecord(Slice* result,
- const bool report_eof_inconsistency) {
+ WALRecoveryMode wal_recovery_mode) {
while (true) {
if (buffer_.size() < (size_t)kHeaderSize) {
if (!eof_ && !read_error_) {
// end of the file, which can be caused by the writer crashing in the
// middle of writing the header. Unless explicitly requested we don't
// considering this an error, just report EOF.
- if (buffer_.size() && report_eof_inconsistency) {
+ if (buffer_.size() &&
+ wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
+ // in clean shutdown we don't expect any error in the log files
ReportCorruption(buffer_.size(), "truncated header");
}
buffer_.clear();
// If the end of the file has been reached without reading |length| bytes
// of payload, assume the writer died in the middle of writing the record.
// Don't report a corruption unless requested.
- if (drop_size && report_eof_inconsistency) {
+ if (drop_size &&
+ wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
+ // in clean shutdown we don't expect any error in the log files
ReportCorruption(drop_size, "truncated header");
}
return kEof;
#include "db/log_format.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
+#include "rocksdb/options.h"
namespace rocksdb {
// will only be valid until the next mutating operation on this
// reader or the next mutation to *scratch.
bool ReadRecord(Slice* record, std::string* scratch,
- bool report_eof_inconsistency = false);
+ WALRecoveryMode wal_recovery_mode =
+ WALRecoveryMode::kTolerateCorruptedTailRecords);
// Returns the physical offset of the last record returned by ReadRecord.
//
bool SkipToInitialBlock();
// Return type, or one of the preceding special values
- unsigned int ReadPhysicalRecord(Slice* result,
- bool report_eof_inconsistency = false);
+ unsigned int ReadPhysicalRecord(
+ Slice* result, WALRecoveryMode wal_recovery_mode =
+ WALRecoveryMode::kTolerateCorruptedTailRecords);
// Reports dropped bytes to the reporter.
// buffer_ must be updated to remove the dropped bytes prior to invocation.
return dest_contents().size();
}
- std::string Read(const bool report_eof_inconsistency = false) {
+ std::string Read(const WALRecoveryMode wal_recovery_mode =
+ WALRecoveryMode::kTolerateCorruptedTailRecords) {
std::string scratch;
Slice record;
- if (reader_.ReadRecord(&record, &scratch, report_eof_inconsistency)) {
+ if (reader_.ReadRecord(&record, &scratch, wal_recovery_mode)) {
return record.ToString();
} else {
return "EOF";
TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
Write("foo");
ShrinkSize(4); // Drop all payload as well as a header byte
- ASSERT_EQ("EOF", Read(/*report_eof_inconsistency*/ true));
+ ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
// Truncated last record is ignored, not treated as an error
ASSERT_GT(DroppedBytes(), 0U);
ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
Write("foo");
ShrinkSize(1);
- ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true));
+ ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
ASSERT_GT(DroppedBytes(), 0U);
ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
}
Write(BigString("bar", kBlockSize));
// Remove the LAST block, including header.
ShrinkSize(14);
- ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true));
+ ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
ASSERT_GT(DroppedBytes(), 0U);
ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data"));
}
Write(BigString("bar", kBlockSize));
// Cause a bad record length in the LAST block.
ShrinkSize(1);
- ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true));
+ ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
ASSERT_GT(DroppedBytes(), 0U);
ASSERT_EQ("OK", MatchError(
"Corruption: truncated headerCorruption: "