]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
RemoteCompaction support Fallback to local compaction (#8709)
authorJay Zhuang <zjay@fb.com>
Sat, 18 Sep 2021 06:24:03 +0000 (23:24 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Sat, 18 Sep 2021 07:25:04 +0000 (00:25 -0700)
Summary:
Add support for fallback to local compaction, the user can
return `CompactionServiceJobStatus::kUseLocal` to instruct RocksDB to
run the compaction locally instead of waiting for the remote compaction
result.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8709

Test Plan: unittest

Reviewed By: ajkr

Differential Revision: D30560163

Pulled By: jay-zhuang

fbshipit-source-id: 65d8905a4a1bc185a68daa120997f21d3198dbe1

HISTORY.md
db/compaction/compaction_job.cc
db/compaction/compaction_job.h
db/compaction/compaction_service_test.cc
include/rocksdb/options.h

index fa21fa5853145307bd99201adfdcfd8da0f9fe42..0cbf74ab4c48bc083fc4ddc87e3cf2a06eef9444 100644 (file)
@@ -24,6 +24,7 @@
 * Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first.
 * Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file.
 * Batch blob read requests for `DB::MultiGet` using `MultiRead`.
+* Add support for fallback to local compaction, the user can return `CompactionServiceJobStatus::kUseLocal` to instruct RocksDB to run the compaction locally instead of waiting for the remote compaction result.
 
 ### Public API change
 * Remove obsolete implementation details FullKey and ParseFullKey from public API
index a89fccb996fae6aef827f249b60b8ee74cd96ad4..1bdc4125f80d8eb9531b9dab2be0a0a7a80d2388 100644 (file)
@@ -932,7 +932,8 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
 }
 
 #ifndef ROCKSDB_LITE
-void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
+CompactionServiceJobStatus
+CompactionJob::ProcessKeyValueCompactionWithCompactionService(
     SubcompactionState* sub_compact) {
   assert(sub_compact);
   assert(sub_compact->compaction);
@@ -969,7 +970,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
   Status s = compaction_input.Write(&compaction_input_binary);
   if (!s.ok()) {
     sub_compact->status = s;
-    return;
+    return CompactionServiceJobStatus::kFailure;
   }
 
   std::ostringstream input_files_oss;
@@ -988,36 +989,73 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
                                 GetCompactionId(sub_compact), thread_pri_);
   CompactionServiceJobStatus compaction_status =
       db_options_.compaction_service->StartV2(info, compaction_input_binary);
-  if (compaction_status != CompactionServiceJobStatus::kSuccess) {
-    sub_compact->status =
-        Status::Incomplete("CompactionService failed to start compaction job.");
-    return;
+  switch (compaction_status) {
+    case CompactionServiceJobStatus::kSuccess:
+      break;
+    case CompactionServiceJobStatus::kFailure:
+      sub_compact->status = Status::Incomplete(
+          "CompactionService failed to start compaction job.");
+      ROCKS_LOG_WARN(db_options_.info_log,
+                     "[%s] [JOB %d] Remote compaction failed to start.",
+                     compaction_input.column_family.name.c_str(), job_id_);
+      return compaction_status;
+    case CompactionServiceJobStatus::kUseLocal:
+      ROCKS_LOG_INFO(
+          db_options_.info_log,
+          "[%s] [JOB %d] Remote compaction fallback to local by API Start.",
+          compaction_input.column_family.name.c_str(), job_id_);
+      return compaction_status;
+    default:
+      assert(false);  // unknown status
+      break;
   }
 
+  ROCKS_LOG_INFO(db_options_.info_log,
+                 "[%s] [JOB %d] Waiting for remote compaction...",
+                 compaction_input.column_family.name.c_str(), job_id_);
   std::string compaction_result_binary;
   compaction_status = db_options_.compaction_service->WaitForCompleteV2(
       info, &compaction_result_binary);
 
+  if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
+    ROCKS_LOG_INFO(db_options_.info_log,
+                   "[%s] [JOB %d] Remote compaction fallback to local by API "
+                   "WaitForComplete.",
+                   compaction_input.column_family.name.c_str(), job_id_);
+    return compaction_status;
+  }
+
   CompactionServiceResult compaction_result;
   s = CompactionServiceResult::Read(compaction_result_binary,
                                     &compaction_result);
-  if (compaction_status != CompactionServiceJobStatus::kSuccess) {
-    sub_compact->status =
-        s.ok() ? compaction_result.status
-               : Status::Incomplete(
-                     "CompactionService failed to run compaction job.");
-    compaction_result.status.PermitUncheckedError();
+
+  if (compaction_status == CompactionServiceJobStatus::kFailure) {
+    if (s.ok()) {
+      if (compaction_result.status.ok()) {
+        sub_compact->status = Status::Incomplete(
+            "CompactionService failed to run the compaction job (even though "
+            "the internal status is okay).");
+      } else {
+        // set the current sub compaction status with the status returned from
+        // remote
+        sub_compact->status = compaction_result.status;
+      }
+    } else {
+      sub_compact->status = Status::Incomplete(
+          "CompactionService failed to run the compaction job (and no valid "
+          "result is returned).");
+      compaction_result.status.PermitUncheckedError();
+    }
     ROCKS_LOG_WARN(db_options_.info_log,
-                   "[%s] [JOB %d] Remote compaction failed, status: %s",
-                   compaction_input.column_family.name.c_str(), job_id_,
-                   s.ToString().c_str());
-    return;
+                   "[%s] [JOB %d] Remote compaction failed.",
+                   compaction_input.column_family.name.c_str(), job_id_);
+    return compaction_status;
   }
 
   if (!s.ok()) {
     sub_compact->status = s;
     compaction_result.status.PermitUncheckedError();
-    return;
+    return CompactionServiceJobStatus::kFailure;
   }
   sub_compact->status = compaction_result.status;
 
@@ -1037,7 +1075,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
 
   if (!s.ok()) {
     sub_compact->status = s;
-    return;
+    return CompactionServiceJobStatus::kFailure;
   }
 
   for (const auto& file : compaction_result.output_files) {
@@ -1048,7 +1086,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
     s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);
     if (!s.ok()) {
       sub_compact->status = s;
-      return;
+      return CompactionServiceJobStatus::kFailure;
     }
 
     FileMetaData meta;
@@ -1056,7 +1094,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
     s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
     if (!s.ok()) {
       sub_compact->status = s;
-      return;
+      return CompactionServiceJobStatus::kFailure;
     }
     meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,
                              file.smallest_seqno, file.largest_seqno);
