]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Add few optimizations in async_io for short scans (#10140)
authorAkanksha Mahajan <akankshamahajan@fb.com>
Thu, 16 Jun 2022 03:17:35 +0000 (20:17 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Thu, 16 Jun 2022 03:17:35 +0000 (20:17 -0700)
Summary:
This PR adds few optimizations for async_io for shorter scans.
1.  If async_io is enabled, seek would create FilePrefetchBuffer object to fetch the data asynchronously. However `FilePrefetchbuffer::num_file_reads_` wasn't taken into consideration if it calls Next after Seek and would go for Prefetching.  This PR fixes that and Next will go for prefetching only if `FilePrefetchbuffer::num_file_reads_` is greater than 2 along with if blocks are sequential. This scenario is only for implicit auto readahead.
2. For seek, when it calls TryReadFromCacheAsync to poll it makes async call as well because TryReadFromCacheAsync flow wasn't changed. So I updated to return after poll instead of further prefetching any data.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10140

Test Plan:
1. Added a unit test
                  2. Ran crash_test with async_io = 1 to make sure nothing crashes.

Reviewed By: anand1976

Differential Revision: D37042242

Pulled By: akankshamahajan15

fbshipit-source-id: b8e6b7cb2ee0886f37a8f53951948b9084e8ffda

file/file_prefetch_buffer.cc
file/file_prefetch_buffer.h
file/prefetch_test.cc
table/block_based/block_based_table_iterator.cc
table/block_based/block_based_table_reader.h
table/block_based/block_prefetcher.cc
table/block_based/block_prefetcher.h
table/block_based/partitioned_filter_block.cc
table/block_based/partitioned_index_iterator.cc
table/block_based/partitioned_index_reader.cc

index 135cdd2e18680d972f7dae4cb2ba04cb4462183b..9e97845c310aae6d941cf61645b17ced2896d026 100644 (file)
@@ -282,8 +282,17 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(
           bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
     offset += length;
     length = 0;
+
+    // Since async request was submitted directly by calling PrefetchAsync in
+    // last call, we don't need to prefetch further as this call is to poll the
+    // data submitted in previous call.
+    if (async_request_submitted_) {
+      return Status::OK();
+    }
   }
 
+  async_request_submitted_ = false;
+
   Status s;
   size_t prefetch_size = length + readahead_size;
   size_t alignment = reader->file()->GetRequiredBufferAlignment();
@@ -442,14 +451,10 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
   return true;
 }
 
-// TODO akanksha: Merge this function with TryReadFromCache once async
-// functionality is stable.
 bool FilePrefetchBuffer::TryReadFromCacheAsync(
     const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
     size_t n, Slice* result, Status* status,
     Env::IOPriority rate_limiter_priority) {
-  assert(async_io_);
-
   if (track_min_offset_ && offset < min_offset_read_) {
     min_offset_read_ = static_cast<size_t>(offset);
   }
@@ -503,7 +508,7 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
 #endif
         return false;
       }
-      prefetched = true;
+      prefetched = async_request_submitted_ ? false : true;
     } else {
       return false;
     }
@@ -519,6 +524,7 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
   if (prefetched) {
     readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
   }
+  async_request_submitted_ = false;
   return true;
 }
 
@@ -635,6 +641,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
   // the data. It will return without polling if blocks are not sequential.
   UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false);
   prev_len_ = 0;
+  async_request_submitted_ = true;
 
   return Status::TryAgain();
 }
index 6d32c8314530ad0f67083afeba61f41cf70f40bf..04303aa3053656a25beb0cad6442b25da9b30669 100644 (file)
@@ -65,7 +65,7 @@ class FilePrefetchBuffer {
   FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0,
                      bool enable = true, bool track_min_offset = false,
                      bool implicit_auto_readahead = false,
-                     bool async_io = false, FileSystem* fs = nullptr,
+                     uint64_t num_file_reads = 0, FileSystem* fs = nullptr,
                      SystemClock* clock = nullptr, Statistics* stats = nullptr)
       : curr_(0),
         readahead_size_(readahead_size),
