]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Return any errors returned by ReadAsync to the MultiGet caller (#11171)
authoranand76 <anand76@devvm4702.ftw0.facebook.com>
Fri, 3 Feb 2023 00:35:27 +0000 (16:35 -0800)
committeranand76 <anand76@devvm4702.ftw0.facebook.com>
Fri, 3 Feb 2023 01:28:08 +0000 (17:28 -0800)
Summary:
Currently, we incorrectly return a Status::Corruption to the MultiGet caller if the file system ReadAsync cannot issue a read and returns an error for some reason, such as IOStatus::NotSupported(). In this PR, we copy the ReadAsync error to the request status so it can be returned to the user.

Tests:
Update existing unit tests and add a new one for this scenario

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11171

Reviewed By: akankshamahajan15

Differential Revision: D42950057

Pulled By: anand1976

fbshipit-source-id: 85ffcb015fa6c064c311f8a28488fec78c487869

HISTORY.md
db/db_basic_test.cc
util/async_file_reader.cc

index f231d32203d5ec5bb0afa57aa2015c5b3456b362..904f93c79a128a1e47c416e354ca37215e3ce432 100644 (file)
@@ -4,6 +4,7 @@
 * Fixed a data race on `ColumnFamilyData::flush_reason` caused by concurrent flushes.
 * Fixed `DisableManualCompaction()` and `CompactRangeOptions::canceled` to cancel compactions even when they are waiting on conflicting compactions to finish
 * Fixed a bug in which a successful `GetMergeOperands()` could transiently return `Status::MergeInProgress()`
+* Return the correct error (Status::NotSupported()) to MultiGet caller when ReadOptions::async_io flag is true and IO uring is not enabled. Previously, Status::Corruption() was being returned when the actual failure was lack of async IO support.
 
 ## 7.10.0 (01/23/2023)
 ### Behavior changes
index 0fa53fead49d26acddb8648b484739e36952229a..f180d3ff9cb6c6248136254b62f9beb35fd02af1 100644 (file)
@@ -32,6 +32,9 @@
 
 namespace ROCKSDB_NAMESPACE {
 
+static bool enable_io_uring = true;
+extern "C" bool RocksDbIOUringEnable() { return enable_io_uring; }
+
 class DBBasicTest : public DBTestBase {
  public:
   DBBasicTest() : DBTestBase("db_basic_test", /*env_do_fsync=*/false) {}
@@ -2162,6 +2165,7 @@ class DBMultiGetAsyncIOTest : public DBBasicTest,
     options_.disable_auto_compactions = true;
     options_.statistics = statistics_;
     options_.table_factory.reset(NewBlockBasedTableFactory(bbto));
+    options_.env = Env::Default();
     Reopen(options_);
     int num_keys = 0;
 
@@ -2228,6 +2232,20 @@ class DBMultiGetAsyncIOTest : public DBBasicTest,
   const std::shared_ptr<Statistics>& statistics() { return statistics_; }
 
  protected:
+  void PrepareDBForTest() {
+#ifdef ROCKSDB_IOURING_PRESENT
+    Reopen(options_);
+#else   // ROCKSDB_IOURING_PRESENT
+    // Warm up the block cache so we don't need to use the IO uring
+    Iterator* iter = dbfull()->NewIterator(ReadOptions());
+    for (iter->SeekToFirst(); iter->Valid() && iter->status().ok();
+         iter->Next())
+      ;
+    EXPECT_OK(iter->status());
+    delete iter;
+#endif  // ROCKSDB_IOURING_PRESENT
+  }
+
   void ReopenDB() { Reopen(options_); }
 
  private:
@@ -2242,6 +2260,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL0) {
   std::vector<PinnableSlice> values(key_strs.size());
   std::vector<Status> statuses(key_strs.size());
 
+  PrepareDBForTest();
+
   ReadOptions ro;
   ro.async_io = true;
   ro.optimize_multiget_for_io = GetParam();
@@ -2260,6 +2280,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL0) {
   statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
 
   // With async IO, lookups will happen in parallel for each key
+#ifdef ROCKSDB_IOURING_PRESENT
   if (GetParam()) {
     ASSERT_EQ(multiget_io_batch_size.count, 1);
     ASSERT_EQ(multiget_io_batch_size.max, 3);
@@ -2269,6 +2290,11 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL0) {
     // L0 file
     ASSERT_EQ(multiget_io_batch_size.count, 3);
   }
+#else   // ROCKSDB_IOURING_PRESENT
+  if (GetParam()) {
+    ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
+  }
+#endif  // ROCKSDB_IOURING_PRESENT
 }
 
 TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
@@ -2286,6 +2312,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
   values.resize(keys.size());
   statuses.resize(keys.size());
 
+  PrepareDBForTest();
+
   ReadOptions ro;
   ro.async_io = true;
   ro.optimize_multiget_for_io = GetParam();
@@ -2299,6 +2327,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
   ASSERT_EQ(values[1], "val_l1_" + std::to_string(54));
   ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
 
+#ifdef ROCKSDB_IOURING_PRESENT
   HistogramData multiget_io_batch_size;
 
   statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
@@ -2306,9 +2335,11 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
   // A batch of 3 async IOs is expected, one for each overlapping file in L1
   ASSERT_EQ(multiget_io_batch_size.count, 1);
   ASSERT_EQ(multiget_io_batch_size.max, 3);
+#endif  // ROCKSDB_IOURING_PRESENT
   ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
 }
 
+#ifdef ROCKSDB_IOURING_PRESENT
 TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
   std::vector<std::string> key_strs;
   std::vector<Slice> keys;
@@ -2324,9 +2355,9 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
   values.resize(keys.size());
   statuses.resize(keys.size());
 
+  int count = 0;
   SyncPoint::GetInstance()->SetCallBack(
       "TableCache::GetTableReader:BeforeOpenFile", [&](void* status) {
-        static int count = 0;
         count++;
         // Fail the last table reader open, which is the 6th SST file
         // since 3 overlapping L0 files + 3 L1 files containing the keys
@@ -2349,7 +2380,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
       });
   SyncPoint::GetInstance()->EnableProcessing();
 
-  ReopenDB();
+  PrepareDBForTest();
 
   ReadOptions ro;
   ro.async_io = true;
@@ -2371,6 +2402,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
   ASSERT_EQ(multiget_io_batch_size.max, 2);
   ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
 }
