Summary: Also expanded class LogFile to have startSequence and FileSize, and exposed it publicly
Test Plan: make all check
Reviewers: dhruba, haobo
Reviewed By: dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12087
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include "db/db_impl.h"
-#include "db/filename.h"
+#include <algorithm>
#include <string>
#include <stdint.h>
+#include "db/db_impl.h"
+#include "db/filename.h"
#include "db/version_set.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
return Status::OK();
}
+// Appends the WAL files of this db to `files`: first the ones found in the
+// archive directory (if that directory exists), then the ones in the main db
+// directory. Each step goes through AppendSortedWalsOfType(); the first
+// failing status is returned unchanged.
+Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
+  const std::string archive_path = ArchivalDirectory(dbname_);
+  if (env_->FileExists(archive_path)) {
+    // The archive directory is optional, so it is only scanned when present.
+    const Status archived =
+        AppendSortedWalsOfType(archive_path, files, kArchivedLogFile);
+    if (!archived.ok()) {
+      return archived;
+    }
+  }
+  // Finish with the WAL files that live in the main db directory.
+  return AppendSortedWalsOfType(dbname_, files, kAliveLogFile);
+}
+
+// Deletes every WAL file named in `files`, trying the archive directory first
+// and the main db directory second.
+// Returns Status::OK() when every file could be deleted; otherwise an IOError
+// whose message lists the comma-separated names of the files that could not
+// be deleted from either directory.
+Status DBImpl::DeleteWalFiles(const VectorLogPtr& files) {
+  const std::string archivedir = ArchivalDirectory(dbname_);
+  std::string files_not_deleted;
+  for (const auto& wal : files) {
+    /* Try deleting in archive dir. If that fails, try deleting in main db dir.
+     * This is efficient because all except for very few wal files will be in
+     * archive. Checking for WalType is not much helpful because an alive wal
+     * could be archived now.
+     */
+    const std::string fname = wal->Filename();
+    if (!env_->DeleteFile(archivedir + "/" + fname).ok() &&
+        !env_->DeleteFile(dbname_ + "/" + fname).ok()) {
+      // Separate the names so the error message stays readable when more
+      // than one file could not be deleted.
+      if (!files_not_deleted.empty()) {
+        files_not_deleted.append(", ");
+      }
+      files_not_deleted.append(fname);
+    }
+  }
+  if (!files_not_deleted.empty()) {
+    return Status::IOError("Deleted all requested files except: " +
+                           files_not_deleted);
+  }
+  return Status::OK();
+}
+
}
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
-#include "db/transaction_log_iterator_impl.h"
+#include "db/transaction_log_impl.h"
#include "leveldb/compaction_filter.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
Status DBImpl::GetUpdatesSince(SequenceNumber seq,
unique_ptr<TransactionLogIterator>* iter) {
- // Get All Log Files.
- // Sort Files
- // Get the first entry from each file.
+ if (seq > last_flushed_sequence_) {
+ return Status::IOError("Requested sequence not yet written in the db");
+ }
+ // Get all sorted Wal Files.
// Do binary search and open files and find the seq number.
- std::vector<LogFile> walFiles;
- // list wal files in main db dir.
- Status s = ListAllWALFiles(dbname_, &walFiles, kAliveLogFile);
+ std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
+ Status s = GetSortedWalFiles(*wal_files);
if (!s.ok()) {
return s;
}
- // list wal files in archive dir.
- std::string archivedir = ArchivalDirectory(dbname_);
- if (env_->FileExists(archivedir)) {
- s = ListAllWALFiles(archivedir, &walFiles, kArchivedLogFile);
- if (!s.ok()) {
- return s;
- }
- }
- if (walFiles.empty()) {
+ if (wal_files->empty()) {
return Status::IOError(" NO WAL Files present in the db");
}
// std::shared_ptr would have been useful here.
- std::unique_ptr<std::vector<LogFile>> probableWALFiles(
- new std::vector<LogFile>());
- s = FindProbableWALFiles(&walFiles, probableWALFiles.get(), seq);
+ s = RetainProbableWalFiles(*wal_files, seq);
if (!s.ok()) {
return s;
}
&options_,
storage_options_,
seq,
- std::move(probableWALFiles),
+ std::move(wal_files),
&last_flushed_sequence_));
iter->get()->Next();
return iter->get()->status();
}
-Status DBImpl::FindProbableWALFiles(std::vector<LogFile>* const allLogs,
- std::vector<LogFile>* const result,
- const SequenceNumber target) {
- assert(allLogs != nullptr);
- assert(result != nullptr);
-
- std::sort(allLogs->begin(), allLogs->end());
+// Trims `all_logs` in place so that it starts at the log file that may hold
+// `target`: a binary search over StartSequence() finds the last log whose
+// start sequence is <= target, and every log before it is erased.
+// `all_logs` must already be sorted with the earliest log file first.
+// Always returns Status::OK().
+Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs,
+ const SequenceNumber target) {
long start = 0; // signed to avoid overflow when target is < first file.
- long end = static_cast<long>(allLogs->size()) - 1;
+ long end = static_cast<long>(all_logs.size()) - 1;
// Binary Search. avoid opening all files.
while (end >= start) {
long mid = start + (end - start) / 2; // Avoid overflow.
- WriteBatch batch;
- Status s = ReadFirstRecord(allLogs->at(mid), &batch);
- if (!s.ok()) {
- if (CheckFileExistsAndEmpty(allLogs->at(mid))) {
- allLogs->erase(allLogs->begin() + mid);
- --end;
- continue;
- }
- return s;
- }
- SequenceNumber currentSeqNum = WriteBatchInternal::Sequence(&batch);
- if (currentSeqNum == target) {
- start = mid;
+ SequenceNumber current_seq_num = all_logs.at(mid)->StartSequence();
+ if (current_seq_num == target) {
end = mid;
break;
- } else if (currentSeqNum < target) {
+ } else if (current_seq_num < target) {
start = mid + 1;
} else {
end = mid - 1;
}
}
- size_t startIndex = std::max(0l, end); // end could be -ve.
- for (size_t i = startIndex; i < allLogs->size(); ++i) {
- result->push_back(allLogs->at(i));
- }
- if (result->empty()) {
- return Status::IOError(
- "No probable files. Check if the db contains log files");
- }
+ // Here `end` is the index of the last log whose start sequence is <= target,
+ // or -1 when every log starts after target (then nothing is erased).
+ size_t start_index = std::max(0l, end); // end could be -ve.
+ // The last wal file is always included
+ all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
return Status::OK();
}
-bool DBImpl::CheckFileExistsAndEmpty(const LogFile& file) {
- if (file.type == kAliveLogFile) {
- const std::string fname = LogFileName(dbname_, file.logNumber);
- uint64_t file_size;
- Status s = env_->GetFileSize(fname, &file_size);
- if (s.ok() && file_size == 0) {
- return true;
- }
- }
- const std::string fname = ArchivedLogFileName(dbname_, file.logNumber);
+// Returns true only when the size of WAL file `number` can be read and is 0.
+// The path is built with LogFileName() for kAliveLogFile and with
+// ArchivedLogFileName() for any other type; a failed size query yields false.
+bool DBImpl::CheckWalFileExistsAndEmpty(const WalFileType type,
+ const uint64_t number) {
+ const std::string fname = (type == kAliveLogFile) ?
+ LogFileName(dbname_, number) : ArchivedLogFileName(dbname_, number);
uint64_t file_size;
Status s = env_->GetFileSize(fname, &file_size);
- if (s.ok() && file_size == 0) {
- return true;
- }
- return false;
+ return (s.ok() && (file_size == 0));
}
-Status DBImpl::ReadFirstRecord(const LogFile& file, WriteBatch* const result) {
+Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number,
+ WriteBatch* const result) {
- if (file.type == kAliveLogFile) {
- std::string fname = LogFileName(dbname_, file.logNumber);
+ if (type == kAliveLogFile) {
+ std::string fname = LogFileName(dbname_, number);
Status status = ReadFirstLine(fname, result);
if (!status.ok()) {
// check if the file got moved to archive.
- std::string archivedFile = ArchivedLogFileName(dbname_, file.logNumber);
- Status s = ReadFirstLine(archivedFile, result);
+ std::string archived_file = ArchivedLogFileName(dbname_, number);
+ Status s = ReadFirstLine(archived_file, result);
if (!s.ok()) {
- return Status::IOError("Log File Has been deleted");
+ return Status::IOError("Log File has been deleted");
}
}
return Status::OK();
- } else if (file.type == kArchivedLogFile) {
- std::string fname = ArchivedLogFileName(dbname_, file.logNumber);
+ } else if (type == kArchivedLogFile) {
+ std::string fname = ArchivedLogFileName(dbname_, number);
Status status = ReadFirstLine(fname, result);
return status;
}
0/*initial_offset*/);
std::string scratch;
Slice record;
+
if (reader.ReadRecord(&record, &scratch) && status.ok()) {
if (record.size() < 12) {
reporter.Corruption(
return Status::IOError("Error reading from file " + fname);
}
-Status DBImpl::ListAllWALFiles(const std::string& path,
- std::vector<LogFile>* const logFiles,
- WalFileType logType) {
- assert(logFiles != nullptr);
- std::vector<std::string> allFiles;
- const Status status = env_->GetChildren(path, &allFiles);
+// Comparator for sorting a VectorLogPtr: orders log files by ascending log
+// number. It goes through the LogFile interface, so no downcast to the
+// concrete implementation (and no unchecked dynamic_cast result) is needed.
+struct CompareLogByPointer {
+  bool operator()(const unique_ptr<LogFile>& a,
+                  const unique_ptr<LogFile>& b) const {
+    return a->LogNumber() < b->LogNumber();
+  }
+};
+
+// Lists the children of `path` and, for every WAL file whose first record can
+// be read, appends a LogFileImpl of type `log_type` (carrying the sequence
+// number of that first record and the file size) to `log_files`. A WAL file
+// whose first record cannot be read is skipped when
+// CheckWalFileExistsAndEmpty() reports it as empty; any other failure is
+// returned to the caller.
+// On success the whole of `log_files` (including entries that were already
+// present before the call) is sorted with CompareLogByPointer.
+Status DBImpl::AppendSortedWalsOfType(const std::string& path,
+ VectorLogPtr& log_files, WalFileType log_type) {
+ std::vector<std::string> all_files;
+ const Status status = env_->GetChildren(path, &all_files);
if (!status.ok()) {
return status;
}
- for (const auto& f : allFiles) {
+ // Upper bound on the number of entries added below.
+ log_files.reserve(log_files.size() + all_files.size());
+ for (const auto& f : all_files) {
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == kLogFile){
- logFiles->push_back(LogFile(number, logType));
+
+ WriteBatch batch;
+ Status s = ReadFirstRecord(log_type, number, &batch);
+ if (!s.ok()) {
+ if (CheckWalFileExistsAndEmpty(log_type, number)) {
+ continue;
+ }
+ return s;
+ }
+
+ uint64_t size_bytes;
+ s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
+ if (!s.ok()) {
+ return s;
+ }
+
+ // The unique_ptr temporary is already an rvalue; no std::move is needed.
+ log_files.push_back(unique_ptr<LogFile>(new LogFileImpl(
+ number, log_type, WriteBatchInternal::Sequence(&batch), size_bytes)));
}
}
+ CompareLogByPointer compare_log_files;
+ std::sort(log_files.begin(), log_files.end(), compare_log_files);
return status;
}
#include <set>
#include <vector>
#include "db/dbformat.h"
-#include "db/log_file.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
+#include "leveldb/memtablerep.h"
+#include "leveldb/transaction_log.h"
#include "port/port.h"
#include "util/stats_logger.h"
#include "memtablelist.h"
-#include "leveldb/memtablerep.h"
#ifdef USE_SCRIBE
#include "scribe/scribe_logger.h"
virtual Status EnableFileDeletions();
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size);
+ virtual Status GetSortedWalFiles(VectorLogPtr& files);
+ virtual Status DeleteWalFiles(const VectorLogPtr& files);
virtual SequenceNumber GetLatestSequenceNumber();
virtual Status GetUpdatesSince(SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter);
void MaybeScheduleCompaction();
static void BGWork(void* db);
void BackgroundCall();
- Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state);
+ Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state);
void CleanupCompaction(CompactionState* compact);
Status DoCompactionWork(CompactionState* compact);
void PurgeObsoleteWALFiles();
- Status ListAllWALFiles(const std::string& path,
- std::vector<LogFile>* logFiles,
- WalFileType type);
+ Status AppendSortedWalsOfType(const std::string& path,
+ VectorLogPtr& log_files,
+ WalFileType type);
- // Find's all the log files which contain updates with seq no.
- // Greater Than or Equal to the requested SequenceNumber
- Status FindProbableWALFiles(std::vector<LogFile>* const allLogs,
- std::vector<LogFile>* const result,
- const SequenceNumber target);
+ // Requires: all_logs should be sorted with earliest log file first
+ // Retains all log files in all_logs which contain updates with seq no.
+ // Greater Than or Equal to the requested SequenceNumber.
+ Status RetainProbableWalFiles(VectorLogPtr& all_logs,
+ const SequenceNumber target);
// return true if
- bool CheckFileExistsAndEmpty(const LogFile& file);
+ bool CheckWalFileExistsAndEmpty(const WalFileType type,
+ const uint64_t number);
- Status ReadFirstRecord(const LogFile& file, WriteBatch* const result);
+ Status ReadFirstRecord(const WalFileType type, const uint64_t number,
+ WriteBatch* const result);
Status ReadFirstLine(const std::string& fname, WriteBatch* const batch);
return Status::OK();
}
+ virtual Status GetSortedWalFiles(VectorLogPtr& files) {
+ return Status::OK();
+ }
+
+ virtual Status DeleteWalFiles(const VectorLogPtr& files) {
+ return Status::OK();
+ }
+
virtual SequenceNumber GetLatestSequenceNumber() {
return 0;
}
+// Builds "<name>/<number zero-padded to at least 6 digits>.<suffix>". When
+// `name` is empty only the bare file name is returned, without a leading '/'.
static std::string MakeFileName(const std::string& name, uint64_t number,
const char* suffix) {
char buf[100];
- snprintf(buf, sizeof(buf), "/%06llu.%s",
+ snprintf(buf, sizeof(buf), "%06llu.%s",
static_cast<unsigned long long>(number),
suffix);
- return name + buf;
+ if (name.empty()) {
+ return buf;
+ }
+ return name + "/" + buf;
}
std::string LogFileName(const std::string& name, uint64_t number) {
+++ /dev/null
-// Copyright 2008-present Facebook. All Rights Reserved.
-
-#ifndef STORAGE_LEVELDB_DB_LOG_FILE_H_
-#define STORAGE_LEVELDB_DB_LOG_FILE_H_
-
-namespace leveldb {
-
-enum WalFileType {
- kArchivedLogFile = 0,
- kAliveLogFile = 1
-} ;
-
-class LogFile {
-
- public:
- uint64_t logNumber;
- WalFileType type;
-
- LogFile(uint64_t logNum,WalFileType logType) :
- logNumber(logNum),
- type(logType) {}
-
- LogFile(const LogFile& that) {
- logNumber = that.logNumber;
- type = that.type;
- }
-
- bool operator < (const LogFile& that) const {
- return logNumber < that.logNumber;
- }
-
- std::string ToString() const {
- char response[100];
- const char* typeOfLog;
- if (type == kAliveLogFile) {
- typeOfLog = "Alive Log";
- } else {
- typeOfLog = "Archived Log";
- }
- sprintf(response,
- "LogNumber : %ld LogType : %s",
- logNumber,
- typeOfLog);
- return std::string(response);
- }
-};
-} // namespace leveldb
-#endif // STORAGE_LEVELDB_DB_LOG_FILE_H_
--- /dev/null
+#include "db/transaction_log_impl.h"
+#include "db/write_batch_internal.h"
+
+namespace leveldb {
+
+// Creates an iterator over the WAL files in `files` that will start at
+// sequence number `seq`. The iterator takes ownership of `files`; `options`
+// and `lastFlushedSequence` are kept as pointers and must be non-null.
+// The iterator starts out not started and not valid until Next() is called.
+TransactionLogIteratorImpl::TransactionLogIteratorImpl(
+    const std::string& dbname,
+    const Options* options,
+    const EnvOptions& soptions,
+    const SequenceNumber seq,
+    std::unique_ptr<VectorLogPtr> files,
+    SequenceNumber const * const lastFlushedSequence) :
+    dbname_(dbname),
+    options_(options),
+    soptions_(soptions),
+    startingSequenceNumber_(seq),
+    files_(std::move(files)),
+    started_(false),
+    isValid_(false),
+    currentFileIndex_(0),
+    lastFlushedSequence_(lastFlushedSequence),
+    // Next() may compare currentSequence_ before any record has been parsed
+    // (e.g. when the first file yields no usable record), so give it a
+    // defined value.
+    currentSequence_(0) {
+  // Check the pointer before it is dereferenced by the next assertion.
+  assert(lastFlushedSequence_ != nullptr);
+  assert(startingSequenceNumber_ <= *lastFlushedSequence_);
+  assert(files_.get() != nullptr);
+
+  reporter_.env = options_->env;
+  reporter_.info_log = options_->info_log.get();
+}
+
+// Opens `logFile` for sequential reading and stores the handle in `*file`.
+// Archived logs are opened from the archive location only. Alive logs are
+// opened from the db directory and, if that fails, from the archive location,
+// because the file may have been archived in the meantime.
+Status TransactionLogIteratorImpl::OpenLogFile(
+    const LogFile* logFile,
+    unique_ptr<SequentialFile>* file) {
+  Env* const env = options_->env;
+  if (logFile->Type() == kArchivedLogFile) {
+    return env->NewSequentialFile(
+        ArchivedLogFileName(dbname_, logFile->LogNumber()), file, soptions_);
+  }
+
+  // Alive log: first attempt in the db directory.
+  const Status live = env->NewSequentialFile(
+      LogFileName(dbname_, logFile->LogNumber()), file, soptions_);
+  if (live.ok()) {
+    return live;
+  }
+
+  // Second attempt in the archive directory.
+  const Status archived = env->NewSequentialFile(
+      ArchivedLogFileName(dbname_, logFile->LogNumber()), file, soptions_);
+  if (!archived.ok()) {
+    return Status::IOError(" Requested file not present in the dir");
+  }
+  return archived;
+}
+
+// Returns the current write batch together with currentSequence_.
+// Ownership of the batch is moved out of the iterator, so calling GetBatch()
+// again before the next Next() yields a null writeBatchPtr.
+BatchResult TransactionLogIteratorImpl::GetBatch() {
+ assert(isValid_); // cannot call in a non valid state.
+ BatchResult result;
+ result.sequence = currentSequence_;
+ result.writeBatchPtr = std::move(currentBatch_);
+ return result;
+}
+
+// Returns the status recorded by the most recent Next() step.
+Status TransactionLogIteratorImpl::status() {
+ return currentStatus_;
+}
+
+// True once Next() has been called at least once and the iterator is
+// currently positioned on a write batch.
+bool TransactionLogIteratorImpl::Valid() {
+ return started_ && isValid_;
+}
+
+// Advances the iterator to the next write batch.
+// On the first call it opens the current log file and skips records until one
+// with a sequence number >= startingSequenceNumber_ is found. On later calls
+// (or when the first file had no such record) it reads the next record from
+// the current reader, moving on to the following log files as needed.
+// The outcome is reported through isValid_ / currentStatus_.
+void TransactionLogIteratorImpl::Next() {
+ // files_ must be non-empty: at() is bounds-checked.
+ LogFile* currentLogFile = files_.get()->at(currentFileIndex_).get();
+
+// First seek to the given seqNo. in the current file.
+ std::string scratch;
+ Slice record;
+ if (!started_) {
+ started_ = true; // this piece only runs once.
+ isValid_ = false;
+ if (startingSequenceNumber_ > *lastFlushedSequence_) {
+ currentStatus_ = Status::IOError("Looking for a sequence, "
+ "which is not flushed yet.");
+ return;
+ }
+ Status s = OpenLogReader(currentLogFile);
+ if (!s.ok()) {
+ currentStatus_ = s;
+ isValid_ = false;
+ return;
+ }
+ while (currentLogReader_->ReadRecord(&record, &scratch)) {
+ // NOTE(review): 12 is presumably the write batch header size -- confirm.
+ if (record.size() < 12) {
+ reporter_.Corruption(
+ record.size(), Status::Corruption("log record too small"));
+ continue;
+ }
+ UpdateCurrentWriteBatch(record);
+ if (currentSequence_ >= startingSequenceNumber_) {
+ assert(currentSequence_ <= *lastFlushedSequence_);
+ isValid_ = true;
+ break;
+ } else {
+ // UpdateCurrentWriteBatch() marked the iterator valid; undo that for
+ // records that precede the requested sequence number.
+ isValid_ = false;
+ }
+ }
+ if (isValid_) {
+ // Done for this iteration
+ return;
+ }
+ }
+ // Read the next record, switching to the following log file whenever the
+ // current reader has nothing more to offer.
+ bool openNextFile = true;
+ while(openNextFile) {
+ assert(currentLogReader_);
+ if (currentSequence_ < *lastFlushedSequence_) {
+ // More data may have been appended since EOF was hit; clear the flag so
+ // the reader tries again.
+ if (currentLogReader_->IsEOF()) {
+ currentLogReader_->UnmarkEOF();
+ }
+ while (currentLogReader_->ReadRecord(&record, &scratch)) {
+ if (record.size() < 12) {
+ reporter_.Corruption(
+ record.size(), Status::Corruption("log record too small"));
+ continue;
+ } else {
+ UpdateCurrentWriteBatch(record);
+ openNextFile = false;
+ break;
+ }
+ }
+ }
+
+ if (openNextFile) {
+ // size() is unsigned, so this comparison relies on files_ being non-empty.
+ if (currentFileIndex_ < files_.get()->size() - 1) {
+ ++currentFileIndex_;
+ Status status =OpenLogReader(files_.get()->at(currentFileIndex_).get());
+ if (!status.ok()) {
+ isValid_ = false;
+ currentStatus_ = status;
+ return;
+ }
+ } else {
+ // Last file exhausted: OK if everything flushed so far was delivered,
+ // otherwise report that data is missing.
+ isValid_ = false;
+ openNextFile = false;
+ if (currentSequence_ == *lastFlushedSequence_) {
+ currentStatus_ = Status::OK();
+ } else {
+ currentStatus_ = Status::IOError(" NO MORE DATA LEFT");
+ }
+ }
+ }
+ }
+}
+
+// Parses `record` into a fresh WriteBatch and makes it the iterator's current
+// entry: updates currentSequence_, takes ownership of the batch, marks the
+// iterator valid and sets the status to OK.
+void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
+  // Own the batch from the moment it is allocated so it cannot leak if
+  // anything below fails before it is handed to currentBatch_.
+  std::unique_ptr<WriteBatch> batch(new WriteBatch());
+  WriteBatchInternal::SetContents(batch.get(), record);
+  currentSequence_ = WriteBatchInternal::Sequence(batch.get());
+  currentBatch_ = std::move(batch);
+  isValid_ = true;
+  currentStatus_ = Status::OK();
+}
+
+// Opens `logFile` via OpenLogFile() and replaces currentLogReader_ with a new
+// log::Reader over it that reports problems to reporter_.
+// Returns the OpenLogFile() error unchanged when the file cannot be opened.
+Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
+ unique_ptr<SequentialFile> file;
+ Status status = OpenLogFile(logFile, &file);
+ if (!status.ok()) {
+ return status;
+ }
+ assert(file);
+ // NOTE(review): the trailing `true, 0` are presumably checksum=true and
+ // initial_offset=0 -- confirm against log::Reader's constructor.
+ currentLogReader_.reset(
+ new log::Reader(std::move(file), &reporter_, true, 0)
+ );
+ return Status::OK();
+}
+} // namespace leveldb
--- /dev/null
+// Copyright 2008-present Facebook. All Rights Reserved.
+#ifndef STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_
+#define STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_
+
+#include <vector>
+
+#include "leveldb/env.h"
+#include "leveldb/options.h"
+#include "leveldb/types.h"
+#include "leveldb/transaction_log.h"
+#include "db/log_reader.h"
+#include "db/filename.h"
+
+namespace leveldb {
+
+// log::Reader::Reporter used by the transaction log iterator. Corruption()
+// only writes a message to info_log. `env` is stored but not read by this
+// struct.
+struct LogReporter : public log::Reader::Reporter {
+ Env* env;
+ Logger* info_log;
+ // Logs the number of bytes reported as corrupt together with the status text.
+ virtual void Corruption(size_t bytes, const Status& s) {
+ Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str());
+ }
+};
+
+// Concrete LogFile describing one WAL file: its log number, whether it is
+// alive or archived, the sequence number it starts at and its size in bytes.
+// All values are captured at construction time and never change afterwards.
+class LogFileImpl : public LogFile {
+ public:
+  LogFileImpl(uint64_t logNum, WalFileType logType, SequenceNumber startSeq,
+              uint64_t sizeBytes) :
+    logNumber_(logNum),
+    type_(logType),
+    startSequence_(startSeq),
+    sizeFileBytes_(sizeBytes) {
+  }
+
+  // Name derived from the log number; relies on LogFileName() producing a
+  // bare file name when given an empty directory.
+  std::string Filename() const override { return LogFileName("", logNumber_); }
+
+  uint64_t LogNumber() const override { return logNumber_; }
+
+  WalFileType Type() const override { return type_; }
+
+  SequenceNumber StartSequence() const override { return startSequence_; }
+
+  uint64_t SizeFileBytes() const override { return sizeFileBytes_; }
+
+  // Orders log files by ascending log number.
+  bool operator < (const LogFile& that) const {
+    return LogNumber() < that.LogNumber();
+  }
+
+ private:
+  uint64_t logNumber_;
+  WalFileType type_;
+  SequenceNumber startSequence_;
+  uint64_t sizeFileBytes_;
+};
+
+// TransactionLogIterator implementation that walks the write batches stored
+// in an owned list of WAL files, starting at a requested sequence number.
+class TransactionLogIteratorImpl : public TransactionLogIterator {
+ public:
+ // Takes ownership of `files`. `options` and `lastFlushedSequence` are
+ // stored as raw pointers; `dbname` and `soptions` are stored as references.
+ TransactionLogIteratorImpl(const std::string& dbname,
+ const Options* options,
+ const EnvOptions& soptions,
+ const SequenceNumber seqNum,
+ std::unique_ptr<VectorLogPtr> files,
+ SequenceNumber const * const lastFlushedSequence);
+
+ virtual bool Valid();
+
+ virtual void Next();
+
+ virtual Status status();
+
+ virtual BatchResult GetBatch();
+
+ private:
+ // NOTE(review): dbname_ and soptions_ are references, so the objects passed
+ // to the constructor presumably must outlive this iterator -- confirm.
+ const std::string& dbname_;
+ const Options* options_;
+ const EnvOptions& soptions_;
+ // First sequence number the caller asked for.
+ const SequenceNumber startingSequenceNumber_;
+ // Log files to iterate over, owned by the iterator.
+ std::unique_ptr<VectorLogPtr> files_;
+ // Set on the first call to Next().
+ bool started_;
+ bool isValid_; // not valid when it starts off.
+ Status currentStatus_;
+ // Index into files_ of the log file currently being read.
+ size_t currentFileIndex_;
+ // Batch at the current position; moved out by GetBatch().
+ std::unique_ptr<WriteBatch> currentBatch_;
+ unique_ptr<log::Reader> currentLogReader_;
+ Status OpenLogFile(const LogFile* logFile, unique_ptr<SequentialFile>* file);
+ LogReporter reporter_;
+ // Not owned; read on every Next() to learn how far the db has been flushed.
+ SequenceNumber const * const lastFlushedSequence_;
+ // represents the sequence number being read currently.
+ SequenceNumber currentSequence_;
+
+ void UpdateCurrentWriteBatch(const Slice& record);
+ Status OpenLogReader(const LogFile* file);
+};
+
+
+
+} // namespace leveldb
+#endif // STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_
+++ /dev/null
-#include "db/transaction_log_iterator_impl.h"
-#include "db/write_batch_internal.h"
-#include "db/filename.h"
-
-namespace leveldb {
-
-TransactionLogIteratorImpl::TransactionLogIteratorImpl(
- const std::string& dbname,
- const Options* options,
- const EnvOptions& soptions,
- SequenceNumber& seq,
- std::unique_ptr<std::vector<LogFile>> files,
- SequenceNumber const * const lastFlushedSequence) :
- dbname_(dbname),
- options_(options),
- soptions_(soptions),
- startingSequenceNumber_(seq),
- files_(std::move(files)),
- started_(false),
- isValid_(false),
- currentFileIndex_(0),
- lastFlushedSequence_(lastFlushedSequence) {
- assert(files_.get() != nullptr);
- assert(lastFlushedSequence_);
-
- reporter_.env = options_->env;
- reporter_.info_log = options_->info_log.get();
-}
-
-Status TransactionLogIteratorImpl::OpenLogFile(
- const LogFile& logFile,
- unique_ptr<SequentialFile>* file) {
- Env* env = options_->env;
- if (logFile.type == kArchivedLogFile) {
- std::string fname = ArchivedLogFileName(dbname_, logFile.logNumber);
- return env->NewSequentialFile(fname, file, soptions_);
- } else {
- std::string fname = LogFileName(dbname_, logFile.logNumber);
- Status status = env->NewSequentialFile(fname, file, soptions_);
- if (!status.ok()) {
- // If cannot open file in DB directory.
- // Try the archive dir, as it could have moved in the meanwhile.
- fname = ArchivedLogFileName(dbname_, logFile.logNumber);
- status = env->NewSequentialFile(fname, file, soptions_);
- if (!status.ok()) {
- return Status::IOError(" Requested file not present in the dir");
- }
- }
- return status;
- }
-}
-
-BatchResult TransactionLogIteratorImpl::GetBatch() {
- assert(isValid_); // cannot call in a non valid state.
- BatchResult result;
- result.sequence = currentSequence_;
- result.writeBatchPtr = std::move(currentBatch_);
- return result;
-}
-
-Status TransactionLogIteratorImpl::status() {
- return currentStatus_;
-}
-
-bool TransactionLogIteratorImpl::Valid() {
- return started_ && isValid_;
-}
-
-void TransactionLogIteratorImpl::Next() {
- LogFile currentLogFile = files_.get()->at(currentFileIndex_);
-
-// First seek to the given seqNo. in the current file.
- std::string scratch;
- Slice record;
- if (!started_) {
- started_ = true; // this piece only runs onced.
- isValid_ = false;
- if (startingSequenceNumber_ > *lastFlushedSequence_) {
- currentStatus_ = Status::IOError("Looking for a sequence, "
- "which is not flushed yet.");
- return;
- }
- Status s = OpenLogReader(currentLogFile);
- if (!s.ok()) {
- currentStatus_ = s;
- isValid_ = false;
- return;
- }
- while (currentLogReader_->ReadRecord(&record, &scratch)) {
- if (record.size() < 12) {
- reporter_.Corruption(
- record.size(), Status::Corruption("log record too small"));
- continue;
- }
- UpdateCurrentWriteBatch(record);
- if (currentSequence_ >= startingSequenceNumber_) {
- assert(currentSequence_ <= *lastFlushedSequence_);
- isValid_ = true;
- break;
- } else {
- isValid_ = false;
- }
- }
- if (isValid_) {
- // Done for this iteration
- return;
- }
- }
- bool openNextFile = true;
- while(openNextFile) {
- assert(currentLogReader_);
- if (currentSequence_ < *lastFlushedSequence_) {
- if (currentLogReader_->IsEOF()) {
- currentLogReader_->UnmarkEOF();
- }
- while (currentLogReader_->ReadRecord(&record, &scratch)) {
- if (record.size() < 12) {
- reporter_.Corruption(
- record.size(), Status::Corruption("log record too small"));
- continue;
- } else {
- UpdateCurrentWriteBatch(record);
- openNextFile = false;
- break;
- }
- }
- }
-
- if (openNextFile) {
- if (currentFileIndex_ < files_.get()->size() - 1) {
- ++currentFileIndex_;
- Status status = OpenLogReader(files_.get()->at(currentFileIndex_));
- if (!status.ok()) {
- isValid_ = false;
- currentStatus_ = status;
- return;
- }
- } else {
- isValid_ = false;
- openNextFile = false;
- if (currentSequence_ == *lastFlushedSequence_) {
- currentStatus_ = Status::OK();
- } else {
- currentStatus_ = Status::IOError(" NO MORE DATA LEFT");
- }
- }
- }
- }
-}
-
-void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
- WriteBatch* batch = new WriteBatch();
- WriteBatchInternal::SetContents(batch, record);
- currentSequence_ = WriteBatchInternal::Sequence(batch);
- currentBatch_.reset(batch);
- isValid_ = true;
- currentStatus_ = Status::OK();
-}
-
-Status TransactionLogIteratorImpl::OpenLogReader(const LogFile& logFile) {
- unique_ptr<SequentialFile> file;
- Status status = OpenLogFile(logFile, &file);
- if (!status.ok()) {
- return status;
- }
- assert(file);
- currentLogReader_.reset(
- new log::Reader(std::move(file), &reporter_, true, 0)
- );
- return Status::OK();
-}
-} // namespace leveldb
+++ /dev/null
-// Copyright 2008-present Facebook. All Rights Reserved.
-#ifndef STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_
-#define STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_
-
-#include <vector>
-
-#include "leveldb/env.h"
-#include "leveldb/options.h"
-#include "leveldb/types.h"
-#include "leveldb/transaction_log_iterator.h"
-#include "db/log_file.h"
-#include "db/log_reader.h"
-
-namespace leveldb {
-
-struct LogReporter : public log::Reader::Reporter {
- Env* env;
- Logger* info_log;
- virtual void Corruption(size_t bytes, const Status& s) {
- Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str());
- }
-};
-
-class TransactionLogIteratorImpl : public TransactionLogIterator {
- public:
- TransactionLogIteratorImpl(const std::string& dbname,
- const Options* options,
- const EnvOptions& soptions,
- SequenceNumber& seqNum,
- std::unique_ptr<std::vector<LogFile>> files,
- SequenceNumber const * const lastFlushedSequence);
-
- virtual bool Valid();
-
- virtual void Next();
-
- virtual Status status();
-
- virtual BatchResult GetBatch();
-
- private:
- const std::string& dbname_;
- const Options* options_;
- const EnvOptions& soptions_;
- const uint64_t startingSequenceNumber_;
- std::unique_ptr<std::vector<LogFile>> files_;
- bool started_;
- bool isValid_; // not valid when it starts of.
- Status currentStatus_;
- size_t currentFileIndex_;
- std::unique_ptr<WriteBatch> currentBatch_;
- unique_ptr<log::Reader> currentLogReader_;
- Status OpenLogFile(const LogFile& logFile, unique_ptr<SequentialFile>* file);
- LogReporter reporter_;
- SequenceNumber const * const lastFlushedSequence_;
- // represents the sequence number being read currently.
- SequenceNumber currentSequence_;
-
- void UpdateCurrentWriteBatch(const Slice& record);
- Status OpenLogReader(const LogFile& file);
-};
-
-
-
-} // namespace leveldb
-#endif // STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_
#include "leveldb/iterator.h"
#include "leveldb/options.h"
#include "leveldb/types.h"
-#include "leveldb/transaction_log_iterator.h"
+#include "leveldb/transaction_log.h"
namespace leveldb {
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size) = 0;
+ // Retrieve the sorted list of all wal files with earliest file first
+ virtual Status GetSortedWalFiles(VectorLogPtr& files) = 0;
+
+ // Delete wal files in files. These can be either live or archived.
+ // Returns Status::OK if all files could be deleted, otherwise Status::IOError
+ // which contains information about files that could not be deleted.
+ virtual Status DeleteWalFiles(const VectorLogPtr& files) = 0;
+
// The sequence number of the most recent transaction.
virtual SequenceNumber GetLatestSequenceNumber() = 0;
--- /dev/null
+// Copyright 2008-present Facebook. All Rights Reserved.
+#ifndef STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_
+#define STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_
+
+#include "leveldb/status.h"
+#include "leveldb/types.h"
+#include "leveldb/write_batch.h"
+
+namespace leveldb {
+
+class LogFile;
+typedef std::vector<std::unique_ptr<LogFile>> VectorLogPtr;
+
+enum WalFileType {
+ /* Indicates that WAL file is in archive directory. WAL files are moved from
+ * the main db directory to archive directory once they are not live and stay
+ * there for a duration of WAL_ttl_seconds which can be set in Options
+ */
+ kArchivedLogFile = 0,
+
+ /* Indicates that WAL file is live and resides in the main db directory */
+ kAliveLogFile = 1
+} ;
+
+// Read-only description of a single WAL (write-ahead log) file.
+class LogFile {
+ public:
+ LogFile() {}
+ virtual ~LogFile() {}
+
+ // Returns the log file's name, excluding the db path
+ virtual std::string Filename() const = 0;
+
+ // Primary identifier for log file.
+ // This is directly proportional to creation time of the log file
+ virtual uint64_t LogNumber() const = 0;
+
+ // Log file can be either alive or archived
+ virtual WalFileType Type() const = 0;
+
+ // Sequence number of the first write batch written to this log file
+ virtual SequenceNumber StartSequence() const = 0;
+
+ // Size of log file on disk in bytes
+ virtual uint64_t SizeFileBytes() const = 0;
+};
+
+struct BatchResult {
+ SequenceNumber sequence;
+ std::unique_ptr<WriteBatch> writeBatchPtr;
+};
+
+// A TransactionLogIterator is used to iterate over the transactions in a db.
+class TransactionLogIterator {
+ public:
+ TransactionLogIterator() {}
+ virtual ~TransactionLogIterator() {}
+
+ // An iterator is either positioned at a WriteBatch or not valid.
+ // This method returns true if the iterator is valid.
+ // Data can only be read from a valid iterator.
+ virtual bool Valid() = 0;
+
+ // Moves the iterator to the next WriteBatch.
+ // REQUIRES: Valid() to be true.
+ // NOTE(review): DBImpl::GetUpdatesSince calls Next() once on a freshly
+ // created iterator before it is valid, so this precondition looks stricter
+ // than actual usage -- confirm.
+ virtual void Next() = 0;
+
+ // Returns OK if the iterator is valid.
+ // Returns the error when something has gone wrong.
+ virtual Status status() = 0;
+
+ // If valid, returns the current write_batch and the sequence number of the
+ // latest transaction contained in the batch.
+ // ONLY use if Valid() is true and status() is OK.
+ virtual BatchResult GetBatch() = 0;
+};
+} // namespace leveldb
+
+#endif // STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_
+++ /dev/null
-// Copyright 2008-present Facebook. All Rights Reserved.
-#ifndef STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_
-#define STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_
-
-#include "leveldb/status.h"
-#include "leveldb/write_batch.h"
-
-namespace leveldb {
-
-
-struct BatchResult {
- SequenceNumber sequence;
- std::unique_ptr<WriteBatch> writeBatchPtr;
-};
-
-// A TransactionLogIterator is used to iterate over the Transaction's in a db.
-class TransactionLogIterator {
- public:
- TransactionLogIterator() {}
- virtual ~TransactionLogIterator() {}
-
- // An iterator is either positioned at a WriteBatch or not valid.
- // This method returns true if the iterator is valid.
- // Can read data from a valid iterator.
- virtual bool Valid() = 0;
-
- // Moves the iterator to the next WriteBatch.
- // REQUIRES: Valid() to be true.
- virtual void Next() = 0;
-
- // Return's ok if the iterator is valid.
- // Return the Error when something has gone wrong.
- virtual Status status() = 0;
-
- // If valid return's the current write_batch and the sequence number of the
- // latest transaction contained in the batch.
- // ONLY use if Valid() is true and status() is OK.
- virtual BatchResult GetBatch() = 0;
-};
-} // namespace leveldb
-
-#endif // STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_
// Define all public custom types here.
-// Represents a sequence number in a WAL file.
+// Represents a sequence number in a WAL file.
typedef uint64_t SequenceNumber;
} // namespace leveldb
return sdb_->GetLatestSequenceNumber();
}
+ virtual Status GetSortedWalFiles(VectorLogPtr& files) override {  // Delegates to sdb_, passing `files` through.
+ return sdb_->GetSortedWalFiles(files);  // the Status from sdb_ is returned unchanged
+ }
+
+ virtual Status DeleteWalFiles(const VectorLogPtr& files)
+ override{  // Delegates to sdb_, passing `files` through.
+ return sdb_->DeleteWalFiles(files);  // the Status from sdb_ is returned unchanged
+ }
+
virtual Status GetUpdatesSince(SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter)
override {
-
#include <cstdio>
#include "db/write_batch_internal.h"
struct DataPumpThread {  // Arguments handed to the data pump thread started from main().
size_t no_records;  // upper bound of the write loop: one Put() is issued per record
DB* db; // Assumption DB is Open'ed already.
- volatile bool is_running;
};
static std::string RandomString(Random* rnd, int len) {
DB* db = t->db;
Random rnd(301);
size_t i = 0;
- t->is_running = true;
- while( i < t->no_records ) {
- db->Put(WriteOptions(),
- Slice(RandomString(&rnd, 50)),
- Slice(RandomString(&rnd, 500)));
- ++i;
+ while(i++ < t->no_records) {
+ if(!db->Put(WriteOptions(), Slice(RandomString(&rnd, 500)),
+ Slice(RandomString(&rnd, 500))).ok()) {
+ fprintf(stderr, "Error in put\n");
+ exit(1);
+ }
}
- t->is_running = false;
}
struct ReplicationThread {  // State shared between main() and ReplicationThreadBody().
port::AtomicPointer stop;  // non-null while the thread should keep running; main() stores nullptr to stop it
DB* db;  // database whose updates are iterated via GetUpdatesSince()
- volatile SequenceNumber latest;
volatile size_t no_read;  // count of batches the replication thread has stepped over; polled by main()
- volatile bool has_more;
};
static void ReplicationThreadBody(void* arg) {
ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg);
DB* db = t->db;
unique_ptr<TransactionLogIterator> iter;
- SequenceNumber currentSeqNum = 0;
+ SequenceNumber currentSeqNum = 1;
while (t->stop.Acquire_Load() != nullptr) {
- if (!iter) {
- db->GetUpdatesSince(currentSeqNum, &iter);
- fprintf(stdout, "Refreshing iterator\n");
- iter->Next();
- while(iter->Valid()) {
- BatchResult res = iter->GetBatch();
- if (res.sequence != currentSeqNum +1
- && res.sequence != currentSeqNum) {
- fprintf(stderr,
- "Missed a seq no. b/w %ld and %ld\n",
- currentSeqNum,
- res.sequence);
- exit(1);
- }
- currentSeqNum = res.sequence;
- t->latest = res.sequence;
- iter->Next();
- t->no_read++;
+ iter.reset();
+ Status s;
+ while(!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
+ if (t->stop.Acquire_Load() == nullptr) {
+ return;
+ }
+ }
+ fprintf(stderr, "Refreshing iterator\n");
+ for(;iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) {
+ BatchResult res = iter->GetBatch();
+ if (res.sequence != currentSeqNum) {
+ fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n", currentSeqNum,
+ res.sequence);
+ exit(1);
}
}
- iter.reset();
}
}
-
int main(int argc, const char** argv) {
- long FLAGS_num_inserts = 1000;
- long FLAGS_WAL_ttl_seconds = 1000;
+ uint64_t FLAGS_num_inserts = 1000;  // initial value; assigned to dataPump.no_records below
+ uint64_t FLAGS_WAL_ttl_seconds = 1000;  // initial value; assigned to options.WAL_ttl_seconds below
char junk;
long l;
options.create_if_missing = true;
options.WAL_ttl_seconds = FLAGS_WAL_ttl_seconds;
DB* db;
+ DestroyDB(default_db_path, options);  // presumably clears a DB left at this path by an earlier run -- TODO confirm
Status s = DB::Open(options, default_db_path, &db);
if (!s.ok()) {
fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str());
+ exit(1);  // both worker threads need an open DB
}
DataPumpThread dataPump;
dataPump.no_records = FLAGS_num_inserts;
dataPump.db = db;
- dataPump.is_running = true;
env->StartThread(DataPumpThreadBody, &dataPump);
ReplicationThread replThread;
replThread.db = db;
replThread.no_read = 0;
- replThread.has_more = true;
replThread.stop.Release_Store(env); // store something to make it non-null.
env->StartThread(ReplicationThreadBody, &replThread);
- while(dataPump.is_running) {
- continue;
- }
+ while(replThread.no_read < FLAGS_num_inserts);  // spin until the replication thread has counted FLAGS_num_inserts batches
replThread.stop.Release_Store(nullptr);  // a null `stop` makes ReplicationThreadBody leave its loop
- if ( replThread.no_read < dataPump.no_records ) {
+ if (replThread.no_read < dataPump.no_records) {
// The number read should be >= the number inserted.
// NOTE(review): no_read and no_records are size_t but are printed with %ld below -- confirm/fix.
fprintf(stderr, "No. of Record's written and read not same\nRead : %ld"
- " Written : %ld", replThread.no_read, dataPump.no_records);
+ " Written : %ld\n", replThread.no_read, dataPump.no_records);
exit(1);
}
+ fprintf(stderr, "Successful!\n");
exit(0);
- fprintf(stdout, "ALL IS FINE");
}
return db_->GetLatestSequenceNumber();
}
+Status DBWithTTL::GetSortedWalFiles(VectorLogPtr& files) {  // Delegates to db_, passing `files` through.
+ return db_->GetSortedWalFiles(files);  // the Status from db_ is returned unchanged
+}
+
+Status DBWithTTL::DeleteWalFiles(const VectorLogPtr& files){  // Delegates to db_, passing `files` through.
+ return db_->DeleteWalFiles(files);  // the Status from db_ is returned unchanged
+}
+
Status DBWithTTL::GetUpdatesSince(
SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter) {
virtual Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs);
+ virtual Status GetSortedWalFiles(VectorLogPtr& files);
+
+ virtual Status DeleteWalFiles(const VectorLogPtr& files);
+
virtual SequenceNumber GetLatestSequenceNumber();
virtual Status GetUpdatesSince(SequenceNumber seq_number,