]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Consolidate manual_compaction_paused_ check (#10070)
authorzczhu <>
Tue, 7 Jun 2022 01:32:26 +0000 (18:32 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Tue, 7 Jun 2022 01:32:26 +0000 (18:32 -0700)
Summary:
As pointed out by [https://github.com/facebook/rocksdb/pull/8351#discussion_r645765422](https://github.com/facebook/rocksdb/pull/8351#discussion_r645765422), check `manual_compaction_paused` and `manual_compaction_canceled` can be reduced by setting `*canceled` to be true in `DisableManualCompaction()` and `*canceled` to be false in the last time calling `EnableManualCompaction()`.

Changed Tests: The origin `DBTest2.PausingManualCompaction1` uses a callback function to increase `manual_compaction_paused` and the origin CompactionJob/CompactionIterator with `manual_compaction_paused` can detect this. I changed the callback function so that it sets `*canceled` as true if `canceled` is not `nullptr` (to notify CompactionJob/CompactionIterator the compaction has been canceled).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10070

Test Plan: This change does not introduce new features, but some slight difference in compaction implementation. Run the same manual compaction unit tests as before (e.g., PausingManualCompaction[1-4], CancelManualCompaction[1-2], CancelManualCompactionWithListener in db_test2, and db_compaction_test).

Reviewed By: ajkr

Differential Revision: D36949133

Pulled By: littlepig2013

fbshipit-source-id: c5dc4c956fbf8f624003a0f5ad2690240063a821

13 files changed:
db/builder.cc
db/compaction/compaction_iterator.cc
db/compaction/compaction_iterator.h
db/compaction/compaction_iterator_test.cc
db/compaction/compaction_job.cc
db/compaction/compaction_job.h
db/compaction/compaction_job_test.cc
db/db_impl/db_impl.h
db/db_impl/db_impl_compaction_flush.cc
db/db_impl/db_impl_secondary.cc
db/db_test2.cc
db/flush_job.cc
include/rocksdb/options.h

index 2d2a8667602357ca9b69d83ca801e7344475bf7c..f0c24de6cd6e14e99215e980c12446b90d041810 100644 (file)
@@ -193,6 +193,7 @@ Status BuildTable(
                   &blob_file_paths, blob_file_additions)
             : nullptr);
 
+    const std::atomic<bool> kManualCompactionCanceledFalse{false};
     CompactionIterator c_iter(
         iter, tboptions.internal_comparator.user_comparator(), &merge,
         kMaxSequenceNumber, &snapshots, earliest_write_conflict_snapshot,
@@ -201,11 +202,9 @@ Status BuildTable(
         true /* internal key corruption is not ok */, range_del_agg.get(),
         blob_file_builder.get(), ioptions.allow_data_in_errors,
         ioptions.enforce_single_del_contracts,
+        /*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
         /*compaction=*/nullptr, compaction_filter.get(),
-        /*shutting_down=*/nullptr,
-        /*manual_compaction_paused=*/nullptr,
-        /*manual_compaction_canceled=*/nullptr, db_options.info_log,
-        full_history_ts_low);
+        /*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low);
 
     c_iter.SeekToFirst();
     for (; c_iter.Valid(); c_iter.Next()) {
index 327f1f1218a8e36d438bccff8e6b3338ba53823b..8d534cf7f58ab1ab47faa617c3436b8ac95d43cc 100644 (file)
@@ -28,11 +28,10 @@ CompactionIterator::CompactionIterator(
     Env* env, bool report_detailed_time, bool expect_valid_internal_key,
     CompactionRangeDelAggregator* range_del_agg,
     BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
-    bool enforce_single_del_contracts, const Compaction* compaction,
-    const CompactionFilter* compaction_filter,
+    bool enforce_single_del_contracts,
+    const std::atomic<bool>& manual_compaction_canceled,
+    const Compaction* compaction, const CompactionFilter* compaction_filter,
     const std::atomic<bool>* shutting_down,
-    const std::atomic<int>* manual_compaction_paused,
-    const std::atomic<bool>* manual_compaction_canceled,
     const std::shared_ptr<Logger> info_log,
     const std::string* full_history_ts_low)
     : CompactionIterator(
@@ -40,10 +39,10 @@ CompactionIterator::CompactionIterator(
           earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
           report_detailed_time, expect_valid_internal_key, range_del_agg,
           blob_file_builder, allow_data_in_errors, enforce_single_del_contracts,
+          manual_compaction_canceled,
           std::unique_ptr<CompactionProxy>(
               compaction ? new RealCompaction(compaction) : nullptr),
-          compaction_filter, shutting_down, manual_compaction_paused,
-          manual_compaction_canceled, info_log, full_history_ts_low) {}
+          compaction_filter, shutting_down, info_log, full_history_ts_low) {}
 
 CompactionIterator::CompactionIterator(
     InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
@@ -54,11 +53,10 @@ CompactionIterator::CompactionIterator(
     CompactionRangeDelAggregator* range_del_agg,
     BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
     bool enforce_single_del_contracts,
+    const std::atomic<bool>& manual_compaction_canceled,
     std::unique_ptr<CompactionProxy> compaction,
     const CompactionFilter* compaction_filter,
     const std::atomic<bool>* shutting_down,
-    const std::atomic<int>* manual_compaction_paused,
-    const std::atomic<bool>* manual_compaction_canceled,
     const std::shared_ptr<Logger> info_log,
     const std::string* full_history_ts_low)
     : input_(input, cmp,
@@ -78,7 +76,6 @@ CompactionIterator::CompactionIterator(
       compaction_(std::move(compaction)),
       compaction_filter_(compaction_filter),
       shutting_down_(shutting_down),
-      manual_compaction_paused_(manual_compaction_paused),
       manual_compaction_canceled_(manual_compaction_canceled),
       info_log_(info_log),
       allow_data_in_errors_(allow_data_in_errors),
index 0249191508eb6fab07d68d544a58a52913395533..acc6f417f45191341e1e942112e383f9e37cb88c 100644 (file)
@@ -167,39 +167,42 @@ class CompactionIterator {
     const Compaction* compaction_;
   };
 
-  CompactionIterator(
-      InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
-      SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
-      SequenceNumber earliest_write_conflict_snapshot,
-      SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
-      Env* env, bool report_detailed_time, bool expect_valid_internal_key,
-      CompactionRangeDelAggregator* range_del_agg,
-      BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
-      bool enforce_single_del_contracts, const Compaction* compaction = nullptr,
-      const CompactionFilter* compaction_filter = nullptr,
-      const std::atomic<bool>* shutting_down = nullptr,
-      const std::atomic<int>* manual_compaction_paused = nullptr,
-      const std::atomic<bool>* manual_compaction_canceled = nullptr,
-      const std::shared_ptr<Logger> info_log = nullptr,
-      const std::string* full_history_ts_low = nullptr);
+  CompactionIterator(InternalIterator* input, const Comparator* cmp,
+                     MergeHelper* merge_helper, SequenceNumber last_sequence,
+                     std::vector<SequenceNumber>* snapshots,
+                     SequenceNumber earliest_write_conflict_snapshot,
+                     SequenceNumber job_snapshot,
+                     const SnapshotChecker* snapshot_checker, Env* env,
+                     bool report_detailed_time, bool expect_valid_internal_key,
+                     CompactionRangeDelAggregator* range_del_agg,
+                     BlobFileBuilder* blob_file_builder,
+                     bool allow_data_in_errors,
+                     bool enforce_single_del_contracts,
+                     const std::atomic<bool>& manual_compaction_canceled,
+                     const Compaction* compaction = nullptr,
+                     const CompactionFilter* compaction_filter = nullptr,
+                     const std::atomic<bool>* shutting_down = nullptr,
+                     const std::shared_ptr<Logger> info_log = nullptr,
+                     const std::string* full_history_ts_low = nullptr);
 
   // Constructor with custom CompactionProxy, used for tests.
-  CompactionIterator(
-      InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
-      SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
-      SequenceNumber earliest_write_conflict_snapshot,
-      SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
-      Env* env, bool report_detailed_time, bool expect_valid_internal_key,
-      CompactionRangeDelAggregator* range_del_agg,
-      BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
-      bool enforce_single_del_contracts,
-      std::unique_ptr<CompactionProxy> compaction,
-      const CompactionFilter* compaction_filter = nullptr,
-      const std::atomic<bool>* shutting_down = nullptr,
-      const std::atomic<int>* manual_compaction_paused = nullptr,
-      const std::atomic<bool>* manual_compaction_canceled = nullptr,
-      const std::shared_ptr<Logger> info_log = nullptr,
-      const std::string* full_history_ts_low = nullptr);
+  CompactionIterator(InternalIterator* input, const Comparator* cmp,
+                     MergeHelper* merge_helper, SequenceNumber last_sequence,
+                     std::vector<SequenceNumber>* snapshots,
+                     SequenceNumber earliest_write_conflict_snapshot,
+                     SequenceNumber job_snapshot,
+                     const SnapshotChecker* snapshot_checker, Env* env,
+                     bool report_detailed_time, bool expect_valid_internal_key,
+                     CompactionRangeDelAggregator* range_del_agg,
+                     BlobFileBuilder* blob_file_builder,
+                     bool allow_data_in_errors,
+                     bool enforce_single_del_contracts,
+                     const std::atomic<bool>& manual_compaction_canceled,
+                     std::unique_ptr<CompactionProxy> compaction,
+                     const CompactionFilter* compaction_filter = nullptr,
+                     const std::atomic<bool>* shutting_down = nullptr,
+                     const std::shared_ptr<Logger> info_log = nullptr,
+                     const std::string* full_history_ts_low = nullptr);
 
   ~CompactionIterator();
 
@@ -320,8 +323,7 @@ class CompactionIterator {
   std::unique_ptr<CompactionProxy> compaction_;
   const CompactionFilter* compaction_filter_;
   const std::atomic<bool>* shutting_down_;
-  const std::atomic<int>* manual_compaction_paused_;
-  const std::atomic<bool>* manual_compaction_canceled_;
+  const std::atomic<bool>& manual_compaction_canceled_;
   bool bottommost_level_;
   bool valid_ = false;
   bool visible_at_tip_;
@@ -426,10 +428,7 @@ class CompactionIterator {
 
   bool IsPausingManualCompaction() {
     // This is a best-effort facility, so memory_order_relaxed is sufficient.
-    return (manual_compaction_paused_ &&
-            manual_compaction_paused_->load(std::memory_order_relaxed) > 0) ||
-           (manual_compaction_canceled_ &&
-            manual_compaction_canceled_->load(std::memory_order_relaxed));
+    return manual_compaction_canceled_.load(std::memory_order_relaxed);
   }
 };
 
index 79f598c62e02762647624e8ed157583cec8881aa..a5d8d30c3578d2fb6f736f65c71614d3a58d03d3 100644 (file)
@@ -279,10 +279,9 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
         snapshot_checker_.get(), Env::Default(),
         false /* report_detailed_time */, false, range_del_agg_.get(),
         nullptr /* blob_file_builder */, true /*allow_data_in_errors*/,
-        true /*enforce_single_del_contracts*/, std::move(compaction), filter,
-        &shutting_down_,
-        /*manual_compaction_paused=*/nullptr,
-        /*manual_compaction_canceled=*/nullptr, /*info_log=*/nullptr,
+        true /*enforce_single_del_contracts*/,
+        /*manual_compaction_canceled=*/kManualCompactionCanceledFalse_,
+        std::move(compaction), filter, &shutting_down_, /*info_log=*/nullptr,
         full_history_ts_low));
   }
 
