]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Update stats for Read and ReadAsync in random_access_file_reader for async prefetchin...
authorAkanksha Mahajan <akankshamahajan@fb.com>
Wed, 6 Apr 2022 21:26:53 +0000 (14:26 -0700)
committerakankshamahajan <akankshamahajan@fb.com>
Thu, 7 Apr 2022 16:15:37 +0000 (09:15 -0700)
Summary:
Update stats in random_access_file_reader for Read and
ReadAsync API to take into account the read latency for async
prefetching.

It also fixes ERROR_HANDLER_AUTORESUME_RETRY_COUNT stat whose value was
incorrect in portal.h

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9810

Test Plan: Update unit test

Reviewed By: anand1976

Differential Revision: D35433081

Pulled By: akankshamahajan15

fbshipit-source-id: aeec3901270e58a003ce6b5214bd25ddcb3a12a9

file/prefetch_test.cc
file/random_access_file_reader.cc
file/random_access_file_reader.h
include/rocksdb/statistics.h
java/rocksjni/portal.h
java/src/main/java/org/rocksdb/HistogramType.java
monitoring/statistics.cc

index 9319b99736c86837f3d2d4a5ed3fece188641efe..3e9bb71562f296688cdff155d0cb88f6a39f783b 100644 (file)
@@ -694,8 +694,10 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
   options.write_buffer_size = 1024;
   options.create_if_missing = true;
   options.compression = kNoCompression;
+  options.statistics = CreateDBStatistics();
   options.env = env.get();
-  if (std::get<0>(GetParam())) {
+  bool use_direct_io = std::get<0>(GetParam());
+  if (use_direct_io) {
     options.use_direct_reads = true;
     options.use_direct_io_for_flush_and_compaction = true;
   }
@@ -708,8 +710,7 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
   Status s = TryReopen(options);
-  if (std::get<0>(GetParam()) &&
-      (s.IsNotSupported() || s.IsInvalidArgument())) {
+  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
     // If direct IO is not supported, skip the test
     return;
   } else {
@@ -766,6 +767,8 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
       // TODO akanksha: Remove after adding new units.
       ro.async_io = true;
     }
+
+    ASSERT_OK(options.statistics->Reset());
     auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
     int num_keys = 0;
     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@@ -773,15 +776,25 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
       num_keys++;
     }
     ASSERT_EQ(num_keys, total_keys);
-
     ASSERT_GT(buff_prefetch_count, 0);
-    buff_prefetch_count = 0;
     // For index and data blocks.
     if (is_adaptive_readahead) {
       ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
     } else {
       ASSERT_EQ(readahead_carry_over_count, 0);
     }
+
+    // Check stats to make sure async prefetch is done.
+    {
+      HistogramData async_read_bytes;
+      options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
+      if (ro.async_io && !use_direct_io) {
+        ASSERT_GT(async_read_bytes.count, 0);
+      } else {
+        ASSERT_EQ(async_read_bytes.count, 0);
+      }
+    }
+
     SyncPoint::GetInstance()->DisableProcessing();
     SyncPoint::GetInstance()->ClearAllCallBacks();
   }
@@ -902,6 +915,8 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
     options.use_direct_reads = true;
     options.use_direct_io_for_flush_and_compaction = true;
   }
+
+  options.statistics = CreateDBStatistics();
   BlockBasedTableOptions table_options;
   std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);  // 8MB
   table_options.block_cache = cache;
@@ -948,7 +963,6 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
   SyncPoint::GetInstance()->EnableProcessing();
   ReadOptions ro;
   ro.adaptive_readahead = true;
