]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Add missing range conflict check between file ingestion and RefitLevel() (#10988)
authorHui Xiao <huixiao@fb.com>
Thu, 29 Dec 2022 23:05:36 +0000 (15:05 -0800)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Thu, 29 Dec 2022 23:05:36 +0000 (15:05 -0800)
Summary:
**Context:**
File ingestion never checks whether the key range it acts on overlaps with an ongoing RefitLevel() (used in `CompactRange()` with `change_level=true`). That's because RefitLevel() doesn't register and make its key range known to file ingestion. Though it checks overlapping with other compactions by https://github.com/facebook/rocksdb/blob/7.8.fb/db/external_sst_file_ingestion_job.cc#L998.

RefitLevel() (used in `CompactRange()` with `change_level=true`) doesn't check whether the key range it acts on overlaps with an ongoing file ingestion. That's because file ingestion does not register and make its key range known to other compactions.
- Note that non-refitlevel-compaction (e.g, manual compaction w/o RefitLevel() or general compaction) also does not check key range overlap with ongoing file ingestion for the same reason.
- But it's fine. Credited to cbi42's discovery, `WaitForIngestFile` was called by background and foreground compactions. They were introduced in https://github.com/facebook/rocksdb/commit/0f88160f67d36ea30e3aca3a3cef924c3a009be6, https://github.com/facebook/rocksdb/commit/5c64fb67d2fc198f1a73ff3ae543749a6a41f513 and https://github.com/facebook/rocksdb/commit/87dfc1d23e0e16ff73e15f63c6fa0fb3b3fc8c8c.
- Regardless, this PR registers file ingestion like a compaction is a general approach that will also add range conflict check between file ingestion and non-refitlevel-compaction, though it has not been the issue motivated this PR.

Above are bugs resulting in two bad consequences:
- If file ingestion and RefitLevel() creates files in the same level, then range-overlapped files will be created at that level and caught as corruption by `force_consistency_checks=true`
- If file ingestion and RefitLevel() creates file in different levels, then with one further compaction on the ingested file, it can result in two same keys both with seqno 0 in two different levels. Then with iterator's [optimization](https://github.com/facebook/rocksdb/blame/c62f3221698fd273b673d4f7e54eabb8329a4369/db/db_iter.cc#L342-L343) that assumes no two same keys both with seqno 0, it will either break this assertion in debug build or, even worst, return value of this same key for the key after it, which is the wrong value to return, in release build.

Therefore we decide to introduce range conflict check for file ingestion and RefitLevel() inspired from the existing range conflict check among compactions.

**Summary:**
- Treat file ingestion job and RefitLevel() as `Compaction` of new compaction reasons: `CompactionReason::kExternalSstIngestion` and `CompactionReason::kRefitLevel` and register/unregister them.  File ingestion is treated as compaction from L0 to different levels and RefitLevel() as compaction from source level to target level.
- Check for `RangeOverlapWithCompaction` with other ongoing compactions, `RegisterCompaction()` on this "compaction" before changing the LSM state in `VersionStorageInfo`, and `UnregisterCompaction()` after changing.
- Replace scattered fixes (https://github.com/facebook/rocksdb/commit/0f88160f67d36ea30e3aca3a3cef924c3a009be6, https://github.com/facebook/rocksdb/commit/5c64fb67d2fc198f1a73ff3ae543749a6a41f513 and https://github.com/facebook/rocksdb/commit/87dfc1d23e0e16ff73e15f63c6fa0fb3b3fc8c8c.) that prevents overlapping between file ingestion and non-refit-level compaction with this fix cuz those practices are easy to overlook.
- Misc: logic cleanup, see PR comments

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10988

Test Plan:
- New unit test `DBCompactionTestWithOngoingFileIngestionParam*` that failed pre-fix and passed afterwards.
- Made compatible with existing tests, see PR comments
- make check
- [Ongoing] Stress test rehearsal with normal value and aggressive CI value https://github.com/facebook/rocksdb/pull/10761

Reviewed By: cbi42

Differential Revision: D41535685

Pulled By: hx235

fbshipit-source-id: 549833a577ba1496d20a870583d4caa737da1258

19 files changed:
HISTORY.md
db/column_family.cc
db/compaction/compaction.cc
db/compaction/compaction_job.cc
db/compaction/compaction_picker.cc
db/compaction/compaction_picker_level.cc
db/db_bloom_filter_test.cc
db/db_compaction_test.cc
db/db_impl/db_impl.cc
db/db_impl/db_impl.h
db/db_impl/db_impl_compaction_flush.cc
db/external_sst_file_basic_test.cc
db/external_sst_file_ingestion_job.cc
db/external_sst_file_ingestion_job.h
db/external_sst_file_test.cc
include/rocksdb/listener.h
include/rocksdb/options.h
java/rocksjni/portal.h
java/src/main/java/org/rocksdb/CompactionReason.java

index 80aa22263dc15369d0c1257d0018c41858ab39ae..c0d3515500f978a8336640e539b468068f38889c 100644 (file)
@@ -15,6 +15,7 @@
 * Fixed a bug in LockWAL() leading to re-locking mutex (#11020).
 * Fixed a heap use after free bug in async scan prefetching when the scan thread and another thread try to read and load the same seek block into cache.
 * Fixed a heap use after free in async scan prefetching if dictionary compression is enabled, in which case sync read of the compression dictionary gets mixed with async prefetching
+* Fixed a data race bug of `CompactRange()` under `change_level=true` acts on overlapping range with an ongoing file ingestion for level compaction. This will either result in overlapping file ranges corruption at a certain level caught by `force_consistency_checks=true` or protentially two same keys both with seqno 0 in two different levels (i.e, new data ends up in lower/older level). The latter will be caught by assertion in debug build but go silently and result in read returning wrong result in release build. This fix is general so it also replaced previous fixes to a similar problem for `CompactFiles()` (#4665), general `CompactRange()` and auto compaction (commit 5c64fb6 and 87dfc1d).
 
 ### New Features
 * When an SstPartitionerFactory is configured, CompactRange() now automatically selects for compaction any files overlapping a partition boundary that is in the compaction range, even if no actual entries are in the requested compaction range. With this feature, manual compaction can be used to (re-)establish SST partition points when SstPartitioner changes, without a full compaction.
index d9875336c303a81fabb554218da533f980b4913b..8124b23cd7eda57caa9a27b8e7356e145f8b0e03 100644 (file)
@@ -1218,6 +1218,7 @@ Compaction* ColumnFamilyData::CompactRange(
   if (result != nullptr) {
     result->SetInputVersion(current_);
   }
+  TEST_SYNC_POINT("ColumnFamilyData::CompactRange:Return");
   return result;
 }
 
index 3d6d334dbefb0bb00ffd0a7a44585368cc149b07..8947545895bf757b655434668b8ed96d8b4a6bbb 100644 (file)
@@ -235,12 +235,19 @@ Compaction::Compaction(
       inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
       grandparents_(std::move(_grandparents)),
       score_(_score),
-      bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
+      bottommost_level_(
+          // For simplicity, we don't support the concept of "bottommost level"
+          // with
+          // `CompactionReason::kExternalSstIngestion` and
+          // `CompactionReason::kRefitLevel`
+          (_compaction_reason == CompactionReason::kExternalSstIngestion ||
+           _compaction_reason == CompactionReason::kRefitLevel)
+              ? false
+              : IsBottommostLevel(output_level_, vstorage, inputs_)),
       is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
       is_manual_compaction_(_manual_compaction),
       trim_ts_(_trim_ts),
       is_trivial_move_(false),
-
       compaction_reason_(_compaction_reason),
       notify_on_compaction_completion_(false),
       enable_blob_garbage_collection_(
@@ -255,8 +262,15 @@ Compaction::Compaction(
                   _blob_garbage_collection_age_cutoff > 1
               ? mutable_cf_options()->blob_garbage_collection_age_cutoff
               : _blob_garbage_collection_age_cutoff),
-      penultimate_level_(EvaluatePenultimateLevel(
-          vstorage, immutable_options_, start_level_, output_level_)) {
+      penultimate_level_(
+          // For simplicity, we don't support the concept of "penultimate level"
+          // with `CompactionReason::kExternalSstIngestion` and
+          // `CompactionReason::kRefitLevel`
+          _compaction_reason == CompactionReason::kExternalSstIngestion ||
+                  _compaction_reason == CompactionReason::kRefitLevel
+              ? Compaction::kInvalidLevel
+              : EvaluatePenultimateLevel(vstorage, immutable_options_,
+                                         start_level_, output_level_)) {
   MarkFilesBeingCompacted(true);
   if (is_manual_compaction_) {
     compaction_reason_ = CompactionReason::kManualCompaction;
index a30b21195b7860192d8ad699afc2dd1613cfe95c..0f1dde3273057d41bd2f501eb4e316af0d69ff27 100644 (file)
@@ -99,6 +99,8 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) {
       return "ForcedBlobGC";
     case CompactionReason::kRoundRobinTtl:
       return "RoundRobinTtl";
+    case CompactionReason::kRefitLevel:
+      return "RefitLevel";
     case CompactionReason::kNumOfReasons:
       // fall through
     default:
index de2570eeed85ad6127b5998d0a3b7d0d3d22654f..5fe058b56d19b82f3d652bf8064d718aff525148 100644 (file)
@@ -1126,7 +1126,11 @@ void CompactionPicker::RegisterCompaction(Compaction* c) {
          c->output_level() == 0 ||
          !FilesRangeOverlapWithCompaction(*c->inputs(), c->output_level(),
                                           c->GetPenultimateLevel()));
-  if (c->start_level() == 0 ||
+  // CompactionReason::kExternalSstIngestion's start level is just a placeholder
+  // number without actual meaning as file ingestion technically does not have
+  // an input level like other compactions
+  if ((c->start_level() == 0 &&
+       c->compaction_reason() != CompactionReason::kExternalSstIngestion) ||
       ioptions_.compaction_style == kCompactionStyleUniversal) {
     level0_compactions_in_progress_.insert(c);
   }
index 31987fc52b4f83fd52bfef33a4b2c0fd6b206fd5..2162d30a30a1dc00262dae0e7d1b1896621b310d 100644 (file)
@@ -447,21 +447,21 @@ bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
       compaction_inputs_.push_back(output_level_inputs_);
     }
 
+    // In some edge cases we could pick a compaction that will be compacting
+    // a key range that overlap with another running compaction, and both
+    // of them have the same output level. This could happen if
+    // (1) we are running a non-exclusive manual compaction
+    // (2) AddFile ingest a new file into the LSM tree
+    // We need to disallow this from happening.
+    if (compaction_picker_->FilesRangeOverlapWithCompaction(
+            compaction_inputs_, output_level_,
+            Compaction::EvaluatePenultimateLevel(
+                vstorage_, ioptions_, start_level_, output_level_))) {
+      // This compaction output could potentially conflict with the output
+      // of a currently running compaction, we cannot run it.
+      return false;
+    }
     if (!is_l0_trivial_move_) {
-      // In some edge cases we could pick a compaction that will be compacting
-      // a key range that overlap with another running compaction, and both
-      // of them have the same output level. This could happen if
-      // (1) we are running a non-exclusive manual compaction
-      // (2) AddFile ingest a new file into the LSM tree
-      // We need to disallow this from happening.
-      if (compaction_picker_->FilesRangeOverlapWithCompaction(
-              compaction_inputs_, output_level_,
-              Compaction::EvaluatePenultimateLevel(
-                  vstorage_, ioptions_, start_level_, output_level_))) {
-        // This compaction output could potentially conflict with the output
-        // of a currently running compaction, we cannot run it.
-        return false;
-      }
       compaction_picker_->GetGrandparents(vstorage_, start_level_inputs_,
                                           output_level_inputs_, &grandparents_);
     }
index d68ab61153d5b38f38c41f42155e8c609dcfc007..7bf509abec9b776da6b3aad499bbcce2ec08717e 100644 (file)
@@ -1229,7 +1229,7 @@ TEST_P(ChargeFilterConstructionTestWithParam, Basic) {
      *
      *  The test is designed in a way such that the reservation for (p1 - b')
      *  will trigger at least another dummy entry insertion
-     *  (or equivelantly to saying, creating another peak).
+     *  (or equivalently to saying, creating another peak).
      *
      * kStandard128Ribbon + FullFilter +
      * detect_filter_construct_corruption
@@ -2618,8 +2618,7 @@ TEST_F(DBBloomFilterTest, OptimizeFiltersForHits) {
       BottommostLevelCompaction::kSkip;
   compact_options.change_level = true;
   compact_options.target_level = 7;
-  ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)
-                  .IsNotSupported());
+  ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
 
   ASSERT_EQ(trivial_move, 1);
   ASSERT_EQ(non_trivial_move, 0);
index 0227c06e416cf91c8e57f941de381746b659d10e..2fffcae60724834f9ca140dc7db945084cfc60c5 100644 (file)
@@ -6245,6 +6245,231 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
   Close();
 }
 
+class DBCompactionTestWithOngoingFileIngestionParam
+    : public DBCompactionTest,
+      public testing::WithParamInterface<std::string> {
+ public:
+  DBCompactionTestWithOngoingFileIngestionParam() : DBCompactionTest() {
+    compaction_path_to_test_ = GetParam();
+  }
+  void SetupOptions() {
+    options_ = CurrentOptions();
+    options_.create_if_missing = true;
+
+    if (compaction_path_to_test_ == "RefitLevelCompactRange") {
+      options_.num_levels = 7;
+    } else {
+      options_.num_levels = 3;
+    }
+    options_.compaction_style = CompactionStyle::kCompactionStyleLevel;
+    if (compaction_path_to_test_ == "AutoCompaction") {
+      options_.disable_auto_compactions = false;
+      options_.level0_file_num_compaction_trigger = 1;
+    } else {
+      options_.disable_auto_compactions = true;
+    }
+  }
+
+  void PauseCompactionThread() {
+    sleeping_task_.reset(new test::SleepingBackgroundTask());
+    env_->SetBackgroundThreads(1, Env::LOW);
+    env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
+                   sleeping_task_.get(), Env::Priority::LOW);
+    sleeping_task_->WaitUntilSleeping();
+  }
+
+  void ResumeCompactionThread() {
+    if (sleeping_task_) {
+      sleeping_task_->WakeUp();
+      sleeping_task_->WaitUntilDone();
+    }
+  }
+
+  void SetupFilesToForceFutureFilesIngestedToCertainLevel() {
+    SstFileWriter sst_file_writer(EnvOptions(), options_);
+    std::string dummy = dbname_ + "/dummy.sst";
+    ASSERT_OK(sst_file_writer.Open(dummy));
+    ASSERT_OK(sst_file_writer.Put("k2", "dummy"));
+    ASSERT_OK(sst_file_writer.Finish());
+    ASSERT_OK(db_->IngestExternalFile({dummy}, IngestExternalFileOptions()));
+    // L2 is made to contain a file overlapped with files to be ingested in
+    // later steps on key "k2". This will force future files ingested to L1 or
+    // above.
+    ASSERT_EQ("0,0,1", FilesPerLevel(0));
+  }
+
+  void SetupSyncPoints() {
+    if (compaction_path_to_test_ == "AutoCompaction") {
+      SyncPoint::GetInstance()->SetCallBack(
+          "ExternalSstFileIngestionJob::Run", [&](void*) {
+            SyncPoint::GetInstance()->LoadDependency(
+                {{"DBImpl::BackgroundCompaction():AfterPickCompaction",
+                  "VersionSet::LogAndApply:WriteManifest"}});
+          });
+    } else if (compaction_path_to_test_ == "NonRefitLevelCompactRange") {
+      SyncPoint::GetInstance()->SetCallBack(
+          "ExternalSstFileIngestionJob::Run", [&](void*) {
+            SyncPoint::GetInstance()->LoadDependency(
+                {{"ColumnFamilyData::CompactRange:Return",
+                  "VersionSet::LogAndApply:WriteManifest"}});
+          });
+    } else if (compaction_path_to_test_ == "RefitLevelCompactRange") {
+      SyncPoint::GetInstance()->SetCallBack(
+          "ExternalSstFileIngestionJob::Run", [&](void*) {
+            SyncPoint::GetInstance()->LoadDependency(
+                {{"DBImpl::CompactRange:PostRefitLevel",
+                  "VersionSet::LogAndApply:WriteManifest"}});
+          });
+    } else if (compaction_path_to_test_ == "CompactFiles") {
+      SyncPoint::GetInstance()->SetCallBack(
+          "ExternalSstFileIngestionJob::Run", [&](void*) {
+            SyncPoint::GetInstance()->LoadDependency(
+                {{"DBImpl::CompactFilesImpl::PostSanitizeCompactionInputFiles",
+                  "VersionSet::LogAndApply:WriteManifest"}});
+          });
+    } else {
+      assert(false);
+    }
+    SyncPoint::GetInstance()->LoadDependency(
+        {{"ExternalSstFileIngestionJob::Run", "PreCompaction"}});
+    SyncPoint::GetInstance()->EnableProcessing();
+  }
+
+  void RunCompactionOverlappedWithFileIngestion() {
+    if (compaction_path_to_test_ == "AutoCompaction") {
+      TEST_SYNC_POINT("PreCompaction");
+      ResumeCompactionThread();
+      // Without proper range conflict check,
+      // this would have been `Status::Corruption` about overlapping ranges
+      Status s = dbfull()->TEST_WaitForCompact();
+      EXPECT_OK(s);
+    } else if (compaction_path_to_test_ == "NonRefitLevelCompactRange") {
+      CompactRangeOptions cro;
+      cro.change_level = false;
+      std::string start_key = "k1";
+      Slice start(start_key);
+      std::string end_key = "k4";
+      Slice end(end_key);
+      TEST_SYNC_POINT("PreCompaction");
+      // Without proper range conflict check,
+      // this would have been `Status::Corruption` about overlapping ranges
+      Status s = dbfull()->CompactRange(cro, &start, &end);
+      EXPECT_OK(s);
+    } else if (compaction_path_to_test_ == "RefitLevelCompactRange") {
+      CompactRangeOptions cro;
+      cro.change_level = true;
+      cro.target_level = 5;
+      std::string start_key = "k1";
+      Slice start(start_key);
+      std::string end_key = "k4";
+      Slice end(end_key);
+      TEST_SYNC_POINT("PreCompaction");
+      Status s = dbfull()->CompactRange(cro, &start, &end);
+      // Without proper range conflict check,
+      // this would have been `Status::Corruption` about overlapping ranges
+      // To see this, remove the fix AND replace
+      // `DBImpl::CompactRange:PostRefitLevel` in sync point dependency with
+      // `DBImpl::ReFitLevel:PostRegisterCompaction`
+      EXPECT_TRUE(s.IsNotSupported());
+      EXPECT_TRUE(s.ToString().find("some ongoing compaction's output") !=
+                  std::string::npos);
+    } else if (compaction_path_to_test_ == "CompactFiles") {
+      ColumnFamilyMetaData cf_meta_data;
+      db_->GetColumnFamilyMetaData(&cf_meta_data);
+      ASSERT_EQ(cf_meta_data.levels[0].files.size(), 1);
+      std::vector<std::string> input_files;
+      for (const auto& file : cf_meta_data.levels[0].files) {
+        input_files.push_back(file.name);
+      }
+      TEST_SYNC_POINT("PreCompaction");
+      Status s = db_->CompactFiles(CompactionOptions(), input_files, 1);
+      // Without proper range conflict check,
+      // this would have been `Status::Corruption` about overlapping ranges
+      EXPECT_TRUE(s.IsAborted());
+      EXPECT_TRUE(
+          s.ToString().find(
+              "A running compaction is writing to the same output level") !=
+          std::string::npos);
+    } else {
+      assert(false);
+    }
+  }
+
+  void DisableSyncPoints() {
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  }
+
+ protected:
+  std::string compaction_path_to_test_;
+  Options options_;
+  std::shared_ptr<test::SleepingBackgroundTask> sleeping_task_;
+};
+
+INSTANTIATE_TEST_CASE_P(DBCompactionTestWithOngoingFileIngestionParam,
+                        DBCompactionTestWithOngoingFileIngestionParam,
+                        ::testing::Values("AutoCompaction",
+                                          "NonRefitLevelCompactRange",
+                                          "RefitLevelCompactRange",
+                                          "CompactFiles"));
+
+TEST_P(DBCompactionTestWithOngoingFileIngestionParam, RangeConflictCheck) {
+  SetupOptions();
+  DestroyAndReopen(options_);
+
+  if (compaction_path_to_test_ == "AutoCompaction") {
+    PauseCompactionThread();
+  }
+
+  if (compaction_path_to_test_ != "RefitLevelCompactRange") {
+    SetupFilesToForceFutureFilesIngestedToCertainLevel();
+  }
+
+  // Create s1
+  ASSERT_OK(Put("k1", "v"));
+  ASSERT_OK(Put("k4", "v"));
+  ASSERT_OK(Flush());
+  if (compaction_path_to_test_ == "RefitLevelCompactRange") {
+    MoveFilesToLevel(6 /* level */);
+    ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel(0));
+  } else {
+    ASSERT_EQ("1,0,1", FilesPerLevel(0));
+  }
+
+  // To coerce following sequence of events
+  // Timeline   Thread 1 (Ingest s2)          Thread 2 (Compact s1)
+  // t0   |     Decide to output to Lk
+  // t1   |     Release lock in LogAndApply()
+  // t2   |                                    Acquire lock
+  // t3   |                                    Decides to compact to Lk
+  //      |                                    Expected to fail due to range
+  //      |                                    conflict check with file
+  //      |                                    ingestion
+  // t4   |                                    Release lock in LogAndApply()
+  // t5   |    Acquire lock again and finish
+  // t6   |                                    Acquire lock again and finish
+  SetupSyncPoints();
+
+  // Ingest s2
+  port::Thread thread1([&] {
+    SstFileWriter sst_file_writer(EnvOptions(), options_);
+    std::string s2 = dbname_ + "/ingested_s2.sst";
+    ASSERT_OK(sst_file_writer.Open(s2));
+    ASSERT_OK(sst_file_writer.Put("k2", "v2"));
+    ASSERT_OK(sst_file_writer.Put("k3", "v2"));
+    ASSERT_OK(sst_file_writer.Finish());
+    ASSERT_OK(db_->IngestExternalFile({s2}, IngestExternalFileOptions()));
+  });
+
+  // Compact s1. Without proper range conflict check,
+  // this will encounter overlapping file corruption.
+  port::Thread thread2([&] { RunCompactionOverlappedWithFileIngestion(); });
+
+  thread1.join();
+  thread2.join();
+  DisableSyncPoints();
+}
+
 TEST_F(DBCompactionTest, ConsistencyFailTest) {
   Options options = CurrentOptions();
   options.force_consistency_checks = true;
index 411503a6fe17055a5abbee146ebfdc4afe712595..f930878dee772c2151be5533806b1d7375086bbd 100644 (file)
@@ -5199,8 +5199,9 @@ Status DBImpl::IngestExternalFiles(
   for (const auto& arg : args) {
     auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
     ingestion_jobs.emplace_back(versions_.get(), cfd, immutable_db_options_,
-                                file_options_, &snapshots_, arg.options,
-                                &directories_, &event_logger_, io_tracer_);
+                                mutable_db_options_, file_options_, &snapshots_,
+                                arg.options, &directories_, &event_logger_,
+                                io_tracer_);
   }
 
   // TODO(yanqin) maybe make jobs run in parallel
@@ -5333,6 +5334,7 @@ Status DBImpl::IngestExternalFiles(
         if (!status.ok()) {
           break;
         }
+        ingestion_jobs[i].RegisterRange();
       }
     }
     if (status.ok()) {
@@ -5388,6 +5390,10 @@ Status DBImpl::IngestExternalFiles(
       }
     }
 
+    for (auto& job : ingestion_jobs) {
+      job.UnregisterRange();
+    }
+
     if (status.ok()) {
       for (size_t i = 0; i != num_cfs; ++i) {
         auto* cfd =
@@ -5759,13 +5765,6 @@ void DBImpl::NotifyOnExternalFileIngested(
   }
 }
 
-void DBImpl::WaitForIngestFile() {
-  mutex_.AssertHeld();
-  while (num_running_ingest_file_ > 0) {
-    bg_cv_.Wait();
-  }
-}
-
 Status DBImpl::StartTrace(const TraceOptions& trace_options,
                           std::unique_ptr<TraceWriter>&& trace_writer) {
   InstrumentedMutexLock lock(&trace_mutex_);
index 0eebef77483a55e482b16a49146b0e2ef5fb8854..9fcd9efea24c29b15980b23ffc7088f9ec06234b 100644 (file)
@@ -2023,14 +2023,6 @@ class DBImpl : public DB {
                           const int output_level, int output_path_id,
                           JobContext* job_context, LogBuffer* log_buffer,
                           CompactionJobInfo* compaction_job_info);
-
-  // Wait for current IngestExternalFile() calls to finish.
-  // REQUIRES: mutex_ held
-  void WaitForIngestFile();
-#else
-  // IngestExternalFile is not supported in ROCKSDB_LITE so this function
-  // will be no-op
-  void WaitForIngestFile() {}
 #endif  // ROCKSDB_LITE
 
   ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
index 95ee948eb2f19574d64eaac2a65af82c7ae5fda7..f84588276c1f813c4d477edfd0a35cd6d528ca4b 100644 (file)
@@ -1249,6 +1249,12 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
 
     ROCKS_LOG_INFO(immutable_db_options_.info_log,
                    "[RefitLevel] waiting for background threads to stop");
+    // TODO(hx235): remove `Enable/DisableManualCompaction` and
+    // `Continue/PauseBackgroundWork` once we ensure registering RefitLevel()'s
+    // range is sufficient (if not, what else is needed) for avoiding range
+    // conflicts with other activities (e.g, compaction, flush) that are
+    // currently avoided by `Enable/DisableManualCompaction` and
+    // `Continue/PauseBackgroundWork`.
     DisableManualCompaction();
     s = PauseBackgroundWork();
     if (s.ok()) {
@@ -1313,13 +1319,6 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
           const_cast<std::atomic<int>*>(&manual_compaction_paused_)));
   {
     InstrumentedMutexLock l(&mutex_);
-
-    // This call will unlock/lock the mutex to wait for current running
-    // IngestExternalFile() calls to finish.
-    WaitForIngestFile();
-
-    // We need to get current after `WaitForIngestFile`, because
-    // `IngestExternalFile` may add files that overlap with `input_file_names`
     auto* current = cfd->current();
     current->Ref();
 
@@ -1398,6 +1397,7 @@ Status DBImpl::CompactFilesImpl(
 
   Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
       &input_set, cf_meta, output_level);
+  TEST_SYNC_POINT("DBImpl::CompactFilesImpl::PostSanitizeCompactionInputFiles");
   if (!s.ok()) {
     return s;
   }
@@ -1691,6 +1691,10 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
 
   InstrumentedMutexLock guard_lock(&mutex_);
 
+  auto* vstorage = cfd->current()->storage_info();
+  if (vstorage->LevelFiles(level).empty()) {
+    return Status::OK();
+  }
   // only allow one thread refitting
   if (refitting_level_) {
     ROCKS_LOG_INFO(immutable_db_options_.info_log,
@@ -1706,8 +1710,16 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
     to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
   }
 
-  auto* vstorage = cfd->current()->storage_info();
   if (to_level != level) {
+    std::vector<CompactionInputFiles> input(1);
+    input[0].level = level;
+    for (auto& f : vstorage->LevelFiles(level)) {
+      input[0].files.push_back(f);
+    }
+    InternalKey refit_level_smallest;
+    InternalKey refit_level_largest;
+    cfd->compaction_picker()->GetRange(input[0], &refit_level_smallest,
+                                       &refit_level_largest);
     if (to_level > level) {
       if (level == 0) {
         refitting_level_ = false;
@@ -1721,6 +1733,14 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
           return Status::NotSupported(
               "Levels between source and target are not empty for a move.");
         }
+        if (cfd->RangeOverlapWithCompaction(refit_level_smallest.user_key(),
+                                            refit_level_largest.user_key(),
+                                            l)) {
+          refitting_level_ = false;
+          return Status::NotSupported(
+              "Levels between source and target "
+              "will have some ongoing compaction's output.");
+        }
       }
     } else {
       // to_level < level
@@ -1731,12 +1751,39 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
           return Status::NotSupported(
               "Levels between source and target are not empty for a move.");
         }
+        if (cfd->RangeOverlapWithCompaction(refit_level_smallest.user_key(),
+                                            refit_level_largest.user_key(),
+                                            l)) {
+          refitting_level_ = false;
+          return Status::NotSupported(
+              "Levels between source and target "
+              "will have some ongoing compaction's output.");
+        }
       }
     }
     ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                     "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
                     cfd->current()->DebugString().data());
 
+    std::unique_ptr<Compaction> c(new Compaction(
+        vstorage, *cfd->ioptions(), mutable_cf_options, mutable_db_options_,
+        {input}, to_level,
+        MaxFileSizeForLevel(
+            mutable_cf_options, to_level,
+            cfd->ioptions()
+                ->compaction_style) /* output file size limit, not applicable */
+        ,
+        LLONG_MAX /* max compaction bytes, not applicable */,
+        0 /* output path ID, not applicable */, mutable_cf_options.compression,
+        mutable_cf_options.compression_opts, Temperature::kUnknown,
+        0 /* max_subcompactions, not applicable */,
+        {} /* grandparents, not applicable */, false /* is manual */,
+        "" /* trim_ts */, -1 /* score, not applicable */,
+        false /* is deletion compaction, not applicable */,
+        false /* l0_files_might_overlap, not applicable */,
+        CompactionReason::kRefitLevel));
+    cfd->compaction_picker()->RegisterCompaction(c.get());
+    TEST_SYNC_POINT("DBImpl::ReFitLevel:PostRegisterCompaction");
     VersionEdit edit;
     edit.SetColumnFamily(cfd->GetID());
 
@@ -1757,6 +1804,9 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
     Status status = versions_->LogAndApply(cfd, mutable_cf_options, &edit,
                                            &mutex_, directories_.GetDbDir());
 
+    cfd->compaction_picker()->UnregisterCompaction(c.get());
+    c.reset();
+
     InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
 
     ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
@@ -1973,9 +2023,6 @@ Status DBImpl::RunManualCompaction(
                manual.begin, manual.end, &manual.manual_end, &manual_conflict,
                max_file_num_to_ignore, trim_ts)) == nullptr &&
           manual_conflict))) {
-      // exclusive manual compactions should not see a conflict during
-      // CompactRange
-      assert(!exclusive || !manual_conflict);
       // Running either this or some other manual compaction
       bg_cv_.Wait();
       if (manual_compaction_paused_ > 0 && scheduled && !unscheduled) {
@@ -3004,10 +3051,6 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
   {
     InstrumentedMutexLock l(&mutex_);
 
-    // This call will unlock/lock the mutex to wait for current running
-    // IngestExternalFile() calls to finish.
-    WaitForIngestFile();
-
     num_running_compactions_++;
 
     std::unique_ptr<std::list<uint64_t>::iterator>
@@ -3649,11 +3692,6 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
 }
 
 bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
-  if (num_running_ingest_file_ > 0) {
-    // We need to wait for other IngestExternalFile() calls to finish
-    // before running a manual compaction.
-    return true;
-  }
   if (m->exclusive) {
     return (bg_bottom_compaction_scheduled_ > 0 ||
             bg_compaction_scheduled_ > 0);
index 665c89869e25aab512f84c47f305354152795b3a..97b9897339c5022800b1e21823833c4aa2ad7ba0 100644 (file)
@@ -800,6 +800,7 @@ TEST_P(ExternalSSTFileBasicTest, IngestFileWithMultipleValueType) {
   bool verify_checksums_before_ingest = std::get<1>(GetParam());
   do {
     Options options = CurrentOptions();
+    options.disable_auto_compactions = true;
     options.merge_operator.reset(new TestPutOperator());
     DestroyAndReopen(options);
     std::map<std::string, std::string> true_data;
index 849f98e874f38b0a5aa038d78e8d5ecc560134af..6437fdc498c3e4f15dff9b524aa4e5fbb80f70df 100644 (file)
@@ -477,9 +477,82 @@ Status ExternalSstFileIngestionJob::Run() {
     f_metadata.temperature = f.file_temperature;
     edit_.AddFile(f.picked_level, f_metadata);
   }
+
+  CreateEquivalentFileIngestingCompactions();
   return status;
 }
 
+void ExternalSstFileIngestionJob::CreateEquivalentFileIngestingCompactions() {
+  // A map from output level to input of compactions equivalent to this
+  // ingestion job.
+  // TODO: simplify below logic to creating compaction per ingested file
+  // instead of per output level, once we figure out how to treat ingested files
+  // with adjacent range deletion tombstones to same output level in the same
+  // job as non-overlapping compactions.
+  std::map<int, CompactionInputFiles>
+      output_level_to_file_ingesting_compaction_input;
+
+  for (const auto& pair : edit_.GetNewFiles()) {
+    int output_level = pair.first;
+    const FileMetaData& f_metadata = pair.second;
+
+    CompactionInputFiles& input =
+        output_level_to_file_ingesting_compaction_input[output_level];
+    if (input.files.empty()) {
+      // Treat the source level of ingested files to be level 0
+      input.level = 0;
+    }
+
+    compaction_input_metdatas_.push_back(new FileMetaData(f_metadata));
+    input.files.push_back(compaction_input_metdatas_.back());
+  }
+
+  for (const auto& pair : output_level_to_file_ingesting_compaction_input) {
+    int output_level = pair.first;
+    const CompactionInputFiles& input = pair.second;
+
+    const auto& mutable_cf_options = *(cfd_->GetLatestMutableCFOptions());
+    file_ingesting_compactions_.push_back(new Compaction(
+        cfd_->current()->storage_info(), *cfd_->ioptions(), mutable_cf_options,
+        mutable_db_options_, {input}, output_level,
+        MaxFileSizeForLevel(
+            mutable_cf_options, output_level,
+            cfd_->ioptions()->compaction_style) /* output file size
+            limit,
+                                                 * not applicable
+                                                 */
+        ,
+        LLONG_MAX /* max compaction bytes, not applicable */,
+        0 /* output path ID, not applicable */, mutable_cf_options.compression,
+        mutable_cf_options.compression_opts, Temperature::kUnknown,
+        0 /* max_subcompaction, not applicable */,
+        {} /* grandparents, not applicable */, false /* is manual */,
+        "" /* trim_ts */, -1 /* score, not applicable */,
+        false /* is deletion compaction, not applicable */,
+        files_overlap_ /* l0_files_might_overlap, not applicable */,
+        CompactionReason::kExternalSstIngestion));
+  }
+}
+
+void ExternalSstFileIngestionJob::RegisterRange() {
+  for (const auto& c : file_ingesting_compactions_) {
+    cfd_->compaction_picker()->RegisterCompaction(c);
+  }
+}
+
+void ExternalSstFileIngestionJob::UnregisterRange() {
+  for (const auto& c : file_ingesting_compactions_) {
+    cfd_->compaction_picker()->UnregisterCompaction(c);
+    delete c;
+  }
+  file_ingesting_compactions_.clear();
+
+  for (const auto& f : compaction_input_metdatas_) {
+    delete f;
+  }
+  compaction_input_metdatas_.clear();
+}
+
 void ExternalSstFileIngestionJob::UpdateStats() {
   // Update internal stats for new ingested files
   uint64_t total_keys = 0;
@@ -798,8 +871,16 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
     if (lvl > 0 && lvl < vstorage->base_level()) {
       continue;
     }
-
-    if (vstorage->NumLevelFiles(lvl) > 0) {
+    if (cfd_->RangeOverlapWithCompaction(
+            file_to_ingest->smallest_internal_key.user_key(),
+            file_to_ingest->largest_internal_key.user_key(), lvl)) {
+      // We must use L0 or any level higher than `lvl` to be able to overwrite
+      // the compaction output keys that we overlap with in this level, We also
+      // need to assign this file a seqno to overwrite the compaction output
+      // keys in level `lvl`
+      overlap_with_db = true;
+      break;
+    } else if (vstorage->NumLevelFiles(lvl) > 0) {
       bool overlap_with_level = false;
       status = sv->current->OverlapWithLevelIterator(
           ro, env_options_, file_to_ingest->smallest_internal_key.user_key(),
@@ -856,6 +937,7 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
       target_level < cfd_->NumberLevels() - 1) {
     status = Status::TryAgain(
         "Files cannot be ingested to Lmax. Please make sure key range of Lmax "
+        "and ongoing compaction's output to Lmax"
         "does not overlap with files to ingest.");
     return status;
   }
@@ -873,7 +955,7 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
 Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
     IngestedFileInfo* file_to_ingest) {
   auto* vstorage = cfd_->current()->storage_info();
-  // first check if new files fit in the bottommost level
+  // First, check if new files fit in the bottommost level
   int bottom_lvl = cfd_->NumberLevels() - 1;
   if (!IngestedFileFitInLevel(file_to_ingest, bottom_lvl)) {
     return Status::InvalidArgument(
@@ -881,7 +963,7 @@ Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
         "at the bottommost level!");
   }
 
-  // second check if despite allow_ingest_behind=true we still have 0 seqnums
+  // Second, check if despite allow_ingest_behind=true we still have 0 seqnums
   // at some upper level
   for (int lvl = 0; lvl < cfd_->NumberLevels() - 1; lvl++) {
     for (auto file : vstorage->LevelFiles(lvl)) {
@@ -997,14 +1079,8 @@ bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
     // add it to this level
     return false;
   }
-  if (cfd_->RangeOverlapWithCompaction(file_smallest_user_key,
-                                       file_largest_user_key, level)) {
-    // File overlap with a running compaction output that will be stored
-    // in this level, we cannot add this file to this level
-    return false;
-  }
 
-  // File did not overlap with level files, our compaction output
+  // File did not overlap with level files, nor compaction output
   return true;
 }
 
index ce50ae86da188e077c5af4e121b3e70a9dc7930b..49bb1e31e59f351ca30d887c31063e0e5d346a1d 100644 (file)
@@ -11,6 +11,7 @@
 #include "db/column_family.h"
 #include "db/internal_stats.h"
 #include "db/snapshot_impl.h"
+#include "db/version_edit.h"
 #include "env/file_system_tracer.h"
 #include "logging/event_logger.h"
 #include "options/db_options.h"
@@ -78,7 +79,8 @@ class ExternalSstFileIngestionJob {
  public:
   ExternalSstFileIngestionJob(
       VersionSet* versions, ColumnFamilyData* cfd,
-      const ImmutableDBOptions& db_options, const EnvOptions& env_options,
+      const ImmutableDBOptions& db_options,
+      const MutableDBOptions& mutable_db_options, const EnvOptions& env_options,
       SnapshotList* db_snapshots,
       const IngestExternalFileOptions& ingestion_options,
       Directories* directories, EventLogger* event_logger,
@@ -88,6 +90,7 @@ class ExternalSstFileIngestionJob {
         versions_(versions),
         cfd_(cfd),
         db_options_(db_options),
+        mutable_db_options_(mutable_db_options),
         env_options_(env_options),
         db_snapshots_(db_snapshots),
         ingestion_options_(ingestion_options),
@@ -99,6 +102,17 @@ class ExternalSstFileIngestionJob {
     assert(directories != nullptr);
   }
 
+  ~ExternalSstFileIngestionJob() {
+    for (const auto& c : file_ingesting_compactions_) {
+      cfd_->compaction_picker()->UnregisterCompaction(c);
+      delete c;
+    }
+
+    for (const auto& f : compaction_input_metdatas_) {
+      delete f;
+    }
+  }
+
   // Prepare the job by copying external files into the DB.
   Status Prepare(const std::vector<std::string>& external_files_paths,
                  const std::vector<std::string>& files_checksums,
@@ -120,6 +134,15 @@ class ExternalSstFileIngestionJob {
   // REQUIRES: Mutex held
   Status Run();
 
+  // Register key range involved in this ingestion job
+  // to prevent key range conflict with other ongoing compaction/file ingestion
+  // REQUIRES: Mutex held
+  void RegisterRange();
+
+  // Unregister key range registered for this ingestion job
+  // REQUIRES: Mutex held
+  void UnregisterRange();
+
   // Update column family stats.
   // REQUIRES: Mutex held
   void UpdateStats();
@@ -175,11 +198,17 @@ class ExternalSstFileIngestionJob {
   template <typename TWritableFile>
   Status SyncIngestedFile(TWritableFile* file);
 
+  // Create equivalent `Compaction` objects to this file ingestion job
+  // , which will be used to check range conflict with other ongoing
+  // compactions.
+  void CreateEquivalentFileIngestingCompactions();
+
   SystemClock* clock_;
   FileSystemPtr fs_;
   VersionSet* versions_;
   ColumnFamilyData* cfd_;
   const ImmutableDBOptions& db_options_;
+  const MutableDBOptions& mutable_db_options_;
   const EnvOptions& env_options_;
   SnapshotList* db_snapshots_;
   autovector<IngestedFileInfo> files_to_ingest_;
@@ -196,6 +225,14 @@ class ExternalSstFileIngestionJob {
   // file_checksum_gen_factory is set, DB will generate checksum each file.
   bool need_generate_file_checksum_{true};
   std::shared_ptr<IOTracer> io_tracer_;
+
+  // Below are variables used in (un)registering range for this ingestion job
+  //
+  // FileMetaData used in inputs of compactions equivalent to this ingestion
+  // job
+  std::vector<FileMetaData*> compaction_input_metdatas_;
+  // Compactions equivalent to this ingestion job
+  std::vector<Compaction*> file_ingesting_compactions_;
 };
 
 }  // namespace ROCKSDB_NAMESPACE
index d16f6a58c45562a03f54b7843e4b00967c40574e..fcb2e7c9435cd49e6ca86908209b5e6a6c2fdbe1 100644 (file)
@@ -973,7 +973,7 @@ TEST_F(ExternalSSTFileTest, MultiThreaded) {
 
   do {
     Options options = CurrentOptions();
-
+    options.disable_auto_compactions = true;
     std::atomic<int> thread_num(0);
     std::function<void()> write_file_func = [&]() {
       int file_idx = thread_num.fetch_add(1);
@@ -1249,8 +1249,9 @@ TEST_P(ExternalSSTFileTest, PickedLevel) {
 
   // This file overlaps with file 0 (L3), file 1 (L2) and the
   // output of compaction going to L1
-  ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, -1, false, false, true,
-                                       false, false, &true_data));
+  ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, -1,
+                                       true /* allow_global_seqno */, false,
+                                       true, false, false, &true_data));
   EXPECT_EQ(FilesPerLevel(), "5,0,1,1");
 
   // This file does not overlap with any file or with the running compaction
@@ -1270,106 +1271,6 @@ TEST_P(ExternalSSTFileTest, PickedLevel) {
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
-TEST_F(ExternalSSTFileTest, PickedLevelBug) {
-  env_->skip_fsync_ = true;
-  Options options = CurrentOptions();
-  options.disable_auto_compactions = false;
-  options.level0_file_num_compaction_trigger = 3;
-  options.num_levels = 2;
-  DestroyAndReopen(options);
-
-  std::vector<int> file_keys;
-
-  // file #1 in L0
-  file_keys = {0, 5, 7};
-  for (int k : file_keys) {
-    ASSERT_OK(Put(Key(k), Key(k)));
-  }
-  ASSERT_OK(Flush());
-
-  // file #2 in L0
-  file_keys = {4, 6, 8, 9};
-  for (int k : file_keys) {
-    ASSERT_OK(Put(Key(k), Key(k)));
-  }
-  ASSERT_OK(Flush());
-
-  // We have 2 overlapping files in L0
-  EXPECT_EQ(FilesPerLevel(), "2");
-
-  ASSERT_OK(dbfull()->TEST_WaitForCompact());
-
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
-      {{"DBImpl::IngestExternalFile:AfterIncIngestFileCounter",
-        "ExternalSSTFileTest::PickedLevelBug:0"},
-       {"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"},
-       {"ExternalSSTFileTest::PickedLevelBug:2",
-        "DBImpl::RunManualCompaction:0"},
-       {"ExternalSSTFileTest::PickedLevelBug:3",
-        "DBImpl::RunManualCompaction:1"}});
-
-  std::atomic<bool> bg_compact_started(false);
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-      "DBImpl::BackgroundCompaction:Start",
-      [&](void* /*arg*/) { bg_compact_started.store(true); });
-
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
-
-  Status bg_compact_status;
-  Status bg_addfile_status;
-
-  {
-    // While writing the MANIFEST start a thread that will ask for compaction
-    ThreadGuard bg_compact(port::Thread([&]() {
-      bg_compact_status =
-          db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
-    }));
-    TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2");
-
-    // Start a thread that will ingest a new file
-    ThreadGuard bg_addfile(port::Thread([&]() {
-      file_keys = {1, 2, 3};
-      bg_addfile_status = GenerateAndAddExternalFile(options, file_keys, 1);
-    }));
-
-    // Wait for AddFile to start picking levels and writing MANIFEST
-    TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0");
-
-    TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:3");
-
-    // We need to verify that no compactions can run while AddFile is
-    // ingesting the files into the levels it find suitable. So we will
-    // wait for 2 seconds to give a chance for compactions to run during
-    // this period, and then make sure that no compactions where able to run
-    env_->SleepForMicroseconds(1000000 * 2);
-    bool bg_compact_started_tmp = bg_compact_started.load();
-
-    // Hold AddFile from finishing writing the MANIFEST
-    TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:1");
-
-    // check the status at the end, so even if the ASSERT fails the threads
-    // could be joined and return.
-    ASSERT_FALSE(bg_compact_started_tmp);
-  }
-
-  ASSERT_OK(bg_addfile_status);
-  ASSERT_OK(bg_compact_status);
-
-  ASSERT_OK(dbfull()->TEST_WaitForCompact());
-
-  int total_keys = 0;
-  Iterator* iter = db_->NewIterator(ReadOptions());
-  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
-    ASSERT_OK(iter->status());
-    total_keys++;
-  }
-  ASSERT_EQ(total_keys, 10);
-
-  delete iter;
-
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
-}
-
 TEST_F(ExternalSSTFileTest, IngestNonExistingFile) {
   Options options = CurrentOptions();
   DestroyAndReopen(options);
@@ -1420,7 +1321,8 @@ TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) {
   int range_id = 0;
   std::vector<int> file_keys;
   std::function<void()> bg_addfile = [&]() {
-    ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id));
+    ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id,
+                                         true /* allow_global_seqno */));
   };
 
   const int num_of_ranges = 1000;
@@ -1503,8 +1405,9 @@ TEST_F(ExternalSSTFileTest, PickedLevelDynamic) {
 
   // This file overlaps with the output of the compaction (going to L3)
   // so the file will be added to L0 since L3 is the base level
-  ASSERT_OK(GenerateAndAddExternalFile(options, {31, 32, 33, 34}, -1, false,
-                                       false, true, false, false, &true_data));
+  ASSERT_OK(GenerateAndAddExternalFile(options, {31, 32, 33, 34}, -1,
+                                       true /* allow_global_seqno */, false,
+                                       true, false, false, &true_data));
   EXPECT_EQ(FilesPerLevel(), "5");
 
   // This file does not overlap with the current running compactiong
@@ -1642,14 +1545,15 @@ TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) {
 
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "CompactionJob::Run():Start", [&](void* /*arg*/) {
-        // fit in L3 but will overlap with compaction so will be added
-        // to L2 but a compaction will trivially move it to L3
-        // and break LSM consistency
+        // Fit in L3 but will overlap with the compaction output so will be
+        // added to L2. Prior to the fix, a compaction will then trivially move
+        // this file to L3 and break LSM consistency
         static std::atomic<bool> called = {false};
         if (!called) {
           called = true;
           ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}}));
-          ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7));
+          ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7,
+                                               true /* allow_global_seqno */));
         }
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
index 8644fcf3f5d0cf5ee7270fafb93ff4d134b3419d..853b587581bcd265f4c369bbbaaaf67b667d33ca 100644 (file)
@@ -140,7 +140,10 @@ enum class CompactionReason : int {
   // According to the comments in flush_job.cc, RocksDB treats flush as
   // a level 0 compaction in internal stats.
   kFlush,
-  // Compaction caused by external sst file ingestion
+  // [InternalOnly] External sst file ingestion treated as a compaction
+  // with placeholder input level L0 as file ingestion
+  // technically does not have an input level like other compactions.
+  // Used only for internal stats and conflict checking with other compactions
   kExternalSstIngestion,
   // Compaction due to SST file being too old
   kPeriodicCompaction,
@@ -151,6 +154,9 @@ enum class CompactionReason : int {
   // A special TTL compaction for RoundRobin policy, which basically the same as
   // kLevelMaxLevelSize, but the goal is to compact TTLed files.
   kRoundRobinTtl,
+  // [InternalOnly] DBImpl::ReFitLevel treated as a compaction,
+  // Used only for internal conflict checking with other compactions
+  kRefitLevel,
   // total number of compaction reasons, new reasons must be added above this.
   kNumOfReasons,
 };
index 7a4d8b5a6892702584a1562111c3ad44c121a45e..e9832882641a67eb7a039c58cbb0f3890dfe3f37 100644 (file)
@@ -1933,7 +1933,8 @@ struct IngestExternalFileOptions {
   // that where created before the file was ingested.
   bool snapshot_consistency = true;
   // If set to false, IngestExternalFile() will fail if the file key range
-  // overlaps with existing keys or tombstones in the DB.
+  // overlaps with existing keys or tombstones or output of ongoing compaction
+  // during file ingestion in the DB.
   bool allow_global_seqno = true;
   // If set to false and the file key range overlaps with the memtable key range
   // (memtable flush required), IngestExternalFile will fail.
index 340199507b2ba2a8063dfa32bb0ee616a0104375..1a72507a935693dbe51f8ced44bf1b251b6db135 100644 (file)
@@ -7181,6 +7181,16 @@ class CompactionReasonJni {
         return 0x0C;
       case ROCKSDB_NAMESPACE::CompactionReason::kExternalSstIngestion:
         return 0x0D;
+      case ROCKSDB_NAMESPACE::CompactionReason::kPeriodicCompaction:
+        return 0x0E;
+      case ROCKSDB_NAMESPACE::CompactionReason::kChangeTemperature:
+        return 0x0F;
+      case ROCKSDB_NAMESPACE::CompactionReason::kForcedBlobGC:
+        return 0x11;
+      case ROCKSDB_NAMESPACE::CompactionReason::kRoundRobinTtl:
+        return 0x12;
+      case ROCKSDB_NAMESPACE::CompactionReason::kRefitLevel:
+        return 0x13;
       default:
         return 0x7F;  // undefined
     }
@@ -7225,6 +7235,12 @@ class CompactionReasonJni {
         return ROCKSDB_NAMESPACE::CompactionReason::kPeriodicCompaction;
       case 0x0F:
         return ROCKSDB_NAMESPACE::CompactionReason::kChangeTemperature;
+      case 0x11:
+        return ROCKSDB_NAMESPACE::CompactionReason::kForcedBlobGC;
+      case 0x12:
+        return ROCKSDB_NAMESPACE::CompactionReason::kRoundRobinTtl;
+      case 0x13:
+        return ROCKSDB_NAMESPACE::CompactionReason::kRefitLevel;
       default:
         // undefined/default
         return ROCKSDB_NAMESPACE::CompactionReason::kUnknown;
index 24e23445041f8b1692de7a02f559962496dbf40a..46ec33f3f14178efce559384504aaa83bf29e428 100644 (file)
@@ -88,7 +88,23 @@ public enum CompactionReason {
   /**
    * Compaction in order to move files to temperature
    */
-  kChangeTemperature((byte) 0x0F);
+  kChangeTemperature((byte) 0x0F),
+
+  /**
+   * Compaction scheduled to force garbage collection of blob files
+   */
+  kForcedBlobGC((byte) 0x11),
+
+  /**
+   * A special TTL compaction for RoundRobin policy, which basically the same as
+   * kLevelMaxLevelSize, but the goal is to compact TTLed files.
+   */
+  kRoundRobinTtl((byte) 0x12),
+
+  /**
+   * Compaction by calling DBImpl::ReFitLevel
+   */
+  kRefitLevel((byte) 0x13);
 
   private final byte value;