]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
log_reader: handle modified CRCs
authorSage Weil <sage@redhat.com>
Thu, 8 Oct 2015 17:09:22 +0000 (13:09 -0400)
committerSage Weil <sage@redhat.com>
Thu, 8 Oct 2015 17:41:07 +0000 (13:41 -0400)
If we detect a CRC error on the first record of a log file, check to see
whether the CRC has the log_number mixed in.  If so, we set the recycled_
flag, and thereafter assume that the CRC will have log_number mixed in.
This raises our probability of a false-positive CRC to 2/2^32 only
for the first record, but leaves it at 1/2^32 for the rest of each file.

We also adjust the ReadPhysicalRecord return codes to treat any garbage it
sees as EOF, unless the wal_recovery_mode is kSkipAnyCorruptedRecords.

Signed-off-by: Sage Weil <sage@redhat.com>
db/log_reader.cc
db/log_reader.h
db/log_test.cc

index cf5c26152b4895ca81e915b87e6b59d2dc501538..3c3fab3a792e4acb8bff7af171e3ff2b35ae64c2 100644 (file)
@@ -33,6 +33,7 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
       buffer_(),
       eof_(false),
       read_error_(false),
+      recycled_(false),
       eof_offset_(0),
       last_record_offset_(0),
       end_of_buffer_offset_(0),
@@ -300,8 +301,20 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result,
       size_t drop_size = buffer_.size();
       buffer_.clear();
       if (!eof_) {
-        ReportCorruption(drop_size, "bad record length");
-        return kBadRecord;
+        // If the log file is recycled, we can ignore any error
+        // here--we are probably seeing garbage from a previous
+        // incarnation of the log file.  Treat it as EOF.
+        //
+        // Note that in the case of kAbsoluteConsistency we rely on
+        // the file being truncated so that there is no trailing
+        // garbage.
+        if (recycled_ &&
+            wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
+          return kEof;
+        } else {
+          ReportCorruption(drop_size, "bad record length");
+          return kBadRecord;
+        }
       }
       // If the end of the file has been reached without reading |length| bytes
       // of payload, assume the writer died in the middle of writing the record.
@@ -318,25 +331,61 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result,
       // Skip zero length record without reporting any drops since
       // such records are produced by the mmap based writing code in
       // env_posix.cc that preallocates file regions.
+      //
       // NOTE: this should never happen in DB written by new RocksDB versions,
       // since we turn off mmap writes to manifest and log files
       buffer_.clear();
+      if (recycled_ &&
+          wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
+        return kEof;
+      }
       return kBadRecord;
     }
 
     // Check crc
     if (checksum_) {
-      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
       uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
+      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
+      // Check for simple CRC (no log number) if this is the first
+      // record of the file.
+      bool try_recycled = false;
+      if (!recycled_ && last_record_offset_ == 0 &&
+          actual_crc != expected_crc) {
+        // It failed; try using modified (recycled) CRC
+        try_recycled = true;
+      }
+      if (recycled_ || try_recycled) {
+        expected_crc ^= static_cast<uint32_t>(log_number_);
+        if (try_recycled && info_log_) {
+          Log(InfoLogLevel::INFO_LEVEL, info_log_,
+              "ReadPhysicalRecord file is recycled; using alt CRC\n");
+          LogFlush(info_log_);
+        }
+        if (try_recycled && actual_crc == expected_crc) {
+          // We failed the normal CRC but we matched a recycled CRC.. this
+          // must be a recycled file.
+          recycled_ = true;
+        }
+      }
       if (actual_crc != expected_crc) {
         // Drop the rest of the buffer since "length" itself may have
         // been corrupted and if we trust it, we could find some
         // fragment of a real log record that just happens to look
         // like a valid log record.
-        size_t drop_size = buffer_.size();
-        buffer_.clear();
-        ReportCorruption(drop_size, "checksum mismatch");
-        return kBadRecord;
+        //
+        // If the log file is recycled, we can treat this like EOF--it
+        // is most likely a record from the previous incarnation of
+        // the log file.
+        if (recycled_ &&
+            wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
+          buffer_.clear();
+          return kEof;
+        } else {
+          size_t drop_size = buffer_.size();
+          ReportCorruption(drop_size, "checksum mismatch");
+          buffer_.clear();
+          return kBadRecord;
+        }
       }
     }
 
index bb86dfda734ac22f20d719cb1f08fc2cb906717c..2a65783d57f8d9d9a25859ab5cd86fb34e09fb5e 100644 (file)
@@ -97,6 +97,7 @@ class Reader {
   Slice buffer_;
   bool eof_;   // Last Read() indicated EOF by returning < kBlockSize
   bool read_error_;   // Error occurred while reading from file
+  bool recycled_;     // true if current file was recycled
 
   // Offset of the file position indicator within the last block when an
   // EOF was detected.
index 22921090a25a88862e74cd469051fd7e04b4137a..a0526195242ac9c09490d26ed5be5d1d2f856326 100644 (file)
@@ -560,11 +560,19 @@ TEST_P(LogTest, ErrorJoinsRecords) {
     SetByte(offset, 'x');
   }
 
-  ASSERT_EQ("correct", Read());
-  ASSERT_EQ("EOF", Read());
-  size_t dropped = DroppedBytes();
-  ASSERT_LE(dropped, 2 * kBlockSize + 100);
-  ASSERT_GE(dropped, 2 * kBlockSize);
+  if (GetParam() == 0) {
+    ASSERT_EQ("correct", Read());
+    ASSERT_EQ("EOF", Read());
+    size_t dropped = DroppedBytes();
+    ASSERT_LE(dropped, 2 * kBlockSize + 100);
+    ASSERT_GE(dropped, 2 * kBlockSize);
+  } else {
+    // in the recycle case, we stop when we hit the first record that
+    // is not valid.
+    ASSERT_EQ("EOF", Read());
+    size_t dropped = DroppedBytes();
+    ASSERT_EQ(dropped, 0);
+  }
 }
 
 TEST_P(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }
@@ -667,7 +675,7 @@ TEST_P(LogTest, ClearEofError2) {
   ASSERT_EQ("OK", MatchError("read error"));
 }
 
-INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2));
+INSTANTIATE_TEST_CASE_P(Recycle, LogTest, ::testing::Values(0, 2));
 
 }  // namespace log
 }  // namespace rocksdb