@@ -77,19 +77,20 @@ class FilePrefetchBuffer {
         implicit_auto_readahead_(implicit_auto_readahead),
         prev_offset_(0),
         prev_len_(0),
-        num_file_reads_(kMinNumFileReadsToStartAutoReadahead + 1),
+        num_file_reads_(num_file_reads),
         io_handle_(nullptr),
         del_fn_(nullptr),
         async_read_in_progress_(false),
-        async_io_(async_io),
+        async_request_submitted_(false),
         fs_(fs),
         clock_(clock),
         stats_(stats) {
+    assert((num_file_reads_ >= kMinNumFileReadsToStartAutoReadahead + 1) ||
+           (num_file_reads_ == 0));
     // If async_io_ is enabled, data is asynchronously filled in second buffer
     // while curr_ is being consumed. If data is overlapping in two buffers,
     // data is copied to third buffer to return continuous buffer.
     bufs_.resize(3);
-    (void)async_io_;
   }
 
   ~FilePrefetchBuffer() {
@@ -262,6 +263,7 @@ class FilePrefetchBuffer {
     readahead_size_ = initial_auto_readahead_size_;
   }
 
+  // Called in case of implicit auto prefetching.
   bool IsEligibleForPrefetch(uint64_t offset, size_t n) {
     // Prefetch only if this read is sequential otherwise reset readahead_size_
     // to initial value.
@@ -271,6 +273,13 @@ class FilePrefetchBuffer {
       return false;
     }
     num_file_reads_++;
+
+    // Since async request was submitted in last call directly by calling
+    // PrefetchAsync, it skips num_file_reads_ check as this call is to poll the
+    // data submitted in previous call.
+    if (async_request_submitted_) {
+      return true;
+    }
     if (num_file_reads_ <= kMinNumFileReadsToStartAutoReadahead) {
       UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/);
       return false;
@@ -301,14 +310,21 @@ class FilePrefetchBuffer {
   bool implicit_auto_readahead_;
   uint64_t prev_offset_;
   size_t prev_len_;
-  int64_t num_file_reads_;
+  // num_file_reads_ is only used when implicit_auto_readahead_ is set.
+  uint64_t num_file_reads_;
 
   // io_handle_ is allocated and used by underlying file system in case of
   // asynchronous reads.
   void* io_handle_;
   IOHandleDeleter del_fn_;
   bool async_read_in_progress_;
-  bool async_io_;
+
+  // If async_request_submitted_ is set then it indicates RocksDB called
+  // PrefetchAsync to submit request. It needs to TryReadFromCacheAsync to poll
+  // the submitted request without checking if data is sequential and
+  // num_file_reads_.
+  bool async_request_submitted_;
+
   FileSystem* fs_;
   SystemClock* clock_;
   Statistics* stats_;
index de896a99e4d0314a938457940d188db442f157ff..88e74cc35abd541c0317d49851096e32bc3a6ba0 100644 (file)
@@ -1157,9 +1157,8 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
   size_t expected_current_readahead_size = 8 * 1024;
   size_t decrease_readahead_size = 8 * 1024;
 
-  SyncPoint::GetInstance()->SetCallBack(
-      "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
-      [&](void*) { buff_prefetch_count++; });
+  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
+                                        [&](void*) { buff_prefetch_count++; });
   SyncPoint::GetInstance()->SetCallBack(
       "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
         current_readahead_size = *reinterpret_cast<size_t*>(arg);
@@ -1168,7 +1167,7 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
   SyncPoint::GetInstance()->EnableProcessing();
   ReadOptions ro;
   ro.adaptive_readahead = true;
-  ro.async_io = true;
+  // ro.async_io = true;
   {
     /*
      * Reseek keys from sequential Data Blocks within same partitioned
@@ -1200,17 +1199,15 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
              ? (expected_current_readahead_size - decrease_readahead_size)
              : 0));
 
-    iter->Seek(BuildKey(1000));  // Prefetch the block.
+    iter->Seek(BuildKey(1000));  // Won't prefetch the block.
     ASSERT_TRUE(iter->Valid());
     ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
-    expected_current_readahead_size *= 2;
 
     iter->Seek(BuildKey(1004));  // Prefetch the block.
     ASSERT_TRUE(iter->Valid());
     ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
     expected_current_readahead_size *= 2;
 
-    // 1011 is already in cache but won't reset??
     iter->Seek(BuildKey(1011));
     ASSERT_TRUE(iter->Valid());
 
@@ -1244,7 +1241,101 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
     iter->Seek(BuildKey(1022));
     ASSERT_TRUE(iter->Valid());
     ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
-    ASSERT_EQ(buff_prefetch_count, 3);
+    ASSERT_EQ(buff_prefetch_count, 2);
+
+    buff_prefetch_count = 0;
+  }
+  Close();
+}
+
+TEST_P(PrefetchTest2, SeekParallelizationTest) {
+  const int kNumKeys = 2000;
+  // Set options
+  std::shared_ptr<MockFS> fs =
+      std::make_shared<MockFS>(env_->GetFileSystem(), false);
+  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
+
+  Options options = CurrentOptions();
+  options.write_buffer_size = 1024;
+  options.create_if_missing = true;
+  options.compression = kNoCompression;
+  options.env = env.get();
+  if (GetParam()) {
+    options.use_direct_reads = true;
+    options.use_direct_io_for_flush_and_compaction = true;
+  }
+
+  options.statistics = CreateDBStatistics();
+  BlockBasedTableOptions table_options;
+  table_options.no_block_cache = true;
+  table_options.cache_index_and_filter_blocks = false;
+  table_options.metadata_block_size = 1024;
+  table_options.index_type =
+      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+  Status s = TryReopen(options);
+  if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
+    // If direct IO is not supported, skip the test
+    return;
+  } else {
+    ASSERT_OK(s);
+  }
+
+  WriteBatch batch;
+  Random rnd(309);
+  for (int i = 0; i < kNumKeys; i++) {
+    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
+  }
+  ASSERT_OK(db_->Write(WriteOptions(), &batch));
+
+  std::string start_key = BuildKey(0);
+  std::string end_key = BuildKey(kNumKeys - 1);
+  Slice least(start_key.data(), start_key.size());
+  Slice greatest(end_key.data(), end_key.size());
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
+
+  int buff_prefetch_count = 0;
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
+      [&](void*) { buff_prefetch_count++; });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+  ReadOptions ro;
+  ro.adaptive_readahead = true;
+  ro.async_io = true;
+
+  {
+    ASSERT_OK(options.statistics->Reset());
+    // Each block contains around 4 keys.
+    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
+    iter->Seek(BuildKey(0));  // Prefetch data because of seek parallelization.
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+
+    // New data block. Since num_file_reads in FilePrefetch after this read is
+    // 2, it won't go for prefetching.
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+
+    // Prefetch data.
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+
+    ASSERT_EQ(buff_prefetch_count, 2);
 
     // Check stats to make sure async prefetch is done.
     {
index 52734f7ca08125be8e00d01c06df46bf8ad85b1e..0e7e378b75835ce84af2b19838d328891a850eaa 100644 (file)
@@ -257,7 +257,7 @@ void BlockBasedTableIterator::InitDataBlock() {
     //   Enabled from the very first IO when ReadOptions.readahead_size is set.
     block_prefetcher_.PrefetchIfNeeded(
         rep, data_block_handle, read_options_.readahead_size, is_for_compaction,
-        read_options_.async_io, read_options_.rate_limiter_priority);
+        /*async_io=*/false, read_options_.rate_limiter_priority);
     Status s;
     table_->NewDataBlockIterator<DataBlockIter>(
         read_options_, data_block_handle, &block_iter_, BlockType::kData,
index 2d624cd00bfbe216275e21a49aeba83da39d82e3..692e519d0fa8382cd0fea8529603a2a523cfe8a8 100644 (file)
@@ -662,21 +662,21 @@ struct BlockBasedTable::Rep {
                                 size_t max_readahead_size,
                                 std::unique_ptr<FilePrefetchBuffer>* fpb,
                                 bool implicit_auto_readahead,
-                                bool async_io) const {
+                                uint64_t num_file_reads) const {
     fpb->reset(new FilePrefetchBuffer(
         readahead_size, max_readahead_size,
         !ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */,
-        implicit_auto_readahead, async_io, ioptions.fs.get(), ioptions.clock,
-        ioptions.stats));
+        implicit_auto_readahead, num_file_reads, ioptions.fs.get(),
+        ioptions.clock, ioptions.stats));
   }
 
   void CreateFilePrefetchBufferIfNotExists(
       size_t readahead_size, size_t max_readahead_size,
       std::unique_ptr<FilePrefetchBuffer>* fpb, bool implicit_auto_readahead,
-      bool async_io) const {
+      uint64_t num_file_reads) const {
     if (!(*fpb)) {
       CreateFilePrefetchBuffer(readahead_size, max_readahead_size, fpb,
-                               implicit_auto_readahead, async_io);
+                               implicit_auto_readahead, num_file_reads);
     }
   }
 
index 1f08c161bd42ec7005dbacdfef72a873a4e13938..981e2043c5a4e15c7cd34b5da72b8100a5d92bb2 100644 (file)
@@ -16,17 +16,21 @@ void BlockPrefetcher::PrefetchIfNeeded(
     const BlockBasedTable::Rep* rep, const BlockHandle& handle,
     const size_t readahead_size, bool is_for_compaction, const bool async_io,
     const Env::IOPriority rate_limiter_priority) {
+  // num_file_reads is used  by FilePrefetchBuffer only when
+  // implicit_auto_readahead is set.
   if (is_for_compaction) {
     rep->CreateFilePrefetchBufferIfNotExists(
         compaction_readahead_size_, compaction_readahead_size_,
-        &prefetch_buffer_, false, async_io);
+        &prefetch_buffer_, /*implicit_auto_readahead=*/false,
+        /*num_file_reads=*/0);
     return;
   }
 
   // Explicit user requested readahead.
   if (readahead_size > 0) {
     rep->CreateFilePrefetchBufferIfNotExists(
-        readahead_size, readahead_size, &prefetch_buffer_, false, async_io);
+        readahead_size, readahead_size, &prefetch_buffer_,
+        /*implicit_auto_readahead=*/false, /*num_file_reads=*/0);
     return;
   }
 
@@ -39,11 +43,13 @@ void BlockPrefetcher::PrefetchIfNeeded(
     return;
   }
 
-  // In case of async_io, it always creates the PrefetchBuffer.
+  // In case of async_io, always creates the PrefetchBuffer irrespective of
+  // num_file_reads_.
   if (async_io) {
     rep->CreateFilePrefetchBufferIfNotExists(
         initial_auto_readahead_size_, max_auto_readahead_size,
-        &prefetch_buffer_, /*implicit_auto_readahead=*/true, async_io);
+        &prefetch_buffer_, /*implicit_auto_readahead=*/true,
+        /*num_file_reads=*/0);
     return;
   }
 
