]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
IO uring improvements (#14158)
author: Maciej Szeszko <mszeszko@meta.com>
Fri, 12 Dec 2025 22:25:40 +0000 (14:25 -0800)
committer: meta-codesync[bot] <215208954+meta-codesync[bot]@users.noreply.github.com>
Fri, 12 Dec 2025 22:25:40 +0000 (14:25 -0800)
Summary:
`PosixRandomAccessFile::MultiRead` was introduced in Dec 2019 in https://github.com/facebook/rocksdb/pull/5881. About two years later, we introduced the `PosixRandomAccessFile::ReadAsync` API in https://github.com/facebook/rocksdb/pull/9578, which reused the same `PosixFileSystem` IO ring as the `MultiRead` API and consequently wrote to the very same ring's submission queue (without waiting!). This 'shared ring' design is problematic: sequentially interleaving `ReadAsync` and `MultiRead` API calls on the very same thread might result in `MultiRead` reading 'unknown' events, leading to `Bad cqe data` errors (which are then falsely perceived as corruption). For some services (running on local flash), this in itself is a hard blocker for adopting RocksDB async prefetching ('async IO'), which heavily relies on the `ReadAsync` API. This change aims to solve this problem by maintaining separate thread-local IO rings for `async reads` and `multi reads`, ensuring correct execution. In addition, we're adding more robust error handling in the form of retries for kernel interrupts and draining the queue when the process is experiencing memory pressure. Separately, we're improving performance by explicitly marking the rings as written to / read from by a single thread (`IORING_SETUP_SINGLE_ISSUER` [if available]) and by deferring task work until just before the application intends to process completions (`IORING_SETUP_DEFER_TASKRUN` [if available]). See https://man7.org/linux/man-pages/man2/io_uring_setup.2.html for reference.

## Benchmark

**TLDR**
There's no evident advantage to using `io_uring_submit` (relative to the proposed `io_uring_submit_and_wait`) across batches of size 10, 250 and 1000, which simulate batch sizes significantly below, close to, and roughly 4x above `kIoUringDepth`, respectively. `io_uring_submit` might be more appealing if (at least) one of the IOs is slow (which was NOT the case during the benchmark). More notably, with this PR, switching from `io_uring_submit_and_wait` -> `io_uring_submit` can be done with a single-line change thanks to the implemented guardrails (we can follow up by adding an optional config for true ring semantics [if needed]).

**Compilation**
```
DEBUG_LEVEL=0 make db_bench
```

**Create DB**

```
./db_bench \
    --db=/db/testdb_2.5m_k100_v6144_16kB_LZ4 \
    --benchmarks=fillseq \
    --num=2500000 \
    --key_size=100 \
    --value_size=6144 \
    --compression_type=LZ4 \
    --block_size=16384 \
    --seed=1723056275
```

**LSM**

* L0: 2 files, L1: 5, L2: 49, L3: 79
* Each file is roughly ~35M in size

### MultiReadRandom (with caching disabled)

Each run was preceded by OS page cache cleanup with `echo 1 | sudo tee /proc/sys/vm/drop_caches`.

```
./db_bench \
    --use_existing_db=true \
    --db=/db/testdb_2.5m_k100_v6144_16kB_LZ4 \
    --compression_type=LZ4 \
    --benchmarks=multireadrandom \
    --num= **<N>** \
    --batch_size= **<B>** \
    --io_uring_enabled=true \
    --async_io=false \
    --optimize_multiget_for_io=false \
    --threads=4 \
    --cache_size=0 \
    --use_direct_reads=true \
    --use_direct_io_for_flush_and_compaction=true \
    --cache_index_and_filter_blocks=false \
    --pin_l0_filter_and_index_blocks_in_cache=false \
    --pin_top_level_index_and_filter=false \
    --prepopulate_block_cache=0 \
    --row_cache_size=0 \
    --use_blob_cache=false \
    --use_compressed_secondary_cache=false
```

| Variant | B = 10; N = 100,000 | B = 250; N = 80,000 | B = 1,000; N = 20,000 |
| -- | -- | -- | -- |
| baseline | 31.5 (± 0.4) us/op | 17.5 (± 0.5) us/op | 13.5 (± 0.4) us/op |
| io_uring_submit_and_wait | 31.5 (± 0.6) us/op | 17.7 (± 0.4) us/op | 13.6 (± 0.4) us/op |
| io_uring_submit | 31.5 (± 0.6) us/op | 17.5 (± 0.5) us/op | 13.4 (± 0.45) us/op |

### Specs

| Property | Value |
| -- | -- |
| RocksDB | version 10.9.0 |
| Date | Tue Dec 9 15:57:03 2025 |
| CPU | 56 * Intel Sapphire Rapids (T10 SPR) |
| Kernel version | 6.9.0-0_fbk12_0_g28f2d09ad102 |

Pull Request resolved: https://github.com/facebook/rocksdb/pull/14158

Reviewed By: anand1976

Differential Revision: D88172809

Pulled By: mszeszko-meta

fbshipit-source-id: 5198de3d2f18f76fee661a2ec5f447e79ba06fbd

env/env_test.cc
env/fs_posix.cc
env/io_posix.cc
env/io_posix.h

index 30cfdde5105535eb30cc0d231dc29a148db60e0f..e6f56402ea776d5f76c4ed7c5e970eb1d036330e 100644 (file)
@@ -1655,42 +1655,6 @@ void GenerateFilesAndRequest(Env* env, const std::string& fname,
   }
 }
 
