]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Add extra information to RemoteCompaction APIs (#8680)
authorJay Zhuang <zjay@fb.com>
Mon, 23 Aug 2021 23:26:13 +0000 (16:26 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Mon, 23 Aug 2021 23:27:38 +0000 (16:27 -0700)
Summary:
Currently, we only provide job_id in RemoteCompaction APIs, the
main problem of `job_id` is it cannot uniquely identify a compaction job
between DB instances or between sessions.
Providing DB and session id to the user, which will make building cross
DB compaction service easier.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8680

Test Plan: unittest

Reviewed By: ajkr

Differential Revision: D30444859

Pulled By: jay-zhuang

fbshipit-source-id: fdf107f4286564049637f154193c6d94c3c59448

HISTORY.md
db/compaction/compaction_job.cc
db/compaction/compaction_service_test.cc
include/rocksdb/options.h

index 3add487c7c220c0f18d3ecca9416384d2be59fc1..d8953b190c391a50ce9b669a0992e687eb6dbc51 100644 (file)
@@ -1,4 +1,8 @@
 # Rocksdb Change Log
+## Unreleased
+### New Features
+* RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions.
+
 ## 6.24.0 (2021-08-20)
 ### Bug Fixes
 * If the primary's CURRENT file is missing or inaccessible, the secondary instance should not hang repeatedly trying to switch to a new MANIFEST. It should instead return the error code encountered while accessing the file.
index fbbb3f8420bfa3cc5b45bb66b54f4a25ed034805..7c3a98e32f3dd9a8084346f81d5d472eabbe1a03 100644 (file)
@@ -984,9 +984,10 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
       "[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
       compaction_input.column_family.name.c_str(), job_id_,
       compaction_input.output_level, input_files_oss.str().c_str());
+  CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
+                                GetCompactionId(sub_compact));
   CompactionServiceJobStatus compaction_status =
-      db_options_.compaction_service->Start(compaction_input_binary,
-                                            GetCompactionId(sub_compact));
+      db_options_.compaction_service->StartV2(info, compaction_input_binary);
   if (compaction_status != CompactionServiceJobStatus::kSuccess) {
     sub_compact->status =
         Status::Incomplete("CompactionService failed to start compaction job.");
@@ -994,8 +995,8 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
   }
 
   std::string compaction_result_binary;
-  compaction_status = db_options_.compaction_service->WaitForComplete(
-      GetCompactionId(sub_compact), &compaction_result_binary);
+  compaction_status = db_options_.compaction_service->WaitForCompleteV2(
+      info, &compaction_result_binary);
 
   CompactionServiceResult compaction_result;
   s = CompactionServiceResult::Read(compaction_result_binary,
index e20c6f865664ced24f38fb84a3d7ddb931197fd8..e2c326be91c870f0fa0685cc50240b94ba49e95b 100644 (file)
 
 namespace ROCKSDB_NAMESPACE {
 
-class MyTestCompactionService : public CompactionService {
+class TestCompactionServiceBase {
  public:
-  MyTestCompactionService(const std::string& db_path, Options& options,
-                          std::shared_ptr<Statistics> statistics = nullptr)
-      : db_path_(db_path), options_(options), statistics_(statistics) {}
+  virtual int GetCompactionNum() = 0;
 
-  static const char* kClassName() { return "MyTestCompactionService"; }
+  void OverrideStartStatus(CompactionServiceJobStatus s) {
+    is_override_start_status = true;
+    override_start_status = s;
+  }
+
+  void OverrideWaitResult(std::string str) {
+    is_override_wait_result = true;
+    override_wait_result = std::move(str);
+  }
+
+  void ResetOverride() {
+    is_override_wait_result = false;
+    is_override_start_status = false;
+  }
+
+  virtual ~TestCompactionServiceBase() = default;
+
+ protected:
+  bool is_override_start_status = false;
+  CompactionServiceJobStatus override_start_status =
+      CompactionServiceJobStatus::kFailure;
+  bool is_override_wait_result = false;
+  std::string override_wait_result;
+};
+
+class MyTestCompactionServiceLegacy : public CompactionService,
+                                      public TestCompactionServiceBase {
+ public:
+  MyTestCompactionServiceLegacy(std::string db_path, Options& options,
+                                std::shared_ptr<Statistics>& statistics)
+      : db_path_(std::move(db_path)),
+        options_(options),
+        statistics_(statistics) {}
+
+  static const char* kClassName() { return "MyTestCompactionServiceLegacy"; }
 
   const char* Name() const override { return kClassName(); }
 
@@ -25,7 +57,9 @@ class MyTestCompactionService : public CompactionService {
     InstrumentedMutexLock l(&mutex_);
     jobs_.emplace(job_id, compaction_service_input);
     CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
-    TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::Start::End", &s);
+    if (is_override_start_status) {
+      return override_start_status;
+    }
     return s;
   }
 
@@ -59,8 +93,9 @@ class MyTestCompactionService : public CompactionService {
     Status s = DB::OpenAndCompact(
         db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(job_id),
         compaction_input, compaction_service_result, options_override);
-    TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::WaitForComplete::End",
-                             compaction_service_result);
+    if (is_override_wait_result) {
+      *compaction_service_result = override_wait_result;
+    }
     compaction_num_.fetch_add(1);
     if (s.ok()) {
       return CompactionServiceJobStatus::kSuccess;
@@ -69,7 +104,7 @@ class MyTestCompactionService : public CompactionService {
     }
   }
 
-  int GetCompactionNum() { return compaction_num_.load(); }
+  int GetCompactionNum() override { return compaction_num_.load(); }
 
  private:
   InstrumentedMutex mutex_;
@@ -80,12 +115,140 @@ class MyTestCompactionService : public CompactionService {
   std::shared_ptr<Statistics> statistics_;
 };
 
-class CompactionServiceTest : public DBTestBase {
+class MyTestCompactionService : public CompactionService,
+                                public TestCompactionServiceBase {
+ public:
+  MyTestCompactionService(std::string db_path, Options& options,
+                          std::shared_ptr<Statistics>& statistics)
+      : db_path_(std::move(db_path)),
+        options_(options),
+        statistics_(statistics),
+        start_info_("na", "na", "na", 0),
+        wait_info_("na", "na", "na", 0) {}
+
+  static const char* kClassName() { return "MyTestCompactionService"; }
+
+  const char* Name() const override { return kClassName(); }
+
+  CompactionServiceJobStatus StartV2(
+      const CompactionServiceJobInfo& info,
+      const std::string& compaction_service_input) override {
+    InstrumentedMutexLock l(&mutex_);
+    start_info_ = info;
+    assert(info.db_name == db_path_);
+    jobs_.emplace(info.job_id, compaction_service_input);
+    CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
+    if (is_override_start_status) {
+      return override_start_status;
+    }
+    return s;
+  }
+
+  CompactionServiceJobStatus WaitForCompleteV2(
+      const CompactionServiceJobInfo& info,
+      std::string* compaction_service_result) override {
+    std::string compaction_input;
+    assert(info.db_name == db_path_);
+    {
+      InstrumentedMutexLock l(&mutex_);
+      wait_info_ = info;
+      auto i = jobs_.find(info.job_id);
+      if (i == jobs_.end()) {
+        return CompactionServiceJobStatus::kFailure;
+      }
+      compaction_input = std::move(i->second);
+      jobs_.erase(i);
+    }
+
+    CompactionServiceOptionsOverride options_override;
+    options_override.env = options_.env;
+    options_override.file_checksum_gen_factory =
+        options_.file_checksum_gen_factory;
+    options_override.comparator = options_.comparator;
+    options_override.merge_operator = options_.merge_operator;
+    options_override.compaction_filter = options_.compaction_filter;
+    options_override.compaction_filter_factory =
+        options_.compaction_filter_factory;
+    options_override.prefix_extractor = options_.prefix_extractor;
+    options_override.table_factory = options_.table_factory;
+    options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
+    options_override.statistics = statistics_;
+
+    Status s = DB::OpenAndCompact(
+        db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id),
+        compaction_input, compaction_service_result, options_override);
+    if (is_override_wait_result) {
+      *compaction_service_result = override_wait_result;
+    }
+    compaction_num_.fetch_add(1);
+    if (s.ok()) {
+      return CompactionServiceJobStatus::kSuccess;
+    } else {
+      return CompactionServiceJobStatus::kFailure;
+    }
+  }
+
+  int GetCompactionNum() override { return compaction_num_.load(); }
+
+  CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; }
+  CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; }
+
+ private:
+  InstrumentedMutex mutex_;
+  std::atomic_int compaction_num_{0};
+  std::map<uint64_t, std::string> jobs_;
+  const std::string db_path_;
+  Options options_;
+  std::shared_ptr<Statistics> statistics_;
+  CompactionServiceJobInfo start_info_;
+  CompactionServiceJobInfo wait_info_;
+};
+
+// This is only for listing test classes
+enum TestCompactionServiceType {
+  MyTestCompactionServiceType,
+  MyTestCompactionServiceLegacyType,
+};
+
+class CompactionServiceTest
+    : public DBTestBase,
+      public testing::WithParamInterface<TestCompactionServiceType> {
  public:
   explicit CompactionServiceTest()
       : DBTestBase("compaction_service_test", true) {}
 
  protected:
+  void ReopenWithCompactionService(Options* options) {
+    options->env = env_;
+    primary_statistics_ = CreateDBStatistics();
+    options->statistics = primary_statistics_;
+    compactor_statistics_ = CreateDBStatistics();
+    TestCompactionServiceType cs_type = GetParam();
+    switch (cs_type) {
+      case MyTestCompactionServiceType:
+        compaction_service_ = std::make_shared<MyTestCompactionService>(
+            dbname_, *options, compactor_statistics_);
+        break;
+      case MyTestCompactionServiceLegacyType:
+        compaction_service_ = std::make_shared<MyTestCompactionServiceLegacy>(
+            dbname_, *options, compactor_statistics_);
+        break;
+      default:
+        assert(false);
+    }
+    options->compaction_service = compaction_service_;
+    DestroyAndReopen(*options);
+  }
+
+  Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); }
+
+  Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); }
+
+  TestCompactionServiceBase* GetCompactionService() {
+    CompactionService* cs = compaction_service_.get();
+    return dynamic_cast<TestCompactionServiceBase*>(cs);
+  }
+
   void GenerateTestData() {
     // Generate 20 files @ L2
     for (int i = 0; i < 20; i++) {
@@ -119,17 +282,16 @@ class CompactionServiceTest : public DBTestBase {
       }
     }
   }