+#endif  // ROCKSDB_IOURING_PRESENT
 
 TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
   std::vector<std::string> key_strs;
@@ -2388,6 +2420,8 @@ TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
   values.resize(keys.size());
   statuses.resize(keys.size());
 
+  PrepareDBForTest();
+
   ReadOptions ro;
   ro.async_io = true;
   ro.optimize_multiget_for_io = GetParam();
@@ -2401,6 +2435,7 @@ TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
   ASSERT_EQ(values[1], "val_l1_" + std::to_string(54));
   ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
 
+#ifdef ROCKSDB_IOURING_PRESENT
   HistogramData multiget_io_batch_size;
 
   statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
@@ -2411,6 +2446,7 @@ TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
   // will lookup 2 files in parallel and issue 2 async reads
   ASSERT_EQ(multiget_io_batch_size.count, 2);
   ASSERT_EQ(multiget_io_batch_size.max, 2);
+#endif  // ROCKSDB_IOURING_PRESENT
 }
 
 TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
@@ -2429,6 +2465,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
   values.resize(keys.size());
   statuses.resize(keys.size());
 
+  PrepareDBForTest();
+
   ReadOptions ro;
   ro.async_io = true;
   ro.optimize_multiget_for_io = GetParam();
@@ -2442,6 +2480,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
   ASSERT_EQ(values[1], "val_l2_" + std::to_string(56));
   ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
 
+#ifdef ROCKSDB_IOURING_PRESENT
   HistogramData multiget_io_batch_size;
 
   statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
@@ -2451,6 +2490,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
   // Otherwise, the L2 lookup will happen after L1.
   ASSERT_EQ(multiget_io_batch_size.count, GetParam() ? 1 : 2);
   ASSERT_EQ(multiget_io_batch_size.max, GetParam() ? 3 : 2);
+#endif  // ROCKSDB_IOURING_PRESENT
 }
 
 TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) {
@@ -2467,6 +2507,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) {
   values.resize(keys.size());
   statuses.resize(keys.size());
 
+  PrepareDBForTest();
+
   ReadOptions ro;
   ro.async_io = true;
   ro.optimize_multiget_for_io = GetParam();
@@ -2482,6 +2524,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) {
   ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
 }
 
+#ifdef ROCKSDB_IOURING_PRESENT
 TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) {
   std::vector<std::string> key_strs;
   std::vector<Slice> keys;
@@ -2496,6 +2539,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) {
   values.resize(keys.size());
   statuses.resize(keys.size());
 
+  PrepareDBForTest();
+
   ReadOptions ro;
   ro.async_io = true;
   ro.optimize_multiget_for_io = GetParam();
@@ -2525,6 +2570,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2WithRangeDelInL1) {
   values.resize(keys.size());
   statuses.resize(keys.size());
 
+  PrepareDBForTest();
+
   ReadOptions ro;
   ro.async_io = true;
   ro.optimize_multiget_for_io = GetParam();
@@ -2539,6 +2586,45 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2WithRangeDelInL1) {
   // Bloom filters in L0/L1 will avoid the coroutine calls in those levels
   ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
 }
