From: Yi Wu Date: Wed, 4 May 2016 22:25:58 +0000 (-0700) Subject: Enable configurable readahead for iterators X-Git-Tag: rocksdb-4.8~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2182bf2828ee34fc635ed78acb6606842bc93474;p=rocksdb.git Enable configurable readahead for iterators Summary: Add an option `readahead_size` to `ReadOptions` to enable configurable readahead for iterators similar to the corresponding option for compaction. Test Plan: ``` make commit_prereq ``` Reviewers: kumar.rangarajan, ott, igor, sdong Reviewed By: sdong Subscribers: yiwu, andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D55419 --- diff --git a/HISTORY.md b/HISTORY.md index d945115c..ad2050af 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,8 @@ * Delete deprecated classes for creating backups (BackupableDB) and restoring from backups (RestoreBackupableDB). Now, BackupEngine should be used for creating backups, and BackupEngineReadOnly should be used for restorations. For more details, see https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F * Expose estimate of per-level compression ratio via DB property: "rocksdb.compression-ratio-at-levelN". * Added EventListener::OnTableFileCreationStarted. EventListener::OnTableFileCreated will be called on failure case. User can check creation status via TableFileCreationInfo::status. +### New Features +* Add ReadOptions::readahead_size. If non-zero, NewIterator will create a new table reader which performs reads of the given size. 
## 4.7.0 (4/8/2016) ### Public API Change diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index 48f7cf9e..7635900e 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -9,6 +9,7 @@ #include "db/db_test_util.h" #include "port/stack_trace.h" +#include "rocksdb/iostats_context.h" #include "rocksdb/perf_context.h" namespace rocksdb { @@ -1525,6 +1526,75 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) { ASSERT_EQ(TestGetTickerCount(options, ITER_BYTES_READ), total_bytes); } +TEST_F(DBIteratorTest, ReadAhead) { + Options options; + auto env = new SpecialEnv(Env::Default()); + env->count_random_reads_ = true; + options.env = env; + options.disable_auto_compactions = true; + options.write_buffer_size = 4 << 20; + options.statistics = rocksdb::CreateDBStatistics(); + BlockBasedTableOptions table_options; + table_options.block_size = 1024; + table_options.no_block_cache = true; + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + Reopen(options); + + std::string value(1024, 'a'); + for (int i = 0; i < 100; i++) { + Put(Key(i), value); + } + ASSERT_OK(Flush()); + MoveFilesToLevel(2); + + for (int i = 0; i < 100; i++) { + Put(Key(i), value); + } + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + + for (int i = 0; i < 100; i++) { + Put(Key(i), value); + } + ASSERT_OK(Flush()); + ASSERT_EQ("1,1,1", FilesPerLevel()); + + env->random_read_bytes_counter_ = 0; + options.statistics->setTickerCount(NO_FILE_OPENS, 0); + ReadOptions read_options; + auto* iter = db_->NewIterator(read_options); + iter->SeekToFirst(); + int64_t num_file_opens = TestGetTickerCount(options, NO_FILE_OPENS); + int64_t bytes_read = env->random_read_bytes_counter_; + delete iter; + + env->random_read_bytes_counter_ = 0; + options.statistics->setTickerCount(NO_FILE_OPENS, 0); + read_options.readahead_size = 1024 * 10; + iter = db_->NewIterator(read_options); + iter->SeekToFirst(); + int64_t num_file_opens_readahead = TestGetTickerCount(options, NO_FILE_OPENS); 
+ int64_t bytes_read_readahead = env->random_read_bytes_counter_; + delete iter; + ASSERT_EQ(num_file_opens + 3, num_file_opens_readahead); + ASSERT_GT(bytes_read_readahead, bytes_read); + ASSERT_GT(bytes_read_readahead, read_options.readahead_size * 3); + + // Verify correctness. + iter = db_->NewIterator(read_options); + int count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_EQ(value, iter->value()); + count++; + } + ASSERT_EQ(100, count); + for (int i = 0; i < 100; i++) { + iter->Seek(Key(i)); + ASSERT_EQ(value, iter->value()); + } + delete iter; +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_test_util.h b/db/db_test_util.h index dffcc903..4f2cea29 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -358,23 +358,30 @@ class SpecialEnv : public EnvWrapper { class CountingFile : public RandomAccessFile { public: CountingFile(unique_ptr<RandomAccessFile>&& target, - anon::AtomicCounter* counter) - : target_(std::move(target)), counter_(counter) {} + anon::AtomicCounter* counter, + std::atomic<size_t>* bytes_read) + : target_(std::move(target)), + counter_(counter), + bytes_read_(bytes_read) {} virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const override { counter_->Increment(); - return target_->Read(offset, n, result, scratch); + Status s = target_->Read(offset, n, result, scratch); + *bytes_read_ += result->size(); + return s; } private: unique_ptr<RandomAccessFile> target_; anon::AtomicCounter* counter_; + std::atomic<size_t>* bytes_read_; }; Status s = target()->NewRandomAccessFile(f, r, soptions); random_file_open_counter_++; if (s.ok() && count_random_reads_) { - r->reset(new CountingFile(std::move(*r), &random_read_counter_)); + r->reset(new CountingFile(std::move(*r), &random_read_counter_, + &random_read_bytes_counter_)); } return s; } @@ -464,6 +471,7 @@ class SpecialEnv : public EnvWrapper { bool count_random_reads_; anon::AtomicCounter random_read_counter_; + std::atomic<size_t> random_read_bytes_counter_; 
std::atomic<int> random_file_open_counter_; bool count_sequential_reads_; diff --git a/db/table_cache.cc b/db/table_cache.cc index f8c81f9f..9b0e8edb 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -87,15 +87,16 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) { Status TableCache::GetTableReader( const EnvOptions& env_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, - bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist, - unique_ptr<TableReader>* table_reader, bool skip_filters, int level) { + bool sequential_mode, size_t readahead, bool record_read_stats, + HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader, + bool skip_filters, int level) { std::string fname = TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId()); unique_ptr<RandomAccessFile> file; Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options); - if (sequential_mode && ioptions_.compaction_readahead_size > 0) { - file = NewReadaheadRandomAccessFile(std::move(file), - ioptions_.compaction_readahead_size); + + if (readahead > 0) { + file = NewReadaheadRandomAccessFile(std::move(file), readahead); } RecordTick(ioptions_.statistics, NO_FILE_OPENS); if (s.ok()) { @@ -143,8 +144,9 @@ Status TableCache::FindTable(const EnvOptions& env_options, } unique_ptr<TableReader> table_reader; s = GetTableReader(env_options, internal_comparator, fd, - false /* sequential mode */, record_read_stats, - file_read_hist, &table_reader, skip_filters, level); + false /* sequential mode */, 0 /* readahead */, + record_read_stats, file_read_hist, &table_reader, + skip_filters, level); if (!s.ok()) { assert(table_reader == nullptr); RecordTick(ioptions_.statistics, NO_FILE_ERRORS); @@ -175,13 +177,24 @@ InternalIterator* TableCache::NewIterator( TableReader* table_reader = nullptr; Cache::Handle* handle = nullptr; - bool create_new_table_reader = - (for_compaction && ioptions_.new_table_reader_for_compaction_inputs); + + size_t readahead = 0; + bool 
create_new_table_reader = false; + if (for_compaction) { + if (ioptions_.new_table_reader_for_compaction_inputs) { + readahead = ioptions_.compaction_readahead_size; + create_new_table_reader = true; + } + } else { + readahead = options.readahead_size; + create_new_table_reader = readahead > 0; + } + if (create_new_table_reader) { unique_ptr<TableReader> table_reader_unique_ptr; Status s = GetTableReader( - env_options, icomparator, fd, /* sequential mode */ true, - /* record stats */ false, nullptr, &table_reader_unique_ptr, + env_options, icomparator, fd, true /* sequential_mode */, readahead, + !for_compaction /* record stats */, nullptr, &table_reader_unique_ptr, false /* skip_filters */, level); if (!s.ok()) { return NewErrorInternalIterator(s, arena); diff --git a/db/table_cache.h b/db/table_cache.h index fbb7cacb..18882c6a 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -111,7 +111,8 @@ class TableCache { Status GetTableReader(const EnvOptions& env_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, bool sequential_mode, - bool record_read_stats, HistogramImpl* file_read_hist, + size_t readahead, bool record_read_stats, + HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader, bool skip_filters = false, int level = -1); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 67ab2d8f..d93af231 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1466,6 +1466,12 @@ struct ReadOptions { // Default: false bool pin_data; + // If non-zero, NewIterator will create a new table reader which + // performs reads of the given size. Using a large size (> 2MB) can + // improve the performance of forward iteration on spinning disks. 
+ // Default: 0 + size_t readahead_size; + ReadOptions(); ReadOptions(bool cksum, bool cache); }; diff --git a/util/options.cc b/util/options.cc index 44c7604e..7eef1ab2 100644 --- a/util/options.cc +++ b/util/options.cc @@ -794,7 +794,8 @@ ReadOptions::ReadOptions() managed(false), total_order_seek(false), prefix_same_as_start(false), - pin_data(false) { + pin_data(false), + readahead_size(0) { XFUNC_TEST("", "managed_options", managed_options, xf_manage_options, reinterpret_cast<ReadOptions*>(this)); } @@ -809,7 +810,8 @@ ReadOptions::ReadOptions(bool cksum, bool cache) managed(false), total_order_seek(false), prefix_same_as_start(false), - pin_data(false) { + pin_data(false), + readahead_size(0) { XFUNC_TEST("", "managed_options", managed_options, xf_manage_options, reinterpret_cast<ReadOptions*>(this)); }