@@ -1077,6 +1115,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
   sub_compact->total_bytes = compaction_result.total_bytes;
   IOSTATS_ADD(bytes_written, compaction_result.bytes_written);
   IOSTATS_ADD(bytes_read, compaction_result.bytes_read);
+  return CompactionServiceJobStatus::kSuccess;
 }
 #endif  // !ROCKSDB_LITE
 
@@ -1086,7 +1125,14 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
 
 #ifndef ROCKSDB_LITE
   if (db_options_.compaction_service) {
-    return ProcessKeyValueCompactionWithCompactionService(sub_compact);
+    CompactionServiceJobStatus comp_status =
+        ProcessKeyValueCompactionWithCompactionService(sub_compact);
+    if (comp_status == CompactionServiceJobStatus::kSuccess ||
+        comp_status == CompactionServiceJobStatus::kFailure) {
+      return;
+    }
+    // fallback to local compaction
+    assert(comp_status == CompactionServiceJobStatus::kUseLocal);
   }
 #endif  // !ROCKSDB_LITE
 
index 586bf86de064ddfdb86a8add5b3c3f277c1113cd..4f4563991b1b1dc03a8b09918ac214ed8ec544cb 100644 (file)
@@ -146,7 +146,7 @@ class CompactionJob {
   // consecutive groups such that each group has a similar size.
   void GenSubcompactionBoundaries();
 
-  void ProcessKeyValueCompactionWithCompactionService(
+  CompactionServiceJobStatus ProcessKeyValueCompactionWithCompactionService(
       SubcompactionState* sub_compact);
 
   // update the thread status for starting a compaction.
index 4555dba39a7067d2d83aac0d436fdee44fbd3a84..d68838182c267b2a67e360ce006301f496c1ad05 100644 (file)
@@ -19,6 +19,11 @@ class TestCompactionServiceBase {
     override_start_status = s;
   }
 
+  void OverrideWaitStatus(CompactionServiceJobStatus s) {
+    is_override_wait_status = true;
+    override_wait_status = s;
+  }
+
   void OverrideWaitResult(std::string str) {
     is_override_wait_result = true;
     override_wait_result = std::move(str);
@@ -27,6 +32,7 @@ class TestCompactionServiceBase {
   void ResetOverride() {
     is_override_wait_result = false;
     is_override_start_status = false;
+    is_override_wait_status = false;
   }
 
   virtual ~TestCompactionServiceBase() = default;
@@ -35,6 +41,9 @@ class TestCompactionServiceBase {
   bool is_override_start_status = false;
   CompactionServiceJobStatus override_start_status =
       CompactionServiceJobStatus::kFailure;
+  bool is_override_wait_status = false;
+  CompactionServiceJobStatus override_wait_status =
+      CompactionServiceJobStatus::kFailure;
   bool is_override_wait_result = false;
   std::string override_wait_result;
 };
@@ -76,6 +85,10 @@ class MyTestCompactionServiceLegacy : public CompactionService,
       jobs_.erase(i);
     }
 
+    if (is_override_wait_status) {
+      return override_wait_status;
+    }
+
     CompactionServiceOptionsOverride options_override;
     options_override.env = options_.env;
     options_override.file_checksum_gen_factory =
@@ -160,6 +173,10 @@ class MyTestCompactionService : public CompactionService,
       jobs_.erase(i);
     }
 
+    if (is_override_wait_status) {
+      return override_wait_status;
+    }
+
     CompactionServiceOptionsOverride options_override;
     options_override.env = options_.env;
     options_override.file_checksum_gen_factory =
@@ -323,7 +340,7 @@ TEST_P(CompactionServiceTest, BasicCompactions) {
   Statistics* compactor_statistics = GetCompactorStatistics();
   ASSERT_GE(my_cs->GetCompactionNum(), 1);
 
-  // make sure the compaction statistics is only recorded on remote side
+  // make sure the compaction statistics is only recorded on the remote side
   ASSERT_GE(
       compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 1);
   Statistics* primary_statistics = GetPrimaryStatistics();
@@ -658,6 +675,114 @@ TEST_P(CompactionServiceTest, CompactionInfo) {
   ASSERT_EQ(Env::BOTTOM, info.priority);
 }
 
+TEST_P(CompactionServiceTest, FallbackLocalAuto) {
+  Options options = CurrentOptions();
+  ReopenWithCompactionService(&options);
+
+  auto my_cs = GetCompactionService();
+  Statistics* compactor_statistics = GetCompactorStatistics();
+  Statistics* primary_statistics = GetPrimaryStatistics();
+  uint64_t compactor_new_key =
+      compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
+  uint64_t primary_new_key =
+      primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
+
+  my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);
+
+  for (int i = 0; i < 20; i++) {
+    for (int j = 0; j < 10; j++) {
+      int key_id = i * 10 + j;
+      ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  for (int i = 0; i < 10; i++) {
+    for (int j = 0; j < 10; j++) {
+      int key_id = i * 20 + j * 2;
+      ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  // verify result
+  for (int i = 0; i < 200; i++) {
+    auto result = Get(Key(i));
+    if (i % 2) {
+      ASSERT_EQ(result, "value" + ToString(i));
+    } else {
+      ASSERT_EQ(result, "value_new" + ToString(i));
+    }
+  }
+
+  ASSERT_EQ(my_cs->GetCompactionNum(), 0);
+
+  // make sure the compaction statistics is only recorded on the local side
+  ASSERT_EQ(
+      compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
+      compactor_new_key);
+  ASSERT_GT(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
+            primary_new_key);
+}
+
+TEST_P(CompactionServiceTest, FallbackLocalManual) {
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  ReopenWithCompactionService(&options);
+
+  GenerateTestData();
+  VerifyTestData();
+
+  auto my_cs = GetCompactionService();
+  Statistics* compactor_statistics = GetCompactorStatistics();
+  Statistics* primary_statistics = GetPrimaryStatistics();
+  uint64_t compactor_new_key =
+      compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
+  uint64_t primary_new_key =
+      primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
+
+  // re-enable remote compaction
+  my_cs->ResetOverride();
+  std::string start_str = Key(15);
+  std::string end_str = Key(45);
+  Slice start(start_str);
+  Slice end(end_str);
+  uint64_t comp_num = my_cs->GetCompactionNum();
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
+  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+  // make sure the compaction statistics is only recorded on the remote side
+  ASSERT_GT(
+      compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
+      compactor_new_key);
+  ASSERT_EQ(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
+            primary_new_key);
+
+  // return run local again with API WaitForComplete
+  my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal);
+  start_str = Key(120);
+  start = start_str;
+  comp_num = my_cs->GetCompactionNum();
+  compactor_new_key =
+      compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
+  primary_new_key =
+      primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
+  ASSERT_EQ(my_cs->GetCompactionNum(),
+            comp_num);  // no remote compaction is run
+  // make sure the compaction statistics is only recorded on the local side
+  ASSERT_EQ(
+      compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
+      compactor_new_key);
+  ASSERT_GT(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
+            primary_new_key);
+
+  // verify result after 2 manual compactions
+  VerifyTestData();
+}
+
 INSTANTIATE_TEST_CASE_P(
     CompactionServiceTest, CompactionServiceTest,
     ::testing::Values(
index 05506083230b2cca72f1e830915630f5420bdc7b..968abc57a21cb76a4f89847021dbb7dca8f40632 100644 (file)
@@ -372,7 +372,7 @@ extern const char* kHostnameForDbHostId;
 enum class CompactionServiceJobStatus : char {
   kSuccess,
   kFailure,
-  kUseLocal,  // TODO: Add support for use local compaction
+  kUseLocal,
 };
 
 struct CompactionServiceJobInfo {