bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
offset += length;
length = 0;
+
+ // Since async request was submitted directly by calling PrefetchAsync in
+ // last call, we don't need to prefetch further as this call is to poll the
+ // data submitted in previous call.
+ if (async_request_submitted_) {
+ return Status::OK();
+ }
}
+ async_request_submitted_ = false;
+
Status s;
size_t prefetch_size = length + readahead_size;
size_t alignment = reader->file()->GetRequiredBufferAlignment();
return true;
}
-// TODO akanksha: Merge this function with TryReadFromCache once async
-// functionality is stable.
bool FilePrefetchBuffer::TryReadFromCacheAsync(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status,
Env::IOPriority rate_limiter_priority) {
- assert(async_io_);
-
if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = static_cast<size_t>(offset);
}
#endif
return false;
}
- prefetched = true;
+ prefetched = async_request_submitted_ ? false : true;
} else {
return false;
}
if (prefetched) {
readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
}
+ async_request_submitted_ = false;
return true;
}
// the data. It will return without polling if blocks are not sequential.
UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false);
prev_len_ = 0;
+ async_request_submitted_ = true;
return Status::TryAgain();
}
FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0,
bool enable = true, bool track_min_offset = false,
bool implicit_auto_readahead = false,
- bool async_io = false, FileSystem* fs = nullptr,
+ uint64_t num_file_reads = 0, FileSystem* fs = nullptr,
SystemClock* clock = nullptr, Statistics* stats = nullptr)
: curr_(0),
readahead_size_(readahead_size),
implicit_auto_readahead_(implicit_auto_readahead),
prev_offset_(0),
prev_len_(0),
- num_file_reads_(kMinNumFileReadsToStartAutoReadahead + 1),
+ num_file_reads_(num_file_reads),
io_handle_(nullptr),
del_fn_(nullptr),
async_read_in_progress_(false),
- async_io_(async_io),
+ async_request_submitted_(false),
fs_(fs),
clock_(clock),
stats_(stats) {
+ assert((num_file_reads_ >= kMinNumFileReadsToStartAutoReadahead + 1) ||
+ (num_file_reads_ == 0));
// If async_io_ is enabled, data is asynchronously filled in second buffer
// while curr_ is being consumed. If data is overlapping in two buffers,
// data is copied to third buffer to return continuous buffer.
bufs_.resize(3);
- (void)async_io_;
}
~FilePrefetchBuffer() {
readahead_size_ = initial_auto_readahead_size_;
}
+ // Called in case of implicit auto prefetching.
bool IsEligibleForPrefetch(uint64_t offset, size_t n) {
// Prefetch only if this read is sequential otherwise reset readahead_size_
// to initial value.
return false;
}
num_file_reads_++;
+
+ // Since async request was submitted in last call directly by calling
+ // PrefetchAsync, it skips num_file_reads_ check as this call is to poll the
+ // data submitted in previous call.
+ if (async_request_submitted_) {
+ return true;
+ }
if (num_file_reads_ <= kMinNumFileReadsToStartAutoReadahead) {
UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/);
return false;
bool implicit_auto_readahead_;
uint64_t prev_offset_;
size_t prev_len_;
- int64_t num_file_reads_;
+ // num_file_reads_ is only used when implicit_auto_readahead_ is set.
+ uint64_t num_file_reads_;
// io_handle_ is allocated and used by underlying file system in case of
// asynchronous reads.
void* io_handle_;
IOHandleDeleter del_fn_;
bool async_read_in_progress_;
- bool async_io_;
+
+ // If async_request_submitted_ is set then it indicates RocksDB called
+ // PrefetchAsync to submit request. It needs to TryReadFromCacheAsync to poll
+ // the submitted request without checking if data is sequential and
+ // num_file_reads_.
+ bool async_request_submitted_;
+
FileSystem* fs_;
SystemClock* clock_;
Statistics* stats_;
size_t expected_current_readahead_size = 8 * 1024;
size_t decrease_readahead_size = 8 * 1024;
- SyncPoint::GetInstance()->SetCallBack(
- "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
- [&](void*) { buff_prefetch_count++; });
+ SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
+ [&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
current_readahead_size = *reinterpret_cast<size_t*>(arg);
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
ro.adaptive_readahead = true;
- ro.async_io = true;
+ // ro.async_io = true;
{
/*
* Reseek keys from sequential Data Blocks within same partitioned
? (expected_current_readahead_size - decrease_readahead_size)
: 0));
- iter->Seek(BuildKey(1000)); // Prefetch the block.
+ iter->Seek(BuildKey(1000)); // Won't prefetch the block.
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
- expected_current_readahead_size *= 2;
iter->Seek(BuildKey(1004)); // Prefetch the block.
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
expected_current_readahead_size *= 2;
- // 1011 is already in cache but won't reset??
iter->Seek(BuildKey(1011));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1022));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
- ASSERT_EQ(buff_prefetch_count, 3);
+ ASSERT_EQ(buff_prefetch_count, 2);
+
+ buff_prefetch_count = 0;
+ }
+ Close();
+}
+
+TEST_P(PrefetchTest2, SeekParallelizationTest) {
+ const int kNumKeys = 2000;
+ // Set options
+ std::shared_ptr<MockFS> fs =
+ std::make_shared<MockFS>(env_->GetFileSystem(), false);
+ std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
+
+ Options options = CurrentOptions();
+ options.write_buffer_size = 1024;
+ options.create_if_missing = true;
+ options.compression = kNoCompression;
+ options.env = env.get();
+ if (GetParam()) {
+ options.use_direct_reads = true;
+ options.use_direct_io_for_flush_and_compaction = true;
+ }
+
+ options.statistics = CreateDBStatistics();
+ BlockBasedTableOptions table_options;
+ table_options.no_block_cache = true;
+ table_options.cache_index_and_filter_blocks = false;
+ table_options.metadata_block_size = 1024;
+ table_options.index_type =
+ BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+ Status s = TryReopen(options);
+ if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
+ // If direct IO is not supported, skip the test
+ return;
+ } else {
+ ASSERT_OK(s);
+ }
+
+ WriteBatch batch;
+ Random rnd(309);
+ for (int i = 0; i < kNumKeys; i++) {
+ ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
+ }
+ ASSERT_OK(db_->Write(WriteOptions(), &batch));
+
+ std::string start_key = BuildKey(0);
+ std::string end_key = BuildKey(kNumKeys - 1);
+ Slice least(start_key.data(), start_key.size());
+ Slice greatest(end_key.data(), end_key.size());
+
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
+
+ int buff_prefetch_count = 0;
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
+ [&](void*) { buff_prefetch_count++; });
+
+ SyncPoint::GetInstance()->EnableProcessing();
+ ReadOptions ro;
+ ro.adaptive_readahead = true;
+ ro.async_io = true;
+
+ {
+ ASSERT_OK(options.statistics->Reset());
+ // Each block contains around 4 keys.
+ auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
+ iter->Seek(BuildKey(0)); // Prefetch data because of seek parallelization.
+ ASSERT_TRUE(iter->Valid());
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+
+ // New data block. Since num_file_reads in FilePrefetch after this read is
+ // 2, it won't go for prefetching.
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+
+ // Prefetch data.
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+
+ ASSERT_EQ(buff_prefetch_count, 2);
// Check stats to make sure async prefetch is done.
{
// Enabled from the very first IO when ReadOptions.readahead_size is set.
block_prefetcher_.PrefetchIfNeeded(
rep, data_block_handle, read_options_.readahead_size, is_for_compaction,
- read_options_.async_io, read_options_.rate_limiter_priority);
+ /*async_io=*/false, read_options_.rate_limiter_priority);
Status s;
table_->NewDataBlockIterator<DataBlockIter>(
read_options_, data_block_handle, &block_iter_, BlockType::kData,
size_t max_readahead_size,
std::unique_ptr<FilePrefetchBuffer>* fpb,
bool implicit_auto_readahead,
- bool async_io) const {
+ uint64_t num_file_reads) const {
fpb->reset(new FilePrefetchBuffer(
readahead_size, max_readahead_size,
!ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */,
- implicit_auto_readahead, async_io, ioptions.fs.get(), ioptions.clock,
- ioptions.stats));
+ implicit_auto_readahead, num_file_reads, ioptions.fs.get(),
+ ioptions.clock, ioptions.stats));
}
void CreateFilePrefetchBufferIfNotExists(
size_t readahead_size, size_t max_readahead_size,
std::unique_ptr<FilePrefetchBuffer>* fpb, bool implicit_auto_readahead,
- bool async_io) const {
+ uint64_t num_file_reads) const {
if (!(*fpb)) {
CreateFilePrefetchBuffer(readahead_size, max_readahead_size, fpb,
- implicit_auto_readahead, async_io);
+ implicit_auto_readahead, num_file_reads);
}
}
const BlockBasedTable::Rep* rep, const BlockHandle& handle,
const size_t readahead_size, bool is_for_compaction, const bool async_io,
const Env::IOPriority rate_limiter_priority) {
+ // num_file_reads is used by FilePrefetchBuffer only when
+ // implicit_auto_readahead is set.
if (is_for_compaction) {
rep->CreateFilePrefetchBufferIfNotExists(
compaction_readahead_size_, compaction_readahead_size_,
- &prefetch_buffer_, false, async_io);
+ &prefetch_buffer_, /*implicit_auto_readahead=*/false,
+ /*num_file_reads=*/0);
return;
}
// Explicit user requested readahead.
if (readahead_size > 0) {
rep->CreateFilePrefetchBufferIfNotExists(
- readahead_size, readahead_size, &prefetch_buffer_, false, async_io);
+ readahead_size, readahead_size, &prefetch_buffer_,
+ /*implicit_auto_readahead=*/false, /*num_file_reads=*/0);
return;
}
return;
}
- // In case of async_io, it always creates the PrefetchBuffer.
+ // In case of async_io, always creates the PrefetchBuffer irrespective of
+ // num_file_reads_.
if (async_io) {
rep->CreateFilePrefetchBufferIfNotExists(
initial_auto_readahead_size_, max_auto_readahead_size,
- &prefetch_buffer_, /*implicit_auto_readahead=*/true, async_io);
+ &prefetch_buffer_, /*implicit_auto_readahead=*/true,
+ /*num_file_reads=*/0);
return;
}
}
if (rep->file->use_direct_io()) {
- rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_,
- max_auto_readahead_size,
- &prefetch_buffer_, true, async_io);
+ rep->CreateFilePrefetchBufferIfNotExists(
+ initial_auto_readahead_size_, max_auto_readahead_size,
+ &prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_);
return;
}
BlockBasedTable::BlockSizeWithTrailer(handle) + readahead_size_,
rate_limiter_priority);
if (s.IsNotSupported()) {
- rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_,
- max_auto_readahead_size,
- &prefetch_buffer_, true, async_io);
+ rep->CreateFilePrefetchBufferIfNotExists(
+ initial_auto_readahead_size_, max_auto_readahead_size,
+ &prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_);
return;
}
// initial_auto_readahead_size_ is used if RocksDB uses internal prefetch
// buffer.
uint64_t initial_auto_readahead_size_;
- int64_t num_file_reads_ = 0;
+ uint64_t num_file_reads_ = 0;
uint64_t prev_offset_ = 0;
size_t prev_len_ = 0;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer_;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer,
false /* Implicit autoreadahead */,
- false /*async_io*/);
+ 0 /*num_reads_*/);
IOOptions opts;
s = rep->file->PrepareIOOptions(ro, opts);
// Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0.
// Explicit user requested readahead:
// Enabled from the very first IO when ReadOptions.readahead_size is set.
- block_prefetcher_.PrefetchIfNeeded(
- rep, partitioned_index_handle, read_options_.readahead_size,
- is_for_compaction, read_options_.async_io,
- read_options_.rate_limiter_priority);
+ block_prefetcher_.PrefetchIfNeeded(rep, partitioned_index_handle,
+ read_options_.readahead_size,
+ is_for_compaction, /*async_io=*/false,
+ read_options_.rate_limiter_priority);
Status s;
table_->NewDataBlockIterator<IndexBlockIter>(
read_options_, partitioned_index_handle, &block_iter_,
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer,
false /*Implicit auto readahead*/,
- false /*async_io*/);
+ 0 /*num_reads_*/);
IOOptions opts;
{
Status s = rep->file->PrepareIOOptions(ro, opts);