]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Return try again when full_history_ts_low is higher than requested ts (#10126)
authorYu Zhang <yuzhangyu@fb.com>
Fri, 10 Jun 2022 15:21:08 +0000 (08:21 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Fri, 10 Jun 2022 15:21:08 +0000 (08:21 -0700)
Summary:
This PR helps handle the race condition mentioned in this comment thread: https://github.com/facebook/rocksdb/pull/7884#discussion_r572402281 In case where actual full_history_ts_low is higher than the user's requested ts, return a try again message so they don't have the misconception that data between [ts, full_history_ts_low) is kept.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10126

Test Plan:
```
$COMPILE_WITH_ASAN=1 make -j24 all
$./db_with_timestamp_basic_test --gtest_filter=UpdateFullHistoryTsLowTest.ConcurrentUpdate
$ make -j24 check
```

Reviewed By: riversand963

Differential Revision: D37055368

Pulled By: jowlyzhang

fbshipit-source-id: 787fd0984a246540fa03ac227b1d232590d27828

db/db_impl/db_impl_compaction_flush.cc
db/db_with_timestamp_basic_test.cc
include/rocksdb/db.h

index c84a745a6e60c46f2d83c867fe183e241a1bdd9f..16d8c8f355cad4ba0e6862ce01c4de88f09714f5 100644 (file)
@@ -952,6 +952,8 @@ Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd,
   VersionEdit edit;
   edit.SetColumnFamily(cfd->GetID());
   edit.SetFullHistoryTsLow(ts_low);
+  TEST_SYNC_POINT_CALLBACK("DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit",
+                           &edit);
 
   InstrumentedMutexLock l(&mutex_);
   std::string current_ts_low = cfd->GetFullHistoryTsLow();
@@ -959,12 +961,25 @@ Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd,
   assert(ucmp->timestamp_size() == ts_low.size() && !ts_low.empty());
   if (!current_ts_low.empty() &&
       ucmp->CompareTimestamp(ts_low, current_ts_low) < 0) {
-    return Status::InvalidArgument(
-        "Cannot decrease full_history_timestamp_low");
+    return Status::InvalidArgument("Cannot decrease full_history_ts_low");
   }
 
-  return versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
-                                &mutex_);
+  Status s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
+                                    &edit, &mutex_);
+  if (!s.ok()) {
+    return s;
+  }
+  current_ts_low = cfd->GetFullHistoryTsLow();
+  if (!current_ts_low.empty() &&
+      ucmp->CompareTimestamp(current_ts_low, ts_low) > 0) {
+    std::stringstream oss;
+    oss << "full_history_ts_low: " << Slice(current_ts_low).ToString(true)
+        << " is set to be higher than the requested "
+           "timestamp: "
+        << Slice(ts_low).ToString(true) << std::endl;
+    return Status::TryAgain(oss.str());
+  }
+  return Status::OK();
 }
 
 Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
index 57ce76f7916ecc33933026434f57edd2f7d221a9..84fae1f10d2b38127c2e3ec26a812f0406560872 100644 (file)
@@ -3022,6 +3022,48 @@ INSTANTIATE_TEST_CASE_P(
             BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey)));
 #endif  // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
 
+class UpdateFullHistoryTsLowTest : public DBBasicTestWithTimestampBase {
+ public:
+  UpdateFullHistoryTsLowTest()
+      : DBBasicTestWithTimestampBase("/update_full_history_ts_low_test") {}
+};
+
+TEST_F(UpdateFullHistoryTsLowTest, ConcurrentUpdate) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  std::string lower_ts_low = Timestamp(10, 0);
+  std::string higher_ts_low = Timestamp(25, 0);
+  const size_t kTimestampSize = lower_ts_low.size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+
+  DestroyAndReopen(options);
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  // This workaround swaps `lower_ts_low` originally used for update by the
+  // caller to `higher_ts_low` after its writer is queued to make sure
+  // the caller will always get a TryAgain error.
+  // It mimics cases where two threads update full_history_ts_low concurrently
+  // with one thread writing a higher ts_low and one thread writing a lower
+  // ts_low.
+  VersionEdit* version_edit;
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit",
+      [&](void* arg) { version_edit = reinterpret_cast<VersionEdit*>(arg); });
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:BeforeWriterWaiting",
+      [&](void* /*arg*/) { version_edit->SetFullHistoryTsLow(higher_ts_low); });
+  SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_TRUE(
+      db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), lower_ts_low)
+          .IsTryAgain());
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  Close();
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
index 2ea4ffbd536db2220bff860ca18609048a74dcf1..378cd873a4f6fc064c49563a067b180006c6b105 100644 (file)
@@ -1421,6 +1421,8 @@ class DB {
 
   // Increase the full_history_ts of column family. The new ts_low value should
   // be newer than current full_history_ts value.
+  // If another thread updates full_history_ts_low concurrently to a higher
+  // timestamp than the requested ts_low, a try again error will be returned.
   virtual Status IncreaseFullHistoryTsLow(ColumnFamilyHandle* column_family,
                                           std::string ts_low) = 0;