]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
log reader
authorSage Weil <sage@redhat.com>
Tue, 29 Sep 2015 02:17:10 +0000 (22:17 -0400)
committerSage Weil <sage@redhat.com>
Tue, 29 Sep 2015 02:17:10 +0000 (22:17 -0400)
db/db_impl.cc
db/log_reader.cc
db/log_reader.h
db/repair.cc
db/transaction_log_impl.cc
db/version_set.cc
db/wal_manager.cc
util/ldb_cmd.cc

index b524158294b8bb1d46779b596ceecadd2ca432ba..af455bea9233e908967e3d8792bb16526c2e15e2 100644 (file)
@@ -999,8 +999,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
     // paranoid_checks==false so that corruptions cause entire commits
     // to be skipped instead of propagating bad information (like overly
     // large sequence numbers).
-    log::Reader reader(std::move(file), &reporter, true /*checksum*/,
-                       0 /*initial_offset*/);
+    log::Reader reader(&db_options_, std::move(file), &reporter,
+                      true /*checksum*/,
+                       0 /*initial_offset*/, log_number);
     Log(InfoLogLevel::INFO_LEVEL,
         db_options_.info_log, "Recovering log #%" PRIu64 "", log_number);
 
index f6514cfd3c1289214b3329bacbc0377bbf23714a..d9f681f312af09dca5e6b191f6cc876a05994d1c 100644 (file)
@@ -11,6 +11,7 @@
 
 #include <stdio.h>
 #include "rocksdb/env.h"
+#include "rocksdb/options.h"
 #include "util/coding.h"
 #include "util/crc32c.h"
 
