]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
WAL log retention policy based on archive size.
authorshamdor <shamdor@fb.com>
Thu, 7 Nov 2013 02:46:28 +0000 (18:46 -0800)
committershamdor <shamdor@fb.com>
Thu, 7 Nov 2013 02:46:28 +0000 (18:46 -0800)
Summary:
Archive cleaning will still happen every WAL_ttl seconds
but archived logs will be deleted only if archive size
is greater then a WAL_size_limit value.
Empty archived logs will be deleted evety WAL_ttl.

Test Plan:
1. Unit tests pass.
2. Benchmark.

Reviewers: emayanke, dhruba, haobo, sdong, kailiu, igor

Reviewed By: emayanke

CC: leveldb
Differential Revision: https://reviews.facebook.net/D13869

db/c.cc
db/db_bench.cc
db/db_impl.cc
db/db_impl.h
db/db_test.cc
db/deletefile_test.cc
include/rocksdb/db.h
include/rocksdb/options.h
include/rocksdb/transaction_log.h
tools/db_repl_stress.cc
util/options.cc

diff --git a/db/c.cc b/db/c.cc
index 7903b7ca3723dc55333ac555bbdfbf8073381400..0d99c44dd75689d0ff15dafc3f47723e6c08974f 100644 (file)
--- a/db/c.cc
+++ b/db/c.cc
@@ -554,6 +554,11 @@ void leveldb_options_set_WAL_ttl_seconds(leveldb_options_t* opt, uint64_t ttl) {
   opt->rep.WAL_ttl_seconds = ttl;
 }
 
+void leveldb_options_set_WAL_size_limit_MB(
+    leveldb_options_t* opt, uint64_t limit) {
+  opt->rep.WAL_size_limit_MB = limit;
+}
+
 leveldb_comparator_t* leveldb_comparator_create(
     void* state,
     void (*destructor)(void*),
index e322fdef54b04d56a8f405ad373174691ef6f4e5..ba8e27fa731424905a13fd2cc8b82de84472008b 100644 (file)
@@ -397,7 +397,9 @@ DEFINE_int32(source_compaction_factor, 1, "Cap the size of data in level-K for"
              " a compaction run that compacts Level-K with Level-(K+1) (for"
              " K >= 1)");
 
-DEFINE_uint64(wal_ttl, 0, "Set the TTL for the WAL Files in seconds.");
+DEFINE_uint64(wal_ttl_seconds, 0, "Set the TTL for the WAL Files in seconds.");
+DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files"
+              " in MB.");
 
 DEFINE_bool(bufferedio, rocksdb::EnvOptions().use_os_buffer,
             "Allow buffered io using OS buffers");