+
+ private:
+  std::shared_ptr<Statistics> compactor_statistics_;
+  std::shared_ptr<Statistics> primary_statistics_;
+  std::shared_ptr<CompactionService> compaction_service_;
 };
 
-TEST_F(CompactionServiceTest, BasicCompactions) {
+TEST_P(CompactionServiceTest, BasicCompactions) {
   Options options = CurrentOptions();
-  options.env = env_;
-  options.statistics = CreateDBStatistics();
-  std::shared_ptr<Statistics> compactor_statistics = CreateDBStatistics();
-  options.compaction_service = std::make_shared<MyTestCompactionService>(
-      dbname_, options, compactor_statistics);
-
-  DestroyAndReopen(options);
+  ReopenWithCompactionService(&options);
 
   for (int i = 0; i < 20; i++) {
     for (int j = 0; j < 10; j++) {
@@ -157,21 +319,22 @@ TEST_F(CompactionServiceTest, BasicCompactions) {
       ASSERT_EQ(result, "value_new" + ToString(i));
     }
   }
-  auto my_cs =
-      dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+  auto my_cs = GetCompactionService();
+  Statistics* compactor_statistics = GetCompactorStatistics();
   ASSERT_GE(my_cs->GetCompactionNum(), 1);
 
   // make sure the compaction statistics is only recorded on remote side
   ASSERT_GE(
       compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 1);
-  ASSERT_EQ(options.statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
+  Statistics* primary_statistics = GetPrimaryStatistics();
+  ASSERT_EQ(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
             0);
 
   // Test failed compaction
   SyncPoint::GetInstance()->SetCallBack(
       "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
         // override job status
-        Status* s = static_cast<Status*>(status);
+        auto s = static_cast<Status*>(status);
         *s = Status::Aborted("MyTestCompactionService failed to compact!");
       });
   SyncPoint::GetInstance()->EnableProcessing();
@@ -200,17 +363,13 @@ TEST_F(CompactionServiceTest, BasicCompactions) {
   ASSERT_TRUE(s.IsAborted());
 }
 
-TEST_F(CompactionServiceTest, ManualCompaction) {
+TEST_P(CompactionServiceTest, ManualCompaction) {
   Options options = CurrentOptions();
-  options.env = env_;
   options.disable_auto_compactions = true;
-  options.compaction_service =
-      std::make_shared<MyTestCompactionService>(dbname_, options);
-  DestroyAndReopen(options);
+  ReopenWithCompactionService(&options);
   GenerateTestData();
 
-  auto my_cs =
-      dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+  auto my_cs = GetCompactionService();
 
   std::string start_str = Key(15);
   std::string end_str = Key(45);
@@ -241,22 +400,15 @@ TEST_F(CompactionServiceTest, ManualCompaction) {
   VerifyTestData();
 }
 
-TEST_F(CompactionServiceTest, FailedToStart) {
+TEST_P(CompactionServiceTest, FailedToStart) {
   Options options = CurrentOptions();
-  options.env = env_;
   options.disable_auto_compactions = true;
-  options.compaction_service =
-      std::make_shared<MyTestCompactionService>(dbname_, options);
-  DestroyAndReopen(options);
+  ReopenWithCompactionService(&options);
+
   GenerateTestData();
 
-  SyncPoint::GetInstance()->SetCallBack(
-      "MyTestCompactionService::Start::End", [&](void* status) {
-        // override job status
-        auto s = static_cast<CompactionServiceJobStatus*>(status);
-        *s = CompactionServiceJobStatus::kFailure;
-      });
-  SyncPoint::GetInstance()->EnableProcessing();
+  auto my_cs = GetCompactionService();
+  my_cs->OverrideStartStatus(CompactionServiceJobStatus::kFailure);
 
   std::string start_str = Key(15);
   std::string end_str = Key(45);
@@ -266,22 +418,15 @@ TEST_F(CompactionServiceTest, FailedToStart) {
   ASSERT_TRUE(s.IsIncomplete());
 }
 
-TEST_F(CompactionServiceTest, InvalidResult) {
+TEST_P(CompactionServiceTest, InvalidResult) {
   Options options = CurrentOptions();
-  options.env = env_;
   options.disable_auto_compactions = true;
-  options.compaction_service =
-      std::make_shared<MyTestCompactionService>(dbname_, options);
-  DestroyAndReopen(options);
+  ReopenWithCompactionService(&options);
+
   GenerateTestData();
 
-  SyncPoint::GetInstance()->SetCallBack(
-      "MyTestCompactionService::WaitForComplete::End", [&](void* result) {
-        // override job status
-        auto result_str = static_cast<std::string*>(result);
-        *result_str = "Invalid Str";
-      });
-  SyncPoint::GetInstance()->EnableProcessing();
+  auto my_cs = GetCompactionService();
+  my_cs->OverrideWaitResult("Invalid Str");
 
   std::string start_str = Key(15);
   std::string end_str = Key(45);
@@ -291,21 +436,17 @@ TEST_F(CompactionServiceTest, InvalidResult) {
   ASSERT_FALSE(s.ok());
 }
 
-TEST_F(CompactionServiceTest, SubCompaction) {
+TEST_P(CompactionServiceTest, SubCompaction) {
   Options options = CurrentOptions();
-  options.env = env_;
   options.max_subcompactions = 10;
   options.target_file_size_base = 1 << 10;  // 1KB
   options.disable_auto_compactions = true;
-  options.compaction_service =
-      std::make_shared<MyTestCompactionService>(dbname_, options);
+  ReopenWithCompactionService(&options);
 
-  DestroyAndReopen(options);
   GenerateTestData();
   VerifyTestData();
 
-  auto my_cs =
-      dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+  auto my_cs = GetCompactionService();
   int compaction_num_before = my_cs->GetCompactionNum();
 
   auto cro = CompactRangeOptions();
@@ -334,16 +475,12 @@ class PartialDeleteCompactionFilter : public CompactionFilter {
   const char* Name() const override { return "PartialDeleteCompactionFilter"; }
 };
 
-TEST_F(CompactionServiceTest, CompactionFilter) {
+TEST_P(CompactionServiceTest, CompactionFilter) {
   Options options = CurrentOptions();
-  options.env = env_;
   std::unique_ptr<CompactionFilter> delete_comp_filter(
       new PartialDeleteCompactionFilter());
   options.compaction_filter = delete_comp_filter.get();
-  options.compaction_service =
-      std::make_shared<MyTestCompactionService>(dbname_, options);
-
-  DestroyAndReopen(options);
+  ReopenWithCompactionService(&options);
 
   for (int i = 0; i < 20; i++) {
     for (int j = 0; j < 10; j++) {
@@ -375,18 +512,13 @@ TEST_F(CompactionServiceTest, CompactionFilter) {
       ASSERT_EQ(result, "value_new" + ToString(i));
     }
   }
-  auto my_cs =
-      dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+  auto my_cs = GetCompactionService();
   ASSERT_GE(my_cs->GetCompactionNum(), 1);
 }
 
-TEST_F(CompactionServiceTest, Snapshot) {
+TEST_P(CompactionServiceTest, Snapshot) {
   Options options = CurrentOptions();
-  options.env = env_;
-  options.compaction_service =
-      std::make_shared<MyTestCompactionService>(dbname_, options);
-
-  DestroyAndReopen(options);
+  ReopenWithCompactionService(&options);
 
   ASSERT_OK(Put(Key(1), "value1"));
   ASSERT_OK(Put(Key(2), "value1"));
@@ -398,23 +530,18 @@ TEST_F(CompactionServiceTest, Snapshot) {
   ASSERT_OK(Flush());
 
   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
-  auto my_cs =
-      dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+  auto my_cs = GetCompactionService();
   ASSERT_GE(my_cs->GetCompactionNum(), 1);
   ASSERT_EQ("value1", Get(Key(1), s1));
   ASSERT_EQ("value2", Get(Key(1)));
   db_->ReleaseSnapshot(s1);
 }
 
-TEST_F(CompactionServiceTest, ConcurrentCompaction) {
+TEST_P(CompactionServiceTest, ConcurrentCompaction) {
   Options options = CurrentOptions();
   options.level0_file_num_compaction_trigger = 100;
-  options.env = env_;
-  options.compaction_service =
-      std::make_shared<MyTestCompactionService>(dbname_, options);
   options.max_background_jobs = 20;
-
-  DestroyAndReopen(options);
+  ReopenWithCompactionService(&options);
   GenerateTestData();
 
   ColumnFamilyMetaData meta;
@@ -442,12 +569,50 @@ TEST_F(CompactionServiceTest, ConcurrentCompaction) {
       ASSERT_EQ(result, "value_new" + ToString(i));
     }
   }
-  auto my_cs =
-      dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+  auto my_cs = GetCompactionService();
   ASSERT_EQ(my_cs->GetCompactionNum(), 10);
   ASSERT_EQ(FilesPerLevel(), "0,0,10");
 }
 
+TEST_P(CompactionServiceTest, CompactionInfo) {
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  ReopenWithCompactionService(&options);
+  GenerateTestData();
+
+  auto my_cs = GetCompactionService();
+
+  std::string start_str = Key(15);
+  std::string end_str = Key(45);
+  Slice start(start_str);
+  Slice end(end_str);
+  uint64_t comp_num = my_cs->GetCompactionNum();
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
+  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+  VerifyTestData();
+  // only test compaction info for new compaction service interface
+  if (GetParam() == MyTestCompactionServiceType) {
+    auto cs = static_cast_with_check<MyTestCompactionService>(my_cs);
+    CompactionServiceJobInfo info = cs->GetCompactionInfoForStart();
+    ASSERT_EQ(dbname_, info.db_name);
+    std::string db_id, db_session_id;
+    ASSERT_OK(db_->GetDbIdentity(db_id));
+    ASSERT_EQ(db_id, info.db_id);
+    ASSERT_OK(db_->GetDbSessionId(db_session_id));
+    ASSERT_EQ(db_session_id, info.db_session_id);
+    info = cs->GetCompactionInfoForWait();
+    ASSERT_EQ(dbname_, info.db_name);
+    ASSERT_EQ(db_id, info.db_id);
+    ASSERT_EQ(db_session_id, info.db_session_id);
+  }
+}
+
+INSTANTIATE_TEST_CASE_P(
+    CompactionServiceTest, CompactionServiceTest,
+    ::testing::Values(
+        TestCompactionServiceType::MyTestCompactionServiceType,
+        TestCompactionServiceType::MyTestCompactionServiceLegacyType));
+
 }  // namespace ROCKSDB_NAMESPACE
 
 #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
index 224f6616c15bcc537f94eedc2d011618e7a0a448..3160fec7f7fce00d7565afb66621bd6272a07c6b 100644 (file)
@@ -375,27 +375,70 @@ enum class CompactionServiceJobStatus : char {
   kUseLocal,  // TODO: Add support for use local compaction
 };
 
+struct CompactionServiceJobInfo {
+  std::string db_name;
+  std::string db_id;
+  std::string db_session_id;
+  uint64_t job_id;  // job_id is only unique within the current DB and session,
+                    // restart DB will reset the job_id. `db_id` and
+                    // `db_session_id` could help you build unique id across
+                    // different DBs and sessions.
+
+  // TODO: Add priority information
+  CompactionServiceJobInfo(std::string db_name_, std::string db_id_,
+                           std::string db_session_id_, uint64_t job_id_)
+      : db_name(std::move(db_name_)),
+        db_id(std::move(db_id_)),
+        db_session_id(std::move(db_session_id_)),
+        job_id(job_id_) {}
+};
+
 class CompactionService : public Customizable {
  public:
   static const char* Type() { return "CompactionService"; }
 
   // Returns the name of this compaction service.
-  virtual const char* Name() const = 0;
+  const char* Name() const override = 0;
 
   // Start the compaction with input information, which can be passed to
   // `DB::OpenAndCompact()`.
   // job_id is pre-assigned, it will be reset after DB re-open.
-  // TODO: sub-compaction is not supported, as they will have the same job_id, a
-  // sub-compaction id might be added
+  // Warning: deprecated, please use the new interface
+  // `StartV2(CompactionServiceJobInfo, ...)` instead.
   virtual CompactionServiceJobStatus Start(
-      const std::string& compaction_service_input, uint64_t job_id) = 0;
+      const std::string& /*compaction_service_input*/, uint64_t /*job_id*/) {
+    return CompactionServiceJobStatus::kUseLocal;
+  }
+
+  // Start the remote compaction with `compaction_service_input`, which can be
+  // passed to `DB::OpenAndCompact()` on the remote side. `info` provides the
+  // information the user might want to know, which includes `job_id`.
+  virtual CompactionServiceJobStatus StartV2(
+      const CompactionServiceJobInfo& info,
+      const std::string& compaction_service_input) {
+    // Default implementation to call legacy interface, please override and
+    // replace the legacy implementation
+    return Start(compaction_service_input, info.job_id);
+  }
 
   // Wait compaction to be finish.
-  // TODO: Add output path override
+  // Warning: deprecated, please use the new interface
+  // `WaitForCompleteV2(CompactionServiceJobInfo, ...)` instead.
   virtual CompactionServiceJobStatus WaitForComplete(
-      uint64_t job_id, std::string* compaction_service_result) = 0;
-
-  virtual ~CompactionService() {}
+      uint64_t /*job_id*/, std::string* /*compaction_service_result*/) {
+    return CompactionServiceJobStatus::kUseLocal;
+  }
+
+  // Wait for remote compaction to finish.
+  virtual CompactionServiceJobStatus WaitForCompleteV2(
+      const CompactionServiceJobInfo& info,
+      std::string* compaction_service_result) {
+    // Default implementation to call legacy interface, please override and
+    // replace the legacy implementation
+    return WaitForComplete(info.job_id, compaction_service_result);
+  }
+
+  ~CompactionService() override = default;
 };
 
 struct DBOptions {