]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix bug caused by releasing snapshot(s) during compaction (#8608)
authorYanqin Jin <yanqin@fb.com>
Wed, 18 Aug 2021 05:13:21 +0000 (22:13 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Wed, 18 Aug 2021 05:14:20 +0000 (22:14 -0700)
Summary:
In debug mode, we are seeing assertion failure as follows

```
db/compaction/compaction_iterator.cc:980: void rocksdb::CompactionIterator::PrepareOutput(): \
Assertion `ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion' failed.
```

It is caused by releasing earliest snapshot during compaction between the execution of
`NextFromInput()` and `PrepareOutput()`.

In one case, as demonstrated in unit test `WritePreparedTransaction.ReleaseEarliestSnapshotDuringCompaction_WithSD2`,
incorrect result may be returned by a following range scan if we disable assertion, as in opt compilation
level: the SingleDelete marker's sequence number is zeroed out, but the preceding PUT is also
outputted to the SST file after compaction. Due to the logic of DBIter, the PUT will not be
skipped and will be returned by iterator in range scan. https://github.com/facebook/rocksdb/issues/8661 illustrates what happened.

Fix by taking a more conservative approach: make compaction zero out sequence number only
if key is in the earliest snapshot when the compaction starts.

Another assertion failure is
```
Assertion `current_user_key_snapshot_ == last_snapshot' failed.
```

It's caused by releasing the snapshot between the PUT and SingleDelete during compaction.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8608

Test Plan: make check

Reviewed By: jay-zhuang

Differential Revision: D30145645

Pulled By: riversand963

fbshipit-source-id: 699f58e66faf70732ad53810ccef43935d3bbe81

HISTORY.md
db/compaction/compaction_iterator.cc
db/compaction/compaction_iterator.h
db/compaction/compaction_iterator_test.cc
utilities/transactions/transaction_test.h
utilities/transactions/write_prepared_transaction_test.cc

index 03a36ba7d1a5bf54aa24f13e4540aaa99c6e5ce6..4de77d6f7aa9be78f9fd0c1ec879079dde837610 100644 (file)
@@ -7,6 +7,7 @@
 * Removed a call to `RenameFile()` on a non-existent info log file ("LOG") when opening a new DB. Such a call was guaranteed to fail though did not impact applications since we swallowed the error. Now we also stopped swallowing errors in renaming "LOG" file.
 * Fixed an issue where `OnFlushCompleted` was not called for atomic flush.
 * Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`.
+* Fixed a potential incorrect result in opt mode and assertion failures caused by releasing snapshot(s) during compaction.
 
 ### New Features
 * Made the EventListener extend the Customizable class.
index c84f03d8aeb43a8f09051f6ee6b6c9dda0af28aa..3647af1d76bb142a877cda2e0938d16bd77ca755 100644 (file)
@@ -495,11 +495,11 @@ void CompactionIterator::NextFromInput() {
                         "Unexpected key type %d for compaction output",
                         ikey_.type);
       }
-      assert(current_user_key_snapshot_ == last_snapshot);
-      if (current_user_key_snapshot_ != last_snapshot) {
+      assert(current_user_key_snapshot_ >= last_snapshot);
+      if (current_user_key_snapshot_ < last_snapshot) {
         ROCKS_LOG_FATAL(info_log_,
                         "current_user_key_snapshot_ (%" PRIu64
-                        ") != last_snapshot (%" PRIu64 ")",
+                        ") < last_snapshot (%" PRIu64 ")",
                         current_user_key_snapshot_, last_snapshot);
       }
 
@@ -555,10 +555,20 @@ void CompactionIterator::NextFromInput() {
           ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
               .ok() &&
           cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
+#ifndef NDEBUG
+        const Compaction* c =
+            compaction_ ? compaction_->real_compaction() : nullptr;
+#endif
+        TEST_SYNC_POINT_CALLBACK(
+            "CompactionIterator::NextFromInput:SingleDelete:1",
+            const_cast<Compaction*>(c));
+
         // Check whether the next key belongs to the same snapshot as the
         // SingleDelete.
         if (prev_snapshot == 0 ||
             DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) {
+          TEST_SYNC_POINT_CALLBACK(
+              "CompactionIterator::NextFromInput:SingleDelete:2", nullptr);
           if (next_ikey.type == kTypeSingleDeletion) {
             // We encountered two SingleDeletes in a row.  This could be due to
             // unexpected user input.
@@ -604,6 +614,9 @@ void CompactionIterator::NextFromInput() {
             // Set up the Put to be outputted in the next iteration.
             // (Optimization 3).
             clear_and_output_next_key_ = true;
+            TEST_SYNC_POINT_CALLBACK(
+                "CompactionIterator::NextFromInput:KeepSDForWW",
+                /*arg=*/nullptr);
           }
         } else {
           // We hit the next snapshot without hitting a put, so the iterator
@@ -619,7 +632,8 @@ void CompactionIterator::NextFromInput() {
         // iteration. If the next key is corrupt, we return before the
         // comparison, so the value of has_current_user_key does not matter.
         has_current_user_key_ = false;
-        if (compaction_ != nullptr && InEarliestSnapshot(ikey_.sequence) &&
+        if (compaction_ != nullptr &&
+            DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
             compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                        &level_ptrs_)) {
           // Key doesn't exist outside of this range.
@@ -663,7 +677,7 @@ void CompactionIterator::NextFromInput() {
                (ikey_.type == kTypeDeletion ||
                 (ikey_.type == kTypeDeletionWithTimestamp &&
                  cmp_with_history_ts_low_ < 0)) &&
-               InEarliestSnapshot(ikey_.sequence) &&
+               DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
                ikeyNotNeededForIncrementalSnapshot() &&
                compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                           &level_ptrs_)) {
@@ -706,6 +720,13 @@ void CompactionIterator::NextFromInput() {
                                  ikey_.user_key, &level_ptrs_));
       ParsedInternalKey next_ikey;
       AdvanceInputIter();
+#ifndef NDEBUG
+      const Compaction* c =
+          compaction_ ? compaction_->real_compaction() : nullptr;
+#endif
+      TEST_SYNC_POINT_CALLBACK(
+          "CompactionIterator::NextFromInput:BottommostDelete:1",
+          const_cast<Compaction*>(c));
       // Skip over all versions of this key that happen to occur in the same
       // snapshot range as the delete.
       //
@@ -964,7 +985,8 @@ void CompactionIterator::PrepareOutput() {
     if (valid_ && compaction_ != nullptr &&
         !compaction_->allow_ingest_behind() &&
         ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ &&
-        InEarliestSnapshot(ikey_.sequence) && ikey_.type != kTypeMerge) {
+        DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
+        ikey_.type != kTypeMerge) {
       assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
       if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
         ROCKS_LOG_FATAL(info_log_,
@@ -1040,7 +1062,7 @@ inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() {
          (ikey_.sequence < preserve_deletes_seqnum_);
 }
 
-bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) {
+bool CompactionIterator::IsInCurrentEarliestSnapshot(SequenceNumber sequence) {
   assert(snapshot_checker_ != nullptr);
   bool pre_condition = (earliest_snapshot_ == kMaxSequenceNumber ||
                         (earliest_snapshot_iter_ != snapshots_->end() &&
index 9680fdc0dc2c55eb4e9ade2139de4e6c6fdc6afa..f550c8e3657edecb610a8eaeef0433f951f9c304 100644 (file)
@@ -99,6 +99,8 @@ class CompactionIterator {
     virtual Version* input_version() const = 0;
 
     virtual bool DoesInputReferenceBlobFiles() const = 0;
+
+    virtual const Compaction* real_compaction() const = 0;
   };
 
   class RealCompaction : public CompactionProxy {
@@ -152,6 +154,8 @@ class CompactionIterator {
       return compaction_->DoesInputReferenceBlobFiles();
     }
 
+    const Compaction* real_compaction() const override { return compaction_; }
+
    private:
     const Compaction* compaction_;
   };
@@ -267,13 +271,13 @@ class CompactionIterator {
                SnapshotCheckerResult::kInSnapshot;
   }
 
-  bool IsInEarliestSnapshot(SequenceNumber sequence);
+  bool IsInCurrentEarliestSnapshot(SequenceNumber sequence);
 
   bool DefinitelyInSnapshot(SequenceNumber seq, SequenceNumber snapshot);
 
   bool DefinitelyNotInSnapshot(SequenceNumber seq, SequenceNumber snapshot);
 
-  bool InEarliestSnapshot(SequenceNumber seq);
+  bool InCurrentEarliestSnapshot(SequenceNumber seq);
 
   // Extract user-defined timestamp from user key if possible and compare it
   // with *full_history_ts_low_ if applicable.
@@ -435,9 +439,10 @@ inline bool CompactionIterator::DefinitelyNotInSnapshot(
                     SnapshotCheckerResult::kNotInSnapshot)));
 }
 
-inline bool CompactionIterator::InEarliestSnapshot(SequenceNumber seq) {
+inline bool CompactionIterator::InCurrentEarliestSnapshot(SequenceNumber seq) {
   return ((seq) <= earliest_snapshot_ &&
-          (snapshot_checker_ == nullptr || LIKELY(IsInEarliestSnapshot(seq))));
+          (snapshot_checker_ == nullptr ||
+           LIKELY(IsInCurrentEarliestSnapshot(seq))));
 }
 
 }  // namespace ROCKSDB_NAMESPACE
index 168fb45b1c2c8ce3dfba31fb05844161636c0edb..e167866196d1551edad5c8329f6b9373e0a60365 100644 (file)
@@ -184,6 +184,8 @@ class FakeCompaction : public CompactionIterator::CompactionProxy {
 
   bool DoesInputReferenceBlobFiles() const override { return false; }
 
+  const Compaction* real_compaction() const override { return nullptr; }
+
   bool key_not_exists_beyond_output_level = false;
 
   bool is_bottommost_level = false;
index 492137cb81196282c08129919cea69bd86af216f..ecc8ea1f9fa97268a5be747cb62269559611260a 100644 (file)
@@ -95,8 +95,12 @@ class TransactionTestBase : public ::testing::Test {
     // seems to be a bug in btrfs that the makes readdir return recently
     // unlink-ed files. By using the default fs we simply ignore errors resulted
     // from attempting to delete such files in DestroyDB.
-    options.env = Env::Default();
-    EXPECT_OK(DestroyDB(dbname, options));
+    if (getenv("KEEP_DB") == nullptr) {
+      options.env = Env::Default();
+      EXPECT_OK(DestroyDB(dbname, options));
+    } else {
+      fprintf(stdout, "db is still in %s\n", dbname.c_str());
+    }
     delete env;
   }
 
index fd30c1491067e3d18d01a9ee4759f593c167f96e..f5d77611a6b5f14266cea2f0fd152aa6e2ab2908 100644 (file)
@@ -2758,6 +2758,7 @@ TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
   ASSERT_OK(ReOpen());
 
   ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
+  SequenceNumber put_seq = db->GetLatestSequenceNumber();
   auto* transaction =
       db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
   ASSERT_OK(transaction->SetName("txn"));
@@ -2799,11 +2800,202 @@ TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
   // Since the delete tombstone is not visible to snapshot2, we need to keep
   // at least one version of the key, for write-conflict check.
   VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion},
-                      {"key1", "value1", 0, kTypeValue}});
+                      {"key1", "value1", put_seq, kTypeValue}});
   db->ReleaseSnapshot(snapshot2);
   SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 
+TEST_P(WritePreparedTransactionTest,
+       ReleaseEarliestSnapshotDuringCompaction_WithSD) {
+  constexpr size_t kSnapshotCacheBits = 7;  // same as default
+  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
+  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
+  options.disable_auto_compactions = true;
+  ASSERT_OK(ReOpen());
+
+  ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
+  ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
+                                   /*old_txn=*/nullptr);
+  ASSERT_OK(txn->SingleDelete("key"));
+  ASSERT_OK(txn->Put("wow", "value"));
+  ASSERT_OK(txn->SetName("txn"));
+  ASSERT_OK(txn->Prepare());
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  const bool two_write_queues = std::get<1>(GetParam());
+  if (two_write_queues) {
+    // In the case of two queues, commit another txn just to bump
+    // last_published_seq so that a subsequent GetSnapshot() call can return
+    // a snapshot with higher sequence.
+    auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
+                                           /*old_txn=*/nullptr);
+    ASSERT_OK(dummy_txn->Put("haha", "value"));
+    ASSERT_OK(dummy_txn->Commit());
+    delete dummy_txn;
+  }
+  auto* snapshot = db->GetSnapshot();
+
+  ASSERT_OK(txn->Commit());
+  delete txn;
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* arg) {
+        if (!arg) {
+          return;
+        }
+        db->ReleaseSnapshot(snapshot);
+
+        // Advance max_evicted_seq
+        ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
+                             /*end=*/nullptr));
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(WritePreparedTransactionTest,
+       ReleaseEarliestSnapshotDuringCompaction_WithSD2) {
+  constexpr size_t kSnapshotCacheBits = 7;  // same as default
+  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
+  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
+  options.disable_auto_compactions = true;
+  ASSERT_OK(ReOpen());
+
+  ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
+  ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
+                                   /*old_txn=*/nullptr);
+  ASSERT_OK(txn->Put("bar", "value"));
+  ASSERT_OK(txn->SingleDelete("key"));
+  ASSERT_OK(txn->SetName("txn"));
+  ASSERT_OK(txn->Prepare());
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  ASSERT_OK(txn->Commit());
+  delete txn;
+
+  ASSERT_OK(db->Put(WriteOptions(), "haha", "value"));
+
+  // Create a dummy transaction to take a snapshot for ww-conflict detection.
+  TransactionOptions txn_opts;
+  txn_opts.set_snapshot = true;
+  auto* dummy_txn =
+      db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator::NextFromInput:SingleDelete:2", [&](void* /*arg*/) {
+        ASSERT_OK(dummy_txn->Rollback());
+        delete dummy_txn;
+
+        ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db->Put(WriteOptions(), "haha2", "value"));
+  auto* snapshot = db->GetSnapshot();
+
+  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  db->ReleaseSnapshot(snapshot);
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(WritePreparedTransactionTest,
+       ReleaseEarliestSnapshotDuringCompaction_WithDelete) {
+  constexpr size_t kSnapshotCacheBits = 7;  // same as default
+  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
+  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
+  options.disable_auto_compactions = true;
+  ASSERT_OK(ReOpen());
+
+  ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
+  ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
+  ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
+                                   /*old_txn=*/nullptr);
+  ASSERT_OK(txn->Delete("b"));
+  ASSERT_OK(txn->SetName("txn"));
+  ASSERT_OK(txn->Prepare());
+
+  const bool two_write_queues = std::get<1>(GetParam());
+  if (two_write_queues) {
+    // In the case of two queues, commit another txn just to bump
+    // last_published_seq so that a subsequent GetSnapshot() call can return
+    // a snapshot with higher sequence.
+    auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
+                                           /*old_txn=*/nullptr);
+    ASSERT_OK(dummy_txn->Put("haha", "value"));
+    ASSERT_OK(dummy_txn->Commit());
+    delete dummy_txn;
+  }
+  auto* snapshot1 = db->GetSnapshot();
+  ASSERT_OK(txn->Commit());
+  delete txn;
+  auto* snapshot2 = db->GetSnapshot();
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator::NextFromInput:BottommostDelete:1", [&](void* arg) {
+        if (!arg) {
+          return;
+        }
+        db->ReleaseSnapshot(snapshot1);
+
+        // Advance max_evicted_seq
+        ASSERT_OK(db->Put(WriteOptions(), "dummy1", "value"));
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
+                             /*end=*/nullptr));
+  db->ReleaseSnapshot(snapshot2);
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(WritePreparedTransactionTest,
+       ReleaseSnapshotBetweenSDAndPutDuringCompaction) {
+  constexpr size_t kSnapshotCacheBits = 7;  // same as default
+  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
+  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
+  options.disable_auto_compactions = true;
+  ASSERT_OK(ReOpen());
+
+  // Create a dummy transaction to take a snapshot for ww-conflict detection.
+  TransactionOptions txn_opts;
+  txn_opts.set_snapshot = true;
+  auto* dummy_txn =
+      db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);
+  // Increment seq
+  ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
+
+  ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
+  ASSERT_OK(db->SingleDelete(WriteOptions(), "foo"));
+  auto* snapshot1 = db->GetSnapshot();
+  // Increment seq
+  ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
+  auto* snapshot2 = db->GetSnapshot();
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator::NextFromInput:KeepSDForWW", [&](void* /*arg*/) {
+        db->ReleaseSnapshot(snapshot1);
+
+        ASSERT_OK(db->Put(WriteOptions(), "dontcare2", "value2"));
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db->Flush(FlushOptions()));
+  db->ReleaseSnapshot(snapshot2);
+  ASSERT_OK(dummy_txn->Commit());
+  delete dummy_txn;
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 // A more complex test to verify compaction/flush should keep keys visible
 // to snapshots.
 TEST_P(WritePreparedTransactionTest,