namespace ROCKSDB_NAMESPACE {
+static bool enable_io_uring = true;
+extern "C" bool RocksDbIOUringEnable() { return enable_io_uring; }
+
class DBBasicTest : public DBTestBase {
public:
DBBasicTest() : DBTestBase("db_basic_test", /*env_do_fsync=*/false) {}
options_.disable_auto_compactions = true;
options_.statistics = statistics_;
options_.table_factory.reset(NewBlockBasedTableFactory(bbto));
+ options_.env = Env::Default();
Reopen(options_);
int num_keys = 0;
const std::shared_ptr<Statistics>& statistics() { return statistics_; }
protected:
+ void PrepareDBForTest() {
+#ifdef ROCKSDB_IOURING_PRESENT
+ Reopen(options_);
+#else // ROCKSDB_IOURING_PRESENT
+ // Warm up the block cache so we don't need to use the IO uring
+ Iterator* iter = dbfull()->NewIterator(ReadOptions());
+ for (iter->SeekToFirst(); iter->Valid() && iter->status().ok();
+ iter->Next())
+ ;
+ EXPECT_OK(iter->status());
+ delete iter;
+#endif // ROCKSDB_IOURING_PRESENT
+ }
+
void ReopenDB() { Reopen(options_); }
private:
std::vector<PinnableSlice> values(key_strs.size());
std::vector<Status> statuses(key_strs.size());
+ PrepareDBForTest();
+
ReadOptions ro;
ro.async_io = true;
ro.optimize_multiget_for_io = GetParam();
statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
// With async IO, lookups will happen in parallel for each key
+#ifdef ROCKSDB_IOURING_PRESENT
if (GetParam()) {
ASSERT_EQ(multiget_io_batch_size.count, 1);
ASSERT_EQ(multiget_io_batch_size.max, 3);
// L0 file
ASSERT_EQ(multiget_io_batch_size.count, 3);
}
+#else // ROCKSDB_IOURING_PRESENT
+ if (GetParam()) {
+ ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
+ }
+#endif // ROCKSDB_IOURING_PRESENT
}
TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
values.resize(keys.size());
statuses.resize(keys.size());
+ PrepareDBForTest();
+
ReadOptions ro;
ro.async_io = true;
ro.optimize_multiget_for_io = GetParam();
ASSERT_EQ(values[1], "val_l1_" + std::to_string(54));
ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
+#ifdef ROCKSDB_IOURING_PRESENT
HistogramData multiget_io_batch_size;
statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
// A batch of 3 async IOs is expected, one for each overlapping file in L1
ASSERT_EQ(multiget_io_batch_size.count, 1);
ASSERT_EQ(multiget_io_batch_size.max, 3);
+#endif // ROCKSDB_IOURING_PRESENT
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
}
+#ifdef ROCKSDB_IOURING_PRESENT
TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
std::vector<std::string> key_strs;
std::vector<Slice> keys;
values.resize(keys.size());
statuses.resize(keys.size());
+ int count = 0;
SyncPoint::GetInstance()->SetCallBack(
"TableCache::GetTableReader:BeforeOpenFile", [&](void* status) {
- static int count = 0;
count++;
// Fail the last table reader open, which is the 6th SST file
// since 3 overlapping L0 files + 3 L1 files containing the keys
});
SyncPoint::GetInstance()->EnableProcessing();
- ReopenDB();
+ PrepareDBForTest();
ReadOptions ro;
ro.async_io = true;
ASSERT_EQ(multiget_io_batch_size.max, 2);
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
}
+#endif // ROCKSDB_IOURING_PRESENT
TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
std::vector<std::string> key_strs;
values.resize(keys.size());
statuses.resize(keys.size());
+ PrepareDBForTest();
+
ReadOptions ro;
ro.async_io = true;
ro.optimize_multiget_for_io = GetParam();
ASSERT_EQ(values[1], "val_l1_" + std::to_string(54));
ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
+#ifdef ROCKSDB_IOURING_PRESENT
HistogramData multiget_io_batch_size;
statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
// will lookup 2 files in parallel and issue 2 async reads
ASSERT_EQ(multiget_io_batch_size.count, 2);
ASSERT_EQ(multiget_io_batch_size.max, 2);
+#endif // ROCKSDB_IOURING_PRESENT
}
TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
values.resize(keys.size());
statuses.resize(keys.size());
+ PrepareDBForTest();
+
ReadOptions ro;
ro.async_io = true;
ro.optimize_multiget_for_io = GetParam();
ASSERT_EQ(values[1], "val_l2_" + std::to_string(56));
ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
+#ifdef ROCKSDB_IOURING_PRESENT
HistogramData multiget_io_batch_size;
statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
// Otherwise, the L2 lookup will happen after L1.
ASSERT_EQ(multiget_io_batch_size.count, GetParam() ? 1 : 2);
ASSERT_EQ(multiget_io_batch_size.max, GetParam() ? 3 : 2);
+#endif // ROCKSDB_IOURING_PRESENT
}
TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) {
values.resize(keys.size());
statuses.resize(keys.size());
+ PrepareDBForTest();
+
ReadOptions ro;
ro.async_io = true;
ro.optimize_multiget_for_io = GetParam();
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
}
+#ifdef ROCKSDB_IOURING_PRESENT
TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) {
std::vector<std::string> key_strs;
std::vector<Slice> keys;
values.resize(keys.size());
statuses.resize(keys.size());
+ PrepareDBForTest();
+
ReadOptions ro;
ro.async_io = true;
ro.optimize_multiget_for_io = GetParam();
values.resize(keys.size());
statuses.resize(keys.size());
+ PrepareDBForTest();
+
ReadOptions ro;
ro.async_io = true;
ro.optimize_multiget_for_io = GetParam();
// Bloom filters in L0/L1 will avoid the coroutine calls in those levels
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
}
+#endif // ROCKSDB_IOURING_PRESENT
+
+TEST_P(DBMultiGetAsyncIOTest, GetNoIOUring) {
+ std::vector<std::string> key_strs;
+ std::vector<Slice> keys;
+ std::vector<PinnableSlice> values;
+ std::vector<Status> statuses;
+
+ key_strs.push_back(Key(33));
+ key_strs.push_back(Key(54));
+ key_strs.push_back(Key(102));
+ keys.push_back(key_strs[0]);
+ keys.push_back(key_strs[1]);
+ keys.push_back(key_strs[2]);
+ values.resize(keys.size());
+ statuses.resize(keys.size());
+
+ enable_io_uring = false;
+ ReopenDB();
+
+ ReadOptions ro;
+ ro.async_io = true;
+ ro.optimize_multiget_for_io = GetParam();
+ dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+ keys.data(), values.data(), statuses.data());
+ ASSERT_EQ(values.size(), 3);
+ ASSERT_EQ(statuses[0], Status::NotSupported());
+ ASSERT_EQ(statuses[1], Status::NotSupported());
+ ASSERT_EQ(statuses[2], Status::NotSupported());
+
+ HistogramData multiget_io_batch_size;
+
+ statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
+
+ // A batch of 3 async IOs is expected, one for each overlapping file in L1
+ ASSERT_EQ(multiget_io_batch_size.count, 1);
+ ASSERT_EQ(multiget_io_batch_size.max, 3);
+ ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
+}
INSTANTIATE_TEST_CASE_P(DBMultiGetAsyncIOTest, DBMultiGetAsyncIOTest,
testing::Bool());
awaiter->io_handle_.resize(awaiter->num_reqs_);
awaiter->del_fn_.resize(awaiter->num_reqs_);
for (size_t i = 0; i < awaiter->num_reqs_; ++i) {
- awaiter->file_
- ->ReadAsync(
- awaiter->read_reqs_[i], awaiter->opts_,
- [](const FSReadRequest& req, void* cb_arg) {
- FSReadRequest* read_req = static_cast<FSReadRequest*>(cb_arg);
- read_req->status = req.status;
- read_req->result = req.result;
- },
- &awaiter->read_reqs_[i], &awaiter->io_handle_[i],
- &awaiter->del_fn_[i], /*aligned_buf=*/nullptr)
- .PermitUncheckedError();
+ IOStatus s = awaiter->file_->ReadAsync(
+ awaiter->read_reqs_[i], awaiter->opts_,
+ [](const FSReadRequest& req, void* cb_arg) {
+ FSReadRequest* read_req = static_cast<FSReadRequest*>(cb_arg);
+ read_req->status = req.status;
+ read_req->result = req.result;
+ },
+ &awaiter->read_reqs_[i], &awaiter->io_handle_[i], &awaiter->del_fn_[i],
+ /*aligned_buf=*/nullptr);
+ if (!s.ok()) {
+ // For any non-ok status, the FileSystem will not call the callback
+ // So let's update the status ourselves
+ awaiter->read_reqs_[i].status = s;
+ }
}
return true;
}
}
ReadAwaiter* waiter;
std::vector<void*> io_handles;
+ IOStatus s;
io_handles.reserve(num_reqs_);
waiter = head_;
do {
} while (waiter != tail_ && (waiter = waiter->next_));
if (io_handles.size() > 0) {
StopWatch sw(SystemClock::Default().get(), stats_, POLL_WAIT_MICROS);
- fs_->Poll(io_handles, io_handles.size()).PermitUncheckedError();
+ s = fs_->Poll(io_handles, io_handles.size());
}
do {
waiter = head_;
if (waiter->io_handle_[i] && waiter->del_fn_[i]) {
waiter->del_fn_[i](waiter->io_handle_[i]);
}
+ if (waiter->read_reqs_[i].status.ok() && !s.ok()) {
+ // Override the request status with the Poll error
+ waiter->read_reqs_[i].status = s;
+ }
}
waiter->awaiting_coro_.resume();
} while (waiter != tail_);