VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
edit.SetFullHistoryTsLow(ts_low);
+ TEST_SYNC_POINT_CALLBACK("DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit",
+ &edit);
InstrumentedMutexLock l(&mutex_);
std::string current_ts_low = cfd->GetFullHistoryTsLow();
assert(ucmp->timestamp_size() == ts_low.size() && !ts_low.empty());
if (!current_ts_low.empty() &&
ucmp->CompareTimestamp(ts_low, current_ts_low) < 0) {
- return Status::InvalidArgument(
- "Cannot decrease full_history_timestamp_low");
+ return Status::InvalidArgument("Cannot decrease full_history_ts_low");
}
- return versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
- &mutex_);
+ Status s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
+ &edit, &mutex_);
+ if (!s.ok()) {
+ return s;
+ }
+ current_ts_low = cfd->GetFullHistoryTsLow();
+ if (!current_ts_low.empty() &&
+ ucmp->CompareTimestamp(current_ts_low, ts_low) > 0) {
+ std::stringstream oss;
+ oss << "full_history_ts_low: " << Slice(current_ts_low).ToString(true)
+ << " is set to be higher than the requested "
+ "timestamp: "
+ << Slice(ts_low).ToString(true) << std::endl;
+ return Status::TryAgain(oss.str());
+ }
+ return Status::OK();
}
Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey)));
#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
+class UpdateFullHistoryTsLowTest : public DBBasicTestWithTimestampBase {
+ public:
+ UpdateFullHistoryTsLowTest()
+ : DBBasicTestWithTimestampBase("/update_full_history_ts_low_test") {}
+};
+
+TEST_F(UpdateFullHistoryTsLowTest, ConcurrentUpdate) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ options.create_if_missing = true;
+ std::string lower_ts_low = Timestamp(10, 0);
+ std::string higher_ts_low = Timestamp(25, 0);
+ const size_t kTimestampSize = lower_ts_low.size();
+ TestComparator test_cmp(kTimestampSize);
+ options.comparator = &test_cmp;
+
+ DestroyAndReopen(options);
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ // This workaround swaps `lower_ts_low` originally used for update by the
+ // caller to `higher_ts_low` after its writer is queued to make sure
+ // the caller will always get a TryAgain error.
+ // It mimics cases where two threads update full_history_ts_low concurrently
+ // with one thread writing a higher ts_low and one thread writing a lower
+ // ts_low.
+ VersionEdit* version_edit;
+ SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit",
+ [&](void* arg) { version_edit = reinterpret_cast<VersionEdit*>(arg); });
+ SyncPoint::GetInstance()->SetCallBack(
+ "VersionSet::LogAndApply:BeforeWriterWaiting",
+ [&](void* /*arg*/) { version_edit->SetFullHistoryTsLow(higher_ts_low); });
+ SyncPoint::GetInstance()->EnableProcessing();
+ ASSERT_TRUE(
+ db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), lower_ts_low)
+ .IsTryAgain());
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+
+ Close();
+}
+
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {