git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Enable configurable readahead for iterators
author Yi Wu <yiwu@fb.com>
Wed, 4 May 2016 22:25:58 +0000 (15:25 -0700)
committer Yi Wu <yiwu@fb.com>
Thu, 5 May 2016 17:09:16 +0000 (10:09 -0700)
Summary:
Add an option `readahead_size` to `ReadOptions` to enable
configurable readahead for iterators, similar to the corresponding
option (`compaction_readahead_size`) for compaction.

Test Plan:
```
make commit_prereq
```

Reviewers: kumar.rangarajan, ott, igor, sdong

Reviewed By: sdong

Subscribers: yiwu, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D55419

HISTORY.md
db/db_iterator_test.cc
db/db_test_util.h
db/table_cache.cc
db/table_cache.h
include/rocksdb/options.h
util/options.cc

index d945115c6040fb8adf1ec91e3a8caaf0b5606583..ad2050af01b27d28aa13512bffcb84756acf4aac 100644 (file)
@@ -5,6 +5,8 @@
 * Delete deprecated classes for creating backups (BackupableDB) and restoring from backups (RestoreBackupableDB). Now, BackupEngine should be used for creating backups, and BackupEngineReadOnly should be used for restorations. For more details, see https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F
 * Expose estimate of per-level compression ratio via DB property: "rocksdb.compression-ratio-at-levelN".
 * Added EventListener::OnTableFileCreationStarted. EventListener::OnTableFileCreated will be called on failure case. User can check creation status via TableFileCreationInfo::status.
+### New Features
+* Add ReadOptions::readahead_size. If non-zero, NewIterator will create a new table reader which performs reads of the given size.
 
 ## 4.7.0 (4/8/2016)
 ### Public API Change
index 48f7cf9e1b0dacec9ca071cdd4acbd369aea1a39..7635900e8db5e62ef83b455b151c2247e1c0fc5d 100644 (file)
@@ -9,6 +9,7 @@
 
 #include "db/db_test_util.h"
 #include "port/stack_trace.h"
+#include "rocksdb/iostats_context.h"
 #include "rocksdb/perf_context.h"
 
 namespace rocksdb {
@@ -1525,6 +1526,75 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) {
   ASSERT_EQ(TestGetTickerCount(options, ITER_BYTES_READ), total_bytes);
 }
 
