From: Sage Weil Date: Wed, 2 Dec 2015 21:34:29 +0000 (-0500) Subject: log reader and writer X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6569d74c032cd6289967658918bfc0ca794ad380;p=rocksdb.git log reader and writer --- diff --git a/db/db_impl.cc b/db/db_impl.cc index 22b42b18..494bb31d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1128,7 +1128,8 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // to be skipped instead of propagating bad information (like overly // large sequence numbers). log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter, - true /*checksum*/, 0 /*initial_offset*/, log_number); + true /*checksum*/, 0 /*initial_offset*/, log_number, + &db_options_); Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Recovering log #%" PRIu64 " mode %d skip-recovery %d", log_number, db_options_.wal_recovery_mode, !continue_replay_log); @@ -4258,7 +4259,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { new_log = new log::Writer(std::move(file_writer), new_log_number, db_options_.recycle_log_file_num > 0, - &db_options); + &db_options_); } } @@ -4938,7 +4939,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, impl->logs_.emplace_back( new_log_number, new log::Writer(std::move(file_writer), new_log_number, - impl->db_options_.recycle_log_file_num > 0)); + impl->db_options_.recycle_log_file_num > 0, + &impl->db_options_)); // set column family handles for (auto cf : column_families) { diff --git a/db/log_reader.cc b/db/log_reader.cc index 512dd08d..9f1b16d1 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -24,8 +24,10 @@ Reader::Reporter::~Reporter() { Reader::Reader(std::shared_ptr info_log, unique_ptr&& _file, Reporter* reporter, bool checksum, uint64_t initial_offset, - uint64_t log_num) - : info_log_(info_log), + uint64_t log_num, + const DBOptions *opt) + : db_options_(opt), + info_log_(info_log), file_(std::move(_file)), reporter_(reporter), checksum_(checksum), @@ -378,9 +380,10 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) { } // Check crc + uint32_t actual_crc; if (checksum_) { uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); - uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6); + actual_crc = crc32c::Value(header + 6, length + header_size - 6); if (actual_crc != expected_crc) { // Drop the rest of the buffer since "length" itself may have // been corrupted and if we trust it, we could find some @@ -402,6 +405,15 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) { return kBadRecord; } + if (db_options_) + Log(InfoLogLevel::INFO_LEVEL, db_options_->info_log, + "ReadPhysicalRecord: log %lld offset %lld len %d crc %d type %d", + (unsigned long long)log_number_, + (unsigned long long)(end_of_buffer_offset_ - buffer_.size() - + header_size - length), + (int)header_size + (int)length, crc32c::Mask(actual_crc), + type); + *result = Slice(header + header_size, length); return type; } diff --git a/db/log_reader.h b/db/log_reader.h index 28f0a2c1..f885c14c 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -19,6 +19,7 @@ namespace rocksdb { class SequentialFileReader; +class DBOptions; class Logger; using std::unique_ptr; @@ -56,7 +57,8 @@ class Reader { Reader(std::shared_ptr info_log, unique_ptr&& file, Reporter* reporter, bool checksum, uint64_t initial_offset, - uint64_t log_num); + uint64_t log_num, + const DBOptions *opt = NULL); ~Reader(); @@ -89,6 +91,7 @@ class Reader { SequentialFileReader* file() { return file_.get(); } private: + const DBOptions *db_options_; std::shared_ptr info_log_; const unique_ptr file_; Reporter* const reporter_; diff --git a/db/log_writer.cc b/db/log_writer.cc index 728c526b..88291c7a 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -11,6 +11,7 @@ #include #include "rocksdb/env.h" +#include "rocksdb/options.h" #include "util/coding.h" #include "util/crc32c.h" #include "util/file_reader_writer.h" @@ -124,6 +125,14 @@ Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { crc = crc32c::Mask(crc); // Adjust for storage EncodeFixed32(buf, crc); + if (db_options_) + Log(InfoLogLevel::INFO_LEVEL, db_options_->info_log, + "EmitPhysicalRecord: log %lld offset %lld len %d crc %d", + (unsigned long long)log_number_, + (unsigned long long)dest_->GetFileSize(), + (int)header_size + (int)n, + crc); + // Write the header and the payload Status s = dest_->Append(Slice(buf, header_size)); if (s.ok()) {