* Delete deprecated classes for creating backups (BackupableDB) and restoring from backups (RestoreBackupableDB). Now, BackupEngine should be used for creating backups, and BackupEngineReadOnly should be used for restorations. For more details, see https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F
* Expose estimate of per-level compression ratio via DB property: "rocksdb.compression-ratio-at-levelN".
* Added EventListener::OnTableFileCreationStarted. EventListener::OnTableFileCreated will now also be called in the failure case; users can check the creation status via TableFileCreationInfo::status.
+### New Features
+* Add ReadOptions::readahead_size. If non-zero, NewIterator will create a new table reader that performs reads of the given size; a large readahead size (> 2MB) can improve the performance of forward iteration on spinning disks.
## 4.7.0 (4/8/2016)
### Public API Change
#include "db/db_test_util.h"
#include "port/stack_trace.h"
+#include "rocksdb/iostats_context.h"
#include "rocksdb/perf_context.h"
namespace rocksdb {
ASSERT_EQ(TestGetTickerCount(options, ITER_BYTES_READ), total_bytes);
}
+// Verify ReadOptions::readahead_size: when non-zero, NewIterator creates a
+// fresh table reader per file that reads ahead by the given amount. Checks
+// (a) extra readers are opened (one per level's file), (b) strictly more
+// bytes are read, and (c) iteration results remain correct.
+TEST_F(DBIteratorTest, ReadAhead) {
+  Options options;
+  auto env = new SpecialEnv(Env::Default());
+  env->count_random_reads_ = true;
+  options.env = env;
+  options.disable_auto_compactions = true;
+  options.write_buffer_size = 4 << 20;
+  options.statistics = rocksdb::CreateDBStatistics();
+  BlockBasedTableOptions table_options;
+  table_options.block_size = 1024;
+  // No block cache: every data read must go through the counting env.
+  table_options.no_block_cache = true;
+  options.table_factory.reset(new BlockBasedTableFactory(table_options));
+  Reopen(options);
+
+  // Lay out the same 100 keys in one file on each of L2, L1 and L0.
+  std::string value(1024, 'a');
+  for (int i = 0; i < 100; i++) {
+    ASSERT_OK(Put(Key(i), value));
+  }
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(2);
+
+  for (int i = 0; i < 100; i++) {
+    ASSERT_OK(Put(Key(i), value));
+  }
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+
+  for (int i = 0; i < 100; i++) {
+    ASSERT_OK(Put(Key(i), value));
+  }
+  ASSERT_OK(Flush());
+  ASSERT_EQ("1,1,1", FilesPerLevel());
+
+  // Baseline: iterate without readahead; record file opens and bytes read.
+  env->random_read_bytes_counter_ = 0;
+  options.statistics->setTickerCount(NO_FILE_OPENS, 0);
+  ReadOptions read_options;
+  auto* iter = db_->NewIterator(read_options);
+  iter->SeekToFirst();
+  int64_t num_file_opens = TestGetTickerCount(options, NO_FILE_OPENS);
+  int64_t bytes_read = env->random_read_bytes_counter_;
+  delete iter;
+
+  // With readahead: expect one extra reader per file (3 files) and at least
+  // one readahead-sized chunk read per file.
+  env->random_read_bytes_counter_ = 0;
+  options.statistics->setTickerCount(NO_FILE_OPENS, 0);
+  read_options.readahead_size = 1024 * 10;
+  iter = db_->NewIterator(read_options);
+  iter->SeekToFirst();
+  int64_t num_file_opens_readahead = TestGetTickerCount(options, NO_FILE_OPENS);
+  int64_t bytes_read_readahead = env->random_read_bytes_counter_;
+  delete iter;
+  ASSERT_EQ(num_file_opens + 3, num_file_opens_readahead);
+  ASSERT_GT(bytes_read_readahead, bytes_read);
+  ASSERT_GT(bytes_read_readahead, read_options.readahead_size * 3);
+
+  // Verify correctness.
+  iter = db_->NewIterator(read_options);
+  int count = 0;
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    ASSERT_EQ(value, iter->value());
+    count++;
+  }
+  ASSERT_EQ(100, count);
+  for (int i = 0; i < 100; i++) {
+    iter->Seek(Key(i));
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_EQ(value, iter->value());
+  }
+  delete iter;
+
+  // Close the DB before deleting the env it references (fixes env leak).
+  Close();
+  delete env;
+}
+
} // namespace rocksdb
int main(int argc, char** argv) {
+// Test-only RandomAccessFile wrapper: increments `counter` once per Read()
+// call and (new in this patch) accumulates the number of bytes returned into
+// `bytes_read`. Neither pointer is owned; both must outlive this object.
class CountingFile : public RandomAccessFile {
 public:
  CountingFile(unique_ptr<RandomAccessFile>&& target,
-              anon::AtomicCounter* counter)
-      : target_(std::move(target)), counter_(counter) {}
+              anon::AtomicCounter* counter,
+              std::atomic<int64_t>* bytes_read)
+      : target_(std::move(target)),
+        counter_(counter),
+        bytes_read_(bytes_read) {}
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const override {
    counter_->Increment();
-    return target_->Read(offset, n, result, scratch);
+    Status s = target_->Read(offset, n, result, scratch);
+    // result->size() is the byte count actually read (may be < n).
+    *bytes_read_ += result->size();
+    return s;
  }
 private:
  unique_ptr<RandomAccessFile> target_;
  anon::AtomicCounter* counter_;
+  std::atomic<int64_t>* bytes_read_;
};
Status s = target()->NewRandomAccessFile(f, r, soptions);
random_file_open_counter_++;
if (s.ok() && count_random_reads_) {
- r->reset(new CountingFile(std::move(*r), &random_read_counter_));
+ r->reset(new CountingFile(std::move(*r), &random_read_counter_,
+ &random_read_bytes_counter_));
}
return s;
}
bool count_random_reads_;
anon::AtomicCounter random_read_counter_;
+ std::atomic<int64_t> random_read_bytes_counter_;
std::atomic<int> random_file_open_counter_;
bool count_sequential_reads_;
Status TableCache::GetTableReader(
const EnvOptions& env_options,
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
- bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
- unique_ptr<TableReader>* table_reader, bool skip_filters, int level) {
+ bool sequential_mode, size_t readahead, bool record_read_stats,
+ HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader,
+ bool skip_filters, int level) {
std::string fname =
TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId());
unique_ptr<RandomAccessFile> file;
Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options);
- if (sequential_mode && ioptions_.compaction_readahead_size > 0) {
- file = NewReadaheadRandomAccessFile(std::move(file),
- ioptions_.compaction_readahead_size);
+
+ if (readahead > 0) {
+ file = NewReadaheadRandomAccessFile(std::move(file), readahead);
}
RecordTick(ioptions_.statistics, NO_FILE_OPENS);
if (s.ok()) {
}
unique_ptr<TableReader> table_reader;
s = GetTableReader(env_options, internal_comparator, fd,
- false /* sequential mode */, record_read_stats,
- file_read_hist, &table_reader, skip_filters, level);
+ false /* sequential mode */, 0 /* readahead */,
+ record_read_stats, file_read_hist, &table_reader,
+ skip_filters, level);
if (!s.ok()) {
assert(table_reader == nullptr);
RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
TableReader* table_reader = nullptr;
Cache::Handle* handle = nullptr;
- bool create_new_table_reader =
- (for_compaction && ioptions_.new_table_reader_for_compaction_inputs);
+
+ size_t readahead = 0;
+ bool create_new_table_reader = false;
+ if (for_compaction) {
+ if (ioptions_.new_table_reader_for_compaction_inputs) {
+ readahead = ioptions_.compaction_readahead_size;
+ create_new_table_reader = true;
+ }
+ } else {
+ readahead = options.readahead_size;
+ create_new_table_reader = readahead > 0;
+ }
+
if (create_new_table_reader) {
unique_ptr<TableReader> table_reader_unique_ptr;
Status s = GetTableReader(
- env_options, icomparator, fd, /* sequential mode */ true,
- /* record stats */ false, nullptr, &table_reader_unique_ptr,
+ env_options, icomparator, fd, true /* sequential_mode */, readahead,
+ !for_compaction /* record stats */, nullptr, &table_reader_unique_ptr,
false /* skip_filters */, level);
if (!s.ok()) {
return NewErrorInternalIterator(s, arena);
+  // Opens the table file referenced by `fd` and builds a TableReader for it.
+  // `readahead` is new: when > 0 the underlying file is wrapped in a
+  // readahead reader of that size (used for compaction inputs and for
+  // iterators with ReadOptions::readahead_size set).
Status GetTableReader(const EnvOptions& env_options,
                      const InternalKeyComparator& internal_comparator,
                      const FileDescriptor& fd, bool sequential_mode,
-                      bool record_read_stats, HistogramImpl* file_read_hist,
+                      size_t readahead, bool record_read_stats,
+                      HistogramImpl* file_read_hist,
                      unique_ptr<TableReader>* table_reader,
                      bool skip_filters = false, int level = -1);
// Default: false
bool pin_data;
+ // If non-zero, NewIterator will create a new table reader which
+ // performs reads of the given size. Using a large size (> 2MB) can
+ // improve the performance of forward iteration on spinning disks.
+ // Default: 0
+ size_t readahead_size;
+
ReadOptions();
ReadOptions(bool cksum, bool cache);
};
managed(false),
total_order_seek(false),
prefix_same_as_start(false),
- pin_data(false) {
+ pin_data(false),
+ readahead_size(0) {
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
reinterpret_cast<ReadOptions*>(this));
}
managed(false),
total_order_seek(false),
prefix_same_as_start(false),
- pin_data(false) {
+ pin_data(false),
+ readahead_size(0) {
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
reinterpret_cast<ReadOptions*>(this));
}