]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix bug when `DeleteRange()` used with `paranoid_file_checks == true`
authorAndrew Kryczka <andrewkr@fb.com>
Tue, 13 Oct 2020 16:40:12 +0000 (09:40 -0700)
committerAndrew Kryczka <andrewkr@fb.com>
Tue, 13 Oct 2020 16:40:12 +0000 (09:40 -0700)
Backports part of 750817555867a43f0e7b73dffa44756a9136c808.

The unit test comes from https://github.com/facebook/rocksdb/pull/7521.

HISTORY.md
db/compaction/compaction_job.cc
db/corruption_test.cc

index 54787ec9cd3185a096a8dfe4cdfb31fea168422f..df262b6ec6339fb88e548cbd2cae75cecbf3ebf7 100644 (file)
@@ -1,4 +1,8 @@
 # Rocksdb Change Log
+## Unreleased
+### Bug Fixes
+* Fix false positive flush/compaction `Status::Corruption` failure when `paranoid_file_checks == true` and range tombstones were written to the compaction output files.
+
 ## 6.13.1 (10/12/2020)
 ### Bug Fixes
 * Since 6.12, memtable lookup should report unrecognized value_type as corruption (#7121).
index 603a0c981972332307007c9501615d4db5aa3197..dd892dd8070a958ebfab467c531f6b8afcbdbdab 100644 (file)
@@ -1268,8 +1268,8 @@ Status CompactionJob::FinishCompactionOutputFile(
       auto kv = tombstone.Serialize();
       assert(lower_bound == nullptr ||
              ucmp->Compare(*lower_bound, kv.second) < 0);
-      sub_compact->AddToBuilder(kv.first.Encode(), kv.second,
-                                paranoid_file_checks_);
+      // Range tombstone is not supported by output validator yet.
+      sub_compact->builder->Add(kv.first.Encode(), kv.second);
       InternalKey smallest_candidate = std::move(kv.first);
       if (lower_bound != nullptr &&
           ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
index 1adff85b9726d5b082fb6d571e4169137bd97e02..faafc3e098a775c36e6ac9601a27303c673bfd36 100644 (file)
@@ -105,7 +105,7 @@ class CorruptionTest : public testing::Test {
     ASSERT_OK(::ROCKSDB_NAMESPACE::RepairDB(dbname_, options_));
   }
 
-  void Build(int n, int flush_every = 0) {
+  void Build(int n, int start, int flush_every) {
     std::string key_space, value_space;
     WriteBatch batch;
     for (int i = 0; i < n; i++) {
@@ -114,13 +114,15 @@ class CorruptionTest : public testing::Test {
         dbi->TEST_FlushMemTable();
       }
       //if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n);
-      Slice key = Key(i, &key_space);
+      Slice key = Key(i + start, &key_space);
       batch.Clear();
       batch.Put(key, Value(i, &value_space));
       ASSERT_OK(db_->Write(WriteOptions(), &batch));
     }
   }
 
+  void Build(int n, int flush_every = 0) { Build(n, 0, flush_every); }
+
   void Check(int min_expected, int max_expected) {
     uint64_t next_expected = 0;
     uint64_t missed = 0;
@@ -619,6 +621,102 @@ TEST_F(CorruptionTest, ParanoidFileChecksOnCompact) {
   }
 }
 
+TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeFirst) {
+  Options options;
+  options.paranoid_file_checks = true;
+  options.create_if_missing = true;
+  for (bool do_flush : {true, false}) {
+    delete db_;
+    db_ = nullptr;
+    ASSERT_OK(DestroyDB(dbname_, options));
+    ASSERT_OK(DB::Open(options, dbname_, &db_));
+    std::string start, end;
+    assert(db_ != nullptr);
+    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                               Key(3, &start), Key(7, &end)));
+    auto snap = db_->GetSnapshot();
+    ASSERT_NE(snap, nullptr);
+    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                               Key(8, &start), Key(9, &end)));
+    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                               Key(2, &start), Key(5, &end)));
+    Build(10);
+    if (do_flush) {
+      ASSERT_OK(db_->Flush(FlushOptions()));
+    } else {
+      DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
+      ASSERT_OK(dbi->TEST_FlushMemTable());
+      ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
+    }
+    db_->ReleaseSnapshot(snap);
+  }
+}
+
+TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRange) {
+  Options options;
+  options.paranoid_file_checks = true;
+  options.create_if_missing = true;
+  for (bool do_flush : {true, false}) {
+    delete db_;
+    db_ = nullptr;
+    ASSERT_OK(DestroyDB(dbname_, options));
+    ASSERT_OK(DB::Open(options, dbname_, &db_));
+    assert(db_ != nullptr);
+    Build(10, 0, 0);
+    std::string start, end;
+    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                               Key(5, &start), Key(15, &end)));
+    auto snap = db_->GetSnapshot();
+    ASSERT_NE(snap, nullptr);
+    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                               Key(8, &start), Key(9, &end)));
+    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                               Key(12, &start), Key(17, &end)));
+    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                               Key(2, &start), Key(4, &end)));
+    Build(10, 10, 0);
+    if (do_flush) {
+      ASSERT_OK(db_->Flush(FlushOptions()));
+    } else {
+      DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
+      ASSERT_OK(dbi->TEST_FlushMemTable());
+      ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
+    }
+    db_->ReleaseSnapshot(snap);
+  }
+}
+
+TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeLast) {
+  Options options;
+  options.paranoid_file_checks = true;
+  options.create_if_missing = true;
+  for (bool do_flush : {true, false}) {
+    delete db_;
+    db_ = nullptr;
+    ASSERT_OK(DestroyDB(dbname_, options));
+    ASSERT_OK(DB::Open(options, dbname_, &db_));
+    assert(db_ != nullptr);
+    std::string start, end;
+    Build(10);
+    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                               Key(3, &start), Key(7, &end)));
+    auto snap = db_->GetSnapshot();
+    ASSERT_NE(snap, nullptr);
+    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                               Key(6, &start), Key(8, &end)));
+    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                               Key(2, &start), Key(5, &end)));
+    if (do_flush) {
+      ASSERT_OK(db_->Flush(FlushOptions()));
+    } else {
+      DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
+      ASSERT_OK(dbi->TEST_FlushMemTable());
+      ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
+    }
+    db_->ReleaseSnapshot(snap);
+  }
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {