]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Enhance async scan prefetch unit tests (#11087)
authorakankshamahajan <akankshamahajan@fb.com>
Fri, 20 Jan 2023 18:17:57 +0000 (10:17 -0800)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Fri, 20 Jan 2023 18:17:57 +0000 (10:17 -0800)
Summary:
Add more coverage in unit tests for async scan. The added unit test fails without PR https://github.com/facebook/rocksdb/pull/10939.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11087

Test Plan: CircleCI jobs status for new unit tests.

Reviewed By: anand1976

Differential Revision: D42487931

Pulled By: akankshamahajan15

fbshipit-source-id: d59ed7666599bd0d2733ac5d76bd70984b54c5a9

file/file_prefetch_buffer.cc
file/prefetch_test.cc

index 4ac0d05042cc305a2e9cfc0af67247f94619f8a9..f7d4c95913590416baa7ff8d600937e08f8769a2 100644 (file)
@@ -109,6 +109,7 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
                                      RandomAccessFileReader* reader,
                                      uint64_t read_len,
                                      uint64_t rounddown_start, uint32_t index) {
+  TEST_SYNC_POINT("FilePrefetchBuffer::ReadAsync");
   // callback for async read request.
   auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this,
                       std::placeholders::_1, std::placeholders::_2);
index 11cb841b978d761fa64e9da05feebb37818fa5a9..23e7454ed115902ed2f6eef9ca97693238e752e7 100644 (file)
@@ -79,6 +79,27 @@ class PrefetchTest
       public ::testing::WithParamInterface<std::tuple<bool, bool>> {
  public:
   PrefetchTest() : DBTestBase("prefetch_test", true) {}
+
+  void SetGenericOptions(Env* env, bool use_direct_io, Options& options) {
+    options = CurrentOptions();
+    options.write_buffer_size = 1024;
+    options.create_if_missing = true;
+    options.compression = kNoCompression;
+    options.env = env;
+    options.disable_auto_compactions = true;
+    if (use_direct_io) {
+      options.use_direct_reads = true;
+      options.use_direct_io_for_flush_and_compaction = true;
+    }
+  }
+
+  void SetBlockBasedTableOptions(BlockBasedTableOptions& table_options) {
+    table_options.no_block_cache = true;
+    table_options.cache_index_and_filter_blocks = false;
+    table_options.metadata_block_size = 1024;
+    table_options.index_type =
+        BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  }
 };
 
 INSTANTIATE_TEST_CASE_P(PrefetchTest, PrefetchTest,
@@ -89,28 +110,23 @@ std::string BuildKey(int num, std::string postfix = "") {
   return "my_key_" + std::to_string(num) + postfix;
 }
 
+// This test verifies the basic functionality of prefetching.
 TEST_P(PrefetchTest, Basic) {
   // First param is if the mockFS support_prefetch or not
   bool support_prefetch =
       std::get<0>(GetParam()) &&
       test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
+  std::shared_ptr<MockFS> fs =
+      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
 
   // Second param is if directIO is enabled or not
   bool use_direct_io = std::get<1>(GetParam());
-  const int kNumKeys = 1100;
-  std::shared_ptr<MockFS> fs =
-      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
+
   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
-  Options options = CurrentOptions();
-  options.write_buffer_size = 1024;
-  options.create_if_missing = true;
-  options.compression = kNoCompression;
-  options.env = env.get();
-  if (use_direct_io) {
-    options.use_direct_reads = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-  }
+  Options options;
+  SetGenericOptions(env.get(), use_direct_io, options);
 
+  const int kNumKeys = 1100;
   int buff_prefetch_count = 0;
   SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                         [&](void*) { buff_prefetch_count++; });
@@ -192,35 +208,24 @@ TEST_P(PrefetchTest, Basic) {
 }
 
 #ifndef ROCKSDB_LITE
+// This test verifies BlockBasedTableOptions.max_auto_readahead_size is
+// configured dynamically.
 TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) {
   // First param is if the mockFS support_prefetch or not
   bool support_prefetch =
       std::get<0>(GetParam()) &&
       test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
+  std::shared_ptr<MockFS> fs =
+      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
 
   // Second param is if directIO is enabled or not
   bool use_direct_io = std::get<1>(GetParam());
 
-  std::shared_ptr<MockFS> fs =
-      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
-
-  Options options = CurrentOptions();
-  options.write_buffer_size = 1024;
-  options.create_if_missing = true;
-  options.compression = kNoCompression;
-  options.env = env.get();
-  options.disable_auto_compactions = true;
-  if (use_direct_io) {
-    options.use_direct_reads = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-  }
+  Options options;
+  SetGenericOptions(env.get(), use_direct_io, options);
   BlockBasedTableOptions table_options;
-  table_options.no_block_cache = true;
-  table_options.cache_index_and_filter_blocks = false;
-  table_options.metadata_block_size = 1024;
-  table_options.index_type =
-      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  SetBlockBasedTableOptions(table_options);
   table_options.max_auto_readahead_size = 0;
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
@@ -329,6 +334,8 @@ TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) {
   Close();
 }
 
+// This test verifies BlockBasedTableOptions.initial_auto_readahead_size is
+// configured dynamically.
 TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) {
   // First param is if the mockFS support_prefetch or not
   bool support_prefetch =
@@ -341,23 +348,10 @@ TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) {
   std::shared_ptr<MockFS> fs =
       std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
-
-  Options options = CurrentOptions();
-  options.write_buffer_size = 1024;
-  options.create_if_missing = true;
-  options.compression = kNoCompression;
-  options.env = env.get();
-  options.disable_auto_compactions = true;
-  if (use_direct_io) {
-    options.use_direct_reads = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-  }
+  Options options;
+  SetGenericOptions(env.get(), use_direct_io, options);
   BlockBasedTableOptions table_options;
-  table_options.no_block_cache = true;
-  table_options.cache_index_and_filter_blocks = false;
-  table_options.metadata_block_size = 1024;
-  table_options.index_type =
-      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  SetBlockBasedTableOptions(table_options);
   table_options.initial_auto_readahead_size = 0;
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
@@ -468,6 +462,8 @@ TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) {
   Close();
 }
 
+// This test verifies BlockBasedTableOptions.num_file_reads_for_auto_readahead
+// is configured dynamically.
 TEST_P(PrefetchTest, ConfigureNumFilesReadsForReadaheadSize) {
   // First param is if the mockFS support_prefetch or not
   bool support_prefetch =
@@ -482,26 +478,13 @@ TEST_P(PrefetchTest, ConfigureNumFilesReadsForReadaheadSize) {
   // Second param is if directIO is enabled or not
   bool use_direct_io = std::get<1>(GetParam());
 
-  Options options = CurrentOptions();
-  options.write_buffer_size = 1024;
-  options.create_if_missing = true;
-  options.compression = kNoCompression;
-  options.env = env.get();
-
+  Options options;
+  SetGenericOptions(env.get(), use_direct_io, options);
   BlockBasedTableOptions table_options;
-  table_options.no_block_cache = true;
-  table_options.cache_index_and_filter_blocks = false;
-  table_options.metadata_block_size = 1024;
-  table_options.index_type =
-      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  SetBlockBasedTableOptions(table_options);
   table_options.num_file_reads_for_auto_readahead = 0;
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
-  if (use_direct_io) {
-    options.use_direct_reads = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-  }
-
   int buff_prefetch_count = 0;
   SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                         [&](void*) { buff_prefetch_count++; });
@@ -578,6 +561,13 @@ TEST_P(PrefetchTest, ConfigureNumFilesReadsForReadaheadSize) {
 }
 #endif  // !ROCKSDB_LITE
 
+// This test verifies the basic functionality of implicit autoreadahead:
+// - Enable implicit autoreadahead and prefetch only if sequential blocks are
+//   read,
+// - If data is already in buffer and few blocks are not requested to read,
+//   don't reset,
+// - If data blocks are sequential during read after enabling implicit
+//   autoreadahead, reset readahead parameters.
 TEST_P(PrefetchTest, PrefetchWhenReseek) {
   // First param is if the mockFS support_prefetch or not
   bool support_prefetch =
@@ -592,25 +582,12 @@ TEST_P(PrefetchTest, PrefetchWhenReseek) {
   // Second param is if directIO is enabled or not
   bool use_direct_io = std::get<1>(GetParam());
 
-  Options options = CurrentOptions();
-  options.write_buffer_size = 1024;
-  options.create_if_missing = true;
-  options.compression = kNoCompression;
-  options.env = env.get();
-
+  Options options;
+  SetGenericOptions(env.get(), use_direct_io, options);
   BlockBasedTableOptions table_options;
-  table_options.no_block_cache = true;
-  table_options.cache_index_and_filter_blocks = false;
-  table_options.metadata_block_size = 1024;
-  table_options.index_type =
-      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  SetBlockBasedTableOptions(table_options);
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
-  if (use_direct_io) {
-    options.use_direct_reads = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-  }
-
   int buff_prefetch_count = 0;
   SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                         [&](void*) { buff_prefetch_count++; });
@@ -846,6 +823,12 @@ TEST_P(PrefetchTest, PrefetchWhenReseek) {
   Close();
 }
 
+// This test verifies the functionality of implicit autoreadahead when caching
+// is enabled:
+// - If data is already in buffer and few blocks are not requested to read,
+//   don't reset,
+// - If block was eligible for prefetching/in buffer but found in cache, don't
+// prefetch and reset.
 TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) {
   // First param is if the mockFS support_prefetch or not
   bool support_prefetch =
@@ -860,26 +843,15 @@ TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) {
   // Second param is if directIO is enabled or not
   bool use_direct_io = std::get<1>(GetParam());
 
-  Options options = CurrentOptions();
-  options.write_buffer_size = 1024;
-  options.create_if_missing = true;
-  options.compression = kNoCompression;
-  options.env = env.get();
-
+  Options options;
+  SetGenericOptions(env.get(), use_direct_io, options);
   BlockBasedTableOptions table_options;
+  SetBlockBasedTableOptions(table_options);
   std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);  // 8MB
   table_options.block_cache = cache;
-  table_options.cache_index_and_filter_blocks = false;
-  table_options.metadata_block_size = 1024;
-  table_options.index_type =
-      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  table_options.no_block_cache = false;
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
-  if (use_direct_io) {
-    options.use_direct_reads = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-  }
-
   int buff_prefetch_count = 0;
   SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                         [&](void*) { buff_prefetch_count++; });
