// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
- true /*checksum*/, 0 /*initial_offset*/, log_number);
+ true /*checksum*/, 0 /*initial_offset*/, log_number,
+ &db_options_);
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d skip-recovery %d", log_number,
db_options_.wal_recovery_mode, !continue_replay_log);
mutable_cf_options.write_buffer_size);
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), opt_env_opt));
- new_log = new log::Writer(std::move(file_writer), new_log_number,
- db_options_.recycle_log_file_num > 0);
+ new_log = new log::Writer(std::move(file_writer),
+ new_log_number,
+ db_options_.recycle_log_file_num > 0,
+ &db_options_);
}
}
impl->logs_.emplace_back(
new_log_number,
new log::Writer(std::move(file_writer), new_log_number,
- impl->db_options_.recycle_log_file_num > 0));
+ impl->db_options_.recycle_log_file_num > 0,
+ &impl->db_options_));
// set column family handles
for (auto cf : column_families) {
}
Reader::Reader(std::shared_ptr<Logger> info_log,
- unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
- bool checksum, uint64_t initial_offset, uint64_t log_num)
- : info_log_(info_log),
+ unique_ptr<SequentialFileReader>&& _file,
+ Reporter* reporter, bool checksum, uint64_t initial_offset,
+ uint64_t log_num,
+ const DBOptions *opt)
+ : db_options_(opt),
+ info_log_(info_log),
file_(std::move(_file)),
reporter_(reporter),
checksum_(checksum),
}
// Check crc
+ uint32_t actual_crc = 0;
if (checksum_) {
uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
- uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
+ actual_crc = crc32c::Value(header + 6, length + header_size - 6);
if (actual_crc != expected_crc) {
// Drop the rest of the buffer since "length" itself may have
// been corrupted and if we trust it, we could find some
return kBadRecord;
}
+ if (db_options_)
+ Log(InfoLogLevel::DEBUG_LEVEL, db_options_->info_log,
+ "ReadPhysicalRecord: log %lld offset %lld len %d crc %d type %d",
+ (unsigned long long)log_number_,
+ (unsigned long long)(end_of_buffer_offset_ - buffer_.size() -
+ header_size - length),
+ (int)header_size + (int)length, crc32c::Mask(actual_crc),
+ type);
+
*result = Slice(header + header_size, length);
return type;
}
namespace rocksdb {
class SequentialFileReader;
+class DBOptions;
class Logger;
using std::unique_ptr;
Reader(std::shared_ptr<Logger> info_log,
unique_ptr<SequentialFileReader>&& file,
Reporter* reporter, bool checksum, uint64_t initial_offset,
- uint64_t log_num);
+ uint64_t log_num,
+ const DBOptions *opt = NULL);
~Reader();
SequentialFileReader* file() { return file_.get(); }
private:
+ const DBOptions *db_options_;
std::shared_ptr<Logger> info_log_;
const unique_ptr<SequentialFileReader> file_;
Reporter* const reporter_;
#include <stdint.h>
#include "rocksdb/env.h"
+#include "rocksdb/options.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h"
namespace log {
Writer::Writer(unique_ptr<WritableFileWriter>&& dest,
- uint64_t log_number, bool recycle_log_files)
- : dest_(std::move(dest)),
+ uint64_t log_number, bool recycle_log_files,
+ const DBOptions *opt)
+ : db_options_(opt),
+ dest_(std::move(dest)),
block_offset_(0),
log_number_(log_number),
recycle_log_files_(recycle_log_files) {
crc = crc32c::Mask(crc); // Adjust for storage
EncodeFixed32(buf, crc);
+ if (db_options_)
+ Log(InfoLogLevel::DEBUG_LEVEL, db_options_->info_log,
+ "EmitPhysicalRecord: log %lld offset %lld len %d crc %d",
+ (unsigned long long)log_number_,
+ (unsigned long long)dest_->GetFileSize(),
+ (int)header_size + (int)n,
+ crc);
+
// Write the header and the payload
Status s = dest_->Append(Slice(buf, header_size));
if (s.ok()) {
namespace rocksdb {
class WritableFileWriter;
+class DBOptions;
using std::unique_ptr;
// "*dest" must be initially empty.
// "*dest" must remain live while this Writer is in use.
explicit Writer(unique_ptr<WritableFileWriter>&& dest,
- uint64_t log_number, bool recycle_log_files);
+ uint64_t log_number, bool recycle_log_files,
+ const DBOptions *opt = NULL);
~Writer();
Status AddRecord(const Slice& slice);
const WritableFileWriter* file() const { return dest_.get(); }
private:
+ const DBOptions *db_options_;
unique_ptr<WritableFileWriter> dest_;
size_t block_offset_; // Current offset in block
uint64_t log_number_;