### New Features
* The hash index for block-based tables is now materialized and reconstructed more efficiently. Previously, the hash index was constructed by scanning the whole table on every table open.
+* FIFO compaction style
## 3.0.0 (05/05/2014)
#include <vector>
#include <string>
#include <algorithm>
+#include <limits>
#include "db/db_impl.h"
#include "db/version_set.h"
collector_factories.push_back(
std::make_shared<InternalKeyPropertiesCollectorFactory>());
+ if (result.compaction_style == kCompactionStyleFIFO) {
+ result.num_levels = 1;
+ // since we delete level0 files in FIFO compaction when there are too many
+ // of them, these options don't really mean anything
+ result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
+ result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
+ result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
+ }
+
return result;
}
options_(*db_options, SanitizeOptions(&internal_comparator_,
&internal_filter_policy_, options)),
mem_(nullptr),
- imm_(options.min_write_buffer_number_to_merge),
+ imm_(options_.min_write_buffer_number_to_merge),
super_version_(nullptr),
super_version_number_(0),
local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
// if dummy_versions is nullptr, then this is a dummy column family.
if (dummy_versions != nullptr) {
- internal_stats_.reset(new InternalStats(options.num_levels, db_options->env,
- db_options->statistics.get()));
+ internal_stats_.reset(new InternalStats(
+ options_.num_levels, db_options->env, db_options->statistics.get()));
table_cache_.reset(
new TableCache(dbname, &options_, storage_options, table_cache));
if (options_.compaction_style == kCompactionStyleUniversal) {
compaction_picker_.reset(
new UniversalCompactionPicker(&options_, &internal_comparator_));
- } else {
+ } else if (options_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset(
new LevelCompactionPicker(&options_, &internal_comparator_));
+ } else {
+ assert(options_.compaction_style == kCompactionStyleFIFO);
+ compaction_picker_.reset(
+ new FIFOCompactionPicker(&options_, &internal_comparator_));
}
Log(options_.info_log, "Options for column family \"%s\":\n",
Compaction::Compaction(Version* input_version, int level, int out_level,
uint64_t target_file_size,
uint64_t max_grandparent_overlap_bytes,
- bool seek_compaction, bool enable_compression)
+ bool seek_compaction, bool enable_compression,
+ bool deletion_compaction)
: level_(level),
out_level_(out_level),
max_output_file_size_(target_file_size),
cfd_(input_version_->cfd_),
seek_compaction_(seek_compaction),
enable_compression_(enable_compression),
+ deletion_compaction_(deletion_compaction),
grandparent_index_(0),
seen_key_(false),
overlapped_bytes_(0),
TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_);
}
+// Returns the deletion_compaction_ flag supplied at construction time. When it
+// is true, the compaction just deletes the files in inputs_[0].
+bool Compaction::IsDeletionCompaction() const {
+  return deletion_compaction_;
+}
+
void Compaction::AddInputDeletions(VersionEdit* edit) {
for (int which = 0; which < 2; which++) {
for (size_t i = 0; i < inputs_[which].size(); i++) {
}
bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
+ assert(cfd_->options()->compaction_style != kCompactionStyleFIFO);
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
return bottommost_level_;
}
// Is this compaction producing files at the bottommost level?
void Compaction::SetupBottomMostLevel(bool isManual) {
+ assert(cfd_->options()->compaction_style != kCompactionStyleFIFO);
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
// If universal compaction style is used and manual
// compaction is occurring, then we are guaranteed that
// moving a single input file to the next level (no merging or splitting)
bool IsTrivialMove() const;
+ // If true, just delete all files in inputs_[0]
+ bool IsDeletionCompaction() const;
+
// Add all inputs to this compaction as delete operations to *edit.
void AddInputDeletions(VersionEdit* edit);
private:
friend class CompactionPicker;
friend class UniversalCompactionPicker;
+ friend class FIFOCompactionPicker;
friend class LevelCompactionPicker;
Compaction(Version* input_version, int level, int out_level,
uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes,
- bool seek_compaction = false, bool enable_compression = true);
+ bool seek_compaction = false, bool enable_compression = true,
+ bool deletion_compaction = false);
int level_;
int out_level_; // levels to which output files are stored
bool seek_compaction_;
bool enable_compression_;
+ // if true, just delete files in inputs_[0]
+ bool deletion_compaction_;
// Each compaction reads inputs from "level_" and "level_+1"
std::vector<FileMetaData*> inputs_[2]; // The two sets of inputs
#include "db/compaction_picker.h"
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
#include <limits>
#include "util/log_buffer.h"
#include "util/statistics.h"
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) {
+ // CompactionPickerFIFO has its own implementation of compact range
+ assert(options_->compaction_style != kCompactionStyleFIFO);
+
std::vector<FileMetaData*> inputs;
bool covering_the_whole_range = true;
return c;
}
+// Picks FIFO "compaction" work for a single-level version.
+//
+// When the level-0 files together exceed
+// compaction_options_fifo.max_table_files_size, files are taken from the back
+// of version->files_[0] (the old end, per the FIFO policy) until the remaining
+// total fits the limit again. The returned Compaction is created as a deletion
+// compaction and registered in compactions_in_progress_[0].
+//
+// Returns nullptr when level 0 is empty, when the limit is not exceeded, or
+// when another compaction is already registered for level 0.
+Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
+                                                 LogBuffer* log_buffer) {
+  assert(version->NumberLevels() == 1);
+  const auto& level0_files = version->files_[0];
+  const uint64_t size_limit =
+      options_->compaction_options_fifo.max_table_files_size;
+
+  // Add up the size of everything currently stored in level 0.
+  uint64_t remaining_size = 0;
+  for (const auto& meta : level0_files) {
+    remaining_size += meta->file_size;
+  }
+
+  if (level0_files.empty() || remaining_size <= size_limit) {
+    // Still within the size budget.
+    LogToBuffer(log_buffer,
+                "[%s] FIFO compaction: nothing to do. Total size %" PRIu64
+                ", max size %" PRIu64 "\n",
+                version->cfd_->GetName().c_str(), remaining_size, size_limit);
+    return nullptr;
+  }
+
+  if (!compactions_in_progress_[0].empty()) {
+    LogToBuffer(log_buffer,
+                "[%s] FIFO compaction: Already executing compaction. No need "
+                "to run parallel compactions since compactions are very fast",
+                version->cfd_->GetName().c_str());
+    return nullptr;
+  }
+
+  Compaction* const deletion =
+      new Compaction(version, 0, 0, 0, 0, false, false,
+                     true /* is deletion compaction */);
+  // Walk level 0 from its last entry towards the front, collecting files until
+  // the total size is back under the limit.
+  for (size_t idx = level0_files.size(); idx > 0; --idx) {
+    auto victim = level0_files[idx - 1];
+    remaining_size -= victim->file_size;
+    deletion->inputs_[0].push_back(victim);
+
+    char human_size[16];
+    AppendHumanBytes(victim->file_size, human_size, sizeof(human_size));
+    LogToBuffer(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64
+                            " with size %s for deletion",
+                version->cfd_->GetName().c_str(), victim->number, human_size);
+
+    if (remaining_size <= size_limit) {
+      break;
+    }
+  }
+
+  deletion->MarkFilesBeingCompacted(true);
+  compactions_in_progress_[0].insert(deletion);
+
+  return deletion;
+}
+
+// Manual-compaction entry point for FIFO style.
+//
+// Both levels must be 0. The begin/end keys are not consulted: the call simply
+// delegates to PickCompaction() with a temporary log buffer that is flushed
+// before returning. *compaction_end is always set to nullptr.
+Compaction* FIFOCompactionPicker::CompactRange(Version* version,
+                                               int input_level,
+                                               int output_level,
+                                               const InternalKey* begin,
+                                               const InternalKey* end,
+                                               InternalKey** compaction_end) {
+  assert(input_level == 0);
+  assert(output_level == 0);
+  *compaction_end = nullptr;
+
+  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, options_->info_log.get());
+  Compaction* const picked = PickCompaction(version, &log_buffer);
+  log_buffer.FlushBufferToLog();
+
+  return picked;
+}
+
} // namespace rocksdb
// compaction_end will be set to nullptr.
// Client is responsible for compaction_end storage -- when called,
// *compaction_end should point to valid InternalKey!
- Compaction* CompactRange(Version* version, int input_level, int output_level,
- const InternalKey* begin, const InternalKey* end,
- InternalKey** compaction_end);
+ virtual Compaction* CompactRange(Version* version, int input_level,
+ int output_level, const InternalKey* begin,
+ const InternalKey* end,
+ InternalKey** compaction_end);
// Free up the files that participated in a compaction
void ReleaseCompactionFiles(Compaction* c, Status status);
Compaction* PickCompactionBySize(Version* version, int level, double score);
};
+// Compaction picker for FIFO compaction style. It overrides both the
+// automatic pick and the manual CompactRange entry points of
+// CompactionPicker.
+class FIFOCompactionPicker : public CompactionPicker {
+ public:
+  // Forwards both arguments unchanged to the CompactionPicker base.
+  FIFOCompactionPicker(const Options* options,
+                       const InternalKeyComparator* icmp)
+      : CompactionPicker(options, icmp) {}
+
+  // Automatic compaction entry point.
+  Compaction* PickCompaction(Version* version,
+                             LogBuffer* log_buffer) override;
+
+  // Manual compaction entry point.
+  Compaction* CompactRange(Version* version, int input_level,
+                           int output_level, const InternalKey* begin,
+                           const InternalKey* end,
+                           InternalKey** compaction_end) override;
+};
+
} // namespace rocksdb
return s;
}
- int max_level_with_files = 1;
+ int max_level_with_files = 0;
{
MutexLock l(&mutex_);
Version* base = cfd->current();
// in case the compaction is universal or if we're compacting the
// bottom-most level, the output level will be the same as input one
if (cfd->options()->compaction_style == kCompactionStyleUniversal ||
+ cfd->options()->compaction_style == kCompactionStyleFIFO ||
level == max_level_with_files) {
s = RunManualCompaction(cfd, level, level, begin, end);
} else {
// For universal compaction, we enforce every manual compaction to compact
// all files.
if (begin == nullptr ||
- cfd->options()->compaction_style == kCompactionStyleUniversal) {
+ cfd->options()->compaction_style == kCompactionStyleUniversal ||
+ cfd->options()->compaction_style == kCompactionStyleFIFO) {
manual.begin = nullptr;
} else {
begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
manual.begin = &begin_storage;
}
if (end == nullptr ||
- cfd->options()->compaction_style == kCompactionStyleUniversal) {
+ cfd->options()->compaction_style == kCompactionStyleUniversal ||
+ cfd->options()->compaction_style == kCompactionStyleFIFO) {
manual.end = nullptr;
} else {
end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
if (!c) {
// Nothing to do
LogToBuffer(log_buffer, "Compaction nothing to do");
+ } else if (c->IsDeletionCompaction()) {
+ // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
+ // file if there is alive snapshot pointing to it
+ assert(c->num_input_files(1) == 0);
+ assert(c->level() == 0);
+ assert(c->column_family_data()->options()->compaction_style ==
+ kCompactionStyleFIFO);
+ for (const auto& f : *c->inputs(0)) {
+ c->edit()->DeleteFile(c->level(), f->number);
+ }
+ status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
+ db_directory_.get());
+ InstallSuperVersion(c->column_family_data(), deletion_state);
+ LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
+ c->column_family_data()->GetName().c_str(),
+ c->num_input_files(0));
+ c->ReleaseCompactionFiles(status);
+ *madeProgress = true;
} else if (!is_manual && c->IsTrivialMove()) {
// Move file to next level
assert(c->num_input_files(0) == 1);
if (!m->done) {
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
- // Universal compaction should always compact the whole range
+ // Universal and FIFO compactions should always compact the whole range
assert(m->cfd->options()->compaction_style != kCompactionStyleUniversal);
+ assert(m->cfd->options()->compaction_style != kCompactionStyleFIFO);
m->tmp_storage = *manual_end;
m->begin = &m->tmp_storage;
}
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
- if (cfd->options()->compaction_style == kCompactionStyleUniversal) {
+ if (cfd->options()->compaction_style == kCompactionStyleUniversal ||
+ cfd->options()->compaction_style == kCompactionStyleFIFO) {
Version* current = cfd->current();
for (int i = 1; i < current->NumberLevels(); ++i) {
int num_files = current->NumLevelFiles(i);
if (num_files > 0) {
- s = Status::InvalidArgument("Not all files are at level 0. Cannot "
- "open with universal compaction style.");
+ s = Status::InvalidArgument(
+ "Not all files are at level 0. Cannot "
+ "open with universal or FIFO compaction style.");
break;
}
}
cfd = cfh->cfd();
}
int output_level =
- (cfd->options()->compaction_style == kCompactionStyleUniversal)
+ (cfd->options()->compaction_style == kCompactionStyleUniversal ||
+ cfd->options()->compaction_style == kCompactionStyleFIFO)
? level
: level + 1;
return RunManualCompaction(cfd, level, output_level, begin, end);
kCompressedBlockCache,
kInfiniteMaxOpenFiles,
kxxHashChecksum,
+ kFIFOCompaction,
kEnd
};
int option_config_;
kSkipPlainTable = 8,
kSkipHashIndex = 16,
kSkipNoSeekToLast = 32,
- kSkipHashCuckoo = 64
+ kSkipHashCuckoo = 64,
+ kSkipFIFOCompaction = 128,
};
DBTest() : option_config_(kDefault),
if ((skip_mask & kSkipHashCuckoo) && (option_config_ == kHashCuckoo)) {
continue;
}
+ if ((skip_mask & kSkipFIFOCompaction) &&
+ option_config_ == kFIFOCompaction) {
+ continue;
+ }
break;
}
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
break;
}
+ case kFIFOCompaction: {
+ options.compaction_style = kCompactionStyleFIFO;
+ break;
+ }
case kBlockBasedTableWithPrefixHashIndex: {
BlockBasedTableOptions table_options;
table_options.index_type = BlockBasedTableOptions::kHashSearch;
env_->SleepForMicroseconds(1000000);
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1); // XXX
- } while (ChangeOptions(kSkipUniversalCompaction));
+ } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction));
}
// KeyMayExist can lead to a few false positives, but not false negatives.
// KeyMayExist function only checks data in block caches, which is not used
// by plain table format.
- } while (ChangeOptions(kSkipPlainTable | kSkipHashIndex));
+ } while (
+ ChangeOptions(kSkipPlainTable | kSkipHashIndex | kSkipFIFOCompaction));
}
TEST(DBTest, NonBlockingIteration) {
ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
}
// ApproximateOffsetOf() is not yet implemented in plain table format.
- } while (ChangeOptions(kSkipUniversalCompaction | kSkipPlainTable));
+ } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction |
+ kSkipPlainTable));
}
TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
// ApproximateOffsetOf() is not yet implemented in plain table format,
// which is used by Size().
// skip HashCuckooRep as it does not support snapshot
- } while (ChangeOptions(kSkipUniversalCompaction | kSkipPlainTable |
- kSkipHashCuckoo));
+ } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction |
+ kSkipPlainTable | kSkipHashCuckoo));
}
TEST(DBTest, CompactBetweenSnapshots) {
ASSERT_EQ("sixth", Get(1, "foo"));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
// skip HashCuckooRep as it does not support snapshot
- } while (ChangeOptions(kSkipHashCuckoo));
+ } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction));
}
TEST(DBTest, DeletionMarkers1) {
Flush(1);
ASSERT_EQ("3", FilesPerLevel(1));
ASSERT_EQ("NOT_FOUND", Get(1, "600"));
- } while (ChangeOptions(kSkipUniversalCompaction));
+ } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction));
}
TEST(DBTest, L0_CompactionBug_Issue44_a) {
ASSERT_EQ("f", Get("e"));
ASSERT_EQ("h", Get("g"));
}
+
+// Writes six write buffers' worth of data under FIFO compaction with a 500KB
+// table-file budget and checks that only five level-0 files remain and that
+// the first keys written are gone.
+TEST(DBTest, FIFOCompactionTest) {
+  // pass 0 -- auto compaction
+  // pass 1 -- manual compaction
+  for (int pass = 0; pass < 2; ++pass) {
+    const bool manual_compaction = (pass == 1);
+
+    Options options;
+    options.create_if_missing = true;
+    options.compression = kNoCompression;
+    options.compaction_style = kCompactionStyleFIFO;
+    options.write_buffer_size = 100 << 10;  // 100KB
+    options.compaction_options_fifo.max_table_files_size = 500 << 10;  // 500KB
+    if (manual_compaction) {
+      options.disable_auto_compactions = true;
+    }
+    DestroyAndReopen(&options);
+
+    // 600 keys with 1KB values each; a flush is expected after every 100 keys
+    Random rnd(301);
+    const int kNumKeys = 6 * 100;
+    for (int key = 0; key < kNumKeys; ++key) {
+      ASSERT_OK(Put(std::to_string(key), RandomString(&rnd, 1024)));
+    }
+
+    if (manual_compaction) {
+      ASSERT_OK(db_->CompactRange(nullptr, nullptr));
+    } else {
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    }
+
+    // only 5 files should survive
+    ASSERT_EQ(NumTableFilesAtLevel(0), 5);
+    // these keys should have been deleted by the compaction above
+    for (int key = 0; key < 50; ++key) {
+      ASSERT_EQ("NOT_FOUND", Get(std::to_string(key)));
+    }
+  }
+}
} // namespace rocksdb
int main(int argc, char** argv) {
int max_score_level = 0;
int num_levels_to_check =
- (cfd_->options()->compaction_style != kCompactionStyleUniversal)
+ (cfd_->options()->compaction_style != kCompactionStyleUniversal &&
+ cfd_->options()->compaction_style != kCompactionStyleFIFO)
? NumberLevels() - 1
: 1;
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
int numfiles = 0;
+ uint64_t total_size = 0;
for (unsigned int i = 0; i < files_[level].size(); i++) {
if (!files_[level][i]->being_compacted) {
+ total_size += files_[level][i]->file_size;
numfiles++;
}
}
-
- // If we are slowing down writes, then we better compact that first
- if (numfiles >= cfd_->options()->level0_stop_writes_trigger) {
+ if (cfd_->options()->compaction_style == kCompactionStyleFIFO) {
+ score = static_cast<double>(total_size) /
+ cfd_->options()->compaction_options_fifo.max_table_files_size;
+ } else if (numfiles >= cfd_->options()->level0_stop_writes_trigger) {
+ // If we are slowing down writes, then we better compact that first
score = 1000000;
} else if (numfiles >= cfd_->options()->level0_slowdown_writes_trigger) {
score = 10000;
} // anonymous namespace
void Version::UpdateFilesBySize() {
+ if (cfd_->options()->compaction_style == kCompactionStyleFIFO) {
+ // don't need this
+ return;
+ }
// No need to sort the highest level because it is never compacted.
int max_level =
(cfd_->options()->compaction_style == kCompactionStyleUniversal)
// TODO(sdong): improve this function to be accurate for universal
// compactions.
int num_levels_to_check =
- (cfd_->options()->compaction_style != kCompactionStyleUniversal)
+ (cfd_->options()->compaction_style != kCompactionStyleUniversal &&
+ cfd_->options()->compaction_style != kCompactionStyleFIFO)
? NumberLevels() - 1
: 1;
for (int i = 0; i < num_levels_to_check; i++) {
class VersionSet::Builder {
private:
// Helper to sort v->files_
- // kLevel0LevelCompaction -- NewestFirst
+ // kLevel0LevelCompaction -- NewestFirst (also used for FIFO compaction)
// kLevel0UniversalCompaction -- NewestFirstBySeqNo
// kLevelNon0 -- BySmallestKey
struct FileComparator {
friend class CompactionPicker;
friend class LevelCompactionPicker;
friend class UniversalCompactionPicker;
+ friend class FIFOCompactionPicker;
class LevelFileNumIterator;
class LevelFileIteratorState;
};
enum CompactionStyle : char {
- kCompactionStyleLevel = 0x0, // level based compaction style
- kCompactionStyleUniversal = 0x1 // Universal compaction style
+ kCompactionStyleLevel = 0x0, // level based compaction style
+ kCompactionStyleUniversal = 0x1, // Universal compaction style
+ kCompactionStyleFIFO = 0x2, // FIFO compaction style
+};
+
+struct CompactionOptionsFIFO {
+  // Upper bound, in bytes, on the combined size of all table files. Once the
+  // total exceeds this value, the oldest table files are deleted.
+  // Default: 1GB
+  uint64_t max_table_files_size;
+
+  CompactionOptionsFIFO() : max_table_files_size(1 * 1024 * 1024 * 1024) {}  // 1GB
};
// Compression options for different compression algorithms like Zlib
// The options needed to support Universal Style compactions
CompactionOptionsUniversal compaction_options_universal;
+ // The options for FIFO compaction style
+ CompactionOptionsFIFO compaction_options_fifo;
+
// Use KeyMayExist API to filter deletes when this is true.
// If KeyMayExist returns false, i.e. the key definitely does not exist, then
// the delete is a noop. KeyMayExist only incurs in-memory look up.
compaction_style(options.compaction_style),
verify_checksums_in_compaction(options.verify_checksums_in_compaction),
compaction_options_universal(options.compaction_options_universal),
+ compaction_options_fifo(options.compaction_options_fifo),
filter_deletes(options.filter_deletes),
max_sequential_skip_in_iterations(
options.max_sequential_skip_in_iterations),
Log(log,
"Options.compaction_options_universal.compression_size_percent: %u",
compaction_options_universal.compression_size_percent);
+ Log(log, "Options.compaction_options_fifo.max_table_files_size: %" PRIu64,
+ compaction_options_fifo.max_table_files_size);
std::string collector_names;
for (const auto& collector_factory : table_properties_collector_factories) {
collector_names.append(collector_factory->Name());