]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
API for getting archived log files
authorMayank Agarwal <amayank@fb.com>
Tue, 6 Aug 2013 19:54:37 +0000 (12:54 -0700)
committerMayank Agarwal <amayank@fb.com>
Mon, 19 Aug 2013 20:37:04 +0000 (13:37 -0700)
Summary: Also expanded class LogFile to have startSequene and FileSize and exposed it publicly

Test Plan: make all check

Reviewers: dhruba, haobo

Reviewed By: dhruba

CC: leveldb
Differential Revision: https://reviews.facebook.net/D12087

18 files changed:
db/db_filesnapshot.cc
db/db_impl.cc
db/db_impl.h
db/db_test.cc
db/filename.cc
db/log_file.h [deleted file]
db/transaction_log_impl.cc [new file with mode: 0644]
db/transaction_log_impl.h [new file with mode: 0644]
db/transaction_log_iterator_impl.cc [deleted file]
db/transaction_log_iterator_impl.h [deleted file]
include/leveldb/db.h
include/leveldb/transaction_log.h [new file with mode: 0644]
include/leveldb/transaction_log_iterator.h [deleted file]
include/leveldb/types.h
include/utilities/stackable_db.h
tools/db_repl_stress.cc
utilities/ttl/db_ttl.cc
utilities/ttl/db_ttl.h

index 5fd2af8612736b69abc5fb7fbc6ff9b1b115e17a..ae64392e53154cacecf06007a54086ca1846de02 100644 (file)
@@ -2,10 +2,11 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#include "db/db_impl.h"
-#include "db/filename.h"
+#include <algorithm>
 #include <string>
 #include <stdint.h>
+#include "db/db_impl.h"
+#include "db/filename.h"
 #include "db/version_set.h"
 #include "leveldb/db.h"
 #include "leveldb/env.h"
@@ -66,4 +67,47 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
   return Status::OK();
 }
 
+Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
+  // First get sorted files in archive dir, then append sorted files from main
+  // dir to maintain sorted order
+
+  //  list wal files in archive dir.
+  Status s;
+  std::string archivedir = ArchivalDirectory(dbname_);
+  if (env_->FileExists(archivedir)) {
+    s = AppendSortedWalsOfType(archivedir, files, kArchivedLogFile);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+  // list wal files in main db dir.
+  s = AppendSortedWalsOfType(dbname_, files, kAliveLogFile);
+  if (!s.ok()) {
+    return s;
+  }
+  return s;
+}
+
+Status DBImpl::DeleteWalFiles(const VectorLogPtr& files) {
+  Status s;
+  std::string archivedir = ArchivalDirectory(dbname_);
+  std::string files_not_deleted;
+  for (const auto& wal : files) {
+    /* Try deleting in archive dir. If fails, try deleting in main db dir.
+     * This is efficient because all except for very few wal files will be in
+     * archive. Checking for WalType is not much helpful because alive wal could
+       be archived now.
+     */
+    if (!env_->DeleteFile(archivedir + "/" + wal->Filename()).ok() &&
+        !env_->DeleteFile(dbname_ + "/" + wal->Filename()).ok()) {
+      files_not_deleted.append(wal->Filename());
+    }
+  }
+  if (!files_not_deleted.empty()) {
+    return Status::IOError("Deleted all requested files except: " +
+                           files_not_deleted);
+  }
+  return Status::OK();
+}
+
 }
index f2351037b9e6e9f36a4fe3e9c583e50f0e52e567..1326148bb729daf958094081131154563b0b3a0b 100644 (file)
@@ -27,7 +27,7 @@
 #include "db/table_cache.h"
 #include "db/version_set.h"
 #include "db/write_batch_internal.h"
-#include "db/transaction_log_iterator_impl.h"
+#include "db/transaction_log_impl.h"
 #include "leveldb/compaction_filter.h"
 #include "leveldb/db.h"
 #include "leveldb/env.h"
@@ -1046,34 +1046,24 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() {
 Status DBImpl::GetUpdatesSince(SequenceNumber seq,
                                unique_ptr<TransactionLogIterator>* iter) {
 
-  //  Get All Log Files.
-  //  Sort Files
-  //  Get the first entry from each file.
+  if (seq > last_flushed_sequence_) {
+    return Status::IOError("Requested sequence not yet written in the db");
+  }
+  //  Get all sorted Wal Files.
   //  Do binary search and open files and find the seq number.
 
-  std::vector<LogFile> walFiles;
-  // list wal files in main db dir.
-  Status s = ListAllWALFiles(dbname_, &walFiles, kAliveLogFile);
+  std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
+  Status s = GetSortedWalFiles(*wal_files);
   if (!s.ok()) {
     return s;
   }
-  //  list wal files in archive dir.
-  std::string archivedir = ArchivalDirectory(dbname_);
-  if (env_->FileExists(archivedir)) {
-    s = ListAllWALFiles(archivedir, &walFiles, kArchivedLogFile);
-    if (!s.ok()) {
-      return s;
-    }
-  }
 
-  if (walFiles.empty()) {
+  if (wal_files->empty()) {
     return Status::IOError(" NO WAL Files present in the db");
   }
   //  std::shared_ptr would have been useful here.
 
-  std::unique_ptr<std::vector<LogFile>> probableWALFiles(
-    new std::vector<LogFile>());
-  s = FindProbableWALFiles(&walFiles, probableWALFiles.get(), seq);
+  s = RetainProbableWalFiles(*wal_files, seq);
   if (!s.ok()) {
     return s;
   }
@@ -1082,90 +1072,61 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq,
                                    &options_,
                                    storage_options_,
                                    seq,
-                                   std::move(probableWALFiles),
+                                   std::move(wal_files),
                                    &last_flushed_sequence_));
   iter->get()->Next();
   return iter->get()->status();
 }
 
