options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
+ options.statistics = CreateDBStatistics();
options.env = env.get();
- if (std::get<0>(GetParam())) {
+ bool use_direct_io = std::get<0>(GetParam());
+ if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
- if (std::get<0>(GetParam()) &&
- (s.IsNotSupported() || s.IsInvalidArgument())) {
+ if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
// TODO akanksha: Remove after adding new units.
ro.async_io = true;
}
+
+ ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
num_keys++;
}
ASSERT_EQ(num_keys, total_keys);
-
ASSERT_GT(buff_prefetch_count, 0);
- buff_prefetch_count = 0;
// For index and data blocks.
if (is_adaptive_readahead) {
ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
} else {
ASSERT_EQ(readahead_carry_over_count, 0);
}
+
+ // Check stats to make sure async prefetch is done.
+ {
+ HistogramData async_read_bytes;
+ options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
+ if (ro.async_io && !use_direct_io) {
+ ASSERT_GT(async_read_bytes.count, 0);
+ } else {
+ ASSERT_EQ(async_read_bytes.count, 0);
+ }
+ }
+
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
+
+ options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); // 4MB
table_options.block_cache = cache;
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
ro.adaptive_readahead = true;
- // TODO akanksha: Remove after adding new units.
ro.async_io = true;
{
/*
iter->Seek(BuildKey(1019));
buff_prefetch_count = 0;
}
+
{
+ ASSERT_OK(options.statistics->Reset());
// After caching, blocks will be read from cache (Sequential blocks)
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
iter->Seek(BuildKey(0));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
ASSERT_EQ(buff_prefetch_count, 2);
+
+ // Check stats to make sure async prefetch is done.
+ {
+ HistogramData async_read_bytes;
+ options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
+ if (GetParam()) {
+ ASSERT_EQ(async_read_bytes.count, 0);
+ } else {
+ ASSERT_GT(async_read_bytes.count, 0);
+ }
+ }
+
buff_prefetch_count = 0;
}
Close();
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
+ options.statistics = CreateDBStatistics();
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
ro.adaptive_readahead = true;
ro.async_io = true;
+ ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
}
ASSERT_EQ(num_keys, total_keys);
ASSERT_GT(buff_prefetch_count, 0);
+
+ // Check stats to make sure async prefetch is done.
+ {
+ HistogramData async_read_bytes;
+ options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
+#if defined(ROCKSDB_IOURING_PRESENT)
+ ASSERT_GT(async_read_bytes.count, 0);
+#else
+ ASSERT_EQ(async_read_bytes.count, 0);
+#endif
+ }
}
+
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
}
-// TODO akanksha: Add perf_times etc.
+// TODO akanksha:
+// 1. Handle use_direct_io case which currently calls Read API.
// Submits the read described by `req`.
//
// When use_direct_io() is true the request is served by a synchronous Read()
// call: the result is stored in req.status, `cb` is invoked before returning,
// and IOStatus::OK() is returned. Otherwise the request is forwarded to
// file_->ReadAsync() with ReadAsyncCallback as the completion callback and a
// heap-allocated ReadAsyncInfo (holding `cb`, `cb_arg` and start timestamps)
// as its argument. That info object is deleted here if submission fails, or
// by ReadAsyncCallback after it has run.
IOStatus RandomAccessFileReader::ReadAsync(
FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn,
Env::IOPriority rate_limiter_priority) {
if (use_direct_io()) {
+ // For direct_io, fall back to the synchronous Read API and run cb inline.
req.status = Read(opts, req.offset, req.len, &(req.result), req.scratch,
nullptr /*dbg*/, rate_limiter_priority);
cb(req, cb_arg);
return IOStatus::OK();
}
- return file_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn,
- nullptr /*dbg*/);
+
+ // Wrap the caller's callback so stats and listeners are updated on completion.
+ auto read_async_callback =
+ std::bind(&RandomAccessFileReader::ReadAsyncCallback, this,
+ std::placeholders::_1, std::placeholders::_2);
+ ReadAsyncInfo* read_async_info = new ReadAsyncInfo;  // deleted in ReadAsyncCallback
+ read_async_info->cb_ = cb;
+ read_async_info->cb_arg_ = cb_arg;
+ read_async_info->start_time_ = clock_->NowMicros();
+
+#ifndef ROCKSDB_LITE
+ if (ShouldNotifyListeners()) {
+ read_async_info->fs_start_ts_ = FileOperationInfo::StartNow();
+ }
+#endif
+
+ IOStatus s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
+ io_handle, del_fn, nullptr /*dbg*/);
+ if (!s.ok()) {
+ delete read_async_info;  // NOTE(review): assumes the callback has not run when submission fails - confirm
+ }
+ return s;
+}
+
// Completion callback handed to file_->ReadAsync() by ReadAsync().
//
// `cb_arg` is the ReadAsyncInfo allocated by ReadAsync(). The caller's
// callback stored in it is invoked first; afterwards the read latency
// histogram, ASYNC_READ_BYTES, listeners and IO stats are updated, and the
// ReadAsyncInfo is deleted.
+void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req,
+ void* cb_arg) {
+ ReadAsyncInfo* read_async_info = static_cast<ReadAsyncInfo*>(cb_arg);
+ assert(read_async_info);
+ assert(read_async_info->cb_);
+
+ read_async_info->cb_(req, read_async_info->cb_arg_);  // caller's callback runs before any bookkeeping
+
+ // Update stats and notify listeners.
+ if (stats_ != nullptr && file_read_hist_ != nullptr) {
+ // Unlike the StopWatch used in Read, elapsed here is a plain NowMicros()
+ // difference and does not take delay and overwrite into account.
+ uint64_t elapsed = clock_->NowMicros() - read_async_info->start_time_;
+ file_read_hist_->Add(elapsed);
+ }
+ if (req.status.ok()) {  // bytes are recorded for successful reads only
+ RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size());
+ }
+#ifndef ROCKSDB_LITE
+ if (ShouldNotifyListeners()) {
+ auto finish_ts = FileOperationInfo::FinishNow();
+ NotifyOnFileReadFinish(req.offset, req.result.size(),
+ read_async_info->fs_start_ts_, finish_ts,
+ req.status);
+ }
+ if (!req.status.ok()) {
+ NotifyOnIOError(req.status, FileOperationType::kRead, file_name(),
+ req.result.size(), req.offset);
+ }
+#endif
+ RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
+ delete read_async_info;  // allocated by ReadAsync
}
} // namespace ROCKSDB_NAMESPACE
const Temperature file_temperature_;
const bool is_last_level_;
// Per-request state that ReadAsync() passes through file_->ReadAsync() as the
// callback argument and that ReadAsyncCallback() reads and then deletes.
+ struct ReadAsyncInfo {
+#ifndef ROCKSDB_LITE
+ FileOperationInfo::StartTimePoint fs_start_ts_;  // assigned only when ShouldNotifyListeners() is true
+#endif
+ uint64_t start_time_;  // clock_->NowMicros() taken when the request is submitted
+ std::function<void(const FSReadRequest&, void*)> cb_;  // caller's completion callback
+ void* cb_arg_;  // opaque argument forwarded to cb_
+ };
+
public:
explicit RandomAccessFileReader(
std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
std::function<void(const FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
Env::IOPriority rate_limiter_priority);
+
+ void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg);
};
} // namespace ROCKSDB_NAMESPACE
// Error handler statistics
ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
+ ASYNC_READ_BYTES,
+
HISTOGRAM_ENUM_MAX,
};
case ROCKSDB_NAMESPACE::Histograms::NUM_SST_READ_PER_LEVEL:
return 0x31;
case ROCKSDB_NAMESPACE::Histograms::ERROR_HANDLER_AUTORESUME_RETRY_COUNT:
- return 0x31;
+ return 0x32;
+ case ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES:
+ return 0x33;
case ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX:
// 0x1F for backwards compatibility on current minor version.
return 0x1F;
case 0x32:
return ROCKSDB_NAMESPACE::Histograms::
ERROR_HANDLER_AUTORESUME_RETRY_COUNT;
+ case 0x33:
+ return ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES;
case 0x1F:
// 0x1F for backwards compatibility on current minor version.
return ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX;
*/
ERROR_HANDLER_AUTORESUME_RETRY_COUNT((byte) 0x32),
+ ASYNC_READ_BYTES((byte) 0x33),
+
// 0x1F for backwards compatibility on current minor version.
HISTOGRAM_ENUM_MAX((byte) 0x1F);
{NUM_SST_READ_PER_LEVEL, "rocksdb.num.sst.read.per.level"},
{ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
"rocksdb.error.handler.autoresume.retry.count"},
+ {ASYNC_READ_BYTES, "rocksdb.async.read.bytes"},
};
std::shared_ptr<Statistics> CreateDBStatistics() {