#include <algorithm>
#include <atomic>
#include <functional>
+#include <map>
#include <set>
#include <thread>
#include <unordered_map>
Logger* info_log_;
TableCache* table_cache_;
VersionStorageInfo* base_vstorage_;
+ int num_levels_;
LevelState* levels_;
+ // Store states of levels larger than num_levels_. We do this instead of
+ // storing them in levels_ to avoid regression in case there are no files
+ // on invalid levels. The version is not consistent if in the end the files
+ // on invalid levels don't cancel out.
+ std::map<int, std::unordered_set<uint64_t>> invalid_levels_;
+ // Whether there are invalid new files or invalid deletion on levels larger
+ // than num_levels_.
+ bool has_invalid_levels_;
FileComparator level_zero_cmp_;
FileComparator level_nonzero_cmp_;
: env_options_(env_options),
info_log_(info_log),
table_cache_(table_cache),
- base_vstorage_(base_vstorage) {
- levels_ = new LevelState[base_vstorage_->num_levels()];
+ base_vstorage_(base_vstorage),
+ num_levels_(base_vstorage->num_levels()),
+ has_invalid_levels_(false) {
+ levels_ = new LevelState[num_levels_];
level_zero_cmp_.sort_method = FileComparator::kLevel0;
level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
level_nonzero_cmp_.internal_comparator =
}
~Rep() {
- for (int level = 0; level < base_vstorage_->num_levels(); level++) {
+ for (int level = 0; level < num_levels_; level++) {
const auto& added = levels_[level].added_files;
for (auto& pair : added) {
UnrefFile(pair.second);
}
#endif
// make sure the files are sorted correctly
- for (int level = 0; level < vstorage->num_levels(); level++) {
+ for (int level = 0; level < num_levels_; level++) {
auto& level_files = vstorage->LevelFiles(level);
for (size_t i = 1; i < level_files.size(); i++) {
auto f1 = level_files[i - 1];
#endif
// a file to be deleted better exist in the previous version
bool found = false;
- for (int l = 0; !found && l < base_vstorage_->num_levels(); l++) {
+ for (int l = 0; !found && l < num_levels_; l++) {
const std::vector<FileMetaData*>& base_files =
base_vstorage_->LevelFiles(l);
for (size_t i = 0; i < base_files.size(); i++) {
// if the file did not exist in the previous version, then it
// is possibly moved from lower level to higher level in current
// version
- for (int l = level + 1; !found && l < base_vstorage_->num_levels(); l++) {
+ for (int l = level + 1; !found && l < num_levels_; l++) {
auto& level_added = levels_[l].added_files;
auto got = level_added.find(number);
if (got != level_added.end()) {
}
}
+ bool CheckConsistencyForNumLevels() {
+ // Make sure there are no files on or beyond num_levels().
+ if (has_invalid_levels_) {
+ return false;
+ }
+ for (auto& level : invalid_levels_) {
+ if (level.second.size() > 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
// Apply all of the edits in *edit to the current state.
void Apply(VersionEdit* edit) {
CheckConsistency(base_vstorage_);
for (const auto& del_file : del) {
const auto level = del_file.first;
const auto number = del_file.second;
- levels_[level].deleted_files.insert(number);
- CheckConsistencyForDeletes(edit, number, level);
-
- auto exising = levels_[level].added_files.find(number);
- if (exising != levels_[level].added_files.end()) {
- UnrefFile(exising->second);
- levels_[level].added_files.erase(number);
+ if (level < num_levels_) {
+ levels_[level].deleted_files.insert(number);
+ CheckConsistencyForDeletes(edit, number, level);
+
+ auto exising = levels_[level].added_files.find(number);
+ if (exising != levels_[level].added_files.end()) {
+ UnrefFile(exising->second);
+ levels_[level].added_files.erase(number);
+ }
+ } else {
+ if (invalid_levels_[level].count(number) > 0) {
+ invalid_levels_[level].erase(number);
+ } else {
+ // Deleting an non-existing file on invalid level.
+ has_invalid_levels_ = true;
+ }
}
}
// Add new files
for (const auto& new_file : edit->GetNewFiles()) {
const int level = new_file.first;
- FileMetaData* f = new FileMetaData(new_file.second);
- f->refs = 1;
-
- assert(levels_[level].added_files.find(f->fd.GetNumber()) ==
- levels_[level].added_files.end());
- levels_[level].deleted_files.erase(f->fd.GetNumber());
- levels_[level].added_files[f->fd.GetNumber()] = f;
+ if (level < num_levels_) {
+ FileMetaData* f = new FileMetaData(new_file.second);
+ f->refs = 1;
+
+ assert(levels_[level].added_files.find(f->fd.GetNumber()) ==
+ levels_[level].added_files.end());
+ levels_[level].deleted_files.erase(f->fd.GetNumber());
+ levels_[level].added_files[f->fd.GetNumber()] = f;
+ } else {
+ uint64_t number = new_file.second.fd.GetNumber();
+ if (invalid_levels_[level].count(number) == 0) {
+ invalid_levels_[level].insert(number);
+ } else {
+ // Creating an already existing file on invalid level.
+ has_invalid_levels_ = true;
+ }
+ }
}
}
CheckConsistency(base_vstorage_);
CheckConsistency(vstorage);
- for (int level = 0; level < base_vstorage_->num_levels(); level++) {
+ for (int level = 0; level < num_levels_; level++) {
const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
// Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v.
assert(table_cache_ != nullptr);
// <file metadata, level>
std::vector<std::pair<FileMetaData*, int>> files_meta;
- for (int level = 0; level < base_vstorage_->num_levels(); level++) {
+ for (int level = 0; level < num_levels_; level++) {
for (auto& file_meta_pair : levels_[level].added_files) {
auto* file_meta = file_meta_pair.second;
assert(!file_meta->table_reader_handle);
VersionStorageInfo* base_vstorage,
Logger* info_log)
: rep_(new Rep(env_options, info_log, table_cache, base_vstorage)) {}
+
VersionBuilder::~VersionBuilder() { delete rep_; }
+
void VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) {
rep_->CheckConsistency(vstorage);
}
+
void VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit,
uint64_t number, int level) {
rep_->CheckConsistencyForDeletes(edit, number, level);
}
+
+bool VersionBuilder::CheckConsistencyForNumLevels() {
+ return rep_->CheckConsistencyForNumLevels();
+}
+
void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); }
+
void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
rep_->SaveTo(vstorage);
}
+
void VersionBuilder::LoadTableHandlers(
InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache) {
rep_->LoadTableHandlers(internal_stats, max_threads,
prefetch_index_and_filter_in_cache);
}
+
void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level,
FileMetaData* f) {
rep_->MaybeAddFile(vstorage, level, f);