-Status DBImpl::FindProbableWALFiles(std::vector<LogFile>* const allLogs,
-                                    std::vector<LogFile>* const result,
-                                    const SequenceNumber target) {
-  assert(allLogs != nullptr);
-  assert(result != nullptr);
-
-  std::sort(allLogs->begin(), allLogs->end());
+Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs,
+                                      const SequenceNumber target) {
   long start = 0; // signed to avoid overflow when target is < first file.
-  long end = static_cast<long>(allLogs->size()) - 1;
+  long end = static_cast<long>(all_logs.size()) - 1;
   // Binary Search. avoid opening all files.
   while (end >= start) {
     long mid = start + (end - start) / 2;  // Avoid overflow.
-    WriteBatch batch;
-    Status s = ReadFirstRecord(allLogs->at(mid), &batch);
-    if (!s.ok()) {
-      if (CheckFileExistsAndEmpty(allLogs->at(mid))) {
-        allLogs->erase(allLogs->begin() + mid);
-        --end;
-        continue;
-      }
-      return s;
-    }
-    SequenceNumber currentSeqNum = WriteBatchInternal::Sequence(&batch);
-    if (currentSeqNum == target) {
-      start = mid;
+    SequenceNumber current_seq_num = all_logs.at(mid)->StartSequence();
+    if (current_seq_num == target) {
       end = mid;
       break;
-    } else if (currentSeqNum < target) {
+    } else if (current_seq_num < target) {
       start = mid + 1;
     } else {
       end = mid - 1;
     }
   }
-  size_t startIndex = std::max(0l, end); // end could be -ve.
-  for (size_t i = startIndex; i < allLogs->size(); ++i) {
-    result->push_back(allLogs->at(i));
-  }
-  if (result->empty()) {
-    return Status::IOError(
-        "No probable files. Check if the db contains log files");
-  }
+  size_t start_index = std::max(0l, end); // end could be -ve.
+  // The last wal file is always included
+  all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
   return Status::OK();
 }
 
-bool DBImpl::CheckFileExistsAndEmpty(const LogFile& file) {
-  if (file.type == kAliveLogFile) {
-    const std::string fname = LogFileName(dbname_, file.logNumber);
-    uint64_t file_size;
-    Status s = env_->GetFileSize(fname, &file_size);
-    if (s.ok() && file_size == 0) {
-      return true;
-    }
-  }
-  const std::string fname = ArchivedLogFileName(dbname_, file.logNumber);
+bool DBImpl::CheckWalFileExistsAndEmpty(const WalFileType type,
+                                        const uint64_t number) {
+  const std::string fname = (type == kAliveLogFile) ?
+    LogFileName(dbname_, number) : ArchivedLogFileName(dbname_, number);
   uint64_t file_size;
   Status s = env_->GetFileSize(fname, &file_size);
-  if (s.ok() && file_size == 0) {
-    return true;
-  }
-  return false;
+  return (s.ok() && (file_size == 0));
 }
 
-Status DBImpl::ReadFirstRecord(const LogFile& file, WriteBatch* const result) {
+Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number,
+                               WriteBatch* const result) {
 
-  if (file.type == kAliveLogFile) {
-    std::string fname = LogFileName(dbname_, file.logNumber);
+  if (type == kAliveLogFile) {
+    std::string fname = LogFileName(dbname_, number);
     Status status = ReadFirstLine(fname, result);
     if (!status.ok()) {
       //  check if the file got moved to archive.
-      std::string archivedFile = ArchivedLogFileName(dbname_, file.logNumber);
-      Status s = ReadFirstLine(archivedFile, result);
+      std::string archived_file = ArchivedLogFileName(dbname_, number);
+      Status s = ReadFirstLine(archived_file, result);
       if (!s.ok()) {
-        return Status::IOError("Log File Has been deleted");
+        return Status::IOError("Log File has been deleted");
       }
     }
     return Status::OK();
-  } else if (file.type == kArchivedLogFile) {
-    std::string fname = ArchivedLogFileName(dbname_, file.logNumber);
+  } else if (type == kArchivedLogFile) {
+    std::string fname = ArchivedLogFileName(dbname_, number);
     Status status = ReadFirstLine(fname, result);
     return status;
   }
@@ -1204,6 +1165,7 @@ Status DBImpl::ReadFirstLine(const std::string& fname,
                      0/*initial_offset*/);
   std::string scratch;
   Slice record;
+
   if (reader.ReadRecord(&record, &scratch) && status.ok()) {
     if (record.size() < 12) {
       reporter.Corruption(
@@ -1217,22 +1179,49 @@ Status DBImpl::ReadFirstLine(const std::string& fname,
   return Status::IOError("Error reading from file " + fname);
 }
 
-Status DBImpl::ListAllWALFiles(const std::string& path,
-                               std::vector<LogFile>* const logFiles,
-                               WalFileType logType) {
-  assert(logFiles != nullptr);
-  std::vector<std::string> allFiles;
-  const Status status = env_->GetChildren(path, &allFiles);
+struct CompareLogByPointer {
+  bool operator() (const unique_ptr<LogFile>& a,
+                   const unique_ptr<LogFile>& b) {
+    LogFileImpl* a_impl = dynamic_cast<LogFileImpl*>(a.get());
+    LogFileImpl* b_impl = dynamic_cast<LogFileImpl*>(b.get());
+    return *a_impl < *b_impl;
+  }
+};
+
+Status DBImpl::AppendSortedWalsOfType(const std::string& path,
+    VectorLogPtr& log_files, WalFileType log_type) {
+  std::vector<std::string> all_files;
+  const Status status = env_->GetChildren(path, &all_files);
   if (!status.ok()) {
     return status;
   }
-  for (const auto& f : allFiles) {
+  log_files.reserve(log_files.size() + all_files.size());
+  for (const auto& f : all_files) {
     uint64_t number;
     FileType type;
     if (ParseFileName(f, &number, &type) && type == kLogFile){
-      logFiles->push_back(LogFile(number, logType));
+
+      WriteBatch batch;
+      Status s = ReadFirstRecord(log_type, number, &batch);
+      if (!s.ok()) {
+        if (CheckWalFileExistsAndEmpty(log_type, number)) {
+          continue;
+        }
+        return s;
+      }
+
+      uint64_t size_bytes;
+      s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
+      if (!s.ok()) {
+        return s;
+      }
+
+      log_files.push_back(std::move(unique_ptr<LogFile>(new LogFileImpl(
+        number, log_type, WriteBatchInternal::Sequence(&batch), size_bytes))));
     }
   }
+  CompareLogByPointer compare_log_files;
+  std::sort(log_files.begin(), log_files.end(), compare_log_files);
   return status;
 }
 
index c7f90b3cce163923cc2252208fd41c9c920273c9..cea8041e429a211f3e1cd8ec7c30662df03ea388 100644 (file)
 #include <set>
 #include <vector>
 #include "db/dbformat.h"
-#include "db/log_file.h"
 #include "db/log_writer.h"
 #include "db/snapshot.h"
 #include "leveldb/db.h"
 #include "leveldb/env.h"
+#include "leveldb/memtablerep.h"
+#include "leveldb/transaction_log.h"
 #include "port/port.h"
 #include "util/stats_logger.h"
 #include "memtablelist.h"
-#include "leveldb/memtablerep.h"
 
 #ifdef USE_SCRIBE
 #include "scribe/scribe_logger.h"
@@ -73,6 +73,8 @@ class DBImpl : public DB {
   virtual Status EnableFileDeletions();
   virtual Status GetLiveFiles(std::vector<std::string>&,
                               uint64_t* manifest_file_size);
+  virtual Status GetSortedWalFiles(VectorLogPtr& files);
+  virtual Status DeleteWalFiles(const VectorLogPtr& files);
   virtual SequenceNumber GetLatestSequenceNumber();
   virtual Status GetUpdatesSince(SequenceNumber seq_number,
                                  unique_ptr<TransactionLogIterator>* iter);
@@ -183,7 +185,7 @@ class DBImpl : public DB {
   void MaybeScheduleCompaction();
   static void BGWork(void* db);
   void BackgroundCall();
-  Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state);
+  Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state);
   void CleanupCompaction(CompactionState* compact);
   Status DoCompactionWork(CompactionState* compact);
 
@@ -208,19 +210,21 @@ class DBImpl : public DB {
 
   void PurgeObsoleteWALFiles();
 
-  Status ListAllWALFiles(const std::string& path,
-                         std::vector<LogFile>* logFiles,
-                         WalFileType type);
+  Status AppendSortedWalsOfType(const std::string& path,
+                                VectorLogPtr& log_files,
+                                WalFileType type);
 
-  //  Find's all the log files which contain updates with seq no.
-  //  Greater Than or Equal to the requested SequenceNumber
-  Status FindProbableWALFiles(std::vector<LogFile>* const allLogs,
-                              std::vector<LogFile>* const result,
-                              const SequenceNumber target);
+  // Requires: all_logs should be sorted with earliest log file first
+  // Retains all log files in all_logs which contain updates with seq no.
+  // Greater Than or Equal to the requested SequenceNumber.
+  Status RetainProbableWalFiles(VectorLogPtr& all_logs,
+                                const SequenceNumber target);
   //  return true if
-  bool CheckFileExistsAndEmpty(const LogFile& file);
+  bool CheckWalFileExistsAndEmpty(const WalFileType type,
+                                  const uint64_t number);
 
-  Status ReadFirstRecord(const LogFile& file, WriteBatch* const result);
+  Status ReadFirstRecord(const WalFileType type, const uint64_t number,
+                         WriteBatch* const result);
 
   Status ReadFirstLine(const std::string& fname, WriteBatch* const batch);
 
index 8cd1dda6b23b287a08ba7ff6c9fbee4229acc415..a087edf2c83ebcbd4797f02b3ccf6f93a0f83ecc 100644 (file)
@@ -3461,6 +3461,14 @@ class ModelDB: public DB {
     return Status::OK();
   }
 
+  virtual Status GetSortedWalFiles(VectorLogPtr& files) {
+    return Status::OK();
+  }
+
+  virtual Status DeleteWalFiles(const VectorLogPtr& files) {
+    return Status::OK();
+  }
+
   virtual SequenceNumber GetLatestSequenceNumber() {
     return 0;
   }
index 81a1421534f744e752a1d079d0b0b36c61d28ef1..794410cf3c60cdc71abde833b9155823018e81a0 100644 (file)
@@ -46,10 +46,13 @@ extern Status WriteStringToFileSync(Env* env, const Slice& data,
 static std::string MakeFileName(const std::string& name, uint64_t number,
                                 const char* suffix) {
   char buf[100];
-  snprintf(buf, sizeof(buf), "/%06llu.%s",
+  snprintf(buf, sizeof(buf), "%06llu.%s",
            static_cast<unsigned long long>(number),
            suffix);
-  return name + buf;
+  if (name.empty()) {
+    return buf;
+  }
+  return name + "/" + buf;
 }
 
 std::string LogFileName(const std::string& name, uint64_t number) {
diff --git a/db/log_file.h b/db/log_file.h
deleted file mode 100644 (file)
index da7808e..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-// Copyright 2008-present Facebook. All Rights Reserved.
-
-#ifndef STORAGE_LEVELDB_DB_LOG_FILE_H_
-#define STORAGE_LEVELDB_DB_LOG_FILE_H_
-
-namespace leveldb {
-
-enum  WalFileType {
-  kArchivedLogFile = 0,
-  kAliveLogFile = 1
-} ;
-
-class LogFile {
-
- public:
-  uint64_t logNumber;
-  WalFileType type;
-
-  LogFile(uint64_t logNum,WalFileType logType) :
-    logNumber(logNum),
-    type(logType) {}
-
-  LogFile(const LogFile& that) {
-   logNumber = that.logNumber;
-   type = that.type;
-  }
-
-  bool operator < (const LogFile& that) const {
-    return logNumber < that.logNumber;
-  }
-
-  std::string ToString() const {
-    char response[100];
-    const char* typeOfLog;
-    if (type == kAliveLogFile) {
-      typeOfLog = "Alive Log";
-    } else {
-      typeOfLog = "Archived Log";
-    }
-    sprintf(response,
-            "LogNumber  : %ld LogType : %s",
-            logNumber,
-            typeOfLog);
-    return std::string(response);
-  }
-};
-}  //  namespace leveldb
-#endif  // STORAGE_LEVELDB_DB_LOG_FILE_H_
diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc
new file mode 100644 (file)
index 0000000..6de0bd9
--- /dev/null
@@ -0,0 +1,171 @@
+#include "db/transaction_log_impl.h"
+#include "db/write_batch_internal.h"
+
+namespace leveldb {
+
+TransactionLogIteratorImpl::TransactionLogIteratorImpl(
+                           const std::string& dbname,
+                           const Options* options,
+                           const EnvOptions& soptions,
+                           const SequenceNumber seq,
+                           std::unique_ptr<VectorLogPtr> files,
+                           SequenceNumber const * const lastFlushedSequence) :
+    dbname_(dbname),
+    options_(options),
+    soptions_(soptions),
+    startingSequenceNumber_(seq),
+    files_(std::move(files)),
+    started_(false),
+    isValid_(false),
+    currentFileIndex_(0),
+    lastFlushedSequence_(lastFlushedSequence) {
+  assert(startingSequenceNumber_ <= *lastFlushedSequence_);
+  assert(files_.get() != nullptr);
+
+  reporter_.env = options_->env;
+  reporter_.info_log = options_->info_log.get();
+}
+
+Status TransactionLogIteratorImpl::OpenLogFile(
+  const LogFile* logFile,
+  unique_ptr<SequentialFile>* file) {
+  Env* env = options_->env;
+  if (logFile->Type() == kArchivedLogFile) {
+    std::string fname = ArchivedLogFileName(dbname_, logFile->LogNumber());
+    return env->NewSequentialFile(fname, file, soptions_);
+  } else {
+    std::string fname = LogFileName(dbname_, logFile->LogNumber());
+    Status status = env->NewSequentialFile(fname, file, soptions_);
+    if (!status.ok()) {
+      //  If cannot open file in DB directory.
+      //  Try the archive dir, as it could have moved in the meanwhile.
+      fname = ArchivedLogFileName(dbname_, logFile->LogNumber());
+      status = env->NewSequentialFile(fname, file, soptions_);
+      if (!status.ok()) {
+        return Status::IOError(" Requested file not present in the dir");
+      }
+    }
+    return status;
+  }
+}
+
+BatchResult TransactionLogIteratorImpl::GetBatch()  {
+  assert(isValid_);  //  cannot call in a non valid state.
+  BatchResult result;
+  result.sequence = currentSequence_;
+  result.writeBatchPtr = std::move(currentBatch_);
+  return result;
+}
+
+Status TransactionLogIteratorImpl::status() {
+  return currentStatus_;
+}
+
+bool TransactionLogIteratorImpl::Valid() {
+  return started_ && isValid_;
+}
+
+void TransactionLogIteratorImpl::Next() {
+  LogFile* currentLogFile = files_.get()->at(currentFileIndex_).get();
+
+//  First seek to the given seqNo. in the current file.
+  std::string scratch;
+  Slice record;
+  if (!started_) {
+    started_ = true;  // this piece only runs onced.
+    isValid_ = false;
+    if (startingSequenceNumber_ > *lastFlushedSequence_) {
+      currentStatus_ = Status::IOError("Looking for a sequence, "
+                                        "which is not flushed yet.");
+      return;
+    }
+    Status s = OpenLogReader(currentLogFile);
+    if (!s.ok()) {
+      currentStatus_ = s;
+      isValid_ = false;
+      return;
+    }
+    while (currentLogReader_->ReadRecord(&record, &scratch)) {
+      if (record.size() < 12) {
+        reporter_.Corruption(
+          record.size(), Status::Corruption("log record too small"));
+        continue;
+      }
+      UpdateCurrentWriteBatch(record);
+      if (currentSequence_ >= startingSequenceNumber_) {
+        assert(currentSequence_ <= *lastFlushedSequence_);
+        isValid_ = true;
+        break;
+      } else {
+        isValid_ = false;
+      }
+    }
+    if (isValid_) {
+      // Done for this iteration
+      return;
+    }
+  }
+  bool openNextFile = true;
+  while(openNextFile) {
+    assert(currentLogReader_);
+    if (currentSequence_ < *lastFlushedSequence_) {
+      if (currentLogReader_->IsEOF()) {
+        currentLogReader_->UnmarkEOF();
+      }
+      while (currentLogReader_->ReadRecord(&record, &scratch)) {
+        if (record.size() < 12) {
+          reporter_.Corruption(
+            record.size(), Status::Corruption("log record too small"));
+          continue;
+        } else {
+          UpdateCurrentWriteBatch(record);
+          openNextFile = false;
+          break;
+        }
+      }
+    }
+
+    if (openNextFile) {
+      if (currentFileIndex_ < files_.get()->size() - 1) {
+        ++currentFileIndex_;
+        Status status =OpenLogReader(files_.get()->at(currentFileIndex_).get());
+        if (!status.ok()) {
+          isValid_ = false;
+          currentStatus_ = status;
+          return;
+        }
+      } else {
+        isValid_ = false;
+        openNextFile = false;
+        if (currentSequence_ == *lastFlushedSequence_) {
+          currentStatus_ = Status::OK();
+        } else {
+          currentStatus_ = Status::IOError(" NO MORE DATA LEFT");
+        }
+      }
+    }
+  }
+}
+
+void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
+  WriteBatch* batch = new WriteBatch();
+  WriteBatchInternal::SetContents(batch, record);
+  currentSequence_ = WriteBatchInternal::Sequence(batch);
+  currentBatch_.reset(batch);
+  isValid_ = true;
+  currentStatus_ = Status::OK();
+}
+
+Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
+  unique_ptr<SequentialFile> file;
+  Status status = OpenLogFile(logFile, &file);
+  if (!status.ok()) {
+    return status;
+  }
+  assert(file);
+  currentLogReader_.reset(
+    new log::Reader(std::move(file), &reporter_, true, 0)
+  );
+  return Status::OK();
+}
+}  //  namespace leveldb
diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h
new file mode 100644 (file)
index 0000000..27ab193
--- /dev/null
@@ -0,0 +1,98 @@
+// Copyright 2008-present Facebook. All Rights Reserved.
+#ifndef STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_
+#define STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_
+
+#include <vector>
+
+#include "leveldb/env.h"
+#include "leveldb/options.h"
+#include "leveldb/types.h"
+#include "leveldb/transaction_log.h"
+#include "db/log_reader.h"
+#include "db/filename.h"
+
+namespace leveldb {
+
+struct LogReporter : public log::Reader::Reporter {
+  Env* env;
+  Logger* info_log;
+  virtual void Corruption(size_t bytes, const Status& s) {
+    Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str());
+  }
+};
+
+class LogFileImpl : public LogFile {
+ public:
+  LogFileImpl(uint64_t logNum, WalFileType logType, SequenceNumber startSeq,
+              uint64_t sizeBytes) :
+    logNumber_(logNum),
+    type_(logType),
+    startSequence_(startSeq),
+    sizeFileBytes_(sizeBytes) {
+  }
+
+  std::string Filename() const { return LogFileName("", logNumber_); }
+
+  uint64_t LogNumber() const { return logNumber_; }
+
+  WalFileType Type() const { return type_; }
+
+  SequenceNumber StartSequence() const { return startSequence_; }
+
+  uint64_t SizeFileBytes() const { return sizeFileBytes_; }
+
+  bool operator < (const LogFile& that) const {
+    return LogNumber() < that.LogNumber();
+  }
+
+ private:
+  uint64_t logNumber_;
+  WalFileType type_;
+  SequenceNumber startSequence_;
+  uint64_t sizeFileBytes_;
+
+};
+
+class TransactionLogIteratorImpl : public TransactionLogIterator {
+ public:
+  TransactionLogIteratorImpl(const std::string& dbname,
+                             const Options* options,
+                             const EnvOptions& soptions,
+                             const SequenceNumber seqNum,
+                             std::unique_ptr<VectorLogPtr> files,
+                             SequenceNumber const * const lastFlushedSequence);
+
+  virtual bool Valid();
+
+  virtual void Next();
+
+  virtual Status status();
+
+  virtual BatchResult GetBatch();
+
+ private:
+  const std::string& dbname_;
+  const Options* options_;
+  const EnvOptions& soptions_;
+  const SequenceNumber startingSequenceNumber_;
+  std::unique_ptr<VectorLogPtr> files_;
+  bool started_;
+  bool isValid_;  // not valid when it starts of.
+  Status currentStatus_;
+  size_t currentFileIndex_;
+  std::unique_ptr<WriteBatch> currentBatch_;
+  unique_ptr<log::Reader> currentLogReader_;
+  Status OpenLogFile(const LogFile* logFile, unique_ptr<SequentialFile>* file);
+  LogReporter reporter_;
+  SequenceNumber const * const lastFlushedSequence_;
+  // represents the sequence number being read currently.
+  SequenceNumber currentSequence_;
+
+  void UpdateCurrentWriteBatch(const Slice& record);
+  Status OpenLogReader(const LogFile* file);
+};
+
+
+
+}  //  namespace leveldb
+#endif  //  STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_
diff --git a/db/transaction_log_iterator_impl.cc b/db/transaction_log_iterator_impl.cc
deleted file mode 100644 (file)
index f8e4f8b..0000000
+++ /dev/null
@@ -1,172 +0,0 @@
-#include "db/transaction_log_iterator_impl.h"
-#include "db/write_batch_internal.h"
-#include "db/filename.h"
-
-namespace leveldb {
-
-TransactionLogIteratorImpl::TransactionLogIteratorImpl(
-                           const std::string& dbname,
-                           const Options* options,
-                           const EnvOptions& soptions,
-                           SequenceNumber& seq,
-                           std::unique_ptr<std::vector<LogFile>> files,
-                           SequenceNumber const * const lastFlushedSequence) :
-  dbname_(dbname),
-  options_(options),
-  soptions_(soptions),
-  startingSequenceNumber_(seq),
-  files_(std::move(files)),
-  started_(false),
-  isValid_(false),
-  currentFileIndex_(0),
-  lastFlushedSequence_(lastFlushedSequence) {
-  assert(files_.get() != nullptr);
-  assert(lastFlushedSequence_);
-
-  reporter_.env = options_->env;
-  reporter_.info_log = options_->info_log.get();
-}
-
-Status TransactionLogIteratorImpl::OpenLogFile(
-  const LogFile& logFile,
-  unique_ptr<SequentialFile>* file) {
-  Env* env = options_->env;
-  if (logFile.type == kArchivedLogFile) {
-    std::string fname = ArchivedLogFileName(dbname_, logFile.logNumber);
-    return env->NewSequentialFile(fname, file, soptions_);
-  } else {
-    std::string fname = LogFileName(dbname_, logFile.logNumber);
-    Status status = env->NewSequentialFile(fname, file, soptions_);
-    if (!status.ok()) {
-      //  If cannot open file in DB directory.
-      //  Try the archive dir, as it could have moved in the meanwhile.
-      fname = ArchivedLogFileName(dbname_, logFile.logNumber);
-      status = env->NewSequentialFile(fname, file, soptions_);
-      if (!status.ok()) {
-        return Status::IOError(" Requested file not present in the dir");
-      }
-    }
-    return status;
-  }
-}
-
-BatchResult TransactionLogIteratorImpl::GetBatch()  {
-  assert(isValid_);  //  cannot call in a non valid state.
-  BatchResult result;
-  result.sequence = currentSequence_;
-  result.writeBatchPtr = std::move(currentBatch_);
-  return result;
-}
-
-Status TransactionLogIteratorImpl::status() {
-  return currentStatus_;
-}
-
-bool TransactionLogIteratorImpl::Valid() {
-  return started_ && isValid_;
-}
-
-void TransactionLogIteratorImpl::Next() {
-  LogFile currentLogFile = files_.get()->at(currentFileIndex_);
-
-//  First seek to the given seqNo. in the current file.
-  std::string scratch;
-  Slice record;
-  if (!started_) {
-    started_ = true;  // this piece only runs onced.
-    isValid_ = false;
-    if (startingSequenceNumber_ > *lastFlushedSequence_) {
-      currentStatus_ = Status::IOError("Looking for a sequence, "
-                                        "which is not flushed yet.");
-      return;
-    }
-    Status s = OpenLogReader(currentLogFile);
-    if (!s.ok()) {
-      currentStatus_ = s;
-      isValid_ = false;
-      return;
-    }
-    while (currentLogReader_->ReadRecord(&record, &scratch)) {
-      if (record.size() < 12) {
-        reporter_.Corruption(
-          record.size(), Status::Corruption("log record too small"));
-        continue;
-      }
-      UpdateCurrentWriteBatch(record);
-      if (currentSequence_ >= startingSequenceNumber_) {
-        assert(currentSequence_ <= *lastFlushedSequence_);
-        isValid_ = true;
-        break;
-      } else {
-        isValid_ = false;
-      }
-    }
-    if (isValid_) {
-      // Done for this iteration
-      return;
-    }
-  }
-  bool openNextFile = true;
-  while(openNextFile) {
-    assert(currentLogReader_);
-    if (currentSequence_ < *lastFlushedSequence_) {
-      if (currentLogReader_->IsEOF()) {
-        currentLogReader_->UnmarkEOF();
-      }
-      while (currentLogReader_->ReadRecord(&record, &scratch)) {
-        if (record.size() < 12) {
-          reporter_.Corruption(
-            record.size(), Status::Corruption("log record too small"));
-          continue;
-        } else {
-          UpdateCurrentWriteBatch(record);
-          openNextFile = false;
-          break;
-        }
-      }
-    }
-
-    if (openNextFile) {
-      if (currentFileIndex_ < files_.get()->size() - 1) {
-        ++currentFileIndex_;
-        Status status = OpenLogReader(files_.get()->at(currentFileIndex_));
-        if (!status.ok()) {
-          isValid_ = false;
-          currentStatus_ = status;
-          return;
-        }
-      } else {
-        isValid_ = false;
-        openNextFile = false;
-        if (currentSequence_ == *lastFlushedSequence_) {
-          currentStatus_ = Status::OK();
-        } else {
-          currentStatus_ = Status::IOError(" NO MORE DATA LEFT");
-        }
-      }
-    }
-  }
-}
-
-void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
-  WriteBatch* batch = new WriteBatch();
-  WriteBatchInternal::SetContents(batch, record);
-  currentSequence_ = WriteBatchInternal::Sequence(batch);
-  currentBatch_.reset(batch);
-  isValid_ = true;
-  currentStatus_ = Status::OK();
-}
-
-Status TransactionLogIteratorImpl::OpenLogReader(const LogFile& logFile) {
-  unique_ptr<SequentialFile> file;
-  Status status = OpenLogFile(logFile, &file);
-  if (!status.ok()) {
-    return status;
-  }
-  assert(file);
-  currentLogReader_.reset(
-    new log::Reader(std::move(file), &reporter_, true, 0)
-  );
-  return Status::OK();
-}
-}  //  namespace leveldb
diff --git a/db/transaction_log_iterator_impl.h b/db/transaction_log_iterator_impl.h
deleted file mode 100644 (file)
index faf07f4..0000000
+++ /dev/null
@@ -1,66 +0,0 @@
-// Copyright 2008-present Facebook. All Rights Reserved.
-#ifndef STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_
-#define STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_
-
-#include <vector>
-
-#include "leveldb/env.h"
-#include "leveldb/options.h"
-#include "leveldb/types.h"
-#include "leveldb/transaction_log_iterator.h"
-#include "db/log_file.h"
-#include "db/log_reader.h"
-
-namespace leveldb {
-
-struct LogReporter : public log::Reader::Reporter {
-  Env* env;
-  Logger* info_log;
-  virtual void Corruption(size_t bytes, const Status& s) {
-    Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str());
-  }
-};
-
-class TransactionLogIteratorImpl : public TransactionLogIterator {
- public:
-  TransactionLogIteratorImpl(const std::string& dbname,
-                             const Options* options,
-                             const EnvOptions& soptions,
-                             SequenceNumber& seqNum,
-                             std::unique_ptr<std::vector<LogFile>> files,
-                             SequenceNumber const * const lastFlushedSequence);
-
-  virtual bool Valid();
-
-  virtual void Next();
-
-  virtual Status status();
-
-  virtual BatchResult GetBatch();
-
- private:
-  const std::string& dbname_;
-  const Options* options_;
-  const EnvOptions& soptions_;
-  const uint64_t startingSequenceNumber_;
-  std::unique_ptr<std::vector<LogFile>> files_;
-  bool started_;
-  bool isValid_;  // not valid when it starts of.
-  Status currentStatus_;
-  size_t currentFileIndex_;
-  std::unique_ptr<WriteBatch> currentBatch_;
-  unique_ptr<log::Reader> currentLogReader_;
-  Status OpenLogFile(const LogFile& logFile, unique_ptr<SequentialFile>* file);
-  LogReporter reporter_;
-  SequenceNumber const * const lastFlushedSequence_;
-  // represents the sequence number being read currently.
-  SequenceNumber currentSequence_;
-
-  void UpdateCurrentWriteBatch(const Slice& record);
-  Status OpenLogReader(const LogFile& file);
-};
-
-
-
-}  //  namespace leveldb
-#endif  //  STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_
index 0c056c362f6d23b68ac376a6a7d83e1d5583ddc0..d0316ee4ff945d2966b9e9aaad479b6c396ca609 100644 (file)
@@ -12,7 +12,7 @@
 #include "leveldb/iterator.h"
 #include "leveldb/options.h"
 #include "leveldb/types.h"
-#include "leveldb/transaction_log_iterator.h"
+#include "leveldb/transaction_log.h"
 
 namespace leveldb {
 
@@ -232,6 +232,14 @@ class DB {
   virtual Status GetLiveFiles(std::vector<std::string>&,
                               uint64_t* manifest_file_size) = 0;
 
+  // Retrieve the sorted list of all wal files with earliest file first
+  virtual Status GetSortedWalFiles(VectorLogPtr& files) = 0;
+
+  // Delete wal files in files. These can be either live or archived.
+  // Returns Status::OK if all files could be deleted, otherwise Status::IOError
+  // which contains information about files that could not be deleted.
+  virtual Status DeleteWalFiles(const VectorLogPtr& files) = 0;
+
   // The sequence number of the most recent transaction.
   virtual SequenceNumber GetLatestSequenceNumber() = 0;
 
diff --git a/include/leveldb/transaction_log.h b/include/leveldb/transaction_log.h
new file mode 100644 (file)
index 0000000..a834afa
--- /dev/null
@@ -0,0 +1,78 @@
+// Copyright 2008-present Facebook. All Rights Reserved.
+#ifndef STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_
+#define STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_
+
+#include "leveldb/status.h"
+#include "leveldb/types.h"
+#include "leveldb/write_batch.h"
+
+namespace leveldb {
+
+class LogFile;
+typedef std::vector<std::unique_ptr<LogFile>> VectorLogPtr;
+
+enum  WalFileType {
+  /* Indicates that WAL file is in archive directory. WAL files are moved from
+   * the main db directory to archive directory once they are not live and stay
+   * there for a duration of WAL_ttl_seconds which can be set in Options
+   */
+  kArchivedLogFile = 0,
+
+  /* Indicates that WAL file is live and resides in the main db directory */
+  kAliveLogFile = 1
+} ;
+
+class LogFile {
+ public:
+  LogFile() {}
+  virtual ~LogFile() {}
+
+  // Returns log file's name excluding the db path
+  virtual std::string Filename() const = 0;
+
+  // Primary identifier for log file.
+  // This is directly proportional to creation time of the log file
+  virtual uint64_t LogNumber() const = 0;
+
+  // Log file can be either alive or archived
+  virtual WalFileType Type() const = 0;
+
+  // Starting sequence number of writebatch written in this log file
+  virtual SequenceNumber StartSequence() const = 0;
+
+  // Size of log file on disk in Bytes
+  virtual uint64_t SizeFileBytes() const = 0;
+};
+
+struct BatchResult {
+  SequenceNumber sequence;
+  std::unique_ptr<WriteBatch> writeBatchPtr;
+};
+
+//  A TransactionLogIterator is used to iterate over the Transaction's in a db.
+class TransactionLogIterator {
+ public:
+  TransactionLogIterator() {}
+  virtual ~TransactionLogIterator() {}
+
+  // An iterator is either positioned at a WriteBatch or not valid.
+  // This method returns true if the iterator is valid.
+  // Can read data from a valid iterator.
+  virtual bool Valid() = 0;
+
+  // Moves the iterator to the next WriteBatch.
+  // REQUIRES: Valid() to be true.
+  virtual void Next() = 0;
+
+  // Return's ok if the iterator is valid.
+  // Return the Error when something has gone wrong.
+  virtual Status status() = 0;
+
+  // If valid return's the current write_batch and the sequence number of the
+  // latest transaction contained in the batch.
+  // ONLY use if Valid() is true and status() is OK.
+  virtual BatchResult GetBatch() = 0;
+};
+} //  namespace leveldb
+
+#endif  // STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_
diff --git a/include/leveldb/transaction_log_iterator.h b/include/leveldb/transaction_log_iterator.h
deleted file mode 100644 (file)
index d755d82..0000000
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright 2008-present Facebook. All Rights Reserved.
-#ifndef STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_
-#define STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_
-
-#include "leveldb/status.h"
-#include "leveldb/write_batch.h"
-
-namespace leveldb {
-
-
-struct BatchResult {
-  SequenceNumber sequence;
-  std::unique_ptr<WriteBatch> writeBatchPtr;
-};
-
-//  A TransactionLogIterator is used to iterate over the Transaction's in a db.
-class TransactionLogIterator {
- public:
-  TransactionLogIterator() {}
-  virtual ~TransactionLogIterator() {}
-
-  // An iterator is either positioned at a WriteBatch or not valid.
-  // This method returns true if the iterator is valid.
-  // Can read data from a valid iterator.
-  virtual bool Valid() = 0;
-
-  // Moves the iterator to the next WriteBatch.
-  // REQUIRES: Valid() to be true.
-  virtual void Next() = 0;
-
-  // Return's ok if the iterator is valid.
-  // Return the Error when something has gone wrong.
-  virtual Status status() = 0;
-
-  // If valid return's the current write_batch and the sequence number of the
-  // latest transaction contained in the batch.
-  // ONLY use if Valid() is true and status() is OK.
-  virtual BatchResult GetBatch() = 0;
-};
-} //  namespace leveldb
-
-#endif  // STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_
index 6ec058c2e120833de3b977ba2ebe07a8682b46b3..1d91094614483ac4526d2c80e727b548e929e3ce 100644 (file)
@@ -7,7 +7,7 @@ namespace leveldb {
 
 // Define all public custom types here.
 
-//  Represents a sequence number in a WAL file.
+// Represents a sequence number in a WAL file.
 typedef uint64_t SequenceNumber;
 
 }  //  namespace leveldb
index 916496e0496c683330a8b8082405f40bead59bf1..08930d194bb9c26e66c667388ab9bcce9f70dcf2 100644 (file)
@@ -139,6 +139,15 @@ class StackableDB : public DB {
     return sdb_->GetLatestSequenceNumber();
   }
 
+  virtual Status GetSortedWalFiles(VectorLogPtr& files) override {
+    return sdb_->GetSortedWalFiles(files);
+  }
+
+  virtual Status DeleteWalFiles(const VectorLogPtr& files)
+    override{
+      return sdb_->DeleteWalFiles(files);
+  }
+
   virtual Status GetUpdatesSince(SequenceNumber seq_number,
                                  unique_ptr<TransactionLogIterator>* iter)
     override {
index 012a3df01da30af9110b18fa00901626cdce6724..010fa573df102ec06f4cee1ee1349545cdeb2ddb 100644 (file)
@@ -1,4 +1,3 @@
-
 #include <cstdio>
 
 #include "db/write_batch_internal.h"
@@ -19,7 +18,6 @@ using namespace leveldb;
 struct DataPumpThread {
   size_t no_records;
   DB* db; // Assumption DB is Open'ed already.
-  volatile bool is_running;
 };
 
 static std::string RandomString(Random* rnd, int len) {
@@ -33,59 +31,50 @@ static void DataPumpThreadBody(void* arg) {
   DB* db = t->db;
   Random rnd(301);
   size_t i = 0;
-  t->is_running = true;
-  while( i < t->no_records ) {
-    db->Put(WriteOptions(),
-            Slice(RandomString(&rnd, 50)),
-            Slice(RandomString(&rnd, 500)));
-    ++i;
+  while(i++ < t->no_records) {
+    if(!db->Put(WriteOptions(), Slice(RandomString(&rnd, 500)),
+                Slice(RandomString(&rnd, 500))).ok()) {
+      fprintf(stderr, "Error in put\n");
+      exit(1);
+    }
   }
-  t->is_running = false;
 }
 
 struct ReplicationThread {
   port::AtomicPointer stop;
   DB* db;
-  volatile SequenceNumber latest;
   volatile size_t no_read;
-  volatile bool has_more;
 };
 
 static void ReplicationThreadBody(void* arg) {
   ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg);
   DB* db = t->db;
   unique_ptr<TransactionLogIterator> iter;
-  SequenceNumber currentSeqNum = 0;
+  SequenceNumber currentSeqNum = 1;
   while (t->stop.Acquire_Load() != nullptr) {
-    if (!iter) {
-      db->GetUpdatesSince(currentSeqNum, &iter);
-      fprintf(stdout, "Refreshing iterator\n");
-      iter->Next();
-      while(iter->Valid()) {
-        BatchResult res = iter->GetBatch();
-        if (res.sequence != currentSeqNum +1
-            && res.sequence != currentSeqNum) {
-          fprintf(stderr,
-                  "Missed a seq no. b/w %ld and %ld\n",
-                  currentSeqNum,
-                  res.sequence);
-          exit(1);
-        }
-        currentSeqNum = res.sequence;
-        t->latest = res.sequence;
-        iter->Next();
-        t->no_read++;
+    iter.reset();
+    Status s;
+    while(!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
+      if (t->stop.Acquire_Load() == nullptr) {
+        return;
+      }
+    }
+    fprintf(stderr, "Refreshing iterator\n");
+    for(;iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) {
+      BatchResult res = iter->GetBatch();
+      if (res.sequence != currentSeqNum) {
+        fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n", currentSeqNum,
+                        res.sequence);
+        exit(1);
       }
     }
-    iter.reset();
   }
 }
 
-
 int main(int argc, const char** argv) {
 
-  long FLAGS_num_inserts = 1000;
-  long FLAGS_WAL_ttl_seconds = 1000;
+  uint64_t FLAGS_num_inserts = 1000;
+  uint64_t FLAGS_WAL_ttl_seconds = 1000;
   char junk;
   long l;
 
@@ -108,36 +97,34 @@ int main(int argc, const char** argv) {
   options.create_if_missing = true;
   options.WAL_ttl_seconds = FLAGS_WAL_ttl_seconds;
   DB* db;
+  DestroyDB(default_db_path, options);
 
   Status s = DB::Open(options, default_db_path, &db);
 
   if (!s.ok()) {
     fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str());
+    exit(1);
   }
 
   DataPumpThread dataPump;
   dataPump.no_records = FLAGS_num_inserts;
   dataPump.db = db;
-  dataPump.is_running = true;
   env->StartThread(DataPumpThreadBody, &dataPump);
 
   ReplicationThread replThread;
   replThread.db = db;
   replThread.no_read = 0;
-  replThread.has_more = true;
   replThread.stop.Release_Store(env); // store something to make it non-null.
 
   env->StartThread(ReplicationThreadBody, &replThread);
-  while(dataPump.is_running) {
-    continue;
-  }
+  while(replThread.no_read < FLAGS_num_inserts);
   replThread.stop.Release_Store(nullptr);
-  if ( replThread.no_read < dataPump.no_records ) {
+  if (replThread.no_read < dataPump.no_records) {
     // no. read should be => than inserted.
     fprintf(stderr, "No. of Record's written and read not same\nRead : %ld"
-            " Written : %ld", replThread.no_read, dataPump.no_records);
+            " Written : %ld\n", replThread.no_read, dataPump.no_records);
     exit(1);
   }
+  fprintf(stderr, "Successful!\n");
   exit(0);
-  fprintf(stdout, "ALL IS FINE");
 }
index b6235ba4a4eedf1c821ba59dd41c2ba0b857f4cb..9babecd0fab25230eeccc5aef54846e711ca112b 100644 (file)
@@ -266,6 +266,14 @@ SequenceNumber DBWithTTL::GetLatestSequenceNumber() {
   return db_->GetLatestSequenceNumber();
 }
 
+Status DBWithTTL::GetSortedWalFiles(VectorLogPtr& files) {
+  return db_->GetSortedWalFiles(files);
+}
+
+Status DBWithTTL::DeleteWalFiles(const VectorLogPtr& files){
+  return db_->DeleteWalFiles(files);
+}
+
 Status DBWithTTL::GetUpdatesSince(
     SequenceNumber seq_number,
     unique_ptr<TransactionLogIterator>* iter) {
index 1285b448e26f78132c0ef197eab2e34c4dabf825..dbba5c63ae4a91c19818108914374eb4e5893ac8 100644 (file)
@@ -77,6 +77,10 @@ class DBWithTTL : public StackableDB {
 
   virtual Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs);
 
+  virtual Status GetSortedWalFiles(VectorLogPtr& files);
+
+  virtual Status DeleteWalFiles(const VectorLogPtr& files);
+
   virtual SequenceNumber GetLatestSequenceNumber();
 
   virtual Status GetUpdatesSince(SequenceNumber seq_number,