git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
FIFO compaction style
author: Igor Canadi <icanadi@fb.com>
Wed, 21 May 2014 18:43:35 +0000 (11:43 -0700)
committer: Igor Canadi <icanadi@fb.com>
Wed, 21 May 2014 18:43:35 +0000 (11:43 -0700)
Summary:
Introducing a new compaction style -- FIFO.

FIFO compaction style has a write amplification of 1 (+1 for WAL), and it deletes the oldest table files when their total size exceeds the pre-configured limit (compaction_options_fifo.max_table_files_size).

FIFO compaction style is suited for storing high-frequency event logs.

Test Plan: Added a unit test

Reviewers: dhruba, haobo, sdong

Reviewed By: dhruba

Subscribers: alberts, leveldb

Differential Revision: https://reviews.facebook.net/D18765

13 files changed:
HISTORY.md
db/column_family.cc
db/compaction.cc
db/compaction.h
db/compaction_picker.cc
db/compaction_picker.h
db/db_impl.cc
db/db_impl_debug.cc
db/db_test.cc
db/version_set.cc
db/version_set.h
include/rocksdb/options.h
util/options.cc

index f9bdd7c9832740f325e7ca9b2db21590234cab48..43025b722e7c817e3f4c6153410608b63f29e28d 100644 (file)
@@ -7,6 +7,7 @@
 
 ### New Features
 * Hash index for block-based table will be materialized and reconstructed more efficiently. Previously hash index is constructed by scanning the whole table during every table open.
+* FIFO compaction style
 
 ## 3.0.0 (05/05/2014)
 
index 39c37b9e80f76766811cab6868d5384a03d3ed1a..9cf0c0d49a7b5769edf5c0117aab31d4b1239f90 100644 (file)
@@ -12,6 +12,7 @@
 #include <vector>
 #include <string>
 #include <algorithm>
+#include <limits>
 
 #include "db/db_impl.h"
 #include "db/version_set.h"
@@ -116,6 +117,15 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
   collector_factories.push_back(
       std::make_shared<InternalKeyPropertiesCollectorFactory>());
 
+  if (result.compaction_style == kCompactionStyleFIFO) {
+    result.num_levels = 1;
+    // since we delete level0 files in FIFO compaction when there are too many
+    // of them, these options don't really mean anything
+    result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
+    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
+    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
+  }
+
   return result;
 }
 
@@ -196,7 +206,7 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
       options_(*db_options, SanitizeOptions(&internal_comparator_,
                                             &internal_filter_policy_, options)),
       mem_(nullptr),
-      imm_(options.min_write_buffer_number_to_merge),
+      imm_(options_.min_write_buffer_number_to_merge),
       super_version_(nullptr),
       super_version_number_(0),
       local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
@@ -209,16 +219,20 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
 
   // if dummy_versions is nullptr, then this is a dummy column family.
   if (dummy_versions != nullptr) {
-    internal_stats_.reset(new InternalStats(options.num_levels, db_options->env,
-                                            db_options->statistics.get()));
+    internal_stats_.reset(new InternalStats(
+        options_.num_levels, db_options->env, db_options->statistics.get()));
     table_cache_.reset(
         new TableCache(dbname, &options_, storage_options, table_cache));
     if (options_.compaction_style == kCompactionStyleUniversal) {
       compaction_picker_.reset(
           new UniversalCompactionPicker(&options_, &internal_comparator_));
-    } else {
+    } else if (options_.compaction_style == kCompactionStyleLevel) {
       compaction_picker_.reset(
           new LevelCompactionPicker(&options_, &internal_comparator_));
+    } else {
+      assert(options_.compaction_style == kCompactionStyleFIFO);
+      compaction_picker_.reset(
+          new FIFOCompactionPicker(&options_, &internal_comparator_));
     }
 
     Log(options_.info_log, "Options for column family \"%s\":\n",
index 962ce1232666e4e41e3ff933a9b94ca6f577867d..a8caa59efd2d4de9eb0cdbbe16cc2c03f79939bd 100644 (file)
@@ -29,7 +29,8 @@ static uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
 Compaction::Compaction(Version* input_version, int level, int out_level,
                        uint64_t target_file_size,
                        uint64_t max_grandparent_overlap_bytes,
-                       bool seek_compaction, bool enable_compression)
+                       bool seek_compaction, bool enable_compression,
+                       bool deletion_compaction)
     : level_(level),
       out_level_(out_level),
       max_output_file_size_(target_file_size),
@@ -39,6 +40,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level,
       cfd_(input_version_->cfd_),
       seek_compaction_(seek_compaction),
       enable_compression_(enable_compression),
+      deletion_compaction_(deletion_compaction),
       grandparent_index_(0),
       seen_key_(false),
       overlapped_bytes_(0),
@@ -83,6 +85,8 @@ bool Compaction::IsTrivialMove() const {
           TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_);
 }
 
