]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
db/log_reader: treat bad record length or checksum as EOF
authorSage Weil <sage@redhat.com>
Fri, 11 Dec 2015 22:17:43 +0000 (17:17 -0500)
committerSage Weil <sage@redhat.com>
Fri, 11 Dec 2015 22:26:31 +0000 (17:26 -0500)
If we are in kTolerateCorruptedTailRecords, treat these
errors as the end of the log.  This is particularly
important for recycled logs, where we will regularly see
corrupted headers (bad length or checksum) when replaying
a log.  If we are aligned with a block boundary or get lucky,
we will land on an old header and see the log number
mismatch, but more commonly we will land midway through
some previous block and record and effectively see noise.
These must be treated as the end of the log in order for
recycling to work.

This makes the LogTest.Recycle/1 test pass.

We also modify a number of existing tests because the
recycled log files behave fundamentally differently in that
they always stop when they reach the first bad record.

Signed-off-by: Sage Weil <sage@redhat.com>
db/log_reader.cc
db/log_reader.h
db/log_test.cc

index 5dad79abca2813a4ca081876a5c6e46edbbbfed3..1ed8daa801fd5b3efab320e8f9ced1ea56a667e1 100644 (file)
@@ -22,9 +22,8 @@ Reader::Reporter::~Reporter() {
 }
 
 Reader::Reader(std::shared_ptr<Logger> info_log,
-              unique_ptr<SequentialFileReader>&& _file,
-               Reporter* reporter, bool checksum, uint64_t initial_offset,
-               uint64_t log_num)
+               unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
+               bool checksum, uint64_t initial_offset, uint64_t log_num)
     : info_log_(info_log),
       file_(std::move(_file)),
       reporter_(reporter),
@@ -37,7 +36,8 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
       last_record_offset_(0),
       end_of_buffer_offset_(0),
       initial_offset_(initial_offset),
-      log_number_(log_num) {}
+      log_number_(log_num),
+      recycled_(false) {}
 
 Reader::~Reader() {
   delete[] backing_store_;
@@ -192,6 +192,12 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
         break;
 
       case kBadRecordLen:
+        if (recycled_ &&
+            wal_recovery_mode ==
+                WALRecoveryMode::kTolerateCorruptedTailRecords) {
+          scratch->clear();
+          return false;
+        }
         ReportCorruption(drop_size, "bad record length");
         if (in_fragmented_record) {
           ReportCorruption(scratch->size(), "error in middle of record");
@@ -201,6 +207,12 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
         break;
 
       case kBadRecordChecksum:
+        if (recycled_ &&
+            wal_recovery_mode ==
+                WALRecoveryMode::kTolerateCorruptedTailRecords) {
+          scratch->clear();
+          return false;
+        }
         ReportCorruption(drop_size, "checksum mismatch");
         if (in_fragmented_record) {
           ReportCorruption(scratch->size(), "error in middle of record");
@@ -355,6 +367,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
     const uint32_t length = a | (b << 8);
     int header_size = kHeaderSize;
     if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
+      if (end_of_buffer_offset_ - buffer_.size() == 0) {
+        recycled_ = true;
+      }
       header_size = kRecyclableHeaderSize;
       // We need enough for the larger header
       if (buffer_.size() < (size_t)kRecyclableHeaderSize) {
index 78dec5b94d27c57f3180f15545e114cd4d5d89be..d0d7ad6f3472c74f55d02c9dd89e35d948006ce8 100644 (file)
@@ -113,6 +113,9 @@ class Reader {
   // which log number this is
   uint64_t const log_number_;
 
+  // Whether this is a recycled log file
+  bool recycled_;
+
   // Extend record types with the following special values
   enum {
     kEof = kMaxRecordType + 1,
index a69c5ece872e86add85e935c082f20a809a617b0..648237b4103ad28c5087e8c5ce166a9331f02c2e 100644 (file)
@@ -447,9 +447,13 @@ TEST_P(LogTest, BadLength) {
   Write("foo");
   // Least significant size byte is stored in header[4].
   IncrementByte(4, 1);
-  ASSERT_EQ("foo", Read());
-  ASSERT_EQ(kBlockSize, DroppedBytes());
-  ASSERT_EQ("OK", MatchError("bad record length"));
+  if (!GetParam()) {
+    ASSERT_EQ("foo", Read());
+    ASSERT_EQ(kBlockSize, DroppedBytes());
+    ASSERT_EQ("OK", MatchError("bad record length"));
+  } else {
+    ASSERT_EQ("EOF", Read());
+  }
 }
 
 TEST_P(LogTest, BadLengthAtEndIsIgnored) {
@@ -472,8 +476,13 @@ TEST_P(LogTest, ChecksumMismatch) {
   Write("foooooo");
   IncrementByte(0, 14);
   ASSERT_EQ("EOF", Read());
-  ASSERT_EQ(14U + 4 * !!GetParam(), DroppedBytes());
-  ASSERT_EQ("OK", MatchError("checksum mismatch"));
+  if (!GetParam()) {
+    ASSERT_EQ(14U, DroppedBytes());
+    ASSERT_EQ("OK", MatchError("checksum mismatch"));
+  } else {
+    ASSERT_EQ(0U, DroppedBytes());
+    ASSERT_EQ("", ReportMessage());
+  }
 }
 
 TEST_P(LogTest, UnexpectedMiddleType) {
@@ -570,11 +579,15 @@ TEST_P(LogTest, ErrorJoinsRecords) {
     SetByte(offset, 'x');
   }
 
-  ASSERT_EQ("correct", Read());
-  ASSERT_EQ("EOF", Read());
-  size_t dropped = DroppedBytes();
-  ASSERT_LE(dropped, 2 * kBlockSize + 100);
-  ASSERT_GE(dropped, 2 * kBlockSize);
+  if (!GetParam()) {
+    ASSERT_EQ("correct", Read());
+    ASSERT_EQ("EOF", Read());
+    size_t dropped = DroppedBytes();
+    ASSERT_LE(dropped, 2 * kBlockSize + 100);
+    ASSERT_GE(dropped, 2 * kBlockSize);
+  } else {
+    ASSERT_EQ("EOF", Read());
+  }
 }
 
 TEST_P(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }