From: Mayank Agarwal Date: Tue, 6 Aug 2013 19:54:37 +0000 (-0700) Subject: API for getting archived log files X-Git-Tag: v2.2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8a3547d38ee32f6f0fbf3b1b117b23eca8b426fc;p=rocksdb.git API for getting archived log files Summary: Also expanded class LogFile to have startSequence and FileSize and exposed it publicly Test Plan: make all check Reviewers: dhruba, haobo Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D12087 --- diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 5fd2af86..ae64392e 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -2,10 +2,11 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "db/db_impl.h" -#include "db/filename.h" +#include #include #include +#include "db/db_impl.h" +#include "db/filename.h" #include "db/version_set.h" #include "leveldb/db.h" #include "leveldb/env.h" @@ -66,4 +67,47 @@ Status DBImpl::GetLiveFiles(std::vector& ret, return Status::OK(); } +Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) { + // First get sorted files in archive dir, then append sorted files from main + // dir to maintain sorted order + + // list wal files in archive dir. + Status s; + std::string archivedir = ArchivalDirectory(dbname_); + if (env_->FileExists(archivedir)) { + s = AppendSortedWalsOfType(archivedir, files, kArchivedLogFile); + if (!s.ok()) { + return s; + } + } + // list wal files in main db dir. + s = AppendSortedWalsOfType(dbname_, files, kAliveLogFile); + if (!s.ok()) { + return s; + } + return s; +} + +Status DBImpl::DeleteWalFiles(const VectorLogPtr& files) { + Status s; + std::string archivedir = ArchivalDirectory(dbname_); + std::string files_not_deleted; + for (const auto& wal : files) { + /* Try deleting in archive dir. If fails, try deleting in main db dir. 
+ * This is efficient because all except for very few wal files will be in + * archive. Checking for WalType is not much helpful because alive wal could + be archived now. + */ + if (!env_->DeleteFile(archivedir + "/" + wal->Filename()).ok() && + !env_->DeleteFile(dbname_ + "/" + wal->Filename()).ok()) { + files_not_deleted.append(wal->Filename()); + } + } + if (!files_not_deleted.empty()) { + return Status::IOError("Deleted all requested files except: " + + files_not_deleted); + } + return Status::OK(); +} + } diff --git a/db/db_impl.cc b/db/db_impl.cc index f2351037..1326148b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -27,7 +27,7 @@ #include "db/table_cache.h" #include "db/version_set.h" #include "db/write_batch_internal.h" -#include "db/transaction_log_iterator_impl.h" +#include "db/transaction_log_impl.h" #include "leveldb/compaction_filter.h" #include "leveldb/db.h" #include "leveldb/env.h" @@ -1046,34 +1046,24 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() { Status DBImpl::GetUpdatesSince(SequenceNumber seq, unique_ptr* iter) { - // Get All Log Files. - // Sort Files - // Get the first entry from each file. + if (seq > last_flushed_sequence_) { + return Status::IOError("Requested sequence not yet written in the db"); + } + // Get all sorted Wal Files. // Do binary search and open files and find the seq number. - std::vector walFiles; - // list wal files in main db dir. - Status s = ListAllWALFiles(dbname_, &walFiles, kAliveLogFile); + std::unique_ptr wal_files(new VectorLogPtr); + Status s = GetSortedWalFiles(*wal_files); if (!s.ok()) { return s; } - // list wal files in archive dir. - std::string archivedir = ArchivalDirectory(dbname_); - if (env_->FileExists(archivedir)) { - s = ListAllWALFiles(archivedir, &walFiles, kArchivedLogFile); - if (!s.ok()) { - return s; - } - } - if (walFiles.empty()) { + if (wal_files->empty()) { return Status::IOError(" NO WAL Files present in the db"); } // std::shared_ptr would have been useful here. 
- std::unique_ptr> probableWALFiles( - new std::vector()); - s = FindProbableWALFiles(&walFiles, probableWALFiles.get(), seq); + s = RetainProbableWalFiles(*wal_files, seq); if (!s.ok()) { return s; } @@ -1082,90 +1072,61 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, &options_, storage_options_, seq, - std::move(probableWALFiles), + std::move(wal_files), &last_flushed_sequence_)); iter->get()->Next(); return iter->get()->status(); } -Status DBImpl::FindProbableWALFiles(std::vector* const allLogs, - std::vector* const result, - const SequenceNumber target) { - assert(allLogs != nullptr); - assert(result != nullptr); - - std::sort(allLogs->begin(), allLogs->end()); +Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs, + const SequenceNumber target) { long start = 0; // signed to avoid overflow when target is < first file. - long end = static_cast(allLogs->size()) - 1; + long end = static_cast(all_logs.size()) - 1; // Binary Search. avoid opening all files. while (end >= start) { long mid = start + (end - start) / 2; // Avoid overflow. - WriteBatch batch; - Status s = ReadFirstRecord(allLogs->at(mid), &batch); - if (!s.ok()) { - if (CheckFileExistsAndEmpty(allLogs->at(mid))) { - allLogs->erase(allLogs->begin() + mid); - --end; - continue; - } - return s; - } - SequenceNumber currentSeqNum = WriteBatchInternal::Sequence(&batch); - if (currentSeqNum == target) { - start = mid; + SequenceNumber current_seq_num = all_logs.at(mid)->StartSequence(); + if (current_seq_num == target) { end = mid; break; - } else if (currentSeqNum < target) { + } else if (current_seq_num < target) { start = mid + 1; } else { end = mid - 1; } } - size_t startIndex = std::max(0l, end); // end could be -ve. - for (size_t i = startIndex; i < allLogs->size(); ++i) { - result->push_back(allLogs->at(i)); - } - if (result->empty()) { - return Status::IOError( - "No probable files. Check if the db contains log files"); - } + size_t start_index = std::max(0l, end); // end could be -ve. 
+ // The last wal file is always included + all_logs.erase(all_logs.begin(), all_logs.begin() + start_index); return Status::OK(); } -bool DBImpl::CheckFileExistsAndEmpty(const LogFile& file) { - if (file.type == kAliveLogFile) { - const std::string fname = LogFileName(dbname_, file.logNumber); - uint64_t file_size; - Status s = env_->GetFileSize(fname, &file_size); - if (s.ok() && file_size == 0) { - return true; - } - } - const std::string fname = ArchivedLogFileName(dbname_, file.logNumber); +bool DBImpl::CheckWalFileExistsAndEmpty(const WalFileType type, + const uint64_t number) { + const std::string fname = (type == kAliveLogFile) ? + LogFileName(dbname_, number) : ArchivedLogFileName(dbname_, number); uint64_t file_size; Status s = env_->GetFileSize(fname, &file_size); - if (s.ok() && file_size == 0) { - return true; - } - return false; + return (s.ok() && (file_size == 0)); } -Status DBImpl::ReadFirstRecord(const LogFile& file, WriteBatch* const result) { +Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number, + WriteBatch* const result) { - if (file.type == kAliveLogFile) { - std::string fname = LogFileName(dbname_, file.logNumber); + if (type == kAliveLogFile) { + std::string fname = LogFileName(dbname_, number); Status status = ReadFirstLine(fname, result); if (!status.ok()) { // check if the file got moved to archive. 
- std::string archivedFile = ArchivedLogFileName(dbname_, file.logNumber); - Status s = ReadFirstLine(archivedFile, result); + std::string archived_file = ArchivedLogFileName(dbname_, number); + Status s = ReadFirstLine(archived_file, result); if (!s.ok()) { - return Status::IOError("Log File Has been deleted"); + return Status::IOError("Log File has been deleted"); } } return Status::OK(); - } else if (file.type == kArchivedLogFile) { - std::string fname = ArchivedLogFileName(dbname_, file.logNumber); + } else if (type == kArchivedLogFile) { + std::string fname = ArchivedLogFileName(dbname_, number); Status status = ReadFirstLine(fname, result); return status; } @@ -1204,6 +1165,7 @@ Status DBImpl::ReadFirstLine(const std::string& fname, 0/*initial_offset*/); std::string scratch; Slice record; + if (reader.ReadRecord(&record, &scratch) && status.ok()) { if (record.size() < 12) { reporter.Corruption( @@ -1217,22 +1179,49 @@ Status DBImpl::ReadFirstLine(const std::string& fname, return Status::IOError("Error reading from file " + fname); } -Status DBImpl::ListAllWALFiles(const std::string& path, - std::vector* const logFiles, - WalFileType logType) { - assert(logFiles != nullptr); - std::vector allFiles; - const Status status = env_->GetChildren(path, &allFiles); +struct CompareLogByPointer { + bool operator() (const unique_ptr& a, + const unique_ptr& b) { + LogFileImpl* a_impl = dynamic_cast(a.get()); + LogFileImpl* b_impl = dynamic_cast(b.get()); + return *a_impl < *b_impl; + } +}; + +Status DBImpl::AppendSortedWalsOfType(const std::string& path, + VectorLogPtr& log_files, WalFileType log_type) { + std::vector all_files; + const Status status = env_->GetChildren(path, &all_files); if (!status.ok()) { return status; } - for (const auto& f : allFiles) { + log_files.reserve(log_files.size() + all_files.size()); + for (const auto& f : all_files) { uint64_t number; FileType type; if (ParseFileName(f, &number, &type) && type == kLogFile){ - 
logFiles->push_back(LogFile(number, logType)); + + WriteBatch batch; + Status s = ReadFirstRecord(log_type, number, &batch); + if (!s.ok()) { + if (CheckWalFileExistsAndEmpty(log_type, number)) { + continue; + } + return s; + } + + uint64_t size_bytes; + s = env_->GetFileSize(LogFileName(path, number), &size_bytes); + if (!s.ok()) { + return s; + } + + log_files.push_back(std::move(unique_ptr(new LogFileImpl( + number, log_type, WriteBatchInternal::Sequence(&batch), size_bytes)))); } } + CompareLogByPointer compare_log_files; + std::sort(log_files.begin(), log_files.end(), compare_log_files); return status; } diff --git a/db/db_impl.h b/db/db_impl.h index c7f90b3c..cea8041e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -10,15 +10,15 @@ #include #include #include "db/dbformat.h" -#include "db/log_file.h" #include "db/log_writer.h" #include "db/snapshot.h" #include "leveldb/db.h" #include "leveldb/env.h" +#include "leveldb/memtablerep.h" +#include "leveldb/transaction_log.h" #include "port/port.h" #include "util/stats_logger.h" #include "memtablelist.h" -#include "leveldb/memtablerep.h" #ifdef USE_SCRIBE #include "scribe/scribe_logger.h" @@ -73,6 +73,8 @@ class DBImpl : public DB { virtual Status EnableFileDeletions(); virtual Status GetLiveFiles(std::vector&, uint64_t* manifest_file_size); + virtual Status GetSortedWalFiles(VectorLogPtr& files); + virtual Status DeleteWalFiles(const VectorLogPtr& files); virtual SequenceNumber GetLatestSequenceNumber(); virtual Status GetUpdatesSince(SequenceNumber seq_number, unique_ptr* iter); @@ -183,7 +185,7 @@ class DBImpl : public DB { void MaybeScheduleCompaction(); static void BGWork(void* db); void BackgroundCall(); - Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state); + Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state); void CleanupCompaction(CompactionState* compact); Status DoCompactionWork(CompactionState* compact); @@ -208,19 +210,21 @@ class DBImpl : public DB 
{ void PurgeObsoleteWALFiles(); - Status ListAllWALFiles(const std::string& path, - std::vector* logFiles, - WalFileType type); + Status AppendSortedWalsOfType(const std::string& path, + VectorLogPtr& log_files, + WalFileType type); - // Find's all the log files which contain updates with seq no. - // Greater Than or Equal to the requested SequenceNumber - Status FindProbableWALFiles(std::vector* const allLogs, - std::vector* const result, - const SequenceNumber target); + // Requires: all_logs should be sorted with earliest log file first + // Retains all log files in all_logs which contain updates with seq no. + // Greater Than or Equal to the requested SequenceNumber. + Status RetainProbableWalFiles(VectorLogPtr& all_logs, + const SequenceNumber target); // return true if - bool CheckFileExistsAndEmpty(const LogFile& file); + bool CheckWalFileExistsAndEmpty(const WalFileType type, + const uint64_t number); - Status ReadFirstRecord(const LogFile& file, WriteBatch* const result); + Status ReadFirstRecord(const WalFileType type, const uint64_t number, + WriteBatch* const result); Status ReadFirstLine(const std::string& fname, WriteBatch* const batch); diff --git a/db/db_test.cc b/db/db_test.cc index 8cd1dda6..a087edf2 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3461,6 +3461,14 @@ class ModelDB: public DB { return Status::OK(); } + virtual Status GetSortedWalFiles(VectorLogPtr& files) { + return Status::OK(); + } + + virtual Status DeleteWalFiles(const VectorLogPtr& files) { + return Status::OK(); + } + virtual SequenceNumber GetLatestSequenceNumber() { return 0; } diff --git a/db/filename.cc b/db/filename.cc index 81a14215..794410cf 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -46,10 +46,13 @@ extern Status WriteStringToFileSync(Env* env, const Slice& data, static std::string MakeFileName(const std::string& name, uint64_t number, const char* suffix) { char buf[100]; - snprintf(buf, sizeof(buf), "/%06llu.%s", + snprintf(buf, sizeof(buf), "%06llu.%s", 
static_cast(number), suffix); - return name + buf; + if (name.empty()) { + return buf; + } + return name + "/" + buf; } std::string LogFileName(const std::string& name, uint64_t number) { diff --git a/db/log_file.h b/db/log_file.h deleted file mode 100644 index da7808e6..00000000 --- a/db/log_file.h +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2008-present Facebook. All Rights Reserved. - -#ifndef STORAGE_LEVELDB_DB_LOG_FILE_H_ -#define STORAGE_LEVELDB_DB_LOG_FILE_H_ - -namespace leveldb { - -enum WalFileType { - kArchivedLogFile = 0, - kAliveLogFile = 1 -} ; - -class LogFile { - - public: - uint64_t logNumber; - WalFileType type; - - LogFile(uint64_t logNum,WalFileType logType) : - logNumber(logNum), - type(logType) {} - - LogFile(const LogFile& that) { - logNumber = that.logNumber; - type = that.type; - } - - bool operator < (const LogFile& that) const { - return logNumber < that.logNumber; - } - - std::string ToString() const { - char response[100]; - const char* typeOfLog; - if (type == kAliveLogFile) { - typeOfLog = "Alive Log"; - } else { - typeOfLog = "Archived Log"; - } - sprintf(response, - "LogNumber : %ld LogType : %s", - logNumber, - typeOfLog); - return std::string(response); - } -}; -} // namespace leveldb -#endif // STORAGE_LEVELDB_DB_LOG_FILE_H_ diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc new file mode 100644 index 00000000..6de0bd9c --- /dev/null +++ b/db/transaction_log_impl.cc @@ -0,0 +1,171 @@ +#include "db/transaction_log_impl.h" +#include "db/write_batch_internal.h" + +namespace leveldb { + +TransactionLogIteratorImpl::TransactionLogIteratorImpl( + const std::string& dbname, + const Options* options, + const EnvOptions& soptions, + const SequenceNumber seq, + std::unique_ptr files, + SequenceNumber const * const lastFlushedSequence) : + dbname_(dbname), + options_(options), + soptions_(soptions), + startingSequenceNumber_(seq), + files_(std::move(files)), + started_(false), + isValid_(false), + currentFileIndex_(0), + 
lastFlushedSequence_(lastFlushedSequence) { + assert(startingSequenceNumber_ <= *lastFlushedSequence_); + assert(files_.get() != nullptr); + + reporter_.env = options_->env; + reporter_.info_log = options_->info_log.get(); +} + +Status TransactionLogIteratorImpl::OpenLogFile( + const LogFile* logFile, + unique_ptr* file) { + Env* env = options_->env; + if (logFile->Type() == kArchivedLogFile) { + std::string fname = ArchivedLogFileName(dbname_, logFile->LogNumber()); + return env->NewSequentialFile(fname, file, soptions_); + } else { + std::string fname = LogFileName(dbname_, logFile->LogNumber()); + Status status = env->NewSequentialFile(fname, file, soptions_); + if (!status.ok()) { + // If cannot open file in DB directory. + // Try the archive dir, as it could have moved in the meanwhile. + fname = ArchivedLogFileName(dbname_, logFile->LogNumber()); + status = env->NewSequentialFile(fname, file, soptions_); + if (!status.ok()) { + return Status::IOError(" Requested file not present in the dir"); + } + } + return status; + } +} + +BatchResult TransactionLogIteratorImpl::GetBatch() { + assert(isValid_); // cannot call in a non valid state. + BatchResult result; + result.sequence = currentSequence_; + result.writeBatchPtr = std::move(currentBatch_); + return result; +} + +Status TransactionLogIteratorImpl::status() { + return currentStatus_; +} + +bool TransactionLogIteratorImpl::Valid() { + return started_ && isValid_; +} + +void TransactionLogIteratorImpl::Next() { + LogFile* currentLogFile = files_.get()->at(currentFileIndex_).get(); + +// First seek to the given seqNo. in the current file. + std::string scratch; + Slice record; + if (!started_) { + started_ = true; // this piece only runs onced. 
+ isValid_ = false; + if (startingSequenceNumber_ > *lastFlushedSequence_) { + currentStatus_ = Status::IOError("Looking for a sequence, " + "which is not flushed yet."); + return; + } + Status s = OpenLogReader(currentLogFile); + if (!s.ok()) { + currentStatus_ = s; + isValid_ = false; + return; + } + while (currentLogReader_->ReadRecord(&record, &scratch)) { + if (record.size() < 12) { + reporter_.Corruption( + record.size(), Status::Corruption("log record too small")); + continue; + } + UpdateCurrentWriteBatch(record); + if (currentSequence_ >= startingSequenceNumber_) { + assert(currentSequence_ <= *lastFlushedSequence_); + isValid_ = true; + break; + } else { + isValid_ = false; + } + } + if (isValid_) { + // Done for this iteration + return; + } + } + bool openNextFile = true; + while(openNextFile) { + assert(currentLogReader_); + if (currentSequence_ < *lastFlushedSequence_) { + if (currentLogReader_->IsEOF()) { + currentLogReader_->UnmarkEOF(); + } + while (currentLogReader_->ReadRecord(&record, &scratch)) { + if (record.size() < 12) { + reporter_.Corruption( + record.size(), Status::Corruption("log record too small")); + continue; + } else { + UpdateCurrentWriteBatch(record); + openNextFile = false; + break; + } + } + } + + if (openNextFile) { + if (currentFileIndex_ < files_.get()->size() - 1) { + ++currentFileIndex_; + Status status =OpenLogReader(files_.get()->at(currentFileIndex_).get()); + if (!status.ok()) { + isValid_ = false; + currentStatus_ = status; + return; + } + } else { + isValid_ = false; + openNextFile = false; + if (currentSequence_ == *lastFlushedSequence_) { + currentStatus_ = Status::OK(); + } else { + currentStatus_ = Status::IOError(" NO MORE DATA LEFT"); + } + } + } + } +} + +void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { + WriteBatch* batch = new WriteBatch(); + WriteBatchInternal::SetContents(batch, record); + currentSequence_ = WriteBatchInternal::Sequence(batch); + currentBatch_.reset(batch); + 
isValid_ = true; + currentStatus_ = Status::OK(); +} + +Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) { + unique_ptr file; + Status status = OpenLogFile(logFile, &file); + if (!status.ok()) { + return status; + } + assert(file); + currentLogReader_.reset( + new log::Reader(std::move(file), &reporter_, true, 0) + ); + return Status::OK(); +} +} // namespace leveldb diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h new file mode 100644 index 00000000..27ab1936 --- /dev/null +++ b/db/transaction_log_impl.h @@ -0,0 +1,98 @@ +// Copyright 2008-present Facebook. All Rights Reserved. +#ifndef STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_ +#define STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_ + +#include + +#include "leveldb/env.h" +#include "leveldb/options.h" +#include "leveldb/types.h" +#include "leveldb/transaction_log.h" +#include "db/log_reader.h" +#include "db/filename.h" + +namespace leveldb { + +struct LogReporter : public log::Reader::Reporter { + Env* env; + Logger* info_log; + virtual void Corruption(size_t bytes, const Status& s) { + Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str()); + } +}; + +class LogFileImpl : public LogFile { + public: + LogFileImpl(uint64_t logNum, WalFileType logType, SequenceNumber startSeq, + uint64_t sizeBytes) : + logNumber_(logNum), + type_(logType), + startSequence_(startSeq), + sizeFileBytes_(sizeBytes) { + } + + std::string Filename() const { return LogFileName("", logNumber_); } + + uint64_t LogNumber() const { return logNumber_; } + + WalFileType Type() const { return type_; } + + SequenceNumber StartSequence() const { return startSequence_; } + + uint64_t SizeFileBytes() const { return sizeFileBytes_; } + + bool operator < (const LogFile& that) const { + return LogNumber() < that.LogNumber(); + } + + private: + uint64_t logNumber_; + WalFileType type_; + SequenceNumber startSequence_; + uint64_t sizeFileBytes_; + +}; + +class TransactionLogIteratorImpl : 
public TransactionLogIterator { + public: + TransactionLogIteratorImpl(const std::string& dbname, + const Options* options, + const EnvOptions& soptions, + const SequenceNumber seqNum, + std::unique_ptr files, + SequenceNumber const * const lastFlushedSequence); + + virtual bool Valid(); + + virtual void Next(); + + virtual Status status(); + + virtual BatchResult GetBatch(); + + private: + const std::string& dbname_; + const Options* options_; + const EnvOptions& soptions_; + const SequenceNumber startingSequenceNumber_; + std::unique_ptr files_; + bool started_; + bool isValid_; // not valid when it starts of. + Status currentStatus_; + size_t currentFileIndex_; + std::unique_ptr currentBatch_; + unique_ptr currentLogReader_; + Status OpenLogFile(const LogFile* logFile, unique_ptr* file); + LogReporter reporter_; + SequenceNumber const * const lastFlushedSequence_; + // represents the sequence number being read currently. + SequenceNumber currentSequence_; + + void UpdateCurrentWriteBatch(const Slice& record); + Status OpenLogReader(const LogFile* file); +}; + + + +} // namespace leveldb +#endif // STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_ diff --git a/db/transaction_log_iterator_impl.cc b/db/transaction_log_iterator_impl.cc deleted file mode 100644 index f8e4f8b2..00000000 --- a/db/transaction_log_iterator_impl.cc +++ /dev/null @@ -1,172 +0,0 @@ -#include "db/transaction_log_iterator_impl.h" -#include "db/write_batch_internal.h" -#include "db/filename.h" - -namespace leveldb { - -TransactionLogIteratorImpl::TransactionLogIteratorImpl( - const std::string& dbname, - const Options* options, - const EnvOptions& soptions, - SequenceNumber& seq, - std::unique_ptr> files, - SequenceNumber const * const lastFlushedSequence) : - dbname_(dbname), - options_(options), - soptions_(soptions), - startingSequenceNumber_(seq), - files_(std::move(files)), - started_(false), - isValid_(false), - currentFileIndex_(0), - lastFlushedSequence_(lastFlushedSequence) { - 
assert(files_.get() != nullptr); - assert(lastFlushedSequence_); - - reporter_.env = options_->env; - reporter_.info_log = options_->info_log.get(); -} - -Status TransactionLogIteratorImpl::OpenLogFile( - const LogFile& logFile, - unique_ptr* file) { - Env* env = options_->env; - if (logFile.type == kArchivedLogFile) { - std::string fname = ArchivedLogFileName(dbname_, logFile.logNumber); - return env->NewSequentialFile(fname, file, soptions_); - } else { - std::string fname = LogFileName(dbname_, logFile.logNumber); - Status status = env->NewSequentialFile(fname, file, soptions_); - if (!status.ok()) { - // If cannot open file in DB directory. - // Try the archive dir, as it could have moved in the meanwhile. - fname = ArchivedLogFileName(dbname_, logFile.logNumber); - status = env->NewSequentialFile(fname, file, soptions_); - if (!status.ok()) { - return Status::IOError(" Requested file not present in the dir"); - } - } - return status; - } -} - -BatchResult TransactionLogIteratorImpl::GetBatch() { - assert(isValid_); // cannot call in a non valid state. - BatchResult result; - result.sequence = currentSequence_; - result.writeBatchPtr = std::move(currentBatch_); - return result; -} - -Status TransactionLogIteratorImpl::status() { - return currentStatus_; -} - -bool TransactionLogIteratorImpl::Valid() { - return started_ && isValid_; -} - -void TransactionLogIteratorImpl::Next() { - LogFile currentLogFile = files_.get()->at(currentFileIndex_); - -// First seek to the given seqNo. in the current file. - std::string scratch; - Slice record; - if (!started_) { - started_ = true; // this piece only runs onced. 
- isValid_ = false; - if (startingSequenceNumber_ > *lastFlushedSequence_) { - currentStatus_ = Status::IOError("Looking for a sequence, " - "which is not flushed yet."); - return; - } - Status s = OpenLogReader(currentLogFile); - if (!s.ok()) { - currentStatus_ = s; - isValid_ = false; - return; - } - while (currentLogReader_->ReadRecord(&record, &scratch)) { - if (record.size() < 12) { - reporter_.Corruption( - record.size(), Status::Corruption("log record too small")); - continue; - } - UpdateCurrentWriteBatch(record); - if (currentSequence_ >= startingSequenceNumber_) { - assert(currentSequence_ <= *lastFlushedSequence_); - isValid_ = true; - break; - } else { - isValid_ = false; - } - } - if (isValid_) { - // Done for this iteration - return; - } - } - bool openNextFile = true; - while(openNextFile) { - assert(currentLogReader_); - if (currentSequence_ < *lastFlushedSequence_) { - if (currentLogReader_->IsEOF()) { - currentLogReader_->UnmarkEOF(); - } - while (currentLogReader_->ReadRecord(&record, &scratch)) { - if (record.size() < 12) { - reporter_.Corruption( - record.size(), Status::Corruption("log record too small")); - continue; - } else { - UpdateCurrentWriteBatch(record); - openNextFile = false; - break; - } - } - } - - if (openNextFile) { - if (currentFileIndex_ < files_.get()->size() - 1) { - ++currentFileIndex_; - Status status = OpenLogReader(files_.get()->at(currentFileIndex_)); - if (!status.ok()) { - isValid_ = false; - currentStatus_ = status; - return; - } - } else { - isValid_ = false; - openNextFile = false; - if (currentSequence_ == *lastFlushedSequence_) { - currentStatus_ = Status::OK(); - } else { - currentStatus_ = Status::IOError(" NO MORE DATA LEFT"); - } - } - } - } -} - -void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { - WriteBatch* batch = new WriteBatch(); - WriteBatchInternal::SetContents(batch, record); - currentSequence_ = WriteBatchInternal::Sequence(batch); - currentBatch_.reset(batch); - 
isValid_ = true; - currentStatus_ = Status::OK(); -} - -Status TransactionLogIteratorImpl::OpenLogReader(const LogFile& logFile) { - unique_ptr file; - Status status = OpenLogFile(logFile, &file); - if (!status.ok()) { - return status; - } - assert(file); - currentLogReader_.reset( - new log::Reader(std::move(file), &reporter_, true, 0) - ); - return Status::OK(); -} -} // namespace leveldb diff --git a/db/transaction_log_iterator_impl.h b/db/transaction_log_iterator_impl.h deleted file mode 100644 index faf07f43..00000000 --- a/db/transaction_log_iterator_impl.h +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2008-present Facebook. All Rights Reserved. -#ifndef STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_ -#define STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_ - -#include - -#include "leveldb/env.h" -#include "leveldb/options.h" -#include "leveldb/types.h" -#include "leveldb/transaction_log_iterator.h" -#include "db/log_file.h" -#include "db/log_reader.h" - -namespace leveldb { - -struct LogReporter : public log::Reader::Reporter { - Env* env; - Logger* info_log; - virtual void Corruption(size_t bytes, const Status& s) { - Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str()); - } -}; - -class TransactionLogIteratorImpl : public TransactionLogIterator { - public: - TransactionLogIteratorImpl(const std::string& dbname, - const Options* options, - const EnvOptions& soptions, - SequenceNumber& seqNum, - std::unique_ptr> files, - SequenceNumber const * const lastFlushedSequence); - - virtual bool Valid(); - - virtual void Next(); - - virtual Status status(); - - virtual BatchResult GetBatch(); - - private: - const std::string& dbname_; - const Options* options_; - const EnvOptions& soptions_; - const uint64_t startingSequenceNumber_; - std::unique_ptr> files_; - bool started_; - bool isValid_; // not valid when it starts of. 
- Status currentStatus_; - size_t currentFileIndex_; - std::unique_ptr currentBatch_; - unique_ptr currentLogReader_; - Status OpenLogFile(const LogFile& logFile, unique_ptr* file); - LogReporter reporter_; - SequenceNumber const * const lastFlushedSequence_; - // represents the sequence number being read currently. - SequenceNumber currentSequence_; - - void UpdateCurrentWriteBatch(const Slice& record); - Status OpenLogReader(const LogFile& file); -}; - - - -} // namespace leveldb -#endif // STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_ diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 0c056c36..d0316ee4 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -12,7 +12,7 @@ #include "leveldb/iterator.h" #include "leveldb/options.h" #include "leveldb/types.h" -#include "leveldb/transaction_log_iterator.h" +#include "leveldb/transaction_log.h" namespace leveldb { @@ -232,6 +232,14 @@ class DB { virtual Status GetLiveFiles(std::vector&, uint64_t* manifest_file_size) = 0; + // Retrieve the sorted list of all wal files with earliest file first + virtual Status GetSortedWalFiles(VectorLogPtr& files) = 0; + + // Delete wal files in files. These can be either live or archived. + // Returns Status::OK if all files could be deleted, otherwise Status::IOError + // which contains information about files that could not be deleted. + virtual Status DeleteWalFiles(const VectorLogPtr& files) = 0; + // The sequence number of the most recent transaction. virtual SequenceNumber GetLatestSequenceNumber() = 0; diff --git a/include/leveldb/transaction_log.h b/include/leveldb/transaction_log.h new file mode 100644 index 00000000..a834afa8 --- /dev/null +++ b/include/leveldb/transaction_log.h @@ -0,0 +1,78 @@ +// Copyright 2008-present Facebook. All Rights Reserved. 
+#ifndef STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_ +#define STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_ + +#include "leveldb/status.h" +#include "leveldb/types.h" +#include "leveldb/write_batch.h" + +namespace leveldb { + +class LogFile; +typedef std::vector> VectorLogPtr; + +enum WalFileType { + /* Indicates that WAL file is in archive directory. WAL files are moved from + * the main db directory to archive directory once they are not live and stay + * there for a duration of WAL_ttl_seconds which can be set in Options + */ + kArchivedLogFile = 0, + + /* Indicates that WAL file is live and resides in the main db directory */ + kAliveLogFile = 1 +} ; + +class LogFile { + public: + LogFile() {} + virtual ~LogFile() {} + + // Returns log file's name excluding the db path + virtual std::string Filename() const = 0; + + // Primary identifier for log file. + // This is directly proportional to creation time of the log file + virtual uint64_t LogNumber() const = 0; + + // Log file can be either alive or archived + virtual WalFileType Type() const = 0; + + // Starting sequence number of writebatch written in this log file + virtual SequenceNumber StartSequence() const = 0; + + // Size of log file on disk in Bytes + virtual uint64_t SizeFileBytes() const = 0; +}; + +struct BatchResult { + SequenceNumber sequence; + std::unique_ptr writeBatchPtr; +}; + +// A TransactionLogIterator is used to iterate over the Transaction's in a db. +class TransactionLogIterator { + public: + TransactionLogIterator() {} + virtual ~TransactionLogIterator() {} + + // An iterator is either positioned at a WriteBatch or not valid. + // This method returns true if the iterator is valid. + // Can read data from a valid iterator. + virtual bool Valid() = 0; + + // Moves the iterator to the next WriteBatch. + // REQUIRES: Valid() to be true. + virtual void Next() = 0; + + // Return's ok if the iterator is valid. + // Return the Error when something has gone wrong. 
+ virtual Status status() = 0; + + // If valid return's the current write_batch and the sequence number of the + // latest transaction contained in the batch. + // ONLY use if Valid() is true and status() is OK. + virtual BatchResult GetBatch() = 0; +}; +} // namespace leveldb + +#endif // STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_ diff --git a/include/leveldb/transaction_log_iterator.h b/include/leveldb/transaction_log_iterator.h deleted file mode 100644 index d755d829..00000000 --- a/include/leveldb/transaction_log_iterator.h +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2008-present Facebook. All Rights Reserved. -#ifndef STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_ -#define STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_ - -#include "leveldb/status.h" -#include "leveldb/write_batch.h" - -namespace leveldb { - - -struct BatchResult { - SequenceNumber sequence; - std::unique_ptr writeBatchPtr; -}; - -// A TransactionLogIterator is used to iterate over the Transaction's in a db. -class TransactionLogIterator { - public: - TransactionLogIterator() {} - virtual ~TransactionLogIterator() {} - - // An iterator is either positioned at a WriteBatch or not valid. - // This method returns true if the iterator is valid. - // Can read data from a valid iterator. - virtual bool Valid() = 0; - - // Moves the iterator to the next WriteBatch. - // REQUIRES: Valid() to be true. - virtual void Next() = 0; - - // Return's ok if the iterator is valid. - // Return the Error when something has gone wrong. - virtual Status status() = 0; - - // If valid return's the current write_batch and the sequence number of the - // latest transaction contained in the batch. - // ONLY use if Valid() is true and status() is OK. 
- virtual BatchResult GetBatch() = 0; -}; -} // namespace leveldb - -#endif // STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_ diff --git a/include/leveldb/types.h b/include/leveldb/types.h index 6ec058c2..1d910946 100644 --- a/include/leveldb/types.h +++ b/include/leveldb/types.h @@ -7,7 +7,7 @@ namespace leveldb { // Define all public custom types here. -// Represents a sequence number in a WAL file. +// Represents a sequence number in a WAL file. typedef uint64_t SequenceNumber; } // namespace leveldb diff --git a/include/utilities/stackable_db.h b/include/utilities/stackable_db.h index 916496e0..08930d19 100644 --- a/include/utilities/stackable_db.h +++ b/include/utilities/stackable_db.h @@ -139,6 +139,15 @@ class StackableDB : public DB { return sdb_->GetLatestSequenceNumber(); } + virtual Status GetSortedWalFiles(VectorLogPtr& files) override { + return sdb_->GetSortedWalFiles(files); + } + + virtual Status DeleteWalFiles(const VectorLogPtr& files) + override{ + return sdb_->DeleteWalFiles(files); + } + virtual Status GetUpdatesSince(SequenceNumber seq_number, unique_ptr* iter) override { diff --git a/tools/db_repl_stress.cc b/tools/db_repl_stress.cc index 012a3df0..010fa573 100644 --- a/tools/db_repl_stress.cc +++ b/tools/db_repl_stress.cc @@ -1,4 +1,3 @@ - #include #include "db/write_batch_internal.h" @@ -19,7 +18,6 @@ using namespace leveldb; struct DataPumpThread { size_t no_records; DB* db; // Assumption DB is Open'ed already. 
- volatile bool is_running; }; static std::string RandomString(Random* rnd, int len) { @@ -33,59 +31,50 @@ static void DataPumpThreadBody(void* arg) { DB* db = t->db; Random rnd(301); size_t i = 0; - t->is_running = true; - while( i < t->no_records ) { - db->Put(WriteOptions(), - Slice(RandomString(&rnd, 50)), - Slice(RandomString(&rnd, 500))); - ++i; + while(i++ < t->no_records) { + if(!db->Put(WriteOptions(), Slice(RandomString(&rnd, 500)), + Slice(RandomString(&rnd, 500))).ok()) { + fprintf(stderr, "Error in put\n"); + exit(1); + } } - t->is_running = false; } struct ReplicationThread { port::AtomicPointer stop; DB* db; - volatile SequenceNumber latest; volatile size_t no_read; - volatile bool has_more; }; static void ReplicationThreadBody(void* arg) { ReplicationThread* t = reinterpret_cast(arg); DB* db = t->db; unique_ptr iter; - SequenceNumber currentSeqNum = 0; + SequenceNumber currentSeqNum = 1; while (t->stop.Acquire_Load() != nullptr) { - if (!iter) { - db->GetUpdatesSince(currentSeqNum, &iter); - fprintf(stdout, "Refreshing iterator\n"); - iter->Next(); - while(iter->Valid()) { - BatchResult res = iter->GetBatch(); - if (res.sequence != currentSeqNum +1 - && res.sequence != currentSeqNum) { - fprintf(stderr, - "Missed a seq no. b/w %ld and %ld\n", - currentSeqNum, - res.sequence); - exit(1); - } - currentSeqNum = res.sequence; - t->latest = res.sequence; - iter->Next(); - t->no_read++; + iter.reset(); + Status s; + while(!db->GetUpdatesSince(currentSeqNum, &iter).ok()) { + if (t->stop.Acquire_Load() == nullptr) { + return; + } + } + fprintf(stderr, "Refreshing iterator\n"); + for(;iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) { + BatchResult res = iter->GetBatch(); + if (res.sequence != currentSeqNum) { + fprintf(stderr, "Missed a seq no. 
b/w %ld and %ld\n", currentSeqNum, + res.sequence); + exit(1); } } - iter.reset(); } } - int main(int argc, const char** argv) { - long FLAGS_num_inserts = 1000; - long FLAGS_WAL_ttl_seconds = 1000; + uint64_t FLAGS_num_inserts = 1000; + uint64_t FLAGS_WAL_ttl_seconds = 1000; char junk; long l; @@ -108,36 +97,34 @@ int main(int argc, const char** argv) { options.create_if_missing = true; options.WAL_ttl_seconds = FLAGS_WAL_ttl_seconds; DB* db; + DestroyDB(default_db_path, options); Status s = DB::Open(options, default_db_path, &db); if (!s.ok()) { fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str()); + exit(1); } DataPumpThread dataPump; dataPump.no_records = FLAGS_num_inserts; dataPump.db = db; - dataPump.is_running = true; env->StartThread(DataPumpThreadBody, &dataPump); ReplicationThread replThread; replThread.db = db; replThread.no_read = 0; - replThread.has_more = true; replThread.stop.Release_Store(env); // store something to make it non-null. env->StartThread(ReplicationThreadBody, &replThread); - while(dataPump.is_running) { - continue; - } + while(replThread.no_read < FLAGS_num_inserts); replThread.stop.Release_Store(nullptr); - if ( replThread.no_read < dataPump.no_records ) { + if (replThread.no_read < dataPump.no_records) { // no. read should be => than inserted. fprintf(stderr, "No. 
of Record's written and read not same\nRead : %ld" - " Written : %ld", replThread.no_read, dataPump.no_records); + " Written : %ld\n", replThread.no_read, dataPump.no_records); exit(1); } + fprintf(stderr, "Successful!\n"); exit(0); - fprintf(stdout, "ALL IS FINE"); } diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index b6235ba4..9babecd0 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -266,6 +266,14 @@ SequenceNumber DBWithTTL::GetLatestSequenceNumber() { return db_->GetLatestSequenceNumber(); } +Status DBWithTTL::GetSortedWalFiles(VectorLogPtr& files) { + return db_->GetSortedWalFiles(files); +} + +Status DBWithTTL::DeleteWalFiles(const VectorLogPtr& files){ + return db_->DeleteWalFiles(files); +} + Status DBWithTTL::GetUpdatesSince( SequenceNumber seq_number, unique_ptr* iter) { diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index 1285b448..dbba5c63 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -77,6 +77,10 @@ class DBWithTTL : public StackableDB { virtual Status GetLiveFiles(std::vector& vec, uint64_t* mfs); + virtual Status GetSortedWalFiles(VectorLogPtr& files); + + virtual Status DeleteWalFiles(const VectorLogPtr& files); + virtual SequenceNumber GetLatestSequenceNumber(); virtual Status GetUpdatesSince(SequenceNumber seq_number,