git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Point-lookup returns timestamps of Delete and SingleDelete (#10056)
author: Yanqin Jin <yanqin@fb.com>
Sat, 4 Jun 2022 03:00:42 +0000 (20:00 -0700)
committer: Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Sat, 4 Jun 2022 03:00:42 +0000 (20:00 -0700)
Summary:
If the caller specifies a non-null `timestamp` argument in `DB::Get()` or a non-null `timestamps` argument in `DB::MultiGet()`,
RocksDB will return the timestamps of the point tombstones (Delete and SingleDelete).

Note: DeleteRange is still unsupported.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10056

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D36677956

Pulled By: riversand963

fbshipit-source-id: 2d7af02cc7237b1829cd269086ea895a49d501ae

db/db_impl/compacted_db_impl.cc
db/db_impl/db_impl.cc
db/db_impl/db_impl_readonly.cc
db/db_impl/db_impl_secondary.cc
db/db_with_timestamp_basic_test.cc
db/memtable.cc
table/get_context.cc
table/multiget_context.h
utilities/transactions/write_committed_transaction_ts_test.cc

index ccb366c5333c6bfd0b655a3e2df3f0157269c629..482a2e1f6af8b8a289b00c493b13853b656cb2cc 100644 (file)
@@ -60,6 +60,13 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
       return s;
     }
   }
+
+  // Clear the timestamps for returning results so that we can distinguish
+  // between tombstone or key that has never been written
+  if (timestamp) {
+    timestamp->clear();
+  }
+
   GetWithTimestampReadCallback read_cb(kMaxSequenceNumber);
   std::string* ts =
       user_comparator_->timestamp_size() > 0 ? timestamp : nullptr;
@@ -114,6 +121,14 @@ std::vector<Status> CompactedDBImpl::MultiGet(
     }
   }
 
+  // Clear the timestamps for returning results so that we can distinguish
+  // between tombstone or key that has never been written
+  if (timestamps) {
+    for (auto& ts : *timestamps) {
+      ts.clear();
+    }
+  }
+
   GetWithTimestampReadCallback read_cb(kMaxSequenceNumber);
   autovector<TableReader*, 16> reader_list;
   for (const auto& key : keys) {
index f82f20f33a3a9c903fea73c7e21ae2b9981cc71d..2faa22ada81f735295c241633b5b9fdba7157bc7 100644 (file)
@@ -1749,6 +1749,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
     }
   }
 
+  // Clear the timestamps for returning results so that we can distinguish
+  // between tombstone or key that has never been written
+  if (get_impl_options.timestamp) {
+    get_impl_options.timestamp->clear();
+  }
+
   GetWithTimestampReadCallback read_cb(0);  // Will call Refresh
 
   PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
@@ -2566,6 +2572,16 @@ Status DBImpl::MultiGetImpl(
   PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
   StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);
 
+  assert(sorted_keys);
+  // Clear the timestamps for returning results so that we can distinguish
+  // between tombstone or key that has never been written
+  for (auto* kctx : *sorted_keys) {
+    assert(kctx);
+    if (kctx->timestamp) {
+      kctx->timestamp->clear();
+    }
+  }
+
   // For each of the given keys, apply the entire "get" process as follows:
   // First look in the memtable, then in the immutable memtable (if any).
   // s is both in/out. When in, s could either be OK or MergeInProgress.
@@ -4517,6 +4533,8 @@ Status DBImpl::GetLatestSequenceForKey(
           *timestamp != std::string(ts_sz, '\xff')) ||
          (*seq == kMaxSequenceNumber && timestamp->empty()));
 
+  TEST_SYNC_POINT_CALLBACK("DBImpl::GetLatestSequenceForKey:mem", timestamp);
+
   if (*seq != kMaxSequenceNumber) {
     // Found a sequence number, no need to check immutable memtables
     *found_record_for_key = true;
@@ -4581,6 +4599,7 @@ Status DBImpl::GetLatestSequenceForKey(
          (*seq != kMaxSequenceNumber &&
           *timestamp != std::string(ts_sz, '\xff')) ||
          (*seq == kMaxSequenceNumber && timestamp->empty()));
+
   if (*seq != kMaxSequenceNumber) {
     // Found a sequence number, no need to check SST files
     assert(0 == ts_sz || *timestamp != std::string(ts_sz, '\xff'));
index b014f75ce61cea9518542596be3ce9ac763b9caa..5be89bce2406435e349c3b2ba2d04756a7905da2 100644 (file)
@@ -58,6 +58,13 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
       return s;
     }
   }
