]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Add compaction priority information in RemoteCompaction (#8707)
authorJay Zhuang <zjay@fb.com>
Thu, 16 Sep 2021 22:08:23 +0000 (15:08 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Thu, 16 Sep 2021 22:09:35 +0000 (15:09 -0700)
Summary:
Add compaction priority information in RemoteCompaction, which
can be used to schedule high priority job first.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8707

Test Plan: unittest

Reviewed By: ajkr

Differential Revision: D30548401

Pulled By: jay-zhuang

fbshipit-source-id: b30446511fb31b4583c49edd8565d496cf013a34

HISTORY.md
db/compaction/compaction_job.cc
db/compaction/compaction_service_test.cc
include/rocksdb/options.h

index a38c1fbc7c11493ac93051339d72ea257c794e32..5d07ebaad386f9b81e59ad9d2fe7589bd20805df 100644 (file)
@@ -21,6 +21,7 @@
 * Added two new RateLimiter IOPriorities: `Env::IO_USER`,`Env::IO_MID`. `Env::IO_USER` will have superior priority over all other RateLimiter IOPriorities without being subject to fair scheduling constraint.
 * `SstFileWriter` now supports `Put`s and `Delete`s with user-defined timestamps. Note that the ingestion logic itself is not timestamp-aware yet.
 * Allow a single write batch to include keys from multiple column families whose timestamps' formats can differ. For example, some column families may disable timestamp, while others enable timestamp.
+* Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first.
 
 ### Public API change
 * Remove obsolete implementation details FullKey and ParseFullKey from public API
index 7c3a98e32f3dd9a8084346f81d5d472eabbe1a03..3e2ba08e76da6c0b6173069cbd6f01b352d96186 100644 (file)
@@ -985,7 +985,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
       compaction_input.column_family.name.c_str(), job_id_,
       compaction_input.output_level, input_files_oss.str().c_str());
   CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
-                                GetCompactionId(sub_compact));
+                                GetCompactionId(sub_compact), thread_pri_);
   CompactionServiceJobStatus compaction_status =
       db_options_.compaction_service->StartV2(info, compaction_input_binary);
   if (compaction_status != CompactionServiceJobStatus::kSuccess) {
index e2c326be91c870f0fa0685cc50240b94ba49e95b..4555dba39a7067d2d83aac0d436fdee44fbd3a84 100644 (file)
@@ -123,8 +123,8 @@ class MyTestCompactionService : public CompactionService,
       : db_path_(std::move(db_path)),
         options_(options),
         statistics_(statistics),
-        start_info_("na", "na", "na", 0),
-        wait_info_("na", "na", "na", 0) {}
+        start_info_("na", "na", "na", 0, Env::TOTAL),
+        wait_info_("na", "na", "na", 0, Env::TOTAL) {}
 
   static const char* kClassName() { return "MyTestCompactionService"; }
 
@@ -575,36 +575,87 @@ TEST_P(CompactionServiceTest, ConcurrentCompaction) {
 }
 
 TEST_P(CompactionServiceTest, CompactionInfo) {
+  // only test compaction info for new compaction service interface
+  if (GetParam() != MyTestCompactionServiceType) {
+    return;
+  }
+
   Options options = CurrentOptions();
-  options.disable_auto_compactions = true;
   ReopenWithCompactionService(&options);
-  GenerateTestData();
 
-  auto my_cs = GetCompactionService();
+  for (int i = 0; i < 20; i++) {
+    for (int j = 0; j < 10; j++) {
+      int key_id = i * 10 + j;
+      ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
+    }
+    ASSERT_OK(Flush());
+  }
 
-  std::string start_str = Key(15);
-  std::string end_str = Key(45);
-  Slice start(start_str);
-  Slice end(end_str);
+  for (int i = 0; i < 10; i++) {
+    for (int j = 0; j < 10; j++) {
+      int key_id = i * 20 + j * 2;
+      ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  auto my_cs =
+      static_cast_with_check<MyTestCompactionService>(GetCompactionService());
   uint64_t comp_num = my_cs->GetCompactionNum();
-  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
-  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
-  VerifyTestData();
-  // only test compaction info for new compaction service interface
-  if (GetParam() == MyTestCompactionServiceType) {
-    auto cs = static_cast_with_check<MyTestCompactionService>(my_cs);
-    CompactionServiceJobInfo info = cs->GetCompactionInfoForStart();
-    ASSERT_EQ(dbname_, info.db_name);
-    std::string db_id, db_session_id;
-    ASSERT_OK(db_->GetDbIdentity(db_id));
-    ASSERT_EQ(db_id, info.db_id);
-    ASSERT_OK(db_->GetDbSessionId(db_session_id));
-    ASSERT_EQ(db_session_id, info.db_session_id);
-    info = cs->GetCompactionInfoForWait();
-    ASSERT_EQ(dbname_, info.db_name);
-    ASSERT_EQ(db_id, info.db_id);
-    ASSERT_EQ(db_session_id, info.db_session_id);
+  ASSERT_GE(comp_num, 1);
+
+  CompactionServiceJobInfo info = my_cs->GetCompactionInfoForStart();
+  ASSERT_EQ(dbname_, info.db_name);
+  std::string db_id, db_session_id;
+  ASSERT_OK(db_->GetDbIdentity(db_id));
+  ASSERT_EQ(db_id, info.db_id);
+  ASSERT_OK(db_->GetDbSessionId(db_session_id));
+  ASSERT_EQ(db_session_id, info.db_session_id);
+  ASSERT_EQ(Env::LOW, info.priority);
+  info = my_cs->GetCompactionInfoForWait();
+  ASSERT_EQ(dbname_, info.db_name);
+  ASSERT_EQ(db_id, info.db_id);
+  ASSERT_EQ(db_session_id, info.db_session_id);
+  ASSERT_EQ(Env::LOW, info.priority);
+
+  // Test priority USER
+  ColumnFamilyMetaData meta;
+  db_->GetColumnFamilyMetaData(&meta);
+  SstFileMetaData file = meta.levels[1].files[0];
+  ASSERT_OK(db_->CompactFiles(CompactionOptions(),
+                              {file.db_path + "/" + file.name}, 2));
+  info = my_cs->GetCompactionInfoForStart();
+  ASSERT_EQ(Env::USER, info.priority);
+  info = my_cs->GetCompactionInfoForWait();
+  ASSERT_EQ(Env::USER, info.priority);
+
+  // Test priority BOTTOM
+  env_->SetBackgroundThreads(1, Env::BOTTOM);
+  options.num_levels = 2;
+  ReopenWithCompactionService(&options);
+  my_cs =
+      static_cast_with_check<MyTestCompactionService>(GetCompactionService());
+
+  for (int i = 0; i < 20; i++) {
+    for (int j = 0; j < 10; j++) {
+      int key_id = i * 10 + j;
+      ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
+    }
+    ASSERT_OK(Flush());
   }
+
+  for (int i = 0; i < 4; i++) {
+    for (int j = 0; j < 10; j++) {
+      int key_id = i * 20 + j * 2;
+      ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  info = my_cs->GetCompactionInfoForStart();
+  ASSERT_EQ(Env::BOTTOM, info.priority);
+  info = my_cs->GetCompactionInfoForWait();
+  ASSERT_EQ(Env::BOTTOM, info.priority);
 }
 
 INSTANTIATE_TEST_CASE_P(
index 3160fec7f7fce00d7565afb66621bd6272a07c6b..05506083230b2cca72f1e830915630f5420bdc7b 100644 (file)
@@ -384,13 +384,16 @@ struct CompactionServiceJobInfo {
                     // `db_session_id` could help you build unique id across
                     // different DBs and sessions.
 
-  // TODO: Add priority information
+  Env::Priority priority;
+
   CompactionServiceJobInfo(std::string db_name_, std::string db_id_,
-                           std::string db_session_id_, uint64_t job_id_)
+                           std::string db_session_id_, uint64_t job_id_,
+                           Env::Priority priority_)
       : db_name(std::move(db_name_)),
         db_id(std::move(db_id_)),
         db_session_id(std::move(db_session_id_)),
-        job_id(job_id_) {}
+        job_id(job_id_),
+        priority(priority_) {}
 };
 
 class CompactionService : public Customizable {