-TEST_F(EnvPosixTest, MultiReadIOUringError) {
-  // In this test we don't do aligned read, so we can't do direct I/O.
-  EnvOptions soptions;
-  soptions.use_direct_reads = soptions.use_direct_writes = false;
-  std::string fname = test::PerThreadDBPath(env_, "testfile");
-
-  std::vector<std::string> scratches;
-  std::vector<ReadRequest> reqs;
-  GenerateFilesAndRequest(env_, fname, &reqs, &scratches);
-  // Query the data
-  std::unique_ptr<RandomAccessFile> file;
-  ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
-
-  bool io_uring_wait_cqe_called = false;
-  SyncPoint::GetInstance()->SetCallBack(
-      "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return",
-      [&](void* arg) {
-        if (!io_uring_wait_cqe_called) {
-          io_uring_wait_cqe_called = true;
-          ssize_t& ret = *(static_cast<ssize_t*>(arg));
-          ret = 1;
-        }
-      });
-  SyncPoint::GetInstance()->EnableProcessing();
-
-  Status s = file->MultiRead(reqs.data(), reqs.size());
-  if (io_uring_wait_cqe_called) {
-    ASSERT_NOK(s);
-  } else {
-    s.PermitUncheckedError();
-  }
-
-  SyncPoint::GetInstance()->DisableProcessing();
-  SyncPoint::GetInstance()->ClearAllCallBacks();
-}
-
 TEST_F(EnvPosixTest, MultiReadIOUringError2) {
   // In this test we don't do aligned read, so we can't do direct I/O.
   EnvOptions soptions;
@@ -1706,19 +1670,20 @@ TEST_F(EnvPosixTest, MultiReadIOUringError2) {
 
   bool io_uring_submit_and_wait_called = false;
   SyncPoint::GetInstance()->SetCallBack(
-      "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return1",
+      "PosixRandomAccessFile::MultiRead:io_uring_sq_ready:return1",
       [&](void* arg) {
         io_uring_submit_and_wait_called = true;
-        ssize_t* ret = static_cast<ssize_t*>(arg);
-        (*ret)--;
+        unsigned* ret = static_cast<unsigned*>(arg);
+        *ret = 1;
       });
   SyncPoint::GetInstance()->SetCallBack(
       "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2",
       [&](void* arg) {
         struct io_uring* iu = static_cast<struct io_uring*>(arg);
         struct io_uring_cqe* cqe;
-        assert(io_uring_wait_cqe(iu, &cqe) == 0);
-        io_uring_cqe_seen(iu, cqe);
+        // CQ should be empty after drain - peek should fail
+        int ret = io_uring_peek_cqe(iu, &cqe);
+        assert(-EAGAIN == ret);  // No CQEs available
       });
   SyncPoint::GetInstance()->EnableProcessing();
 
@@ -3640,6 +3605,126 @@ TEST_F(TestAsyncRead, ReadAsync) {
   }
 }
 
+// Test ReadAsync -> MultiRead -> Poll with real io_uring (not mock).
+// This verifies that MultiRead doesn't interfere with async read buffers.
+TEST_F(TestAsyncRead, InterleavingIOUringOperations) {
+#if defined(ROCKSDB_IOURING_PRESENT)
+  // Use the real filesystem directly (not the mock ReadAsyncFS).
+  std::shared_ptr<FileSystem> fs = env_->GetFileSystem();
+  std::string fname = test::PerThreadDBPath(env_, "testfile_iouring");
+
+  constexpr size_t kSectorSize = 4096;
+  constexpr size_t kNumSectors = 8;
+
+  // 1. Create & write to a file.
+  {
+    std::unique_ptr<FSWritableFile> wfile;
+    ASSERT_OK(
+        fs->NewWritableFile(fname, FileOptions(), &wfile, nullptr /*dbg*/));
+
+    for (size_t i = 0; i < kNumSectors; ++i) {
+      auto data = NewAligned(kSectorSize * 8, static_cast<char>(i + 1));
+      Slice slice(data.get(), kSectorSize);
+      ASSERT_OK(wfile->Append(slice, IOOptions(), nullptr));
+    }
+    ASSERT_OK(wfile->Close(IOOptions(), nullptr));
+  }
+
+  // 2. Test interleaved ReadAsync and MultiRead operations.
+  {
+    std::unique_ptr<FSRandomAccessFile> file;
+    ASSERT_OK(fs->NewRandomAccessFile(fname, FileOptions(), &file, nullptr));
+
+    IOOptions opts;
+    std::vector<void*> io_handles(kNumSectors);
+    std::vector<FSReadRequest> async_reqs(kNumSectors);
+    std::vector<std::unique_ptr<char, Deleter>> async_data;
+    std::vector<size_t> vals;
+    IOHandleDeleter del_fn;
+
+    // Initialize async read requests.
+    for (size_t i = 0; i < kNumSectors; i++) {
+      async_reqs[i].offset = i * kSectorSize;
+      async_reqs[i].len = kSectorSize;
+      async_data.emplace_back(NewAligned(kSectorSize, 0));
+      async_reqs[i].scratch = async_data.back().get();
+      vals.push_back(i);
+    }
+
+    // Callback function for async reads.
+    std::function<void(FSReadRequest&, void*)> callback =
+        [&](FSReadRequest& req, void* cb_arg) {
+          assert(cb_arg != nullptr);
+          size_t i = *(reinterpret_cast<size_t*>(cb_arg));
+          async_reqs[i].offset = req.offset;
+          async_reqs[i].result = req.result;
+          async_reqs[i].status = req.status;
+        };
+
+    // Submit asynchronous read requests.
+    for (size_t i = 0; i < kNumSectors; i++) {
+      void* cb_arg = static_cast<void*>(&(vals[i]));
+      IOStatus s = file->ReadAsync(async_reqs[i], opts, callback, cb_arg,
+                                   &(io_handles[i]), &del_fn, nullptr);
+      if (s.IsNotSupported()) {
+        // io_uring not supported on this system, skip the test.
+        fprintf(stderr, "Skipping test - io_uring not supported: %s\n",
+                s.ToString().c_str());
+        for (size_t j = 0; j < i; j++) {
+          if (io_handles[j] != nullptr) {
+            del_fn(io_handles[j]);
+          }
+        }
+        return;
+      }
+      // For any other error, fail the test.
+      ASSERT_OK(s);
+    }
+
+    // Do a MultiRead on same sectors while async reads are submitted.
+    std::vector<FSReadRequest> multi_reqs(kNumSectors);
+    std::vector<std::unique_ptr<char, Deleter>> multi_data;
+    for (size_t i = 0; i < kNumSectors; i++) {
+      multi_reqs[i].offset = i * kSectorSize;
+      multi_reqs[i].len = kSectorSize;
+      multi_data.emplace_back(NewAligned(kSectorSize, 0));
+      multi_reqs[i].scratch = multi_data.back().get();
+    }
+    ASSERT_OK(file->MultiRead(multi_reqs.data(), kNumSectors, opts, nullptr));
+
+    // Check the status of MultiRead requests (should all succeed).
+    for (size_t i = 0; i < kNumSectors; i++) {
+      auto buf = NewAligned(kSectorSize * 8, static_cast<char>(i + 1));
+      Slice expected_data(buf.get(), kSectorSize);
+
+      ASSERT_EQ(multi_reqs[i].offset, i * kSectorSize);
+      ASSERT_OK(multi_reqs[i].status);
+      ASSERT_EQ(expected_data.ToString(), multi_reqs[i].result.ToString());
+    }
+
+    // Poll for the submitted async requests.
+    ASSERT_OK(fs->Poll(io_handles, kNumSectors));
+
+    // Check the status of async read requests (should all succeed).
+    for (size_t i = 0; i < kNumSectors; i++) {
+      auto buf = NewAligned(kSectorSize * 8, static_cast<char>(i + 1));
+      Slice expected_data(buf.get(), kSectorSize);
+
+      ASSERT_EQ(async_reqs[i].offset, i * kSectorSize);
+      ASSERT_OK(async_reqs[i].status);
+      ASSERT_EQ(expected_data.ToString(), async_reqs[i].result.ToString());
+    }
+
+    // Delete io_handles.
+    for (size_t i = 0; i < io_handles.size(); i++) {
+      del_fn(io_handles[i]);
+    }
+  }
+#else
+  fprintf(stderr, "Skipping test - ROCKSDB_IOURING_PRESENT not defined\n");
+#endif
+}
+
 struct StaticDestructionTester {
   bool activated = false;
   ~StaticDestructionTester() {
index c93d9ce8675fe2cb0888eb68edc8ba43b48f08da..34efe1204f6d9ddf062b89d2360208b12eca9a44 100644 (file)
@@ -270,7 +270,10 @@ class PosixFileSystem : public FileSystem {
           options
 #if defined(ROCKSDB_IOURING_PRESENT)
           ,
-          !IsIOUringEnabled() ? nullptr : thread_local_io_urings_.get()
+          !IsIOUringEnabled() ? nullptr
+                              : thread_local_async_read_io_urings_.get(),
+          !IsIOUringEnabled() ? nullptr
+                              : thread_local_multi_read_io_urings_.get()
 #endif
               ));
     }
@@ -1087,8 +1090,9 @@ class PosixFileSystem : public FileSystem {
 #if defined(ROCKSDB_IOURING_PRESENT)
     // io_uring_queue_init.
     struct io_uring* iu = nullptr;
-    if (thread_local_io_urings_) {
-      iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
+    if (thread_local_async_read_io_urings_) {
+      iu = static_cast<struct io_uring*>(
+          thread_local_async_read_io_urings_->Get());
     }
 
     // Init failed, platform doesn't support io_uring.
@@ -1161,8 +1165,9 @@ class PosixFileSystem : public FileSystem {
 #if defined(ROCKSDB_IOURING_PRESENT)
     // io_uring_queue_init.
     struct io_uring* iu = nullptr;
-    if (thread_local_io_urings_) {
-      iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
+    if (thread_local_async_read_io_urings_) {
+      iu = static_cast<struct io_uring*>(
+          thread_local_async_read_io_urings_->Get());
     }
 
     // Init failed, platform doesn't support io_uring.
@@ -1277,7 +1282,8 @@ class PosixFileSystem : public FileSystem {
 
 #if defined(ROCKSDB_IOURING_PRESENT)
   // io_uring instance
-  std::unique_ptr<ThreadLocalPtr> thread_local_io_urings_;
+  std::unique_ptr<ThreadLocalPtr> thread_local_async_read_io_urings_;
+  std::unique_ptr<ThreadLocalPtr> thread_local_multi_read_io_urings_;
 #endif
 
   size_t page_size_;
@@ -1337,7 +1343,8 @@ PosixFileSystem::PosixFileSystem()
   // io_uring can be created.
   struct io_uring* new_io_uring = CreateIOUring();
   if (new_io_uring != nullptr) {
-    thread_local_io_urings_.reset(new ThreadLocalPtr(DeleteIOUring));
+    thread_local_async_read_io_urings_.reset(new ThreadLocalPtr(DeleteIOUring));
+    thread_local_multi_read_io_urings_.reset(new ThreadLocalPtr(DeleteIOUring));
     delete new_io_uring;
   }
 #endif
index 5a0f0338d50ab4678f5a7b4f3202e07e59937f9a..489e5b3a9e50a622e0b092b993a21bb0b6234042 100644 (file)
@@ -589,7 +589,8 @@ PosixRandomAccessFile::PosixRandomAccessFile(
     const EnvOptions& options
 #if defined(ROCKSDB_IOURING_PRESENT)
     ,
-    ThreadLocalPtr* thread_local_io_urings
+    ThreadLocalPtr* thread_local_async_read_io_urings,
+    ThreadLocalPtr* thread_local_multi_read_io_urings
 #endif
     )
     : filename_(fname),
@@ -598,7 +599,8 @@ PosixRandomAccessFile::PosixRandomAccessFile(
       logical_sector_size_(logical_block_size)
 #if defined(ROCKSDB_IOURING_PRESENT)
       ,
-      thread_local_io_urings_(thread_local_io_urings)
+      thread_local_async_read_io_urings_(thread_local_async_read_io_urings),
+      thread_local_multi_read_io_urings_(thread_local_multi_read_io_urings)
 #endif
 {
   assert(!options.use_direct_reads || !options.use_mmap_reads);
@@ -659,6 +661,83 @@ IOStatus PosixRandomAccessFile::Read(uint64_t offset, size_t n,
   return s;
 }
 
+// MultiRead: Perform multiple concurrent read requests using io_uring.
+//
+// OVERVIEW:
+// This function batches multiple read requests and submits them concurrently
+// to io_uring for improved I/O performance. It operates synchronously from the
+// caller's perspective (blocks until all reads complete) but uses io_uring's
+// async capabilities internally for parallel I/O execution.
+//
+// IO_URING LIFECYCLE:
+// 1. Preparation Phase:
+//    - Allocate SQEs (Submission Queue Entries) for read requests
+//    - Limited by: min(pending_work, io_uring_sq_space_left(), kIoUringDepth -
+//    inflight)
+//    - Uses io_uring_sq_space_left() to query available SQ slots
+//    - Each SQE is tracked in wrap_cache for completion matching
+//
+// 2. Submission Phase:
+//    - Loop: while io_uring_sq_ready() > 0 (SQEs pending submission)
+//    - Call io_uring_submit_and_wait() to submit SQEs and wait for CQEs
+//    - Handles retryable errors (EINTR, EAGAIN) by continuing
+//    - Breaks on terminal errors (logs error, sets err variable)
+//
+// 3. Completion Phase:
+//    - Non-blocking CQE reaping via io_uring_for_each_cqe()
+//    - Matches CQEs to requests using user_data pointer
+//    - Processes results: updates bytes read, handles partial reads
+//    - Removes completed requests from wrap_cache
+//
+// 4. Loop Iteration:
+//    - Repeats until: all requests submitted AND all completions reaped
+//    - Termination condition: (num_reqs == reqs_off) &&
+//    resubmit_rq_list.empty() && wrap_cache.empty()
+//
+// ERROR HANDLING STRATEGY:
+// - Retryable submission errors (-EINTR, -EAGAIN): Retry submission
+// - Memory pressure (-ENOMEM): Mark memory_pressure_on_submission, attempt
+// recovery
+// - Terminal submission errors: Break, enter teardown path
+// - Retryable CQE errors (-EINTR, -EAGAIN): Add to resubmit_rq_list for retry
+// - Terminal CQE errors: Set ios to IOError, continue processing other CQEs
+// - Teardown path: If SQEs remain unsubmitted after error, reap submitted CQEs,
+//   destroy io_uring instance, return error
+//
+// PARTIAL READ HANDLING:
+// - Short reads (bytes_read < requested): Request added to resubmit_rq_list
+// - finished_len tracks cumulative bytes read across resubmissions
+// - iov.iov_base/iov_len adjusted on each resubmission attempt
+// - UpdateResult() determines if read should be retried based on:
+//   * Direct I/O alignment requirements
+//   * EOF detection
+//   * Error conditions
+//
+// RESUBMISSION LOGIC:
+// - resubmit_rq_list: Requests needing retry (short reads, EINTR/EAGAIN errors)
+// - Prioritized in SQE allocation loop: resubmits before new requests
+// - List cleared after SQE preparation
+// - Requests remain in wrap_cache across resubmissions until fully complete
+//
+// CONCURRENCY CONTROL:
+// - wrap_cache.size(): Tracks total inflight requests (SQ + CQ)
+// - io_uring_sq_ready(): Queries SQEs prepared but not yet submitted
+// - io_uring_sq_space_left(): Queries available SQ slots
+// - Max concurrency: kIoUringDepth (256)
+//
+// ACCOUNTING CORRECTNESS:
+// - Uses io_uring native APIs (io_uring_sq_ready, io_uring_sq_space_left)
+//   instead of manual counters for robustness
+// - wrap_cache is the authoritative source for inflight request tracking
+// - Re-query io_uring_sq_ready() after submission loop to detect
+//   unsubmitted SQEs (indicates submission errors)
+//
+// THREAD SAFETY:
+// - Uses thread-local io_uring instance (thread_local_multi_read_io_urings_)
+// - IORING_SETUP_SINGLE_ISSUER: Only one thread submits to this ring
+// - IORING_SETUP_DEFER_TASKRUN: Task work runs in submitting thread
+// - No cross-thread coordination required
+//
 IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
                                           const IOOptions& options,
                                           IODebugContext* dbg) {
@@ -672,12 +751,16 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
 
 #if defined(ROCKSDB_IOURING_PRESENT)
   struct io_uring* iu = nullptr;
-  if (thread_local_io_urings_) {
-    iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
+  if (thread_local_multi_read_io_urings_) {
+    iu = static_cast<struct io_uring*>(
+        thread_local_multi_read_io_urings_->Get());
     if (iu == nullptr) {
-      iu = CreateIOUring();
+      unsigned int flags = 0;
+      flags |= IORING_SETUP_SINGLE_ISSUER;
+      flags |= IORING_SETUP_DEFER_TASKRUN;
+      iu = CreateIOUring(flags);
       if (iu != nullptr) {
-        thread_local_io_urings_->Reset(iu);
+        thread_local_multi_read_io_urings_->Reset(iu);
       }
     }
   }
@@ -688,8 +771,6 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
     return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
   }
 
-  IOStatus ios = IOStatus::OK();
-
   struct WrappedReadRequest {
     FSReadRequest* req;
     struct iovec iov;
@@ -698,118 +779,199 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
   };
 
   autovector<WrappedReadRequest, 32> req_wraps;
-  autovector<WrappedReadRequest*, 4> incomplete_rq_list;
+  autovector<WrappedReadRequest*, 4> resubmit_rq_list;
   std::unordered_set<WrappedReadRequest*> wrap_cache;
 
   for (size_t i = 0; i < num_reqs; i++) {
     req_wraps.emplace_back(&reqs[i]);
   }
 
+  IOStatus ios = IOStatus::OK();
   size_t reqs_off = 0;
-  while (num_reqs > reqs_off || !incomplete_rq_list.empty()) {
-    size_t this_reqs = (num_reqs - reqs_off) + incomplete_rq_list.size();
-
-    // If requests exceed depth, split it into batches
-    if (this_reqs > kIoUringDepth) {
-      this_reqs = kIoUringDepth;
-    }
-
-    assert(incomplete_rq_list.size() <= this_reqs);
-    for (size_t i = 0; i < this_reqs; i++) {
-      WrappedReadRequest* rep_to_submit;
-      if (i < incomplete_rq_list.size()) {
-        rep_to_submit = incomplete_rq_list[i];
+  while ((num_reqs > reqs_off) || !resubmit_rq_list.empty() ||
+         !wrap_cache.empty()) {
+    assert(resubmit_rq_list.size() + wrap_cache.size() <= kIoUringDepth);
+    // Total number of requests that still need to be submitted, includes:
+    //
+    //  1) requests NOT yet submitted (num_reqs - reqs_off)
+    //  2) requests on resubmission list (resubmit_rq_list)
+    //
+    // capped by min of the # of remaining entries in IO ring submission queue
+    // and the max IO ring depth less the inflight requests.
+    size_t new_sqe_reqs_count = std::min({
+        num_reqs - reqs_off + resubmit_rq_list.size(),
+        static_cast<size_t>(io_uring_sq_space_left(iu)),
+        kIoUringDepth - wrap_cache.size()  // queue depth less inflight requests
+    });
+    for (size_t i = 0; i < new_sqe_reqs_count; i++) {
+      WrappedReadRequest* req;
+      if (i < resubmit_rq_list.size()) {
+        req = resubmit_rq_list[i];
       } else {
-        rep_to_submit = &req_wraps[reqs_off++];
+        req = &req_wraps[reqs_off++];
       }
-      assert(rep_to_submit->req->len > rep_to_submit->finished_len);
-      rep_to_submit->iov.iov_base =
-          rep_to_submit->req->scratch + rep_to_submit->finished_len;
-      rep_to_submit->iov.iov_len =
-          rep_to_submit->req->len - rep_to_submit->finished_len;
+      assert(req->req->len > req->finished_len);
+      req->iov.iov_base = req->req->scratch + req->finished_len;
+      req->iov.iov_len = req->req->len - req->finished_len;
 
       struct io_uring_sqe* sqe;
       sqe = io_uring_get_sqe(iu);
-      io_uring_prep_readv(
-          sqe, fd_, &rep_to_submit->iov, 1,
-          rep_to_submit->req->offset + rep_to_submit->finished_len);
-      io_uring_sqe_set_data(sqe, rep_to_submit);
-      wrap_cache.emplace(rep_to_submit);
+      // NULL is unexpected as we do maintain proper ring accounting.
+      assert(sqe);
+      io_uring_prep_readv(sqe, fd_, &req->iov, 1,
+                          req->req->offset + req->finished_len);
+      io_uring_sqe_set_data(sqe, req);
+      wrap_cache.emplace(req);
     }
-    incomplete_rq_list.clear();
+    resubmit_rq_list.clear();
+
+    struct io_uring_cqe* cqe = nullptr;
+    unsigned head;
+    ssize_t err = 0;
+    bool memory_pressure_on_submission = false;
+    unsigned reqs_pending_submission;
+    unsigned reqs_submitted = 0;
+    while ((reqs_pending_submission = io_uring_sq_ready(iu))) {
+      // MultiRead is synchronous in nature. io_uring_submit_and_wait provides
+      // batching semantics (submit + best effort wait in one syscall), while
+      // io_uring_submit enables async producer/consumer semantics (submit
+      // only, requires separate reaping). We chose batching approach to
+      // reduce the volume of syscalls and context switches.
+      ssize_t ret = io_uring_submit_and_wait(iu, reqs_pending_submission);
+      if (ret < 0) {
+        if (-EINTR == ret || -EAGAIN == ret) {
+          // Submission failed due to rare, retryable syscall error. Try again.
+          continue;
+        }
+        if (-ENOMEM == ret) {
+          fprintf(stderr,
+                  "PosixRandomAccessFile::MultiRead: io_uring_submit_and_wait "
+                  "experienced terse memory condition.\n");
+          // Best effort to reclaim resources in terse condition.
+          memory_pressure_on_submission = true;
+        } else {
+          fprintf(stderr,
+                  "PosixRandomAccessFile::MultiRead: "
+                  "io_uring_submit_and_wait returned terminal error: %zd.\n",
+                  ret);
+          err = ret;
+        }
+        break;
+      }
+      if (0 == ret) {
+        // This scenario is unexpected for any modern kernel!
+        // We deliberately error out to avoid bugs around infinite loops.
+        fprintf(stderr,
+                "PosixRandomAccessFile::MultiRead: "
+                "io_uring_submit_and_wait returned 0 submissions!\n");
+        break;
+      }
+      reqs_submitted += static_cast<unsigned int>(ret);
+    };
+    reqs_pending_submission = io_uring_sq_ready(iu);
 
-    ssize_t ret =
-        io_uring_submit_and_wait(iu, static_cast<unsigned int>(this_reqs));
     TEST_SYNC_POINT_CALLBACK(
-        "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return1",
-        &ret);
-    TEST_SYNC_POINT_CALLBACK(
-        "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2",
-        iu);
-
-    if (static_cast<size_t>(ret) != this_reqs) {
-      fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs);
-      // If error happens and we submitted fewer than expected, it is an
-      // exception case and we don't retry here. We should still consume
-      // what is is submitted in the ring.
-      for (ssize_t i = 0; i < ret; i++) {
-        struct io_uring_cqe* cqe = nullptr;
-        io_uring_wait_cqe(iu, &cqe);
-        if (cqe != nullptr) {
-          io_uring_cqe_seen(iu, cqe);
+        "PosixRandomAccessFile::MultiRead:io_uring_sq_ready:return1",
+        &reqs_pending_submission);
+
+    // Error occurred or IO uring stopped submitting outstanding requests.
+    if (reqs_pending_submission && !memory_pressure_on_submission) {
+      // IO ring is initialized once in thread-local variable and then reused
+      // to handle the consecutive MultiRead API calls. Therefore, it's crucial
+      // to reap all the submitted requests.
+      //
+      // NOTE: Loop will run indefinitely until we reap all the completions!!!
+      size_t nr = 0;
+      assert(reqs_pending_submission <= wrap_cache.size());
+      size_t nr_await_cqe = wrap_cache.size() - reqs_pending_submission;
+      while (nr < nr_await_cqe) {
+        // blocking
+        io_uring_wait_cqes(iu, &cqe,
+                           static_cast<unsigned int>(nr_await_cqe - nr),
+                           nullptr, nullptr);
+        size_t reaped_cqe_count = 0;
+        io_uring_for_each_cqe(iu, head, cqe) { reaped_cqe_count++; }
+        if (reaped_cqe_count > 0) {
+          io_uring_cq_advance(iu, static_cast<unsigned int>(reaped_cqe_count));
+          nr += reaped_cqe_count;
         }
       }
-      return IOStatus::IOError("io_uring_submit_and_wait() requested " +
-                               std::to_string(this_reqs) + " but returned " +
-                               std::to_string(ret));
-    }
-
-    for (size_t i = 0; i < this_reqs; i++) {
-      struct io_uring_cqe* cqe = nullptr;
-      WrappedReadRequest* req_wrap;
 
-      // We could use the peek variant here, but this seems safer in terms
-      // of our initial wait not reaping all completions
-      ret = io_uring_wait_cqe(iu, &cqe);
       TEST_SYNC_POINT_CALLBACK(
-          "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return", &ret);
-      if (ret) {
-        ios = IOStatus::IOError("io_uring_wait_cqe() returns " +
-                                std::to_string(ret));
-
-        if (cqe != nullptr) {
-          io_uring_cqe_seen(iu, cqe);
-        }
-        continue;
+          "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2",
+          iu);
+
+      // While all the submitted completions have been reaped successfully,
+      // IO ring submission queue still contains at least one non-submitted
+      // request. Destroy io_uring (discards unsubmitted SQEs).
+      //
+      // NOTE: This is a rare scenario and should not happen in normal cases.
+      //       Hence, this should NOT materially impact the performance metrics.
+      io_uring_queue_exit(iu);
+      delete iu;
+      thread_local_multi_read_io_urings_->Reset(nullptr);
+
+      if (err < 0) {
+        return IOStatus::IOError(
+            "io_uring_submit_and_wait() failed with an error " +
+            std::to_string(err));
       }
+      return IOStatus::IOError(
+          "io_uring_submit_and_wait() requested " +
+          std::to_string(reqs_submitted + reqs_pending_submission) +
+          " but returned " + std::to_string(reqs_submitted));
+    }
 
-      req_wrap = static_cast<WrappedReadRequest*>(io_uring_cqe_get_data(cqe));
-      // Reset cqe data to catch any stray reuse of it
-      static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
-      // Check that we got a valid unique cqe data
-      auto wrap_check = wrap_cache.find(req_wrap);
-      if (wrap_check == wrap_cache.end()) {
-        fprintf(stderr,
-                "PosixRandomAccessFile::MultiRead: "
-                "Bad cqe data from IO uring - %p\n",
-                req_wrap);
-        port::PrintStack();
-        ios = IOStatus::IOError("io_uring_cqe_get_data() returned " +
-                                std::to_string((uint64_t)req_wrap));
-        continue;
-      }
-      wrap_cache.erase(wrap_check);
-
-      FSReadRequest* req = req_wrap->req;
-      size_t bytes_read = 0;
-      bool read_again = false;
-      UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len,
-                   false /*async_read*/, use_direct_io(),
-                   GetRequiredBufferAlignment(), req_wrap->finished_len, req,
-                   bytes_read, read_again);
-      int32_t res = cqe->res;
-      if (res >= 0) {
-        if (bytes_read == 0) {
+    if ((0 == reqs_submitted) && wrap_cache.size() > reqs_pending_submission) {
+      // If no requests have been submitted and there is at least one request
+      // pending completion, wait for at least one completion to arrive.
+      // This is a guardrail to prevent the busy CPU loops.
+      //
+      // NOTE: it's not really a tight CPU-burning loop in the traditional sense
+      // as it's naturally throttled by the io_uring_submit_and_wait() syscall.
+      io_uring_wait_cqe(iu, &cqe);
+    }
+
+    unsigned int nr = 0;
+    io_uring_for_each_cqe(iu, head, cqe) {  // non-blocking
+      if (cqe->user_data) {  // non-discarded, valid user data only!
+        nr++;
+        WrappedReadRequest* req_wrap =
+            static_cast<WrappedReadRequest*>(io_uring_cqe_get_data(cqe));
+        // Reset cqe data to catch any stray reuse of it
+        static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
+        // Check that we got a valid unique cqe data
+        auto wrap_check = wrap_cache.find(req_wrap);
+        if (wrap_check == wrap_cache.end()) {
+          fprintf(stderr,
+                  "PosixRandomAccessFile::MultiRead: "
+                  "Bad cqe data from IO uring - %p\n",
+                  req_wrap);
+          port::PrintStack();
+          ios = IOStatus::IOError("io_uring_cqe_get_data() returned " +
+                                  std::to_string((uint64_t)req_wrap));
+          continue;
+        }
+        wrap_cache.erase(wrap_check);
+        if (cqe->res < 0) {
+          if (-EINTR == cqe->res || -EAGAIN == cqe->res) {
+            resubmit_rq_list.push_back(req_wrap);
+          } else {
+            ios = IOStatus::IOError("io_uring_for_each_cqe() returns " +
+                                    std::to_string(cqe->res));
+          }
+          continue;
+        }
+        // cqe->res >= 0
+        FSReadRequest* req = req_wrap->req;
+        size_t bytes_read = 0;
+        bool read_again = false;
+        UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len,
+                     false /*async_read*/, use_direct_io(),
+                     GetRequiredBufferAlignment(), req_wrap->finished_len, req,
+                     bytes_read, read_again);
+
+        if (0 == bytes_read) {
           if (read_again) {
             Slice tmp_slice;
             req->status =
@@ -819,14 +981,15 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
             req->result =
                 Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
           }
-          // else It means EOF so no need to do anything.
+          // else it means EOF so no need to do anything.
         } else if (bytes_read < req_wrap->iov.iov_len) {
-          incomplete_rq_list.push_back(req_wrap);
+          resubmit_rq_list.push_back(req_wrap);
         }
       }
-      io_uring_cqe_seen(iu, cqe);
     }
-    wrap_cache.clear();
+    if (nr > 0) {
+      io_uring_cq_advance(iu, nr);
+    }
   }
   return ios;
 #else
@@ -923,12 +1086,16 @@ IOStatus PosixRandomAccessFile::ReadAsync(
 #if defined(ROCKSDB_IOURING_PRESENT)
   // io_uring_queue_init.
   struct io_uring* iu = nullptr;
-  if (thread_local_io_urings_) {
-    iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
+  if (thread_local_async_read_io_urings_) {
+    iu = static_cast<struct io_uring*>(
+        thread_local_async_read_io_urings_->Get());
     if (iu == nullptr) {
-      iu = CreateIOUring();
+      unsigned int flags = 0;
+      flags |= IORING_SETUP_SINGLE_ISSUER;
+      flags |= IORING_SETUP_DEFER_TASKRUN;
+      iu = CreateIOUring(flags);
       if (iu != nullptr) {
-        thread_local_io_urings_->Reset(iu);
+        thread_local_async_read_io_urings_->Reset(iu);
       }
     }
   }
@@ -966,11 +1133,35 @@ IOStatus PosixRandomAccessFile::ReadAsync(
   io_uring_sqe_set_data(sqe, posix_handle);
 
   // Step 4: io_uring_submit
-  ssize_t ret = io_uring_submit(iu);
-  if (ret < 0) {
-    fprintf(stderr, "io_uring_submit error: %ld\n", long(ret));
-    return IOStatus::IOError("io_uring_submit() requested but returned " +
-                             std::to_string(ret));
+  ssize_t ret;
+  do {
+    ret = io_uring_submit(iu);
+    if (ret < 0) {
+      if (-EINTR == ret || -EAGAIN == ret) {
+        // Submission failed due to transient error. Try again.
+        continue;
+      }
+      fprintf(stderr,
+              "PosixRandomAccessFile::ReadAsync: "
+              "io_uring_submit returned terminal error = %zd\n",
+              ret);
+      break;
+    }
+    if (0 == ret) {
+      // Unexpected. Will be reported as error.
+      break;
+    }
+  } while (ret < 1);
+  if (ret <= 0) {
+    return IOStatus::IOError(
+        "PosixRandomAccessFile::ReadAsync: io_uring_submit() returned " +
+        std::to_string(ret));
+  }
+  if (ret > 1) {
+    fprintf(stderr,
+            "PosixRandomAccessFile::ReadAsync: "
+            "io_uring_submit() returned = %zd\n",
+            ret);
   }
   return IOStatus::OK();
 #else
index 39fd8c0f49d124caedf6746478469b59d85fc77e..ca33b8e3e948057450bb60a1e46bb54103aa968c 100644 (file)
 #if defined(ROCKSDB_IOURING_PRESENT)
 #include <liburing.h>
 #include <sys/uio.h>
+
+// Compatibility defines for io_uring setup flags that may not be present in
+// older kernel headers. These values are fixed and won't change, so defining
+// them here is safe at compile time even if the running kernel doesn't
+// support them; such a kernel may still reject them when the ring is set up.
+#ifndef IORING_SETUP_SINGLE_ISSUER
+#define IORING_SETUP_SINGLE_ISSUER (1U << 12)
+#endif
+#ifndef IORING_SETUP_DEFER_TASKRUN
+#define IORING_SETUP_DEFER_TASKRUN (1U << 13)
+#endif
 #endif
 #include <unistd.h>
 
@@ -297,9 +307,9 @@ inline void DeleteIOUring(void* p) {
   delete iu;
 }
 
-inline struct io_uring* CreateIOUring() {
+inline struct io_uring* CreateIOUring(unsigned int flags = 0) {
   struct io_uring* new_io_uring = new struct io_uring;
-  int ret = io_uring_queue_init(kIoUringDepth, new_io_uring, 0);
+  int ret = io_uring_queue_init(kIoUringDepth, new_io_uring, flags);
   if (ret) {
     delete new_io_uring;
     new_io_uring = nullptr;
@@ -315,7 +325,8 @@ class PosixRandomAccessFile : public FSRandomAccessFile {
   bool use_direct_io_;
   size_t logical_sector_size_;
 #if defined(ROCKSDB_IOURING_PRESENT)
-  ThreadLocalPtr* thread_local_io_urings_;
+  ThreadLocalPtr* thread_local_async_read_io_urings_;
+  ThreadLocalPtr* thread_local_multi_read_io_urings_;
 #endif
 
  public:
@@ -323,7 +334,8 @@ class PosixRandomAccessFile : public FSRandomAccessFile {
                         size_t logical_block_size, const EnvOptions& options
 #if defined(ROCKSDB_IOURING_PRESENT)
                         ,
-                        ThreadLocalPtr* thread_local_io_urings
+                        ThreadLocalPtr* thread_local_async_read_io_urings,
+                        ThreadLocalPtr* thread_local_multi_read_io_urings
 #endif
   );
   virtual ~PosixRandomAccessFile();