@@ -1352,7 +1354,8 @@ class Benchmark {
     options.level0_slowdown_writes_trigger =
       FLAGS_level0_slowdown_writes_trigger;
     options.compression = FLAGS_compression_type_e;
-    options.WAL_ttl_seconds = FLAGS_wal_ttl;
+    options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
+    options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
     if (FLAGS_min_level_to_compress >= 0) {
       assert(FLAGS_min_level_to_compress <= FLAGS_num_levels);
       options.compression_per_level.resize(FLAGS_num_levels);
index 32985a9ca196345e79c484fac8713101f80c24f8..dd526392b8bf765507e14e4cdfaf2a54844ea199 100644 (file)
@@ -264,6 +264,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
       delete_obsolete_files_last_run_(0),
       purge_wal_files_last_run_(0),
       last_stats_dump_time_microsec_(0),
+      default_interval_to_delete_obsolete_WAL_(600),
       stall_level0_slowdown_(0),
       stall_memtable_compaction_(0),
       stall_level0_num_files_(0),
@@ -407,7 +408,7 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
 }
 
 const Status DBImpl::CreateArchivalDirectory() {
-  if (options_.WAL_ttl_seconds > 0) {
+  if (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0) {
     std::string archivalPath = ArchivalDirectory(options_.wal_dir);
     return env_->CreateDirIfMissing(archivalPath);
   }
@@ -494,7 +495,7 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) {
 Status DBImpl::DeleteLogFile(uint64_t number) {
   Status s;
   auto filename = LogFileName(options_.wal_dir, number);
-  if (options_.WAL_ttl_seconds > 0) {
+  if (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0) {
     s = env_->RenameFile(filename,
                          ArchivedLogFileName(options_.wal_dir, number));
 
@@ -613,34 +614,128 @@ void DBImpl::DeleteObsoleteFiles() {
   EvictObsoleteFiles(deletion_state);
 }
 
+// 1. Go through all archived files and
+//    a. if ttl is enabled, delete outdated files
+//    b. if archive size limit is enabled, delete empty files,
+//        compute file number and size.
+// 2. If size limit is enabled:
+//    a. compute how many files should be deleted
+//    b. get sorted non-empty archived logs
+//    c. delete what should be deleted
 void DBImpl::PurgeObsoleteWALFiles() {
+  bool const ttl_enabled = options_.WAL_ttl_seconds > 0;
+  bool const size_limit_enabled =  options_.WAL_size_limit_MB > 0;
+  if (!ttl_enabled && !size_limit_enabled) {
+    return;
+  }
+
   int64_t current_time;
   Status s = env_->GetCurrentTime(&current_time);
-  uint64_t now_seconds = static_cast<uint64_t>(current_time);
-  assert(s.ok());
+  if (!s.ok()) {
+    Log(options_.info_log, "Can't get current time: %s", s.ToString().c_str());
+    assert(false);
+    return;
+  }
+  uint64_t const now_seconds = static_cast<uint64_t>(current_time);
+  uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled) ?
+    options_.WAL_ttl_seconds / 2 : default_interval_to_delete_obsolete_WAL_;
 
-  if (options_.WAL_ttl_seconds != ULONG_MAX && options_.WAL_ttl_seconds > 0) {
-    if (purge_wal_files_last_run_ + options_.WAL_ttl_seconds > now_seconds) {
-      return;
-    }
-    std::vector<std::string> wal_files;
-    std::string archival_dir = ArchivalDirectory(options_.wal_dir);
-    env_->GetChildren(archival_dir, &wal_files);
-    for (const auto& f : wal_files) {
-      uint64_t file_m_time;
-      const std::string file_path = archival_dir + "/" + f;
-      const Status s = env_->GetFileModificationTime(file_path, &file_m_time);
-      if (s.ok() && (now_seconds - file_m_time > options_.WAL_ttl_seconds)) {
-        Status status = env_->DeleteFile(file_path);
-        if (!status.ok()) {
-          Log(options_.info_log,
-              "Failed Deleting a WAL file Error : i%s",
-              status.ToString().c_str());
+  if (purge_wal_files_last_run_ + time_to_check > now_seconds) {
+    return;
+  }
+
+  purge_wal_files_last_run_ = now_seconds;
+
+  std::string archival_dir = ArchivalDirectory(options_.wal_dir);
+  std::vector<std::string> files;
+  s = env_->GetChildren(archival_dir, &files);
+  if (!s.ok()) {
+    Log(options_.info_log, "Can't get archive files: %s", s.ToString().c_str());
+    assert(false);
+    return;
+  }
+
+  size_t log_files_num = 0;
+  uint64_t log_file_size = 0;
+
+  for (auto& f : files) {
+    uint64_t number;
+    FileType type;
+    if (ParseFileName(f, &number, &type) && type == kLogFile) {
+      std::string const file_path = archival_dir + "/" + f;
+      if (ttl_enabled) {
+        uint64_t file_m_time;
+        Status const s = env_->GetFileModificationTime(file_path,
+          &file_m_time);
+        if (!s.ok()) {
+          Log(options_.info_log, "Can't get file mod time: %s: %s",
+              file_path.c_str(), s.ToString().c_str());
+          continue;
         }
-      } // Ignore errors.
+        if (now_seconds - file_m_time > options_.WAL_ttl_seconds) {
+          Status const s = env_->DeleteFile(file_path);
+          if (!s.ok()) {
+            Log(options_.info_log, "Can't delete file: %s: %s",
+                file_path.c_str(), s.ToString().c_str());
+            continue;
+          }
+          continue;
+        }
+      }
+
+      if (size_limit_enabled) {
+        uint64_t file_size;
+        Status const s = env_->GetFileSize(file_path, &file_size);
+        if (!s.ok()) {
+          Log(options_.info_log, "Can't get file size: %s: %s",
+              file_path.c_str(), s.ToString().c_str());
+          return;
+        } else {
+          if (file_size > 0) {
+            log_file_size = std::max(log_file_size, file_size);
+            ++log_files_num;
+          } else {
+            Status s = env_->DeleteFile(file_path);
+            if (!s.ok()) {
+              Log(options_.info_log, "Can't delete file: %s: %s",
+                  file_path.c_str(), s.ToString().c_str());
+              continue;
+            }
+          }
+        }
+      }
+    }
+  }
+
+  if (0 == log_files_num || !size_limit_enabled) {
+    return;
+  }
+
+  size_t const files_keep_num = options_.WAL_size_limit_MB *
+    1024 * 1024 / log_file_size;
+  if (log_files_num <= files_keep_num) {
+    return;
+  }
+
+  size_t files_del_num = log_files_num - files_keep_num;
+  VectorLogPtr archived_logs;
+  AppendSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);
+
+  if (files_del_num > archived_logs.size()) {
+    Log(options_.info_log, "Trying to delete more archived log files than "
+        "exist. Deleting all");
+    files_del_num = archived_logs.size();
+  }
+
+  for (size_t i = 0; i < files_del_num; ++i) {
+    std::string const file_path = archived_logs[i]->PathName();
+    Status const s = DeleteFile(file_path);
+    if (!s.ok()) {
+      Log(options_.info_log, "Can't delete file: %s: %s",
+          file_path.c_str(), s.ToString().c_str());
+      continue;
     }
   }
-  purge_wal_files_last_run_ = now_seconds;
 }
 
 // If externalTable is set, then apply recovered transactions
index 3b6be83734a0d23578965a838c06c9f744ad2661..c70ec23d4d474e3bf94d8a0f8db669809fc2dbd2 100644 (file)
@@ -116,6 +116,11 @@ class DBImpl : public DB {
   // get total level0 file size. Only for testing.
   uint64_t TEST_GetLevel0TotalSize() { return versions_->NumLevelBytes(0);}
 
+  void TEST_SetDefaultTimeToCheck(uint64_t default_interval_to_delete_obsolete_WAL)
+  {
+    default_interval_to_delete_obsolete_WAL_ = default_interval_to_delete_obsolete_WAL;
+  }
+
  protected:
   Env* const env_;
   const std::string dbname_;
@@ -323,6 +328,10 @@ class DBImpl : public DB {
   // last time stats were dumped to LOG
   std::atomic<uint64_t> last_stats_dump_time_microsec_;
 
+  // obsolete files will be deleted every this seconds if ttl deletion is
+  // enabled and archive size_limit is disabled.
+  uint64_t default_interval_to_delete_obsolete_WAL_;
+
   // These count the number of microseconds for which MakeRoomForWrite stalls.
   uint64_t stall_level0_slowdown_;
   uint64_t stall_memtable_compaction_;
index c089dcf43a79a2e218a98d6296d5487650df4e48..35f30c0f50f6b8bcbc448ba8ef48da66e959ea3f 100644 (file)
@@ -3811,28 +3811,28 @@ std::vector<std::uint64_t> ListLogFiles(Env* env, const std::string& path) {
   return std::move(log_files);
 }
 
-TEST(DBTest, WALArchival) {
+TEST(DBTest, WALArchivalTtl) {
   do {
-    std::string value(1024, '1');
     Options options = CurrentOptions();
     options.create_if_missing = true;
     options.WAL_ttl_seconds = 1000;
     DestroyAndReopen(&options);
 
-
-    //  TEST : Create DB with a ttl.
+    //  TEST : Create DB with a ttl and no size limit.
     //  Put some keys. Count the log files present in the DB just after insert.
     //  Re-open db. Causes deletion/archival to take place.
     //  Assert that the files moved under "/archive".
+    //  Reopen db with small ttl.
+    //  Assert that archive was removed.
 
     std::string archiveDir = ArchivalDirectory(dbname_);
 
     for (int i = 0; i < 10; ++i) {
       for (int j = 0; j < 10; ++j) {
-        ASSERT_OK(Put(Key(10*i+j), value));
+        ASSERT_OK(Put(Key(10 * i + j), DummyString(1024)));
       }
 
-      std::vector<uint64_t> logFiles = ListLogFiles(env_, dbname_);
+      std::vector<uint64_t> log_files = ListLogFiles(env_, dbname_);
 
       options.create_if_missing = false;
       Reopen(&options);
@@ -3840,37 +3840,78 @@ TEST(DBTest, WALArchival) {
       std::vector<uint64_t> logs = ListLogFiles(env_, archiveDir);
       std::set<uint64_t> archivedFiles(logs.begin(), logs.end());
 
-      for (auto& log : logFiles) {
+      for (auto& log : log_files) {
         ASSERT_TRUE(archivedFiles.find(log) != archivedFiles.end());
       }
     }
 
-    std::vector<uint64_t> logFiles = ListLogFiles(env_, archiveDir);
-    ASSERT_TRUE(logFiles.size() > 0);
+    std::vector<uint64_t> log_files = ListLogFiles(env_, archiveDir);
+    ASSERT_TRUE(log_files.size() > 0);
+
     options.WAL_ttl_seconds = 1;
-    env_->SleepForMicroseconds(2*1000*1000);
+    env_->SleepForMicroseconds(2 * 1000 * 1000);
     Reopen(&options);
 
-    logFiles = ListLogFiles(env_, archiveDir);
-    ASSERT_TRUE(logFiles.size() == 0);
+    log_files = ListLogFiles(env_, archiveDir);
+    ASSERT_TRUE(log_files.empty());
   } while (ChangeCompactOptions());
 }
 
-TEST(DBTest, WALClear) {
+uint64_t GetLogDirSize(std::string dir_path, SpecialEnv* env) {
+  uint64_t dir_size = 0;
+  std::vector<std::string> files;
+  env->GetChildren(dir_path, &files);
+  for (auto& f : files) {
+    uint64_t number;
+    FileType type;
+    if (ParseFileName(f, &number, &type) && type == kLogFile) {
+      std::string const file_path = dir_path + "/" + f;
+      uint64_t file_size;
+      env->GetFileSize(file_path, &file_size);
+      dir_size += file_size;
+    }
+  }
+  return dir_size;
+}
+
+TEST(DBTest, WALArchivalSizeLimit) {
   do {
     Options options = CurrentOptions();
     options.create_if_missing = true;
-    options.WAL_ttl_seconds = 1;
+    options.WAL_ttl_seconds = 0;
+    options.WAL_size_limit_MB = 1000;
+
+    // TEST : Create DB with huge size limit and no ttl.
+    // Put some keys. Count the archived log files present in the DB
+    // just after insert. Assert that there are many enough.
+    // Change size limit. Re-open db.
+    // Assert that archive is not greater than WAL_size_limit_MB.
+    // Set ttl and time_to_check_ to small values. Re-open db.
+    // Assert that there are no archived logs left.
 
-    for (int j = 0; j < 10; ++j)
-    for (int i = 0; i < 10; ++i)
-      ASSERT_OK(Put(Key(10*i+j), DummyString(1024)));
+    DestroyAndReopen(&options);
+    for (int i = 0; i < 128 * 128; ++i) {
+      ASSERT_OK(Put(Key(i), DummyString(1024)));
+    }
     Reopen(&options);
+
     std::string archive_dir = ArchivalDirectory(dbname_);
     std::vector<std::uint64_t> log_files = ListLogFiles(env_, archive_dir);
-    ASSERT_TRUE(!log_files.empty());
+    ASSERT_TRUE(log_files.size() > 2);
+
+    options.WAL_size_limit_MB = 8;
+    Reopen(&options);
+    dbfull()->TEST_PurgeObsoleteteWAL();
+
+    uint64_t archive_size = GetLogDirSize(archive_dir, env_);
+    ASSERT_TRUE(archive_size <= options.WAL_size_limit_MB * 1024 * 1024);
+
+    options.WAL_ttl_seconds = 1;
+    dbfull()->TEST_SetDefaultTimeToCheck(1);
     env_->SleepForMicroseconds(2 * 1000 * 1000);
+    Reopen(&options);
     dbfull()->TEST_PurgeObsoleteteWAL();
+
     log_files = ListLogFiles(env_, archive_dir);
     ASSERT_TRUE(log_files.empty());
   } while (ChangeCompactOptions());
index a864716cce6b0a39e32545b2349922a0f0b2ef38..324c8c69d4ccb1104699337146a8f8705c223f6d 100644 (file)
@@ -38,6 +38,7 @@ class DeleteFileTest {
     options_.target_file_size_base = 1024*1024*1000;
     options_.max_bytes_for_level_base = 1024*1024*1000;
     options_.WAL_ttl_seconds = 300; // Used to test log files
+    options_.WAL_size_limit_MB = 1024; // Used to test log files
     dbname_ = test::TmpDir() + "/deletefile_test";
     DestroyDB(dbname_, options_);
     numlevels_ = 7;
index 25614a91c1ad1e3bcb65438fcfe5f0ebc223edea..3d7bc3ab6947e46f846d1f92d1e79e49494c3dd8 100644 (file)
@@ -264,9 +264,10 @@ class DB {
   // seq_number. If the sequence number is non existent, it returns an iterator
   // at the first available seq_no after the requested seq_no
   // Returns Status::Ok if iterator is valid
-  // Must set WAL_ttl_seconds to a large value to use this api, else the WAL
-  // files will get cleared aggressively and the iterator might keep getting
-  // invalid before an update is read.
+  // Must set WAL_ttl_seconds or WAL_size_limit_MB to large values to
+  // use this api, else the WAL files will get
+  // cleared aggressively and the iterator might keep getting invalid before
+  // an update is read.
   virtual Status GetUpdatesSince(SequenceNumber seq_number,
                                  unique_ptr<TransactionLogIterator>* iter) = 0;
 
index 055019e7685346a673db16713fd18a6bfe10f1e2..08f6cf0c2cf29756c3a7c88b70425f123546748e 100644 (file)
@@ -489,15 +489,20 @@ struct Options {
   // be issued on this database.
   bool disable_auto_compactions;
 
-  // The number of seconds a WAL(write ahead log) should be kept after it has
-  // been marked as Not Live. If the value is set. The WAL files are moved to
-  // the archive directory and deleted after the given TTL.
-  // If set to 0, WAL files are deleted as soon as they are not required by
-  // the database.
-  // If set to std::numeric_limits<uint64_t>::max() the WAL files will never be
-  // deleted.
-  // Default : 0
+  // The following two fields affect how archived logs will be deleted.
+  // 1. If both set to 0, logs will be deleted asap and will not get into
+  //    the archive.
+  // 2. If WAL_ttl_seconds is 0 and WAL_size_limit_MB is not 0,
+  //    WAL files will be checked every 10 min and if total size is greater
+  //    then WAL_size_limit_MB, they will be deleted starting with the
+  //    earliest until size_limit is met. All empty files will be deleted.
+  // 3. If WAL_ttl_seconds is not 0 and WAL_size_limit_MB is 0, then
+  //    WAL files will be checked every WAL_ttl_secondsi / 2 and those which
+  //    are older than WAL_ttl_seconds will be deleted.
+  // 4. If both are not 0, WAL files will be checked every 10 min and both
+  //    checks will be performed with ttl being first.
   uint64_t WAL_ttl_seconds;
+  uint64_t WAL_size_limit_MB;
 
   // Number of bytes to preallocate (via fallocate) the manifest
   // files.  Default is 4mb, which is reasonable to reduce random IO
index e74980af60271b3572f9b8d4590d3f9b0953e78a..4da83d021e882debba9c9cbbecdb9d43de1ad3a3 100644 (file)
@@ -16,7 +16,9 @@ typedef std::vector<std::unique_ptr<LogFile>> VectorLogPtr;
 enum  WalFileType {
   /* Indicates that WAL file is in archive directory. WAL files are moved from
    * the main db directory to archive directory once they are not live and stay
-   * there for a duration of WAL_ttl_seconds which can be set in Options
+   * there until cleaned up. Files are cleaned depending on archive size
+   * (Options::WAL_size_limit_MB) and time since last cleaning
+   * (Options::WAL_ttl_seconds).
    */
   kArchivedLogFile = 0,
 
index 20fe40f3eaf3e07422415bf5ef6ae87703b55461..c22d31eb1d25e39cf4847eec112cac8bcbd840cc 100644 (file)
@@ -80,11 +80,14 @@ static void ReplicationThreadBody(void* arg) {
 
 DEFINE_uint64(num_inserts, 1000, "the num of inserts the first thread should"
               " perform.");
-DEFINE_uint64(wal_ttl, 1000, "the wal ttl for the run(in seconds)");
+DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
+DEFINE_uint64(wal_size_limit_MB, 10, "the wal size limit for the run"
+              "(in MB)");
 
 int main(int argc, const char** argv) {
   google::SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
-    " --num_inserts=<num_inserts> --wal_ttl=<WAL_ttl_seconds>");
+    " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" +
+    " --wal_size_limit_MB=<WAL_size_limit_MB>");
   google::ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true);
 
   Env* env = Env::Default();
@@ -93,7 +96,8 @@ int main(int argc, const char** argv) {
   default_db_path += "db_repl_stress";
   Options options;
   options.create_if_missing = true;
-  options.WAL_ttl_seconds = FLAGS_wal_ttl;
+  options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
+  options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
   DB* db;
   DestroyDB(default_db_path, options);
 
index 170a8ae55184bb1d0523aca47a3e634775ee3e2b..3952f45faef53c6fd802048d31fc52ae4a51902c 100644 (file)
@@ -77,6 +77,7 @@ Options::Options()
       arena_block_size(0),
       disable_auto_compactions(false),
       WAL_ttl_seconds(0),
+      WAL_size_limit_MB(0),
       manifest_preallocation_size(4 * 1024 * 1024),
       purge_redundant_kvs_while_flush(true),
       allow_os_buffer(true),
@@ -237,6 +238,8 @@ Options::Dump(Logger* log) const
         disable_auto_compactions);
     Log(log,"                        Options.WAL_ttl_seconds: %ld",
         WAL_ttl_seconds);
+    Log(log,"                      Options.WAL_size_limit_MB: %ld",
+                 WAL_size_limit_MB);
     Log(log,"            Options.manifest_preallocation_size: %ld",
         manifest_preallocation_size);
     Log(log,"         Options.purge_redundant_kvs_while_flush: %d",