+bool Compaction::IsDeletionCompaction() const { return deletion_compaction_; }  // flag set by the constructor; defaults to false
+
 void Compaction::AddInputDeletions(VersionEdit* edit) {
   for (int which = 0; which < 2; which++) {
     for (size_t i = 0; i < inputs_[which].size(); i++) {
@@ -92,6 +96,7 @@ void Compaction::AddInputDeletions(VersionEdit* edit) {
 }
 
 bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
+  assert(cfd_->options()->compaction_style != kCompactionStyleFIFO);
   if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
     return bottommost_level_;
   }
@@ -155,6 +160,7 @@ void Compaction::MarkFilesBeingCompacted(bool value) {
 
 // Is this compaction producing files at the bottommost level?
 void Compaction::SetupBottomMostLevel(bool isManual) {
+  assert(cfd_->options()->compaction_style != kCompactionStyleFIFO);
   if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
     // If universal compaction style is used and manual
     // compaction is occuring, then we are guaranteed that
index 8fd95f909a8831dd2c3e16077e8ae5c04784445e..aaa402303882e30a58cb8eff21f36acd312c9599 100644 (file)
@@ -54,6 +54,9 @@ class Compaction {
   // moving a single input file to the next level (no merging or splitting)
   bool IsTrivialMove() const;
 
+  // If true, just delete all files in inputs_[0]
+  bool IsDeletionCompaction() const;
+
   // Add all inputs to this compaction as delete operations to *edit.
   void AddInputDeletions(VersionEdit* edit);
 
@@ -91,11 +94,13 @@ class Compaction {
  private:
   friend class CompactionPicker;
   friend class UniversalCompactionPicker;
+  friend class FIFOCompactionPicker;
   friend class LevelCompactionPicker;
 
   Compaction(Version* input_version, int level, int out_level,
              uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes,
-             bool seek_compaction = false, bool enable_compression = true);
+             bool seek_compaction = false, bool enable_compression = true,
+             bool deletion_compaction = false);
 
   int level_;
   int out_level_; // levels to which output files are stored
@@ -108,6 +113,8 @@ class Compaction {
 
   bool seek_compaction_;
   bool enable_compression_;
+  // if true, just delete files in inputs_[0]
+  bool deletion_compaction_;
 
   // Each compaction reads inputs from "level_" and "level_+1"
   std::vector<FileMetaData*> inputs_[2];      // The two sets of inputs
index a8700bbbc3cbd9d0125aeb16370d9370d003a4db..3416a0bac9e691024ec8aa67e6619414ef44f0bb 100644 (file)
@@ -9,6 +9,8 @@
 
 #include "db/compaction_picker.h"
 
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
 #include <limits>
 #include "util/log_buffer.h"
 #include "util/statistics.h"
@@ -307,6 +309,9 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
                                            const InternalKey* begin,
                                            const InternalKey* end,
                                            InternalKey** compaction_end) {
+  // CompactionPickerFIFO has its own implementation of compact range
+  assert(options_->compaction_style != kCompactionStyleFIFO);
+
   std::vector<FileMetaData*> inputs;
   bool covering_the_whole_range = true;
 
@@ -886,4 +891,70 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
   return c;
 }
 
+Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
+                                                 LogBuffer* log_buffer) {
+  assert(version->NumberLevels() == 1);  // SanitizeOptions forces num_levels to 1 for FIFO
+  uint64_t total_size = 0;  // sum of file_size over every level-0 file
+  for (const auto& file : version->files_[0]) {
+    total_size += file->file_size;
+  }
+  // Return nullptr when level 0 is empty or its total size is within the limit.
+  if (total_size <= options_->compaction_options_fifo.max_table_files_size ||
+      version->files_[0].size() == 0) {
+    // total size not exceeded (or there are no files at all)
+    LogToBuffer(log_buffer,
+                "[%s] FIFO compaction: nothing to do. Total size %" PRIu64
+                ", max size %" PRIu64 "\n",
+                version->cfd_->GetName().c_str(), total_size,
+                options_->compaction_options_fifo.max_table_files_size);
+    return nullptr;
+  }
+  // Return nullptr if a compaction is already registered for level 0.
+  if (compactions_in_progress_[0].size() > 0) {
+    LogToBuffer(log_buffer,
+                "[%s] FIFO compaction: Already executing compaction. No need "
+                "to run parallel compactions since compactions are very fast",
+                version->cfd_->GetName().c_str());
+    return nullptr;
+  }
+  // Args: level 0 -> out_level 0, size limits 0, no seek, no compression.
+  Compaction* c = new Compaction(version, 0, 0, 0, 0, false, false,
+                                 true /* is deletion compaction */);
+  // delete old files (FIFO): walk files_[0] backwards until the size fits
+  for (auto ritr = version->files_[0].rbegin();
+       ritr != version->files_[0].rend(); ++ritr) {
+    auto f = *ritr;
+    total_size -= f->file_size;  // size that remains once f is deleted
+    c->inputs_[0].push_back(f);
+    char tmp_fsize[16];
+    AppendHumanBytes(f->file_size, tmp_fsize, sizeof(tmp_fsize));
+    LogToBuffer(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64
+                            " with size %s for deletion",
+                version->cfd_->GetName().c_str(), f->number, tmp_fsize);
+    if (total_size <= options_->compaction_options_fifo.max_table_files_size) {
+      break;  // enough files picked; the remainder fits within the limit
+    }
+  }
+  // Flag the picked files and record this compaction as in progress on level 0.
+  c->MarkFilesBeingCompacted(true);
+  compactions_in_progress_[0].insert(c);
+
+  return c;
+}
+
+Compaction* FIFOCompactionPicker::CompactRange(Version* version,
+                                               int input_level,
+                                               int output_level,
+                                               const InternalKey* begin,
+                                               const InternalKey* end,
+                                               InternalKey** compaction_end) {
+  assert(input_level == 0);  // callers must request level 0 -> level 0
+  assert(output_level == 0);
+  *compaction_end = nullptr;  // always cleared; no partial-range end key is reported
+  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, options_->info_log.get());
+  auto c = PickCompaction(version, &log_buffer);  // manual == regular pick; begin/end are unused
+  log_buffer.FlushBufferToLog();  // local buffer, so flush it before returning
+  return c;  // may be nullptr when PickCompaction found nothing to delete
+}
+
 }  // namespace rocksdb
index 6527ef9677bd46d168c1217108f2747f4a3a86bb..65b1bc37ace50b71d92a32a8cae439c9bfb22051 100644 (file)
@@ -47,9 +47,10 @@ class CompactionPicker {
   // compaction_end will be set to nullptr.
   // Client is responsible for compaction_end storage -- when called,
   // *compaction_end should point to valid InternalKey!
-  Compaction* CompactRange(Version* version, int input_level, int output_level,
-                           const InternalKey* begin, const InternalKey* end,
-                           InternalKey** compaction_end);
+  virtual Compaction* CompactRange(Version* version, int input_level,
+                                   int output_level, const InternalKey* begin,
+                                   const InternalKey* end,
+                                   InternalKey** compaction_end);
 
   // Free up the files that participated in a compaction
   void ReleaseCompactionFiles(Compaction* c, Status status);
@@ -162,4 +163,19 @@ class LevelCompactionPicker : public CompactionPicker {
   Compaction* PickCompactionBySize(Version* version, int level, double score);
 };
 
+class FIFOCompactionPicker : public CompactionPicker {  // picker used for kCompactionStyleFIFO
+ public:
+  FIFOCompactionPicker(const Options* options,
+                       const InternalKeyComparator* icmp)
+      : CompactionPicker(options, icmp) {}
+  // Returns a deletion compaction over level-0 files, or nullptr if none is needed.
+  virtual Compaction* PickCompaction(Version* version,
+                                     LogBuffer* log_buffer) override;
+  // Manual compaction entry point; levels must both be 0, the key range is unused.
+  virtual Compaction* CompactRange(Version* version, int input_level,
+                                   int output_level, const InternalKey* begin,
+                                   const InternalKey* end,
+                                   InternalKey** compaction_end) override;
+};
+
 }  // namespace rocksdb
index bdc1832dc33e2b71612994e2c8378fa7583aef14..f8744276749ea856113d086833e296af69086a59 100644 (file)
@@ -1590,7 +1590,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
     return s;
   }
 
-  int max_level_with_files = 1;
+  int max_level_with_files = 0;
   {
     MutexLock l(&mutex_);
     Version* base = cfd->current();
@@ -1604,6 +1604,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
     // in case the compaction is unversal or if we're compacting the
     // bottom-most level, the output level will be the same as input one
     if (cfd->options()->compaction_style == kCompactionStyleUniversal ||
+        cfd->options()->compaction_style == kCompactionStyleFIFO ||
         level == max_level_with_files) {
       s = RunManualCompaction(cfd, level, level, begin, end);
     } else {
@@ -1754,14 +1755,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
   // For universal compaction, we enforce every manual compaction to compact
   // all files.
   if (begin == nullptr ||
-      cfd->options()->compaction_style == kCompactionStyleUniversal) {
+      cfd->options()->compaction_style == kCompactionStyleUniversal ||
+      cfd->options()->compaction_style == kCompactionStyleFIFO) {
     manual.begin = nullptr;
   } else {
     begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
     manual.begin = &begin_storage;
   }
   if (end == nullptr ||
-      cfd->options()->compaction_style == kCompactionStyleUniversal) {
+      cfd->options()->compaction_style == kCompactionStyleUniversal ||
+      cfd->options()->compaction_style == kCompactionStyleFIFO) {
     manual.end = nullptr;
   } else {
     end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
@@ -2150,6 +2153,24 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
   if (!c) {
     // Nothing to do
     LogToBuffer(log_buffer, "Compaction nothing to do");
+  } else if (c->IsDeletionCompaction()) {
+    // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
+    // file if there is alive snapshot pointing to it
+    assert(c->num_input_files(1) == 0);
+    assert(c->level() == 0);
+    assert(c->column_family_data()->options()->compaction_style ==
+           kCompactionStyleFIFO);
+    for (const auto& f : *c->inputs(0)) {
+      c->edit()->DeleteFile(c->level(), f->number);
+    }
+    status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
+                                    db_directory_.get());
+    InstallSuperVersion(c->column_family_data(), deletion_state);
+    LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
+                c->column_family_data()->GetName().c_str(),
+                c->num_input_files(0));
+    c->ReleaseCompactionFiles(status);
+    *madeProgress = true;
   } else if (!is_manual && c->IsTrivialMove()) {
     // Move file to next level
     assert(c->num_input_files(0) == 1);
@@ -2219,8 +2240,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
     if (!m->done) {
       // We only compacted part of the requested range.  Update *m
       // to the range that is left to be compacted.
-      // Universal compaction should always compact the whole range
+      // Universal and FIFO compactions should always compact the whole range
       assert(m->cfd->options()->compaction_style != kCompactionStyleUniversal);
+      assert(m->cfd->options()->compaction_style != kCompactionStyleFIFO);
       m->tmp_storage = *manual_end;
       m->begin = &m->tmp_storage;
     }
@@ -4468,13 +4490,15 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
 
   if (s.ok()) {
     for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
-      if (cfd->options()->compaction_style == kCompactionStyleUniversal) {
+      if (cfd->options()->compaction_style == kCompactionStyleUniversal ||
+          cfd->options()->compaction_style == kCompactionStyleFIFO) {
         Version* current = cfd->current();
         for (int i = 1; i < current->NumberLevels(); ++i) {
           int num_files = current->NumLevelFiles(i);
           if (num_files > 0) {
-            s = Status::InvalidArgument("Not all files are at level 0. Cannot "
-                "open with universal compaction style.");
+            s = Status::InvalidArgument(
+                "Not all files are at level 0. Cannot "
+                "open with universal or FIFO compaction style.");
             break;
           }
         }
index d6551b45a4886187cbbe77d539d8189070b58f6c..927a01a043ee0856c76de7d02f64f41be3700e27 100644 (file)
@@ -81,7 +81,8 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin,
     cfd = cfh->cfd();
   }
   int output_level =
-      (cfd->options()->compaction_style == kCompactionStyleUniversal)
+      (cfd->options()->compaction_style == kCompactionStyleUniversal ||
+       cfd->options()->compaction_style == kCompactionStyleFIFO)
           ? level
           : level + 1;
   return RunManualCompaction(cfd, level, output_level, begin, end);
index 05403fc070ba19a18feab1c4b0d394c8913b4c37..5e30b33f7e9ccf732dd9e6a6f236aa4e4fda2371 100644 (file)
@@ -317,6 +317,7 @@ class DBTest {
     kCompressedBlockCache,
     kInfiniteMaxOpenFiles,
     kxxHashChecksum,
+    kFIFOCompaction,
     kEnd
   };
   int option_config_;
@@ -339,7 +340,8 @@ class DBTest {
     kSkipPlainTable = 8,
     kSkipHashIndex = 16,
     kSkipNoSeekToLast = 32,
-    kSkipHashCuckoo = 64
+    kSkipHashCuckoo = 64,
+    kSkipFIFOCompaction = 128,
   };
 
   DBTest() : option_config_(kDefault),
@@ -391,6 +393,10 @@ class DBTest {
       if ((skip_mask & kSkipHashCuckoo) && (option_config_ == kHashCuckoo)) {
         continue;
       }
+      if ((skip_mask & kSkipFIFOCompaction) &&
+          option_config_ == kFIFOCompaction) {
+        continue;
+      }
       break;
     }
 
@@ -503,6 +509,10 @@ class DBTest {
         options.table_factory.reset(NewBlockBasedTableFactory(table_options));
         break;
       }
+      case kFIFOCompaction: {
+        options.compaction_style = kCompactionStyleFIFO;
+        break;
+      }
       case kBlockBasedTableWithPrefixHashIndex: {
         BlockBasedTableOptions table_options;
         table_options.index_type = BlockBasedTableOptions::kHashSearch;
@@ -1394,7 +1404,7 @@ TEST(DBTest, GetEncountersEmptyLevel) {
     env_->SleepForMicroseconds(1000000);
 
     ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1);  // XXX
-  } while (ChangeOptions(kSkipUniversalCompaction));
+  } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction));
 }
 
 // KeyMayExist can lead to a few false positives, but not false negatives.
@@ -1460,7 +1470,8 @@ TEST(DBTest, KeyMayExist) {
 
     // KeyMayExist function only checks data in block caches, which is not used
     // by plain table format.
-  } while (ChangeOptions(kSkipPlainTable | kSkipHashIndex));
+  } while (
+      ChangeOptions(kSkipPlainTable | kSkipHashIndex | kSkipFIFOCompaction));
 }
 
 TEST(DBTest, NonBlockingIteration) {
@@ -4387,7 +4398,8 @@ TEST(DBTest, ApproximateSizes) {
       ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
     }
     // ApproximateOffsetOf() is not yet implemented in plain table format.
-  } while (ChangeOptions(kSkipUniversalCompaction | kSkipPlainTable));
+  } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction |
+                         kSkipPlainTable));
 }
 
 TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