+
+  // Clear the timestamps for returning results so that we can distinguish
+  // between tombstone or key that has never been written
+  if (timestamp) {
+    timestamp->clear();
+  }
+
   const Comparator* ucmp = column_family->GetComparator();
   assert(ucmp);
   std::string* ts = ucmp->timestamp_size() > 0 ? timestamp : nullptr;
index 1c56ccd82b7d4ea21ccb7c77d9d94384fe4a3e1f..bec1fe2f7598988625d868c533807d4f86596cdd 100644 (file)
@@ -361,6 +361,12 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
     }
   }
 
+  // Clear the timestamp for returning results so that we can distinguish
+  // between tombstone or key that has never been written later.
+  if (timestamp) {
+    timestamp->clear();
+  }
+
   auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
   ColumnFamilyData* cfd = cfh->cfd();
   if (tracer_) {
index 009dbbc108e3be5b581fbe88e26b125ab973857a..473cf0ce7b9d92a4de90d796290b969780b4c30a 100644 (file)
@@ -230,6 +230,10 @@ TEST_F(DBBasicTestWithTimestamp, GcPreserveLatestVersionBelowFullHistoryLow) {
   ASSERT_OK(s);
   ASSERT_EQ("v1", value);
 
+  std::string key_ts;
+  ASSERT_TRUE(db_->Get(ropts, "k3", &value, &key_ts).IsNotFound());
+  ASSERT_EQ(Timestamp(2, 0), key_ts);
+
   Close();
 }
 
@@ -543,23 +547,29 @@ TEST_F(DBBasicTestWithTimestamp, TrimHistoryTest) {
   options.comparator = &test_cmp;
   DestroyAndReopen(options);
   auto check_value_by_ts = [](DB* db, Slice key, std::string readTs,
-                              Status status, std::string checkValue) {
+                              Status status, std::string checkValue,
+                              std::string expected_ts) {
     ReadOptions ropts;
     Slice ts = readTs;
     ropts.timestamp = &ts;
     std::string value;
-    Status s = db->Get(ropts, key, &value);
+    std::string key_ts;
+    Status s = db->Get(ropts, key, &value, &key_ts);
     ASSERT_TRUE(s == status);
     if (s.ok()) {
       ASSERT_EQ(checkValue, value);
     }
+    if (s.ok() || s.IsNotFound()) {
+      ASSERT_EQ(expected_ts, key_ts);
+    }
   };
   // Construct data of different versions with different ts
   ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(2, 0), "v1"));
   ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(4, 0), "v2"));
   ASSERT_OK(db_->Delete(WriteOptions(), "k1", Timestamp(5, 0)));
   ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(6, 0), "v3"));
-  check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v3");
+  check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v3",
+                    Timestamp(6, 0));
   ASSERT_OK(Flush());
   Close();
 
@@ -572,13 +582,15 @@ TEST_F(DBBasicTestWithTimestamp, TrimHistoryTest) {
   // Trim data whose version > Timestamp(5, 0), read(k1, ts(7)) <- NOT_FOUND.
   ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
                                    &handles_, &db_, Timestamp(5, 0)));
-  check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::NotFound(), "");
+  check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::NotFound(), "",
+                    Timestamp(5, 0));
   Close();
 
   // Trim data whose timestamp > Timestamp(4, 0), read(k1, ts(7)) <- v2
   ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
                                    &handles_, &db_, Timestamp(4, 0)));
-  check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v2");
+  check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v2",
+                    Timestamp(4, 0));
   Close();
 }
 
@@ -1210,12 +1222,27 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetWithFastLocalBloom) {
   std::vector<Slice> keys(batch_size);
   std::vector<PinnableSlice> values(batch_size);
   std::vector<Status> statuses(batch_size);
+  std::vector<std::string> timestamps(batch_size);
   keys[0] = "foo";
   ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
   db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
-                statuses.data());
+                timestamps.data(), statuses.data(), true);
 
   ASSERT_OK(statuses[0]);
+  ASSERT_EQ(Timestamp(1, 0), timestamps[0]);
+  for (auto& elem : values) {
+    elem.Reset();
+  }
+
+  ASSERT_OK(db_->SingleDelete(WriteOptions(), "foo", Timestamp(2, 0)));
+  ts = Timestamp(3, 0);
+  read_ts = ts;
+  read_opts.timestamp = &read_ts;
+  db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
+                timestamps.data(), statuses.data(), true);
+  ASSERT_TRUE(statuses[0].IsNotFound());
+  ASSERT_EQ(Timestamp(2, 0), timestamps[0]);
+
   Close();
 }
 
@@ -1251,12 +1278,31 @@ TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetWithPrefix) {
   std::vector<Slice> keys(batch_size);
   std::vector<PinnableSlice> values(batch_size);
   std::vector<Status> statuses(batch_size);
+  std::vector<std::string> timestamps(batch_size);
   keys[0] = "foo";
   ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
   db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
-                statuses.data());
+                timestamps.data(), statuses.data(), true);
 
   ASSERT_OK(statuses[0]);
+  ASSERT_EQ(Timestamp(1, 0), timestamps[0]);
+  for (auto& elem : values) {
+    elem.Reset();
+  }
+
+  ASSERT_OK(db_->SingleDelete(WriteOptions(), "foo", Timestamp(2, 0)));
+  // TODO re-enable after fixing a bug of kHashSearch
+  if (GetParam() != BlockBasedTableOptions::IndexType::kHashSearch) {
+    ASSERT_OK(Flush());
+  }
+
+  ts = Timestamp(3, 0);
+  read_ts = ts;
+  db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
+                timestamps.data(), statuses.data(), true);
+  ASSERT_TRUE(statuses[0].IsNotFound());
+  ASSERT_EQ(Timestamp(2, 0), timestamps[0]);
+
   Close();
 }
 
@@ -1507,8 +1553,10 @@ TEST_F(DBBasicTestWithTimestamp, CompactDeletionWithTimestampMarkerToBottom) {
   ts = Timestamp(3, 0);
   read_ts = ts;
   read_opts.timestamp = &read_ts;
-  s = db_->Get(read_opts, "a", &value);
+  std::string key_ts;
+  s = db_->Get(read_opts, "a", &value, &key_ts);
   ASSERT_TRUE(s.IsNotFound());
+  ASSERT_EQ(Timestamp(3, 0), key_ts);
 
   // Time-travel to the past before deletion
   ts = Timestamp(2, 0);
@@ -1613,9 +1661,8 @@ TEST_P(DBBasicTestWithTimestampFilterPrefixSettings, GetAndMultiGet) {
       std::unique_ptr<Iterator> it1(db_->NewIterator(read_opts));
       ASSERT_NE(nullptr, it1);
       ASSERT_OK(it1->status());
-      // TODO(zjay) Fix seek with prefix
-      // it1->Seek(keys[i]);
-      // ASSERT_TRUE(it1->Valid());
+      it1->Seek(keys[i]);
+      ASSERT_TRUE(it1->Valid());
     }
 
     for (int i = 2; i < 4; i++) {
@@ -2400,12 +2447,18 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutDeleteGet) {
     ropts.timestamp = &ts;
     for (uint64_t i = 0; i != static_cast<uint64_t>(kNumKeysPerFile); ++i) {
       std::string value;
-      Status s = db_->Get(ropts, Key1(i), &value);
+      std::string key_ts;
+      Status s = db_->Get(ropts, Key1(i), &value, &key_ts);
       if ((i % 3) == 2) {
         ASSERT_OK(s);
         ASSERT_EQ("new_value_2", value);
+        ASSERT_EQ(Timestamp(5, 0), key_ts);
+      } else if ((i % 3) == 1) {
+        ASSERT_TRUE(s.IsNotFound());
+        ASSERT_EQ(Timestamp(5, 0), key_ts);
       } else {
         ASSERT_TRUE(s.IsNotFound());
+        ASSERT_EQ(Timestamp(3, 0), key_ts);
       }
     }
   }
index 6a4d2e127cdcb09d36aaa3cd5cfd02d5a3d30e4d..4b609cae7e730ca8db62e0338447f7055e0f494f 100644 (file)
@@ -813,6 +813,10 @@ static bool SaveValue(void* arg, const char* entry) {
           }
         } else {
           *(s->status) = Status::NotFound();
+          if (ts_sz > 0 && s->timestamp != nullptr) {
+            Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
+            s->timestamp->assign(ts.data(), ts.size());
+          }
         }
         *(s->found_final_value) = true;
         return false;
