private:
friend class Version;
friend class VersionSet;
+ friend class CompactionPicker;
+ friend class UniversalCompactionPicker;
+ friend class LevelCompactionPicker;
Compaction(Version* input_version, int level, int out_level,
uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes,
--- /dev/null
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/compaction_picker.h"
+
+namespace rocksdb {
+
+namespace {
+
+uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
+ uint64_t sum = 0;
+ for (size_t i = 0; i < files.size() && files[i]; i++) {
+ sum += files[i]->file_size;
+ }
+ return sum;
+}
+
+} // anonymous namespace
+
+CompactionPicker::CompactionPicker(const Options* options,
+ const InternalKeyComparator* icmp)
+ : compactions_in_progress_(options->num_levels),
+ options_(options),
+ num_levels_(options->num_levels),
+ icmp_(icmp) {
+ Init();
+}
+
+void CompactionPicker::ReduceNumberOfLevels(int new_levels) {
+ num_levels_ = new_levels;
+ Init();
+}
+
+void CompactionPicker::Init() {
+ max_file_size_.reset(new uint64_t[NumberLevels()]);
+ level_max_bytes_.reset(new uint64_t[NumberLevels()]);
+ int target_file_size_multiplier = options_->target_file_size_multiplier;
+ int max_bytes_multiplier = options_->max_bytes_for_level_multiplier;
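+  // Illustrative sizing for level-style compaction (assumed option values,
+  // not taken from this diff): with target_file_size_base = 2MB,
+  // target_file_size_multiplier = 1, max_bytes_for_level_base = 10MB,
+  // max_bytes_for_level_multiplier = 10 and all additional multipliers 1,
+  // levels 0 and 1 are capped at 10MB, levels 2, 3, 4, ... at 100MB, 1GB,
+  // 10GB, ..., while every level keeps a 2MB target file size.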
+ for (int i = 0; i < NumberLevels(); i++) {
+ if (i == 0 && options_->compaction_style == kCompactionStyleUniversal) {
+ max_file_size_[i] = ULLONG_MAX;
+ level_max_bytes_[i] = options_->max_bytes_for_level_base;
+ } else if (i > 1) {
+ max_file_size_[i] = max_file_size_[i - 1] * target_file_size_multiplier;
+ level_max_bytes_[i] =
+ level_max_bytes_[i - 1] * max_bytes_multiplier *
+ options_->max_bytes_for_level_multiplier_additional[i - 1];
+ } else {
+ max_file_size_[i] = options_->target_file_size_base;
+ level_max_bytes_[i] = options_->max_bytes_for_level_base;
+ }
+ }
+}
+
+CompactionPicker::~CompactionPicker() {}
+
+void CompactionPicker::SizeBeingCompacted(std::vector<uint64_t>& sizes) {
+ for (int level = 0; level < NumberLevels() - 1; level++) {
+ uint64_t total = 0;
+ for (auto c : compactions_in_progress_[level]) {
+ assert(c->level() == level);
+ for (int i = 0; i < c->num_input_files(0); i++) {
+ total += c->input(0,i)->file_size;
+ }
+ }
+ sizes[level] = total;
+ }
+}
+
+// Clear all files to indicate that they are not being compacted and
+// delete this compaction from the list of running compactions.
+void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) {
+ c->MarkFilesBeingCompacted(false);
+ compactions_in_progress_[c->level()].erase(c);
+ if (!status.ok()) {
+ c->ResetNextCompactionIndex();
+ }
+}
+
+uint64_t CompactionPicker::MaxFileSizeForLevel(int level) const {
+ assert(level >= 0);
+ assert(level < NumberLevels());
+ return max_file_size_[level];
+}
+
+uint64_t CompactionPicker::MaxGrandParentOverlapBytes(int level) {
+ uint64_t result = MaxFileSizeForLevel(level);
+ result *= options_->max_grandparent_overlap_factor;
+ return result;
+}
+
+double CompactionPicker::MaxBytesForLevel(int level) {
+ // Note: the result for level zero is not really used since we set
+ // the level-0 compaction threshold based on number of files.
+ assert(level >= 0);
+ assert(level < NumberLevels());
+ return level_max_bytes_[level];
+}
+
+void CompactionPicker::GetRange(const std::vector<FileMetaData*>& inputs,
+ InternalKey* smallest, InternalKey* largest) {
+ assert(!inputs.empty());
+ smallest->Clear();
+ largest->Clear();
+ for (size_t i = 0; i < inputs.size(); i++) {
+ FileMetaData* f = inputs[i];
+ if (i == 0) {
+ *smallest = f->smallest;
+ *largest = f->largest;
+ } else {
+ if (icmp_->Compare(f->smallest, *smallest) < 0) {
+ *smallest = f->smallest;
+ }
+ if (icmp_->Compare(f->largest, *largest) > 0) {
+ *largest = f->largest;
+ }
+ }
+ }
+}
+
+void CompactionPicker::GetRange(const std::vector<FileMetaData*>& inputs1,
+ const std::vector<FileMetaData*>& inputs2,
+ InternalKey* smallest, InternalKey* largest) {
+ std::vector<FileMetaData*> all = inputs1;
+ all.insert(all.end(), inputs2.begin(), inputs2.end());
+ GetRange(all, smallest, largest);
+}
+
+// Add more files to the inputs on "level" to make sure that
+// no newer version of a key is compacted to "level+1" while leaving an older
+// version in "level". Otherwise, any Get() will search "level" first,
+// and will likely return an old/stale value for the key, since it always
+// searches in increasing order of level to find the value. This could
+// also scramble the order of merge operands. This function should be
+// called any time a new Compaction is created, and its inputs_[0] are
+// populated.
+//
+// Will set c to nullptr if it is impossible to apply this compaction.
+void CompactionPicker::ExpandWhileOverlapping(Compaction* c) {
+ // If inputs are empty then there is nothing to expand.
+ if (!c || c->inputs_[0].empty()) {
+ return;
+ }
+
+ // GetOverlappingInputs will always do the right thing for level-0.
+ // So we don't need to do any expansion if level == 0.
+ if (c->level() == 0) {
+ return;
+ }
+
+ const int level = c->level();
+ InternalKey smallest, largest;
+
+ // Keep expanding c->inputs_[0] until we are sure that there is a
+ // "clean cut" boundary between the files in input and the surrounding files.
+ // This will ensure that no parts of a key are lost during compaction.
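+  //
+  // Illustrative example (hypothetical files): if inputs_[0] holds a file
+  // covering [b .. d] and a neighboring file on the same level begins at the
+  // same user key d (the boundary is shared because the entries differ only
+  // in sequence number), GetOverlappingInputs pulls that neighbor in, and the
+  // loop repeats until the input set stops growing.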
+ int hint_index = -1;
+ size_t old_size;
+ do {
+ old_size = c->inputs_[0].size();
+ GetRange(c->inputs_[0], &smallest, &largest);
+ c->inputs_[0].clear();
+ c->input_version_->GetOverlappingInputs(
+ level, &smallest, &largest, &c->inputs_[0], hint_index, &hint_index);
+ } while(c->inputs_[0].size() > old_size);
+
+ // Get the new range
+ GetRange(c->inputs_[0], &smallest, &largest);
+
+ // If, after the expansion, there are files that are already under
+ // compaction, then we must drop/cancel this compaction.
+ int parent_index = -1;
+ if (FilesInCompaction(c->inputs_[0]) ||
+ (c->level() != c->output_level() &&
+ ParentRangeInCompaction(c->input_version_, &smallest, &largest, level,
+ &parent_index))) {
+ c->inputs_[0].clear();
+ c->inputs_[1].clear();
+ delete c;
+ c = nullptr;
+ }
+}
+
+uint64_t CompactionPicker::ExpandedCompactionByteSizeLimit(int level) {
+ uint64_t result = MaxFileSizeForLevel(level);
+ result *= options_->expanded_compaction_factor;
+ return result;
+}
+
+// Returns true if any of the specified files is being compacted
+bool CompactionPicker::FilesInCompaction(std::vector<FileMetaData*>& files) {
+ for (unsigned int i = 0; i < files.size(); i++) {
+ if (files[i]->being_compacted) {
+ return true;
+ }
+ }
+ return false;
+}
+
+// Returns true if any of the parent files is being compacted
+bool CompactionPicker::ParentRangeInCompaction(Version* version,
+ const InternalKey* smallest,
+ const InternalKey* largest,
+ int level, int* parent_index) {
+ std::vector<FileMetaData*> inputs;
+ assert(level + 1 < NumberLevels());
+
+ version->GetOverlappingInputs(level + 1, smallest, largest, &inputs,
+ *parent_index, parent_index);
+ return FilesInCompaction(inputs);
+}
+
+// Populates the set of inputs from "level+1" that overlap with "level".
+// Will also attempt to expand "level" if that doesn't expand "level+1"
+// or cause "level" to include a file for compaction that has an overlapping
+// user-key with another file.
+void CompactionPicker::SetupOtherInputs(Compaction* c) {
+ // If inputs are empty, then there is nothing to expand.
+ // If both input and output levels are the same, no need to consider
+ // files at level "level+1"
+ if (c->inputs_[0].empty() || c->level() == c->output_level()) {
+ return;
+ }
+
+ const int level = c->level();
+ InternalKey smallest, largest;
+
+ // Get the range one last time.
+ GetRange(c->inputs_[0], &smallest, &largest);
+
+ // Populate the set of next-level files (inputs_[1]) to include in compaction
+ c->input_version_->GetOverlappingInputs(level + 1, &smallest, &largest,
+ &c->inputs_[1], c->parent_index_,
+ &c->parent_index_);
+
+ // Get entire range covered by compaction
+ InternalKey all_start, all_limit;
+ GetRange(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
+
+ // See if we can further grow the number of inputs in "level" without
+ // changing the number of "level+1" files we pick up. We also choose NOT
+ // to expand if this would cause "level" to include some entries for some
+ // user key, while excluding other entries for the same user key. This
+ // can happen when one user key spans multiple files.
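+  // For example (hypothetical files): if user key "k" has entries in both f5
+  // and f6 at "level", expanding inputs_[0] to cover f5 but not f6 would
+  // compact some versions of "k" into "level+1" while newer versions stay
+  // behind in f6; the HasOverlappingUserKey() check below rejects such an
+  // expansion.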
+ if (!c->inputs_[1].empty()) {
+ std::vector<FileMetaData*> expanded0;
+ c->input_version_->GetOverlappingInputs(
+ level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr);
+ const uint64_t inputs0_size = TotalFileSize(c->inputs_[0]);
+ const uint64_t inputs1_size = TotalFileSize(c->inputs_[1]);
+ const uint64_t expanded0_size = TotalFileSize(expanded0);
+ uint64_t limit = ExpandedCompactionByteSizeLimit(level);
+ if (expanded0.size() > c->inputs_[0].size() &&
+ inputs1_size + expanded0_size < limit &&
+ !FilesInCompaction(expanded0) &&
+ !c->input_version_->HasOverlappingUserKey(&expanded0, level)) {
+ InternalKey new_start, new_limit;
+ GetRange(expanded0, &new_start, &new_limit);
+ std::vector<FileMetaData*> expanded1;
+ c->input_version_->GetOverlappingInputs(level + 1, &new_start, &new_limit,
+ &expanded1, c->parent_index_,
+ &c->parent_index_);
+ if (expanded1.size() == c->inputs_[1].size() &&
+ !FilesInCompaction(expanded1)) {
+ Log(options_->info_log,
+ "Expanding@%lu %lu+%lu (%lu+%lu bytes) to %lu+%lu (%lu+%lu bytes)"
+ "\n",
+ (unsigned long)level,
+ (unsigned long)(c->inputs_[0].size()),
+ (unsigned long)(c->inputs_[1].size()),
+ (unsigned long)inputs0_size,
+ (unsigned long)inputs1_size,
+ (unsigned long)(expanded0.size()),
+ (unsigned long)(expanded1.size()),
+ (unsigned long)expanded0_size,
+ (unsigned long)inputs1_size);
+ smallest = new_start;
+ largest = new_limit;
+ c->inputs_[0] = expanded0;
+ c->inputs_[1] = expanded1;
+ GetRange(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
+ }
+ }
+ }
+
+ // Compute the set of grandparent files that overlap this compaction
+ // (parent == level+1; grandparent == level+2)
+ if (level + 2 < NumberLevels()) {
+ c->input_version_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
+ &c->grandparents_);
+ }
+}
+
+Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
+ int output_level,
+ const InternalKey* begin,
+ const InternalKey* end,
+ InternalKey** compaction_end) {
+ std::vector<FileMetaData*> inputs;
+ bool covering_the_whole_range = true;
+
+ // All files are 'overlapping' in universal style compaction.
+ // We have to compact the entire range in one shot.
+ if (options_->compaction_style == kCompactionStyleUniversal) {
+ begin = nullptr;
+ end = nullptr;
+ }
+ version->GetOverlappingInputs(input_level, begin, end, &inputs);
+ if (inputs.empty()) {
+ return nullptr;
+ }
+
+ // Avoid compacting too much in one shot in case the range is large.
+ // But we cannot do this for level-0 since level-0 files can overlap
+ // and we must not pick one file and drop another older file if the
+ // two files overlap.
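+  // Illustrative example (assumed values): with
+  // MaxFileSizeForLevel(input_level) = 64MB and source_compaction_factor = 1,
+  // the loop below stops adding input files once 64MB has been accumulated;
+  // the smallest key of the next file is recorded in **compaction_end so the
+  // caller can continue the manual compaction from there in a follow-up call.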
+ if (input_level > 0) {
+ const uint64_t limit =
+ MaxFileSizeForLevel(input_level) * options_->source_compaction_factor;
+ uint64_t total = 0;
+ for (size_t i = 0; i + 1 < inputs.size(); ++i) {
+ uint64_t s = inputs[i]->file_size;
+ total += s;
+ if (total >= limit) {
+ **compaction_end = inputs[i + 1]->smallest;
+ covering_the_whole_range = false;
+ inputs.resize(i + 1);
+ break;
+ }
+ }
+ }
+ Compaction* c = new Compaction(version, input_level, output_level,
+ MaxFileSizeForLevel(output_level),
+ MaxGrandParentOverlapBytes(input_level));
+
+ c->inputs_[0] = inputs;
+ ExpandWhileOverlapping(c);
+ if (c == nullptr) {
+ Log(options_->info_log, "Could not compact due to expansion failure.\n");
+ return nullptr;
+ }
+
+ SetupOtherInputs(c);
+
+ if (covering_the_whole_range) {
+ *compaction_end = nullptr;
+ }
+
+  // The files to be manually compacted do not trample on other files
+  // because manual compactions are processed while the system runs at
+  // most one background compaction thread.
+ c->MarkFilesBeingCompacted(true);
+
+ // Is this compaction creating a file at the bottommost level
+ c->SetupBottomMostLevel(true);
+ return c;
+}
+
+Compaction* LevelCompactionPicker::PickCompaction(Version* version) {
+ Compaction* c = nullptr;
+ int level = -1;
+
+ // Compute the compactions needed. It is better to do it here
+ // and also in LogAndApply(), otherwise the values could be stale.
+ std::vector<uint64_t> size_being_compacted(NumberLevels() - 1);
+ SizeBeingCompacted(size_being_compacted);
+ version->Finalize(size_being_compacted);
+
+ // We prefer compactions triggered by too much data in a level over
+ // the compactions triggered by seeks.
+ //
+ // Find the compactions by size on all levels.
+ for (int i = 0; i < NumberLevels() - 1; i++) {
+ assert(i == 0 ||
+ version->compaction_score_[i] <= version->compaction_score_[i - 1]);
+ level = version->compaction_level_[i];
+ if ((version->compaction_score_[i] >= 1)) {
+ c = PickCompactionBySize(version, level, version->compaction_score_[i]);
+ ExpandWhileOverlapping(c);
+ if (c != nullptr) {
+ break;
+ }
+ }
+ }
+
+ // Find compactions needed by seeks
+ FileMetaData* f = version->file_to_compact_;
+ if (c == nullptr && f != nullptr && !f->being_compacted) {
+
+ level = version->file_to_compact_level_;
+ int parent_index = -1;
+
+ // Only allow one level 0 compaction at a time.
+ // Do not pick this file if its parents at level+1 are being compacted.
+ if (level != 0 || compactions_in_progress_[0].empty()) {
+ if (!ParentRangeInCompaction(version, &f->smallest, &f->largest, level,
+ &parent_index)) {
+ c = new Compaction(version, level, level + 1,
+ MaxFileSizeForLevel(level + 1),
+ MaxGrandParentOverlapBytes(level), true);
+ c->inputs_[0].push_back(f);
+ c->parent_index_ = parent_index;
+ c->input_version_->file_to_compact_ = nullptr;
+ ExpandWhileOverlapping(c);
+ }
+ }
+ }
+
+ if (c == nullptr) {
+ return nullptr;
+ }
+
+  // Two level-0 compactions won't run at the same time, so we don't need to
+  // worry about files on level 0 being compacted.
+ if (level == 0) {
+ assert(compactions_in_progress_[0].empty());
+ InternalKey smallest, largest;
+ GetRange(c->inputs_[0], &smallest, &largest);
+ // Note that the next call will discard the file we placed in
+ // c->inputs_[0] earlier and replace it with an overlapping set
+ // which will include the picked file.
+ c->inputs_[0].clear();
+ c->input_version_->GetOverlappingInputs(0, &smallest, &largest,
+ &c->inputs_[0]);
+
+ // If we include more L0 files in the same compaction run it can
+ // cause the 'smallest' and 'largest' key to get extended to a
+ // larger range. So, re-invoke GetRange to get the new key range
+ GetRange(c->inputs_[0], &smallest, &largest);
+ if (ParentRangeInCompaction(c->input_version_, &smallest, &largest, level,
+ &c->parent_index_)) {
+ delete c;
+ return nullptr;
+ }
+ assert(!c->inputs_[0].empty());
+ }
+
+ // Setup "level+1" files (inputs_[1])
+ SetupOtherInputs(c);
+
+ // mark all the files that are being compacted
+ c->MarkFilesBeingCompacted(true);
+
+ // Is this compaction creating a file at the bottommost level
+ c->SetupBottomMostLevel(false);
+
+ // remember this currently undergoing compaction
+ compactions_in_progress_[level].insert(c);
+
+ return c;
+}
+
+Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
+ int level,
+ double score) {
+ Compaction* c = nullptr;
+
+  // Level-0 files are overlapping, so we cannot pick more than one
+  // concurrent compaction at this level. This could be made better by
+  // looking at the key ranges that are being compacted at level 0.
+ if (level == 0 && compactions_in_progress_[level].size() == 1) {
+ return nullptr;
+ }
+
+ assert(level >= 0);
+ assert(level + 1 < NumberLevels());
+ c = new Compaction(version, level, level + 1, MaxFileSizeForLevel(level + 1),
+ MaxGrandParentOverlapBytes(level));
+ c->score_ = score;
+
+ // Pick the largest file in this level that is not already
+ // being compacted
+ std::vector<int>& file_size = c->input_version_->files_by_size_[level];
+
+ // record the first file that is not yet compacted
+ int nextIndex = -1;
+
+ for (unsigned int i = c->input_version_->next_file_to_compact_by_size_[level];
+ i < file_size.size(); i++) {
+ int index = file_size[i];
+ FileMetaData* f = c->input_version_->files_[level][index];
+
+ // check to verify files are arranged in descending size
+ assert((i == file_size.size() - 1) ||
+ (i >= Version::number_of_files_to_sort_ - 1) ||
+ (f->file_size >=
+ c->input_version_->files_[level][file_size[i + 1]]->file_size));
+
+    // do not pick a file that is already being compacted
+    // (e.g., as part of a compaction from level n-1).
+ if (f->being_compacted) {
+ continue;
+ }
+
+ // remember the startIndex for the next call to PickCompaction
+ if (nextIndex == -1) {
+ nextIndex = i;
+ }
+
+ //if (i > Version::number_of_files_to_sort_) {
+ // Log(options_->info_log, "XXX Looking at index %d", i);
+ //}
+
+ // Do not pick this file if its parents at level+1 are being compacted.
+ // Maybe we can avoid redoing this work in SetupOtherInputs
+ int parent_index = -1;
+ if (ParentRangeInCompaction(c->input_version_, &f->smallest, &f->largest,
+ level, &parent_index)) {
+ continue;
+ }
+ c->inputs_[0].push_back(f);
+ c->base_index_ = index;
+ c->parent_index_ = parent_index;
+ break;
+ }
+
+  if (c->inputs_[0].empty()) {
+    // Nothing eligible at this level; leave the compaction cursor unchanged.
+    delete c;
+    return nullptr;
+  }
+
+  // store where to start the iteration in the next call to PickCompaction
+  c->input_version_->next_file_to_compact_by_size_[level] = nextIndex;
+
+ return c;
+}
+
+// Universal style of compaction. Pick files that are contiguous in
+// time-range to compact.
+//
+Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
+ int level = 0;
+ double score = version->compaction_score_[0];
+
+ if ((version->files_[level].size() <
+ (unsigned int)options_->level0_file_num_compaction_trigger)) {
+ Log(options_->info_log, "Universal: nothing to do\n");
+ return nullptr;
+ }
+ Version::FileSummaryStorage tmp;
+ Log(options_->info_log, "Universal: candidate files(%lu): %s\n",
+ version->files_[level].size(),
+ version->LevelFileSummary(&tmp, 0));
+
+ // Check for size amplification first.
+ Compaction* c = PickCompactionUniversalSizeAmp(version, score);
+ if (c == nullptr) {
+
+ // Size amplification is within limits. Try reducing read
+ // amplification while maintaining file size ratios.
+ unsigned int ratio = options_->compaction_options_universal.size_ratio;
+ c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX);
+
+    // Size amplification and file size ratios are within configured limits.
+    // If max read amplification exceeds the configured limit, then force a
+    // compaction without looking at file size ratios and try to reduce
+    // the number of files to fewer than level0_file_num_compaction_trigger.
+ if (c == nullptr) {
+ unsigned int num_files = version->files_[level].size() -
+ options_->level0_file_num_compaction_trigger;
+ c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files);
+ }
+ }
+ if (c == nullptr) {
+ return nullptr;
+ }
+ assert(c->inputs_[0].size() > 1);
+
+  // validate that all the chosen files are non-overlapping in time
+ FileMetaData* newerfile __attribute__((unused)) = nullptr;
+ for (unsigned int i = 0; i < c->inputs_[0].size(); i++) {
+ FileMetaData* f = c->inputs_[0][i];
+ assert (f->smallest_seqno <= f->largest_seqno);
+ assert(newerfile == nullptr ||
+ newerfile->smallest_seqno > f->largest_seqno);
+ newerfile = f;
+ }
+
+ // The files are sorted from newest first to oldest last.
+ std::vector<int>& file_by_time = c->input_version_->files_by_size_[level];
+
+ // Is the earliest file part of this compaction?
+ int last_index = file_by_time[file_by_time.size()-1];
+ FileMetaData* last_file = c->input_version_->files_[level][last_index];
+ if (c->inputs_[0][c->inputs_[0].size()-1] == last_file) {
+ c->bottommost_level_ = true;
+ }
+
+ // update statistics
+ if (options_->statistics != nullptr) {
+ options_->statistics->measureTime(NUM_FILES_IN_SINGLE_COMPACTION,
+ c->inputs_[0].size());
+ }
+
+ // mark all the files that are being compacted
+ c->MarkFilesBeingCompacted(true);
+
+ // remember this currently undergoing compaction
+ compactions_in_progress_[level].insert(c);
+
+ // Record whether this compaction includes all sst files.
+ // For now, it is only relevant in universal compaction mode.
+ c->is_full_compaction_ =
+ (c->inputs_[0].size() == c->input_version_->files_[0].size());
+
+ return c;
+}
+
+//
+// Consider files for compaction based on how their sizes compare with
+// the next file in time order.
+//
+Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
+ Version* version, double score, unsigned int ratio,
+ unsigned int max_number_of_files_to_compact) {
+ int level = 0;
+
+ unsigned int min_merge_width =
+ options_->compaction_options_universal.min_merge_width;
+ unsigned int max_merge_width =
+ options_->compaction_options_universal.max_merge_width;
+
+ // The files are sorted from newest first to oldest last.
+ std::vector<int>& file_by_time = version->files_by_size_[level];
+ FileMetaData* f = nullptr;
+ bool done = false;
+ int start_index = 0;
+ unsigned int candidate_count;
+ assert(file_by_time.size() == version->files_[level].size());
+
+ unsigned int max_files_to_compact = std::min(max_merge_width,
+ max_number_of_files_to_compact);
+ min_merge_width = std::max(min_merge_width, 2U);
+
+  // A succeeding file joins the candidate set only if it is no larger than
+  // the total size accumulated so far (inflated by the size ratio).
+ for (unsigned int loop = 0; loop < file_by_time.size(); loop++) {
+
+ candidate_count = 0;
+
+ // Skip files that are already being compacted
+ for (f = nullptr; loop < file_by_time.size(); loop++) {
+ int index = file_by_time[loop];
+ f = version->files_[level][index];
+
+ if (!f->being_compacted) {
+ candidate_count = 1;
+ break;
+ }
+ Log(options_->info_log,
+ "Universal: file %lu[%d] being compacted, skipping",
+ (unsigned long)f->number, loop);
+ f = nullptr;
+ }
+
+ // This file is not being compacted. Consider it as the
+ // first candidate to be compacted.
+ uint64_t candidate_size = f != nullptr? f->file_size : 0;
+ if (f != nullptr) {
+ Log(options_->info_log, "Universal: Possible candidate file %lu[%d].",
+ (unsigned long)f->number, loop);
+ }
+
+    // Check whether the succeeding files should be included in the compaction.
+ for (unsigned int i = loop+1;
+ candidate_count < max_files_to_compact && i < file_by_time.size();
+ i++) {
+ int index = file_by_time[i];
+ FileMetaData* f = version->files_[level][index];
+ if (f->being_compacted) {
+ break;
+ }
+      // include the next file only while the total candidate size
+      // (inflated by the specified ratio) is at least as large as that file.
+ uint64_t sz = (candidate_size * (100L + ratio)) /100;
+ if (sz < f->file_size) {
+ break;
+ }
+ candidate_count++;
+ candidate_size += f->file_size;
+ }
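+    // Worked example (assumed sizes): with ratio = 1 (one percent) and an
+    // accumulated candidate_size of 100MB, a next file of 90MB joins the run
+    // (101MB >= 90MB) while a 200MB next file stops it (101MB < 200MB).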
+
+ // Found a series of consecutive files that need compaction.
+ if (candidate_count >= (unsigned int)min_merge_width) {
+ start_index = loop;
+ done = true;
+ break;
+ } else {
+ for (unsigned int i = loop;
+ i < loop + candidate_count && i < file_by_time.size(); i++) {
+ int index = file_by_time[i];
+ FileMetaData* f = version->files_[level][index];
+ Log(options_->info_log,
+ "Universal: Skipping file %lu[%d] with size %lu %d\n",
+ (unsigned long)f->number,
+ i,
+ (unsigned long)f->file_size,
+ f->being_compacted);
+ }
+ }
+ }
+ if (!done || candidate_count <= 1) {
+ return nullptr;
+ }
+ unsigned int first_index_after = start_index + candidate_count;
+  // Decide whether to compress the output: compression is disabled if the
+  // files older than this compaction already make up at least
+  // compression_size_percent of the level, i.e. the output falls in the
+  // newer portion of the data that is kept uncompressed.
+ bool enable_compression = true;
+ int ratio_to_compress =
+ options_->compaction_options_universal.compression_size_percent;
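+  // Worked example (assumed sizes): with compression_size_percent = 70 and a
+  // 100MB level, the loop below disables compression once the files older
+  // than the chosen run already hold at least 70MB.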
+ if (ratio_to_compress >= 0) {
+ uint64_t total_size = version->NumLevelBytes(level);
+ uint64_t older_file_size = 0;
+ for (unsigned int i = file_by_time.size() - 1; i >= first_index_after;
+ i--) {
+ older_file_size += version->files_[level][file_by_time[i]]->file_size;
+ if (older_file_size * 100L >= total_size * (long) ratio_to_compress) {
+ enable_compression = false;
+ break;
+ }
+ }
+ }
+ Compaction* c =
+ new Compaction(version, level, level, MaxFileSizeForLevel(level),
+ LLONG_MAX, false, enable_compression);
+ c->score_ = score;
+
+ for (unsigned int i = start_index; i < first_index_after; i++) {
+ int index = file_by_time[i];
+ FileMetaData* f = c->input_version_->files_[level][index];
+ c->inputs_[0].push_back(f);
+ Log(options_->info_log, "Universal: Picking file %lu[%d] with size %lu\n",
+ (unsigned long)f->number,
+ i,
+ (unsigned long)f->file_size);
+ }
+ return c;
+}
+
+// Look at overall size amplification. If size amplification
+// exceeds the configured value, then do a compaction
+// of the candidate files all the way up to the earliest
+// base file (overrides configured values of file-size ratios,
+// min_merge_width and max_merge_width).
+//
+Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
+ Version* version, double score) {
+ int level = 0;
+
+  // percentage flexibility while reducing size amplification
+ uint64_t ratio = options_->compaction_options_universal.
+ max_size_amplification_percent;
+
+ // The files are sorted from newest first to oldest last.
+ std::vector<int>& file_by_time = version->files_by_size_[level];
+ assert(file_by_time.size() == version->files_[level].size());
+
+ unsigned int candidate_count = 0;
+ uint64_t candidate_size = 0;
+ unsigned int start_index = 0;
+ FileMetaData* f = nullptr;
+
+ // Skip files that are already being compacted
+ for (unsigned int loop = 0; loop < file_by_time.size() - 1; loop++) {
+ int index = file_by_time[loop];
+ f = version->files_[level][index];
+ if (!f->being_compacted) {
+ start_index = loop; // Consider this as the first candidate.
+ break;
+ }
+ Log(options_->info_log, "Universal: skipping file %lu[%d] compacted %s",
+ (unsigned long)f->number,
+ loop,
+ " cannot be a candidate to reduce size amp.\n");
+ f = nullptr;
+ }
+ if (f == nullptr) {
+ return nullptr; // no candidate files
+ }
+
+ Log(options_->info_log, "Universal: First candidate file %lu[%d] %s",
+ (unsigned long)f->number,
+ start_index,
+ " to reduce size amp.\n");
+
+ // keep adding up all the remaining files
+ for (unsigned int loop = start_index; loop < file_by_time.size() - 1;
+ loop++) {
+ int index = file_by_time[loop];
+ f = version->files_[level][index];
+ if (f->being_compacted) {
+ Log(options_->info_log,
+ "Universal: Possible candidate file %lu[%d] %s.",
+ (unsigned long)f->number,
+ loop,
+ " is already being compacted. No size amp reduction possible.\n");
+ return nullptr;
+ }
+ candidate_size += f->file_size;
+ candidate_count++;
+ }
+ if (candidate_count == 0) {
+ return nullptr;
+ }
+
+ // size of earliest file
+ int index = file_by_time[file_by_time.size() - 1];
+ uint64_t earliest_file_size = version->files_[level][index]->file_size;
+
+ // size amplification = percentage of additional size
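+  // Worked example (assumed sizes): with max_size_amplification_percent = 25
+  // and an earliest file of 200MB, 40MB of newer data does not trigger a
+  // size-amp compaction (40 * 100 < 25 * 200), while 60MB of newer data does.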
+ if (candidate_size * 100 < ratio * earliest_file_size) {
+ Log(options_->info_log,
+ "Universal: size amp not needed. newer-files-total-size %lu "
+ "earliest-file-size %lu",
+ (unsigned long)candidate_size,
+ (unsigned long)earliest_file_size);
+ return nullptr;
+ } else {
+ Log(options_->info_log,
+ "Universal: size amp needed. newer-files-total-size %lu "
+ "earliest-file-size %lu",
+ (unsigned long)candidate_size,
+ (unsigned long)earliest_file_size);
+ }
+ assert(start_index >= 0 && start_index < file_by_time.size() - 1);
+
+ // create a compaction request
+ // We always compact all the files, so always compress.
+ Compaction* c =
+ new Compaction(version, level, level, MaxFileSizeForLevel(level),
+ LLONG_MAX, false, true);
+ c->score_ = score;
+ for (unsigned int loop = start_index; loop < file_by_time.size(); loop++) {
+ int index = file_by_time[loop];
+ f = c->input_version_->files_[level][index];
+ c->inputs_[0].push_back(f);
+ Log(options_->info_log,
+ "Universal: size amp picking file %lu[%d] with size %lu",
+ (unsigned long)f->number,
+ index,
+ (unsigned long)f->file_size);
+ }
+ return c;
+}
+
+} // namespace rocksdb
--- /dev/null
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include "db/version_set.h"
+#include "db/compaction.h"
+#include "rocksdb/status.h"
+#include "rocksdb/options.h"
+
+#include <vector>
+#include <memory>
+#include <set>
+
+namespace rocksdb {
+
+class Compaction;
+class Version;
+
+class CompactionPicker {
+ public:
+ CompactionPicker(const Options* options, const InternalKeyComparator* icmp);
+ virtual ~CompactionPicker();
+
+ // See VersionSet::ReduceNumberOfLevels()
+ void ReduceNumberOfLevels(int new_levels);
+
+ // Pick level and inputs for a new compaction.
+ // Returns nullptr if there is no compaction to be done.
+ // Otherwise returns a pointer to a heap-allocated object that
+ // describes the compaction. Caller should delete the result.
+ virtual Compaction* PickCompaction(Version* version) = 0;
+
+ // Return a compaction object for compacting the range [begin,end] in
+ // the specified level. Returns nullptr if there is nothing in that
+ // level that overlaps the specified range. Caller should delete
+ // the result.
+ //
+  // The returned Compaction might not include the whole requested range.
+  // In that case, **compaction_end is set to the next key that needs
+  // compacting; if the compaction covers the whole range, *compaction_end
+  // is set to nullptr.
+ // Client is responsible for compaction_end storage -- when called,
+ // *compaction_end should point to valid InternalKey!
+ Compaction* CompactRange(Version* version, int input_level, int output_level,
+ const InternalKey* begin, const InternalKey* end,
+ InternalKey** compaction_end);
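+  //
+  // Illustrative caller loop (a sketch; only CompactRange and
+  // ReleaseCompactionFiles below are this class's API, all other names are
+  // hypothetical):
+  //   InternalKey end_storage;
+  //   InternalKey* compaction_end = &end_storage;
+  //   while (compaction_end != nullptr) {
+  //     Compaction* c = picker->CompactRange(current, input_level,
+  //                                          output_level, begin, end,
+  //                                          &compaction_end);
+  //     if (c == nullptr) break;          // nothing left that overlaps
+  //     // ... run the compaction, then ReleaseCompactionFiles(c, status) ...
+  //     begin = compaction_end;           // nullptr means the range is done
+  //   }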
+
+ // Free up the files that participated in a compaction
+ void ReleaseCompactionFiles(Compaction* c, Status status);
+
+ // Return the total amount of data that is undergoing
+ // compactions per level
+ void SizeBeingCompacted(std::vector<uint64_t>& sizes);
+
+ // Returns maximum total overlap bytes with grandparent
+ // level (i.e., level+2) before we stop building a single
+ // file in level->level+1 compaction.
+ uint64_t MaxGrandParentOverlapBytes(int level);
+
+ // Returns maximum total bytes of data on a given level.
+ double MaxBytesForLevel(int level);
+
+ // Get the max file size in a given level.
+ uint64_t MaxFileSizeForLevel(int level) const;
+
+ protected:
+ int NumberLevels() const { return num_levels_; }
+
+ // Stores the minimal range that covers all entries in inputs in
+ // *smallest, *largest.
+ // REQUIRES: inputs is not empty
+ void GetRange(const std::vector<FileMetaData*>& inputs, InternalKey* smallest,
+ InternalKey* largest);
+
+ // Stores the minimal range that covers all entries in inputs1 and inputs2
+ // in *smallest, *largest.
+  // REQUIRES: inputs1 and inputs2 are not both empty
+ void GetRange(const std::vector<FileMetaData*>& inputs1,
+ const std::vector<FileMetaData*>& inputs2,
+ InternalKey* smallest, InternalKey* largest);
+
+ void ExpandWhileOverlapping(Compaction* c);
+
+ uint64_t ExpandedCompactionByteSizeLimit(int level);
+
+  // Returns true if any of the specified files is being compacted
+ bool FilesInCompaction(std::vector<FileMetaData*>& files);
+
+  // Returns true if any of the parent files is being compacted
+ bool ParentRangeInCompaction(Version* version, const InternalKey* smallest,
+ const InternalKey* largest, int level,
+ int* index);
+
+ void SetupOtherInputs(Compaction* c);
+
+ // record all the ongoing compactions for all levels
+ std::vector<std::set<Compaction*>> compactions_in_progress_;
+
+ // Per-level target file size.
+ std::unique_ptr<uint64_t[]> max_file_size_;
+
+ // Per-level max bytes
+ std::unique_ptr<uint64_t[]> level_max_bytes_;
+
+  const Options* const options_;
+
+ private:
+ void Init();
+
+ int num_levels_;
+
+ const InternalKeyComparator* const icmp_;
+};
+
+class UniversalCompactionPicker : public CompactionPicker {
+ public:
+ UniversalCompactionPicker(const Options* options,
+ const InternalKeyComparator* icmp)
+ : CompactionPicker(options, icmp) {}
+ virtual Compaction* PickCompaction(Version* version) override;
+
+ private:
+ // Pick Universal compaction to limit read amplification
+ Compaction* PickCompactionUniversalReadAmp(Version* version, double score,
+ unsigned int ratio,
+ unsigned int num_files);
+
+ // Pick Universal compaction to limit space amplification.
+ Compaction* PickCompactionUniversalSizeAmp(Version* version, double score);
+};
+
+class LevelCompactionPicker : public CompactionPicker {
+ public:
+ LevelCompactionPicker(const Options* options,
+ const InternalKeyComparator* icmp)
+ : CompactionPicker(options, icmp) {}
+ virtual Compaction* PickCompaction(Version* version) override;
+
+ private:
+  // For the specified level, pick a compaction.
+ // Returns nullptr if there is no compaction to be done.
+ // If level is 0 and there is already a compaction on that level, this
+ // function will return nullptr.
+ Compaction* PickCompactionBySize(Version* version, int level, double score);
+};
+
+} // namespace rocksdb
}
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
const uint64_t sum = TotalFileSize(overlaps);
- if (sum > vset_->MaxGrandParentOverlapBytes(level)) {
+ if (sum > vset_->compaction_picker_->MaxGrandParentOverlapBytes(level)) {
break;
}
level++;
dummy_versions_(this),
current_(nullptr),
need_slowdown_for_num_level0_files_(false),
- compactions_in_progress_(options_->num_levels),
current_version_number_(0),
manifest_file_size_(0),
storage_options_(storage_options),
storage_options_compactions_(storage_options_) {
compact_pointer_ = new std::string[options_->num_levels];
- Init(options_->num_levels);
+ if (options_->compaction_style == kCompactionStyleUniversal) {
+ compaction_picker_.reset(new UniversalCompactionPicker(options_, &icmp_));
+ } else {
+ compaction_picker_.reset(new LevelCompactionPicker(options_, &icmp_));
+ }
AppendVersion(new Version(this, current_version_number_++));
}
}
obsolete_files_.clear();
delete[] compact_pointer_;
- delete[] max_file_size_;
- delete[] level_max_bytes_;
-}
-
-void VersionSet::Init(int num_levels) {
- max_file_size_ = new uint64_t[num_levels];
- level_max_bytes_ = new uint64_t[num_levels];
- int target_file_size_multiplier = options_->target_file_size_multiplier;
- int max_bytes_multiplier = options_->max_bytes_for_level_multiplier;
- for (int i = 0; i < num_levels; i++) {
- if (i == 0 && options_->compaction_style == kCompactionStyleUniversal) {
- max_file_size_[i] = ULLONG_MAX;
- level_max_bytes_[i] = options_->max_bytes_for_level_base;
- } else if (i > 1) {
- max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier;
- level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier *
- options_->max_bytes_for_level_multiplier_additional[i-1];
- } else {
- max_file_size_[i] = options_->target_file_size_base;
- level_max_bytes_[i] = options_->max_bytes_for_level_base;
- }
- }
}
void VersionSet::AppendVersion(Version* v) {
{
// calculate the amount of data being compacted at every level
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
- SizeBeingCompacted(size_being_compacted);
+ compaction_picker_->SizeBeingCompacted(size_being_compacted);
mu->Unlock();
// Install recovered version
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
- SizeBeingCompacted(size_being_compacted);
+ compaction_picker_->SizeBeingCompacted(size_being_compacted);
v->Finalize(size_being_compacted);
manifest_file_size_ = manifest_file_size;
// Install recovered version
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
- SizeBeingCompacted(size_being_compacted);
+ compaction_picker_->SizeBeingCompacted(size_being_compacted);
v->Finalize(size_being_compacted);
AppendVersion(v);
}
}
-// Stores the minimal range that covers all entries in inputs in
-// *smallest, *largest.
-// REQUIRES: inputs is not empty
-void VersionSet::GetRange(const std::vector<FileMetaData*>& inputs,
- InternalKey* smallest,
- InternalKey* largest) {
- assert(!inputs.empty());
- smallest->Clear();
- largest->Clear();
- for (size_t i = 0; i < inputs.size(); i++) {
- FileMetaData* f = inputs[i];
- if (i == 0) {
- *smallest = f->smallest;
- *largest = f->largest;
- } else {
- if (icmp_.Compare(f->smallest, *smallest) < 0) {
- *smallest = f->smallest;
- }
- if (icmp_.Compare(f->largest, *largest) > 0) {
- *largest = f->largest;
- }
- }
- }
+Compaction* VersionSet::PickCompaction() {
+ return compaction_picker_->PickCompaction(current_);
}
-// Stores the minimal range that covers all entries in inputs1 and inputs2
-// in *smallest, *largest.
-// REQUIRES: inputs is not empty
-void VersionSet::GetRange2(const std::vector<FileMetaData*>& inputs1,
- const std::vector<FileMetaData*>& inputs2,
- InternalKey* smallest,
- InternalKey* largest) {
- std::vector<FileMetaData*> all = inputs1;
- all.insert(all.end(), inputs2.begin(), inputs2.end());
- GetRange(all, smallest, largest);
+Compaction* VersionSet::CompactRange(int input_level, int output_level,
+ const InternalKey* begin,
+ const InternalKey* end,
+ InternalKey** compaction_end) {
+ return compaction_picker_->CompactRange(current_, input_level, output_level,
+ begin, end, compaction_end);
}
Iterator* VersionSet::MakeInputIterator(Compaction* c) {
}
double VersionSet::MaxBytesForLevel(int level) {
- // Note: the result for level zero is not really used since we set
- // the level-0 compaction threshold based on number of files.
- assert(level >= 0);
- assert(level < NumberLevels());
- return level_max_bytes_[level];
+ return compaction_picker_->MaxBytesForLevel(level);
}
uint64_t VersionSet::MaxFileSizeForLevel(int level) {
- assert(level >= 0);
- assert(level < NumberLevels());
- return max_file_size_[level];
-}
-
-uint64_t VersionSet::ExpandedCompactionByteSizeLimit(int level) {
- uint64_t result = MaxFileSizeForLevel(level);
- result *= options_->expanded_compaction_factor;
- return result;
-}
-
-uint64_t VersionSet::MaxGrandParentOverlapBytes(int level) {
- uint64_t result = MaxFileSizeForLevel(level);
- result *= options_->max_grandparent_overlap_factor;
- return result;
+ return compaction_picker_->MaxFileSizeForLevel(level);
}
// verify that the files listed in this compaction are present
return true; // everything good
}
-// Clear all files to indicate that they are not being compacted
-// Delete this compaction from the list of running compactions.
void VersionSet::ReleaseCompactionFiles(Compaction* c, Status status) {
- c->MarkFilesBeingCompacted(false);
- compactions_in_progress_[c->level()].erase(c);
- if (!status.ok()) {
- c->ResetNextCompactionIndex();
- }
-}
-
-// The total size of files that are currently being compacted
-// at at every level upto the penultimate level.
-void VersionSet::SizeBeingCompacted(std::vector<uint64_t>& sizes) {
- for (int level = 0; level < NumberLevels() - 1; level++) {
- uint64_t total = 0;
- for (std::set<Compaction*>::iterator it =
- compactions_in_progress_[level].begin();
- it != compactions_in_progress_[level].end();
- ++it) {
- Compaction* c = (*it);
- assert(c->level() == level);
- for (int i = 0; i < c->num_input_files(0); i++) {
- total += c->input(0,i)->file_size;
- }
- }
- sizes[level] = total;
- }
-}
-
-//
-// Look at overall size amplification. If size amplification
-// exceeeds the configured value, then do a compaction
-// of the candidate files all the way upto the earliest
-// base file (overrides configured values of file-size ratios,
-// min_merge_width and max_merge_width).
-//
-Compaction* VersionSet::PickCompactionUniversalSizeAmp(int level,
- double score) {
- assert (level == 0);
-
- // percentage flexibilty while reducing size amplification
- uint64_t ratio = options_->compaction_options_universal.
- max_size_amplification_percent;
-
- // The files are sorted from newest first to oldest last.
- std::vector<int>& file_by_time = current_->files_by_size_[level];
- assert(file_by_time.size() == current_->files_[level].size());
-
- unsigned int candidate_count = 0;
- uint64_t candidate_size = 0;
- unsigned int start_index = 0;
- FileMetaData* f = nullptr;
-
- // Skip files that are already being compacted
- for (unsigned int loop = 0; loop < file_by_time.size() - 1; loop++) {
- int index = file_by_time[loop];
- f = current_->files_[level][index];
- if (!f->being_compacted) {
- start_index = loop; // Consider this as the first candidate.
- break;
- }
- Log(options_->info_log, "Universal: skipping file %lu[%d] compacted %s",
- (unsigned long)f->number,
- loop,
- " cannot be a candidate to reduce size amp.\n");
- f = nullptr;
- }
- if (f == nullptr) {
- return nullptr; // no candidate files
- }
-
- Log(options_->info_log, "Universal: First candidate file %lu[%d] %s",
- (unsigned long)f->number,
- start_index,
- " to reduce size amp.\n");
-
- // keep adding up all the remaining files
- for (unsigned int loop = start_index; loop < file_by_time.size() - 1;
- loop++) {
- int index = file_by_time[loop];
- f = current_->files_[level][index];
- if (f->being_compacted) {
- Log(options_->info_log,
- "Universal: Possible candidate file %lu[%d] %s.",
- (unsigned long)f->number,
- loop,
- " is already being compacted. No size amp reduction possible.\n");
- return nullptr;
- }
- candidate_size += f->file_size;
- candidate_count++;
- }
- if (candidate_count == 0) {
- return nullptr;
- }
-
- // size of earliest file
- int index = file_by_time[file_by_time.size() - 1];
- uint64_t earliest_file_size = current_->files_[level][index]->file_size;
-
- // size amplification = percentage of additional size
- if (candidate_size * 100 < ratio * earliest_file_size) {
- Log(options_->info_log,
- "Universal: size amp not needed. newer-files-total-size %lu "
- "earliest-file-size %lu",
- (unsigned long)candidate_size,
- (unsigned long)earliest_file_size);
- return nullptr;
- } else {
- Log(options_->info_log,
- "Universal: size amp needed. newer-files-total-size %lu "
- "earliest-file-size %lu",
- (unsigned long)candidate_size,
- (unsigned long)earliest_file_size);
- }
- assert(start_index >= 0 && start_index < file_by_time.size() - 1);
-
- // create a compaction request
- // We always compact all the files, so always compress.
- Compaction* c =
- new Compaction(current_, level, level, MaxFileSizeForLevel(level),
- LLONG_MAX, false, true);
- c->score_ = score;
- for (unsigned int loop = start_index; loop < file_by_time.size(); loop++) {
- int index = file_by_time[loop];
- f = c->input_version_->files_[level][index];
- c->inputs_[0].push_back(f);
- Log(options_->info_log,
- "Universal: size amp picking file %lu[%d] with size %lu",
- (unsigned long)f->number,
- index,
- (unsigned long)f->file_size);
- }
- return c;
-}
-
-//
-// Consider compaction files based on their size differences with
-// the next file in time order.
-//
-Compaction* VersionSet::PickCompactionUniversalReadAmp(
- int level, double score, unsigned int ratio,
- unsigned int max_number_of_files_to_compact) {
-
- unsigned int min_merge_width =
- options_->compaction_options_universal.min_merge_width;
- unsigned int max_merge_width =
- options_->compaction_options_universal.max_merge_width;
-
- // The files are sorted from newest first to oldest last.
- std::vector<int>& file_by_time = current_->files_by_size_[level];
- FileMetaData* f = nullptr;
- bool done = false;
- int start_index = 0;
- unsigned int candidate_count;
- assert(file_by_time.size() == current_->files_[level].size());
-
- unsigned int max_files_to_compact = std::min(max_merge_width,
- max_number_of_files_to_compact);
- min_merge_width = std::max(min_merge_width, 2U);
-
- // Considers a candidate file only if it is smaller than the
- // total size accumulated so far.
- for (unsigned int loop = 0; loop < file_by_time.size(); loop++) {
-
- candidate_count = 0;
-
- // Skip files that are already being compacted
- for (f = nullptr; loop < file_by_time.size(); loop++) {
- int index = file_by_time[loop];
- f = current_->files_[level][index];
-
- if (!f->being_compacted) {
- candidate_count = 1;
- break;
- }
- Log(options_->info_log,
- "Universal: file %lu[%d] being compacted, skipping",
- (unsigned long)f->number, loop);
- f = nullptr;
- }
-
- // This file is not being compacted. Consider it as the
- // first candidate to be compacted.
- uint64_t candidate_size = f != nullptr? f->file_size : 0;
- if (f != nullptr) {
- Log(options_->info_log, "Universal: Possible candidate file %lu[%d].",
- (unsigned long)f->number, loop);
- }
-
- // Check if the suceeding files need compaction.
- for (unsigned int i = loop+1;
- candidate_count < max_files_to_compact && i < file_by_time.size();
- i++) {
- int index = file_by_time[i];
- FileMetaData* f = current_->files_[level][index];
- if (f->being_compacted) {
- break;
- }
- // pick files if the total candidate file size (increased by the
- // specified ratio) is still larger than the next candidate file.
- uint64_t sz = (candidate_size * (100L + ratio)) /100;
- if (sz < f->file_size) {
- break;
- }
- candidate_count++;
- candidate_size += f->file_size;
- }
-
- // Found a series of consecutive files that need compaction.
- if (candidate_count >= (unsigned int)min_merge_width) {
- start_index = loop;
- done = true;
- break;
- } else {
- for (unsigned int i = loop;
- i < loop + candidate_count && i < file_by_time.size(); i++) {
- int index = file_by_time[i];
- FileMetaData* f = current_->files_[level][index];
- Log(options_->info_log,
- "Universal: Skipping file %lu[%d] with size %lu %d\n",
- (unsigned long)f->number,
- i,
- (unsigned long)f->file_size,
- f->being_compacted);
- }
- }
- }
- if (!done || candidate_count <= 1) {
- return nullptr;
- }
- unsigned int first_index_after = start_index + candidate_count;
- // Compression is enabled if files compacted earlier already reached
- // size ratio of compression.
- bool enable_compression = true;
- int ratio_to_compress =
- options_->compaction_options_universal.compression_size_percent;
- if (ratio_to_compress >= 0) {
- uint64_t total_size = TotalFileSize(current_->files_[level]);
- uint64_t older_file_size = 0;
- for (unsigned int i = file_by_time.size() - 1; i >= first_index_after;
- i--) {
- older_file_size += current_->files_[level][file_by_time[i]]->file_size;
- if (older_file_size * 100L >= total_size * (long) ratio_to_compress) {
- enable_compression = false;
- break;
- }
- }
- }
- Compaction* c =
- new Compaction(current_, level, level, MaxFileSizeForLevel(level),
- LLONG_MAX, false, enable_compression);
- c->score_ = score;
-
- for (unsigned int i = start_index; i < first_index_after; i++) {
- int index = file_by_time[i];
- FileMetaData* f = c->input_version_->files_[level][index];
- c->inputs_[0].push_back(f);
- Log(options_->info_log, "Universal: Picking file %lu[%d] with size %lu\n",
- (unsigned long)f->number,
- i,
- (unsigned long)f->file_size);
- }
- return c;
-}
-
-//
-// Universal style of compaction. Pick files that are contiguous in
-// time-range to compact.
-//
-Compaction* VersionSet::PickCompactionUniversal(int level, double score) {
- assert (level == 0);
-
- if ((current_->files_[level].size() <
- (unsigned int)options_->level0_file_num_compaction_trigger)) {
- Log(options_->info_log, "Universal: nothing to do\n");
- return nullptr;
- }
- Version::FileSummaryStorage tmp;
- Log(options_->info_log, "Universal: candidate files(%lu): %s\n",
- current_->files_[level].size(),
- current_->LevelFileSummary(&tmp, 0));
-
- // Check for size amplification first.
- Compaction* c = PickCompactionUniversalSizeAmp(level, score);
- if (c == nullptr) {
-
- // Size amplification is within limits. Try reducing read
- // amplification while maintaining file size ratios.
- unsigned int ratio = options_->compaction_options_universal.size_ratio;
- c = PickCompactionUniversalReadAmp(level, score, ratio, UINT_MAX);
-
- // Size amplification and file size ratios are within configured limits.
- // If max read amplification is exceeding configured limits, then force
- // compaction without looking at filesize ratios and try to reduce
- // the number of files to fewer than level0_file_num_compaction_trigger.
- if (c == nullptr) {
- unsigned int num_files = current_->files_[level].size() -
- options_->level0_file_num_compaction_trigger;
- c = PickCompactionUniversalReadAmp(level, score, UINT_MAX, num_files);
- }
- }
- if (c == nullptr) {
- return nullptr;
- }
- assert(c->inputs_[0].size() > 1);
-
- // validate that all the chosen files are non overlapping in time
- FileMetaData* newerfile __attribute__((unused)) = nullptr;
- for (unsigned int i = 0; i < c->inputs_[0].size(); i++) {
- FileMetaData* f = c->inputs_[0][i];
- assert (f->smallest_seqno <= f->largest_seqno);
- assert(newerfile == nullptr ||
- newerfile->smallest_seqno > f->largest_seqno);
- newerfile = f;
- }
-
- // The files are sorted from newest first to oldest last.
- std::vector<int>& file_by_time = c->input_version_->files_by_size_[level];
-
- // Is the earliest file part of this compaction?
- int last_index = file_by_time[file_by_time.size()-1];
- FileMetaData* last_file = c->input_version_->files_[level][last_index];
- if (c->inputs_[0][c->inputs_[0].size()-1] == last_file) {
- c->bottommost_level_ = true;
- }
-
- // update statistics
- if (options_->statistics != nullptr) {
- options_->statistics->measureTime(NUM_FILES_IN_SINGLE_COMPACTION,
- c->inputs_[0].size());
- }
-
- // mark all the files that are being compacted
- c->MarkFilesBeingCompacted(true);
-
- // remember this currently undergoing compaction
- compactions_in_progress_[level].insert(c);
-
- // Record whether this compaction includes all sst files.
- // For now, it is only relevant in universal compaction mode.
- c->is_full_compaction_ =
- (c->inputs_[0].size() == c->input_version_->files_[0].size());
-
- return c;
-}
-
-Compaction* VersionSet::PickCompactionBySize(int level, double score) {
- Compaction* c = nullptr;
-
- // level 0 files are overlapping. So we cannot pick more
- // than one concurrent compactions at this level. This
- // could be made better by looking at key-ranges that are
- // being compacted at level 0.
- if (level == 0 && compactions_in_progress_[level].size() == 1) {
- return nullptr;
- }
-
- assert(level >= 0);
- assert(level + 1 < current_->NumberLevels());
- c = new Compaction(current_, level, level + 1, MaxFileSizeForLevel(level + 1),
- MaxGrandParentOverlapBytes(level));
- c->score_ = score;
-
- // Pick the largest file in this level that is not already
- // being compacted
- std::vector<int>& file_size = c->input_version_->files_by_size_[level];
-
- // record the first file that is not yet compacted
- int nextIndex = -1;
-
- for (unsigned int i = c->input_version_->next_file_to_compact_by_size_[level];
- i < file_size.size(); i++) {
- int index = file_size[i];
- FileMetaData* f = c->input_version_->files_[level][index];
-
- // check to verify files are arranged in descending size
- assert((i == file_size.size() - 1) ||
- (i >= Version::number_of_files_to_sort_ - 1) ||
- (f->file_size >=
- c->input_version_->files_[level][file_size[i + 1]]->file_size));
-
- // do not pick a file to compact if it is being compacted
- // from n-1 level.
- if (f->being_compacted) {
- continue;
- }
-
- // remember the startIndex for the next call to PickCompaction
- if (nextIndex == -1) {
- nextIndex = i;
- }
-
- //if (i > Version::number_of_files_to_sort_) {
- // Log(options_->info_log, "XXX Looking at index %d", i);
- //}
-
- // Do not pick this file if its parents at level+1 are being compacted.
- // Maybe we can avoid redoing this work in SetupOtherInputs
- int parent_index = -1;
- if (ParentRangeInCompaction(&f->smallest, &f->largest, level,
- &parent_index)) {
- continue;
- }
- c->inputs_[0].push_back(f);
- c->base_index_ = index;
- c->parent_index_ = parent_index;
- break;
- }
-
- if (c->inputs_[0].empty()) {
- delete c;
- c = nullptr;
- }
-
- // store where to start the iteration in the next call to PickCompaction
- c->input_version_->next_file_to_compact_by_size_[level] = nextIndex;
-
- return c;
-}
-
-Compaction* VersionSet::PickCompaction() {
- Compaction* c = nullptr;
- int level = -1;
-
- // Compute the compactions needed. It is better to do it here
- // and also in LogAndApply(), otherwise the values could be stale.
- std::vector<uint64_t> size_being_compacted(NumberLevels()-1);
- current_->vset_->SizeBeingCompacted(size_being_compacted);
- current_->Finalize(size_being_compacted);
-
- // In universal style of compaction, compact L0 files back into L0.
- if (options_->compaction_style == kCompactionStyleUniversal) {
- int level = 0;
- c = PickCompactionUniversal(level, current_->compaction_score_[level]);
- return c;
- }
-
- // We prefer compactions triggered by too much data in a level over
- // the compactions triggered by seeks.
- //
- // Find the compactions by size on all levels.
- for (int i = 0; i < NumberLevels()-1; i++) {
- assert(i == 0 || current_->compaction_score_[i] <=
- current_->compaction_score_[i-1]);
- level = current_->compaction_level_[i];
- if ((current_->compaction_score_[i] >= 1)) {
- c = PickCompactionBySize(level, current_->compaction_score_[i]);
- ExpandWhileOverlapping(c);
- if (c != nullptr) {
- break;
- }
- }
- }
-
- // Find compactions needed by seeks
- FileMetaData* f = current_->file_to_compact_;
- if (c == nullptr && f != nullptr && !f->being_compacted) {
-
- level = current_->file_to_compact_level_;
- int parent_index = -1;
-
- // Only allow one level 0 compaction at a time.
- // Do not pick this file if its parents at level+1 are being compacted.
- if (level != 0 || compactions_in_progress_[0].empty()) {
- if(!ParentRangeInCompaction(&f->smallest, &f->largest, level,
- &parent_index)) {
- c = new Compaction(current_, level, level + 1,
- MaxFileSizeForLevel(level + 1),
- MaxGrandParentOverlapBytes(level), true);
- c->inputs_[0].push_back(f);
- c->parent_index_ = parent_index;
- c->input_version_->file_to_compact_ = nullptr;
- ExpandWhileOverlapping(c);
- }
- }
- }
-
- if (c == nullptr) {
- return nullptr;
- }
-
- // Two level 0 compaction won't run at the same time, so don't need to worry
- // about files on level 0 being compacted.
- if (level == 0) {
- assert(compactions_in_progress_[0].empty());
- InternalKey smallest, largest;
- GetRange(c->inputs_[0], &smallest, &largest);
- // Note that the next call will discard the file we placed in
- // c->inputs_[0] earlier and replace it with an overlapping set
- // which will include the picked file.
- c->inputs_[0].clear();
- c->input_version_->GetOverlappingInputs(0, &smallest, &largest,
- &c->inputs_[0]);
-
- // If we include more L0 files in the same compaction run it can
- // cause the 'smallest' and 'largest' key to get extended to a
- // larger range. So, re-invoke GetRange to get the new key range
- GetRange(c->inputs_[0], &smallest, &largest);
- if (ParentRangeInCompaction(&smallest, &largest,
- level, &c->parent_index_)) {
- delete c;
- return nullptr;
- }
- assert(!c->inputs_[0].empty());
- }
-
- // Setup "level+1" files (inputs_[1])
- SetupOtherInputs(c);
-
- // mark all the files that are being compacted
- c->MarkFilesBeingCompacted(true);
-
- // Is this compaction creating a file at the bottommost level
- c->SetupBottomMostLevel(false);
-
- // remember this currently undergoing compaction
- compactions_in_progress_[level].insert(c);
-
- return c;
-}
-
-// Returns true if any one of the parent files are being compacted
-bool VersionSet::ParentRangeInCompaction(const InternalKey* smallest,
- const InternalKey* largest, int level,
- int* parent_index) {
- std::vector<FileMetaData*> inputs;
- assert(level + 1 < current_->NumberLevels());
-
- current_->GetOverlappingInputs(level + 1, smallest, largest, &inputs,
- *parent_index, parent_index);
- return FilesInCompaction(inputs);
-}
-
- // Returns true if any one of the specified files is being compacted
-bool VersionSet::FilesInCompaction(std::vector<FileMetaData*>& files) {
- for (unsigned int i = 0; i < files.size(); i++) {
- if (files[i]->being_compacted) {
- return true;
- }
- }
- return false;
-}
-
- // Add more files to the inputs on "level" to make sure that
- // no newer version of a key is compacted to "level+1" while leaving an older
- // version in "level". Otherwise, any Get() will search "level" first,
- // and will likely return an old/stale value for the key, since it always
- // searches in increasing order of level to find the value. This could
- // also scramble the order of merge operands. This function should be
- // called any time a new Compaction is created and its inputs_[0] is
- // populated.
-//
-// Will set c to nullptr if it is impossible to apply this compaction.
-void VersionSet::ExpandWhileOverlapping(Compaction* c) {
- // If inputs are empty then there is nothing to expand.
- if (!c || c->inputs_[0].empty()) {
- return;
- }
-
- // GetOverlappingInputs will always do the right thing for level-0.
- // So we don't need to do any expansion if level == 0.
- if (c->level() == 0) {
- return;
- }
-
- const int level = c->level();
- InternalKey smallest, largest;
-
- // Keep expanding c->inputs_[0] until we are sure that there is a
- // "clean cut" boundary between the files in input and the surrounding files.
- // This will ensure that no parts of a key are lost during compaction.
- int hint_index = -1;
- size_t old_size;
- do {
- old_size = c->inputs_[0].size();
- GetRange(c->inputs_[0], &smallest, &largest);
- c->inputs_[0].clear();
- c->input_version_->GetOverlappingInputs(
- level, &smallest, &largest, &c->inputs_[0], hint_index, &hint_index);
- } while (c->inputs_[0].size() > old_size);
-
- // Get the new range
- GetRange(c->inputs_[0], &smallest, &largest);
-
- // If, after the expansion, there are files that are already under
- // compaction, then we must drop/cancel this compaction.
- int parent_index = -1;
- if (FilesInCompaction(c->inputs_[0]) ||
- (c->level() != c->output_level() &&
- ParentRangeInCompaction(&smallest, &largest, level, &parent_index))) {
- c->inputs_[0].clear();
- c->inputs_[1].clear();
- delete c;
- c = nullptr;
- }
-}
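To make the rationale in the comment above concrete, here is a purely hypothetical level-L layout (internal keys written as user_key@sequence) illustrating why a "clean cut" is required before compacting:

// f1: [a@30 .. k@25]    f2: [k@20 .. z@10]   (both at level L)
// Compacting f1 alone would push k@25 (the newer entry) down to level L+1
// while k@20 (the older entry) stays at level L. A Get("k") that stops at
// the first match in level L would then return the stale k@20 value, so
// ExpandWhileOverlapping pulls f2 into inputs_[0] before proceeding.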
-
-// Populates the set of inputs from "level+1" that overlap with "level".
-// Will also attempt to expand "level" if that doesn't expand "level+1"
-// or cause "level" to include a file for compaction that has an overlapping
-// user-key with another file.
-void VersionSet::SetupOtherInputs(Compaction* c) {
- // If inputs are empty, then there is nothing to expand.
- // If both input and output levels are the same, no need to consider
- // files at level "level+1"
- if (c->inputs_[0].empty() || c->level() == c->output_level()) {
- return;
- }
-
- const int level = c->level();
- InternalKey smallest, largest;
-
- // Get the range one last time.
- GetRange(c->inputs_[0], &smallest, &largest);
-
- // Populate the set of next-level files (inputs_[1]) to include in compaction
- c->input_version_->GetOverlappingInputs(level + 1, &smallest, &largest,
- &c->inputs_[1], c->parent_index_,
- &c->parent_index_);
-
- // Get entire range covered by compaction
- InternalKey all_start, all_limit;
- GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
-
- // See if we can further grow the number of inputs in "level" without
- // changing the number of "level+1" files we pick up. We also choose NOT
- // to expand if this would cause "level" to include some entries for some
- // user key, while excluding other entries for the same user key. This
- // can happen when one user key spans multiple files.
- if (!c->inputs_[1].empty()) {
- std::vector<FileMetaData*> expanded0;
- c->input_version_->GetOverlappingInputs(
- level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr);
- const uint64_t inputs0_size = TotalFileSize(c->inputs_[0]);
- const uint64_t inputs1_size = TotalFileSize(c->inputs_[1]);
- const uint64_t expanded0_size = TotalFileSize(expanded0);
- uint64_t limit = ExpandedCompactionByteSizeLimit(level);
- if (expanded0.size() > c->inputs_[0].size() &&
- inputs1_size + expanded0_size < limit &&
- !FilesInCompaction(expanded0) &&
- !c->input_version_->HasOverlappingUserKey(&expanded0, level)) {
- InternalKey new_start, new_limit;
- GetRange(expanded0, &new_start, &new_limit);
- std::vector<FileMetaData*> expanded1;
- c->input_version_->GetOverlappingInputs(level + 1, &new_start, &new_limit,
- &expanded1, c->parent_index_,
- &c->parent_index_);
- if (expanded1.size() == c->inputs_[1].size() &&
- !FilesInCompaction(expanded1)) {
- Log(options_->info_log,
- "Expanding@%lu %lu+%lu (%lu+%lu bytes) to %lu+%lu (%lu+%lu bytes)"
- "\n",
- (unsigned long)level,
- (unsigned long)(c->inputs_[0].size()),
- (unsigned long)(c->inputs_[1].size()),
- (unsigned long)inputs0_size,
- (unsigned long)inputs1_size,
- (unsigned long)(expanded0.size()),
- (unsigned long)(expanded1.size()),
- (unsigned long)expanded0_size,
- (unsigned long)inputs1_size);
- smallest = new_start;
- largest = new_limit;
- c->inputs_[0] = expanded0;
- c->inputs_[1] = expanded1;
- GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
- }
- }
- }
-
- // Compute the set of grandparent files that overlap this compaction
- // (parent == level+1; grandparent == level+2)
- if (level + 2 < NumberLevels()) {
- c->input_version_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
- &c->grandparents_);
- }
-
- if (false) {
- Log(options_->info_log, "Compacting %d '%s' .. '%s'",
- level,
- smallest.DebugString().c_str(),
- largest.DebugString().c_str());
- }
-
- // Update the place where we will do the next compaction for this level.
- // We update this immediately instead of waiting for the VersionEdit
- // to be applied so that if the compaction fails, we will try a different
- // key range next time.
- compact_pointer_[level] = largest.Encode().ToString();
- c->edit_->SetCompactPointer(level, largest);
+ compaction_picker_->ReleaseCompactionFiles(c, status);
}
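Only the delegating statement appears in the hunk above; the trimmed wrapper that remains in version_set.cc plausibly reads as follows (the signature is the one still declared in version_set.h further down):

// Sketch of the remaining wrapper; the body is reduced to a single delegation.
void VersionSet::ReleaseCompactionFiles(Compaction* c, Status status) {
  compaction_picker_->ReleaseCompactionFiles(c, status);
}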
Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
obsolete_files_.clear();
}
-Compaction* VersionSet::CompactRange(int input_level,
- int output_level,
- const InternalKey* begin,
- const InternalKey* end,
- InternalKey** compaction_end) {
- std::vector<FileMetaData*> inputs;
- bool covering_the_whole_range = true;
-
- // All files are 'overlapping' in universal style compaction.
- // We have to compact the entire range in one shot.
- if (options_->compaction_style == kCompactionStyleUniversal) {
- begin = nullptr;
- end = nullptr;
- }
- current_->GetOverlappingInputs(input_level, begin, end, &inputs);
- if (inputs.empty()) {
- return nullptr;
- }
-
- // Avoid compacting too much in one shot in case the range is large.
- // But we cannot do this for level-0 since level-0 files can overlap
- // and we must not pick one file and drop another older file if the
- // two files overlap.
- if (input_level > 0) {
- const uint64_t limit =
- MaxFileSizeForLevel(input_level) * options_->source_compaction_factor;
- uint64_t total = 0;
- for (size_t i = 0; i + 1 < inputs.size(); ++i) {
- uint64_t s = inputs[i]->file_size;
- total += s;
- if (total >= limit) {
- **compaction_end = inputs[i + 1]->smallest;
- covering_the_whole_range = false;
- inputs.resize(i + 1);
- break;
- }
- }
- }
- Compaction* c = new Compaction(current_, input_level, output_level,
- MaxFileSizeForLevel(output_level),
- MaxGrandParentOverlapBytes(input_level));
-
- c->inputs_[0] = inputs;
- ExpandWhileOverlapping(c);
- if (c == nullptr) {
- Log(options_->info_log, "Could not compact due to expansion failure.\n");
- return nullptr;
- }
-
- SetupOtherInputs(c);
-
- if (covering_the_whole_range) {
- *compaction_end = nullptr;
- }
-
- // The files to be manually compacted do not trample upon other
- // files because manual compactions are processed when the system
- // has at most one background compaction thread.
- c->MarkFilesBeingCompacted(true);
-
- // Is this compaction creating a file at the bottommost level
- c->SetupBottomMostLevel(true);
- return c;
-}
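CompactRange follows the same pattern: the range-selection logic above is deleted here, and VersionSet presumably keeps only a thin forwarder. A sketch under that assumption, guessing that the picker-side method mirrors the old parameter list plus the Version to operate on:

// Assumed delegation; the exact picker-side signature is not shown in this
// patch.
Compaction* VersionSet::CompactRange(int input_level, int output_level,
                                     const InternalKey* begin,
                                     const InternalKey* end,
                                     InternalKey** compaction_end) {
  return compaction_picker_->CompactRange(current_, input_level, output_level,
                                          begin, end, compaction_end);
}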
-
} // namespace rocksdb
#include "port/port.h"
#include "db/table_cache.h"
#include "db/compaction.h"
+#include "db/compaction_picker.h"
namespace rocksdb {
namespace log { class Writer; }
class Compaction;
+class CompactionPicker;
class Iterator;
class MemTable;
class TableCache;
friend class Compaction;
friend class VersionSet;
friend class DBImpl;
+ friend class CompactionPicker;
+ friend class LevelCompactionPicker;
+ friend class UniversalCompactionPicker;
class LevelFileNumIterator;
Iterator* NewConcatenatingIterator(const ReadOptions&,
// Return the size of the current manifest file
uint64_t ManifestFileSize() const { return manifest_file_size_; }
- // For the specified level, pick a compaction.
- // Returns nullptr if there is no compaction to be done.
- // If level is 0 and there is already a compaction on that level, this
- // function will return nullptr.
- Compaction* PickCompactionBySize(int level, double score);
-
- // Pick files to compact in Universal mode
- Compaction* PickCompactionUniversal(int level, double score);
-
- // Pick Universal compaction to limit read amplification
- Compaction* PickCompactionUniversalReadAmp(int level, double score,
- unsigned int ratio, unsigned int num_files);
-
- // Pick Universal compaction to limit space amplification.
- Compaction* PickCompactionUniversalSizeAmp(int level, double score);
-
- // Free up the files that participated in a compaction
- void ReleaseCompactionFiles(Compaction* c, Status status);
-
// verify that the files that we started with for a compaction
// still exist in the current version and in the same original level.
// This ensures that a concurrent compaction did not erroneously
// pick the same files to compact.
bool VerifyCompactionFileConsistency(Compaction* c);
+ double MaxBytesForLevel(int level);
+
// Get the max file size in a given level.
uint64_t MaxFileSizeForLevel(int level);
- double MaxBytesForLevel(int level);
+ void ReleaseCompactionFiles(Compaction* c, Status status);
Status GetMetadataForFile(
uint64_t number, int *filelevel, FileMetaData *metadata);
friend class Compaction;
friend class Version;
- void Init(int num_levels);
-
- void GetRange(const std::vector<FileMetaData*>& inputs,
- InternalKey* smallest,
- InternalKey* largest);
-
- void GetRange2(const std::vector<FileMetaData*>& inputs1,
- const std::vector<FileMetaData*>& inputs2,
- InternalKey* smallest,
- InternalKey* largest);
-
- void ExpandWhileOverlapping(Compaction* c);
-
- void SetupOtherInputs(Compaction* c);
-
// Save current contents to *log
Status WriteSnapshot(log::Writer* log);
bool ManifestContains(const std::string& record) const;
- uint64_t ExpandedCompactionByteSizeLimit(int level);
-
- uint64_t MaxGrandParentOverlapBytes(int level);
-
Env* const env_;
const std::string dbname_;
const Options* const options_;
// Either an empty string, or a valid InternalKey.
std::string* compact_pointer_;
- // Per-level target file size.
- uint64_t* max_file_size_;
-
- // Per-level max bytes
- uint64_t* level_max_bytes_;
-
- // record all the ongoing compactions for all levels
- std::vector<std::set<Compaction*> > compactions_in_progress_;
+ // An object that keeps all the compaction stats
+ // and picks the next compaction
+ std::unique_ptr<CompactionPicker> compaction_picker_;
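Which concrete picker backs this member presumably depends on options_->compaction_style, matching the two subclasses named in the friend declarations above. A hypothetical sketch of the wiring in the VersionSet constructor (the member name icmp_ and the exact constructor arguments are assumptions):

// Hypothetical construction; selects the picker flavour from the options.
if (options_->compaction_style == kCompactionStyleUniversal) {
  compaction_picker_.reset(new UniversalCompactionPicker(options_, &icmp_));
} else {
  compaction_picker_.reset(new LevelCompactionPicker(options_, &icmp_));
}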
// generates an increasing version number for every new version
uint64_t current_version_number_;
VersionSet(const VersionSet&);
void operator=(const VersionSet&);
- // Return the total amount of data that is undergoing
- // compactions per level
- void SizeBeingCompacted(std::vector<uint64_t>&);
-
- // Returns true if any one of the parent files is being compacted
- bool ParentRangeInCompaction(const InternalKey* smallest,
- const InternalKey* largest, int level, int* index);
-
- // Returns true if any one of the specified files is being compacted
- bool FilesInCompaction(std::vector<FileMetaData*>& files);
-
void LogAndApplyHelper(Builder* b, Version* v,
VersionEdit* edit, port::Mutex* mu);
};
current_version->num_levels_ = new_levels;
delete[] compact_pointer_;
- delete[] max_file_size_;
- delete[] level_max_bytes_;
num_levels_ = new_levels;
compact_pointer_ = new std::string[new_levels];
- Init(new_levels);
+ compaction_picker_->ReduceNumberOfLevels(new_levels);
VersionEdit ve;
st = LogAndApply(&ve, mu, true);
return st;
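Note that the per-level file-size and byte limits that used to live in VersionSet (the max_file_size_ and level_max_bytes_ arrays deleted above) are presumably recomputed inside the picker when ReduceNumberOfLevels is invoked, which is why this path no longer touches them directly.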