@@ -4531,8 +4543,8 @@ TEST(DBTest, HiddenValuesAreRemoved) {
     // ApproximateOffsetOf() is not yet implemented in plain table format,
     // which is used by Size().
     // skip HashCuckooRep as it does not support snapshot
-  } while (ChangeOptions(kSkipUniversalCompaction | kSkipPlainTable |
-                         kSkipHashCuckoo));
+  } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction |
+                         kSkipPlainTable | kSkipHashCuckoo));
 }
 
 TEST(DBTest, CompactBetweenSnapshots) {
@@ -4588,7 +4600,7 @@ TEST(DBTest, CompactBetweenSnapshots) {
     ASSERT_EQ("sixth", Get(1, "foo"));
     ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
     // skip HashCuckooRep as it does not support snapshot
-  } while (ChangeOptions(kSkipHashCuckoo));
+  } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction));
 }
 
 TEST(DBTest, DeletionMarkers1) {
@@ -4694,7 +4706,7 @@ TEST(DBTest, OverlapInLevel0) {
     Flush(1);
     ASSERT_EQ("3", FilesPerLevel(1));
     ASSERT_EQ("NOT_FOUND", Get(1, "600"));
-  } while (ChangeOptions(kSkipUniversalCompaction));
+  } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction));
 }
 
 TEST(DBTest, L0_CompactionBug_Issue44_a) {
@@ -6797,6 +6809,42 @@ TEST(DBTest, ChecksumTest) {
   ASSERT_EQ("f", Get("e"));
   ASSERT_EQ("h", Get("g"));
 }
+
+TEST(DBTest, FIFOCompactionTest) {
+  for (int iter = 0; iter < 2; ++iter) {
+    // first iteration (iter == 0) -- automatic background compaction
+    // second iteration (iter == 1) -- auto compaction off, manual CompactRange
+    Options options;
+    options.compaction_style = kCompactionStyleFIFO;
+    options.write_buffer_size = 100 << 10;                             // 100KB
+    options.compaction_options_fifo.max_table_files_size = 500 << 10;  // 500KB
+    options.compression = kNoCompression;
+    options.create_if_missing = true;
+    if (iter == 1) {
+      options.disable_auto_compactions = true;
+    }
+    DestroyAndReopen(&options);
+
+    Random rnd(301);
+    for (int i = 0; i < 6; ++i) {
+      for (int j = 0; j < 100; ++j) {
+        ASSERT_OK(Put(std::to_string(i * 100 + j), RandomString(&rnd, 1024)));
+      }
+      // 100 values of 1KB were just written; a flush should happen here
+    }
+    if (iter == 0) {
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    } else {
+      ASSERT_OK(db_->CompactRange(nullptr, nullptr));
+    }
+    // only 5 of the 6 files should survive the FIFO deletion
+    ASSERT_EQ(NumTableFilesAtLevel(0), 5);
+    for (int i = 0; i < 50; ++i) {
+      // keys 0..49 were written first, so they should be gone with the oldest file
+      ASSERT_EQ("NOT_FOUND", Get(std::to_string(i)));
+    }
+  }
+}
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
index 02e9aa152372590fe8c4ad4d3c0e724ae3a1c253..5327cf55fd990e668f788bfa237586562dd3815c 100644 (file)
@@ -711,7 +711,8 @@ void Version::ComputeCompactionScore(
   int max_score_level = 0;
 
   int num_levels_to_check =
-      (cfd_->options()->compaction_style != kCompactionStyleUniversal)
+      (cfd_->options()->compaction_style != kCompactionStyleUniversal &&
+       cfd_->options()->compaction_style != kCompactionStyleFIFO)
           ? NumberLevels() - 1
           : 1;
 
@@ -730,14 +731,18 @@ void Version::ComputeCompactionScore(
       // setting, or very high compression ratios, or lots of
       // overwrites/deletions).
       int numfiles = 0;
+      uint64_t total_size = 0;
       for (unsigned int i = 0; i < files_[level].size(); i++) {
         if (!files_[level][i]->being_compacted) {
+          total_size += files_[level][i]->file_size;
           numfiles++;
         }
       }
-
-      // If we are slowing down writes, then we better compact that first
-      if (numfiles >= cfd_->options()->level0_stop_writes_trigger) {
+      if (cfd_->options()->compaction_style == kCompactionStyleFIFO) {
+        score = static_cast<double>(total_size) /
+                cfd_->options()->compaction_options_fifo.max_table_files_size;
+      } else if (numfiles >= cfd_->options()->level0_stop_writes_trigger) {
+        // If we are slowing down writes, then we better compact that first
         score = 1000000;
       } else if (numfiles >= cfd_->options()->level0_slowdown_writes_trigger) {
         score = 10000;
@@ -803,6 +808,10 @@ bool CompareSeqnoDescending(const Version::Fsize& first,
 } // anonymous namespace
 
 void Version::UpdateFilesBySize() {
+  if (cfd_->options()->compaction_style == kCompactionStyleFIFO) {
+    // don't need this
+    return;
+  }
   // No need to sort the highest level because it is never compacted.
   int max_level =
       (cfd_->options()->compaction_style == kCompactionStyleUniversal)
@@ -871,7 +880,8 @@ bool Version::NeedsCompaction() const {
   // TODO(sdong): improve this function to be accurate for universal
   //              compactions.
   int num_levels_to_check =
-      (cfd_->options()->compaction_style != kCompactionStyleUniversal)
+      (cfd_->options()->compaction_style != kCompactionStyleUniversal &&
+       cfd_->options()->compaction_style != kCompactionStyleFIFO)
           ? NumberLevels() - 1
           : 1;
   for (int i = 0; i < num_levels_to_check; i++) {
@@ -1253,7 +1263,7 @@ struct VersionSet::ManifestWriter {
 class VersionSet::Builder {
  private:
   // Helper to sort v->files_
-  // kLevel0LevelCompaction -- NewestFirst
+  // kLevel0LevelCompaction -- NewestFirst (also used for FIFO compaction)
   // kLevel0UniversalCompaction -- NewestFirstBySeqNo
   // kLevelNon0 -- BySmallestKey
   struct FileComparator {
index 13a138341b742651151b4f6524de39df69eaddb5..ffadb5813302ddd8dd536788ea7b40275ffb6b3e 100644 (file)
@@ -217,6 +217,7 @@ class Version {
   friend class CompactionPicker;
   friend class LevelCompactionPicker;
   friend class UniversalCompactionPicker;
+  friend class FIFOCompactionPicker;
 
   class LevelFileNumIterator;
   class LevelFileIteratorState;
index e26ecde516373333cb21074fbb53ce8017bfeabf..9ba6a522cfa07b4ea4ff0a91040ae1a599ad764f 100644 (file)
@@ -53,8 +53,18 @@ enum CompressionType : char {
 };
 
 enum CompactionStyle : char {
-  kCompactionStyleLevel = 0x0,     // level based compaction style
-  kCompactionStyleUniversal = 0x1  // Universal compaction style
+  kCompactionStyleLevel = 0x0,      // level based compaction style
+  kCompactionStyleUniversal = 0x1,  // Universal compaction style
+  kCompactionStyleFIFO = 0x2,       // FIFO compaction style
+};
+
+struct CompactionOptionsFIFO {
+  // Limit on the total size of all table files. Once the total exceeds this
+  // limit, the oldest table files are deleted until it no longer does.
+  // Default: 1GB
+  uint64_t max_table_files_size;
+
+  CompactionOptionsFIFO() : max_table_files_size(1 * 1024 * 1024 * 1024) {}
+};
 
 // Compression options for different compression algorithms like Zlib
@@ -429,6 +439,9 @@ struct ColumnFamilyOptions {
   // The options needed to support Universal Style compactions
   CompactionOptionsUniversal compaction_options_universal;
 
+  // The options for FIFO compaction style
+  CompactionOptionsFIFO compaction_options_fifo;
+
   // Use KeyMayExist API to filter deletes when this is true.
   // If KeyMayExist returns false, i.e. the key definitely does not exist, then
   // the delete is a noop. KeyMayExist only incurs in-memory look up.
index 22952f587b0b79812f786fd2a8217b5d7905fb21..4fe8b219ecd0ff49153b0a737bfa42136a13699f 100644 (file)
@@ -135,6 +135,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
       compaction_style(options.compaction_style),
       verify_checksums_in_compaction(options.verify_checksums_in_compaction),
       compaction_options_universal(options.compaction_options_universal),
+      compaction_options_fifo(options.compaction_options_fifo),
       filter_deletes(options.filter_deletes),
       max_sequential_skip_in_iterations(
           options.max_sequential_skip_in_iterations),
@@ -413,6 +414,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
     Log(log,
         "Options.compaction_options_universal.compression_size_percent: %u",
         compaction_options_universal.compression_size_percent);
+    Log(log, "Options.compaction_options_fifo.max_table_files_size: %" PRIu64,
+        compaction_options_fifo.max_table_files_size);
     std::string collector_names;
     for (const auto& collector_factory : table_properties_collector_factories) {
       collector_names.append(collector_factory->Name());