From: Yu Zhang Date: Fri, 10 Jun 2022 15:21:08 +0000 (-0700) Subject: Return try again when full_history_ts_low is higher than requested ts (#10126) X-Git-Tag: v7.4.3~50 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=693dffd8e8e9133519d3adba9cd817adc0a995ba;p=rocksdb.git Return try again when full_history_ts_low is higher than requested ts (#10126) Summary: This PR helps handle the race condition mentioned in this comment thread: https://github.com/facebook/rocksdb/pull/7884#discussion_r572402281 In case where actual full_history_ts_low is higher than the user's requested ts, return a try again message so they don't have the misconception that data between [ts, full_history_ts_low) is kept. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10126 Test Plan: ``` $COMPILE_WITH_ASAN=1 make -j24 all $./db_with_timestamp_basic_test --gtest_filter=UpdateFullHistoryTsLowTest.ConcurrentUpdate $ make -j24 check ``` Reviewed By: riversand963 Differential Revision: D37055368 Pulled By: jowlyzhang fbshipit-source-id: 787fd0984a246540fa03ac227b1d232590d27828 --- diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index c84a745a6..16d8c8f35 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -952,6 +952,8 @@ Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd, VersionEdit edit; edit.SetColumnFamily(cfd->GetID()); edit.SetFullHistoryTsLow(ts_low); + TEST_SYNC_POINT_CALLBACK("DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit", + &edit); InstrumentedMutexLock l(&mutex_); std::string current_ts_low = cfd->GetFullHistoryTsLow(); @@ -959,12 +961,25 @@ Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd, assert(ucmp->timestamp_size() == ts_low.size() && !ts_low.empty()); if (!current_ts_low.empty() && ucmp->CompareTimestamp(ts_low, current_ts_low) < 0) { - return Status::InvalidArgument( - "Cannot decrease full_history_timestamp_low"); + return Status::InvalidArgument("Cannot decrease full_history_ts_low"); } - return versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, - &mutex_); + Status s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), + &edit, &mutex_); + if (!s.ok()) { + return s; + } + current_ts_low = cfd->GetFullHistoryTsLow(); + if (!current_ts_low.empty() && + ucmp->CompareTimestamp(current_ts_low, ts_low) > 0) { + std::stringstream oss; + oss << "full_history_ts_low: " << Slice(current_ts_low).ToString(true) + << " is set to be higher than the requested " + "timestamp: " + << Slice(ts_low).ToString(true) << std::endl; + return Status::TryAgain(oss.str()); + } + return Status::OK(); } Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options, diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 57ce76f79..84fae1f10 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -3022,6 +3022,48 @@ INSTANTIATE_TEST_CASE_P( BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey))); #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) +class UpdateFullHistoryTsLowTest : public DBBasicTestWithTimestampBase { + public: + UpdateFullHistoryTsLowTest() + : DBBasicTestWithTimestampBase("/update_full_history_ts_low_test") {} +}; + +TEST_F(UpdateFullHistoryTsLowTest, ConcurrentUpdate) { + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + std::string lower_ts_low = Timestamp(10, 0); + std::string higher_ts_low = Timestamp(25, 0); + const size_t kTimestampSize = lower_ts_low.size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + // This workaround swaps `lower_ts_low` originally used for update by the + // caller to `higher_ts_low` after its writer is queued to make sure + // the caller will always get a TryAgain error. + // It mimics cases where two threads update full_history_ts_low concurrently + // with one thread writing a higher ts_low and one thread writing a lower + // ts_low. + VersionEdit* version_edit; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit", + [&](void* arg) { version_edit = reinterpret_cast(arg); }); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::LogAndApply:BeforeWriterWaiting", + [&](void* /*arg*/) { version_edit->SetFullHistoryTsLow(higher_ts_low); }); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_TRUE( + db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), lower_ts_low) + .IsTryAgain()); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + Close(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 2ea4ffbd5..378cd873a 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1421,6 +1421,8 @@ class DB { // Increase the full_history_ts of column family. The new ts_low value should // be newer than current full_history_ts value. + // If another thread updates full_history_ts_low concurrently to a higher + // timestamp than the requested ts_low, a try again error will be returned. virtual Status IncreaseFullHistoryTsLow(ColumnFamilyHandle* column_family, std::string ts_low) = 0;