]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Move old files to warm tier in FIFO compactions (#8310)
authorsdong <siying.d@fb.com>
Mon, 9 Aug 2021 19:50:19 +0000 (12:50 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Mon, 9 Aug 2021 19:51:14 +0000 (12:51 -0700)
Summary:
Some FIFO users want to keep the data for longer, but the old data is rarely accessed. This feature allows users to configure FIFO compaction so that data older than a threshold is moved to a warm storage tier.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8310

Test Plan: Add several unit tests.

Reviewed By: ajkr

Differential Revision: D28493792

fbshipit-source-id: c14824ea634814dee5278b449ab5c98b6e0b5501

21 files changed:
HISTORY.md
db/compaction/compaction.cc
db/compaction/compaction.h
db/compaction/compaction_job.cc
db/compaction/compaction_job_test.cc
db/compaction/compaction_picker.cc
db/compaction/compaction_picker_fifo.cc
db/compaction/compaction_picker_fifo.h
db/compaction/compaction_picker_level.cc
db/compaction/compaction_picker_test.cc
db/compaction/compaction_picker_universal.cc
db/db_compaction_test.cc
db/version_set.cc
file/read_write_util.cc
include/rocksdb/advanced_options.h
include/rocksdb/listener.h
java/rocksjni/portal.h
java/src/main/java/org/rocksdb/CompactionReason.java
options/cf_options.cc
options/options_settable_test.cc
tools/db_bench_tool.cc

index 29955d31ada7e6e9d27d53302c9f8487623dcbe3..d6a6b5ce25ac8ed36948a82c8169e2b55fb46e41 100644 (file)
@@ -13,6 +13,7 @@
 * EventListeners that have a non-empty Name() and that are registered with the ObjectRegistry can now be serialized to/from the OPTIONS file.
 * Insert warm blocks (data blocks, uncompressed dict blocks, index and filter blocks) in Block cache during flush under option BlockBasedTableOptions.prepopulate_block_cache. Previously it was enabled for only data blocks.
 * BlockBasedTableOptions.prepopulate_block_cache can be dynamically configured using DB::SetOptions.
+* Add CompactionOptionsFIFO.age_for_warm, which allows RocksDB to move old files to warm tier in FIFO compactions. Note that file temperature is still an experimental feature.
 
 ### Performance Improvements
 * Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value.
index f2de4f0e86e129cb984d8ee52b045f9dd68cde05..1f5ab9c7d192af4dc4fe57b6e2275253a18229cb 100644 (file)
@@ -211,9 +211,9 @@ Compaction::Compaction(
     std::vector<CompactionInputFiles> _inputs, int _output_level,
     uint64_t _target_file_size, uint64_t _max_compaction_bytes,
     uint32_t _output_path_id, CompressionType _compression,
-    CompressionOptions _compression_opts, uint32_t _max_subcompactions,
-    std::vector<FileMetaData*> _grandparents, bool _manual_compaction,
-    double _score, bool _deletion_compaction,
+    CompressionOptions _compression_opts, Temperature _output_temperature,
+    uint32_t _max_subcompactions, std::vector<FileMetaData*> _grandparents,
+    bool _manual_compaction, double _score, bool _deletion_compaction,
     CompactionReason _compaction_reason)
     : input_vstorage_(vstorage),
       start_level_(_inputs[0].level),
@@ -229,6 +229,7 @@ Compaction::Compaction(
       output_path_id_(_output_path_id),
       output_compression_(_compression),
       output_compression_opts_(_compression_opts),
+      output_temperature_(_output_temperature),
       deletion_compaction_(_deletion_compaction),
       inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
       grandparents_(std::move(_grandparents)),
@@ -308,6 +309,12 @@ bool Compaction::IsTrivialMove() const {
     return false;
   }
 
+  if (start_level_ == output_level_) {
+    // It doesn't make sense if compaction picker picks files just to trivial
+    // move to the same level.
+    return false;
+  }
+
   // Used in universal compaction, where trivial move can be done if the
   // input files are non overlapping
   if ((mutable_cf_options_.compaction_options_universal.allow_trivial_move) &&
index 7854c1c7a976274a577380db363266b5489794fb..6791ee5a24e4ad066aaa6dc9132d24c4cdf92e1b 100644 (file)
@@ -76,7 +76,8 @@ class Compaction {
              std::vector<CompactionInputFiles> inputs, int output_level,
              uint64_t target_file_size, uint64_t max_compaction_bytes,
              uint32_t output_path_id, CompressionType compression,
-             CompressionOptions compression_opts, uint32_t max_subcompactions,
+             CompressionOptions compression_opts,
+             Temperature output_temperature, uint32_t max_subcompactions,
              std::vector<FileMetaData*> grandparents,
              bool manual_compaction = false, double score = -1,
              bool deletion_compaction = false,
@@ -299,6 +300,8 @@ class Compaction {
 
   uint64_t max_compaction_bytes() const { return max_compaction_bytes_; }
 
+  Temperature output_temperature() const { return output_temperature_; }
+
   uint32_t max_subcompactions() const { return max_subcompactions_; }
 
   uint64_t MinInputFileOldestAncesterTime() const;
@@ -356,6 +359,7 @@ class Compaction {
   const uint32_t output_path_id_;
   CompressionType output_compression_;
   CompressionOptions output_compression_opts_;
+  Temperature output_temperature_;
   // If true, then the compaction can be done by simply deleting input files.
   const bool deletion_compaction_;
 
index 071ec6cf8b10532ba02067061f69247ee101109f..e6ff030fb7de6c7a8c97a970c39cadb60a827eb3 100644 (file)
@@ -107,6 +107,8 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) {
       return "ExternalSstIngestion";
     case CompactionReason::kPeriodicCompaction:
       return "PeriodicCompaction";
+    case CompactionReason::kChangeTemperature:
+      return "ChangeTemperature";
     case CompactionReason::kNumOfReasons:
       // fall through
     default:
@@ -1927,11 +1929,12 @@ Status CompactionJob::OpenCompactionOutputFile(
 
   // Pass temperature of botommost files to FileSystem.
   FileOptions fo_copy = file_options_;
-  Temperature temperature = Temperature::kUnknown;
-  if (bottommost_level_) {
-    fo_copy.temperature = temperature =
+  Temperature temperature = sub_compact->compaction->output_temperature();
+  if (temperature == Temperature::kUnknown && bottommost_level_) {
+    temperature =
         sub_compact->compaction->mutable_cf_options()->bottommost_temperature;
   }
+  fo_copy.temperature = temperature;
 
   Status s;
   IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy);
index 7437f1249ff7225aa04bf58c2c7d69ebb3750fdb..b4e96538718e2e60fb6bc1823b373e68ab5d3aa4 100644 (file)
@@ -334,8 +334,8 @@ class CompactionJobTestBase : public testing::Test {
         cfd->current()->storage_info(), *cfd->ioptions(),
         *cfd->GetLatestMutableCFOptions(), mutable_db_options_,
         compaction_input_files, output_level, 1024 * 1024, 10 * 1024 * 1024, 0,
-        kNoCompression, cfd->GetLatestMutableCFOptions()->compression_opts, 0,
-        {}, true);
+        kNoCompression, cfd->GetLatestMutableCFOptions()->compression_opts,
+        Temperature::kUnknown, 0, {}, true);
     compaction.SetInputVersion(cfd->current());
 
     LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
index 6d109213420bf6732c191aba8fe5e6ce951ffc22..fe2fea1efe8798a0ef5c8cb289620ca94efde3d8 100644 (file)
@@ -358,7 +358,7 @@ Compaction* CompactionPicker::CompactFiles(
       output_level, compact_options.output_file_size_limit,
       mutable_cf_options.max_compaction_bytes, output_path_id, compression_type,
       GetCompressionOptions(mutable_cf_options, vstorage, output_level),
-      compact_options.max_subcompactions,
+      Temperature::kUnknown, compact_options.max_subcompactions,
       /* grandparents */ {}, true);
   RegisterCompaction(c);
   return c;
@@ -634,7 +634,8 @@ Compaction* CompactionPicker::CompactRange(
         GetCompressionType(ioptions_, vstorage, mutable_cf_options,
                            output_level, 1),
         GetCompressionOptions(mutable_cf_options, vstorage, output_level),
-        compact_range_options.max_subcompactions, /* grandparents */ {},
+        Temperature::kUnknown, compact_range_options.max_subcompactions,
+        /* grandparents */ {},
         /* is manual */ true);
     RegisterCompaction(c);
     vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options);
@@ -812,7 +813,8 @@ Compaction* CompactionPicker::CompactRange(
       GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
                          vstorage->base_level()),
       GetCompressionOptions(mutable_cf_options, vstorage, output_level),
-      compact_range_options.max_subcompactions, std::move(grandparents),
+      Temperature::kUnknown, compact_range_options.max_subcompactions,
+      std::move(grandparents),
       /* is manual compaction */ true);
 
   TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
index 4b4c09b80f0297493a57b2c470e9346ef4e0a48c..01932f483c2e47fef270fedc2c36fd9301d9e88a 100644 (file)
@@ -111,7 +111,7 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
   Compaction* c = new Compaction(
       vstorage, ioptions_, mutable_cf_options, mutable_db_options,
       std::move(inputs), 0, 0, 0, 0, kNoCompression,
-      mutable_cf_options.compression_opts,
+      mutable_cf_options.compression_opts, Temperature::kUnknown,
       /* max_subcompactions */ 0, {}, /* is manual */ false,
       vstorage->CompactionScore(0),
       /* is deletion compaction */ true, CompactionReason::kFIFOTtl);
@@ -154,7 +154,8 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
             {comp_inputs}, 0, 16 * 1024 * 1024 /* output file size limit */,
             0 /* max compaction bytes, not applicable */,
             0 /* output path ID */, mutable_cf_options.compression,
-            mutable_cf_options.compression_opts, 0 /* max_subcompactions */, {},
+            mutable_cf_options.compression_opts, Temperature::kUnknown,
+            0 /* max_subcompactions */, {},
             /* is manual */ false, vstorage->CompactionScore(0),
             /* is deletion compaction */ false,
             CompactionReason::kFIFOReduceNumFiles);
@@ -203,13 +204,119 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
   Compaction* c = new Compaction(
       vstorage, ioptions_, mutable_cf_options, mutable_db_options,
       std::move(inputs), 0, 0, 0, 0, kNoCompression,
-      mutable_cf_options.compression_opts,
+      mutable_cf_options.compression_opts, Temperature::kUnknown,
       /* max_subcompactions */ 0, {}, /* is manual */ false,
       vstorage->CompactionScore(0),
       /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
   return c;
 }
 
+Compaction* FIFOCompactionPicker::PickCompactionToWarm(
+    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
+    const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
+    LogBuffer* log_buffer) {
+  if (mutable_cf_options.compaction_options_fifo.age_for_warm == 0) {
+    return nullptr;
+  }
+
+  const int kLevel0 = 0;
+  const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
+
+  int64_t _current_time;
+  auto status = ioptions_.clock->GetCurrentTime(&_current_time);
+  if (!status.ok()) {
+    ROCKS_LOG_BUFFER(log_buffer,
+                     "[%s] FIFO compaction: Couldn't get current time: %s. "
+                     "Not doing compactions based on warm threshold. ",
+                     cf_name.c_str(), status.ToString().c_str());
+    return nullptr;
+  }
+  const uint64_t current_time = static_cast<uint64_t>(_current_time);
+
+  if (!level0_compactions_in_progress_.empty()) {
+    ROCKS_LOG_BUFFER(
+        log_buffer,
+        "[%s] FIFO compaction: Already executing compaction. Parallel "
+        "compactions are not supported",
+        cf_name.c_str());
+    return nullptr;
+  }
+
+  std::vector<CompactionInputFiles> inputs;
+  inputs.emplace_back();
+  inputs[0].level = 0;
+
+  // avoid underflow
+  if (current_time > mutable_cf_options.compaction_options_fifo.age_for_warm) {
+    uint64_t create_time_threshold =
+        current_time - mutable_cf_options.compaction_options_fifo.age_for_warm;
+    uint64_t compaction_size = 0;
+    // We will ideally identify a file qualifying for warm tier by knowing
+    // the timestamp for the youngest entry in the file. However, right now
+    // we don't have the information. We infer it by looking at timestamp
+    // of the next file's (which is just younger) oldest entry's timestamp.
+    FileMetaData* prev_file = nullptr;
+    for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
+      FileMetaData* f = *ritr;
+      assert(f);
+      if (f->being_compacted) {
+        // Right now this probably won't happen as we never try to schedule
+        // two compactions in parallel, so here we just simply don't schedule
+        // anything.
+        return nullptr;
+      }
+      uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
+      if (oldest_ancester_time == kUnknownOldestAncesterTime) {
+        // Older files might not have enough information. It is possible to
+        // handle these files by looking at newer files, but maintaining the
+        // logic isn't worth it.
+        break;
+      }
+      if (oldest_ancester_time > create_time_threshold) {
+        // The previous file (which has slightly older data) doesn't qualify
+        // for warm tier.
+        break;
+      }
+      if (prev_file != nullptr) {
+        compaction_size += prev_file->fd.GetFileSize();
+        if (compaction_size > mutable_cf_options.max_compaction_bytes) {
+          break;
+        }
+        inputs[0].files.push_back(prev_file);
+        ROCKS_LOG_BUFFER(log_buffer,
+                         "[%s] FIFO compaction: picking file %" PRIu64
+                         " with next file's oldest time %" PRIu64 " for warm",
+                         cf_name.c_str(), prev_file->fd.GetNumber(),
+                         oldest_ancester_time);
+      }
+      if (f->temperature == Temperature::kUnknown ||
+          f->temperature == Temperature::kHot) {
+        prev_file = f;
+      } else if (!inputs[0].files.empty()) {
+        // A warm file newer than files picked.
+        break;
+      } else {
+        assert(prev_file == nullptr);
+      }
+    }
+  }
+
+  if (inputs[0].files.empty()) {
+    return nullptr;
+  }
+
+  Compaction* c = new Compaction(
+      vstorage, ioptions_, mutable_cf_options, mutable_db_options,
+      std::move(inputs), 0, 0 /* output file size limit */,
+      0 /* max compaction bytes, not applicable */, 0 /* output path ID */,
+      mutable_cf_options.compression, mutable_cf_options.compression_opts,
+      Temperature::kWarm,
+      /* max_subcompactions */ 0, {}, /* is manual */ false,
+      vstorage->CompactionScore(0),
+      /* is deletion compaction */ false, CompactionReason::kChangeTemperature);
+  return c;
+}
+
 Compaction* FIFOCompactionPicker::PickCompaction(
     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
     const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
@@ -225,6 +332,10 @@ Compaction* FIFOCompactionPicker::PickCompaction(
     c = PickSizeCompaction(cf_name, mutable_cf_options, mutable_db_options,
                            vstorage, log_buffer);
   }
+  if (c == nullptr) {
+    c = PickCompactionToWarm(cf_name, mutable_cf_options, mutable_db_options,
+                             vstorage, log_buffer);
+  }
   RegisterCompaction(c);
   return c;
 }
index 2a07f8df77692ecf334ca2ac34d18b14638e03c0..b0d58aa9d026fa25519148c6ccefc989ea81c169 100644 (file)
@@ -52,6 +52,12 @@ class FIFOCompactionPicker : public CompactionPicker {
                                  const MutableDBOptions& mutable_db_options,
                                  VersionStorageInfo* version,
                                  LogBuffer* log_buffer);
+
+  Compaction* PickCompactionToWarm(const std::string& cf_name,
+                                   const MutableCFOptions& mutable_cf_options,
+                                   const MutableDBOptions& mutable_db_options,
+                                   VersionStorageInfo* version,
+                                   LogBuffer* log_buffer);
 };
 }  // namespace ROCKSDB_NAMESPACE
 #endif  // !ROCKSDB_LITE
index 08c48c8f0b5605e1152e74c7dd065796a652d3b0..28c565b1b7e81d5bbc9fa9b21b2462be846ec407 100644 (file)
@@ -336,6 +336,7 @@ Compaction* LevelCompactionBuilder::GetCompaction() {
       GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
                          output_level_, vstorage_->base_level()),
       GetCompressionOptions(mutable_cf_options_, vstorage_, output_level_),
+      Temperature::kUnknown,
       /* max_subcompactions */ 0, std::move(grandparents_), is_manual_,
       start_level_score_, false /* deletion_compaction */, compaction_reason_);
 
index 5d543048f5a586cceb5e303746c4c01c5d6670e1..bce3a2076ee0527e608b94fb4fa58ad333913738 100644 (file)
@@ -98,7 +98,9 @@ class CompactionPickerTest : public testing::Test {
   void Add(int level, uint32_t file_number, const char* smallest,
            const char* largest, uint64_t file_size = 1, uint32_t path_id = 0,
            SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100,
-           size_t compensated_file_size = 0, bool marked_for_compact = false) {
+           size_t compensated_file_size = 0, bool marked_for_compact = false,
+           Temperature temperature = Temperature::kUnknown,
+           uint64_t oldest_ancestor_time = kUnknownOldestAncesterTime) {
     VersionStorageInfo* vstorage;
     if (temp_vstorage_) {
       vstorage = temp_vstorage_.get();
@@ -115,6 +117,8 @@ class CompactionPickerTest : public testing::Test {
         kUnknownFileChecksum, kUnknownFileChecksumFuncName);
     f->compensated_file_size =
         (compensated_file_size != 0) ? compensated_file_size : file_size;
+    f->temperature = temperature;
+    f->oldest_ancester_time = oldest_ancestor_time;
     vstorage->AddFile(level, f);
     files_.emplace_back(f);
     file_map_.insert({file_number, {f, level}});
@@ -757,6 +761,245 @@ TEST_F(CompactionPickerTest, NeedsCompactionFIFO) {
               vstorage_->CompactionScore(0) >= 1);
   }
 }
+
+TEST_F(CompactionPickerTest, FIFOToWarm1) {
+  NewVersionStorage(1, kCompactionStyleFIFO);
+  const uint64_t kFileSize = 100000;
+  const uint64_t kMaxSize = kFileSize * 100000;
+  uint64_t kWarmThreshold = 2000;
+
+  fifo_options_.max_table_files_size = kMaxSize;
+  fifo_options_.age_for_warm = kWarmThreshold;
+  mutable_cf_options_.compaction_options_fifo = fifo_options_;
+  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
+  mutable_cf_options_.max_compaction_bytes = kFileSize * 100;
+  FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
+
+  int64_t current_time = 0;
+  ASSERT_OK(Env::Default()->GetCurrentTime(&current_time));
+  uint64_t threshold_time =
+      static_cast<uint64_t>(current_time) - kWarmThreshold;
+  Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true,
+      Temperature::kUnknown, static_cast<uint64_t>(current_time) - 100);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true,
+      Temperature::kUnknown, threshold_time + 100);
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true,
+      Temperature::kUnknown, threshold_time - 2000);
+  Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true,
+      Temperature::kUnknown, threshold_time - 3000);
+  UpdateVersionStorageInfo();
+
+  ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
+  std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
+  ASSERT_TRUE(compaction.get() != nullptr);
+  ASSERT_EQ(1U, compaction->num_input_files(0));
+  ASSERT_EQ(3U, compaction->input(0, 0)->fd.GetNumber());
+}
+
+TEST_F(CompactionPickerTest, FIFOToWarm2) {
+  NewVersionStorage(1, kCompactionStyleFIFO);
+  const uint64_t kFileSize = 100000;
+  const uint64_t kMaxSize = kFileSize * 100000;
+  uint64_t kWarmThreshold = 2000;
+
+  fifo_options_.max_table_files_size = kMaxSize;
+  fifo_options_.age_for_warm = kWarmThreshold;
+  mutable_cf_options_.compaction_options_fifo = fifo_options_;
+  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
+  mutable_cf_options_.max_compaction_bytes = kFileSize * 100;
+  FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
+
+  int64_t current_time = 0;
+  ASSERT_OK(Env::Default()->GetCurrentTime(&current_time));
+  uint64_t threshold_time =
+      static_cast<uint64_t>(current_time) - kWarmThreshold;
+  Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true,
+      Temperature::kUnknown, static_cast<uint64_t>(current_time) - 100);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true,
+      Temperature::kUnknown, threshold_time + 100);
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true,
+      Temperature::kUnknown, threshold_time - 2000);
+  Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true,
+      Temperature::kUnknown, threshold_time - 3000);
+  Add(0, 2U, "200", "300", 4 * kFileSize, 0, 2100, 2200, 0, true,
+      Temperature::kUnknown, threshold_time - 4000);
+  UpdateVersionStorageInfo();
+
+  ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
+  std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
+  ASSERT_TRUE(compaction.get() != nullptr);
+  ASSERT_EQ(2U, compaction->num_input_files(0));
+  ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber());
+  ASSERT_EQ(3U, compaction->input(0, 1)->fd.GetNumber());
+}
+
+TEST_F(CompactionPickerTest, FIFOToWarmMaxSize) {
+  NewVersionStorage(1, kCompactionStyleFIFO);
+  const uint64_t kFileSize = 100000;
+  const uint64_t kMaxSize = kFileSize * 100000;
+  uint64_t kWarmThreshold = 2000;
+
+  fifo_options_.max_table_files_size = kMaxSize;
+  fifo_options_.age_for_warm = kWarmThreshold;
+  mutable_cf_options_.compaction_options_fifo = fifo_options_;
+  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
+  mutable_cf_options_.max_compaction_bytes = kFileSize * 9;
+  FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
+
+  int64_t current_time = 0;
+  ASSERT_OK(Env::Default()->GetCurrentTime(&current_time));
+  uint64_t threshold_time =
+      static_cast<uint64_t>(current_time) - kWarmThreshold;
+  Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true,
+      Temperature::kUnknown, static_cast<uint64_t>(current_time) - 100);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true,
+      Temperature::kUnknown, threshold_time + 100);
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true,
+      Temperature::kUnknown, threshold_time - 2000);
+  Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true,
+      Temperature::kUnknown, threshold_time - 3000);
+  Add(0, 2U, "200", "300", 4 * kFileSize, 0, 2100, 2200, 0, true,
+      Temperature::kUnknown, threshold_time - 4000);
+  Add(0, 1U, "200", "300", 4 * kFileSize, 0, 2000, 2100, 0, true,
+      Temperature::kUnknown, threshold_time - 5000);
+  UpdateVersionStorageInfo();
+
+  ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
+  std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
+  ASSERT_TRUE(compaction.get() != nullptr);
+  ASSERT_EQ(2U, compaction->num_input_files(0));
+  ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
+  ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
+}
+
+TEST_F(CompactionPickerTest, FIFOToWarmWithExistingWarm) {
+  NewVersionStorage(1, kCompactionStyleFIFO);
+  const uint64_t kFileSize = 100000;
+  const uint64_t kMaxSize = kFileSize * 100000;
+  uint64_t kWarmThreshold = 2000;
+
+  fifo_options_.max_table_files_size = kMaxSize;
+  fifo_options_.age_for_warm = kWarmThreshold;
+  mutable_cf_options_.compaction_options_fifo = fifo_options_;
+  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
+  mutable_cf_options_.max_compaction_bytes = kFileSize * 100;
+  FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
+
+  int64_t current_time = 0;
+  ASSERT_OK(Env::Default()->GetCurrentTime(&current_time));
+  uint64_t threshold_time =
+      static_cast<uint64_t>(current_time) - kWarmThreshold;
+  Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true,
+      Temperature::kUnknown, static_cast<uint64_t>(current_time) - 100);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true,
+      Temperature::kUnknown, threshold_time + 100);
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true,
+      Temperature::kUnknown, threshold_time - 2000);
+  Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true,
+      Temperature::kUnknown, threshold_time - 3000);
+  Add(0, 2U, "200", "300", 4 * kFileSize, 0, 2100, 2200, 0, true,
+      Temperature::kUnknown, threshold_time - 4000);
+  Add(0, 1U, "200", "300", 4 * kFileSize, 0, 2000, 2100, 0, true,
+      Temperature::kWarm, threshold_time - 5000);
+  UpdateVersionStorageInfo();
+
+  ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
+  std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
+  ASSERT_TRUE(compaction.get() != nullptr);
+  ASSERT_EQ(2U, compaction->num_input_files(0));
+  ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber());
+  ASSERT_EQ(3U, compaction->input(0, 1)->fd.GetNumber());
+}
+
+TEST_F(CompactionPickerTest, FIFOToWarmWithOngoing) {
+  NewVersionStorage(1, kCompactionStyleFIFO);
+  const uint64_t kFileSize = 100000;
+  const uint64_t kMaxSize = kFileSize * 100000;
+  uint64_t kWarmThreshold = 2000;
+
+  fifo_options_.max_table_files_size = kMaxSize;
+  fifo_options_.age_for_warm = kWarmThreshold;
+  mutable_cf_options_.compaction_options_fifo = fifo_options_;
+  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
+  mutable_cf_options_.max_compaction_bytes = kFileSize * 100;
+  FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
+
+  int64_t current_time = 0;
+  ASSERT_OK(Env::Default()->GetCurrentTime(&current_time));
+  uint64_t threshold_time =
+      static_cast<uint64_t>(current_time) - kWarmThreshold;
+  Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true,
+      Temperature::kUnknown, static_cast<uint64_t>(current_time) - 100);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true,
+      Temperature::kUnknown, threshold_time + 100);
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true,
+      Temperature::kUnknown, threshold_time - 2000);
+  Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true,
+      Temperature::kUnknown, threshold_time - 3000);
+  Add(0, 2U, "200", "300", 4 * kFileSize, 0, 2100, 2200, 0, true,
+      Temperature::kUnknown, threshold_time - 4000);
+  Add(0, 1U, "200", "300", 4 * kFileSize, 0, 2000, 2100, 0, true,
+      Temperature::kWarm, threshold_time - 5000);
+  file_map_[2].first->being_compacted = true;
+  UpdateVersionStorageInfo();
+
+  ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
+  std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
+  // Stop if a file is being compacted
+  ASSERT_TRUE(compaction.get() == nullptr);
+}
+
+TEST_F(CompactionPickerTest, FIFOToWarmWithHotBetweenWarms) {
+  NewVersionStorage(1, kCompactionStyleFIFO);
+  const uint64_t kFileSize = 100000;
+  const uint64_t kMaxSize = kFileSize * 100000;
+  uint64_t kWarmThreshold = 2000;
+
+  fifo_options_.max_table_files_size = kMaxSize;
+  fifo_options_.age_for_warm = kWarmThreshold;
+  mutable_cf_options_.compaction_options_fifo = fifo_options_;
+  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
+  mutable_cf_options_.max_compaction_bytes = kFileSize * 100;
+  FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
+
+  int64_t current_time = 0;
+  ASSERT_OK(Env::Default()->GetCurrentTime(&current_time));
+  uint64_t threshold_time =
+      static_cast<uint64_t>(current_time) - kWarmThreshold;
+  Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true,
+      Temperature::kUnknown, static_cast<uint64_t>(current_time) - 100);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true,
+      Temperature::kUnknown, threshold_time + 100);
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true,
+      Temperature::kUnknown, threshold_time - 2000);
+  Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true,
+      Temperature::kWarm, threshold_time - 3000);
+  Add(0, 2U, "200", "300", 4 * kFileSize, 0, 2100, 2200, 0, true,
+      Temperature::kUnknown, threshold_time - 4000);
+  Add(0, 1U, "200", "300", 4 * kFileSize, 0, 2000, 2100, 0, true,
+      Temperature::kWarm, threshold_time - 5000);
+  UpdateVersionStorageInfo();
+
+  ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
+  std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
+  // Stop if a file is being compacted
+  ASSERT_TRUE(compaction.get() != nullptr);
+  ASSERT_EQ(1U, compaction->num_input_files(0));
+  ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber());
+}
+
 #endif  // ROCKSDB_LITE
 
 TEST_F(CompactionPickerTest, CompactionPriMinOverlapping1) {
index b6f38f8282f9bcb501aa2395a5b1445ebb3e5181..211a4f468afc31769918b3d6e462067bf8bf1555 100644 (file)
@@ -728,6 +728,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
                          1, enable_compression),
       GetCompressionOptions(mutable_cf_options_, vstorage_, start_level,
                             enable_compression),
