* EventListeners that have a non-empty Name() and that are registered with the ObjectRegistry can now be serialized to/from the OPTIONS file.
* Insert warm blocks (data blocks, uncompressed dict blocks, index and filter blocks) in Block cache during flush under option BlockBasedTableOptions.prepopulate_block_cache. Previously it was enabled for only data blocks.
* BlockBasedTableOptions.prepopulate_block_cache can be dynamically configured using DB::SetOptions.
+* Add CompactionOptionsFIFO.age_for_warm, which allows RocksDB to move old files to the warm tier in FIFO compactions. Note that file temperature is still an experimental feature.
### Performance Improvements
* Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value.
std::vector<CompactionInputFiles> _inputs, int _output_level,
uint64_t _target_file_size, uint64_t _max_compaction_bytes,
uint32_t _output_path_id, CompressionType _compression,
- CompressionOptions _compression_opts, uint32_t _max_subcompactions,
- std::vector<FileMetaData*> _grandparents, bool _manual_compaction,
- double _score, bool _deletion_compaction,
+ CompressionOptions _compression_opts, Temperature _output_temperature,
+ uint32_t _max_subcompactions, std::vector<FileMetaData*> _grandparents,
+ bool _manual_compaction, double _score, bool _deletion_compaction,
CompactionReason _compaction_reason)
: input_vstorage_(vstorage),
start_level_(_inputs[0].level),
output_path_id_(_output_path_id),
output_compression_(_compression),
output_compression_opts_(_compression_opts),
+ output_temperature_(_output_temperature),
deletion_compaction_(_deletion_compaction),
inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
grandparents_(std::move(_grandparents)),
return false;
}
+ if (start_level_ == output_level_) {
+ // It doesn't make sense for the compaction picker to pick files just to
+ // trivially move them to the same level.
+ return false;
+ }
+
// Used in universal compaction, where trivial move can be done if the
// input files are non overlapping
if ((mutable_cf_options_.compaction_options_universal.allow_trivial_move) &&
std::vector<CompactionInputFiles> inputs, int output_level,
uint64_t target_file_size, uint64_t max_compaction_bytes,
uint32_t output_path_id, CompressionType compression,
- CompressionOptions compression_opts, uint32_t max_subcompactions,
+ CompressionOptions compression_opts,
+ Temperature output_temperature, uint32_t max_subcompactions,
std::vector<FileMetaData*> grandparents,
bool manual_compaction = false, double score = -1,
bool deletion_compaction = false,
uint64_t max_compaction_bytes() const { return max_compaction_bytes_; }
+ Temperature output_temperature() const { return output_temperature_; }
+
uint32_t max_subcompactions() const { return max_subcompactions_; }
uint64_t MinInputFileOldestAncesterTime() const;
const uint32_t output_path_id_;
CompressionType output_compression_;
CompressionOptions output_compression_opts_;
+ Temperature output_temperature_;
// If true, then the compaction can be done by simply deleting input files.
const bool deletion_compaction_;
return "ExternalSstIngestion";
case CompactionReason::kPeriodicCompaction:
return "PeriodicCompaction";
+ case CompactionReason::kChangeTemperature:
+ return "ChangeTemperature";
case CompactionReason::kNumOfReasons:
// fall through
default:
// Pass temperature of bottommost files to FileSystem.
FileOptions fo_copy = file_options_;
- Temperature temperature = Temperature::kUnknown;
- if (bottommost_level_) {
- fo_copy.temperature = temperature =
+ Temperature temperature = sub_compact->compaction->output_temperature();
+ if (temperature == Temperature::kUnknown && bottommost_level_) {
+ temperature =
sub_compact->compaction->mutable_cf_options()->bottommost_temperature;
}
+ fo_copy.temperature = temperature;
Status s;
IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy);
cfd->current()->storage_info(), *cfd->ioptions(),
*cfd->GetLatestMutableCFOptions(), mutable_db_options_,
compaction_input_files, output_level, 1024 * 1024, 10 * 1024 * 1024, 0,
- kNoCompression, cfd->GetLatestMutableCFOptions()->compression_opts, 0,
- {}, true);
+ kNoCompression, cfd->GetLatestMutableCFOptions()->compression_opts,
+ Temperature::kUnknown, 0, {}, true);
compaction.SetInputVersion(cfd->current());
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
output_level, compact_options.output_file_size_limit,
mutable_cf_options.max_compaction_bytes, output_path_id, compression_type,
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
- compact_options.max_subcompactions,
+ Temperature::kUnknown, compact_options.max_subcompactions,
/* grandparents */ {}, true);
RegisterCompaction(c);
return c;
GetCompressionType(ioptions_, vstorage, mutable_cf_options,
output_level, 1),
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
- compact_range_options.max_subcompactions, /* grandparents */ {},
+ Temperature::kUnknown, compact_range_options.max_subcompactions,
+ /* grandparents */ {},
/* is manual */ true);
RegisterCompaction(c);
vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options);
GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
vstorage->base_level()),
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
- compact_range_options.max_subcompactions, std::move(grandparents),
+ Temperature::kUnknown, compact_range_options.max_subcompactions,
+ std::move(grandparents),
/* is manual compaction */ true);
TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, mutable_db_options,
std::move(inputs), 0, 0, 0, 0, kNoCompression,
- mutable_cf_options.compression_opts,
+ mutable_cf_options.compression_opts, Temperature::kUnknown,
/* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOTtl);
{comp_inputs}, 0, 16 * 1024 * 1024 /* output file size limit */,
0 /* max compaction bytes, not applicable */,
0 /* output path ID */, mutable_cf_options.compression,
- mutable_cf_options.compression_opts, 0 /* max_subcompactions */, {},
+ mutable_cf_options.compression_opts, Temperature::kUnknown,
+ 0 /* max_subcompactions */, {},
/* is manual */ false, vstorage->CompactionScore(0),
/* is deletion compaction */ false,
CompactionReason::kFIFOReduceNumFiles);
Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, mutable_db_options,
std::move(inputs), 0, 0, 0, 0, kNoCompression,
- mutable_cf_options.compression_opts,
+ mutable_cf_options.compression_opts, Temperature::kUnknown,
/* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
return c;
}
+// Selects level-0 files whose contents have aged past
+// compaction_options_fifo.age_for_warm and packages them into a compaction
+// whose output temperature is Temperature::kWarm.
+//
+// Returns nullptr when the option is 0, the clock cannot be read, another
+// level-0 compaction is already in progress, a scanned file is being
+// compacted, or no file qualifies.
+Compaction* FIFOCompactionPicker::PickCompactionToWarm(
+    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
+    const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
+    LogBuffer* log_buffer) {
+  const uint64_t warm_age =
+      mutable_cf_options.compaction_options_fifo.age_for_warm;
+  if (warm_age == 0) {
+    return nullptr;
+  }
+
+  const std::vector<FileMetaData*>& l0_files = vstorage->LevelFiles(0);
+
+  int64_t now_signed;
+  auto clock_status = ioptions_.clock->GetCurrentTime(&now_signed);
+  if (!clock_status.ok()) {
+    ROCKS_LOG_BUFFER(log_buffer,
+                     "[%s] FIFO compaction: Couldn't get current time: %s. "
+                     "Not doing compactions based on warm threshold. ",
+                     cf_name.c_str(), clock_status.ToString().c_str());
+    return nullptr;
+  }
+
+  if (!level0_compactions_in_progress_.empty()) {
+    ROCKS_LOG_BUFFER(
+        log_buffer,
+        "[%s] FIFO compaction: Already executing compaction. Parallel "
+        "compactions are not supported",
+        cf_name.c_str());
+    return nullptr;
+  }
+
+  // Bail out instead of letting the unsigned subtraction below underflow.
+  const uint64_t now = static_cast<uint64_t>(now_signed);
+  if (now <= warm_age) {
+    return nullptr;
+  }
+  const uint64_t cutoff_time = now - warm_age;
+
+  std::vector<CompactionInputFiles> inputs(1);
+  inputs[0].level = 0;
+  std::vector<FileMetaData*>& picked = inputs[0].files;
+
+  // Ideally a file would qualify based on the timestamp of its youngest
+  // entry, but that is not recorded. It is approximated by the oldest-entry
+  // timestamp of the next (slightly younger) file, so each file is only
+  // admitted one iteration later, once its successor has been examined.
+  uint64_t picked_bytes = 0;
+  FileMetaData* pending = nullptr;
+  for (auto it = l0_files.rbegin(); it != l0_files.rend(); ++it) {
+    FileMetaData* const file = *it;
+    assert(file);
+    if (file->being_compacted) {
+      // Not expected today because parallel compactions are never scheduled;
+      // simply schedule nothing.
+      return nullptr;
+    }
+
+    const uint64_t oldest_time = file->TryGetOldestAncesterTime();
+    if (oldest_time == kUnknownOldestAncesterTime) {
+      // Files lacking the timestamp could be handled via newer files, but the
+      // extra logic isn't worth maintaining.
+      break;
+    }
+    if (oldest_time > cutoff_time) {
+      // The pending (slightly older) file doesn't qualify for the warm tier.
+      break;
+    }
+
+    if (pending != nullptr) {
+      picked_bytes += pending->fd.GetFileSize();
+      if (picked_bytes > mutable_cf_options.max_compaction_bytes) {
+        break;
+      }
+      picked.push_back(pending);
+      ROCKS_LOG_BUFFER(log_buffer,
+                       "[%s] FIFO compaction: picking file %" PRIu64
+                       " with next file's oldest time %" PRIu64 " for warm",
+                       cf_name.c_str(), pending->fd.GetNumber(), oldest_time);
+    }
+
+    const bool not_yet_warm = file->temperature == Temperature::kUnknown ||
+                              file->temperature == Temperature::kHot;
+    if (not_yet_warm) {
+      pending = file;
+    } else if (!picked.empty()) {
+      // Reached an already-warm file that is newer than the files picked.
+      break;
+    } else {
+      assert(pending == nullptr);
+    }
+  }
+
+  if (picked.empty()) {
+    return nullptr;
+  }
+
+  return new Compaction(
+      vstorage, ioptions_, mutable_cf_options, mutable_db_options,
+      std::move(inputs), 0, 0 /* output file size limit */,
+      0 /* max compaction bytes, not applicable */, 0 /* output path ID */,
+      mutable_cf_options.compression, mutable_cf_options.compression_opts,
+      Temperature::kWarm,
+      /* max_subcompactions */ 0, {}, /* is manual */ false,
+      vstorage->CompactionScore(0),
+      /* is deletion compaction */ false, CompactionReason::kChangeTemperature);
+}
+
Compaction* FIFOCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
c = PickSizeCompaction(cf_name, mutable_cf_options, mutable_db_options,
vstorage, log_buffer);
}
+ if (c == nullptr) {
+ c = PickCompactionToWarm(cf_name, mutable_cf_options, mutable_db_options,
+ vstorage, log_buffer);
+ }
RegisterCompaction(c);
return c;
}
const MutableDBOptions& mutable_db_options,
VersionStorageInfo* version,
LogBuffer* log_buffer);
+ // Picks old level-0 files to rewrite with warm temperature; may return nullptr.
+ Compaction* PickCompactionToWarm(const std::string& cf_name,
+ const MutableCFOptions& mutable_cf_options,
+ const MutableDBOptions& mutable_db_options,
+ VersionStorageInfo* version,
+ LogBuffer* log_buffer);
};
} // namespace ROCKSDB_NAMESPACE
#endif // !ROCKSDB_LITE
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
output_level_, vstorage_->base_level()),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level_),
+ Temperature::kUnknown,
/* max_subcompactions */ 0, std::move(grandparents_), is_manual_,
start_level_score_, false /* deletion_compaction */, compaction_reason_);
void Add(int level, uint32_t file_number, const char* smallest,
const char* largest, uint64_t file_size = 1, uint32_t path_id = 0,
SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100,
- size_t compensated_file_size = 0, bool marked_for_compact = false) {
+ size_t compensated_file_size = 0, bool marked_for_compact = false,
+ Temperature temperature = Temperature::kUnknown,
+ uint64_t oldest_ancestor_time = kUnknownOldestAncesterTime) {
VersionStorageInfo* vstorage;
if (temp_vstorage_) {
vstorage = temp_vstorage_.get();
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
f->compensated_file_size =
(compensated_file_size != 0) ? compensated_file_size : file_size;
+ f->temperature = temperature;
+ f->oldest_ancester_time = oldest_ancestor_time;
vstorage->AddFile(level, f);
files_.emplace_back(f);
file_map_.insert({file_number, {f, level}});
vstorage_->CompactionScore(0) >= 1);
}
}
+
+// Four level-0 files, none warm yet. A file is picked only when the next newer
+// file's oldest-ancestor time is also past the warm threshold, so file 3
+// qualifies (file 4 is old enough) while file 4 does not (file 5 is too young).
+TEST_F(CompactionPickerTest, FIFOToWarm1) {
+  NewVersionStorage(1, kCompactionStyleFIFO);
+  const uint64_t kFileSize = 100000;
+  const uint64_t kMaxSize = kFileSize * 100000;
+  const uint64_t kWarmThreshold = 2000;
+
+  fifo_options_.max_table_files_size = kMaxSize;
+  fifo_options_.age_for_warm = kWarmThreshold;
+  mutable_cf_options_.compaction_options_fifo = fifo_options_;
+  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
+  mutable_cf_options_.max_compaction_bytes = kFileSize * 100;
+  FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
+
+  int64_t current_time = 0;
+  ASSERT_OK(Env::Default()->GetCurrentTime(&current_time));
+  // Oldest-ancestor times at or below this value are past the warm threshold.
+  uint64_t threshold_time =
+      static_cast<uint64_t>(current_time) - kWarmThreshold;
+  Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true,
+      Temperature::kUnknown, static_cast<uint64_t>(current_time) - 100);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true,
+      Temperature::kUnknown, threshold_time + 100);
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true,
+      Temperature::kUnknown, threshold_time - 2000);
+  Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true,
+      Temperature::kUnknown, threshold_time - 3000);
+  UpdateVersionStorageInfo();
+
+  ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
+  std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
+  ASSERT_TRUE(compaction.get() != nullptr);
+  ASSERT_EQ(1U, compaction->num_input_files(0));
+  ASSERT_EQ(3U, compaction->input(0, 0)->fd.GetNumber());
+}
+
+// Same layout as FIFOToWarm1 plus an even older file 2. Files 2 and 3 are both
+// followed by a file that is past the warm threshold, so both are picked, in
+// oldest-first order.
+TEST_F(CompactionPickerTest, FIFOToWarm2) {
+  NewVersionStorage(1, kCompactionStyleFIFO);
+  const uint64_t kFileSize = 100000;
+  const uint64_t kMaxSize = kFileSize * 100000;
+  const uint64_t kWarmThreshold = 2000;
+
+  fifo_options_.max_table_files_size = kMaxSize;
+  fifo_options_.age_for_warm = kWarmThreshold;
+  mutable_cf_options_.compaction_options_fifo = fifo_options_;
+  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
+  mutable_cf_options_.max_compaction_bytes = kFileSize * 100;
+  FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
+
+  int64_t current_time = 0;
+  ASSERT_OK(Env::Default()->GetCurrentTime(&current_time));
+  // Oldest-ancestor times at or below this value are past the warm threshold.
+  uint64_t threshold_time =
+      static_cast<uint64_t>(current_time) - kWarmThreshold;
+  Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true,
+      Temperature::kUnknown, static_cast<uint64_t>(current_time) - 100);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true,
+      Temperature::kUnknown, threshold_time + 100);
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true,
+      Temperature::kUnknown, threshold_time - 2000);
+  Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true,
+      Temperature::kUnknown, threshold_time - 3000);
+  Add(0, 2U, "200", "300", 4 * kFileSize, 0, 2100, 2200, 0, true,
+      Temperature::kUnknown, threshold_time - 4000);
+  UpdateVersionStorageInfo();
+
+  ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
+  std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
+  ASSERT_TRUE(compaction.get() != nullptr);
+  ASSERT_EQ(2U, compaction->num_input_files(0));
+  ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber());
+  ASSERT_EQ(3U, compaction->input(0, 1)->fd.GetNumber());
+}
+
+// max_compaction_bytes is 9 * kFileSize and files 1-3 are 4 * kFileSize each,
+// so picking stops after files 1 and 2 (8 * kFileSize); adding file 3 would
+// exceed the limit even though it is old enough.
+TEST_F(CompactionPickerTest, FIFOToWarmMaxSize) {
+  NewVersionStorage(1, kCompactionStyleFIFO);
+  const uint64_t kFileSize = 100000;
+  const uint64_t kMaxSize = kFileSize * 100000;
+  const uint64_t kWarmThreshold = 2000;
+
+  fifo_options_.max_table_files_size = kMaxSize;
+  fifo_options_.age_for_warm = kWarmThreshold;
+  mutable_cf_options_.compaction_options_fifo = fifo_options_;
+  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
+  mutable_cf_options_.max_compaction_bytes = kFileSize * 9;
+  FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
+
+  int64_t current_time = 0;
+  ASSERT_OK(Env::Default()->GetCurrentTime(&current_time));
+  // Oldest-ancestor times at or below this value are past the warm threshold.
+  uint64_t threshold_time =
+      static_cast<uint64_t>(current_time) - kWarmThreshold;
+  Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true,
+      Temperature::kUnknown, static_cast<uint64_t>(current_time) - 100);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true,
+      Temperature::kUnknown, threshold_time + 100);
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true,
+      Temperature::kUnknown, threshold_time - 2000);
+  Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true,
+      Temperature::kUnknown, threshold_time - 3000);
+  Add(0, 2U, "200", "300", 4 * kFileSize, 0, 2100, 2200, 0, true,
+      Temperature::kUnknown, threshold_time - 4000);
+  Add(0, 1U, "200", "300", 4 * kFileSize, 0, 2000, 2100, 0, true,
+      Temperature::kUnknown, threshold_time - 5000);
+  UpdateVersionStorageInfo();
+
+  ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
+  std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
+  ASSERT_TRUE(compaction.get() != nullptr);
+  ASSERT_EQ(2U, compaction->num_input_files(0));
+  ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
+  ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
+}
+
+// The oldest file (1) is already warm, so it is skipped and the picker starts
+// from the first non-warm file: files 2 and 3 are expected.
+TEST_F(CompactionPickerTest, FIFOToWarmWithExistingWarm) {
+  NewVersionStorage(1, kCompactionStyleFIFO);
+  const uint64_t kFileSize = 100000;
+  const uint64_t kMaxSize = kFileSize * 100000;
+  const uint64_t kWarmThreshold = 2000;
+
+  fifo_options_.max_table_files_size = kMaxSize;
+  fifo_options_.age_for_warm = kWarmThreshold;
+  mutable_cf_options_.compaction_options_fifo = fifo_options_;
+  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
+  mutable_cf_options_.max_compaction_bytes = kFileSize * 100;
+  FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
+
+  int64_t current_time = 0;
+  ASSERT_OK(Env::Default()->GetCurrentTime(&current_time));
+  // Oldest-ancestor times at or below this value are past the warm threshold.
+  uint64_t threshold_time =
+      static_cast<uint64_t>(current_time) - kWarmThreshold;
+  Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true,
+      Temperature::kUnknown, static_cast<uint64_t>(current_time) - 100);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true,
+      Temperature::kUnknown, threshold_time + 100);
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true,
+      Temperature::kUnknown, threshold_time - 2000);
+  Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true,
+      Temperature::kUnknown, threshold_time - 3000);
+  Add(0, 2U, "200", "300", 4 * kFileSize, 0, 2100, 2200, 0, true,
+      Temperature::kUnknown, threshold_time - 4000);
+  Add(0, 1U, "200", "300", 4 * kFileSize, 0, 2000, 2100, 0, true,
+      Temperature::kWarm, threshold_time - 5000);
+  UpdateVersionStorageInfo();
+
+  ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
+  std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
+  ASSERT_TRUE(compaction.get() != nullptr);
+  ASSERT_EQ(2U, compaction->num_input_files(0));
+  ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber());
+  ASSERT_EQ(3U, compaction->input(0, 1)->fd.GetNumber());
+}
+
+// Same layout as FIFOToWarmWithExistingWarm, but file 2 is marked as being
+// compacted. The picker is expected to give up and return no compaction.
+TEST_F(CompactionPickerTest, FIFOToWarmWithOngoing) {
+  NewVersionStorage(1, kCompactionStyleFIFO);
+  const uint64_t kFileSize = 100000;
+  const uint64_t kMaxSize = kFileSize * 100000;
+  const uint64_t kWarmThreshold = 2000;
+
+  fifo_options_.max_table_files_size = kMaxSize;
+  fifo_options_.age_for_warm = kWarmThreshold;
+  mutable_cf_options_.compaction_options_fifo = fifo_options_;
+  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
+  mutable_cf_options_.max_compaction_bytes = kFileSize * 100;
+  FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
+
+  int64_t current_time = 0;
+  ASSERT_OK(Env::Default()->GetCurrentTime(&current_time));
+  // Oldest-ancestor times at or below this value are past the warm threshold.
+  uint64_t threshold_time =
+      static_cast<uint64_t>(current_time) - kWarmThreshold;
+  Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true,
+      Temperature::kUnknown, static_cast<uint64_t>(current_time) - 100);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true,
+      Temperature::kUnknown, threshold_time + 100);
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true,
+      Temperature::kUnknown, threshold_time - 2000);
+  Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true,
+      Temperature::kUnknown, threshold_time - 3000);
+  Add(0, 2U, "200", "300", 4 * kFileSize, 0, 2100, 2200, 0, true,
+      Temperature::kUnknown, threshold_time - 4000);
+  Add(0, 1U, "200", "300", 4 * kFileSize, 0, 2000, 2100, 0, true,
+      Temperature::kWarm, threshold_time - 5000);
+  file_map_[2].first->being_compacted = true;
+  UpdateVersionStorageInfo();
+
+  ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
+  std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
+  // Stop if a file is being compacted
+  ASSERT_TRUE(compaction.get() == nullptr);
+}
+
+// Files 1 and 3 are already warm and file 2 (not warm) sits between them.
+// Picking must stop at the warm file 3, so only file 2 is expected.
+TEST_F(CompactionPickerTest, FIFOToWarmWithHotBetweenWarms) {
+  NewVersionStorage(1, kCompactionStyleFIFO);
+  const uint64_t kFileSize = 100000;
+  const uint64_t kMaxSize = kFileSize * 100000;
+  const uint64_t kWarmThreshold = 2000;
+
+  fifo_options_.max_table_files_size = kMaxSize;
+  fifo_options_.age_for_warm = kWarmThreshold;
+  mutable_cf_options_.compaction_options_fifo = fifo_options_;
+  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
+  mutable_cf_options_.max_compaction_bytes = kFileSize * 100;
+  FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
+
+  int64_t current_time = 0;
+  ASSERT_OK(Env::Default()->GetCurrentTime(&current_time));
+  // Oldest-ancestor times at or below this value are past the warm threshold.
+  uint64_t threshold_time =
+      static_cast<uint64_t>(current_time) - kWarmThreshold;
+  Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true,
+      Temperature::kUnknown, static_cast<uint64_t>(current_time) - 100);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true,
+      Temperature::kUnknown, threshold_time + 100);
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true,
+      Temperature::kUnknown, threshold_time - 2000);
+  Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true,
+      Temperature::kWarm, threshold_time - 3000);
+  Add(0, 2U, "200", "300", 4 * kFileSize, 0, 2100, 2200, 0, true,
+      Temperature::kUnknown, threshold_time - 4000);
+  Add(0, 1U, "200", "300", 4 * kFileSize, 0, 2000, 2100, 0, true,
+      Temperature::kWarm, threshold_time - 5000);
+  UpdateVersionStorageInfo();
+
+  ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
+  std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
+  // Only the non-warm file between the two warm files is picked.
+  ASSERT_TRUE(compaction.get() != nullptr);
+  ASSERT_EQ(1U, compaction->num_input_files(0));
+  ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber());
+}
+
#endif // ROCKSDB_LITE
TEST_F(CompactionPickerTest, CompactionPriMinOverlapping1) {
1, enable_compression),
GetCompressionOptions(mutable_cf_options_, vstorage_, start_level,
enable_compression),
+ Temperature::kUnknown,
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */, compaction_reason);
}
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
output_level, 1),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level),
+ Temperature::kUnknown,
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */,
CompactionReason::kFilesMarkedForCompaction);
output_level, 1, true /* enable_compression */),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level,
true /* enable_compression */),
+ Temperature::kUnknown,
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */, compaction_reason);
}
Destroy(options);
}
+TEST_F(DBCompactionTest, FIFOWarm) {
+ Options options = CurrentOptions();
+ options.compaction_style = kCompactionStyleFIFO;
+ options.num_levels = 1;
+ options.max_open_files = -1;
+ options.level0_file_num_compaction_trigger = 2;
+ options.create_if_missing = true;
+ CompactionOptionsFIFO fifo_options;
+ fifo_options.age_for_warm = 1000;
+ fifo_options.max_table_files_size = 100000000;
+ options.compaction_options_fifo = fifo_options;
+ env_->SetMockSleep();
+ Reopen(options);
+ // Count every file that is created with Temperature::kWarm.
+ int total_warm = 0;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "NewWritableFile::FileOptions.temperature", [&](void* arg) {
+ Temperature temperature = *(static_cast<Temperature*>(arg));
+ if (temperature == Temperature::kWarm) {
+ total_warm++;
+ }
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ // Flush four files. Each round sleeps 800 on the mock clock between its two
+ // Puts, presumably so earlier files age past age_for_warm (1000) over time.
+ ASSERT_OK(Put(Key(0), "value1"));
+ env_->MockSleepForSeconds(800);
+ ASSERT_OK(Put(Key(2), "value2"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put(Key(0), "value1"));
+ env_->MockSleepForSeconds(800);
+ ASSERT_OK(Put(Key(2), "value2"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put(Key(0), "value1"));
+ env_->MockSleepForSeconds(800);
+ ASSERT_OK(Put(Key(2), "value2"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ ASSERT_OK(Put(Key(0), "value1"));
+ env_->MockSleepForSeconds(800);
+ ASSERT_OK(Put(Key(2), "value2"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ // Expected outcome: 4 files; files[2] and files[3] are warm (2 warm writes).
+ ColumnFamilyMetaData metadata;
+ db_->GetColumnFamilyMetaData(&metadata);
+ ASSERT_EQ(4, metadata.file_count);
+ ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature);
+ ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[1].temperature);
+ ASSERT_EQ(Temperature::kWarm, metadata.levels[0].files[2].temperature);
+ ASSERT_EQ(Temperature::kWarm, metadata.levels[0].files[3].temperature);
+ ASSERT_EQ(2, total_warm);
+
+ Destroy(options);
+}
+
#endif // !defined(ROCKSDB_LITE)
} // namespace ROCKSDB_NAMESPACE
if (compaction_style_ == kCompactionStyleFIFO) {
score = static_cast<double>(total_size) /
mutable_cf_options.compaction_options_fifo.max_table_files_size;
- if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
+ if (mutable_cf_options.compaction_options_fifo.allow_compaction ||
+ mutable_cf_options.compaction_options_fifo.age_for_warm > 0) {
+ // Warm tier move can happen at any time. It's too expensive to
+ // check every file's timestamp now. For now, just trigger it
+ // slightly more frequently than FIFO compaction so that this
+ // happens first.
score = std::max(
static_cast<double>(num_sorted_runs) /
mutable_cf_options.level0_file_num_compaction_trigger,
immutable_options, mutable_cf_options, files_[level])),
score);
}
-
} else {
score = static_cast<double>(num_sorted_runs) /
mutable_cf_options.level0_file_num_compaction_trigger;
IOStatus NewWritableFile(FileSystem* fs, const std::string& fname,
std::unique_ptr<FSWritableFile>* result,
const FileOptions& options) {
+ TEST_SYNC_POINT_CALLBACK("NewWritableFile::FileOptions.temperature",
+ const_cast<Temperature*>(&options.temperature));
IOStatus s = fs->NewWritableFile(fname, options, result, nullptr);
TEST_KILL_RANDOM_WITH_WEIGHT("NewWritableFile:0", REDUCE_ODDS2);
return s;
// Default: false;
bool allow_compaction = false;
+ // When not 0, if the data in the file is older than this threshold, RocksDB
+ // will soon move the file to warm temperature.
+ uint64_t age_for_warm = 0;
+
CompactionOptionsFIFO() : max_table_files_size(1 * 1024 * 1024 * 1024) {}
CompactionOptionsFIFO(uint64_t _max_table_files_size, bool _allow_compaction)
: max_table_files_size(_max_table_files_size),
kExternalSstIngestion,
// Compaction due to SST file being too old
kPeriodicCompaction,
+ // Compaction in order to move files to a different temperature
+ kChangeTemperature,
// total number of compaction reasons, new reasons must be added above this.
kNumOfReasons,
};
return ROCKSDB_NAMESPACE::CompactionReason::kFlush;
case 0x0D:
return ROCKSDB_NAMESPACE::CompactionReason::kExternalSstIngestion;
+ case 0x0E:
+ return ROCKSDB_NAMESPACE::CompactionReason::kPeriodicCompaction;
+ case 0x0F:
+ return ROCKSDB_NAMESPACE::CompactionReason::kChangeTemperature;
default:
// undefined/default
return ROCKSDB_NAMESPACE::CompactionReason::kUnknown;
/**
* Compaction caused by external sst file ingestion
*/
- kExternalSstIngestion((byte)0x0D);
+ kExternalSstIngestion((byte) 0x0D),
+
+ /**
+ * Compaction due to SST file being too old
+ */
+ kPeriodicCompaction((byte) 0x0E),
+
+ /**
+ * Compaction in order to move files to a different temperature
+ */
+ kChangeTemperature((byte) 0x0F);
private final byte value;
{offsetof(struct CompactionOptionsFIFO, max_table_files_size),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
+ {"age_for_warm",
+ {offsetof(struct CompactionOptionsFIFO, age_for_warm),
+ OptionType::kUInt64T, OptionVerificationType::kNormal,
+ OptionTypeFlags::kMutable}},
{"ttl",
{0, OptionType::kUInt64T, OptionVerificationType::kDeprecated,
OptionTypeFlags::kNone}},
"enable_blob_garbage_collection=true;"
"blob_garbage_collection_age_cutoff=0.5;"
"compaction_options_fifo={max_table_files_size=3;allow_"
- "compaction=false;};",
+ "compaction=false;age_for_warm=1;};",
new_options));
ASSERT_EQ(unset_bytes_base,
DEFINE_uint64(fifo_compaction_ttl, 0, "TTL for the SST Files in seconds.");
+DEFINE_uint64(fifo_age_for_warm, 0, "age_for_warm for FIFO compaction.");
+
// Stacked BlobDB Options
DEFINE_bool(use_blob_db, false, "[Stacked BlobDB] Open a BlobDB instance.");
options.compaction_options_fifo = CompactionOptionsFIFO(
FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024,
FLAGS_fifo_compaction_allow_compaction);
+ options.compaction_options_fifo.age_for_warm = FLAGS_fifo_age_for_warm;
#endif // ROCKSDB_LITE
if (FLAGS_prefix_size != 0) {
options.prefix_extractor.reset(