From: Akanksha Mahajan Date: Wed, 6 Apr 2022 21:26:53 +0000 (-0700) Subject: Update stats for Read and ReadAsync in random_access_file_reader for async prefetchin... X-Git-Tag: v7.1.1~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=80bea2ba8ef6e3ec349b4e8a6904b64ee9011b20;p=rocksdb.git Update stats for Read and ReadAsync in random_access_file_reader for async prefetching (#9810) Summary: Update stats in random_access_file_reader for Read and ReadAsync API to take into account the read latency for async prefetching. It also fixes ERROR_HANDLER_AUTORESUME_RETRY_COUNT stat whose value was incorrect in portal.h Pull Request resolved: https://github.com/facebook/rocksdb/pull/9810 Test Plan: Update unit test Reviewed By: anand1976 Differential Revision: D35433081 Pulled By: akankshamahajan15 fbshipit-source-id: aeec3901270e58a003ce6b5214bd25ddcb3a12a9 --- diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 9319b9973..3e9bb7156 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -694,8 +694,10 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { options.write_buffer_size = 1024; options.create_if_missing = true; options.compression = kNoCompression; + options.statistics = CreateDBStatistics(); options.env = env.get(); - if (std::get<0>(GetParam())) { + bool use_direct_io = std::get<0>(GetParam()); + if (use_direct_io) { options.use_direct_reads = true; options.use_direct_io_for_flush_and_compaction = true; } @@ -708,8 +710,7 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { options.table_factory.reset(NewBlockBasedTableFactory(table_options)); Status s = TryReopen(options); - if (std::get<0>(GetParam()) && - (s.IsNotSupported() || s.IsInvalidArgument())) { + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { // If direct IO is not supported, skip the test return; } else { @@ -766,6 +767,8 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { // TODO akanksha: Remove after adding new units. ro.async_io = true; } + + ASSERT_OK(options.statistics->Reset()); auto iter = std::unique_ptr(db_->NewIterator(ro)); int num_keys = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -773,15 +776,25 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { num_keys++; } ASSERT_EQ(num_keys, total_keys); - ASSERT_GT(buff_prefetch_count, 0); - buff_prefetch_count = 0; // For index and data blocks. if (is_adaptive_readahead) { ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1)); } else { ASSERT_EQ(readahead_carry_over_count, 0); } + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + if (ro.async_io && !use_direct_io) { + ASSERT_GT(async_read_bytes.count, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); + } + } + SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); } @@ -902,6 +915,8 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { options.use_direct_reads = true; options.use_direct_io_for_flush_and_compaction = true; } + + options.statistics = CreateDBStatistics(); BlockBasedTableOptions table_options; std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); // 8MB table_options.block_cache = cache; @@ -948,7 +963,6 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { SyncPoint::GetInstance()->EnableProcessing(); ReadOptions ro; ro.adaptive_readahead = true; - // TODO akanksha: Remove after adding new units. ro.async_io = true; { /* @@ -964,7 +978,9 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { iter->Seek(BuildKey(1019)); buff_prefetch_count = 0; } + { + ASSERT_OK(options.statistics->Reset()); // After caching, blocks will be read from cache (Sequential blocks) auto iter = std::unique_ptr(db_->NewIterator(ro)); iter->Seek(BuildKey(0)); @@ -1008,6 +1024,18 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { ASSERT_TRUE(iter->Valid()); ASSERT_EQ(current_readahead_size, expected_current_readahead_size); ASSERT_EQ(buff_prefetch_count, 2); + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + if (GetParam()) { + ASSERT_EQ(async_read_bytes.count, 0); + } else { + ASSERT_GT(async_read_bytes.count, 0); + } + } + buff_prefetch_count = 0; } Close(); @@ -1033,6 +1061,7 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) { options.create_if_missing = true; options.compression = kNoCompression; options.env = env.get(); + options.statistics = CreateDBStatistics(); if (use_direct_io) { options.use_direct_reads = true; options.use_direct_io_for_flush_and_compaction = true; @@ -1080,6 +1109,7 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) { ro.adaptive_readahead = true; ro.async_io = true; + ASSERT_OK(options.statistics->Reset()); auto iter = std::unique_ptr(db_->NewIterator(ro)); int num_keys = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -1088,7 +1118,19 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) { } ASSERT_EQ(num_keys, total_keys); ASSERT_GT(buff_prefetch_count, 0); + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); +#if defined(ROCKSDB_IOURING_PRESENT) + ASSERT_GT(async_read_bytes.count, 0); +#else + ASSERT_EQ(async_read_bytes.count, 0); +#endif + } } + SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index 0d354cfa4..a919b6298 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -425,19 +425,75 @@ IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro, } } -// TODO akanksha: Add perf_times etc. +// TODO akanksha: +// 1. Handle use_direct_io case which currently calls Read API. IOStatus RandomAccessFileReader::ReadAsync( FSReadRequest& req, const IOOptions& opts, std::function cb, void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, Env::IOPriority rate_limiter_priority) { if (use_direct_io()) { + // For direct_io, it calls Read API. req.status = Read(opts, req.offset, req.len, &(req.result), req.scratch, nullptr /*dbg*/, rate_limiter_priority); cb(req, cb_arg); return IOStatus::OK(); } - return file_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, - nullptr /*dbg*/); + + // Create a callback and populate info. + auto read_async_callback = + std::bind(&RandomAccessFileReader::ReadAsyncCallback, this, + std::placeholders::_1, std::placeholders::_2); + ReadAsyncInfo* read_async_info = new ReadAsyncInfo; + read_async_info->cb_ = cb; + read_async_info->cb_arg_ = cb_arg; + read_async_info->start_time_ = clock_->NowMicros(); + +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + read_async_info->fs_start_ts_ = FileOperationInfo::StartNow(); + } +#endif + + IOStatus s = file_->ReadAsync(req, opts, read_async_callback, read_async_info, + io_handle, del_fn, nullptr /*dbg*/); + if (!s.ok()) { + delete read_async_info; + } + return s; +} + +void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req, + void* cb_arg) { + ReadAsyncInfo* read_async_info = static_cast(cb_arg); + assert(read_async_info); + assert(read_async_info->cb_); + + read_async_info->cb_(req, read_async_info->cb_arg_); + + // Update stats and notify listeners. + if (stats_ != nullptr && file_read_hist_ != nullptr) { + // elapsed doesn't take into account delay and overwrite as StopWatch does + // in Read. + uint64_t elapsed = clock_->NowMicros() - read_async_info->start_time_; + file_read_hist_->Add(elapsed); + } + if (req.status.ok()) { + RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size()); + } +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = FileOperationInfo::FinishNow(); + NotifyOnFileReadFinish(req.offset, req.result.size(), + read_async_info->fs_start_ts_, finish_ts, + req.status); + } + if (!req.status.ok()) { + NotifyOnIOError(req.status, FileOperationType::kRead, file_name(), + req.result.size(), req.offset); + } +#endif + RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size()); + delete read_async_info; } } // namespace ROCKSDB_NAMESPACE diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index 26e48478d..55c156f40 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -92,6 +92,15 @@ class RandomAccessFileReader { const Temperature file_temperature_; const bool is_last_level_; + struct ReadAsyncInfo { +#ifndef ROCKSDB_LITE + FileOperationInfo::StartTimePoint fs_start_ts_; +#endif + uint64_t start_time_; + std::function cb_; + void* cb_arg_; + }; + public: explicit RandomAccessFileReader( std::unique_ptr&& raf, const std::string& _file_name, @@ -179,5 +188,7 @@ class RandomAccessFileReader { std::function cb, void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, Env::IOPriority rate_limiter_priority); + + void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg); }; } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 9d3e89009..3c3462b80 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -534,6 +534,8 @@ enum Histograms : uint32_t { // Error handler statistics ERROR_HANDLER_AUTORESUME_RETRY_COUNT, + ASYNC_READ_BYTES, + HISTOGRAM_ENUM_MAX, }; diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index 45abe7c6d..7af0c43ec 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -5551,7 +5551,9 @@ class HistogramTypeJni { case ROCKSDB_NAMESPACE::Histograms::NUM_SST_READ_PER_LEVEL: return 0x31; case ROCKSDB_NAMESPACE::Histograms::ERROR_HANDLER_AUTORESUME_RETRY_COUNT: - return 0x31; + return 0x32; + case ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES: + return 0x33; case ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX: // 0x1F for backwards compatibility on current minor version. return 0x1F; @@ -5669,6 +5671,8 @@ class HistogramTypeJni { case 0x32: return ROCKSDB_NAMESPACE::Histograms:: ERROR_HANDLER_AUTORESUME_RETRY_COUNT; + case 0x33: + return ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES; case 0x1F: // 0x1F for backwards compatibility on current minor version. return ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX; diff --git a/java/src/main/java/org/rocksdb/HistogramType.java b/java/src/main/java/org/rocksdb/HistogramType.java index 5953a7d9b..d5f7da5e0 100644 --- a/java/src/main/java/org/rocksdb/HistogramType.java +++ b/java/src/main/java/org/rocksdb/HistogramType.java @@ -180,6 +180,8 @@ public enum HistogramType { */ ERROR_HANDLER_AUTORESUME_RETRY_COUNT((byte) 0x32), + ASYNC_READ_BYTES((byte) 0x33), + // 0x1F for backwards compatibility on current minor version. HISTOGRAM_ENUM_MAX((byte) 0x1F); diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc index 4f361a739..7988fe07c 100644 --- a/monitoring/statistics.cc +++ b/monitoring/statistics.cc @@ -283,6 +283,7 @@ const std::vector> HistogramsNameMap = { {NUM_SST_READ_PER_LEVEL, "rocksdb.num.sst.read.per.level"}, {ERROR_HANDLER_AUTORESUME_RETRY_COUNT, "rocksdb.error.handler.autoresume.retry.count"}, + {ASYNC_READ_BYTES, "rocksdb.async.read.bytes"}, }; std::shared_ptr CreateDBStatistics() {