}
}
+ // Clear the timestamp for returning results so that we can distinguish
+ // between a tombstone and a key that has never been written
+ if (get_impl_options.timestamp) {
+ get_impl_options.timestamp->clear();
+ }
+
GetWithTimestampReadCallback read_cb(0); // Will call Refresh
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);
+ assert(sorted_keys);
+ // Clear the timestamps for returning results so that we can distinguish
+ // between a tombstone and a key that has never been written
+ for (auto* kctx : *sorted_keys) {
+ assert(kctx);
+ if (kctx->timestamp) {
+ kctx->timestamp->clear();
+ }
+ }
+
// For each of the given keys, apply the entire "get" process as follows:
// First look in the memtable, then in the immutable memtable (if any).
// s is both in/out. When in, s could either be OK or MergeInProgress.
*timestamp != std::string(ts_sz, '\xff')) ||
(*seq == kMaxSequenceNumber && timestamp->empty()));
+ TEST_SYNC_POINT_CALLBACK("DBImpl::GetLatestSequenceForKey:mem", timestamp);
+
if (*seq != kMaxSequenceNumber) {
// Found a sequence number, no need to check immutable memtables
*found_record_for_key = true;
(*seq != kMaxSequenceNumber &&
*timestamp != std::string(ts_sz, '\xff')) ||
(*seq == kMaxSequenceNumber && timestamp->empty()));
+
if (*seq != kMaxSequenceNumber) {
// Found a sequence number, no need to check SST files
assert(0 == ts_sz || *timestamp != std::string(ts_sz, '\xff'));
ASSERT_OK(s);
ASSERT_EQ("v1", value);
+ std::string key_ts;
+ ASSERT_TRUE(db_->Get(ropts, "k3", &value, &key_ts).IsNotFound());
+ ASSERT_EQ(Timestamp(2, 0), key_ts);
+
Close();
}
options.comparator = &test_cmp;
DestroyAndReopen(options);
auto check_value_by_ts = [](DB* db, Slice key, std::string readTs,
- Status status, std::string checkValue) {
+ Status status, std::string checkValue,
+ std::string expected_ts) {
ReadOptions ropts;
Slice ts = readTs;
ropts.timestamp = &ts;
std::string value;
- Status s = db->Get(ropts, key, &value);
+ std::string key_ts;
+ Status s = db->Get(ropts, key, &value, &key_ts);
ASSERT_TRUE(s == status);
if (s.ok()) {
ASSERT_EQ(checkValue, value);
}
+ if (s.ok() || s.IsNotFound()) {
+ ASSERT_EQ(expected_ts, key_ts);
+ }
};
// Construct data of different versions with different ts
ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(2, 0), "v1"));
ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(4, 0), "v2"));
ASSERT_OK(db_->Delete(WriteOptions(), "k1", Timestamp(5, 0)));
ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(6, 0), "v3"));
- check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v3");
+ check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v3",
+ Timestamp(6, 0));
ASSERT_OK(Flush());
Close();
// Trim data whose version > Timestamp(5, 0), read(k1, ts(7)) <- NOT_FOUND.
ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
&handles_, &db_, Timestamp(5, 0)));
- check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::NotFound(), "");
+ check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::NotFound(), "",
+ Timestamp(5, 0));
Close();
// Trim data whose timestamp > Timestamp(4, 0), read(k1, ts(7)) <- v2
ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
&handles_, &db_, Timestamp(4, 0)));
- check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v2");
+ check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v2",
+ Timestamp(4, 0));
Close();
}
std::vector<Slice> keys(batch_size);
std::vector<PinnableSlice> values(batch_size);
std::vector<Status> statuses(batch_size);
+ std::vector<std::string> timestamps(batch_size);
keys[0] = "foo";
ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
- statuses.data());
+ timestamps.data(), statuses.data(), true);
ASSERT_OK(statuses[0]);
+ ASSERT_EQ(Timestamp(1, 0), timestamps[0]);
+ for (auto& elem : values) {
+ elem.Reset();
+ }
+
+ ASSERT_OK(db_->SingleDelete(WriteOptions(), "foo", Timestamp(2, 0)));
+ ts = Timestamp(3, 0);
+ read_ts = ts;
+ read_opts.timestamp = &read_ts;
+ db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
+ timestamps.data(), statuses.data(), true);
+ ASSERT_TRUE(statuses[0].IsNotFound());
+ ASSERT_EQ(Timestamp(2, 0), timestamps[0]);
+
Close();
}
std::vector<Slice> keys(batch_size);
std::vector<PinnableSlice> values(batch_size);
std::vector<Status> statuses(batch_size);
+ std::vector<std::string> timestamps(batch_size);
keys[0] = "foo";
ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
- statuses.data());
+ timestamps.data(), statuses.data(), true);
ASSERT_OK(statuses[0]);
+ ASSERT_EQ(Timestamp(1, 0), timestamps[0]);
+ for (auto& elem : values) {
+ elem.Reset();
+ }
+
+ ASSERT_OK(db_->SingleDelete(WriteOptions(), "foo", Timestamp(2, 0)));
+ // TODO re-enable after fixing a bug of kHashSearch
+ if (GetParam() != BlockBasedTableOptions::IndexType::kHashSearch) {
+ ASSERT_OK(Flush());
+ }
+
+ ts = Timestamp(3, 0);
+ read_ts = ts;
+ db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
+ timestamps.data(), statuses.data(), true);
+ ASSERT_TRUE(statuses[0].IsNotFound());
+ ASSERT_EQ(Timestamp(2, 0), timestamps[0]);
+
Close();
}
ts = Timestamp(3, 0);
read_ts = ts;
read_opts.timestamp = &read_ts;
- s = db_->Get(read_opts, "a", &value);
+ std::string key_ts;
+ s = db_->Get(read_opts, "a", &value, &key_ts);
ASSERT_TRUE(s.IsNotFound());
+ ASSERT_EQ(Timestamp(3, 0), key_ts);
// Time-travel to the past before deletion
ts = Timestamp(2, 0);
std::unique_ptr<Iterator> it1(db_->NewIterator(read_opts));
ASSERT_NE(nullptr, it1);
ASSERT_OK(it1->status());
- // TODO(zjay) Fix seek with prefix
- // it1->Seek(keys[i]);
- // ASSERT_TRUE(it1->Valid());
+ it1->Seek(keys[i]);
+ ASSERT_TRUE(it1->Valid());
}
for (int i = 2; i < 4; i++) {
ropts.timestamp = &ts;
for (uint64_t i = 0; i != static_cast<uint64_t>(kNumKeysPerFile); ++i) {
std::string value;
- Status s = db_->Get(ropts, Key1(i), &value);
+ std::string key_ts;
+ Status s = db_->Get(ropts, Key1(i), &value, &key_ts);
if ((i % 3) == 2) {
ASSERT_OK(s);
ASSERT_EQ("new_value_2", value);
+ ASSERT_EQ(Timestamp(5, 0), key_ts);
+ } else if ((i % 3) == 1) {
+ ASSERT_TRUE(s.IsNotFound());
+ ASSERT_EQ(Timestamp(5, 0), key_ts);
} else {
ASSERT_TRUE(s.IsNotFound());
+ ASSERT_EQ(Timestamp(3, 0), key_ts);
}
}
}
txn0.reset();
}
+TEST_P(WriteCommittedTxnWithTsTest, CheckKeysForConflicts) {
+ options.comparator = test::BytewiseComparatorWithU64TsWrapper();
+ ASSERT_OK(ReOpen());
+ // Scenario: txn1 validating at ts=25 must conflict with a ts=30 tombstone.
+ std::unique_ptr<Transaction> txn1(
+ db->BeginTransaction(WriteOptions(), TransactionOptions()));
+ assert(txn1);
+ // txn2 writes "foo" and commits at ts=10 before txn1 takes its snapshot.
+ std::unique_ptr<Transaction> txn2(
+ db->BeginTransaction(WriteOptions(), TransactionOptions()));
+ assert(txn2);
+ ASSERT_OK(txn2->Put("foo", "v0"));
+ ASSERT_OK(txn2->SetCommitTimestamp(10));
+ ASSERT_OK(txn2->Commit());
+ txn2.reset();
+
+ // txn1 takes a snapshot after txn2 commits. The writes of txn2 have
+ // a smaller seqno than txn1's snapshot, thus should not affect conflict
+ // checking.
+ txn1->SetSnapshot();
+ // txn3 locks "foo" with validation ts=20, single-deletes it, then prepares.
+ std::unique_ptr<Transaction> txn3(
+ db->BeginTransaction(WriteOptions(), TransactionOptions()));
+ assert(txn3);
+ ASSERT_OK(txn3->SetReadTimestampForValidation(20));
+ std::string dontcare;
+ ASSERT_OK(txn3->GetForUpdate(ReadOptions(), "foo", &dontcare));
+ ASSERT_OK(txn3->SingleDelete("foo"));
+ ASSERT_OK(txn3->SetName("txn3"));
+ ASSERT_OK(txn3->Prepare());
+ ASSERT_OK(txn3->SetCommitTimestamp(30));
+ // txn3 reads at ts=20 > txn2's commit timestamp, and commits at ts=30.
+ // txn3 can commit successfully, leaving a tombstone with ts=30.
+ ASSERT_OK(txn3->Commit());
+ txn3.reset();
+ // The callback below decodes the sync point's argument and expects ts=30.
+ bool called = false;
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::GetLatestSequenceForKey:mem", [&](void* arg) {
+ auto* const ts_ptr = reinterpret_cast<std::string*>(arg);
+ assert(ts_ptr);
+ Slice ts_slc = *ts_ptr;
+ uint64_t last_ts = 0;
+ ASSERT_TRUE(GetFixed64(&ts_slc, &last_ts));
+ ASSERT_EQ(30, last_ts);
+ called = true;
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ // txn1's read timestamp is 25 < 30 (commit timestamp of txn3). Therefore,
+ // the tombstone written by txn3 causes the conflict checking to fail.
+ ASSERT_OK(txn1->SetReadTimestampForValidation(25));
+ ASSERT_TRUE(txn1->GetForUpdate(ReadOptions(), "foo", &dontcare).IsBusy());
+ ASSERT_TRUE(called);
+ // Disable sync point processing and drop the callback registered above.
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {