git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Add WriteOptions::protection_bytes_per_key (#10037)
author Andrew Kryczka <andrewkr@fb.com>
Fri, 17 Jun 2022 06:10:07 +0000 (23:10 -0700)
committer Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Fri, 17 Jun 2022 06:10:07 +0000 (23:10 -0700)
Summary:
Added an option, `WriteOptions::protection_bytes_per_key`, that controls how many bytes per key we use for integrity protection in `WriteBatch`. It takes effect when `WriteBatch::GetProtectionBytesPerKey() == 0`.

Currently the only supported value is eight. Invoking a user API with it set to any other nonzero value will result in `Status::NotSupported` returned to the user.

There is also a bug fix for integrity protection with `inplace_callback`, where we forgot to take into account the possible change in varint length when calculating KV checksum for the final encoded buffer.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10037

Test Plan:
- Manual
  - Set default value of `WriteOptions::protection_bytes_per_key` to eight and ran `make check -j24`
  - Enabled in MyShadow for 1+ week
- Automated
  - Unit tests have a `WriteMode` that enables the integrity protection via `WriteOptions`
  - Crash test - in most cases, use `WriteOptions::protection_bytes_per_key` to enable integrity protection

Reviewed By: cbi42

Differential Revision: D36614569

Pulled By: ajkr

fbshipit-source-id: 8650087ceac9b61b560f1e5fafe5e1baf9c725fb

22 files changed:
HISTORY.md
db/db_impl/db_impl_open.cc
db/db_impl/db_impl_write.cc
db/db_kv_checksum_test.cc
db/db_test.cc
db/memtable.cc
db/write_batch.cc
db/write_batch_internal.h
db/write_callback_test.cc
db_stress_tool/db_stress_test_base.cc
db_stress_tool/db_stress_tool.cc
include/rocksdb/options.h
include/rocksdb/utilities/write_batch_with_index.h
include/rocksdb/write_batch.h
tools/db_crashtest.py
utilities/transactions/pessimistic_transaction_db.h
utilities/transactions/transaction_base.cc
utilities/transactions/write_prepared_txn.cc
utilities/transactions/write_prepared_txn_db.cc
utilities/transactions/write_unprepared_txn.cc
utilities/transactions/write_unprepared_txn_db.cc
utilities/write_batch_with_index/write_batch_with_index.cc

index a455f19b47d9d0ab60801d93a3723ec3e55e6862..6405ec8a58a88150a9883ed28420c38c58bfdd62 100644 (file)
@@ -1,6 +1,7 @@
 # Rocksdb Change Log
 ## Unreleased
 ### Bug Fixes
+* Fixed a bug in calculating key-value integrity protection for users of in-place memtable updates. In particular, the affected users would be those who configure `protection_bytes_per_key > 0` on `WriteBatch` or `WriteOptions`, and configure `inplace_callback != nullptr`.
 * Fixed a bug where a snapshot taken during SST file ingestion would be unstable.
 * Fixed a bug for non-TransactionDB with avoid_flush_during_recovery = true and TransactionDB where in case of crash, min_log_number_to_keep may not change on recovery and persisting a new MANIFEST with advanced log_numbers for some column families, results in "column family inconsistency" error on second recovery. As a solution, RocksDB will persist the new MANIFEST after successfully syncing the new WAL. If a future recovery starts from the new MANIFEST, then it means the new WAL is successfully synced. Due to the sentinel empty write batch at the beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point. If future recovery starts from the old MANIFEST, it means the writing the new MANIFEST failed. We won't have the "SST ahead of WAL" error.
 * Fixed a bug where RocksDB DB::Open() may creates and writes to two new MANIFEST files even before recovery succeeds. Now writes to MANIFEST are persisted only after recovery is successful.
@@ -26,6 +27,7 @@
 * The contract for implementations of Comparator::IsSameLengthImmediateSuccessor has been updated to work around a design bug in `auto_prefix_mode`.
 * The API documentation for `auto_prefix_mode` now notes some corner cases in which it returns different results than `total_order_seek`, due to design bugs that are not easily fixed. Users using built-in comparators and keys at least the size of a fixed prefix length are not affected.
 * Obsoleted the NUM_DATA_BLOCKS_READ_PER_LEVEL stat and introduced the NUM_LEVEL_READ_PER_MULTIGET and MULTIGET_COROUTINE_COUNT stats
+* Introduced `WriteOptions::protection_bytes_per_key`, which can be used to enable key-value integrity protection for live updates.
 
 ### New Features
 * Add FileSystem::ReadAsync API in io_tracing
index 7c4852ab26359431a7d792efec29c94cbdc15daf..c71bd7429868e0152b7d720fa44e0916dc88012e 100644 (file)
@@ -1003,12 +1003,17 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
 
       // We create a new batch and initialize with a valid prot_info_ to store
       // the data checksums
-      WriteBatch batch(0, 0, 8, 0);
+      WriteBatch batch;
 
       status = WriteBatchInternal::SetContents(&batch, record);
       if (!status.ok()) {
         return status;
       }
+      status = WriteBatchInternal::UpdateProtectionInfo(&batch,
+                                                        8 /* bytes_per_key */);
+      if (!status.ok()) {
+        return status;
+      }
 
       SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
 
index 787006d35c8052c7413f88b715e7b3d0154d895d..09287a4ef8cbc0dae403db88584e3bbc344a1039 100644 (file)
@@ -106,15 +106,31 @@ void DBImpl::SetRecoverableStatePreReleaseCallback(
 }
 
 Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
-  return WriteImpl(write_options, my_batch, /*callback=*/nullptr,
-                   /*log_used=*/nullptr);
+  Status s;
+  if (write_options.protection_bytes_per_key > 0) {
+    s = WriteBatchInternal::UpdateProtectionInfo(
+        my_batch, write_options.protection_bytes_per_key);
+  }
+  if (s.ok()) {
+    s = WriteImpl(write_options, my_batch, /*callback=*/nullptr,
+                  /*log_used=*/nullptr);
+  }
+  return s;
 }
 
 #ifndef ROCKSDB_LITE
 Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
                                  WriteBatch* my_batch,
                                  WriteCallback* callback) {
-  return WriteImpl(write_options, my_batch, callback, nullptr);
+  Status s;
+  if (write_options.protection_bytes_per_key > 0) {
+    s = WriteBatchInternal::UpdateProtectionInfo(
+        my_batch, write_options.protection_bytes_per_key);
+  }
+  if (s.ok()) {
+    s = WriteImpl(write_options, my_batch, callback, nullptr);
+  }
+  return s;
 }
 #endif  // ROCKSDB_LITE
 