@@ -978,6 +950,7 @@ TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) {
 }
 
 #ifndef ROCKSDB_LITE
+// This test verifies the functionality of ReadOptions.adaptive_readahead.
 TEST_P(PrefetchTest, DBIterLevelReadAhead) {
   const int kNumKeys = 1000;
   // Set options
@@ -988,23 +961,11 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) {
   bool use_direct_io = std::get<0>(GetParam());
   bool is_adaptive_readahead = std::get<1>(GetParam());
 
-  Options options = CurrentOptions();
-  options.write_buffer_size = 1024;
-  options.create_if_missing = true;
-  options.compression = kNoCompression;
+  Options options;
+  SetGenericOptions(env.get(), use_direct_io, options);
   options.statistics = CreateDBStatistics();
-  options.env = env.get();
-
-  if (use_direct_io) {
-    options.use_direct_reads = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-  }
   BlockBasedTableOptions table_options;
-  table_options.no_block_cache = true;
-  table_options.cache_index_and_filter_blocks = false;
-  table_options.metadata_block_size = 1024;
-  table_options.index_type =
-      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  SetBlockBasedTableOptions(table_options);
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
   Status s = TryReopen(options);
@@ -1028,7 +989,6 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) {
   }
   MoveFilesToLevel(2);
   int buff_prefetch_count = 0;
-  int buff_async_prefetch_count = 0;
   int readahead_carry_over_count = 0;
   int num_sst_files = NumTableFilesAtLevel(2);
   size_t current_readahead_size = 0;
@@ -1039,6 +999,101 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) {
         "FilePrefetchBuffer::Prefetch:Start",
         [&](void*) { buff_prefetch_count++; });
 
+    // The callback checks, since reads are sequential, readahead_size doesn't
+    // start from 8KB when iterator moves to next file and its called
+    // num_sst_files-1 times (excluding for first file).
+    SyncPoint::GetInstance()->SetCallBack(
+        "BlockPrefetcher::SetReadaheadState", [&](void* arg) {
+          readahead_carry_over_count++;
+          size_t readahead_size = *reinterpret_cast<size_t*>(arg);
+          if (readahead_carry_over_count) {
+            ASSERT_GT(readahead_size, 8 * 1024);
+          }
+        });
+
+    SyncPoint::GetInstance()->SetCallBack(
+        "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
+          current_readahead_size = *reinterpret_cast<size_t*>(arg);
+          ASSERT_GT(current_readahead_size, 0);
+        });
+
+    SyncPoint::GetInstance()->EnableProcessing();
+
+    ReadOptions ro;
+    if (is_adaptive_readahead) {
+      ro.adaptive_readahead = true;
+    }
+
+    ASSERT_OK(options.statistics->Reset());
+
+    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
+    int num_keys = 0;
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      ASSERT_OK(iter->status());
+      num_keys++;
+    }
+    ASSERT_EQ(num_keys, total_keys);
+
+    // For index and data blocks.
+    if (is_adaptive_readahead) {
+      ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
+    } else {
+      ASSERT_GT(buff_prefetch_count, 0);
+      ASSERT_EQ(readahead_carry_over_count, 0);
+    }
+
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+  }
+  Close();
+}
+
+// This test verifies the functionality of ReadOptions.adaptive_readahead when
+// async_io is enabled.
+TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) {
+  const int kNumKeys = 1000;
+  // Set options
+  std::shared_ptr<MockFS> fs =
+      std::make_shared<MockFS>(env_->GetFileSystem(), false);
+  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
+
+  bool use_direct_io = std::get<0>(GetParam());
+  bool is_adaptive_readahead = std::get<1>(GetParam());
+
+  Options options;
+  SetGenericOptions(env.get(), use_direct_io, options);
+  options.statistics = CreateDBStatistics();
+  BlockBasedTableOptions table_options;
+  SetBlockBasedTableOptions(table_options);
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+  Status s = TryReopen(options);
+  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
+    // If direct IO is not supported, skip the test
+    return;
+  } else {
+    ASSERT_OK(s);
+  }
+
+  WriteBatch batch;
+  Random rnd(309);
+  int total_keys = 0;
+  for (int j = 0; j < 5; j++) {
+    for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
+      ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
+      total_keys++;
+    }
+    ASSERT_OK(db_->Write(WriteOptions(), &batch));
+    ASSERT_OK(Flush());
+  }
+  MoveFilesToLevel(2);
+  int buff_async_prefetch_count = 0;
+  int readahead_carry_over_count = 0;
+  int num_sst_files = NumTableFilesAtLevel(2);
+  size_t current_readahead_size = 0;
+
+  // Test - Iterate over the keys sequentially.
+  {
     SyncPoint::GetInstance()->SetCallBack(
         "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
         [&](void*) { buff_async_prefetch_count++; });
@@ -1066,8 +1121,8 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) {
     ReadOptions ro;
     if (is_adaptive_readahead) {
       ro.adaptive_readahead = true;
-      ro.async_io = true;
     }
+    ro.async_io = true;
 
     ASSERT_OK(options.statistics->Reset());
 
@@ -1082,11 +1137,10 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) {
     // For index and data blocks.
     if (is_adaptive_readahead) {
       ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
-      ASSERT_GT(buff_async_prefetch_count, 0);
     } else {
-      ASSERT_GT(buff_prefetch_count, 0);
       ASSERT_EQ(readahead_carry_over_count, 0);
     }
+    ASSERT_GT(buff_async_prefetch_count, 0);
 
     // Check stats to make sure async prefetch is done.
     {
@@ -1110,11 +1164,34 @@ class PrefetchTest1 : public DBTestBase,
                       public ::testing::WithParamInterface<bool> {
  public:
   PrefetchTest1() : DBTestBase("prefetch_test1", true) {}
+
+  void SetGenericOptions(Env* env, bool use_direct_io, Options& options) {
+    options = CurrentOptions();
+    options.write_buffer_size = 1024;
+    options.create_if_missing = true;
+    options.compression = kNoCompression;
+    options.env = env;
+    options.disable_auto_compactions = true;
+    if (use_direct_io) {
+      options.use_direct_reads = true;
+      options.use_direct_io_for_flush_and_compaction = true;
+    }
+  }
+
+  void SetBlockBasedTableOptions(BlockBasedTableOptions& table_options) {
+    table_options.no_block_cache = true;
+    table_options.cache_index_and_filter_blocks = false;
+    table_options.metadata_block_size = 1024;
+    table_options.index_type =
+        BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  }
 };
 
 INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, ::testing::Bool());
 
 #ifndef ROCKSDB_LITE
+// This test verifies the functionality of ReadOptions.adaptive_readahead when
+// reads are not sequential.
 TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) {
   const int kNumKeys = 1000;
   // Set options
@@ -1122,21 +1199,10 @@ TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) {
       std::make_shared<MockFS>(env_->GetFileSystem(), false);
   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
 
-  Options options = CurrentOptions();
-  options.write_buffer_size = 1024;
-  options.create_if_missing = true;
-  options.compression = kNoCompression;
-  options.env = env.get();
-  if (GetParam()) {
-    options.use_direct_reads = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-  }
+  Options options;
+  SetGenericOptions(env.get(), GetParam(), options);
   BlockBasedTableOptions table_options;
-  table_options.no_block_cache = true;
-  table_options.cache_index_and_filter_blocks = false;
-  table_options.metadata_block_size = 1024;
-  table_options.index_type =
-      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  SetBlockBasedTableOptions(table_options);
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
   Status s = TryReopen(options);
@@ -1208,6 +1274,16 @@ TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) {
 }
 #endif  //! ROCKSDB_LITE
 
+// This test verifies the functionality of adaptive_readaheadsize with cache and
+// if block is found in cache, decrease the readahead_size if
+// - its enabled internally by RocksDB (implicit_auto_readahead_) and,
+// - readahead_size is greater than 0 and,
+// - the block would have called prefetch API if not found in cache for
+//   which conditions are:
+//   - few/no bytes are in buffer and,
+//   - block is sequential with the previous read and,
+//   - num_file_reads_ + 1 (including this read) >
+//   num_file_reads_for_auto_readahead_
 TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) {
   const int kNumKeys = 2000;
   // Set options
@@ -1215,24 +1291,14 @@ TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) {
       std::make_shared<MockFS>(env_->GetFileSystem(), false);
   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
 
-  Options options = CurrentOptions();
-  options.write_buffer_size = 1024;
-  options.create_if_missing = true;
-  options.compression = kNoCompression;
-  options.env = env.get();
-  if (GetParam()) {
-    options.use_direct_reads = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-  }
-
+  Options options;
+  SetGenericOptions(env.get(), GetParam(), options);
   options.statistics = CreateDBStatistics();
   BlockBasedTableOptions table_options;
+  SetBlockBasedTableOptions(table_options);
   std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);  // 8MB
   table_options.block_cache = cache;
-  table_options.cache_index_and_filter_blocks = false;
-  table_options.metadata_block_size = 1024;
-  table_options.index_type =
-      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  table_options.no_block_cache = false;
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
   Status s = TryReopen(options);
@@ -1352,6 +1418,8 @@ TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) {
   Close();
 }
 
+// This test verifies the basic functionality of seek parallelization for
+// async_io.
 TEST_P(PrefetchTest1, SeekParallelizationTest) {
   const int kNumKeys = 2000;
   // Set options
@@ -1359,23 +1427,11 @@ TEST_P(PrefetchTest1, SeekParallelizationTest) {
       std::make_shared<MockFS>(env_->GetFileSystem(), false);
   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
 
-  Options options = CurrentOptions();
-  options.write_buffer_size = 1024;
-  options.create_if_missing = true;
-  options.compression = kNoCompression;
-  options.env = env.get();
-  if (GetParam()) {
-    options.use_direct_reads = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-  }
-
+  Options options;
+  SetGenericOptions(env.get(), GetParam(), options);
   options.statistics = CreateDBStatistics();
   BlockBasedTableOptions table_options;
-  table_options.no_block_cache = true;
-  table_options.cache_index_and_filter_blocks = false;
-  table_options.metadata_block_size = 1024;
-  table_options.index_type =
-      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  SetBlockBasedTableOptions(table_options);
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
   Status s = TryReopen(options);
@@ -1485,7 +1541,8 @@ void RunIOTracerParserTool(std::string trace_file) {
 #endif  // ROCKSDB_LITE
 }  // namespace
 
-// Tests the default implementation of ReadAsync API with PosixFileSystem.
+// Tests the default implementation of ReadAsync API with PosixFileSystem during
+// prefetching.
 TEST_P(PrefetchTest, ReadAsyncWithPosixFS) {
   if (mem_env_ || encrypted_env_) {
     ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
@@ -1498,22 +1555,11 @@ TEST_P(PrefetchTest, ReadAsyncWithPosixFS) {
   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
 
   bool use_direct_io = std::get<0>(GetParam());
-  Options options = CurrentOptions();
-  options.write_buffer_size = 1024;
-  options.create_if_missing = true;
-  options.compression = kNoCompression;
-  options.env = env.get();
+  Options options;
+  SetGenericOptions(env.get(), use_direct_io, options);
   options.statistics = CreateDBStatistics();
-  if (use_direct_io) {
-    options.use_direct_reads = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-  }
   BlockBasedTableOptions table_options;
-  table_options.no_block_cache = true;
-  table_options.cache_index_and_filter_blocks = false;
-  table_options.metadata_block_size = 1024;
-  table_options.index_type =
-      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  SetBlockBasedTableOptions(table_options);
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
   Status s = TryReopen(options);
@@ -1600,6 +1646,8 @@ TEST_P(PrefetchTest, ReadAsyncWithPosixFS) {
   Close();
 }
 
+// This test verifies implementation of seek parallelization with
+// PosixFileSystem during prefetching.
 TEST_P(PrefetchTest, MultipleSeekWithPosixFS) {
   if (mem_env_ || encrypted_env_) {
     ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
@@ -1612,22 +1660,11 @@ TEST_P(PrefetchTest, MultipleSeekWithPosixFS) {
   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
 
   bool use_direct_io = std::get<0>(GetParam());
-  Options options = CurrentOptions();
-  options.write_buffer_size = 1024;
-  options.create_if_missing = true;
-  options.compression = kNoCompression;
-  options.env = env.get();
+  Options options;
+  SetGenericOptions(env.get(), use_direct_io, options);
   options.statistics = CreateDBStatistics();
-  if (use_direct_io) {
-    options.use_direct_reads = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-  }
   BlockBasedTableOptions table_options;
-  table_options.no_block_cache = true;
-  table_options.cache_index_and_filter_blocks = false;
-  table_options.metadata_block_size = 1024;
-  table_options.index_type =
-      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  SetBlockBasedTableOptions(table_options);
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
   Status s = TryReopen(options);
@@ -1774,7 +1811,9 @@ TEST_P(PrefetchTest, MultipleSeekWithPosixFS) {
   Close();
 }
 
-TEST_P(PrefetchTest, SeekParallelizationTest1) {
+// This test verifies implementation of seek parallelization with
+// PosixFileSystem during prefetching.
+TEST_P(PrefetchTest, SeekParallelizationTestWithPosix) {
   if (mem_env_ || encrypted_env_) {
     ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
     return;
@@ -1786,23 +1825,11 @@ TEST_P(PrefetchTest, SeekParallelizationTest1) {
   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
 
   bool use_direct_io = std::get<0>(GetParam());
-  Options options = CurrentOptions();
-  options.write_buffer_size = 1024;
-  options.create_if_missing = true;
-  options.compression = kNoCompression;
-  options.env = env.get();
-  if (use_direct_io) {
-    options.use_direct_reads = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-  }
-
+  Options options;
+  SetGenericOptions(env.get(), use_direct_io, options);
   options.statistics = CreateDBStatistics();
   BlockBasedTableOptions table_options;
-  table_options.no_block_cache = true;
-  table_options.cache_index_and_filter_blocks = false;
-  table_options.metadata_block_size = 1024;
-  table_options.index_type =
-      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  SetBlockBasedTableOptions(table_options);
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
   Status s = TryReopen(options);
@@ -1904,6 +1931,7 @@ TEST_P(PrefetchTest, SeekParallelizationTest1) {
 
 #ifndef ROCKSDB_LITE
 #ifdef GFLAGS
+// This test verifies io_tracing with PosixFileSystem during prefetching.
 TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) {
   if (mem_env_ || encrypted_env_) {
     ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
@@ -1916,22 +1944,11 @@ TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) {
   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
 
   bool use_direct_io = std::get<0>(GetParam());
-  Options options = CurrentOptions();
-  options.write_buffer_size = 1024;
-  options.create_if_missing = true;
-  options.compression = kNoCompression;
-  options.env = env.get();
+  Options options;
+  SetGenericOptions(env.get(), use_direct_io, options);
   options.statistics = CreateDBStatistics();
-  if (use_direct_io) {
-    options.use_direct_reads = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-  }
   BlockBasedTableOptions table_options;
-  table_options.no_block_cache = true;
-  table_options.cache_index_and_filter_blocks = false;
-  table_options.metadata_block_size = 1024;
-  table_options.index_type =
-      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  SetBlockBasedTableOptions(table_options);
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
   Status s = TryReopen(options);
@@ -2036,6 +2053,7 @@ class FilePrefetchBufferTest : public testing::Test {
     fs_ = FileSystem::Default();
     test_dir_ = test::PerThreadDBPath("file_prefetch_buffer_test");
     ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
+    stats_ = CreateDBStatistics();
   }
 
   void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); }
@@ -2052,8 +2070,9 @@ class FilePrefetchBufferTest : public testing::Test {
     std::string fpath = Path(fname);
     std::unique_ptr<FSRandomAccessFile> f;
     ASSERT_OK(fs_->NewRandomAccessFile(fpath, opts, &f, nullptr));
-    reader->reset(new RandomAccessFileReader(std::move(f), fpath,
-                                             env_->GetSystemClock().get()));
+    reader->reset(new RandomAccessFileReader(
+        std::move(f), fpath, env_->GetSystemClock().get(),
+        /*io_tracer=*/nullptr, stats_.get()));
   }
 
   void AssertResult(const std::string& content,
@@ -2066,11 +2085,13 @@ class FilePrefetchBufferTest : public testing::Test {
   }
 
   FileSystem* fs() { return fs_.get(); }
+  Statistics* stats() { return stats_.get(); }
 
  private:
   Env* env_;
   std::shared_ptr<FileSystem> fs_;
   std::string test_dir_;
+  std::shared_ptr<Statistics> stats_;
 
   std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
 };
@@ -2099,6 +2120,53 @@ TEST_F(FilePrefetchBufferTest, SeekWithBlockCacheHit) {
   ASSERT_TRUE(fpb.TryReadFromCacheAsync(IOOptions(), r.get(), 8192, 8192,
                                         &result, &s, Env::IOPriority::IO_LOW));
 }
+
+TEST_F(FilePrefetchBufferTest, NoSyncWithAsyncIO) {
+  std::string fname = "seek-with-block-cache-hit";
+  Random rand(0);
+  std::string content = rand.RandomString(32768);
+  Write(fname, content);
+
+  FileOptions opts;
+  std::unique_ptr<RandomAccessFileReader> r;
+  Read(fname, opts, &r);
+
+  FilePrefetchBuffer fpb(
+      /*readahead_size=*/8192, /*max_readahead_size=*/16384, /*enable=*/true,
+      /*track_min_offset=*/false, /*implicit_auto_readahead=*/false,
+      /*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/0, fs());
+
+  int read_async_called = 0;
+  SyncPoint::GetInstance()->SetCallBack(
+      "FilePrefetchBuffer::ReadAsync",
+      [&](void* /*arg*/) { read_async_called++; });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  Slice async_result;
+  // Simulate a seek of 4000 bytes at offset 3000. Due to the readahead
+  // settings, it will do two reads of 4000+4096 and 4096
+  Status s = fpb.PrefetchAsync(IOOptions(), r.get(), 3000, 4000, &async_result);
+  // Platforms that don't have IO uring may not support async IO
+  ASSERT_TRUE(s.IsTryAgain() || s.IsNotSupported());
+
+  ASSERT_TRUE(fpb.TryReadFromCacheAsync(IOOptions(), r.get(), /*offset=*/3000,
+                                        /*length=*/4000, &async_result, &s,
+                                        Env::IOPriority::IO_LOW));
+  // No sync call should be made.
+  HistogramData sst_read_micros;
+  stats()->histogramData(SST_READ_MICROS, &sst_read_micros);
+  ASSERT_EQ(sst_read_micros.count, 0);
+
+  // Number of async calls should be.
+  ASSERT_EQ(read_async_called, 2);
+  // Length should be 4000.
+  ASSERT_EQ(async_result.size(), 4000);
+  // Data correctness.
+  Slice result(content.c_str() + 3000, 4000);
+  ASSERT_EQ(result.size(), 4000);
+  ASSERT_EQ(result, async_result);
+}
+
 #endif  // ROCKSDB_LITE
 }  // namespace ROCKSDB_NAMESPACE