+TEST_F(DBIteratorTest, ReadAhead) {
+  Options options;
+  auto env = new SpecialEnv(Env::Default());
+  env->count_random_reads_ = true;
+  options.env = env;
+  options.disable_auto_compactions = true;
+  options.write_buffer_size = 4 << 20;
+  options.statistics = rocksdb::CreateDBStatistics();
+  BlockBasedTableOptions table_options;
+  table_options.block_size = 1024;
+  table_options.no_block_cache = true;
+  options.table_factory.reset(new BlockBasedTableFactory(table_options));
+  Reopen(options);
+
+  std::string value(1024, 'a');
+  for (int i = 0; i < 100; i++) {
+    Put(Key(i), value);
+  }
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(2);
+
+  for (int i = 0; i < 100; i++) {
+    Put(Key(i), value);
+  }
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+
+  for (int i = 0; i < 100; i++) {
+    Put(Key(i), value);
+  }
+  ASSERT_OK(Flush());
+  ASSERT_EQ("1,1,1", FilesPerLevel());
+
+  env->random_read_bytes_counter_ = 0;
+  options.statistics->setTickerCount(NO_FILE_OPENS, 0);
+  ReadOptions read_options;
+  auto* iter = db_->NewIterator(read_options);
+  iter->SeekToFirst();
+  int64_t num_file_opens = TestGetTickerCount(options, NO_FILE_OPENS);
+  int64_t bytes_read = env->random_read_bytes_counter_;
+  delete iter;
+
+  env->random_read_bytes_counter_ = 0;
+  options.statistics->setTickerCount(NO_FILE_OPENS, 0);
+  read_options.readahead_size = 1024 * 10;
+  iter = db_->NewIterator(read_options);
+  iter->SeekToFirst();
+  int64_t num_file_opens_readahead = TestGetTickerCount(options, NO_FILE_OPENS);
+  int64_t bytes_read_readahead = env->random_read_bytes_counter_;
+  delete iter;
+  ASSERT_EQ(num_file_opens + 3, num_file_opens_readahead);
+  ASSERT_GT(bytes_read_readahead, bytes_read);
+  ASSERT_GT(bytes_read_readahead, read_options.readahead_size * 3);
+
+  // Verify correctness.
+  iter = db_->NewIterator(read_options);
+  int count = 0;
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    ASSERT_EQ(value, iter->value());
+    count++;
+  }
+  ASSERT_EQ(100, count);
+  for (int i = 0; i < 100; i++) {
+    iter->Seek(Key(i));
+    ASSERT_EQ(value, iter->value());
+  }
+  delete iter;
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
index dffcc9037c8c8beed708ca9375efdc4cbeb4ce43..4f2cea29cbfcc2c0061c5bc2bb097a70770e814c 100644 (file)
@@ -358,23 +358,30 @@ class SpecialEnv : public EnvWrapper {
     class CountingFile : public RandomAccessFile {
      public:
       CountingFile(unique_ptr<RandomAccessFile>&& target,
-                   anon::AtomicCounter* counter)
-          : target_(std::move(target)), counter_(counter) {}
+                   anon::AtomicCounter* counter,
+                   std::atomic<int64_t>* bytes_read)
+          : target_(std::move(target)),
+            counter_(counter),
+            bytes_read_(bytes_read) {}
       virtual Status Read(uint64_t offset, size_t n, Slice* result,
                           char* scratch) const override {
         counter_->Increment();
-        return target_->Read(offset, n, result, scratch);
+        Status s = target_->Read(offset, n, result, scratch);
+        *bytes_read_ += result->size();
+        return s;
       }
 
      private:
       unique_ptr<RandomAccessFile> target_;
       anon::AtomicCounter* counter_;
+      std::atomic<int64_t>* bytes_read_;
     };
 
     Status s = target()->NewRandomAccessFile(f, r, soptions);
     random_file_open_counter_++;
     if (s.ok() && count_random_reads_) {
-      r->reset(new CountingFile(std::move(*r), &random_read_counter_));
+      r->reset(new CountingFile(std::move(*r), &random_read_counter_,
+                                &random_read_bytes_counter_));
     }
     return s;
   }
@@ -464,6 +471,7 @@ class SpecialEnv : public EnvWrapper {
 
   bool count_random_reads_;
   anon::AtomicCounter random_read_counter_;
+  std::atomic<int64_t> random_read_bytes_counter_;
   std::atomic<int> random_file_open_counter_;
 
   bool count_sequential_reads_;
index f8c81f9f90a932026d3d4f7194643ae4477cdd9f..9b0e8edb9bf7bf3497075ccbfb916e515b893c59 100644 (file)
@@ -87,15 +87,16 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) {
 Status TableCache::GetTableReader(
     const EnvOptions& env_options,
     const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
-    bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
-    unique_ptr<TableReader>* table_reader, bool skip_filters, int level) {
+    bool sequential_mode, size_t readahead, bool record_read_stats,
+    HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader,
+    bool skip_filters, int level) {
   std::string fname =
       TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId());
   unique_ptr<RandomAccessFile> file;
   Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options);
