: DBBasicTest(), statistics_(ROCKSDB_NAMESPACE::CreateDBStatistics()) {
BlockBasedTableOptions bbto;
bbto.filter_policy.reset(NewBloomFilterPolicy(10));
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- options.statistics = statistics_;
- options.table_factory.reset(NewBlockBasedTableFactory(bbto));
- Reopen(options);
+ options_ = CurrentOptions();
+ options_.disable_auto_compactions = true;
+ options_.statistics = statistics_;
+ options_.table_factory.reset(NewBlockBasedTableFactory(bbto));
+ Reopen(options_);
int num_keys = 0;
// Put all keys in the bottommost level, and overwrite some keys
const std::shared_ptr<Statistics>& statistics() { return statistics_; }
+ protected:
+ void ReopenDB() { Reopen(options_); }
+
private:
std::shared_ptr<Statistics> statistics_;
+ Options options_;
};
TEST_P(DBMultiGetAsyncIOTest, GetFromL0) {
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
}
+TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
+ std::vector<std::string> key_strs;
+ std::vector<Slice> keys;
+ std::vector<PinnableSlice> values;
+ std::vector<Status> statuses;
+
+ key_strs.push_back(Key(33));
+ key_strs.push_back(Key(54));
+ key_strs.push_back(Key(102));
+ keys.push_back(key_strs[0]);
+ keys.push_back(key_strs[1]);
+ keys.push_back(key_strs[2]);
+ values.resize(keys.size());
+ statuses.resize(keys.size());
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "TableCache::GetTableReader:BeforeOpenFile", [&](void* status) {
+ static int count = 0;
+ count++;
+ // Fail the last table reader open, which is the 6th SST file
+ // since 3 overlapping L0 files + 3 L1 files containing the keys
+ if (count == 6) {
+ Status* s = static_cast<Status*>(status);
+ *s = Status::IOError();
+ }
+ });
+ // DB open will create table readers unless we reduce the table cache
+ // capacity.
+ // SanitizeOptions will set max_open_files to minimum of 20. Table cache
+ // is allocated with max_open_files - 10 as capacity. So override
+ // max_open_files to 11 so table cache capacity will become 1. This will
+ // prevent file open during DB open and force the file to be opened
+ // during MultiGet
+ SyncPoint::GetInstance()->SetCallBack(
+ "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
+ int* max_open_files = (int*)arg;
+ *max_open_files = 11;
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ ReopenDB();
+
+ ReadOptions ro;
+ ro.async_io = true;
+ ro.optimize_multiget_for_io = GetParam();
+ dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+ keys.data(), values.data(), statuses.data());
+ SyncPoint::GetInstance()->DisableProcessing();
+ ASSERT_EQ(values.size(), 3);
+ ASSERT_EQ(statuses[0], Status::OK());
+ ASSERT_EQ(statuses[1], Status::OK());
+ ASSERT_EQ(statuses[2], Status::IOError());
+
+ HistogramData multiget_io_batch_size;
+
+ statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
+
+ // A batch of 3 async IOs is expected, one for each overlapping file in L1
+ ASSERT_EQ(multiget_io_batch_size.count, 1);
+ ASSERT_EQ(multiget_io_batch_size.max, 2);
+ ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
+}
+
TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
std::vector<std::string> key_strs;
std::vector<Slice> keys;
}
f = fp.GetNextFileInLevel();
}
- if (s.ok() && mget_tasks.size() > 0) {
+ if (mget_tasks.size() > 0) {
RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT,
mget_tasks.size());
// Collect all results so far
std::vector<Status> statuses = folly::coro::blockingWait(
folly::coro::collectAllRange(std::move(mget_tasks))
.scheduleOn(&range->context()->executor()));
- for (Status stat : statuses) {
- if (!stat.ok()) {
- s = stat;
+ if (s.ok()) {
+ for (Status stat : statuses) {
+ if (!stat.ok()) {
+ s = std::move(stat);
+ break;
+ }
}
}
unsigned int num_tasks_queued = 0;
to_process.pop_front();
if (batch->IsSearchEnded() || batch->GetRange().empty()) {
+ // If to_process is empty, i.e no more batches to look at, then we need
+ // schedule the enqueued coroutines and wait for them. Otherwise, we
+ // skip this batch and move to the next one in to_process.
if (!to_process.empty()) {
continue;
}
// to_process
s = ProcessBatch(options, batch, mget_tasks, blob_ctxs, batches, waiting,
to_process, num_tasks_queued, mget_stats);
- if (!s.ok()) {
- break;
- }
// If ProcessBatch didn't enqueue any coroutine tasks, it means all
// keys were filtered out. So put the batch back in to_process to
// lookup in the next level
waiting.emplace_back(idx);
}
}
- if (to_process.empty()) {
- if (s.ok() && mget_tasks.size() > 0) {
+ // If ProcessBatch() returned an error, then schedule the enqueued
+ // coroutines and wait for them, then abort the MultiGet.
+ if (to_process.empty() || !s.ok()) {
+ if (mget_tasks.size() > 0) {
assert(waiting.size());
RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT, mget_tasks.size());
// Collect all results so far
folly::coro::collectAllRange(std::move(mget_tasks))
.scheduleOn(&range->context()->executor()));
mget_tasks.clear();
- for (Status stat : statuses) {
- if (!stat.ok()) {
- s = stat;
- break;
+ if (s.ok()) {
+ for (Status stat : statuses) {
+ if (!stat.ok()) {
+ s = std::move(stat);
+ break;
+ }
}
}
assert(!s.ok() || waiting.size() == 0);
}
}
+ if (!s.ok()) {
+ break;
+ }
}
uint64_t num_levels = 0;