@@ -78,9 +84,9 @@ void BlockPrefetcher::PrefetchIfNeeded(
   }
 
   if (rep->file->use_direct_io()) {
-    rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_,
-                                             max_auto_readahead_size,
-                                             &prefetch_buffer_, true, async_io);
+    rep->CreateFilePrefetchBufferIfNotExists(
+        initial_auto_readahead_size_, max_auto_readahead_size,
+        &prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_);
     return;
   }
 
@@ -96,9 +102,9 @@ void BlockPrefetcher::PrefetchIfNeeded(
       BlockBasedTable::BlockSizeWithTrailer(handle) + readahead_size_,
       rate_limiter_priority);
   if (s.IsNotSupported()) {
-    rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_,
-                                             max_auto_readahead_size,
-                                             &prefetch_buffer_, true, async_io);
+    rep->CreateFilePrefetchBufferIfNotExists(
+        initial_auto_readahead_size_, max_auto_readahead_size,
+        &prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_);
     return;
   }
 
index 285903511d2dba1d53e21010d99dbfbc4ab4adec..4fae7f0bb806fcf773899953ef83fc4994326ead 100644 (file)
@@ -63,7 +63,7 @@ class BlockPrefetcher {
   // initial_auto_readahead_size_ is used if RocksDB uses internal prefetch
   // buffer.
   uint64_t initial_auto_readahead_size_;
-  int64_t num_file_reads_ = 0;
+  uint64_t num_file_reads_ = 0;
   uint64_t prev_offset_ = 0;
   size_t prev_len_ = 0;
   std::unique_ptr<FilePrefetchBuffer> prefetch_buffer_;
index 73b8c5716a08934119207efa416b6eb185a0b95d..6fc6ddd1e1016324342cbe0e392fca5647149858 100644 (file)
@@ -503,7 +503,7 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
   std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
   rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer,
                                 false /* Implicit autoreadahead */,
-                                false /*async_io*/);
+                                0 /*num_reads_*/);
 
   IOOptions opts;
   s = rep->file->PrepareIOOptions(ro, opts);
index 9e8c06268d84f68909d26621233de5ab90ac09a7..94a0231333fea3b3eed12d97e05ea9db486d65a3 100644 (file)
@@ -89,10 +89,10 @@ void PartitionedIndexIterator::InitPartitionedIndexBlock() {
     //   Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0.
     // Explicit user requested readahead:
     //   Enabled from the very first IO when ReadOptions.readahead_size is set.
-    block_prefetcher_.PrefetchIfNeeded(
-        rep, partitioned_index_handle, read_options_.readahead_size,
-        is_for_compaction, read_options_.async_io,
-        read_options_.rate_limiter_priority);
+    block_prefetcher_.PrefetchIfNeeded(rep, partitioned_index_handle,
+                                       read_options_.readahead_size,
+                                       is_for_compaction, /*async_io=*/false,
+                                       read_options_.rate_limiter_priority);
     Status s;
     table_->NewDataBlockIterator<IndexBlockIter>(
         read_options_, partitioned_index_handle, &block_iter_,
index 14e3f7484d83f3ee46db1c96bcfd6eaed4a5180b..53d892097877b207338c6bed6b07ce17ba068272 100644 (file)
@@ -158,7 +158,7 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
   std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
   rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer,
                                 false /*Implicit auto readahead*/,
-                                false /*async_io*/);
+                                0 /*num_reads_*/);
   IOOptions opts;
   {
     Status s = rep->file->PrepareIOOptions(ro, opts);