}
}
-TEST_F(EnvPosixTest, MultiReadIOUringError) {
- // In this test we don't do aligned read, so we can't do direct I/O.
- EnvOptions soptions;
- soptions.use_direct_reads = soptions.use_direct_writes = false;
- std::string fname = test::PerThreadDBPath(env_, "testfile");
-
- std::vector<std::string> scratches;
- std::vector<ReadRequest> reqs;
- GenerateFilesAndRequest(env_, fname, &reqs, &scratches);
- // Query the data
- std::unique_ptr<RandomAccessFile> file;
- ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
-
- bool io_uring_wait_cqe_called = false;
- SyncPoint::GetInstance()->SetCallBack(
- "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return",
- [&](void* arg) {
- if (!io_uring_wait_cqe_called) {
- io_uring_wait_cqe_called = true;
- ssize_t& ret = *(static_cast<ssize_t*>(arg));
- ret = 1;
- }
- });
- SyncPoint::GetInstance()->EnableProcessing();
-
- Status s = file->MultiRead(reqs.data(), reqs.size());
- if (io_uring_wait_cqe_called) {
- ASSERT_NOK(s);
- } else {
- s.PermitUncheckedError();
- }
-
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
-}
-
TEST_F(EnvPosixTest, MultiReadIOUringError2) {
// In this test we don't do aligned read, so we can't do direct I/O.
EnvOptions soptions;
bool io_uring_submit_and_wait_called = false;
SyncPoint::GetInstance()->SetCallBack(
- "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return1",
+ "PosixRandomAccessFile::MultiRead:io_uring_sq_ready:return1",
[&](void* arg) {
io_uring_submit_and_wait_called = true;
- ssize_t* ret = static_cast<ssize_t*>(arg);
- (*ret)--;
+ unsigned* ret = static_cast<unsigned*>(arg);
+ *ret = 1;
});
SyncPoint::GetInstance()->SetCallBack(
"PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2",
[&](void* arg) {
struct io_uring* iu = static_cast<struct io_uring*>(arg);
struct io_uring_cqe* cqe;
- assert(io_uring_wait_cqe(iu, &cqe) == 0);
- io_uring_cqe_seen(iu, cqe);
+ // CQ should be empty after drain - peek should fail
+ int ret = io_uring_peek_cqe(iu, &cqe);
+ assert(-EAGAIN == ret); // No CQEs available
});
SyncPoint::GetInstance()->EnableProcessing();
}
}
+// Test ReadAsync -> MultiRead -> Poll on the real FileSystem (no mock): a
+// MultiRead issued between submit and Poll must not corrupt either result.
+TEST_F(TestAsyncRead, InterleavingIOUringOperations) {
+#if defined(ROCKSDB_IOURING_PRESENT)
+ // Use the FileSystem returned by env_ directly (not the mock ReadAsyncFS).
+ std::shared_ptr<FileSystem> fs = env_->GetFileSystem();
+ std::string fname = test::PerThreadDBPath(env_, "testfile_iouring");
+
+ constexpr size_t kSectorSize = 4096;
+ constexpr size_t kNumSectors = 8;
+
+ // 1. Create the file and append kNumSectors chunks of kSectorSize bytes.
+ {
+ std::unique_ptr<FSWritableFile> wfile;
+ ASSERT_OK(
+ fs->NewWritableFile(fname, FileOptions(), &wfile, nullptr /*dbg*/));
+
+ for (size_t i = 0; i < kNumSectors; ++i) {
+ auto data = NewAligned(kSectorSize * 8, static_cast<char>(i + 1));
+ Slice slice(data.get(), kSectorSize);
+ ASSERT_OK(wfile->Append(slice, IOOptions(), nullptr));
+ }
+ ASSERT_OK(wfile->Close(IOOptions(), nullptr));
+ }
+
+ // 2. Interleave ReadAsync, MultiRead and Poll on the file written above.
+ {
+ std::unique_ptr<FSRandomAccessFile> file;
+ ASSERT_OK(fs->NewRandomAccessFile(fname, FileOptions(), &file, nullptr));
+
+ IOOptions opts;
+ std::vector<void*> io_handles(kNumSectors);
+ std::vector<FSReadRequest> async_reqs(kNumSectors);
+ std::vector<std::unique_ptr<char, Deleter>> async_data;
+ std::vector<size_t> vals;
+ IOHandleDeleter del_fn;
+
+ // One async request per sector; vals[i] is the index given to the callback.
+ for (size_t i = 0; i < kNumSectors; i++) {
+ async_reqs[i].offset = i * kSectorSize;
+ async_reqs[i].len = kSectorSize;
+ async_data.emplace_back(NewAligned(kSectorSize, 0));
+ async_reqs[i].scratch = async_data.back().get();
+ vals.push_back(i);
+ }
+
+ // Completion callback: cb_arg points at index i; copy req to async_reqs[i].
+ std::function<void(FSReadRequest&, void*)> callback =
+ [&](FSReadRequest& req, void* cb_arg) {
+ assert(cb_arg != nullptr);
+ size_t i = *(reinterpret_cast<size_t*>(cb_arg));
+ async_reqs[i].offset = req.offset;
+ async_reqs[i].result = req.result;
+ async_reqs[i].status = req.status;
+ };
+
+ // Submit the asynchronous read requests one by one.
+ for (size_t i = 0; i < kNumSectors; i++) {
+ void* cb_arg = static_cast<void*>(&(vals[i]));
+ IOStatus s = file->ReadAsync(async_reqs[i], opts, callback, cb_arg,
+ &(io_handles[i]), &del_fn, nullptr);
+ if (s.IsNotSupported()) {
+ // io_uring not supported here: log, free earlier handles, skip the test.
+ fprintf(stderr, "Skipping test - io_uring not supported: %s\n",
+ s.ToString().c_str());
+ for (size_t j = 0; j < i; j++) {
+ if (io_handles[j] != nullptr) {
+ del_fn(io_handles[j]);
+ }
+ }
+ return;
+ }
+ // For any other error, fail the test.
+ ASSERT_OK(s);
+ }
+
+ // MultiRead the same sectors while the async reads are not yet polled.
+ std::vector<FSReadRequest> multi_reqs(kNumSectors);
+ std::vector<std::unique_ptr<char, Deleter>> multi_data;
+ for (size_t i = 0; i < kNumSectors; i++) {
+ multi_reqs[i].offset = i * kSectorSize;
+ multi_reqs[i].len = kSectorSize;
+ multi_data.emplace_back(NewAligned(kSectorSize, 0));
+ multi_reqs[i].scratch = multi_data.back().get();
+ }
+ ASSERT_OK(file->MultiRead(multi_reqs.data(), kNumSectors, opts, nullptr));
+
+ // Check offset, status and data of every MultiRead request.
+ for (size_t i = 0; i < kNumSectors; i++) {
+ auto buf = NewAligned(kSectorSize * 8, static_cast<char>(i + 1));
+ Slice expected_data(buf.get(), kSectorSize);
+
+ ASSERT_EQ(multi_reqs[i].offset, i * kSectorSize);
+ ASSERT_OK(multi_reqs[i].status);
+ ASSERT_EQ(expected_data.ToString(), multi_reqs[i].result.ToString());
+ }
+
+ // Poll for the async requests submitted before the MultiRead.
+ ASSERT_OK(fs->Poll(io_handles, kNumSectors));
+
+ // Check offset, status and data that the callback stored per async request.
+ for (size_t i = 0; i < kNumSectors; i++) {
+ auto buf = NewAligned(kSectorSize * 8, static_cast<char>(i + 1));
+ Slice expected_data(buf.get(), kSectorSize);
+
+ ASSERT_EQ(async_reqs[i].offset, i * kSectorSize);
+ ASSERT_OK(async_reqs[i].status);
+ ASSERT_EQ(expected_data.ToString(), async_reqs[i].result.ToString());
+ }
+
+ // Release every io_handle with the deleter that was passed to ReadAsync.
+ for (size_t i = 0; i < io_handles.size(); i++) {
+ del_fn(io_handles[i]);
+ }
+ }
+#else
+ fprintf(stderr, "Skipping test - ROCKSDB_IOURING_PRESENT not defined\n");
+#endif
+}
+
struct StaticDestructionTester {
bool activated = false;
~StaticDestructionTester() {
const EnvOptions& options
#if defined(ROCKSDB_IOURING_PRESENT)
,
- ThreadLocalPtr* thread_local_io_urings
+ ThreadLocalPtr* thread_local_async_read_io_urings,
+ ThreadLocalPtr* thread_local_multi_read_io_urings
#endif
)
: filename_(fname),
logical_sector_size_(logical_block_size)
#if defined(ROCKSDB_IOURING_PRESENT)
,
- thread_local_io_urings_(thread_local_io_urings)
+ thread_local_async_read_io_urings_(thread_local_async_read_io_urings),
+ thread_local_multi_read_io_urings_(thread_local_multi_read_io_urings)
#endif
{
assert(!options.use_direct_reads || !options.use_mmap_reads);
return s;
}
+// MultiRead: Perform multiple concurrent read requests using io_uring.
+//
+// OVERVIEW:
+// This function batches multiple read requests and submits them concurrently
+// to io_uring for improved I/O performance. It operates synchronously from the
+// caller's perspective (blocks until all reads complete) but uses io_uring's
+// async capabilities internally for parallel I/O execution.
+//
+// IO_URING LIFECYCLE:
+// 1. Preparation Phase:
+// - Allocate SQEs (Submission Queue Entries) for read requests
+// - Limited by: min(pending_work, io_uring_sq_space_left(), kIoUringDepth -
+// inflight)
+// - Uses io_uring_sq_space_left() to query available SQ slots
+// - Each SQE is tracked in wrap_cache for completion matching
+//
+// 2. Submission Phase:
+// - Loop: while io_uring_sq_ready() > 0 (SQEs pending submission)
+// - Call io_uring_submit_and_wait() to submit SQEs and wait for CQEs
+// - Retries the submission on retryable errors (-EINTR, -EAGAIN)
+// - Breaks on -ENOMEM (sets memory_pressure_on_submission), on any other
+// negative return (logs it, sets err), and on an unexpected return of 0
+//
+// 3. Completion Phase:
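+// - Non-blocking CQE reaping via io_uring_for_each_cqe(), preceded by one
+// blocking io_uring_wait_cqe() when this iteration submitted nothing but
+// previously submitted requests are still in flight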
+// - Matches CQEs to requests using user_data pointer
+// - Processes results: updates bytes read, handles partial reads
+// - Removes completed requests from wrap_cache
+//
+// 4. Loop Iteration:
+// - Repeats until: all requests submitted AND all completions reaped
+// - Termination condition: (num_reqs == reqs_off) &&
+// resubmit_rq_list.empty() && wrap_cache.empty()
+//
+// ERROR HANDLING STRATEGY:
+// - Retryable submission errors (-EINTR, -EAGAIN): Retry submission
+// - Memory pressure (-ENOMEM): Set memory_pressure_on_submission and stop
+// submitting; the teardown path is skipped and completion reaping proceeds
+// - Terminal submission errors: Break, enter teardown path
+// - Retryable CQE errors (-EINTR, -EAGAIN): Add to resubmit_rq_list for retry
+// - Terminal CQE errors: Set ios to IOError, continue processing other CQEs
+// - Teardown path: If SQEs remain unsubmitted after error, reap submitted CQEs,
+// destroy io_uring instance, return error
+//
+// PARTIAL READ HANDLING:
+// - Short reads (bytes_read < requested): Request added to resubmit_rq_list
+// - finished_len tracks cumulative bytes read across resubmissions
+// - iov.iov_base/iov_len adjusted on each resubmission attempt
+// - UpdateResult() determines if read should be retried based on:
+// * Direct I/O alignment requirements
+// * EOF detection
+// * Error conditions
+//
+// RESUBMISSION LOGIC:
+// - resubmit_rq_list: Requests needing retry (short reads, EINTR/EAGAIN errors)
+// - Prioritized in SQE allocation loop: resubmits before new requests
+// - List cleared after SQE preparation
+// - Requests remain in wrap_cache across resubmissions until fully complete
+//
+// CONCURRENCY CONTROL:
+// - wrap_cache.size(): Tracks total inflight requests (SQ + CQ)
+// - io_uring_sq_ready(): Queries SQEs prepared but not yet submitted
+// - io_uring_sq_space_left(): Queries available SQ slots
+// - Max concurrency: kIoUringDepth (256)
+//
+// ACCOUNTING CORRECTNESS:
+// - Uses io_uring native APIs (io_uring_sq_ready, io_uring_sq_space_left)
+// instead of manual counters for robustness
+// - wrap_cache is the authoritative source for inflight request tracking
+// - Re-query io_uring_sq_ready() after submission loop to detect
+// unsubmitted SQEs (indicates submission errors)
+//
+// THREAD SAFETY:
+// - Uses thread-local io_uring instance (thread_local_multi_read_io_urings_)
+// - IORING_SETUP_SINGLE_ISSUER: Only one thread submits to this ring
+// - IORING_SETUP_DEFER_TASKRUN: Task work runs in submitting thread
+// - No cross-thread coordination required
+//
IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options,
IODebugContext* dbg) {
#if defined(ROCKSDB_IOURING_PRESENT)
struct io_uring* iu = nullptr;
- if (thread_local_io_urings_) {
- iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
+ if (thread_local_multi_read_io_urings_) {
+ iu = static_cast<struct io_uring*>(
+ thread_local_multi_read_io_urings_->Get());
if (iu == nullptr) {
- iu = CreateIOUring();
+ unsigned int flags = 0;
+ flags |= IORING_SETUP_SINGLE_ISSUER;
+ flags |= IORING_SETUP_DEFER_TASKRUN;
+ iu = CreateIOUring(flags);
if (iu != nullptr) {
- thread_local_io_urings_->Reset(iu);
+ thread_local_multi_read_io_urings_->Reset(iu);
}
}
}
return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
}
- IOStatus ios = IOStatus::OK();
-
struct WrappedReadRequest {
FSReadRequest* req;
struct iovec iov;
};
autovector<WrappedReadRequest, 32> req_wraps;
- autovector<WrappedReadRequest*, 4> incomplete_rq_list;
+ autovector<WrappedReadRequest*, 4> resubmit_rq_list;
std::unordered_set<WrappedReadRequest*> wrap_cache;
for (size_t i = 0; i < num_reqs; i++) {
req_wraps.emplace_back(&reqs[i]);
}
+ IOStatus ios = IOStatus::OK();
size_t reqs_off = 0;
- while (num_reqs > reqs_off || !incomplete_rq_list.empty()) {
- size_t this_reqs = (num_reqs - reqs_off) + incomplete_rq_list.size();
-
- // If requests exceed depth, split it into batches
- if (this_reqs > kIoUringDepth) {
- this_reqs = kIoUringDepth;
- }
-
- assert(incomplete_rq_list.size() <= this_reqs);
- for (size_t i = 0; i < this_reqs; i++) {
- WrappedReadRequest* rep_to_submit;
- if (i < incomplete_rq_list.size()) {
- rep_to_submit = incomplete_rq_list[i];
+ while ((num_reqs > reqs_off) || !resubmit_rq_list.empty() ||
+ !wrap_cache.empty()) {
+ assert(resubmit_rq_list.size() + wrap_cache.size() <= kIoUringDepth);
+ // Total number of requests that still need to be submitted, includes:
+ //
+ // 1) requests NOT yet submitted (num_reqs - reqs_off)
+ // 2) requests on resubmission list (resubmit_rq_list)
+ //
+ // capped by min of the # of remaining entries in IO ring submission queue
+ // and the max IO ring depth less the inflight requests.
+ size_t new_sqe_reqs_count = std::min({
+ num_reqs - reqs_off + resubmit_rq_list.size(),
+ static_cast<size_t>(io_uring_sq_space_left(iu)),
+ kIoUringDepth - wrap_cache.size() // queue depth less inflight requests
+ });
+ for (size_t i = 0; i < new_sqe_reqs_count; i++) {
+ WrappedReadRequest* req;
+ if (i < resubmit_rq_list.size()) {
+ req = resubmit_rq_list[i];
} else {
- rep_to_submit = &req_wraps[reqs_off++];
+ req = &req_wraps[reqs_off++];
}
- assert(rep_to_submit->req->len > rep_to_submit->finished_len);
- rep_to_submit->iov.iov_base =
- rep_to_submit->req->scratch + rep_to_submit->finished_len;
- rep_to_submit->iov.iov_len =
- rep_to_submit->req->len - rep_to_submit->finished_len;
+ assert(req->req->len > req->finished_len);
+ req->iov.iov_base = req->req->scratch + req->finished_len;
+ req->iov.iov_len = req->req->len - req->finished_len;
struct io_uring_sqe* sqe;
sqe = io_uring_get_sqe(iu);
- io_uring_prep_readv(
- sqe, fd_, &rep_to_submit->iov, 1,
- rep_to_submit->req->offset + rep_to_submit->finished_len);
- io_uring_sqe_set_data(sqe, rep_to_submit);
- wrap_cache.emplace(rep_to_submit);
+ // NULL is unexpected as we do maintain proper ring accounting.
+ assert(sqe);
+ io_uring_prep_readv(sqe, fd_, &req->iov, 1,
+ req->req->offset + req->finished_len);
+ io_uring_sqe_set_data(sqe, req);
+ wrap_cache.emplace(req);
}
- incomplete_rq_list.clear();
+ resubmit_rq_list.clear();
+
+ struct io_uring_cqe* cqe = nullptr;
+ unsigned head;
+ ssize_t err = 0;
+ bool memory_pressure_on_submission = false;
+ unsigned reqs_pending_submission;
+ unsigned reqs_submitted = 0;
+ while ((reqs_pending_submission = io_uring_sq_ready(iu))) {
+ // MultiRead is synchronous in nature. io_uring_submit_and_wait provides
+ // batching semantics (submit + best effort wait in one syscall), while
+ // io_uring_submit enables async producer/consumer semantics (submit
+ // only, requires separate reaping). We chose batching approach to
+ // reduce the volume of syscalls and context switches.
+ ssize_t ret = io_uring_submit_and_wait(iu, reqs_pending_submission);
+ if (ret < 0) {
+ if (-EINTR == ret || -EAGAIN == ret) {
+ // Submission failed due to rare, retryable syscall error. Try again.
+ continue;
+ }
+ if (-ENOMEM == ret) {
+ fprintf(stderr,
+ "PosixRandomAccessFile::MultiRead: io_uring_submit_and_wait "
+ "experienced terse memory condition.\n");
+ // Best effort to reclaim resources in terse condition.
+ memory_pressure_on_submission = true;
+ } else {
+ fprintf(stderr,
+ "PosixRandomAccessFile::MultiRead: "
+ "io_uring_submit_and_wait returned terminal error: %zd.\n",
+ ret);
+ err = ret;
+ }
+ break;
+ }
+ if (0 == ret) {
+ // This scenario is unexpected for any modern kernel!
+ // We deliberately error out to avoid bugs around infinite loops.
+ fprintf(stderr,
+ "PosixRandomAccessFile::MultiRead: "
+ "io_uring_submit_and_wait returned 0 submissions!\n");
+ break;
+ }
+ reqs_submitted += static_cast<unsigned int>(ret);
+ };
+ reqs_pending_submission = io_uring_sq_ready(iu);
- ssize_t ret =
- io_uring_submit_and_wait(iu, static_cast<unsigned int>(this_reqs));
TEST_SYNC_POINT_CALLBACK(
- "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return1",
- &ret);
- TEST_SYNC_POINT_CALLBACK(
- "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2",
- iu);
-
- if (static_cast<size_t>(ret) != this_reqs) {
- fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs);
- // If error happens and we submitted fewer than expected, it is an
- // exception case and we don't retry here. We should still consume
- // what is is submitted in the ring.
- for (ssize_t i = 0; i < ret; i++) {
- struct io_uring_cqe* cqe = nullptr;
- io_uring_wait_cqe(iu, &cqe);
- if (cqe != nullptr) {
- io_uring_cqe_seen(iu, cqe);
+ "PosixRandomAccessFile::MultiRead:io_uring_sq_ready:return1",
+ &reqs_pending_submission);
+
+ // Error occurred or IO uring stopped submitting outstanding requests.
+ if (reqs_pending_submission && !memory_pressure_on_submission) {
+ // IO ring is initialized once in thread-local variable and then reused
+ // to handle the consecutive MultiRead API calls. Therefore, it's crucial
+ // to reap all the submitted requests.
+ //
+ // NOTE: Loop will run indefinitely until we reap all the completions!!!
+ size_t nr = 0;
+ assert(reqs_pending_submission <= wrap_cache.size());
+ size_t nr_await_cqe = wrap_cache.size() - reqs_pending_submission;
+ while (nr < nr_await_cqe) {
+ // blocking
+ io_uring_wait_cqes(iu, &cqe,
+ static_cast<unsigned int>(nr_await_cqe - nr),
+ nullptr, nullptr);
+ size_t reaped_cqe_count = 0;
+ io_uring_for_each_cqe(iu, head, cqe) { reaped_cqe_count++; }
+ if (reaped_cqe_count > 0) {
+ io_uring_cq_advance(iu, static_cast<unsigned int>(reaped_cqe_count));
+ nr += reaped_cqe_count;
}
}
- return IOStatus::IOError("io_uring_submit_and_wait() requested " +
- std::to_string(this_reqs) + " but returned " +
- std::to_string(ret));
- }
-
- for (size_t i = 0; i < this_reqs; i++) {
- struct io_uring_cqe* cqe = nullptr;
- WrappedReadRequest* req_wrap;
- // We could use the peek variant here, but this seems safer in terms
- // of our initial wait not reaping all completions
- ret = io_uring_wait_cqe(iu, &cqe);
TEST_SYNC_POINT_CALLBACK(
- "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return", &ret);
- if (ret) {
- ios = IOStatus::IOError("io_uring_wait_cqe() returns " +
- std::to_string(ret));
-
- if (cqe != nullptr) {
- io_uring_cqe_seen(iu, cqe);
- }
- continue;
+ "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2",
+ iu);
+
+ // While all the submitted completions have been reaped successfully,
+ // IO ring submission queue still contains at least one non-submitted
+ // request. Destroy io_uring (discards unsubmitted SQEs).
+ //
+ // NOTE: This is a rare scenario and should not happen in normal cases.
+ // Hence, this should NOT materially impact the performance metrics.
+ io_uring_queue_exit(iu);
+ delete iu;
+ thread_local_multi_read_io_urings_->Reset(nullptr);
+
+ if (err < 0) {
+ return IOStatus::IOError(
+ "io_uring_submit_and_wait() failed with an error " +
+ std::to_string(err));
}
+ return IOStatus::IOError(
+ "io_uring_submit_and_wait() requested " +
+ std::to_string(reqs_submitted + reqs_pending_submission) +
+ " but returned " + std::to_string(reqs_submitted));
+ }
- req_wrap = static_cast<WrappedReadRequest*>(io_uring_cqe_get_data(cqe));
- // Reset cqe data to catch any stray reuse of it
- static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
- // Check that we got a valid unique cqe data
- auto wrap_check = wrap_cache.find(req_wrap);
- if (wrap_check == wrap_cache.end()) {
- fprintf(stderr,
- "PosixRandomAccessFile::MultiRead: "
- "Bad cqe data from IO uring - %p\n",
- req_wrap);
- port::PrintStack();
- ios = IOStatus::IOError("io_uring_cqe_get_data() returned " +
- std::to_string((uint64_t)req_wrap));
- continue;
- }
- wrap_cache.erase(wrap_check);
-
- FSReadRequest* req = req_wrap->req;
- size_t bytes_read = 0;
- bool read_again = false;
- UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len,
- false /*async_read*/, use_direct_io(),
- GetRequiredBufferAlignment(), req_wrap->finished_len, req,
- bytes_read, read_again);
- int32_t res = cqe->res;
- if (res >= 0) {
- if (bytes_read == 0) {
+ if ((0 == reqs_submitted) && wrap_cache.size() > reqs_pending_submission) {
+ // If no requests have been submitted and there is at least one request
+ // pending completion, wait for at least one completion to arrive.
+ // This is a guardrail to prevent the busy CPU loops.
+ //
+ // NOTE: it's not really a tight CPU-burning loop in the traditional sense
+ // as it's naturally throttled by the io_uring_submit_and_wait() syscall.
+ io_uring_wait_cqe(iu, &cqe);
+ }
+
+ unsigned int nr = 0;
+ io_uring_for_each_cqe(iu, head, cqe) { // non-blocking
+ if (cqe->user_data) { // non-discarded, valid user data only!
+ nr++;
+ WrappedReadRequest* req_wrap =
+ static_cast<WrappedReadRequest*>(io_uring_cqe_get_data(cqe));
+ // Reset cqe data to catch any stray reuse of it
+ static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
+ // Check that we got a valid unique cqe data
+ auto wrap_check = wrap_cache.find(req_wrap);
+ if (wrap_check == wrap_cache.end()) {
+ fprintf(stderr,
+ "PosixRandomAccessFile::MultiRead: "
+ "Bad cqe data from IO uring - %p\n",
+ req_wrap);
+ port::PrintStack();
+ ios = IOStatus::IOError("io_uring_cqe_get_data() returned " +
+ std::to_string((uint64_t)req_wrap));
+ continue;
+ }
+ wrap_cache.erase(wrap_check);
+ if (cqe->res < 0) {
+ if (-EINTR == cqe->res || -EAGAIN == cqe->res) {
+ resubmit_rq_list.push_back(req_wrap);
+ } else {
+ ios = IOStatus::IOError("io_uring_for_each_cqe() returns " +
+ std::to_string(cqe->res));
+ }
+ continue;
+ }
+ // cqe->res >= 0
+ FSReadRequest* req = req_wrap->req;
+ size_t bytes_read = 0;
+ bool read_again = false;
+ UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len,
+ false /*async_read*/, use_direct_io(),
+ GetRequiredBufferAlignment(), req_wrap->finished_len, req,
+ bytes_read, read_again);
+
+ if (0 == bytes_read) {
if (read_again) {
Slice tmp_slice;
req->status =
req->result =
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
}
- // else It means EOF so no need to do anything.
+ // else it means EOF so no need to do anything.
} else if (bytes_read < req_wrap->iov.iov_len) {
- incomplete_rq_list.push_back(req_wrap);
+ resubmit_rq_list.push_back(req_wrap);
}
}
- io_uring_cqe_seen(iu, cqe);
}
- wrap_cache.clear();
+ if (nr > 0) {
+ io_uring_cq_advance(iu, nr);
+ }
}
return ios;
#else
#if defined(ROCKSDB_IOURING_PRESENT)
// io_uring_queue_init.
struct io_uring* iu = nullptr;
- if (thread_local_io_urings_) {
- iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
+ if (thread_local_async_read_io_urings_) {
+ iu = static_cast<struct io_uring*>(
+ thread_local_async_read_io_urings_->Get());
if (iu == nullptr) {
- iu = CreateIOUring();
+ unsigned int flags = 0;
+ flags |= IORING_SETUP_SINGLE_ISSUER;
+ flags |= IORING_SETUP_DEFER_TASKRUN;
+ iu = CreateIOUring(flags);
if (iu != nullptr) {
- thread_local_io_urings_->Reset(iu);
+ thread_local_async_read_io_urings_->Reset(iu);
}
}
}
io_uring_sqe_set_data(sqe, posix_handle);
// Step 4: io_uring_submit
- ssize_t ret = io_uring_submit(iu);
- if (ret < 0) {
- fprintf(stderr, "io_uring_submit error: %ld\n", long(ret));
- return IOStatus::IOError("io_uring_submit() requested but returned " +
- std::to_string(ret));
+ ssize_t ret;
+ do {
+ ret = io_uring_submit(iu);
+ if (ret < 0) {
+ if (-EINTR == ret || -EAGAIN == ret) {
+ // Submission failed due to transient error. Try again.
+ continue;
+ }
+ fprintf(stderr,
+ "PosixRandomAccessFile::ReadAsync: "
+ "io_uring_submit returned terminal error = %zd\n",
+ ret);
+ break;
+ }
+ if (0 == ret) {
+ // Unexpected. Will be reported as error.
+ break;
+ }
+ } while (ret < 1);
+ if (ret <= 0) {
+ return IOStatus::IOError(
+ "PosixRandomAccessFile::ReadAsync: io_uring_submit() returned " +
+ std::to_string(ret));
+ }
+ if (ret > 1) {
+ fprintf(stderr,
+ "PosixRandomAccessFile::ReadAsync: "
+ "io_uring_submit() returned = %zd\n",
+ ret);
}
return IOStatus::OK();
#else