index c61f82b1aaba6ad62c63562288ea0dee27a630f6..a8163f19d3ef3b82d958acd59214a86d504d0cef 100644 (file)
@@ -334,6 +334,11 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
         assert(state_ == kNotFound || state_ == kMerge);
         if (kNotFound == state_) {
           state_ = kDeleted;
+          size_t ts_sz = ucmp_->timestamp_size();
+          if (ts_sz > 0 && timestamp_ != nullptr) {
+            Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
+            timestamp_->assign(ts.data(), ts.size());
+          }
         } else if (kMerge == state_) {
           state_ = kFound;
           Merge(nullptr);
index ca29816f572316af1de8982273b348533ebac4bd..188681480da7b05ad3c26316949af96727f391f2 100644 (file)
@@ -137,6 +137,9 @@ class MultiGetContext {
           sorted_keys_[iter]->lkey->user_key(),
           read_opts.timestamp == nullptr ? 0 : read_opts.timestamp->size());
       sorted_keys_[iter]->ikey = sorted_keys_[iter]->lkey->internal_key();
+      sorted_keys_[iter]->timestamp = (*sorted_keys)[begin + iter]->timestamp;
+      sorted_keys_[iter]->get_context =
+          (*sorted_keys)[begin + iter]->get_context;
     }
   }
 
index 980289846c4d594cf80103b1838af4cce414d728..0c1f659c3343bc705bb0a8273bad4f33fb813044 100644 (file)
@@ -498,6 +498,67 @@ TEST_P(WriteCommittedTxnWithTsTest, RefineReadTimestamp) {
   txn0.reset();
 }
 
+TEST_P(WriteCommittedTxnWithTsTest, CheckKeysForConflicts) {  // A committed SingleDelete's timestamp must be seen by conflict checking.
+  options.comparator = test::BytewiseComparatorWithU64TsWrapper();  // NOTE(review): presumably enables 64-bit user timestamps -- confirm
+  ASSERT_OK(ReOpen());
+
+  std::unique_ptr<Transaction> txn1(
+      db->BeginTransaction(WriteOptions(), TransactionOptions()));
+  assert(txn1);
+
+  std::unique_ptr<Transaction> txn2(
+      db->BeginTransaction(WriteOptions(), TransactionOptions()));
+  assert(txn2);
+  ASSERT_OK(txn2->Put("foo", "v0"));
+  ASSERT_OK(txn2->SetCommitTimestamp(10));
+  ASSERT_OK(txn2->Commit());
+  txn2.reset();  // txn2 is no longer needed once committed
+
+  // txn1 takes a snapshot after txn2 commits. The writes of txn2 have
+  // a smaller seqno than txn1's snapshot, thus should not affect conflict
+  // checking.
+  txn1->SetSnapshot();
+
+  std::unique_ptr<Transaction> txn3(
+      db->BeginTransaction(WriteOptions(), TransactionOptions()));
+  assert(txn3);
+  ASSERT_OK(txn3->SetReadTimestampForValidation(20));
+  std::string dontcare;  // receives the read value; its content is never checked
+  ASSERT_OK(txn3->GetForUpdate(ReadOptions(), "foo", &dontcare));
+  ASSERT_OK(txn3->SingleDelete("foo"));
+  ASSERT_OK(txn3->SetName("txn3"));
+  ASSERT_OK(txn3->Prepare());
+  ASSERT_OK(txn3->SetCommitTimestamp(30));
+  // txn3 reads at ts=20 > txn2's commit timestamp, and commits at ts=30.
+  // txn3 can commit successfully, leaving a tombstone with ts=30.
+  ASSERT_OK(txn3->Commit());
+  txn3.reset();
+
+  bool called = false;  // set by the sync-point callback below
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::GetLatestSequenceForKey:mem", [&](void* arg) {  // arg is the timestamp pointer handed to the sync point
+        auto* const ts_ptr = reinterpret_cast<std::string*>(arg);
+        assert(ts_ptr);
+        Slice ts_slc = *ts_ptr;
+        uint64_t last_ts = 0;
+        ASSERT_TRUE(GetFixed64(&ts_slc, &last_ts));  // decode a fixed64 value from the timestamp bytes
+        ASSERT_EQ(30, last_ts);  // must equal txn3's commit timestamp, i.e. the tombstone's
+        called = true;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  // txn1's read timestamp is 25 < 30 (commit timestamp of txn3). Therefore,
+  // the tombstone written by txn3 causes the conflict checking to fail.
+  ASSERT_OK(txn1->SetReadTimestampForValidation(25));
+  ASSERT_TRUE(txn1->GetForUpdate(ReadOptions(), "foo", &dontcare).IsBusy());
+  ASSERT_TRUE(called);  // the callback above must actually have run
+
+  SyncPoint::GetInstance()->DisableProcessing();  // undo the sync-point setup made by this test
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {