* Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first.
* Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file.
* Batch blob read requests for `DB::MultiGet` using `MultiRead`.
+* Add support for fallback to local compaction, the user can return `CompactionServiceJobStatus::kUseLocal` to instruct RocksDB to run the compaction locally instead of waiting for the remote compaction result.
### Public API change
* Remove obsolete implementation details FullKey and ParseFullKey from public API
}
#ifndef ROCKSDB_LITE
-void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
+CompactionServiceJobStatus
+CompactionJob::ProcessKeyValueCompactionWithCompactionService(
SubcompactionState* sub_compact) {
assert(sub_compact);
assert(sub_compact->compaction);
Status s = compaction_input.Write(&compaction_input_binary);
if (!s.ok()) {
sub_compact->status = s;
- return;
+ return CompactionServiceJobStatus::kFailure;
}
std::ostringstream input_files_oss;
GetCompactionId(sub_compact), thread_pri_);
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->StartV2(info, compaction_input_binary);
- if (compaction_status != CompactionServiceJobStatus::kSuccess) {
- sub_compact->status =
- Status::Incomplete("CompactionService failed to start compaction job.");
- return;
+ switch (compaction_status) {
+ case CompactionServiceJobStatus::kSuccess:
+ break;
+ case CompactionServiceJobStatus::kFailure:
+ sub_compact->status = Status::Incomplete(
+ "CompactionService failed to start compaction job.");
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "[%s] [JOB %d] Remote compaction failed to start.",
+ compaction_input.column_family.name.c_str(), job_id_);
+ return compaction_status;
+ case CompactionServiceJobStatus::kUseLocal:
+ ROCKS_LOG_INFO(
+ db_options_.info_log,
+ "[%s] [JOB %d] Remote compaction fallback to local by API Start.",
+ compaction_input.column_family.name.c_str(), job_id_);
+ return compaction_status;
+ default:
+ assert(false); // unknown status
+ break;
}
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "[%s] [JOB %d] Waiting for remote compaction...",
+ compaction_input.column_family.name.c_str(), job_id_);
std::string compaction_result_binary;
compaction_status = db_options_.compaction_service->WaitForCompleteV2(
info, &compaction_result_binary);
+ if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "[%s] [JOB %d] Remote compaction fallback to local by API "
+ "WaitForComplete.",
+ compaction_input.column_family.name.c_str(), job_id_);
+ return compaction_status;
+ }
+
CompactionServiceResult compaction_result;
s = CompactionServiceResult::Read(compaction_result_binary,
&compaction_result);
- if (compaction_status != CompactionServiceJobStatus::kSuccess) {
- sub_compact->status =
- s.ok() ? compaction_result.status
- : Status::Incomplete(
- "CompactionService failed to run compaction job.");
- compaction_result.status.PermitUncheckedError();
+
+ if (compaction_status == CompactionServiceJobStatus::kFailure) {
+ if (s.ok()) {
+ if (compaction_result.status.ok()) {
+ sub_compact->status = Status::Incomplete(
+ "CompactionService failed to run the compaction job (even though "
+ "the internal status is okay).");
+ } else {
+ // set the current sub compaction status with the status returned from
+ // remote
+ sub_compact->status = compaction_result.status;
+ }
+ } else {
+ sub_compact->status = Status::Incomplete(
+ "CompactionService failed to run the compaction job (and no valid "
+ "result is returned).");
+ compaction_result.status.PermitUncheckedError();
+ }
ROCKS_LOG_WARN(db_options_.info_log,
- "[%s] [JOB %d] Remote compaction failed, status: %s",
- compaction_input.column_family.name.c_str(), job_id_,
- s.ToString().c_str());
- return;
+ "[%s] [JOB %d] Remote compaction failed.",
+ compaction_input.column_family.name.c_str(), job_id_);
+ return compaction_status;
}
if (!s.ok()) {
sub_compact->status = s;
compaction_result.status.PermitUncheckedError();
- return;
+ return CompactionServiceJobStatus::kFailure;
}
sub_compact->status = compaction_result.status;
if (!s.ok()) {
sub_compact->status = s;
- return;
+ return CompactionServiceJobStatus::kFailure;
}
for (const auto& file : compaction_result.output_files) {
s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);
if (!s.ok()) {
sub_compact->status = s;
- return;
+ return CompactionServiceJobStatus::kFailure;
}
FileMetaData meta;
s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
if (!s.ok()) {
sub_compact->status = s;
- return;
+ return CompactionServiceJobStatus::kFailure;
}
meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,
file.smallest_seqno, file.largest_seqno);
sub_compact->total_bytes = compaction_result.total_bytes;
IOSTATS_ADD(bytes_written, compaction_result.bytes_written);
IOSTATS_ADD(bytes_read, compaction_result.bytes_read);
+ return CompactionServiceJobStatus::kSuccess;
}
#endif // !ROCKSDB_LITE
#ifndef ROCKSDB_LITE
if (db_options_.compaction_service) {
- return ProcessKeyValueCompactionWithCompactionService(sub_compact);
+ CompactionServiceJobStatus comp_status =
+ ProcessKeyValueCompactionWithCompactionService(sub_compact);
+ if (comp_status == CompactionServiceJobStatus::kSuccess ||
+ comp_status == CompactionServiceJobStatus::kFailure) {
+ return;
+ }
+ // fallback to local compaction
+ assert(comp_status == CompactionServiceJobStatus::kUseLocal);
}
#endif // !ROCKSDB_LITE
// consecutive groups such that each group has a similar size.
void GenSubcompactionBoundaries();
- void ProcessKeyValueCompactionWithCompactionService(
+ CompactionServiceJobStatus ProcessKeyValueCompactionWithCompactionService(
SubcompactionState* sub_compact);
// update the thread status for starting a compaction.
override_start_status = s;
}
+ void OverrideWaitStatus(CompactionServiceJobStatus s) {
+ is_override_wait_status = true;
+ override_wait_status = s;
+ }
+
void OverrideWaitResult(std::string str) {
is_override_wait_result = true;
override_wait_result = std::move(str);
void ResetOverride() {
is_override_wait_result = false;
is_override_start_status = false;
+ is_override_wait_status = false;
}
virtual ~TestCompactionServiceBase() = default;
bool is_override_start_status = false;
CompactionServiceJobStatus override_start_status =
CompactionServiceJobStatus::kFailure;
+ bool is_override_wait_status = false;
+ CompactionServiceJobStatus override_wait_status =
+ CompactionServiceJobStatus::kFailure;
bool is_override_wait_result = false;
std::string override_wait_result;
};
jobs_.erase(i);
}
+ if (is_override_wait_status) {
+ return override_wait_status;
+ }
+
CompactionServiceOptionsOverride options_override;
options_override.env = options_.env;
options_override.file_checksum_gen_factory =
jobs_.erase(i);
}
+ if (is_override_wait_status) {
+ return override_wait_status;
+ }
+
CompactionServiceOptionsOverride options_override;
options_override.env = options_.env;
options_override.file_checksum_gen_factory =
Statistics* compactor_statistics = GetCompactorStatistics();
ASSERT_GE(my_cs->GetCompactionNum(), 1);
- // make sure the compaction statistics is only recorded on remote side
+ // make sure the compaction statistics is only recorded on the remote side
ASSERT_GE(
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 1);
Statistics* primary_statistics = GetPrimaryStatistics();
ASSERT_EQ(Env::BOTTOM, info.priority);
}
+TEST_P(CompactionServiceTest, FallbackLocalAuto) {
+ Options options = CurrentOptions();
+ ReopenWithCompactionService(&options);
+
+ auto my_cs = GetCompactionService();
+ Statistics* compactor_statistics = GetCompactorStatistics();
+ Statistics* primary_statistics = GetPrimaryStatistics();
+ uint64_t compactor_new_key =
+ compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
+ uint64_t primary_new_key =
+ primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
+
+ my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);
+
+ for (int i = 0; i < 20; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 10 + j;
+ ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 20 + j * 2;
+ ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ // verify result
+ for (int i = 0; i < 200; i++) {
+ auto result = Get(Key(i));
+ if (i % 2) {
+ ASSERT_EQ(result, "value" + ToString(i));
+ } else {
+ ASSERT_EQ(result, "value_new" + ToString(i));
+ }
+ }
+
+ ASSERT_EQ(my_cs->GetCompactionNum(), 0);
+
+ // make sure the compaction statistics is only recorded on the local side
+ ASSERT_EQ(
+ compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
+ compactor_new_key);
+ ASSERT_GT(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
+ primary_new_key);
+}
+
+TEST_P(CompactionServiceTest, FallbackLocalManual) {
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ ReopenWithCompactionService(&options);
+
+ GenerateTestData();
+ VerifyTestData();
+
+ auto my_cs = GetCompactionService();
+ Statistics* compactor_statistics = GetCompactorStatistics();
+ Statistics* primary_statistics = GetPrimaryStatistics();
+ uint64_t compactor_new_key =
+ compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
+ uint64_t primary_new_key =
+ primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
+
+ // re-enable remote compaction
+ my_cs->ResetOverride();
+ std::string start_str = Key(15);
+ std::string end_str = Key(45);
+ Slice start(start_str);
+ Slice end(end_str);
+ uint64_t comp_num = my_cs->GetCompactionNum();
+
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
+ ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+ // make sure the compaction statistics is only recorded on the remote side
+ ASSERT_GT(
+ compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
+ compactor_new_key);
+ ASSERT_EQ(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
+ primary_new_key);
+
+ // return run local again with API WaitForComplete
+ my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal);
+ start_str = Key(120);
+ start = start_str;
+ comp_num = my_cs->GetCompactionNum();
+ compactor_new_key =
+ compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
+ primary_new_key =
+ primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
+
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
+ ASSERT_EQ(my_cs->GetCompactionNum(),
+ comp_num); // no remote compaction is run
+ // make sure the compaction statistics is only recorded on the local side
+ ASSERT_EQ(
+ compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
+ compactor_new_key);
+ ASSERT_GT(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
+ primary_new_key);
+
+ // verify result after 2 manual compactions
+ VerifyTestData();
+}
+
INSTANTIATE_TEST_CASE_P(
CompactionServiceTest, CompactionServiceTest,
::testing::Values(
enum class CompactionServiceJobStatus : char {
kSuccess,
kFailure,
- kUseLocal, // TODO: Add support for use local compaction
+ kUseLocal,
};
struct CompactionServiceJobInfo {