+#endif  // ROCKSDB_IOURING_PRESENT
+
+TEST_P(DBMultiGetAsyncIOTest, GetNoIOUring) {
+  std::vector<std::string> key_strs;
+  std::vector<Slice> keys;
+  std::vector<PinnableSlice> values;
+  std::vector<Status> statuses;
+
+  key_strs.push_back(Key(33));
+  key_strs.push_back(Key(54));
+  key_strs.push_back(Key(102));
+  keys.push_back(key_strs[0]);
+  keys.push_back(key_strs[1]);
+  keys.push_back(key_strs[2]);
+  values.resize(keys.size());
+  statuses.resize(keys.size());
+
+  enable_io_uring = false;
+  ReopenDB();
+
+  ReadOptions ro;
+  ro.async_io = true;
+  ro.optimize_multiget_for_io = GetParam();
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data());
+  ASSERT_EQ(values.size(), 3);
+  ASSERT_EQ(statuses[0], Status::NotSupported());
+  ASSERT_EQ(statuses[1], Status::NotSupported());
+  ASSERT_EQ(statuses[2], Status::NotSupported());
+
+  HistogramData multiget_io_batch_size;
+
+  statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
+
+  // A batch of 3 async IOs is expected, one for each overlapping file in L1
+  ASSERT_EQ(multiget_io_batch_size.count, 1);
+  ASSERT_EQ(multiget_io_batch_size.max, 3);
+  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
+}
 
 INSTANTIATE_TEST_CASE_P(DBMultiGetAsyncIOTest, DBMultiGetAsyncIOTest,
                         testing::Bool());
index 8401a6b44ce4ab76ad5800ca4d204de2933e3a65..080c1ae96689ce56717c4f29866a32fac0b0f032 100644 (file)
@@ -20,17 +20,20 @@ bool AsyncFileReader::MultiReadAsyncImpl(ReadAwaiter* awaiter) {
   awaiter->io_handle_.resize(awaiter->num_reqs_);
   awaiter->del_fn_.resize(awaiter->num_reqs_);
   for (size_t i = 0; i < awaiter->num_reqs_; ++i) {
-    awaiter->file_
-        ->ReadAsync(
-            awaiter->read_reqs_[i], awaiter->opts_,
-            [](const FSReadRequest& req, void* cb_arg) {
-              FSReadRequest* read_req = static_cast<FSReadRequest*>(cb_arg);
-              read_req->status = req.status;
-              read_req->result = req.result;
-            },
-            &awaiter->read_reqs_[i], &awaiter->io_handle_[i],
-            &awaiter->del_fn_[i], /*aligned_buf=*/nullptr)
-        .PermitUncheckedError();
+    IOStatus s = awaiter->file_->ReadAsync(
+        awaiter->read_reqs_[i], awaiter->opts_,
+        [](const FSReadRequest& req, void* cb_arg) {
+          FSReadRequest* read_req = static_cast<FSReadRequest*>(cb_arg);
+          read_req->status = req.status;
+          read_req->result = req.result;
+        },
+        &awaiter->read_reqs_[i], &awaiter->io_handle_[i], &awaiter->del_fn_[i],
+        /*aligned_buf=*/nullptr);
+    if (!s.ok()) {
+      // For any non-ok status, the FileSystem will not call the callback
+      // So let's update the status ourselves
+      awaiter->read_reqs_[i].status = s;
+    }
   }
   return true;
 }
@@ -41,6 +44,7 @@ void AsyncFileReader::Wait() {
   }
   ReadAwaiter* waiter;
   std::vector<void*> io_handles;
+  IOStatus s;
   io_handles.reserve(num_reqs_);
   waiter = head_;
   do {
@@ -52,7 +56,7 @@ void AsyncFileReader::Wait() {
   } while (waiter != tail_ && (waiter = waiter->next_));
   if (io_handles.size() > 0) {
     StopWatch sw(SystemClock::Default().get(), stats_, POLL_WAIT_MICROS);
-    fs_->Poll(io_handles, io_handles.size()).PermitUncheckedError();
+    s = fs_->Poll(io_handles, io_handles.size());
   }
   do {
     waiter = head_;
@@ -62,6 +66,10 @@ void AsyncFileReader::Wait() {
       if (waiter->io_handle_[i] && waiter->del_fn_[i]) {
         waiter->del_fn_[i](waiter->io_handle_[i]);
       }
+      if (waiter->read_reqs_[i].status.ok() && !s.ok()) {
+        // Override the request status with the Poll error
+        waiter->read_reqs_[i].status = s;
+      }
     }
     waiter->awaiting_coro_.resume();
   } while (waiter != tail_);