]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
log_reader: pass log_number and optional info_log to ctor
authorSage Weil <sage@redhat.com>
Thu, 8 Oct 2015 17:06:16 +0000 (13:06 -0400)
committerSage Weil <sage@redhat.com>
Thu, 8 Oct 2015 17:41:06 +0000 (13:41 -0400)
We will need the log number to validate the recycle-style CRCs.  The log
is helpful for debugging, but optional, as not all callers have it.

Signed-off-by: Sage Weil <sage@redhat.com>
db/db_impl.cc
db/log_reader.cc
db/log_reader.h
db/log_test.cc
db/repair.cc
db/transaction_log_impl.cc
db/version_set.cc
db/wal_manager.cc
util/ldb_cmd.cc

index 533e43938aa6e74e54335a4cab4b1a68cff14d1d..ab54309107f5e6da7d2fcbdfe888b1bac4be8492 100644 (file)
@@ -1117,8 +1117,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
     // paranoid_checks==false so that corruptions cause entire commits
     // to be skipped instead of propagating bad information (like overly
     // large sequence numbers).
-    log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/,
-                       0 /*initial_offset*/);
+    log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
+                       true /*checksum*/, 0 /*initial_offset*/, log_number);
     Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
         "Recovering log #%" PRIu64 " mode %d skip-recovery %d", log_number,
         db_options_.wal_recovery_mode, !continue_replay_log);
index 296f1d50c049f26eedface48af18697908583c51..44396eb7acc64ee8f4108a277552edda06e6b647 100644 (file)
@@ -21,9 +21,12 @@ namespace log {
 Reader::Reporter::~Reporter() {
 }
 
-Reader::Reader(unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
-               bool checksum, uint64_t initial_offset)
-    : file_(std::move(_file)),
+Reader::Reader(std::shared_ptr<Logger> info_log,
+              unique_ptr<SequentialFileReader>&& _file,
+               Reporter* reporter, bool checksum, uint64_t initial_offset,
+               uint64_t log_num)
+    : info_log_(info_log),
+      file_(std::move(_file)),
       reporter_(reporter),
       checksum_(checksum),
       backing_store_(new char[kBlockSize]),
@@ -33,7 +36,8 @@ Reader::Reader(unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
       eof_offset_(0),
       last_record_offset_(0),
       end_of_buffer_offset_(0),
-      initial_offset_(initial_offset) {}
+      initial_offset_(initial_offset),
+      log_number_(log_num) {}
 
 Reader::~Reader() {
   delete[] backing_store_;
index 390696b8546c80e8470cd145492f135ef546f7fe..1b0a592c9b1d3bb184e2e0a8d573c0bfd1e90205 100644 (file)
@@ -18,6 +18,7 @@
 namespace rocksdb {
 
 class SequentialFileReader;
+class Logger;
 using std::unique_ptr;
 
 namespace log {
@@ -51,8 +52,10 @@ class Reader {
   //
   // The Reader will start reading at the first record located at physical
   // position >= initial_offset within the file.
-  Reader(unique_ptr<SequentialFileReader>&& file, Reporter* reporter,
-         bool checksum, uint64_t initial_offset);
+  Reader(std::shared_ptr<Logger> info_log,
+        unique_ptr<SequentialFileReader>&& file,
+         Reporter* reporter, bool checksum, uint64_t initial_offset,
+         uint64_t log_num);
 
   ~Reader();
 
@@ -84,6 +87,7 @@ class Reader {
   SequentialFileReader* file() { return file_.get(); }
 
  private:
+  std::shared_ptr<Logger> info_log_;
   const unique_ptr<SequentialFileReader> file_;
   Reporter* const reporter_;
   bool const checksum_;
@@ -104,6 +108,9 @@ class Reader {
   // Offset at which to start looking for the first record to return
   uint64_t const initial_offset_;
 
+  // which log number this is
+  uint64_t const log_number_;
+
   // Extend record types with the following special values
   enum {
     kEof = kMaxRecordType + 1,
index 55ecd5f3fcf80fa22df7d72796815ed9dd4f27df..55f8a968233c9df5f85a2dd6ab703561bac4bfb2 100644 (file)
@@ -163,8 +163,8 @@ class LogTest : public ::testing::TestWithParam<int> {
         source_holder_(
             test::GetSequentialFileReader(new StringSource(reader_contents_))),
         writer_(std::move(dest_holder_), 123, GetParam()),
-        reader_(std::move(source_holder_), &report_, true /*checksum*/,
-                0 /*initial_offset*/) {}
+        reader_(NULL, std::move(source_holder_), &report_,
+                true /*checksum*/, 0 /*initial_offset*/, 123) {}
 
   void Write(const std::string& msg) {
     writer_.AddRecord(Slice(msg));
@@ -258,8 +258,8 @@ class LogTest : public ::testing::TestWithParam<int> {
     unique_ptr<SequentialFileReader> file_reader(
         test::GetSequentialFileReader(new StringSource(reader_contents_)));
     unique_ptr<Reader> offset_reader(
-        new Reader(std::move(file_reader), &report_, true /*checksum*/,
-                   WrittenBytes() + offset_past_end));
+        new Reader(NULL, std::move(file_reader), &report_,
+                   true /*checksum*/, WrittenBytes() + offset_past_end, 123));
     Slice record;
     std::string scratch;
     ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
@@ -270,8 +270,9 @@ class LogTest : public ::testing::TestWithParam<int> {
     WriteInitialOffsetLog();
     unique_ptr<SequentialFileReader> file_reader(
         test::GetSequentialFileReader(new StringSource(reader_contents_)));
-    unique_ptr<Reader> offset_reader(new Reader(
-        std::move(file_reader), &report_, true /*checksum*/, initial_offset));
+    unique_ptr<Reader> offset_reader(
+        new Reader(NULL, std::move(file_reader), &report_,
+                   true /*checksum*/, initial_offset, 123));
     Slice record;
     std::string scratch;
     ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
index c8814e78739e4efc5436f3244583065386baa266..3317cb89cd06149e062a9960735677834f77ad9b 100644 (file)
@@ -249,8 +249,8 @@ class Repairer {
     // corruptions cause entire commits to be skipped instead of
     // propagating bad information (like overly large sequence
     // numbers).
-    log::Reader reader(std::move(lfile_reader), &reporter,
-                       true /*enable checksum*/, 0 /*initial_offset*/);
+    log::Reader reader(options_.info_log, std::move(lfile_reader), &reporter,
+                       true /*enable checksum*/, 0 /*initial_offset*/, log);
 
     // Read all the records and add to a memtable
     std::string scratch;
index 23bd6672b9bc77d3a3de28b80e836c52c287f52a..28c4490f58476851d083e91b9bcd5cd1aee7f6c1 100644 (file)
@@ -262,8 +262,10 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
     return s;
   }
   assert(file);
-  currentLogReader_.reset(new log::Reader(std::move(file), &reporter_,
-                                          read_options_.verify_checksums_, 0));
+  currentLogReader_.reset(new log::Reader(options_->info_log,
+                                         std::move(file), &reporter_,
+                                          read_options_.verify_checksums_, 0,
+                                          logFile->LogNumber()));
   return Status::OK();
 }
 }  //  namespace rocksdb
index 5d44e9fc3fb96648381371be61209f0cde992470..e7619e474aef0f75ab99ecfe73f6189da4554035 100644 (file)
@@ -2341,8 +2341,8 @@ Status VersionSet::Recover(
   {
     VersionSet::LogReporter reporter;
     reporter.status = &s;
-    log::Reader reader(std::move(manifest_file_reader), &reporter,
-                       true /*checksum*/, 0 /*initial_offset*/);
+    log::Reader reader(NULL, std::move(manifest_file_reader), &reporter,
+                       true /*checksum*/, 0 /*initial_offset*/, 0);
     Slice record;
     std::string scratch;
     while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@@ -2594,8 +2594,8 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
   column_family_names.insert({0, kDefaultColumnFamilyName});
   VersionSet::LogReporter reporter;
   reporter.status = &s;
-  log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/,
-                     0 /*initial_offset*/);
+  log::Reader reader(NULL, std::move(file_reader), &reporter, true /*checksum*/,
+                     0 /*initial_offset*/, 0);
   Slice record;
   std::string scratch;
   while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@@ -2752,8 +2752,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
   {
     VersionSet::LogReporter reporter;
     reporter.status = &s;
-    log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/,
-                       0 /*initial_offset*/);
+    log::Reader reader(NULL, std::move(file_reader), &reporter,
+                       true /*checksum*/, 0 /*initial_offset*/, 0);
     Slice record;
     std::string scratch;
     while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@@ -3005,7 +3005,8 @@ bool VersionSet::ManifestContains(uint64_t manifest_file_num,
     }
     file_reader.reset(new SequentialFileReader(std::move(file)));
   }
-  log::Reader reader(std::move(file_reader), nullptr, true /*checksum*/, 0);
+  log::Reader reader(NULL, std::move(file_reader), nullptr,
+                     true /*checksum*/, 0, 0);
   Slice r;
   std::string scratch;
   bool result = false;
index 37861ab45d1697129f6fe7b4ea68475a11e0fb13..58b0b16438aa112f64fe1cfff11456382ff19b77 100644 (file)
@@ -448,8 +448,8 @@ Status WalManager::ReadFirstLine(const std::string& fname,
   reporter.fname = fname.c_str();
   reporter.status = &status;
   reporter.ignore_error = !db_options_.paranoid_checks;
-  log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/,
-                     0 /*initial_offset*/);
+  log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
+                     true /*checksum*/, 0 /*initial_offset*/, *sequence);
   std::string scratch;
   Slice record;
 
index a441d7167154bc6cb6791429c1c84f65a0fa60b5..fa4079e10869cad961b7cf409098d50433239878 100644 (file)
@@ -1438,7 +1438,13 @@ void DumpWalFile(std::string wal_file, bool print_header, bool print_values,
     }
   } else {
     StdErrReporter reporter;
-    log::Reader reader(move(wal_file_reader), &reporter, true, 0);
+    uint64_t log_number;
+    FileType type;
+    if (!ParseFileName(wal_file, &log_number, &type)) {
+      assert(0);
+    }
+    log::Reader reader(NULL, move(wal_file_reader), &reporter, true, 0,
+                       log_number);
     string scratch;
     WriteBatch batch;
     Slice record;