@@ -341,6 +340,7 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
   std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_;
   std::unique_ptr<SnapshotChecker> snapshot_checker_;
   std::atomic<bool> shutting_down_{false};
+  const std::atomic<bool> kManualCompactionCanceledFalse_{false};
   FakeCompaction* compaction_proxy_;
 };
 
index bf1895ddcbb351971064d04e533f2ffc879be0d2..38b015bb61d1c2f814033b3beada9b129dbeb5b0 100644 (file)
@@ -429,8 +429,7 @@ CompactionJob::CompactionJob(
     bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
     CompactionJobStats* compaction_job_stats, Env::Priority thread_pri,
     const std::shared_ptr<IOTracer>& io_tracer,
-    const std::atomic<int>* manual_compaction_paused,
-    const std::atomic<bool>* manual_compaction_canceled,
+    const std::atomic<bool>& manual_compaction_canceled,
     const std::string& db_id, const std::string& db_session_id,
     std::string full_history_ts_low, std::string trim_ts,
     BlobFileCompletionCallback* blob_callback)
@@ -456,7 +455,6 @@ CompactionJob::CompactionJob(
           fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
       versions_(versions),
       shutting_down_(shutting_down),
-      manual_compaction_paused_(manual_compaction_paused),
       manual_compaction_canceled_(manual_compaction_canceled),
       db_directory_(db_directory),
       blob_output_directory_(blob_output_directory),
@@ -1256,8 +1254,8 @@ void CompactionJob::NotifyOnSubcompactionBegin(
   if (shutting_down_->load(std::memory_order_acquire)) {
     return;
   }
-  if (c->is_manual_compaction() && manual_compaction_paused_ &&
-      manual_compaction_paused_->load(std::memory_order_acquire) > 0) {
+  if (c->is_manual_compaction() &&
+      manual_compaction_canceled_.load(std::memory_order_acquire)) {
     return;
   }
 
@@ -1470,7 +1468,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   TEST_SYNC_POINT_CALLBACK(
       "CompactionJob::Run():PausingManualCompaction:1",
       reinterpret_cast<void*>(
-          const_cast<std::atomic<int>*>(manual_compaction_paused_)));
+          const_cast<std::atomic<bool>*>(&manual_compaction_canceled_)));
 
   Status status;
   const std::string* const full_history_ts_low =
@@ -1484,9 +1482,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
       snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
       /*expect_valid_internal_key=*/true, &range_del_agg,
       blob_file_builder.get(), db_options_.allow_data_in_errors,
-      db_options_.enforce_single_del_contracts, sub_compact->compaction,
-      compaction_filter, shutting_down_, manual_compaction_paused_,
-      manual_compaction_canceled_, db_options_.info_log, full_history_ts_low));
+      db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
+      sub_compact->compaction, compaction_filter, shutting_down_,
+      db_options_.info_log, full_history_ts_low));
   auto c_iter = sub_compact->c_iter.get();
   c_iter->SeekToFirst();
   if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