@@ -129,6 +145,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
                          PreReleaseCallback* pre_release_callback,
                          PostMemTableCallback* post_memtable_callback) {
   assert(!seq_per_batch_ || batch_cnt != 0);
+  assert(my_batch == nullptr || my_batch->Count() == 0 ||
+         write_options.protection_bytes_per_key == 0 ||
+         write_options.protection_bytes_per_key ==
+             my_batch->GetProtectionBytesPerKey());
   if (my_batch == nullptr) {
     return Status::InvalidArgument("Batch is nullptr!");
   } else if (!disable_memtable &&
@@ -156,6 +176,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
         "rate-limiting automatic WAL flush, which requires "
         "`WriteOptions::disableWAL` and "
         "`DBOptions::manual_wal_flush` both set to false");
+  } else if (write_options.protection_bytes_per_key != 0 &&
+             write_options.protection_bytes_per_key != 8) {
+    return Status::InvalidArgument(
+        "`WriteOptions::protection_bytes_per_key` must be zero or eight");
   }
   // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock
   // grabs but does not seem thread-safe.
@@ -2188,7 +2212,8 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
   // Pre-allocate size of write batch conservatively.
   // 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
   // and we allocate 11 extra bytes for key length, as well as value length.
-  WriteBatch batch(key.size() + value.size() + 24);
+  WriteBatch batch(key.size() + value.size() + 24, 0 /* max_bytes */,
+                   opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);
   Status s = batch.Put(column_family, key, value);
   if (!s.ok()) {
     return s;
@@ -2202,7 +2227,9 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
   assert(default_cf);
   const Comparator* const default_cf_ucmp = default_cf->GetComparator();
   assert(default_cf_ucmp);
-  WriteBatch batch(0, 0, 0, default_cf_ucmp->timestamp_size());
+  WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
+                   opt.protection_bytes_per_key,
+                   default_cf_ucmp->timestamp_size());
   Status s = batch.Put(column_family, key, ts, value);
   if (!s.ok()) {
     return s;
@@ -2212,7 +2239,8 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
 
 Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                   const Slice& key) {
-  WriteBatch batch;
+  WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
+                   opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);
   Status s = batch.Delete(column_family, key);
   if (!s.ok()) {
     return s;
@@ -2226,7 +2254,9 @@ Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
   assert(default_cf);
   const Comparator* const default_cf_ucmp = default_cf->GetComparator();
   assert(default_cf_ucmp);
-  WriteBatch batch(0, 0, 0, default_cf_ucmp->timestamp_size());
+  WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
+                   opt.protection_bytes_per_key,
+                   default_cf_ucmp->timestamp_size());
   Status s = batch.Delete(column_family, key, ts);
   if (!s.ok()) {
     return s;
@@ -2236,7 +2266,8 @@ Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
 
 Status DB::SingleDelete(const WriteOptions& opt,
                         ColumnFamilyHandle* column_family, const Slice& key) {
-  WriteBatch batch;
+  WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
+                   opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);
   Status s = batch.SingleDelete(column_family, key);
   if (!s.ok()) {
     return s;
@@ -2251,7 +2282,9 @@ Status DB::SingleDelete(const WriteOptions& opt,
   assert(default_cf);
   const Comparator* const default_cf_ucmp = default_cf->GetComparator();
   assert(default_cf_ucmp);
-  WriteBatch batch(0, 0, 0, default_cf_ucmp->timestamp_size());
+  WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
+                   opt.protection_bytes_per_key,
+                   default_cf_ucmp->timestamp_size());
   Status s = batch.SingleDelete(column_family, key, ts);
   if (!s.ok()) {
     return s;
@@ -2262,7 +2295,8 @@ Status DB::SingleDelete(const WriteOptions& opt,
 Status DB::DeleteRange(const WriteOptions& opt,
                        ColumnFamilyHandle* column_family,
                        const Slice& begin_key, const Slice& end_key) {
-  WriteBatch batch;
+  WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
+                   opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);
   Status s = batch.DeleteRange(column_family, begin_key, end_key);
   if (!s.ok()) {
     return s;
@@ -2272,7 +2306,8 @@ Status DB::DeleteRange(const WriteOptions& opt,
 
 Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                  const Slice& key, const Slice& value) {
-  WriteBatch batch;
+  WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
+                   opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);
   Status s = batch.Merge(column_family, key, value);
   if (!s.ok()) {
     return s;
index 5636c9e6ebac71f684b95d72fc641946b642d397..3bed9d784e081aded5cbd0fd87062e6bc5ea46f6 100644 (file)
@@ -15,7 +15,6 @@ enum class WriteBatchOpType {
   kSingleDelete,
   kDeleteRange,
   kMerge,
-  kBlobIndex,
   kNum,
 };
 
@@ -25,11 +24,28 @@ WriteBatchOpType operator+(WriteBatchOpType lhs, const int rhs) {
   return static_cast<WriteBatchOpType>(static_cast<T>(lhs) + rhs);
 }
 
+enum class WriteMode {
+  // `Write()` a `WriteBatch` constructed with `protection_bytes_per_key > 0`.
+  kWriteProtectedBatch = 0,
+  // `Write()` a `WriteBatch` constructed with `protection_bytes_per_key == 0`.
+  // Protection is enabled via `WriteOptions::protection_bytes_per_key > 0`.
+  kWriteUnprotectedBatch,
+  // TODO(ajkr): add a mode that uses `Write()` wrappers, e.g., `Put()`.
+  kNum,
+};
+
+// Integer addition is needed for `::testing::Range()` to take the enum type.
+WriteMode operator+(WriteMode lhs, const int rhs) {
+  using T = std::underlying_type<WriteMode>::type;
+  return static_cast<WriteMode>(static_cast<T>(lhs) + rhs);
+}
+
 std::pair<WriteBatch, Status> GetWriteBatch(ColumnFamilyHandle* cf_handle,
+                                            size_t protection_bytes_per_key,
                                             WriteBatchOpType op_type) {
   Status s;
   WriteBatch wb(0 /* reserved_bytes */, 0 /* max_bytes */,
-                8 /* protection_bytes_per_entry */, 0 /* default_cf_ts_sz */);
+                protection_bytes_per_key, 0 /* default_cf_ts_sz */);
   switch (op_type) {
     case WriteBatchOpType::kPut:
       s = wb.Put(cf_handle, "key", "val");
@@ -46,36 +62,44 @@ std::pair<WriteBatch, Status> GetWriteBatch(ColumnFamilyHandle* cf_handle,
     case WriteBatchOpType::kMerge:
       s = wb.Merge(cf_handle, "key", "val");
       break;
-    case WriteBatchOpType::kBlobIndex: {
-      // TODO(ajkr): use public API once available.
-      uint32_t cf_id;
-      if (cf_handle == nullptr) {
-        cf_id = 0;
-      } else {
-        cf_id = cf_handle->GetID();
-      }
-
-      std::string blob_index;
-      BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 9876543210,
-                                  "val");
-
-      s = WriteBatchInternal::PutBlobIndex(&wb, cf_id, "key", blob_index);
-      break;
-    }
     case WriteBatchOpType::kNum:
       assert(false);
   }
   return {std::move(wb), std::move(s)};
 }
 
-class DbKvChecksumTest
-    : public DBTestBase,
-      public ::testing::WithParamInterface<std::tuple<WriteBatchOpType, char>> {
+class DbKvChecksumTest : public DBTestBase,
+                         public ::testing::WithParamInterface<
+                             std::tuple<WriteBatchOpType, char, WriteMode>> {
  public:
   DbKvChecksumTest()
       : DBTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) {
     op_type_ = std::get<0>(GetParam());
     corrupt_byte_addend_ = std::get<1>(GetParam());
+    write_mode_ = std::get<2>(GetParam());
+  }
+
+  Status ExecuteWrite(ColumnFamilyHandle* cf_handle) {
+    switch (write_mode_) {
+      case WriteMode::kWriteProtectedBatch: {
+        auto batch_and_status = GetWriteBatch(
+            cf_handle, 8 /* protection_bytes_per_key */, op_type_);
+        assert(batch_and_status.second.ok());
+        return db_->Write(WriteOptions(), &batch_and_status.first);
+      }
+      case WriteMode::kWriteUnprotectedBatch: {
+        auto batch_and_status = GetWriteBatch(
+            cf_handle, 0 /* protection_bytes_per_key */, op_type_);
+        assert(batch_and_status.second.ok());
+        WriteOptions write_opts;
+        write_opts.protection_bytes_per_key = 8;
+        return db_->Write(write_opts, &batch_and_status.first);
+      }
+      case WriteMode::kNum:
+        assert(false);
+    }
+    return Status::NotSupported("WriteMode " +
+                                std::to_string(static_cast<int>(write_mode_)));
   }
 
   void CorruptNextByteCallBack(void* arg) {
@@ -96,6 +120,7 @@ class DbKvChecksumTest
  protected:
   WriteBatchOpType op_type_;
   char corrupt_byte_addend_;
+  WriteMode write_mode_;
   size_t corrupt_byte_offset_ = 0;
   size_t entry_len_ = std::numeric_limits<size_t>::max();
 };
@@ -114,9 +139,6 @@ std::string GetOpTypeString(const WriteBatchOpType& op_type) {
     case WriteBatchOpType::kMerge:
       return "Merge";
       break;
-    case WriteBatchOpType::kBlobIndex:
-      return "BlobIndex";
-      break;
     case WriteBatchOpType::kNum:
       assert(false);
   }
@@ -128,15 +150,31 @@ INSTANTIATE_TEST_CASE_P(
     DbKvChecksumTest, DbKvChecksumTest,
     ::testing::Combine(::testing::Range(static_cast<WriteBatchOpType>(0),
                                         WriteBatchOpType::kNum),
-                       ::testing::Values(2, 103, 251)),
-    [](const testing::TestParamInfo<std::tuple<WriteBatchOpType, char>>& args) {
+                       ::testing::Values(2, 103, 251),
+                       ::testing::Range(static_cast<WriteMode>(0),
+                                        WriteMode::kNum)),
+    [](const testing::TestParamInfo<
+        std::tuple<WriteBatchOpType, char, WriteMode>>& args) {
       std::ostringstream oss;
       oss << GetOpTypeString(std::get<0>(args.param)) << "Add"
           << static_cast<int>(
                  static_cast<unsigned char>(std::get<1>(args.param)));
+      switch (std::get<2>(args.param)) {
+        case WriteMode::kWriteProtectedBatch:
+          oss << "WriteProtectedBatch";
+          break;
+        case WriteMode::kWriteUnprotectedBatch:
+          oss << "WriteUnprotectedBatch";
+          break;
+        case WriteMode::kNum:
+          assert(false);
+      }
       return oss.str();
     });
 
+// TODO(ajkr): add a test that corrupts the `WriteBatch` contents. Such
+// corruptions should only be detectable in `WriteMode::kWriteProtectedBatch`.
+
 TEST_P(DbKvChecksumTest, MemTableAddCorrupted) {
   // This test repeatedly attempts to write `WriteBatch`es containing a single
   // entry of type `op_type_`. Each attempt has one byte corrupted in its
@@ -158,10 +196,7 @@ TEST_P(DbKvChecksumTest, MemTableAddCorrupted) {
     Reopen(options);
 
     SyncPoint::GetInstance()->EnableProcessing();
-    auto batch_and_status = GetWriteBatch(nullptr /* cf_handle */, op_type_);
-    ASSERT_OK(batch_and_status.second);
-    ASSERT_TRUE(
-        db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());
+    ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption());
     SyncPoint::GetInstance()->DisableProcessing();
 
     // In case the above callback is not invoked, this test will run
@@ -194,10 +229,7 @@ TEST_P(DbKvChecksumTest, MemTableAddWithColumnFamilyCorrupted) {
     ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
 
     SyncPoint::GetInstance()->EnableProcessing();
-    auto batch_and_status = GetWriteBatch(handles_[1], op_type_);
-    ASSERT_OK(batch_and_status.second);
-    ASSERT_TRUE(
-        db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());
+    ASSERT_TRUE(ExecuteWrite(handles_[1]).IsCorruption());
     SyncPoint::GetInstance()->DisableProcessing();
 
     // In case the above callback is not invoked, this test will run
@@ -209,7 +241,8 @@ TEST_P(DbKvChecksumTest, MemTableAddWithColumnFamilyCorrupted) {
 
 TEST_P(DbKvChecksumTest, NoCorruptionCase) {
   // If this test fails, we may have found a piece of malfunctioned hardware
-  auto batch_and_status = GetWriteBatch(nullptr, op_type_);
+  auto batch_and_status =
+      GetWriteBatch(nullptr, 8 /* protection_bytes_per_key */, op_type_);
   ASSERT_OK(batch_and_status.second);
   ASSERT_OK(batch_and_status.first.VerifyChecksum());
 }
@@ -238,10 +271,7 @@ TEST_P(DbKvChecksumTest, WriteToWALCorrupted) {
     auto log_size_pre_write = dbfull()->TEST_total_log_size();
 
     SyncPoint::GetInstance()->EnableProcessing();
-    auto batch_and_status = GetWriteBatch(nullptr /* cf_handle */, op_type_);
-    ASSERT_OK(batch_and_status.second);
-    ASSERT_TRUE(
-        db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());
+    ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption());
     // Confirm that nothing was written to WAL
     ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size());
     ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
@@ -279,10 +309,7 @@ TEST_P(DbKvChecksumTest, WriteToWALWithColumnFamilyCorrupted) {
     auto log_size_pre_write = dbfull()->TEST_total_log_size();
 
     SyncPoint::GetInstance()->EnableProcessing();
-    auto batch_and_status = GetWriteBatch(handles_[1], op_type_);
-    ASSERT_OK(batch_and_status.second);
-    ASSERT_TRUE(
-        db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());
+    ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption());
     // Confirm that nothing was written to WAL
     ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size());
     ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
@@ -322,9 +349,11 @@ void CorruptWriteBatch(Slice* content, size_t offset,
 
 TEST_P(DbKvChecksumTestMergedBatch, NoCorruptionCase) {
   // Veirfy write batch checksum after write batch append
-  auto batch1 = GetWriteBatch(nullptr /* cf_handle */, op_type1_);
+  auto batch1 = GetWriteBatch(nullptr /* cf_handle */,
+                              8 /* protection_bytes_per_key */, op_type1_);
   ASSERT_OK(batch1.second);
-  auto batch2 = GetWriteBatch(nullptr /* cf_handle */, op_type2_);
+  auto batch2 = GetWriteBatch(nullptr /* cf_handle */,
+                              8 /* protection_bytes_per_key */, op_type2_);
   ASSERT_OK(batch2.second);
   ASSERT_OK(WriteBatchInternal::Append(&batch1.first, &batch2.first));
   ASSERT_OK(batch1.first.VerifyChecksum());
@@ -345,11 +374,11 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) {
     options.merge_operator = MergeOperators::CreateStringAppendOperator();
   }
 
-  auto leader_batch_and_status =
-      GetWriteBatch(nullptr /* cf_handle */, op_type1_);
+  auto leader_batch_and_status = GetWriteBatch(
+      nullptr /* cf_handle */, 8 /* protection_bytes_per_key */, op_type1_);
   ASSERT_OK(leader_batch_and_status.second);
-  auto follower_batch_and_status =
-      GetWriteBatch(nullptr /* cf_handle */, op_type2_);
+  auto follower_batch_and_status = GetWriteBatch(
+      nullptr /* cf_handle */, 8 /* protection_bytes_per_key */, op_type2_);
   size_t leader_batch_size = leader_batch_and_status.first.GetDataSize();
   size_t total_bytes =
       leader_batch_size + follower_batch_and_status.first.GetDataSize();
@@ -390,7 +419,8 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) {
         // follower
         follower_thread = port::Thread([&]() {
           follower_batch_and_status =
-              GetWriteBatch(nullptr /* cf_handle */, op_type2_);
+              GetWriteBatch(nullptr /* cf_handle */,
+                            8 /* protection_bytes_per_key */, op_type2_);
           ASSERT_OK(follower_batch_and_status.second);
           ASSERT_TRUE(
               db_->Write(WriteOptions(), &follower_batch_and_status.first)
@@ -413,7 +443,8 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) {
     Reopen(options);
     SyncPoint::GetInstance()->EnableProcessing();
     auto log_size_pre_write = dbfull()->TEST_total_log_size();
-    leader_batch_and_status = GetWriteBatch(nullptr /* cf_handle */, op_type1_);
+    leader_batch_and_status = GetWriteBatch(
+        nullptr /* cf_handle */, 8 /* protection_bytes_per_key */, op_type1_);
     ASSERT_OK(leader_batch_and_status.second);
     ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first)
                     .IsCorruption());
@@ -452,9 +483,11 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) {
   }
   CreateAndReopenWithCF({"ramen"}, options);
 
-  auto leader_batch_and_status = GetWriteBatch(handles_[1], op_type1_);
+  auto leader_batch_and_status =
+      GetWriteBatch(handles_[1], 8 /* protection_bytes_per_key */, op_type1_);
   ASSERT_OK(leader_batch_and_status.second);
-  auto follower_batch_and_status = GetWriteBatch(handles_[1], op_type2_);
+  auto follower_batch_and_status =
+      GetWriteBatch(handles_[1], 8 /* protection_bytes_per_key */, op_type2_);
   size_t leader_batch_size = leader_batch_and_status.first.GetDataSize();
   size_t total_bytes =
       leader_batch_size + follower_batch_and_status.first.GetDataSize();
@@ -494,7 +527,8 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) {
         // Start the other writer thread which will join the write group as
         // follower
         follower_thread = port::Thread([&]() {
-          follower_batch_and_status = GetWriteBatch(handles_[1], op_type2_);
+          follower_batch_and_status = GetWriteBatch(
+              handles_[1], 8 /* protection_bytes_per_key */, op_type2_);
           ASSERT_OK(follower_batch_and_status.second);
           ASSERT_TRUE(
               db_->Write(WriteOptions(), &follower_batch_and_status.first)
@@ -518,7 +552,8 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) {
     ReopenWithColumnFamilies({kDefaultColumnFamilyName, "ramen"}, options);
     SyncPoint::GetInstance()->EnableProcessing();
     auto log_size_pre_write = dbfull()->TEST_total_log_size();
-    leader_batch_and_status = GetWriteBatch(handles_[1], op_type1_);
+    leader_batch_and_status =
+        GetWriteBatch(handles_[1], 8 /* protection_bytes_per_key */, op_type1_);
     ASSERT_OK(leader_batch_and_status.second);
     ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first)
                     .IsCorruption());
index 3fb685680a63fb800803bdcd9249578f2725e038..a47e8fdb4832e6b8418c70c97fc77f0820058e12 100644 (file)
@@ -4271,7 +4271,9 @@ TEST_F(DBTest, ConcurrentFlushWAL) {
         threads.emplace_back([&] {
           for (size_t i = cnt; i < 2 * cnt; i++) {
             auto istr = std::to_string(i);
-            WriteBatch batch;
+            WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
+                             wopt.protection_bytes_per_key,
+                             0 /* default_cf_ts_sz */);
             ASSERT_OK(batch.Put("a" + istr, "b" + istr));
             ASSERT_OK(
                 dbfull()->WriteImpl(wopt, &batch, nullptr, nullptr, 0, true));
index 4b609cae7e730ca8db62e0338447f7055e0f494f..2f86d0758672d93156cceb556601100a12c16f19 100644 (file)
@@ -1159,6 +1159,7 @@ Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key,
               if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
                 // shift the value buffer as well.
                 memcpy(p, prev_buffer, new_prev_size);
+                prev_buffer = p;
               }
             }
             RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
index b919ea056d67f943c2d225ca4dee9d161f1fe86e..7a863c67fe1e86c50248051ffc456bef42aeba9d 100644 (file)
@@ -2844,16 +2844,10 @@ class ProtectionInfoUpdater : public WriteBatch::Handler {
 
 Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
   assert(contents.size() >= WriteBatchInternal::kHeader);
+  assert(b->prot_info_ == nullptr);
 
   b->rep_.assign(contents.data(), contents.size());
   b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
-
-  // If we have a prot_info_, update protection info entries for the batch.
-  if (b->prot_info_) {
-    ProtectionInfoUpdater prot_info_updater(b->prot_info_.get());
-    return b->Iterate(&prot_info_updater);
-  }
-
   return Status::OK();
 }
 
@@ -2910,4 +2904,28 @@ size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
   }
 }
 
+Status WriteBatchInternal::UpdateProtectionInfo(WriteBatch* wb,
+                                                size_t bytes_per_key) {
+  if (bytes_per_key == 0) {
+    if (wb->prot_info_ != nullptr) {
+      wb->prot_info_.reset();
+      return Status::OK();
+    } else {
+      // Already not protected.
+      return Status::OK();
+    }
+  } else if (bytes_per_key == 8) {
+    if (wb->prot_info_ == nullptr) {
+      wb->prot_info_.reset(new WriteBatch::ProtectionInfo());
+      ProtectionInfoUpdater prot_info_updater(wb->prot_info_.get());
+      return wb->Iterate(&prot_info_updater);
+    } else {
+      // Already protected.
+      return Status::OK();
+    }
+  }
+  return Status::NotSupported(
+      "WriteBatch protection info must be zero or eight bytes/key");
+}
+
 }  // namespace ROCKSDB_NAMESPACE
index 926acc63a0776e50684f38c27f29ad075035f5bb..53e83a23e749de8ba3d40d96a25def3451ad1b3e 100644 (file)
@@ -236,6 +236,8 @@ class WriteBatchInternal {
   static bool HasKeyWithTimestamp(const WriteBatch& wb) {
     return wb.has_key_with_ts_;
   }
+
+  static Status UpdateProtectionInfo(WriteBatch* wb, size_t bytes_per_key);
 };
 
 // LocalSavePoint is similar to a scope guard
index 1dc659aad388d8f4ed8737a76287c0aa4af9163e..423b2a2aa357c436a02f13e64c935ade0f5a1519 100644 (file)
@@ -307,6 +307,10 @@ TEST_P(WriteCallbackPTest, WriteWithCallbackTest) {
       WriteOptions woptions;
       woptions.disableWAL = !enable_WAL_;
       woptions.sync = enable_WAL_;
+      if (woptions.protection_bytes_per_key > 0) {
+        ASSERT_OK(WriteBatchInternal::UpdateProtectionInfo(
+            &write_op.write_batch_, woptions.protection_bytes_per_key));
+      }
       Status s;
       if (seq_per_batch_) {
         class PublishSeqCallback : public PreReleaseCallback {
index 466cef209f446f0731d24bebea9b07caf89e171e..91036dc22c72389880ed8e23205951c564bea548 100644 (file)
@@ -617,6 +617,7 @@ void StressTest::OperateDb(ThreadState* thread) {
     write_opts.sync = true;
   }
   write_opts.disableWAL = FLAGS_disable_wal;
+  write_opts.protection_bytes_per_key = FLAGS_batch_protection_bytes_per_key;
   const int prefix_bound = static_cast<int>(FLAGS_readpercent) +
                            static_cast<int>(FLAGS_prefixpercent);
   const int write_bound = prefix_bound + static_cast<int>(FLAGS_writepercent);
index 1729ee3f7cc38b61e4613c9b938fc616f6651752..3e8490cccaf32a8fec095cbb6563165ebbaf3932 100644 (file)
@@ -265,13 +265,6 @@ int db_stress_tool(int argc, char** argv) {
         "test_batches_snapshots  must all be 0 when using compaction filter\n");
     exit(1);
   }
-  if (FLAGS_batch_protection_bytes_per_key > 0 &&
-      !FLAGS_test_batches_snapshots) {
-    fprintf(stderr,
-            "Error: test_batches_snapshots must be enabled when "
-            "batch_protection_bytes_per_key > 0\n");
-    exit(1);
-  }
   if (FLAGS_test_multi_ops_txns) {
     CheckAndSetOptionsForMultiOpsTxnStressTest();
   }
index 19bc3cb190a5c2e5cde69e7ef8ea3b5c8b663549..72a2f7de11d5bfcd217d6f41bf020e851bd5baee 100644 (file)
@@ -1737,6 +1737,13 @@ struct WriteOptions {
   // Default: `Env::IO_TOTAL`
   Env::IOPriority rate_limiter_priority;
 
+  // `protection_bytes_per_key` is the number of bytes used to store
+  // protection information for each key entry. Currently supported values are
+  // zero (disabled) and eight.
+  //
+  // Default: zero (disabled).
+  size_t protection_bytes_per_key;
+
   WriteOptions()
       : sync(false),
         disableWAL(false),
@@ -1744,7 +1751,8 @@ struct WriteOptions {
         no_slowdown(false),
         low_pri(false),
         memtable_insert_hint_per_batch(false),
-        rate_limiter_priority(Env::IO_TOTAL) {}
+        rate_limiter_priority(Env::IO_TOTAL),
+        protection_bytes_per_key(0) {}
 };
 
 // Options that control flush operations
index 90174abafd6e1c6d8fdca6a7edae9504da3c0e2e..21974e67a1203875782fe4ae46c610c46a715e88 100644 (file)
@@ -98,7 +98,7 @@ class WriteBatchWithIndex : public WriteBatchBase {
   explicit WriteBatchWithIndex(
       const Comparator* backup_index_comparator = BytewiseComparator(),
       size_t reserved_bytes = 0, bool overwrite_key = false,
-      size_t max_bytes = 0);
+      size_t max_bytes = 0, size_t protection_bytes_per_key = 0);
 
   ~WriteBatchWithIndex() override;
   WriteBatchWithIndex(WriteBatchWithIndex&&);
index d8bd108ea01ac3d780f488c124338c413c8fb047..f4838272be8abc50f1740ed206b40172a99ead48 100644 (file)
@@ -419,9 +419,6 @@ class WriteBatch : public WriteBatchBase {
   struct ProtectionInfo;
   size_t GetProtectionBytesPerKey() const;
 
-  // Clears prot_info_ if there are no entries.
-  void ClearProtectionInfoIfEmpty();
-
  private:
   friend class WriteBatchInternal;
   friend class LocalSavePoint;
index cea8232d5e896394d61b2f3657e119a9a35bd7dd..579885fd582a135d45946cd2c6ebdb39358b82f5 100644 (file)
@@ -514,8 +514,6 @@ def finalize_and_sanitize(src_params):
         dest_params["readpercent"] += dest_params.get("prefixpercent", 20)
         dest_params["prefixpercent"] = 0
         dest_params["test_batches_snapshots"] = 0
-    if dest_params.get("test_batches_snapshots") == 0:
-        dest_params["batch_protection_bytes_per_key"] = 0
     if (dest_params.get("prefix_size") == -1 and
         dest_params.get("memtable_whole_key_filtering") == 0):
         dest_params["memtable_prefix_bloom_size_ratio"] = 0
index 68b6227ef375dc1db6fcfcacea54d42afdef72c5..755b94a757595594732a06b60a681a73ba5ff5a7 100644 (file)
@@ -71,20 +71,27 @@ class PessimisticTransactionDB : public TransactionDB {
   virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
   inline Status WriteWithConcurrencyControl(const WriteOptions& opts,
                                             WriteBatch* updates) {
-    // Need to lock all keys in this batch to prevent write conflicts with
-    // concurrent transactions.
-    Transaction* txn = BeginInternalTransaction(opts);
-    txn->DisableIndexing();
-
-    auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
-
-    // Since commitBatch sorts the keys before locking, concurrent Write()
-    // operations will not cause a deadlock.
-    // In order to avoid a deadlock with a concurrent Transaction, Transactions
-    // should use a lock timeout.
-    Status s = txn_impl->CommitBatch(updates);
-
-    delete txn;
+    Status s;
+    if (opts.protection_bytes_per_key > 0) {
+      s = WriteBatchInternal::UpdateProtectionInfo(
+          updates, opts.protection_bytes_per_key);
+    }
+    if (s.ok()) {
+      // Need to lock all keys in this batch to prevent write conflicts with
+      // concurrent transactions.
+      Transaction* txn = BeginInternalTransaction(opts);
+      txn->DisableIndexing();
+
+      auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
+
+      // Since CommitBatch sorts the keys before locking, concurrent Write()
+      // operations will not cause a deadlock.
+      // In order to avoid a deadlock with a concurrent Transaction,
+      // Transactions should use a lock timeout.
+      s = txn_impl->CommitBatch(updates);
+
+      delete txn;
+    }
 
     return s;
   }
index 53d54abfb980a053ec0437e63fd8aa9f0eb8e091..c98cfcbf270f4f9ec3153e991a44da3013a7919b 100644 (file)
@@ -67,8 +67,11 @@ TransactionBaseImpl::TransactionBaseImpl(
       cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
       lock_tracker_factory_(lock_tracker_factory),
       start_time_(dbimpl_->GetSystemClock()->NowMicros()),
-      write_batch_(cmp_, 0, true, 0),
+      write_batch_(cmp_, 0, true, 0, write_options.protection_bytes_per_key),
       tracked_locks_(lock_tracker_factory_.Create()),
+      commit_time_batch_(0 /* reserved_bytes */, 0 /* max_bytes */,
+                         write_options.protection_bytes_per_key,
+                         0 /* default_cf_ts_sz */),
       indexing_enabled_(true) {
   assert(dynamic_cast<DBImpl*>(db_) != nullptr);
   log_number_ = 0;
@@ -108,6 +111,12 @@ void TransactionBaseImpl::Reinitialize(DB* db,
   start_time_ = dbimpl_->GetSystemClock()->NowMicros();
   indexing_enabled_ = true;
   cmp_ = GetColumnFamilyUserComparator(db_->DefaultColumnFamily());
+  WriteBatchInternal::UpdateProtectionInfo(
+      write_batch_.GetWriteBatch(), write_options_.protection_bytes_per_key)
+      .PermitUncheckedError();
+  WriteBatchInternal::UpdateProtectionInfo(
+      &commit_time_batch_, write_options_.protection_bytes_per_key)
+      .PermitUncheckedError();
 }
 
 void TransactionBaseImpl::SetSnapshot() {
index ce297535419dda0fe7dec9008f71bd56f946b602..1133f903af05171d7cd8981512aa5b0303051cf6 100644 (file)
@@ -267,7 +267,9 @@ Status WritePreparedTxn::RollbackInternal() {
   assert(db_impl_);
   assert(wpt_db_);
 
-  WriteBatch rollback_batch;
+  WriteBatch rollback_batch(0 /* reserved_bytes */, 0 /* max_bytes */,
+                            write_options_.protection_bytes_per_key,
+                            0 /* default_cf_ts_sz */);
   assert(GetId() != kMaxSequenceNumber);
   assert(GetId() > 0);
   auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
index d70d9591df65b1d0d54ec2fadd347a5a01fd171c..c6661479a932ac7da5465e613907270e32b929e9 100644 (file)
@@ -166,6 +166,15 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
     // increased for this batch.
     return Status::OK();
   }
+
+  if (write_options_orig.protection_bytes_per_key > 0) {
+    auto s = WriteBatchInternal::UpdateProtectionInfo(
+        batch, write_options_orig.protection_bytes_per_key);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
   if (batch_cnt == 0) {  // not provided, then compute it
     // TODO(myabandeh): add an option to allow user skipping this cost
     SubBatchCounter counter(*GetCFComparatorMap());
index 2e375d54eb23291f5aa89e446db728f6eb903151..6e04d33442cda6e9d985df1d1c26c087aa0f9b1e 100644 (file)
@@ -464,7 +464,7 @@ Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() {
   // only used if the write batch encounters an invalid cf id, and falls back to
   // this comparator.
   WriteBatchWithIndex wb(wpt_db_->DefaultColumnFamily()->GetComparator(), 0,
-                         true, 0);
+                         true, 0, write_options_.protection_bytes_per_key);
   // Swap with write_batch_ so that wb contains the complete write batch. The
   // actual write batch that will be flushed to DB will be built in
   // write_batch_, and will be read by FlushWriteBatchToDBInternal.
@@ -722,7 +722,8 @@ Status WriteUnpreparedTxn::WriteRollbackKeys(
 Status WriteUnpreparedTxn::RollbackInternal() {
   // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
   WriteBatchWithIndex rollback_batch(
-      wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0);
+      wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0,
+      write_options_.protection_bytes_per_key);
   assert(GetId() != kMaxSequenceNumber);
   assert(GetId() > 0);
   Status s;
index 0ef96d0a45382a6f7391de2fc225a2930cdaa270..72a21755a944d67be79af5fc8ca18c41f600714e 100644 (file)
@@ -59,7 +59,9 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
   for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); ++it) {
     auto last_visible_txn = it->first - 1;
     const auto& batch = it->second.batch_;
-    WriteBatch rollback_batch;
+    WriteBatch rollback_batch(0 /* reserved_bytes */, 0 /* max_bytes */,
+                              w_options.protection_bytes_per_key,
+                              0 /* default_cf_ts_sz */);
 
     struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
       DBImpl* db_;
index af13d901ae627ada849bfbfbed19e63d858cfe52..9f65216f7d61f4554cbb4fe584b397dd92f934cd 100644 (file)
@@ -25,8 +25,9 @@
 namespace ROCKSDB_NAMESPACE {
 struct WriteBatchWithIndex::Rep {
   explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
-               size_t max_bytes = 0, bool _overwrite_key = false)
-      : write_batch(reserved_bytes, max_bytes, /*protection_bytes_per_key=*/0,
+               size_t max_bytes = 0, bool _overwrite_key = false,
+               size_t protection_bytes_per_key = 0)
+      : write_batch(reserved_bytes, max_bytes, protection_bytes_per_key,
                     index_comparator ? index_comparator->timestamp_size() : 0),
         comparator(index_comparator, &write_batch),
         skip_list(comparator, &arena),
@@ -262,9 +263,9 @@ Status WriteBatchWithIndex::Rep::ReBuildIndex() {
 
 WriteBatchWithIndex::WriteBatchWithIndex(
     const Comparator* default_index_comparator, size_t reserved_bytes,
-    bool overwrite_key, size_t max_bytes)
+    bool overwrite_key, size_t max_bytes, size_t protection_bytes_per_key)
     : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
-                  overwrite_key)) {}
+                  overwrite_key, protection_bytes_per_key)) {}
 
 WriteBatchWithIndex::~WriteBatchWithIndex() {}