@@ -20,9 +21,12 @@ namespace log {
 Reader::Reporter::~Reporter() {
 }
 
-Reader::Reader(unique_ptr<SequentialFile>&& _file, Reporter* reporter,
-               bool checksum, uint64_t initial_offset)
-    : file_(std::move(_file)),
+Reader::Reader(const DBOptions *opt,
+              unique_ptr<SequentialFile>&& _file, Reporter* reporter,
+               bool checksum, uint64_t initial_offset,
+              uint32_t log_num)
+    : db_options_(opt),
+      file_(std::move(_file)),
       reporter_(reporter),
       checksum_(checksum),
       backing_store_(new char[kBlockSize]),
@@ -32,7 +36,8 @@ Reader::Reader(unique_ptr<SequentialFile>&& _file, Reporter* reporter,
       eof_offset_(0),
       last_record_offset_(0),
       end_of_buffer_offset_(0),
-      initial_offset_(initial_offset) {}
+      initial_offset_(initial_offset),
+      log_number_(log_num) {}
 
 Reader::~Reader() {
   delete[] backing_store_;
index a7cf45b4a00cb192b0df346bc8e90cc0c5b24466..f24083139424a9157a5b9371fd316acf4c7d9ede 100644 (file)
@@ -18,6 +18,7 @@
 namespace rocksdb {
 
 class SequentialFile;
+class DBOptions;
 using std::unique_ptr;
 
 namespace log {
@@ -51,8 +52,10 @@ class Reader {
   //
   // The Reader will start reading at the first record located at physical
   // position >= initial_offset within the file.
-  Reader(unique_ptr<SequentialFile>&& file, Reporter* reporter,
-         bool checksum, uint64_t initial_offset);
+  Reader(const DBOptions *opt,
+        unique_ptr<SequentialFile>&& file, Reporter* reporter,
+         bool checksum, uint64_t initial_offset,
+        uint32_t log_num);
 
   ~Reader();
 
@@ -83,6 +86,7 @@ class Reader {
   SequentialFile* file() { return file_.get(); }
 
  private:
+  const DBOptions *db_options_;
   const unique_ptr<SequentialFile> file_;
   Reporter* const reporter_;
   bool const checksum_;
@@ -103,6 +107,9 @@ class Reader {
   // Offset at which to start looking for the first record to return
   uint64_t const initial_offset_;
 
+  // which log number this is
+  uint32_t const log_number_;
+
   // Extend record types with the following special values
   enum {
     kEof = kMaxRecordType + 1,
index 355150d959d6162b37d8f3203e98208e9cbe4f8d..83614c60f304fb63916171ff9fddd3352b63c7fc 100644 (file)
@@ -246,8 +246,9 @@ class Repairer {
     // corruptions cause entire commits to be skipped instead of
     // propagating bad information (like overly large sequence
     // numbers).
-    log::Reader reader(std::move(lfile), &reporter, true /*enable checksum*/,
-                       0/*initial_offset*/);
+    log::Reader reader(&options_, std::move(lfile), &reporter,
+                      true /*enable checksum*/,
+                       0/*initial_offset*/, log);
 
     // Read all the records and add to a memtable
     std::string scratch;
index b0bf6e4e9427be9c180e2be9a3b4fa1a6ac3f620..60a1fdeb7c9f73353d5f94d9286eb7ef7e0c489f 100644 (file)
@@ -257,8 +257,10 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
     return s;
   }
   assert(file);
-  currentLogReader_.reset(new log::Reader(std::move(file), &reporter_,
-                                          read_options_.verify_checksums_, 0));
+  currentLogReader_.reset(new log::Reader(options_,
+                                         std::move(file), &reporter_,
+                                          read_options_.verify_checksums_, 0,
+                                         logFile->LogNumber()));
   return Status::OK();
 }
 }  //  namespace rocksdb
index 03ccb7957bb5e6bbbe2de421791c52c9a3cc8816..46f7f54fc13781adb39ee147cba5a7a89aa77c0f 100644 (file)
@@ -2155,8 +2155,10 @@ Status VersionSet::Recover(
   {
     VersionSet::LogReporter reporter;
     reporter.status = &s;
-    log::Reader reader(std::move(manifest_file), &reporter, true /*checksum*/,
-                       0 /*initial_offset*/);
+    log::Reader reader(db_options_, std::move(manifest_file), &reporter,
+                      true /*checksum*/,
+                       0 /*initial_offset*/,
+                      0);
     Slice record;
     std::string scratch;
     while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@@ -2401,8 +2403,9 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
   column_family_names.insert({0, kDefaultColumnFamilyName});
   VersionSet::LogReporter reporter;
   reporter.status = &s;
-  log::Reader reader(std::move(file), &reporter, true /*checksum*/,
-                     0 /*initial_offset*/);
+  log::Reader reader(NULL, std::move(file), &reporter,
+                    true /*checksum*/,
+                     0 /*initial_offset*/, 0);
   Slice record;
   std::string scratch;
   while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@@ -2554,8 +2557,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
   {
     VersionSet::LogReporter reporter;
     reporter.status = &s;
-    log::Reader reader(std::move(file), &reporter, true/*checksum*/,
-                       0/*initial_offset*/);
+    log::Reader reader(db_options_, std::move(file), &reporter, true/*checksum*/,
+                       0/*initial_offset*/, 0);
     Slice record;
     std::string scratch;
     while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@@ -2799,7 +2802,8 @@ bool VersionSet::ManifestContains(uint64_t manifest_file_num,
         fname.c_str());
     return false;
   }
-  log::Reader reader(std::move(file), nullptr, true/*checksum*/, 0);
+  log::Reader reader(db_options_, std::move(file), nullptr, true/*checksum*/,
+                    0, 0);
   Slice r;
   std::string scratch;
   bool result = false;
index 5651bae3a7f33d8c2cd0d647197b9529202a3e55..f3c2083e71f0472ebe484dcf638a3259f7afad15 100644 (file)
@@ -441,8 +441,8 @@ Status WalManager::ReadFirstLine(const std::string& fname,
   reporter.fname = fname.c_str();
   reporter.status = &status;
   reporter.ignore_error = !db_options_.paranoid_checks;
-  log::Reader reader(std::move(file), &reporter, true /*checksum*/,
-                     0 /*initial_offset*/);
+  log::Reader reader(&db_options_, std::move(file), &reporter, true /*checksum*/,
+                     0 /*initial_offset*/, *sequence);
   std::string scratch;
   Slice record;
 
index e7b29d24f534993d9a6e49dbfc872c83845456f1..7ec55c36cd5c2d5e205842b5d412d61e0022d766 100644 (file)
@@ -1415,7 +1415,12 @@ void DumpWalFile(std::string wal_file, bool print_header, bool print_values,
     }
   } else {
     StdErrReporter reporter;
-    log::Reader reader(move(file), &reporter, true, 0);
+    uint64_t log_number;
+    FileType type;
+    if (!ParseFileName(wal_file, &log_number, &type)) {
+      assert(0);
+    }
+    log::Reader reader(NULL, move(file), &reporter, true, 0, log_number);
     string scratch;
     WriteBatch batch;
     Slice record;