+      Temperature::kUnknown,
       /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
       score_, false /* deletion_compaction */, compaction_reason);
 }
@@ -955,6 +956,7 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
       GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
                          output_level, 1),
       GetCompressionOptions(mutable_cf_options_, vstorage_, output_level),
+      Temperature::kUnknown,
       /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
       score_, false /* deletion_compaction */,
       CompactionReason::kFilesMarkedForCompaction);
@@ -1029,6 +1031,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionToOldest(
                          output_level, 1, true /* enable_compression */),
       GetCompressionOptions(mutable_cf_options_, vstorage_, output_level,
                             true /* enable_compression */),
+      Temperature::kUnknown,
       /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
       score_, false /* deletion_compaction */, compaction_reason);
 }
index 20b0a36b4c7f7f67d16d5b070f2a9db935322c74..53dadde71738e0723bca59e2c69e22d49f7d0e3e 100644 (file)
@@ -6749,6 +6749,69 @@ TEST_F(DBCompactionTest, CompactionWithChecksumHandoffManifest2) {
   Destroy(options);
 }
 
+TEST_F(DBCompactionTest, FIFOWarm) {
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleFIFO;
+  options.num_levels = 1;
+  options.max_open_files = -1;
+  options.level0_file_num_compaction_trigger = 2;
+  options.create_if_missing = true;
+  CompactionOptionsFIFO fifo_options;
+  fifo_options.age_for_warm = 1000;
+  fifo_options.max_table_files_size = 100000000;
+  options.compaction_options_fifo = fifo_options;
+  env_->SetMockSleep();
+  Reopen(options);
+
+  int total_warm = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "NewWritableFile::FileOptions.temperature", [&](void* arg) {
+        Temperature temperature = *(static_cast<Temperature*>(arg));
+        if (temperature == Temperature::kWarm) {
+          total_warm++;
+        }
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // The file system does not support checksum handoff. The check
+  // will be ignored.
+  ASSERT_OK(Put(Key(0), "value1"));
+  env_->MockSleepForSeconds(800);
+  ASSERT_OK(Put(Key(2), "value2"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put(Key(0), "value1"));
+  env_->MockSleepForSeconds(800);
+  ASSERT_OK(Put(Key(2), "value2"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put(Key(0), "value1"));
+  env_->MockSleepForSeconds(800);
+  ASSERT_OK(Put(Key(2), "value2"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ASSERT_OK(Put(Key(0), "value1"));
+  env_->MockSleepForSeconds(800);
+  ASSERT_OK(Put(Key(2), "value2"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+
+  ColumnFamilyMetaData metadata;
+  db_->GetColumnFamilyMetaData(&metadata);
+  ASSERT_EQ(4, metadata.file_count);
+  ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature);
+  ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[1].temperature);
+  ASSERT_EQ(Temperature::kWarm, metadata.levels[0].files[2].temperature);
+  ASSERT_EQ(Temperature::kWarm, metadata.levels[0].files[3].temperature);
+  ASSERT_EQ(2, total_warm);
+
+  Destroy(options);
+}
+
 #endif  // !defined(ROCKSDB_LITE)
 
 }  // namespace ROCKSDB_NAMESPACE
index a2bfdac4822845a7af65b2c38834fd10811c2e94..d8802825f4dca1f01da9a506f2922876a8615d85 100644 (file)
@@ -2615,7 +2615,12 @@ void VersionStorageInfo::ComputeCompactionScore(
       if (compaction_style_ == kCompactionStyleFIFO) {
         score = static_cast<double>(total_size) /
                 mutable_cf_options.compaction_options_fifo.max_table_files_size;
-        if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
+        if (mutable_cf_options.compaction_options_fifo.allow_compaction ||
+            mutable_cf_options.compaction_options_fifo.age_for_warm > 0) {
+          // Warm tier move can happen at any time. It's too expensive to
+          // check very file's timestamp now. For now, just trigger it
+          // slightly more frequently than FIFO compaction so that this
+          // happens first.
           score = std::max(
               static_cast<double>(num_sorted_runs) /
                   mutable_cf_options.level0_file_num_compaction_trigger,
@@ -2627,7 +2632,6 @@ void VersionStorageInfo::ComputeCompactionScore(
                   immutable_options, mutable_cf_options, files_[level])),
               score);
         }
-
       } else {
         score = static_cast<double>(num_sorted_runs) /
                 mutable_cf_options.level0_file_num_compaction_trigger;
index 9df6c5a39d88a982e8cf0893c00a223ebc054a5b..cc4f6b84974de42a079b627eeb2de17bf459431b 100644 (file)
@@ -17,6 +17,8 @@ namespace ROCKSDB_NAMESPACE {
 IOStatus NewWritableFile(FileSystem* fs, const std::string& fname,
                          std::unique_ptr<FSWritableFile>* result,
                          const FileOptions& options) {
+  TEST_SYNC_POINT_CALLBACK("NewWritableFile::FileOptions.temperature",
+                           const_cast<Temperature*>(&options.temperature));
   IOStatus s = fs->NewWritableFile(fname, options, result, nullptr);
   TEST_KILL_RANDOM_WITH_WEIGHT("NewWritableFile:0", REDUCE_ODDS2);
   return s;
index 1c4f09b76b5325293646cc88c3d26b954d00a16d..680c46317e089e6a98dda764b016d7d0c5d20e3c 100644 (file)
@@ -70,6 +70,10 @@ struct CompactionOptionsFIFO {
   // Default: false;
   bool allow_compaction = false;
 
+  // When not 0, if the data in the file is older than this threshold, RocksDB
+  // will soon move the file to warm temperature.
+  uint64_t age_for_warm = 0;
+
   CompactionOptionsFIFO() : max_table_files_size(1 * 1024 * 1024 * 1024) {}
   CompactionOptionsFIFO(uint64_t _max_table_files_size, bool _allow_compaction)
       : max_table_files_size(_max_table_files_size),
index f59e395de2a436fffd38c3862958f76f6c83a830..5216e1e24ad9dab65d0694bc638146072d2b3ec6 100644 (file)
@@ -93,6 +93,8 @@ enum class CompactionReason : int {
   kExternalSstIngestion,
   // Compaction due to SST file being too old
   kPeriodicCompaction,
+  // Compaction in order to move files to temperature
+  kChangeTemperature,
   // total number of compaction reasons, new reasons must be added above this.
   kNumOfReasons,
 };
index ce2c96a921295fca7ce6a1cdd81f99b03376f752..c5f34a8a225fc87507638ac6fbcfc799a6e96bf6 100644 (file)
@@ -6989,6 +6989,10 @@ class CompactionReasonJni {
         return ROCKSDB_NAMESPACE::CompactionReason::kFlush;
       case 0x0D:
         return ROCKSDB_NAMESPACE::CompactionReason::kExternalSstIngestion;
+      case 0x0E:
+        return ROCKSDB_NAMESPACE::CompactionReason::kPeriodicCompaction;
+      case 0x0F:
+        return ROCKSDB_NAMESPACE::CompactionReason::kChangeTemperature;
       default:
         // undefined/default
         return ROCKSDB_NAMESPACE::CompactionReason::kUnknown;
index f18c4812228f3886c145dd3a29d0126a0563aff5..24e23445041f8b1692de7a02f559962496dbf40a 100644 (file)
@@ -78,7 +78,17 @@ public enum CompactionReason {
   /**
    * Compaction caused by external sst file ingestion
    */
-  kExternalSstIngestion((byte)0x0D);
+  kExternalSstIngestion((byte) 0x0D),
+
+  /**
+   * Compaction due to SST file being too old
+   */
+  kPeriodicCompaction((byte) 0x0E),
+
+  /**
+   * Compaction in order to move files to temperature
+   */
+  kChangeTemperature((byte) 0x0F);
 
   private final byte value;
 
index 830f820ef82c3d19a5105f7ae5895688afec82db..465413170d0ea450a0022679f03644bfbcfe2ad6 100644 (file)
@@ -172,6 +172,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
          {offsetof(struct CompactionOptionsFIFO, max_table_files_size),
           OptionType::kUInt64T, OptionVerificationType::kNormal,
           OptionTypeFlags::kMutable}},
+        {"age_for_warm",
+         {offsetof(struct CompactionOptionsFIFO, age_for_warm),
+          OptionType::kUInt64T, OptionVerificationType::kNormal,
+          OptionTypeFlags::kMutable}},
         {"ttl",
          {0, OptionType::kUInt64T, OptionVerificationType::kDeprecated,
           OptionTypeFlags::kNone}},
index c5fc0b235bee629022c869c2206b89d22376a5c4..5c79a024858dd1d559f2bc37c65a0d9fd526f979 100644 (file)
@@ -516,7 +516,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
       "enable_blob_garbage_collection=true;"
       "blob_garbage_collection_age_cutoff=0.5;"
       "compaction_options_fifo={max_table_files_size=3;allow_"
-      "compaction=false;};",
+      "compaction=false;age_for_warm=1;};",
       new_options));
 
   ASSERT_EQ(unset_bytes_base,
index 1b2ad6988a6ddd00251dc6c59cb88cb0cb3e287c..bb69878a15e3062f50951d72bd9fb5ac075d015a 100644 (file)
@@ -889,6 +889,8 @@ DEFINE_bool(fifo_compaction_allow_compaction, true,
 
 DEFINE_uint64(fifo_compaction_ttl, 0, "TTL for the SST Files in seconds.");
 
+DEFINE_uint64(fifo_age_for_warm, 0, "age_for_warm for FIFO compaction.");
+
 // Stacked BlobDB Options
 DEFINE_bool(use_blob_db, false, "[Stacked BlobDB] Open a BlobDB instance.");
 
@@ -3964,6 +3966,7 @@ class Benchmark {
     options.compaction_options_fifo = CompactionOptionsFIFO(
         FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024,
         FLAGS_fifo_compaction_allow_compaction);
+    options.compaction_options_fifo.age_for_warm = FLAGS_fifo_age_for_warm;
 #endif  // ROCKSDB_LITE
     if (FLAGS_prefix_size != 0) {
       options.prefix_extractor.reset(