}
void DBImpl::CompactRange(const Slice* begin, const Slice* end,
- bool reduce_level) {
+ bool reduce_level, int target_level) {
int max_level_with_files = 1;
{
MutexLock l(&mutex_);
}
if (reduce_level) {
- ReFitLevel(max_level_with_files);
+ ReFitLevel(max_level_with_files, target_level);
}
}
return minimum_level;
}
-void DBImpl::ReFitLevel(int level) {
+void DBImpl::ReFitLevel(int level, int target_level) {
assert(level < NumberLevels());
MutexLock l(&mutex_);
}
// move to a smaller level
- int to_level = FindMinimumEmptyLevelFitting(level);
+ int to_level = target_level;
+ if (target_level < 0) {
+ to_level = FindMinimumEmptyLevelFitting(level);
+ }
assert(to_level <= level);
}
}
impl->mutex_.Unlock();
+
+ if (options.compaction_style == kCompactionStyleUniversal) {
+ std::string property;
+ int num_files;
+ for (int i = 1; i < impl->NumberLevels(); i++) {
+ num_files = impl->versions_->NumLevelFiles(i);
+ if (num_files > 0) {
+ s = Status::InvalidArgument("Not all files are at level 0. Cannot "
+ "open with universal compaction style.");
+ break;
+ }
+ }
+ }
+
if (s.ok()) {
*dbptr = impl;
} else {
virtual bool GetProperty(const Slice& property, std::string* value);
virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes);
virtual void CompactRange(const Slice* begin, const Slice* end,
- bool reduce_level = false);
+ bool reduce_level = false, int target_level = -1);
virtual int NumberLevels();
virtual int MaxMemCompactionLevel();
virtual int Level0StopWriteTrigger();
// input level. Return the input level, if such level could not be found.
int FindMinimumEmptyLevelFitting(int level);
- // Move the files in the input level to the minimum level that could hold
- // the data set.
- void ReFitLevel(int level);
+ // Move the files in the input level to the target level.
+ // If target_level < 0, automatically calculate the minimum level that could
+ // hold the data set.
+ void ReFitLevel(int level, int target_level = -1);
// Constant after construction
const InternalFilterPolicy internal_filter_policy_;
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual void CompactRange(const Slice* begin, const Slice* end,
- bool reduce_level = false) {
+ bool reduce_level = false, int target_level = -1) {
}
virtual Status DisableFileDeletions() {
return Status::NotSupported("Not supported operation in read only mode.");
}
}
+TEST(DBTest, ConvertCompactionStyle) {
+ Random rnd(301);
+ int max_key_level_insert = 200;
+ int max_key_universal_insert = 600;
+
+ // Stage 1: generate a db with level compaction
+ Options options = CurrentOptions();
+ options.write_buffer_size = 100<<10; //100KB
+ options.num_levels = 4;
+ options.level0_file_num_compaction_trigger = 3;
+ options.max_bytes_for_level_base = 500<<10; // 500KB
+ options.max_bytes_for_level_multiplier = 1;
+ options.target_file_size_base = 200<<10; // 200KB
+ options.target_file_size_multiplier = 1;
+ Reopen(&options);
+
+ for (int i = 0; i <= max_key_level_insert; i++) {
+ // each value is 10K
+ ASSERT_OK(Put(Key(i), RandomString(&rnd, 10000)));
+ }
+ dbfull()->Flush(FlushOptions());
+ dbfull()->TEST_WaitForCompact();
+
+ ASSERT_GT(TotalTableFiles(), 1);
+ int non_level0_num_files = 0;
+ for (int i = 1; i < dbfull()->NumberLevels(); i++) {
+ non_level0_num_files += NumTableFilesAtLevel(i);
+ }
+ ASSERT_GT(non_level0_num_files, 0);
+
+ // Stage 2: reopen with universal compaction - should fail
+ options = CurrentOptions();
+ options.compaction_style = kCompactionStyleUniversal;
+ Status s = TryReopen(&options);
+ ASSERT_TRUE(s.IsInvalidArgument());
+
+ // Stage 3: compact into a single file and move the file to level 0
+ options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ options.target_file_size_base = INT_MAX;
+ options.target_file_size_multiplier = 1;
+ options.max_bytes_for_level_base = INT_MAX;
+ options.max_bytes_for_level_multiplier = 1;
+ Reopen(&options);
+
+ dbfull()->CompactRange(nullptr, nullptr,
+ true /* reduce level */,
+ 0 /* reduce to level 0 */);
+
+ for (int i = 0; i < dbfull()->NumberLevels(); i++) {
+ int num = NumTableFilesAtLevel(i);
+ if (i == 0) {
+ ASSERT_EQ(num, 1);
+ } else {
+ ASSERT_EQ(num, 0);
+ }
+ }
+
+ // Stage 4: re-open in universal compaction style and do some db operations
+ options = CurrentOptions();
+ options.compaction_style = kCompactionStyleUniversal;
+ options.write_buffer_size = 100<<10; //100KB
+ options.level0_file_num_compaction_trigger = 3;
+ Reopen(&options);
+
+ for (int i = max_key_level_insert / 2; i <= max_key_universal_insert; i++) {
+ ASSERT_OK(Put(Key(i), RandomString(&rnd, 10000)));
+ }
+ dbfull()->Flush(FlushOptions());
+ dbfull()->TEST_WaitForCompact();
+
+ for (int i = 1; i < dbfull()->NumberLevels(); i++) {
+ ASSERT_EQ(NumTableFilesAtLevel(i), 0);
+ }
+
+ // verify keys inserted in both level compaction style and universal
+ // compaction style
+ std::string keys_in_db;
+ Iterator* iter = dbfull()->NewIterator(ReadOptions());
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ keys_in_db.append(iter->key().ToString());
+ keys_in_db.push_back(',');
+ }
+ delete iter;
+
+ std::string expected_keys;
+ for (int i = 0; i <= max_key_universal_insert; i++) {
+ expected_keys.append(Key(i));
+ expected_keys.push_back(',');
+ }
+
+ ASSERT_EQ(keys_in_db, expected_keys);
+}
+
void MinLevelHelper(DBTest* self, Options& options) {
Random rnd(301);
}
}
virtual void CompactRange(const Slice* start, const Slice* end,
- bool reduce_level ) {
+ bool reduce_level, int target_level) {
}
virtual int NumberLevels()
// after compaction is reduced, that level might not be appropriate for
// hosting all the files. In this case, client could set reduce_level
// to true, to move the files back to the minimum level capable of holding
- // the data set.
+ // the data set or a given level (specified by non-negative target_level).
virtual void CompactRange(const Slice* begin, const Slice* end,
- bool reduce_level = false) = 0;
+ bool reduce_level = false,
+ int target_level = -1) = 0;
// Number of levels used for this DB.
virtual int NumberLevels() = 0;
}
virtual void CompactRange(const Slice* begin, const Slice* end,
- bool reduce_level = false) override {
- return sdb_->CompactRange(begin, end, reduce_level);
+ bool reduce_level = false,
+ int target_level = -1) override {
+ return sdb_->CompactRange(begin, end, reduce_level, target_level);
}
virtual int NumberLevels() override {
return new WALDumperCommand(cmdParams, option_map, flags);
} else if (cmd == ReduceDBLevelsCommand::Name()) {
return new ReduceDBLevelsCommand(cmdParams, option_map, flags);
+ } else if (cmd == ChangeCompactionStyleCommand::Name()) {
+ return new ChangeCompactionStyleCommand(cmdParams, option_map, flags);
} else if (cmd == DBDumperCommand::Name()) {
return new DBDumperCommand(cmdParams, option_map, flags);
} else if (cmd == DBLoaderCommand::Name()) {
}
}
+const string ChangeCompactionStyleCommand::ARG_OLD_COMPACTION_STYLE =
+ "old_compaction_style";
+const string ChangeCompactionStyleCommand::ARG_NEW_COMPACTION_STYLE =
+ "new_compaction_style";
+
+ChangeCompactionStyleCommand::ChangeCompactionStyleCommand(
+ const vector<string>& params, const map<string, string>& options,
+ const vector<string>& flags) :
+ LDBCommand(options, flags, false,
+ BuildCmdLineOptions({ARG_OLD_COMPACTION_STYLE,
+ ARG_NEW_COMPACTION_STYLE})),
+ old_compaction_style_(-1),
+ new_compaction_style_(-1) {
+
+ ParseIntOption(option_map_, ARG_OLD_COMPACTION_STYLE, old_compaction_style_,
+ exec_state_);
+ if (old_compaction_style_ != kCompactionStyleLevel &&
+ old_compaction_style_ != kCompactionStyleUniversal) {
+ exec_state_ = LDBCommandExecuteResult::FAILED(
+ "Use --" + ARG_OLD_COMPACTION_STYLE + " to specify old compaction " +
+ "style. Check ldb help for proper compaction style value.\n");
+ return;
+ }
+
+ ParseIntOption(option_map_, ARG_NEW_COMPACTION_STYLE, new_compaction_style_,
+ exec_state_);
+ if (new_compaction_style_ != kCompactionStyleLevel &&
+ new_compaction_style_ != kCompactionStyleUniversal) {
+ exec_state_ = LDBCommandExecuteResult::FAILED(
+ "Use --" + ARG_NEW_COMPACTION_STYLE + " to specify new compaction " +
+ "style. Check ldb help for proper compaction style value.\n");
+ return;
+ }
+
+ if (new_compaction_style_ == old_compaction_style_) {
+ exec_state_ = LDBCommandExecuteResult::FAILED(
+ "Old compaction style is the same as new compaction style. "
+ "Nothing to do.\n");
+ return;
+ }
+
+ if (old_compaction_style_ == kCompactionStyleUniversal &&
+ new_compaction_style_ == kCompactionStyleLevel) {
+ exec_state_ = LDBCommandExecuteResult::FAILED(
+ "Convert from universal compaction to level compaction. "
+ "Nothing to do.\n");
+ return;
+ }
+}
+
+void ChangeCompactionStyleCommand::Help(string& ret) {
+ ret.append(" ");
+ ret.append(ChangeCompactionStyleCommand::Name());
+ ret.append(" --" + ARG_OLD_COMPACTION_STYLE + "=<Old compaction style: 0 " +
+ "for level compaction, 1 for universal compaction>");
+ ret.append(" --" + ARG_NEW_COMPACTION_STYLE + "=<New compaction style: 0 " +
+ "for level compaction, 1 for universal compaction>");
+ ret.append("\n");
+}
+
+Options ChangeCompactionStyleCommand::PrepareOptionsForOpenDB() {
+ Options opt = LDBCommand::PrepareOptionsForOpenDB();
+
+ if (old_compaction_style_ == kCompactionStyleLevel &&
+ new_compaction_style_ == kCompactionStyleUniversal) {
+ // In order to convert from level compaction to universal compaction, we
+ // need to compact all data into a single file and move it to level 0.
+ opt.disable_auto_compactions = true;
+ opt.target_file_size_base = INT_MAX;
+ opt.target_file_size_multiplier = 1;
+ opt.max_bytes_for_level_base = INT_MAX;
+ opt.max_bytes_for_level_multiplier = 1;
+ }
+
+ return opt;
+}
+
+void ChangeCompactionStyleCommand::DoCommand() {
+ // print db stats before we have made any change
+ std::string property;
+ std::string files_per_level;
+ for (int i = 0; i < db_->NumberLevels(); i++) {
+ db_->GetProperty("leveldb.num-files-at-level" + NumberToString(i),
+ &property);
+
+ // format print string
+ char buf[100];
+ snprintf(buf, sizeof(buf), "%s%s", (i ? "," : ""), property.c_str());
+ files_per_level += buf;
+ }
+ fprintf(stdout, "files per level before compaction: %s\n",
+ files_per_level.c_str());
+
+ // manual compact into a single file and move the file to level 0
+ db_->CompactRange(nullptr, nullptr,
+ true /* reduce level */,
+ 0 /* reduce to level 0 */);
+
+ // verify compaction result
+ files_per_level = "";
+ int num_files = 0;
+ for (int i = 0; i < db_->NumberLevels(); i++) {
+ db_->GetProperty("leveldb.num-files-at-level" + NumberToString(i),
+ &property);
+
+ // format print string
+ char buf[100];
+ snprintf(buf, sizeof(buf), "%s%s", (i ? "," : ""), property.c_str());
+ files_per_level += buf;
+
+ num_files = atoi(property.c_str());
+
+ // level 0 should have only 1 file
+ if (i == 0 && num_files != 1) {
+ exec_state_ = LDBCommandExecuteResult::FAILED("Number of db files at "
+ "level 0 after compaction is " + std::to_string(num_files) +
+ ", not 1.\n");
+ return;
+ }
+ // other levels should have no file
+ if (i > 0 && num_files != 0) {
+ exec_state_ = LDBCommandExecuteResult::FAILED("Number of db files at "
+ "level " + std::to_string(i) + " after compaction is " +
+ std::to_string(num_files) + ", not 0.\n");
+ return;
+ }
+ }
+
+ fprintf(stdout, "files per level after compaction: %s\n",
+ files_per_level.c_str());
+}
+
class InMemoryHandler : public WriteBatch::Handler {
public:
ret.append(" ");
ret.append(WALDumperCommand::Name());
ret.append(" --" + ARG_WAL_FILE + "=<write_ahead_log_file_path>");
- ret.append(" --[" + ARG_PRINT_HEADER + "] ");
- ret.append(" --[ " + ARG_PRINT_VALUE + "] ");
+ ret.append(" [--" + ARG_PRINT_HEADER + "] ");
+ ret.append(" [--" + ARG_PRINT_VALUE + "] ");
ret.append("\n");
}
Status GetOldNumOfLevels(Options& opt, int* levels);
};
+class ChangeCompactionStyleCommand : public LDBCommand {
+public:
+ static string Name() { return "change_compaction_style"; }
+
+ ChangeCompactionStyleCommand(const vector<string>& params,
+ const map<string, string>& options, const vector<string>& flags);
+
+ virtual Options PrepareOptionsForOpenDB();
+
+ virtual void DoCommand();
+
+ static void Help(string& msg);
+
+private:
+ int old_compaction_style_;
+ int new_compaction_style_;
+
+ static const string ARG_OLD_COMPACTION_STYLE;
+ static const string ARG_NEW_COMPACTION_STYLE;
+};
+
class WALDumperCommand : public LDBCommand {
public:
static string Name() { return "dump_wal"; }
WALDumperCommand::Help(ret);
CompactorCommand::Help(ret);
ReduceDBLevelsCommand::Help(ret);
+ ChangeCompactionStyleCommand::Help(ret);
DBDumperCommand::Help(ret);
DBLoaderCommand::Help(ret);
ManifestDumpCommand::Help(ret);
}
void DBWithTTL::CompactRange(const Slice* begin, const Slice* end,
- bool reduce_level) {
- db_->CompactRange(begin, end, reduce_level);
+ bool reduce_level, int target_level) {
+ db_->CompactRange(begin, end, reduce_level, target_level);
}
int DBWithTTL::NumberLevels() {
virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes);
virtual void CompactRange(const Slice* begin, const Slice* end,
- bool reduce_level = false);
+ bool reduce_level = false, int target_level = -1);
virtual int NumberLevels();