namespace ROCKSDB_NAMESPACE {
-class MyTestCompactionService : public CompactionService {
+class TestCompactionServiceBase {
public:
- MyTestCompactionService(const std::string& db_path, Options& options,
- std::shared_ptr<Statistics> statistics = nullptr)
- : db_path_(db_path), options_(options), statistics_(statistics) {}
+ virtual int GetCompactionNum() = 0;
- static const char* kClassName() { return "MyTestCompactionService"; }
+ void OverrideStartStatus(CompactionServiceJobStatus s) {
+ is_override_start_status = true;
+ override_start_status = s;
+ }
+
+ void OverrideWaitResult(std::string str) {
+ is_override_wait_result = true;
+ override_wait_result = std::move(str);
+ }
+
+ void ResetOverride() {
+ is_override_wait_result = false;
+ is_override_start_status = false;
+ }
+
+ virtual ~TestCompactionServiceBase() = default;
+
+ protected:
+ bool is_override_start_status = false;
+ CompactionServiceJobStatus override_start_status =
+ CompactionServiceJobStatus::kFailure;
+ bool is_override_wait_result = false;
+ std::string override_wait_result;
+};
+
+class MyTestCompactionServiceLegacy : public CompactionService,
+ public TestCompactionServiceBase {
+ public:
+ MyTestCompactionServiceLegacy(std::string db_path, Options& options,
+ std::shared_ptr<Statistics>& statistics)
+ : db_path_(std::move(db_path)),
+ options_(options),
+ statistics_(statistics) {}
+
+ static const char* kClassName() { return "MyTestCompactionServiceLegacy"; }
const char* Name() const override { return kClassName(); }
InstrumentedMutexLock l(&mutex_);
jobs_.emplace(job_id, compaction_service_input);
CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
- TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::Start::End", &s);
+ if (is_override_start_status) {
+ return override_start_status;
+ }
return s;
}
Status s = DB::OpenAndCompact(
db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(job_id),
compaction_input, compaction_service_result, options_override);
- TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::WaitForComplete::End",
- compaction_service_result);
+ if (is_override_wait_result) {
+ *compaction_service_result = override_wait_result;
+ }
compaction_num_.fetch_add(1);
if (s.ok()) {
return CompactionServiceJobStatus::kSuccess;
}
}
- int GetCompactionNum() { return compaction_num_.load(); }
+ int GetCompactionNum() override { return compaction_num_.load(); }
private:
InstrumentedMutex mutex_;
std::shared_ptr<Statistics> statistics_;
};
-class CompactionServiceTest : public DBTestBase {
+class MyTestCompactionService : public CompactionService,
+ public TestCompactionServiceBase {
+ public:
+ MyTestCompactionService(std::string db_path, Options& options,
+ std::shared_ptr<Statistics>& statistics)
+ : db_path_(std::move(db_path)),
+ options_(options),
+ statistics_(statistics),
+ start_info_("na", "na", "na", 0),
+ wait_info_("na", "na", "na", 0) {}
+
+ static const char* kClassName() { return "MyTestCompactionService"; }
+
+ const char* Name() const override { return kClassName(); }
+
+ CompactionServiceJobStatus StartV2(
+ const CompactionServiceJobInfo& info,
+ const std::string& compaction_service_input) override {
+ InstrumentedMutexLock l(&mutex_);
+ start_info_ = info;
+ assert(info.db_name == db_path_);
+ jobs_.emplace(info.job_id, compaction_service_input);
+ CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
+ if (is_override_start_status) {
+ return override_start_status;
+ }
+ return s;
+ }
+
+ CompactionServiceJobStatus WaitForCompleteV2(
+ const CompactionServiceJobInfo& info,
+ std::string* compaction_service_result) override {
+ std::string compaction_input;
+ assert(info.db_name == db_path_);
+ {
+ InstrumentedMutexLock l(&mutex_);
+ wait_info_ = info;
+ auto i = jobs_.find(info.job_id);
+ if (i == jobs_.end()) {
+ return CompactionServiceJobStatus::kFailure;
+ }
+ compaction_input = std::move(i->second);
+ jobs_.erase(i);
+ }
+
+ CompactionServiceOptionsOverride options_override;
+ options_override.env = options_.env;
+ options_override.file_checksum_gen_factory =
+ options_.file_checksum_gen_factory;
+ options_override.comparator = options_.comparator;
+ options_override.merge_operator = options_.merge_operator;
+ options_override.compaction_filter = options_.compaction_filter;
+ options_override.compaction_filter_factory =
+ options_.compaction_filter_factory;
+ options_override.prefix_extractor = options_.prefix_extractor;
+ options_override.table_factory = options_.table_factory;
+ options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
+ options_override.statistics = statistics_;
+
+ Status s = DB::OpenAndCompact(
+ db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id),
+ compaction_input, compaction_service_result, options_override);
+ if (is_override_wait_result) {
+ *compaction_service_result = override_wait_result;
+ }
+ compaction_num_.fetch_add(1);
+ if (s.ok()) {
+ return CompactionServiceJobStatus::kSuccess;
+ } else {
+ return CompactionServiceJobStatus::kFailure;
+ }
+ }
+
+ int GetCompactionNum() override { return compaction_num_.load(); }
+
+ CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; }
+ CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; }
+
+ private:
+ InstrumentedMutex mutex_;
+ std::atomic_int compaction_num_{0};
+ std::map<uint64_t, std::string> jobs_;
+ const std::string db_path_;
+ Options options_;
+ std::shared_ptr<Statistics> statistics_;
+ CompactionServiceJobInfo start_info_;
+ CompactionServiceJobInfo wait_info_;
+};
+
+// This is only for listing test classes
+enum TestCompactionServiceType {
+ MyTestCompactionServiceType,
+ MyTestCompactionServiceLegacyType,
+};
+
+class CompactionServiceTest
+ : public DBTestBase,
+ public testing::WithParamInterface<TestCompactionServiceType> {
public:
explicit CompactionServiceTest()
: DBTestBase("compaction_service_test", true) {}
protected:
+ void ReopenWithCompactionService(Options* options) {
+ options->env = env_;
+ primary_statistics_ = CreateDBStatistics();
+ options->statistics = primary_statistics_;
+ compactor_statistics_ = CreateDBStatistics();
+ TestCompactionServiceType cs_type = GetParam();
+ switch (cs_type) {
+ case MyTestCompactionServiceType:
+ compaction_service_ = std::make_shared<MyTestCompactionService>(
+ dbname_, *options, compactor_statistics_);
+ break;
+ case MyTestCompactionServiceLegacyType:
+ compaction_service_ = std::make_shared<MyTestCompactionServiceLegacy>(
+ dbname_, *options, compactor_statistics_);
+ break;
+ default:
+ assert(false);
+ }
+ options->compaction_service = compaction_service_;
+ DestroyAndReopen(*options);
+ }
+
+ Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); }
+
+ Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); }
+
+ TestCompactionServiceBase* GetCompactionService() {
+ CompactionService* cs = compaction_service_.get();
+ return dynamic_cast<TestCompactionServiceBase*>(cs);
+ }
+
void GenerateTestData() {
// Generate 20 files @ L2
for (int i = 0; i < 20; i++) {
}
}
}
+
+ private:
+ std::shared_ptr<Statistics> compactor_statistics_;
+ std::shared_ptr<Statistics> primary_statistics_;
+ std::shared_ptr<CompactionService> compaction_service_;
};
-TEST_F(CompactionServiceTest, BasicCompactions) {
+TEST_P(CompactionServiceTest, BasicCompactions) {
Options options = CurrentOptions();
- options.env = env_;
- options.statistics = CreateDBStatistics();
- std::shared_ptr<Statistics> compactor_statistics = CreateDBStatistics();
- options.compaction_service = std::make_shared<MyTestCompactionService>(
- dbname_, options, compactor_statistics);
-
- DestroyAndReopen(options);
+ ReopenWithCompactionService(&options);
for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
ASSERT_EQ(result, "value_new" + ToString(i));
}
}
- auto my_cs =
- dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+ auto my_cs = GetCompactionService();
+ Statistics* compactor_statistics = GetCompactorStatistics();
ASSERT_GE(my_cs->GetCompactionNum(), 1);
// make sure the compaction statistics is only recorded on remote side
ASSERT_GE(
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 1);
- ASSERT_EQ(options.statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
+ Statistics* primary_statistics = GetPrimaryStatistics();
+ ASSERT_EQ(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
0);
// Test failed compaction
SyncPoint::GetInstance()->SetCallBack(
"DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
// override job status
- Status* s = static_cast<Status*>(status);
+ auto s = static_cast<Status*>(status);
*s = Status::Aborted("MyTestCompactionService failed to compact!");
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_TRUE(s.IsAborted());
}
-TEST_F(CompactionServiceTest, ManualCompaction) {
+TEST_P(CompactionServiceTest, ManualCompaction) {
Options options = CurrentOptions();
- options.env = env_;
options.disable_auto_compactions = true;
- options.compaction_service =
- std::make_shared<MyTestCompactionService>(dbname_, options);
- DestroyAndReopen(options);
+ ReopenWithCompactionService(&options);
GenerateTestData();
- auto my_cs =
- dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+ auto my_cs = GetCompactionService();
std::string start_str = Key(15);
std::string end_str = Key(45);
VerifyTestData();
}
-TEST_F(CompactionServiceTest, FailedToStart) {
+TEST_P(CompactionServiceTest, FailedToStart) {
Options options = CurrentOptions();
- options.env = env_;
options.disable_auto_compactions = true;
- options.compaction_service =
- std::make_shared<MyTestCompactionService>(dbname_, options);
- DestroyAndReopen(options);
+ ReopenWithCompactionService(&options);
+
GenerateTestData();
- SyncPoint::GetInstance()->SetCallBack(
- "MyTestCompactionService::Start::End", [&](void* status) {
- // override job status
- auto s = static_cast<CompactionServiceJobStatus*>(status);
- *s = CompactionServiceJobStatus::kFailure;
- });
- SyncPoint::GetInstance()->EnableProcessing();
+ auto my_cs = GetCompactionService();
+ my_cs->OverrideStartStatus(CompactionServiceJobStatus::kFailure);
std::string start_str = Key(15);
std::string end_str = Key(45);
ASSERT_TRUE(s.IsIncomplete());
}
-TEST_F(CompactionServiceTest, InvalidResult) {
+TEST_P(CompactionServiceTest, InvalidResult) {
Options options = CurrentOptions();
- options.env = env_;
options.disable_auto_compactions = true;
- options.compaction_service =
- std::make_shared<MyTestCompactionService>(dbname_, options);
- DestroyAndReopen(options);
+ ReopenWithCompactionService(&options);
+
GenerateTestData();
- SyncPoint::GetInstance()->SetCallBack(
- "MyTestCompactionService::WaitForComplete::End", [&](void* result) {
- // override job status
- auto result_str = static_cast<std::string*>(result);
- *result_str = "Invalid Str";
- });
- SyncPoint::GetInstance()->EnableProcessing();
+ auto my_cs = GetCompactionService();
+ my_cs->OverrideWaitResult("Invalid Str");
std::string start_str = Key(15);
std::string end_str = Key(45);
ASSERT_FALSE(s.ok());
}
-TEST_F(CompactionServiceTest, SubCompaction) {
+TEST_P(CompactionServiceTest, SubCompaction) {
Options options = CurrentOptions();
- options.env = env_;
options.max_subcompactions = 10;
options.target_file_size_base = 1 << 10; // 1KB
options.disable_auto_compactions = true;
- options.compaction_service =
- std::make_shared<MyTestCompactionService>(dbname_, options);
+ ReopenWithCompactionService(&options);
- DestroyAndReopen(options);
GenerateTestData();
VerifyTestData();
- auto my_cs =
- dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+ auto my_cs = GetCompactionService();
int compaction_num_before = my_cs->GetCompactionNum();
auto cro = CompactRangeOptions();
const char* Name() const override { return "PartialDeleteCompactionFilter"; }
};
-TEST_F(CompactionServiceTest, CompactionFilter) {
+TEST_P(CompactionServiceTest, CompactionFilter) {
Options options = CurrentOptions();
- options.env = env_;
std::unique_ptr<CompactionFilter> delete_comp_filter(
new PartialDeleteCompactionFilter());
options.compaction_filter = delete_comp_filter.get();
- options.compaction_service =
- std::make_shared<MyTestCompactionService>(dbname_, options);
-
- DestroyAndReopen(options);
+ ReopenWithCompactionService(&options);
for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
ASSERT_EQ(result, "value_new" + ToString(i));
}
}
- auto my_cs =
- dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+ auto my_cs = GetCompactionService();
ASSERT_GE(my_cs->GetCompactionNum(), 1);
}
-TEST_F(CompactionServiceTest, Snapshot) {
+TEST_P(CompactionServiceTest, Snapshot) {
Options options = CurrentOptions();
- options.env = env_;
- options.compaction_service =
- std::make_shared<MyTestCompactionService>(dbname_, options);
-
- DestroyAndReopen(options);
+ ReopenWithCompactionService(&options);
ASSERT_OK(Put(Key(1), "value1"));
ASSERT_OK(Put(Key(2), "value1"));
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- auto my_cs =
- dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+ auto my_cs = GetCompactionService();
ASSERT_GE(my_cs->GetCompactionNum(), 1);
ASSERT_EQ("value1", Get(Key(1), s1));
ASSERT_EQ("value2", Get(Key(1)));
db_->ReleaseSnapshot(s1);
}
-TEST_F(CompactionServiceTest, ConcurrentCompaction) {
+TEST_P(CompactionServiceTest, ConcurrentCompaction) {
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 100;
- options.env = env_;
- options.compaction_service =
- std::make_shared<MyTestCompactionService>(dbname_, options);
options.max_background_jobs = 20;
-
- DestroyAndReopen(options);
+ ReopenWithCompactionService(&options);
GenerateTestData();
ColumnFamilyMetaData meta;
ASSERT_EQ(result, "value_new" + ToString(i));
}
}
- auto my_cs =
- dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+ auto my_cs = GetCompactionService();
ASSERT_EQ(my_cs->GetCompactionNum(), 10);
ASSERT_EQ(FilesPerLevel(), "0,0,10");
}
+TEST_P(CompactionServiceTest, CompactionInfo) {
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ ReopenWithCompactionService(&options);
+ GenerateTestData();
+
+ auto my_cs = GetCompactionService();
+
+ std::string start_str = Key(15);
+ std::string end_str = Key(45);
+ Slice start(start_str);
+ Slice end(end_str);
+ uint64_t comp_num = my_cs->GetCompactionNum();
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
+ ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+ VerifyTestData();
+ // only test compaction info for new compaction service interface
+ if (GetParam() == MyTestCompactionServiceType) {
+ auto cs = static_cast_with_check<MyTestCompactionService>(my_cs);
+ CompactionServiceJobInfo info = cs->GetCompactionInfoForStart();
+ ASSERT_EQ(dbname_, info.db_name);
+ std::string db_id, db_session_id;
+ ASSERT_OK(db_->GetDbIdentity(db_id));
+ ASSERT_EQ(db_id, info.db_id);
+ ASSERT_OK(db_->GetDbSessionId(db_session_id));
+ ASSERT_EQ(db_session_id, info.db_session_id);
+ info = cs->GetCompactionInfoForWait();
+ ASSERT_EQ(dbname_, info.db_name);
+ ASSERT_EQ(db_id, info.db_id);
+ ASSERT_EQ(db_session_id, info.db_session_id);
+ }
+}
+
+INSTANTIATE_TEST_CASE_P(
+ CompactionServiceTest, CompactionServiceTest,
+ ::testing::Values(
+ TestCompactionServiceType::MyTestCompactionServiceType,
+ TestCompactionServiceType::MyTestCompactionServiceLegacyType));
+
} // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS