// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
- log::Reader reader(std::move(file), &reporter, true /*checksum*/,
- 0 /*initial_offset*/);
+ log::Reader reader(&db_options_, std::move(file), &reporter,
+ true /*checksum*/,
+ 0 /*initial_offset*/, log_number);
Log(InfoLogLevel::INFO_LEVEL,
db_options_.info_log, "Recovering log #%" PRIu64 "", log_number);
#include <stdio.h>
#include "rocksdb/env.h"
+#include "rocksdb/options.h"
#include "util/coding.h"
#include "util/crc32c.h"
Reader::Reporter::~Reporter() {
}
-Reader::Reader(unique_ptr<SequentialFile>&& _file, Reporter* reporter,
- bool checksum, uint64_t initial_offset)
- : file_(std::move(_file)),
+Reader::Reader(const DBOptions *opt,
+ unique_ptr<SequentialFile>&& _file, Reporter* reporter,
+ bool checksum, uint64_t initial_offset,
+ uint32_t log_num)
+ : db_options_(opt),
+ file_(std::move(_file)),
reporter_(reporter),
checksum_(checksum),
backing_store_(new char[kBlockSize]),
eof_offset_(0),
last_record_offset_(0),
end_of_buffer_offset_(0),
- initial_offset_(initial_offset) {}
+ initial_offset_(initial_offset),
+ log_number_(log_num) {}
Reader::~Reader() {
delete[] backing_store_;
namespace rocksdb {
class SequentialFile;
+class DBOptions;
using std::unique_ptr;
namespace log {
//
// The Reader will start reading at the first record located at physical
// position >= initial_offset within the file.
- Reader(unique_ptr<SequentialFile>&& file, Reporter* reporter,
- bool checksum, uint64_t initial_offset);
+ Reader(const DBOptions *opt,
+ unique_ptr<SequentialFile>&& file, Reporter* reporter,
+ bool checksum, uint64_t initial_offset,
+ uint32_t log_num);
~Reader();
SequentialFile* file() { return file_.get(); }
private:
+ const DBOptions *db_options_;
const unique_ptr<SequentialFile> file_;
Reporter* const reporter_;
bool const checksum_;
// Offset at which to start looking for the first record to return
uint64_t const initial_offset_;
+ // which log number this is
+ uint32_t const log_number_;
+
// Extend record types with the following special values
enum {
kEof = kMaxRecordType + 1,
// corruptions cause entire commits to be skipped instead of
// propagating bad information (like overly large sequence
// numbers).
- log::Reader reader(std::move(lfile), &reporter, true /*enable checksum*/,
- 0/*initial_offset*/);
+ log::Reader reader(&options_, std::move(lfile), &reporter,
+ true /*enable checksum*/,
+ 0/*initial_offset*/, log);
// Read all the records and add to a memtable
std::string scratch;
return s;
}
assert(file);
- currentLogReader_.reset(new log::Reader(std::move(file), &reporter_,
- read_options_.verify_checksums_, 0));
+ currentLogReader_.reset(new log::Reader(options_,
+ std::move(file), &reporter_,
+ read_options_.verify_checksums_, 0,
+ logFile->LogNumber()));
return Status::OK();
}
} // namespace rocksdb
{
VersionSet::LogReporter reporter;
reporter.status = &s;
- log::Reader reader(std::move(manifest_file), &reporter, true /*checksum*/,
- 0 /*initial_offset*/);
+ log::Reader reader(db_options_, std::move(manifest_file), &reporter,
+ true /*checksum*/,
+ 0 /*initial_offset*/,
+ 0);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
column_family_names.insert({0, kDefaultColumnFamilyName});
VersionSet::LogReporter reporter;
reporter.status = &s;
- log::Reader reader(std::move(file), &reporter, true /*checksum*/,
- 0 /*initial_offset*/);
+ log::Reader reader(NULL, std::move(file), &reporter,
+ true /*checksum*/,
+ 0 /*initial_offset*/, 0);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
{
VersionSet::LogReporter reporter;
reporter.status = &s;
- log::Reader reader(std::move(file), &reporter, true/*checksum*/,
- 0/*initial_offset*/);
+ log::Reader reader(db_options_, std::move(file), &reporter, true/*checksum*/,
+ 0/*initial_offset*/, 0);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
fname.c_str());
return false;
}
- log::Reader reader(std::move(file), nullptr, true/*checksum*/, 0);
+ log::Reader reader(db_options_, std::move(file), nullptr, true/*checksum*/,
+ 0, 0);
Slice r;
std::string scratch;
bool result = false;
reporter.fname = fname.c_str();
reporter.status = &status;
reporter.ignore_error = !db_options_.paranoid_checks;
- log::Reader reader(std::move(file), &reporter, true /*checksum*/,
- 0 /*initial_offset*/);
+ log::Reader reader(&db_options_, std::move(file), &reporter, true /*checksum*/,
+ 0 /*initial_offset*/, *sequence);
std::string scratch;
Slice record;
}
} else {
StdErrReporter reporter;
- log::Reader reader(move(file), &reporter, true, 0);
+ uint64_t log_number;
+ FileType type;
+ if (!ParseFileName(wal_file, &log_number, &type)) {
+ assert(0);
+ }
+ log::Reader reader(NULL, move(file), &reporter, true, 0, log_number);
string scratch;
WriteBatch batch;
Slice record;