git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
More info in CompactionServiceJobInfo and CompactionJobStats (#13029)
author: Jay Huh <jewoongh@meta.com>
Wed, 25 Sep 2024 17:26:15 +0000 (10:26 -0700)
committer: Jay Huh <jewoongh@meta.com>
Wed, 25 Sep 2024 18:43:29 +0000 (11:43 -0700)
Summary:
Add the following fields to `CompactionServiceJobInfo`:
- compaction_reason
- is_full_compaction
- is_manual_compaction
- bottommost_level

Added `is_remote_compaction` to `CompactionJobStats`, and gave the struct's numeric and boolean members default initializers to avoid UB from reading uninitialized values.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13029

Test Plan:
```
./compaction_service_test --gtest_filter="*CompactionInfo*"
```

Reviewed By: anand1976

Differential Revision: D63322878

Pulled By: jaykorean

fbshipit-source-id: f02a66ca45e660b9d354a43837d8ec6beb7621fb

db/compaction/compaction_job_test.cc
db/compaction/compaction_service_job.cc
db/compaction/compaction_service_test.cc
include/rocksdb/compaction_job_stats.h
include/rocksdb/options.h
util/compaction_job_stats_impl.cc

index e286817e6fedf3c8b787465500d63977c6f89820..f8a78bc9b6f14f3a4ee6e5cc6d66e960828e08db 100644 (file)
@@ -50,7 +50,8 @@ void VerifyInitializationOfCompactionJobStats(
   ASSERT_EQ(compaction_job_stats.num_output_records, 0U);
   ASSERT_EQ(compaction_job_stats.num_output_files, 0U);
 
-  ASSERT_EQ(compaction_job_stats.is_manual_compaction, true);
+  ASSERT_TRUE(compaction_job_stats.is_manual_compaction);
+  ASSERT_FALSE(compaction_job_stats.is_remote_compaction);
 
   ASSERT_EQ(compaction_job_stats.total_input_bytes, 0U);
   ASSERT_EQ(compaction_job_stats.total_output_bytes, 0U);
index 8a8db33627be7af300bd657affc4f58c468b2862..b34c7e662ba9144007db51067334751f5f8b0c95 100644 (file)
@@ -68,8 +68,11 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
       "[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
       compaction->column_family_data()->GetName().c_str(), job_id_,
       compaction_input.output_level, input_files_oss.str().c_str());
-  CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
-                                GetCompactionId(sub_compact), thread_pri_);
+  CompactionServiceJobInfo info(
+      dbname_, db_id_, db_session_id_, GetCompactionId(sub_compact),
+      thread_pri_, compaction->compaction_reason(),
+      compaction->is_full_compaction(), compaction->is_manual_compaction(),
+      compaction->bottommost_level());
   CompactionServiceScheduleResponse response =
       db_options_.compaction_service->Schedule(info, compaction_input_binary);
   switch (response.status) {
@@ -333,6 +336,7 @@ Status CompactionServiceCompactionJob::Run() {
   // Build compaction result
   compaction_result_->output_level = compact_->compaction->output_level();
   compaction_result_->output_path = output_path_;
+  compaction_result_->stats.is_remote_compaction = true;
   for (const auto& output_file : sub_compact->GetOutputs()) {
     auto& meta = output_file.meta;
     compaction_result_->output_files.emplace_back(
@@ -527,6 +531,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
          {offsetof(struct CompactionJobStats, is_manual_compaction),
           OptionType::kBoolean, OptionVerificationType::kNormal,
           OptionTypeFlags::kNone}},
+        {"is_remote_compaction",
+         {offsetof(struct CompactionJobStats, is_remote_compaction),
+          OptionType::kBoolean, OptionVerificationType::kNormal,
+          OptionTypeFlags::kNone}},
         {"total_input_bytes",
          {offsetof(struct CompactionJobStats, total_input_bytes),
           OptionType::kUInt64T, OptionVerificationType::kNormal,
index 8aacf2b6d2e8d8783e8ebd2d938e6720fffa6308..bb53a4029b64198b0504e42e3103ef13c8568a84 100644 (file)
@@ -21,8 +21,10 @@ class MyTestCompactionService : public CompactionService {
       : db_path_(std::move(db_path)),
         options_(options),
         statistics_(statistics),
-        start_info_("na", "na", "na", 0, Env::TOTAL),
-        wait_info_("na", "na", "na", 0, Env::TOTAL),
+        start_info_("na", "na", "na", 0, Env::TOTAL, CompactionReason::kUnknown,
+                    false, false, false),
+        wait_info_("na", "na", "na", 0, Env::TOTAL, CompactionReason::kUnknown,
+                   false, false, false),
         listeners_(listeners),
         table_properties_collector_factories_(
             std::move(table_properties_collector_factories)) {}
@@ -97,8 +99,12 @@ class MyTestCompactionService : public CompactionService {
     Status s =
         DB::OpenAndCompact(options, db_path_, db_path_ + "/" + scheduled_job_id,
                            compaction_input, result, options_override);
-    if (is_override_wait_result_) {
-      *result = override_wait_result_;
+    {
+      InstrumentedMutexLock l(&mutex_);
+      if (is_override_wait_result_) {
+        *result = override_wait_result_;
+      }
+      result_ = *result;
     }
     compaction_num_.fetch_add(1);
     if (s.ok()) {
@@ -141,6 +147,10 @@ class MyTestCompactionService : public CompactionService {
 
   void SetCanceled(bool canceled) { canceled_ = canceled; }
 
+  void GetResult(CompactionServiceResult* deserialized) {
+    CompactionServiceResult::Read(result_, deserialized).PermitUncheckedError();
+  }
+
   CompactionServiceJobStatus GetFinalCompactionServiceJobStatus() {
     return final_updated_status_.load();
   }
@@ -162,6 +172,7 @@ class MyTestCompactionService : public CompactionService {
   CompactionServiceJobStatus override_wait_status_ =
       CompactionServiceJobStatus::kFailure;
   bool is_override_wait_result_ = false;
+  std::string result_;
   std::string override_wait_result_;
   std::vector<std::shared_ptr<EventListener>> listeners_;
   std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
@@ -331,6 +342,14 @@ TEST_F(CompactionServiceTest, BasicCompactions) {
   ReopenWithColumnFamilies({kDefaultColumnFamilyName, "cf_1", "cf_2", "cf_3"},
                            options);
   ASSERT_GT(verify_passed, 0);
+  CompactionServiceResult result;
+  my_cs->GetResult(&result);
+  if (s.IsAborted()) {
+    ASSERT_NOK(result.status);
+  } else {
+    ASSERT_OK(result.status);
+  }
+  ASSERT_TRUE(result.stats.is_remote_compaction);
   Close();
 }
 
@@ -369,6 +388,12 @@ TEST_F(CompactionServiceTest, ManualCompaction) {
   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
   ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
   VerifyTestData();
+
+  CompactionServiceResult result;
+  my_cs->GetResult(&result);
+  ASSERT_OK(result.status);
+  ASSERT_TRUE(result.stats.is_manual_compaction);
+  ASSERT_TRUE(result.stats.is_remote_compaction);
 }
 
 TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) {
@@ -601,11 +626,20 @@ TEST_F(CompactionServiceTest, CompactionInfo) {
                               {file.db_path + "/" + file.name}, 2));
   info = my_cs->GetCompactionInfoForStart();
   ASSERT_EQ(Env::USER, info.priority);
+  ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason);
+  ASSERT_EQ(true, info.is_manual_compaction);
+  ASSERT_EQ(false, info.is_full_compaction);
+  ASSERT_EQ(true, info.bottommost_level);
   info = my_cs->GetCompactionInfoForWait();
   ASSERT_EQ(Env::USER, info.priority);
+  ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason);
+  ASSERT_EQ(true, info.is_manual_compaction);
+  ASSERT_EQ(false, info.is_full_compaction);
+  ASSERT_EQ(true, info.bottommost_level);
 
   // Test priority BOTTOM
   env_->SetBackgroundThreads(1, Env::BOTTOM);
+  // This will set bottommost_level = true but is_full_compaction = false
   options.num_levels = 2;
   ReopenWithCompactionService(&options);
   my_cs =
@@ -628,9 +662,71 @@ TEST_F(CompactionServiceTest, CompactionInfo) {
   }
   ASSERT_OK(dbfull()->TEST_WaitForCompact());
   info = my_cs->GetCompactionInfoForStart();
+  ASSERT_EQ(CompactionReason::kLevelL0FilesNum, info.compaction_reason);
+  ASSERT_EQ(false, info.is_manual_compaction);
+  ASSERT_EQ(false, info.is_full_compaction);
+  ASSERT_EQ(true, info.bottommost_level);
   ASSERT_EQ(Env::BOTTOM, info.priority);
   info = my_cs->GetCompactionInfoForWait();
   ASSERT_EQ(Env::BOTTOM, info.priority);
+  ASSERT_EQ(CompactionReason::kLevelL0FilesNum, info.compaction_reason);
+  ASSERT_EQ(false, info.is_manual_compaction);
+  ASSERT_EQ(false, info.is_full_compaction);
+  ASSERT_EQ(true, info.bottommost_level);
+
+  // Test Non-Bottommost Level
+  options.num_levels = 4;
+  ReopenWithCompactionService(&options);
+  my_cs =
+      static_cast_with_check<MyTestCompactionService>(GetCompactionService());
+
+  for (int i = 0; i < options.level0_file_num_compaction_trigger; i++) {
+    for (int j = 0; j < 10; j++) {
+      int key_id = i * 10 + j;
+      ASSERT_OK(Put(Key(key_id), "value_new_new" + std::to_string(key_id)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  info = my_cs->GetCompactionInfoForStart();
+  ASSERT_EQ(false, info.is_manual_compaction);
+  ASSERT_EQ(false, info.is_full_compaction);
+  ASSERT_EQ(false, info.bottommost_level);
+  info = my_cs->GetCompactionInfoForWait();
+  ASSERT_EQ(false, info.is_manual_compaction);
+  ASSERT_EQ(false, info.is_full_compaction);
+  ASSERT_EQ(false, info.bottommost_level);
+
+  // Test Full Compaction + Bottommost Level
+  options.num_levels = 6;
+  ReopenWithCompactionService(&options);
+  my_cs =
+      static_cast_with_check<MyTestCompactionService>(GetCompactionService());
+
+  for (int i = 0; i < 20; i++) {
+    for (int j = 0; j < 10; j++) {
+      int key_id = i * 10 + j;
+      ASSERT_OK(Put(Key(key_id), "value_new_new" + std::to_string(key_id)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  info = my_cs->GetCompactionInfoForStart();
+  ASSERT_EQ(true, info.is_manual_compaction);
+  ASSERT_EQ(true, info.is_full_compaction);
+  ASSERT_EQ(true, info.bottommost_level);
+  ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason);
+  info = my_cs->GetCompactionInfoForWait();
+  ASSERT_EQ(true, info.is_manual_compaction);
+  ASSERT_EQ(true, info.is_full_compaction);
+  ASSERT_EQ(true, info.bottommost_level);
+  ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason);
 }
 
 TEST_F(CompactionServiceTest, FallbackLocalAuto) {
index 7e81530443643daf071dbdad2241e60199044ca7..8ddb6a330d108ae686193a7840aa6af7a266812e 100644 (file)
@@ -19,80 +19,83 @@ struct CompactionJobStats {
   void Add(const CompactionJobStats& stats);
 
   // the elapsed time of this compaction in microseconds.
-  uint64_t elapsed_micros;
+  uint64_t elapsed_micros = 0;
 
   // the elapsed CPU time of this compaction in microseconds.
-  uint64_t cpu_micros;
+  uint64_t cpu_micros = 0;
 
   // Used internally indicating whether a subcompaction's
   // `num_input_records` is accurate.
-  bool has_num_input_records;
+  bool has_num_input_records = false;
   // the number of compaction input records.
-  uint64_t num_input_records;
+  uint64_t num_input_records = 0;
   // the number of blobs read from blob files
-  uint64_t num_blobs_read;
+  uint64_t num_blobs_read = 0;
   // the number of compaction input files (table files)
-  size_t num_input_files;
+  size_t num_input_files = 0;
   // the number of compaction input files at the output level (table files)
-  size_t num_input_files_at_output_level;
+  size_t num_input_files_at_output_level = 0;
 
   // the number of compaction output records.
-  uint64_t num_output_records;
+  uint64_t num_output_records = 0;
   // the number of compaction output files (table files)
-  size_t num_output_files;
+  size_t num_output_files = 0;
   // the number of compaction output files (blob files)
-  size_t num_output_files_blob;
+  size_t num_output_files_blob = 0;
 
   // true if the compaction is a full compaction (all live SST files input)
-  bool is_full_compaction;
+  bool is_full_compaction = false;
   // true if the compaction is a manual compaction
-  bool is_manual_compaction;
+  bool is_manual_compaction = false;
+  // true if the compaction ran in a remote worker
+  bool is_remote_compaction = false;
 
   // the total size of table files in the compaction input
-  uint64_t total_input_bytes;
+  uint64_t total_input_bytes = 0;
   // the total size of blobs read from blob files
-  uint64_t total_blob_bytes_read;
+  uint64_t total_blob_bytes_read = 0;
   // the total size of table files in the compaction output
-  uint64_t total_output_bytes;
+  uint64_t total_output_bytes = 0;
   // the total size of blob files in the compaction output
-  uint64_t total_output_bytes_blob;
+  uint64_t total_output_bytes_blob = 0;
+  ;
 
   // number of records being replaced by newer record associated with same key.
   // this could be a new value or a deletion entry for that key so this field
   // sums up all updated and deleted keys
-  uint64_t num_records_replaced;
+  uint64_t num_records_replaced = 0;
 
   // the sum of the uncompressed input keys in bytes.
-  uint64_t total_input_raw_key_bytes;
+  uint64_t total_input_raw_key_bytes = 0;
   // the sum of the uncompressed input values in bytes.
-  uint64_t total_input_raw_value_bytes;
+  uint64_t total_input_raw_value_bytes = 0;
 
   // the number of deletion entries before compaction. Deletion entries
   // can disappear after compaction because they expired
-  uint64_t num_input_deletion_records;
+  uint64_t num_input_deletion_records = 0;
   // number of deletion records that were found obsolete and discarded
   // because it is not possible to delete any more keys with this entry
   // (i.e. all possible deletions resulting from it have been completed)
-  uint64_t num_expired_deletion_records;
+  uint64_t num_expired_deletion_records = 0;
 
   // number of corrupt keys (ParseInternalKey returned false when applied to
   // the key) encountered and written out.
-  uint64_t num_corrupt_keys;
+  uint64_t num_corrupt_keys = 0;
 
   // Following counters are only populated if
   // options.report_bg_io_stats = true;
 
   // Time spent on file's Append() call.
-  uint64_t file_write_nanos;
+  uint64_t file_write_nanos = 0;
 
   // Time spent on sync file range.
-  uint64_t file_range_sync_nanos;
+  uint64_t file_range_sync_nanos = 0;
 
   // Time spent on file fsync.
-  uint64_t file_fsync_nanos;
+  uint64_t file_fsync_nanos = 0;
 
   // Time spent on preparing file write (fallocate, etc)
-  uint64_t file_prepare_write_nanos;
+  uint64_t file_prepare_write_nanos = 0;
 
   // 0-terminated strings storing the first 8 bytes of the smallest and
   // largest key in the output.
@@ -102,10 +105,10 @@ struct CompactionJobStats {
   std::string largest_output_key_prefix;
 
   // number of single-deletes which do not meet a put
-  uint64_t num_single_del_fallthru;
+  uint64_t num_single_del_fallthru = 0;
 
   // number of single-deletes which meet something other than a put
-  uint64_t num_single_del_mismatch;
+  uint64_t num_single_del_mismatch = 0;
 
   // TODO: Add output_to_penultimate_level output information
 };
index e272e3a69ad597e5b70cc5166f997f530e7e1bd2..9700e25af55dbb67364bc4e920ab71be3dcbd11e 100644 (file)
@@ -466,14 +466,27 @@ struct CompactionServiceJobInfo {
 
   Env::Priority priority;
 
+  // Additional Compaction Details that can be useful in the CompactionService
+  CompactionReason compaction_reason;
+  bool is_full_compaction;
+  bool is_manual_compaction;
+  bool bottommost_level;
+
   CompactionServiceJobInfo(std::string db_name_, std::string db_id_,
                            std::string db_session_id_, uint64_t job_id_,
-                           Env::Priority priority_)
+                           Env::Priority priority_,
+                           CompactionReason compaction_reason_,
+                           bool is_full_compaction_, bool is_manual_compaction_,
+                           bool bottommost_level_)
       : db_name(std::move(db_name_)),
         db_id(std::move(db_id_)),
         db_session_id(std::move(db_session_id_)),
         job_id(job_id_),
-        priority(priority_) {}
+        priority(priority_),
+        compaction_reason(compaction_reason_),
+        is_full_compaction(is_full_compaction_),
+        is_manual_compaction(is_manual_compaction_),
+        bottommost_level(bottommost_level_) {}
 };
 
 struct CompactionServiceScheduleResponse {
index cdb591f23ca1de57be5332716c2329706b5f7623..37e39987e06d2dd1ef873001df6a4a3ce7f44c3a 100644 (file)
@@ -24,6 +24,7 @@ void CompactionJobStats::Reset() {
 
   is_full_compaction = false;
   is_manual_compaction = false;
+  is_remote_compaction = false;
 
   total_input_bytes = 0;
   total_blob_bytes_read = 0;