@@ -1568,7 +1566,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     TEST_SYNC_POINT_CALLBACK(
         "CompactionJob::Run():PausingManualCompaction:2",
         reinterpret_cast<void*>(
-            const_cast<std::atomic<int>*>(manual_compaction_paused_)));
+            const_cast<std::atomic<bool>*>(&manual_compaction_canceled_)));
     if (partitioner.get()) {
       last_key_for_partitioner.assign(c_iter->user_key().data_,
                                       c_iter->user_key().size_);
@@ -1647,10 +1645,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     status = Status::ShutdownInProgress("Database shutdown");
   }
   if ((status.ok() || status.IsColumnFamilyDropped()) &&
-      ((manual_compaction_paused_ &&
-        manual_compaction_paused_->load(std::memory_order_relaxed) > 0) ||
-       (manual_compaction_canceled_ &&
-        manual_compaction_canceled_->load(std::memory_order_relaxed)))) {
+      (manual_compaction_canceled_.load(std::memory_order_relaxed))) {
     status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
   }
   if (status.ok()) {
@@ -2527,7 +2522,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob(
     std::vector<SequenceNumber> existing_snapshots,
     std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
     const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
-    const std::atomic<bool>* manual_compaction_canceled,
+    const std::atomic<bool>& manual_compaction_canceled,
     const std::string& db_id, const std::string& db_session_id,
     const std::string& output_path,
     const CompactionServiceInput& compaction_service_input,
@@ -2540,7 +2535,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob(
           compaction->mutable_cf_options()->paranoid_file_checks,
           compaction->mutable_cf_options()->report_bg_io_stats, dbname,
           &(compaction_service_result->stats), Env::Priority::USER, io_tracer,
-          nullptr, manual_compaction_canceled, db_id, db_session_id,
+          manual_compaction_canceled, db_id, db_session_id,
           compaction->column_family_data()->GetFullHistoryTsLow()),
       output_path_(output_path),
       compaction_input_(compaction_service_input),
index 17d6f1051ec91051d0616756392cebb4349ad82e..a8f0e4eeb8c61c4fa1e6bdcd3a947e6690a0a208 100644 (file)
@@ -78,8 +78,7 @@ class CompactionJob {
       bool paranoid_file_checks, bool measure_io_stats,
       const std::string& dbname, CompactionJobStats* compaction_job_stats,
       Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
-      const std::atomic<int>* manual_compaction_paused = nullptr,
-      const std::atomic<bool>* manual_compaction_canceled = nullptr,
+      const std::atomic<bool>& manual_compaction_canceled,
       const std::string& db_id = "", const std::string& db_session_id = "",
       std::string full_history_ts_low = "", std::string trim_ts = "",
       BlobFileCompletionCallback* blob_callback = nullptr);
@@ -195,8 +194,7 @@ class CompactionJob {
   FileOptions file_options_for_read_;
   VersionSet* versions_;
   const std::atomic<bool>* shutting_down_;
-  const std::atomic<int>* manual_compaction_paused_;
-  const std::atomic<bool>* manual_compaction_canceled_;
+  const std::atomic<bool>& manual_compaction_canceled_;
   FSDirectory* db_directory_;
   FSDirectory* blob_output_directory_;
   InstrumentedMutex* db_mutex_;
@@ -357,7 +355,7 @@ class CompactionServiceCompactionJob : private CompactionJob {
       std::vector<SequenceNumber> existing_snapshots,
       std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
       const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
-      const std::atomic<bool>* manual_compaction_canceled,
+      const std::atomic<bool>& manual_compaction_canceled,
       const std::string& db_id, const std::string& db_session_id,
       const std::string& output_path,
       const CompactionServiceInput& compaction_service_input,
index 403fd1877ea2635b878234d092d6227f451b5d7a..a704226d4e4121eacb63a358b0320dc76bd203c0 100644 (file)
@@ -353,6 +353,7 @@ class CompactionJobTestBase : public testing::Test {
     SnapshotChecker* snapshot_checker = nullptr;
     ASSERT_TRUE(full_history_ts_low_.empty() ||
                 ucmp_->timestamp_size() == full_history_ts_low_.size());
+    const std::atomic<bool> kManualCompactionCanceledFalse{false};
     CompactionJob compaction_job(
         0, &compaction, db_options_, mutable_db_options_, env_options_,
         versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr,
@@ -360,9 +361,9 @@ class CompactionJobTestBase : public testing::Test {
         earliest_write_conflict_snapshot, snapshot_checker, nullptr,
         table_cache_, &event_logger, false, false, dbname_,
         &compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */,
-        /*manual_compaction_paused=*/nullptr,
-        /*manual_compaction_canceled=*/nullptr, env_->GenerateUniqueId(),
-        DBImpl::GenerateDbSessionId(nullptr), full_history_ts_low_);
+        /*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
+        env_->GenerateUniqueId(), DBImpl::GenerateDbSessionId(nullptr),
+        full_history_ts_low_);
     VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
 
     compaction_job.Prepare();
index 880d30a916ef8a521a7a437beecad501f9978252..4e192e8316449a1e30399b74758b1692c9bf8ae2 100644 (file)
@@ -1216,6 +1216,9 @@ class DBImpl : public DB {
   InstrumentedMutex trace_mutex_;
   BlockCacheTracer block_cache_tracer_;
 
+  // constant false canceled flag, used when the compaction is not manual
+  const std::atomic<bool> kManualCompactionCanceledFalse_{false};
+
   // State below is protected by mutex_
   // With two_write_queues enabled, some of the variables that accessed during
   // WriteToWAL need different synchronization: log_empty_, alive_log_files_,
@@ -1603,7 +1606,11 @@ class DBImpl : public DB {
           output_path_id(_output_path_id),
           exclusive(_exclusive),
           disallow_trivial_move(_disallow_trivial_move),
-          canceled(_canceled) {}
+          canceled(_canceled ? *_canceled : canceled_internal_storage) {}
+    // When _canceled is not provided by ther user, we assign the reference of
+    // canceled_internal_storage to it to consolidate canceled and
+    // manual_compaction_paused since DisableManualCompaction() might be
+    // called
 
     ColumnFamilyData* cfd;
     int input_level;
@@ -1620,7 +1627,12 @@ class DBImpl : public DB {
     InternalKey* manual_end = nullptr;   // how far we are compacting
     InternalKey tmp_storage;      // Used to keep track of compaction progress
     InternalKey tmp_storage1;     // Used to keep track of compaction progress
-    std::atomic<bool>* canceled;  // Compaction canceled by the user?
+
+    // When the user provides a canceled pointer in CompactRangeOptions, the
+    // above varaibe is the reference of the user-provided
+    // `canceled`, otherwise, it is the reference of canceled_internal_storage
+    std::atomic<bool> canceled_internal_storage = false;
+    std::atomic<bool>& canceled;  // Compaction canceled pointer reference
   };
   struct PrepickedCompaction {
     // background compaction takes ownership of `compaction`.
index f552310e81bbc7922e332273ee908674fb25bedf..c84a745a6e60c46f2d83c867fe183e241a1bdd9f 100644 (file)
@@ -1217,6 +1217,10 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
 
   // Perform CompactFiles
   TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
+  TEST_SYNC_POINT_CALLBACK(
+      "TestCompactFiles:PausingManualCompaction:3",
+      reinterpret_cast<void*>(
+          const_cast<std::atomic<int>*>(&manual_compaction_paused_)));
   {
     InstrumentedMutexLock l(&mutex_);
 
@@ -1372,7 +1376,7 @@ Status DBImpl::CompactFilesImpl(
       c->mutable_cf_options()->paranoid_file_checks,
       c->mutable_cf_options()->report_bg_io_stats, dbname_,
       &compaction_job_stats, Env::Priority::USER, io_tracer_,
-      &manual_compaction_paused_, nullptr, db_id_, db_session_id_,
+      kManualCompactionCanceledFalse_, db_id_, db_session_id_,
       c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(),
       &blob_callback_);
 
@@ -1838,8 +1842,7 @@ Status DBImpl::RunManualCompaction(
     // and `CompactRangeOptions::canceled` might not work well together.
     while (bg_bottom_compaction_scheduled_ > 0 ||
            bg_compaction_scheduled_ > 0) {
-      if (manual_compaction_paused_ > 0 ||
-          (manual.canceled != nullptr && *manual.canceled == true)) {
+      if (manual_compaction_paused_ > 0 || manual.canceled == true) {
         // Pretend the error came from compaction so the below cleanup/error
         // handling code can process it.
         manual.done = true;
@@ -2376,10 +2379,18 @@ Status DBImpl::EnableAutoCompaction(
   return s;
 }
 
+// NOTE: Calling DisableManualCompaction() may overwrite the
+// user-provided canceled variable in CompactRangeOptions
 void DBImpl::DisableManualCompaction() {
   InstrumentedMutexLock l(&mutex_);
   manual_compaction_paused_.fetch_add(1, std::memory_order_release);
 
+  // Mark the canceled as true when the cancellation is triggered by
+  // manual_compaction_paused (may overwrite user-provided `canceled`)
+  for (const auto& manual_compaction : manual_compaction_dequeue_) {
+    manual_compaction->canceled = true;
+  }
+
   // Wake up manual compactions waiting to start.
   bg_cv_.SignalAll();
 
@@ -2392,6 +2403,11 @@ void DBImpl::DisableManualCompaction() {
   }
 }
 
+// NOTE: In contrast to DisableManualCompaction(), calling
+// EnableManualCompaction() does NOT overwrite the user-provided *canceled
+// variable to be false since there is NO CHANCE a canceled compaction
+// is uncanceled. In other words, a canceled compaction must have been
+// dropped out of the manual compaction queue, when we disable it.
 void DBImpl::EnableManualCompaction() {
   InstrumentedMutexLock l(&mutex_);
   assert(manual_compaction_paused_ > 0);
@@ -3037,10 +3053,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
     if (shutting_down_.load(std::memory_order_acquire)) {
       status = Status::ShutdownInProgress();
     } else if (is_manual &&
-               manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
-      status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
-    } else if (is_manual && manual_compaction->canceled &&
-               manual_compaction->canceled->load(std::memory_order_acquire)) {
+               manual_compaction->canceled.load(std::memory_order_acquire)) {
       status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
     }
   } else {
@@ -3357,6 +3370,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
     GetSnapshotContext(job_context, &snapshot_seqs,
                        &earliest_write_conflict_snapshot, &snapshot_checker);
     assert(is_snapshot_supported_ || snapshots_.empty());
+
     CompactionJob compaction_job(
         job_context->job_id, c.get(), immutable_db_options_,
         mutable_db_options_, file_options_for_compaction_, versions_.get(),
@@ -3368,9 +3382,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
         c->mutable_cf_options()->paranoid_file_checks,
         c->mutable_cf_options()->report_bg_io_stats, dbname_,
         &compaction_job_stats, thread_pri, io_tracer_,
-        is_manual ? &manual_compaction_paused_ : nullptr,
-        is_manual ? manual_compaction->canceled : nullptr, db_id_,
-        db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
+        is_manual ? manual_compaction->canceled
+                  : kManualCompactionCanceledFalse_,
+        db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
         c->trim_ts(), &blob_callback_);
     compaction_job.Prepare();
 
index ab75819e44337668db7657b0b6c950fd5089db2d..55fd72acd179e9d0d696b3d1ab94959537d14782 100644 (file)
@@ -820,8 +820,8 @@ Status DBImplSecondary::CompactWithoutInstallation(
       file_options_for_compaction_, versions_.get(), &shutting_down_,
       &log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_,
       input.snapshots, table_cache_, &event_logger_, dbname_, io_tracer_,
-      options.canceled, input.db_id, db_session_id_, secondary_path_, input,
-      result);
+      options.canceled ? *options.canceled : kManualCompactionCanceledFalse_,
+      input.db_id, db_session_id_, secondary_path_, input, result);
 
   mutex_.Unlock();
   s = compaction_job.Run();
index 52bc1a78dd75af8ffdf038a88c530d45be44f26c..493e01f357a10ea5113873d56714c3d60032ebf8 100644 (file)
@@ -3081,10 +3081,21 @@ TEST_F(DBTest2, PausingManualCompaction1) {
   int manual_compactions_paused = 0;
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "CompactionJob::Run():PausingManualCompaction:1", [&](void* arg) {
+        auto canceled = static_cast<std::atomic<bool>*>(arg);
+        // CompactRange triggers manual compaction and cancel the compaction
+        // by set *canceled as true
+        if (canceled != nullptr) {
+          canceled->store(true, std::memory_order_release);
+        }
+        manual_compactions_paused += 1;
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "TestCompactFiles:PausingManualCompaction:3", [&](void* arg) {
         auto paused = static_cast<std::atomic<int>*>(arg);
+        // CompactFiles() relies on manual_compactions_paused to
+        // determine if thie compaction should be paused or not
         ASSERT_EQ(0, paused->load(std::memory_order_acquire));
         paused->fetch_add(1, std::memory_order_release);
-        manual_compactions_paused += 1;
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
@@ -3149,7 +3160,7 @@ TEST_F(DBTest2, PausingManualCompaction2) {
 
   Random rnd(301);
   for (int i = 0; i < 2; i++) {
-    // Generate a file containing 10 keys.
+    // Generate a file containing 100 keys.
     for (int j = 0; j < 100; j++) {
       ASSERT_OK(Put(Key(j), rnd.RandomString(50)));
     }
@@ -3248,10 +3259,21 @@ TEST_F(DBTest2, PausingManualCompaction4) {
   int run_manual_compactions = 0;
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "CompactionJob::Run():PausingManualCompaction:2", [&](void* arg) {
+        auto canceled = static_cast<std::atomic<bool>*>(arg);
+        // CompactRange triggers manual compaction and cancel the compaction
+        // by set *canceled as true
+        if (canceled != nullptr) {
+          canceled->store(true, std::memory_order_release);
+        }
+        run_manual_compactions++;
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "TestCompactFiles:PausingManualCompaction:3", [&](void* arg) {
         auto paused = static_cast<std::atomic<int>*>(arg);
+        // CompactFiles() relies on manual_compactions_paused to
+        // determine if thie compaction should be paused or not
         ASSERT_EQ(0, paused->load(std::memory_order_acquire));
         paused->fetch_add(1, std::memory_order_release);
-        run_manual_compactions++;
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
@@ -3266,7 +3288,6 @@ TEST_F(DBTest2, PausingManualCompaction4) {
 
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
       "CompactionJob::Run():PausingManualCompaction:2");
-  dbfull()->EnableManualCompaction();
   ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
   ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
 #ifndef ROCKSDB_LITE
@@ -3515,8 +3536,9 @@ TEST_F(DBTest2, CancelManualCompactionWithListener) {
       "CompactionJob::FinishCompactionOutputFile1",
       [&](void* /*arg*/) { running_compaction++; });
 
-  // Case I: 1 Notify begin compaction, 2 DisableManualCompaction, 3 Compaction
-  // not run, 4 Notify compaction end.
+  // Case I: 1 Notify begin compaction, 2 Set *canceled as true to disable
+  // manual compaction in the callback function, 3 Compaction not run,
+  // 4 Notify compaction end.
   listener->code_ = Status::kIncomplete;
   listener->subcode_ = Status::SubCode::kManualCompactionPaused;
 
@@ -3533,8 +3555,9 @@ TEST_F(DBTest2, CancelManualCompactionWithListener) {
   listener->num_compaction_started_ = 0;
   listener->num_compaction_ended_ = 0;
 
-  // Case II: 1 DisableManualCompaction, 2 Notify begin compaction (return
-  // without notifying), 3 Notify compaction end (return without notifying).
+  // Case II: 1 Set *canceled as true in the callback function to disable manual
+  // compaction, 2 Notify begin compaction (return without notifying), 3 Notify
+  // compaction end (return without notifying).
   ASSERT_TRUE(dbfull()
                   ->CompactRange(compact_options, nullptr, nullptr)
                   .IsManualCompactionPaused());
@@ -3545,8 +3568,8 @@ TEST_F(DBTest2, CancelManualCompactionWithListener) {
   ASSERT_EQ(running_compaction, 0);
 
   // Case III: 1 Notify begin compaction, 2 Compaction in between
-  // 3. DisableManualCompaction, , 4 Notify compaction end.
-  // compact_options.canceled->store(false, std::memory_order_release);
+  // 3. Set *canceled as true in the callback function to disable manual
+  // compaction, 4 Notify compaction end.
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
       "CompactionIterator:ProcessKV");
 
index 08d166de9a2353db630304619969ccbc4dda5e33..bd2fc96813efabd2fc0f9c9a20d7adfc5d6c1258 100644 (file)
@@ -456,6 +456,7 @@ Status FlushJob::MemPurge() {
         snapshot_checker_);
     assert(job_context_);
     SequenceNumber job_snapshot_seq = job_context_->GetJobSnapshotSequence();
+    const std::atomic<bool> kManualCompactionCanceledFalse{false};
     CompactionIterator c_iter(
         iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge,
         kMaxSequenceNumber, &existing_snapshots_,
@@ -464,10 +465,9 @@ Status FlushJob::MemPurge() {
         true /* internal key corruption is not ok */, range_del_agg.get(),
         nullptr, ioptions->allow_data_in_errors,
         ioptions->enforce_single_del_contracts,
+        /*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
         /*compaction=*/nullptr, compaction_filter.get(),
-        /*shutting_down=*/nullptr,
-        /*manual_compaction_paused=*/nullptr,
-        /*manual_compaction_canceled=*/nullptr, ioptions->info_log,
+        /*shutting_down=*/nullptr, ioptions->info_log,
         &(cfd_->GetFullHistoryTsLow()));
 
     // Set earliest sequence number in the new memtable
index bc092b24d19820f817fd723ee4b4c5d1e8cf2c46..8f54d063f76b016628db1d89429b07409a3dad31 100644 (file)
@@ -1834,6 +1834,13 @@ struct CompactRangeOptions {
   // Cancellation can be delayed waiting on automatic compactions when used
   // together with `exclusive_manual_compaction == true`.
   std::atomic<bool>* canceled = nullptr;
+  // NOTE: Calling DisableManualCompaction() overwrites the uer-provided
+  // canceled variable in CompactRangeOptions.
+  // Typically, when CompactRange is being called in one thread (t1) with
+  // canceled = false, and DisableManualCompaction is being called in the
+  // other thread (t2), manual compaction is disabled normally, even if the
+  // compaction iterator may still scan a few items before *canceled is
+  // set to true
 
   // If set to kForce, RocksDB will override enable_blob_file_garbage_collection
   // to true; if set to kDisable, RocksDB will override it to false, and