"Unexpected key type %d for compaction output",
ikey_.type);
}
- assert(current_user_key_snapshot_ == last_snapshot);
- if (current_user_key_snapshot_ != last_snapshot) {
+ assert(current_user_key_snapshot_ >= last_snapshot);
+ if (current_user_key_snapshot_ < last_snapshot) {
ROCKS_LOG_FATAL(info_log_,
"current_user_key_snapshot_ (%" PRIu64
- ") != last_snapshot (%" PRIu64 ")",
+ ") < last_snapshot (%" PRIu64 ")",
current_user_key_snapshot_, last_snapshot);
}
ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
.ok() &&
cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
+#ifndef NDEBUG
+ const Compaction* c =
+ compaction_ ? compaction_->real_compaction() : nullptr;
+#endif
+ TEST_SYNC_POINT_CALLBACK(
+ "CompactionIterator::NextFromInput:SingleDelete:1",
+ const_cast<Compaction*>(c));
+
// Check whether the next key belongs to the same snapshot as the
// SingleDelete.
if (prev_snapshot == 0 ||
DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) {
+ TEST_SYNC_POINT_CALLBACK(
+ "CompactionIterator::NextFromInput:SingleDelete:2", nullptr);
if (next_ikey.type == kTypeSingleDeletion) {
// We encountered two SingleDeletes in a row. This could be due to
// unexpected user input.
// Set up the Put to be outputted in the next iteration.
// (Optimization 3).
clear_and_output_next_key_ = true;
+ TEST_SYNC_POINT_CALLBACK(
+ "CompactionIterator::NextFromInput:KeepSDForWW",
+ /*arg=*/nullptr);
}
} else {
// We hit the next snapshot without hitting a put, so the iterator
// iteration. If the next key is corrupt, we return before the
// comparison, so the value of has_current_user_key does not matter.
has_current_user_key_ = false;
- if (compaction_ != nullptr && InEarliestSnapshot(ikey_.sequence) &&
+ if (compaction_ != nullptr &&
+ DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
&level_ptrs_)) {
// Key doesn't exist outside of this range.
(ikey_.type == kTypeDeletion ||
(ikey_.type == kTypeDeletionWithTimestamp &&
cmp_with_history_ts_low_ < 0)) &&
- InEarliestSnapshot(ikey_.sequence) &&
+ DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
ikeyNotNeededForIncrementalSnapshot() &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
&level_ptrs_)) {
ikey_.user_key, &level_ptrs_));
ParsedInternalKey next_ikey;
AdvanceInputIter();
+#ifndef NDEBUG
+ const Compaction* c =
+ compaction_ ? compaction_->real_compaction() : nullptr;
+#endif
+ TEST_SYNC_POINT_CALLBACK(
+ "CompactionIterator::NextFromInput:BottommostDelete:1",
+ const_cast<Compaction*>(c));
// Skip over all versions of this key that happen to occur in the same
// snapshot range as the delete.
//
if (valid_ && compaction_ != nullptr &&
!compaction_->allow_ingest_behind() &&
ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ &&
- InEarliestSnapshot(ikey_.sequence) && ikey_.type != kTypeMerge) {
+ DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
+ ikey_.type != kTypeMerge) {
assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
ROCKS_LOG_FATAL(info_log_,
(ikey_.sequence < preserve_deletes_seqnum_);
}
-bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) {
+bool CompactionIterator::IsInCurrentEarliestSnapshot(SequenceNumber sequence) {
assert(snapshot_checker_ != nullptr);
bool pre_condition = (earliest_snapshot_ == kMaxSequenceNumber ||
(earliest_snapshot_iter_ != snapshots_->end() &&
ASSERT_OK(ReOpen());
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
+ SequenceNumber put_seq = db->GetLatestSequenceNumber();
auto* transaction =
db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
ASSERT_OK(transaction->SetName("txn"));
// Since the delete tombstone is not visible to snapshot2, we need to keep
// at least one version of the key, for write-conflict check.
VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion},
- {"key1", "value1", 0, kTypeValue}});
+ {"key1", "value1", put_seq, kTypeValue}});
db->ReleaseSnapshot(snapshot2);
SyncPoint::GetInstance()->ClearAllCallBacks();
}
+TEST_P(WritePreparedTransactionTest,
+ ReleaseEarliestSnapshotDuringCompaction_WithSD) {
+ constexpr size_t kSnapshotCacheBits = 7; // same as default
+ constexpr size_t kCommitCacheBits = 0; // minimum commit cache
+ UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
+ options.disable_auto_compactions = true;
+ ASSERT_OK(ReOpen());
+
+ ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
+ ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
+ ASSERT_OK(db->Flush(FlushOptions()));
+
+ auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
+ /*old_txn=*/nullptr);
+ ASSERT_OK(txn->SingleDelete("key"));
+ ASSERT_OK(txn->Put("wow", "value"));
+ ASSERT_OK(txn->SetName("txn"));
+ ASSERT_OK(txn->Prepare());
+ ASSERT_OK(db->Flush(FlushOptions()));
+
+ const bool two_write_queues = std::get<1>(GetParam());
+ if (two_write_queues) {
+ // In the case of two queues, commit another txn just to bump
+ // last_published_seq so that a subsequent GetSnapshot() call can return
+ // a snapshot with higher sequence.
+ auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
+ /*old_txn=*/nullptr);
+ ASSERT_OK(dummy_txn->Put("haha", "value"));
+ ASSERT_OK(dummy_txn->Commit());
+ delete dummy_txn;
+ }
+ auto* snapshot = db->GetSnapshot();
+
+ ASSERT_OK(txn->Commit());
+ delete txn;
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* arg) {
+ if (!arg) {
+ return;
+ }
+ db->ReleaseSnapshot(snapshot);
+
+ // Advance max_evicted_seq
+ ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+ ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
+ /*end=*/nullptr));
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(WritePreparedTransactionTest,
+ ReleaseEarliestSnapshotDuringCompaction_WithSD2) {
+ constexpr size_t kSnapshotCacheBits = 7; // same as default
+ constexpr size_t kCommitCacheBits = 0; // minimum commit cache
+ UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
+ options.disable_auto_compactions = true;
+ ASSERT_OK(ReOpen());
+
+ ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
+ ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
+ ASSERT_OK(db->Flush(FlushOptions()));
+
+ auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
+ /*old_txn=*/nullptr);
+ ASSERT_OK(txn->Put("bar", "value"));
+ ASSERT_OK(txn->SingleDelete("key"));
+ ASSERT_OK(txn->SetName("txn"));
+ ASSERT_OK(txn->Prepare());
+ ASSERT_OK(db->Flush(FlushOptions()));
+
+ ASSERT_OK(txn->Commit());
+ delete txn;
+
+ ASSERT_OK(db->Put(WriteOptions(), "haha", "value"));
+
+ // Create a dummy transaction to take a snapshot for ww-conflict detection.
+ TransactionOptions txn_opts;
+ txn_opts.set_snapshot = true;
+ auto* dummy_txn =
+ db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "CompactionIterator::NextFromInput:SingleDelete:2", [&](void* /*arg*/) {
+ ASSERT_OK(dummy_txn->Rollback());
+ delete dummy_txn;
+
+ ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ ASSERT_OK(db->Put(WriteOptions(), "haha2", "value"));
+ auto* snapshot = db->GetSnapshot();
+
+ ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ db->ReleaseSnapshot(snapshot);
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(WritePreparedTransactionTest,
+ ReleaseEarliestSnapshotDuringCompaction_WithDelete) {
+ constexpr size_t kSnapshotCacheBits = 7; // same as default
+ constexpr size_t kCommitCacheBits = 0; // minimum commit cache
+ UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
+ options.disable_auto_compactions = true;
+ ASSERT_OK(ReOpen());
+
+ ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
+ ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
+ ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
+ ASSERT_OK(db->Flush(FlushOptions()));
+
+ auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
+ /*old_txn=*/nullptr);
+ ASSERT_OK(txn->Delete("b"));
+ ASSERT_OK(txn->SetName("txn"));
+ ASSERT_OK(txn->Prepare());
+
+ const bool two_write_queues = std::get<1>(GetParam());
+ if (two_write_queues) {
+ // In the case of two queues, commit another txn just to bump
+ // last_published_seq so that a subsequent GetSnapshot() call can return
+ // a snapshot with higher sequence.
+ auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
+ /*old_txn=*/nullptr);
+ ASSERT_OK(dummy_txn->Put("haha", "value"));
+ ASSERT_OK(dummy_txn->Commit());
+ delete dummy_txn;
+ }
+ auto* snapshot1 = db->GetSnapshot();
+ ASSERT_OK(txn->Commit());
+ delete txn;
+ auto* snapshot2 = db->GetSnapshot();
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "CompactionIterator::NextFromInput:BottommostDelete:1", [&](void* arg) {
+ if (!arg) {
+ return;
+ }
+ db->ReleaseSnapshot(snapshot1);
+
+ // Advance max_evicted_seq
+ ASSERT_OK(db->Put(WriteOptions(), "dummy1", "value"));
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
+ /*end=*/nullptr));
+ db->ReleaseSnapshot(snapshot2);
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(WritePreparedTransactionTest,
+ ReleaseSnapshotBetweenSDAndPutDuringCompaction) {
+ constexpr size_t kSnapshotCacheBits = 7; // same as default
+ constexpr size_t kCommitCacheBits = 0; // minimum commit cache
+ UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
+ options.disable_auto_compactions = true;
+ ASSERT_OK(ReOpen());
+
+ // Create a dummy transaction to take a snapshot for ww-conflict detection.
+ TransactionOptions txn_opts;
+ txn_opts.set_snapshot = true;
+ auto* dummy_txn =
+ db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);
+ // Increment seq
+ ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
+
+ ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
+ ASSERT_OK(db->SingleDelete(WriteOptions(), "foo"));
+ auto* snapshot1 = db->GetSnapshot();
+ // Increment seq
+ ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
+ auto* snapshot2 = db->GetSnapshot();
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "CompactionIterator::NextFromInput:KeepSDForWW", [&](void* /*arg*/) {
+ db->ReleaseSnapshot(snapshot1);
+
+ ASSERT_OK(db->Put(WriteOptions(), "dontcare2", "value2"));
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ ASSERT_OK(db->Flush(FlushOptions()));
+ db->ReleaseSnapshot(snapshot2);
+ ASSERT_OK(dummy_txn->Commit());
+ delete dummy_txn;
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
// A more complex test to verify compaction/flush should keep keys visible
// to snapshots.
TEST_P(WritePreparedTransactionTest,