-  // TODO akanksha: Remove after adding new units.
   ro.async_io = true;
   {
     /*
@@ -964,7 +978,9 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
     iter->Seek(BuildKey(1019));
     buff_prefetch_count = 0;
   }
+
   {
+    ASSERT_OK(options.statistics->Reset());
     // After caching, blocks will be read from cache (Sequential blocks)
     auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
     iter->Seek(BuildKey(0));
@@ -1008,6 +1024,18 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
     ASSERT_TRUE(iter->Valid());
     ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
     ASSERT_EQ(buff_prefetch_count, 2);
+
+    // Check stats to make sure async prefetch is done.
+    {
+      HistogramData async_read_bytes;
+      options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
+      if (GetParam()) {
+        ASSERT_EQ(async_read_bytes.count, 0);
+      } else {
+        ASSERT_GT(async_read_bytes.count, 0);
+      }
+    }
+
     buff_prefetch_count = 0;
   }
   Close();
@@ -1033,6 +1061,7 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
   options.create_if_missing = true;
   options.compression = kNoCompression;
   options.env = env.get();
+  options.statistics = CreateDBStatistics();
   if (use_direct_io) {
     options.use_direct_reads = true;
     options.use_direct_io_for_flush_and_compaction = true;
@@ -1080,6 +1109,7 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
     ro.adaptive_readahead = true;
     ro.async_io = true;
 
+    ASSERT_OK(options.statistics->Reset());
     auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
     int num_keys = 0;
     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@@ -1088,7 +1118,19 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
     }
     ASSERT_EQ(num_keys, total_keys);
     ASSERT_GT(buff_prefetch_count, 0);
+
+    // Check stats to make sure async prefetch is done.
+    {
+      HistogramData async_read_bytes;
+      options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
+#if defined(ROCKSDB_IOURING_PRESENT)
+      ASSERT_GT(async_read_bytes.count, 0);
+#else
+      ASSERT_EQ(async_read_bytes.count, 0);
+#endif
+    }
   }
+
   SyncPoint::GetInstance()->DisableProcessing();
   SyncPoint::GetInstance()->ClearAllCallBacks();
 
index 0d354cfa453b5ee3ab04416c32a0ff76060e3659..a919b6298eef871be43f18699b4bf094308fa49c 100644 (file)
@@ -425,19 +425,75 @@ IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
   }
 }
 
-// TODO akanksha: Add perf_times etc.
+// TODO akanksha:
+// 1. Handle use_direct_io case which currently calls Read API.
 IOStatus RandomAccessFileReader::ReadAsync(
     FSReadRequest& req, const IOOptions& opts,
     std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
     void** io_handle, IOHandleDeleter* del_fn,
     Env::IOPriority rate_limiter_priority) {
   if (use_direct_io()) {
+    // For direct_io, it calls Read API.
     req.status = Read(opts, req.offset, req.len, &(req.result), req.scratch,
                       nullptr /*dbg*/, rate_limiter_priority);
     cb(req, cb_arg);
     return IOStatus::OK();
   }
-  return file_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn,
-                          nullptr /*dbg*/);
+
+  // Create a callback and populate info.
+  auto read_async_callback =
+      std::bind(&RandomAccessFileReader::ReadAsyncCallback, this,
+                std::placeholders::_1, std::placeholders::_2);
+  ReadAsyncInfo* read_async_info = new ReadAsyncInfo;
+  read_async_info->cb_ = cb;
+  read_async_info->cb_arg_ = cb_arg;
+  read_async_info->start_time_ = clock_->NowMicros();
+
+#ifndef ROCKSDB_LITE
+  if (ShouldNotifyListeners()) {
+    read_async_info->fs_start_ts_ = FileOperationInfo::StartNow();
+  }
+#endif
+
+  IOStatus s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
+                                io_handle, del_fn, nullptr /*dbg*/);
+  if (!s.ok()) {
+    delete read_async_info;
+  }
+  return s;
+}
+
+void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req,
+                                               void* cb_arg) {
+  ReadAsyncInfo* read_async_info = static_cast<ReadAsyncInfo*>(cb_arg);
+  assert(read_async_info);
+  assert(read_async_info->cb_);
+
+  read_async_info->cb_(req, read_async_info->cb_arg_);
+
+  // Update stats and notify listeners.
+  if (stats_ != nullptr && file_read_hist_ != nullptr) {
+    // elapsed doesn't take into account delay and overwrite as StopWatch does
+    // in Read.
+    uint64_t elapsed = clock_->NowMicros() - read_async_info->start_time_;
+    file_read_hist_->Add(elapsed);
+  }
+  if (req.status.ok()) {
+    RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size());
+  }
+#ifndef ROCKSDB_LITE
+  if (ShouldNotifyListeners()) {
+    auto finish_ts = FileOperationInfo::FinishNow();
+    NotifyOnFileReadFinish(req.offset, req.result.size(),
+                           read_async_info->fs_start_ts_, finish_ts,
+                           req.status);
+  }
+  if (!req.status.ok()) {
+    NotifyOnIOError(req.status, FileOperationType::kRead, file_name(),
+                    req.result.size(), req.offset);
+  }
+#endif
+  RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
+  delete read_async_info;
 }
 }  // namespace ROCKSDB_NAMESPACE
index 26e48478d8ef0712e7f97608c71f9179e1a644d4..55c156f402d625d28cf2702839e944caa35a7928 100644 (file)
@@ -92,6 +92,15 @@ class RandomAccessFileReader {
   const Temperature file_temperature_;
   const bool is_last_level_;
 
+  struct ReadAsyncInfo {
+#ifndef ROCKSDB_LITE
+    FileOperationInfo::StartTimePoint fs_start_ts_;
+#endif
+    uint64_t start_time_;
+    std::function<void(const FSReadRequest&, void*)> cb_;
+    void* cb_arg_;
+  };
+
  public:
   explicit RandomAccessFileReader(
       std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
@@ -179,5 +188,7 @@ class RandomAccessFileReader {
                      std::function<void(const FSReadRequest&, void*)> cb,
                      void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
                      Env::IOPriority rate_limiter_priority);
+
+  void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg);
 };
 }  // namespace ROCKSDB_NAMESPACE
index 9d3e8900999d1041604e070024380d2abc241802..3c3462b802d5376f28b489e79691266340ea6530 100644 (file)
@@ -534,6 +534,8 @@ enum Histograms : uint32_t {
   // Error handler statistics
   ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
 
+  ASYNC_READ_BYTES,
+
   HISTOGRAM_ENUM_MAX,
 };
 
index 45abe7c6d725203e1a336ee2318d1a94df870140..7af0c43ec8f8c5c179d48ce79c5ba9b0a710c142 100644 (file)
@@ -5551,7 +5551,9 @@ class HistogramTypeJni {
       case ROCKSDB_NAMESPACE::Histograms::NUM_SST_READ_PER_LEVEL:
         return 0x31;
       case ROCKSDB_NAMESPACE::Histograms::ERROR_HANDLER_AUTORESUME_RETRY_COUNT:
-        return 0x31;
+        return 0x32;
+      case ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES:
+        return 0x33;
       case ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX:
         // 0x1F for backwards compatibility on current minor version.
         return 0x1F;
@@ -5669,6 +5671,8 @@ class HistogramTypeJni {
       case 0x32:
         return ROCKSDB_NAMESPACE::Histograms::
             ERROR_HANDLER_AUTORESUME_RETRY_COUNT;
+      case 0x33:
+        return ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES;
       case 0x1F:
         // 0x1F for backwards compatibility on current minor version.
         return ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX;
index 5953a7d9bdd042aae572ec5a9e4b3be9d73f364f..d5f7da5e03c89ee58187dd32bcf694ed287d781f 100644 (file)
@@ -180,6 +180,8 @@ public enum HistogramType {
    */
   ERROR_HANDLER_AUTORESUME_RETRY_COUNT((byte) 0x32),
 
+  ASYNC_READ_BYTES((byte) 0x33),
+
   // 0x1F for backwards compatibility on current minor version.
   HISTOGRAM_ENUM_MAX((byte) 0x1F);
 
index 4f361a739579723c139a01a21c960f15157b49e6..7988fe07cb308f4c3c41fda123fad2044dd41ec8 100644 (file)
@@ -283,6 +283,7 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
     {NUM_SST_READ_PER_LEVEL, "rocksdb.num.sst.read.per.level"},
     {ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
      "rocksdb.error.handler.autoresume.retry.count"},
+    {ASYNC_READ_BYTES, "rocksdb.async.read.bytes"},
 };
 
 std::shared_ptr<Statistics> CreateDBStatistics() {