-  if (sequential_mode && ioptions_.compaction_readahead_size > 0) {
-    file = NewReadaheadRandomAccessFile(std::move(file),
-                                        ioptions_.compaction_readahead_size);
+
+  if (readahead > 0) {
+    file = NewReadaheadRandomAccessFile(std::move(file), readahead);
   }
   RecordTick(ioptions_.statistics, NO_FILE_OPENS);
   if (s.ok()) {
@@ -143,8 +144,9 @@ Status TableCache::FindTable(const EnvOptions& env_options,
     }
     unique_ptr<TableReader> table_reader;
     s = GetTableReader(env_options, internal_comparator, fd,
-                       false /* sequential mode */, record_read_stats,
-                       file_read_hist, &table_reader, skip_filters, level);
+                       false /* sequential mode */, 0 /* readahead */,
+                       record_read_stats, file_read_hist, &table_reader,
+                       skip_filters, level);
     if (!s.ok()) {
       assert(table_reader == nullptr);
       RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
@@ -175,13 +177,24 @@ InternalIterator* TableCache::NewIterator(
 
   TableReader* table_reader = nullptr;
   Cache::Handle* handle = nullptr;
-  bool create_new_table_reader =
-      (for_compaction && ioptions_.new_table_reader_for_compaction_inputs);
+
+  size_t readahead = 0;
+  bool create_new_table_reader = false;
+  if (for_compaction) {
+    if (ioptions_.new_table_reader_for_compaction_inputs) {
+      readahead = ioptions_.compaction_readahead_size;
+      create_new_table_reader = true;
+    }
+  } else {
+    readahead = options.readahead_size;
+    create_new_table_reader = readahead > 0;
+  }
+
   if (create_new_table_reader) {
     unique_ptr<TableReader> table_reader_unique_ptr;
     Status s = GetTableReader(
-        env_options, icomparator, fd, /* sequential mode */ true,
-        /* record stats */ false, nullptr, &table_reader_unique_ptr,
+        env_options, icomparator, fd, true /* sequential_mode */, readahead,
+        !for_compaction /* record stats */, nullptr, &table_reader_unique_ptr,
         false /* skip_filters */, level);
     if (!s.ok()) {
       return NewErrorInternalIterator(s, arena);
index fbb7cacbf8e6fa3b97168b945d544eef2fbed5ff..18882c6a24366ba4303393afc46fa3b912291c0e 100644 (file)
@@ -111,7 +111,8 @@ class TableCache {
   Status GetTableReader(const EnvOptions& env_options,
                         const InternalKeyComparator& internal_comparator,
                         const FileDescriptor& fd, bool sequential_mode,
-                        bool record_read_stats, HistogramImpl* file_read_hist,
+                        size_t readahead, bool record_read_stats,
+                        HistogramImpl* file_read_hist,
                         unique_ptr<TableReader>* table_reader,
                         bool skip_filters = false, int level = -1);
 
index 67ab2d8f5d2797da72607d6c0a64d49c54fb3865..d93af2314bae08388b56b7cef106e3df098f8cc2 100644 (file)
@@ -1466,6 +1466,12 @@ struct ReadOptions {
   // Default: false
   bool pin_data;
 
+  // If non-zero, NewIterator will create a new table reader which
+  // performs reads of the given size. Using a large size (> 2MB) can
+  // improve the performance of forward iteration on spinning disks.
+  // Default: 0
+  size_t readahead_size;
+
   ReadOptions();
   ReadOptions(bool cksum, bool cache);
 };
index 44c7604eb65d069d7564f7d890cf07217ac263cc..7eef1ab2725676da3da0df16218e3712c7e31c7f 100644 (file)
@@ -794,7 +794,8 @@ ReadOptions::ReadOptions()
       managed(false),
       total_order_seek(false),
       prefix_same_as_start(false),
-      pin_data(false) {
+      pin_data(false),
+      readahead_size(0) {
   XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
              reinterpret_cast<ReadOptions*>(this));
 }
@@ -809,7 +810,8 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
       managed(false),
       total_order_seek(false),
       prefix_same_as_start(false),
-      pin_data(false) {
+      pin_data(false),
+      readahead_size(0) {
   XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
              reinterpret_cast<ReadOptions*>(this));
 }