]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix table cache leak in MultiGet with async_io (#10997)
authoranand76 <anand76@devvm4702.ftw0.facebook.com>
Mon, 5 Dec 2022 06:58:25 +0000 (22:58 -0800)
committeranand76 <anand76@devvm4702.ftw0.facebook.com>
Fri, 9 Dec 2022 01:51:33 +0000 (17:51 -0800)
Summary:
When MultiGet with the async_io option encounters an IO error in TableCache::GetTableReader, it may result in leakage of table cache handles due to queued coroutines being abandoned. This PR fixes it by ensuring any queued coroutines are run before aborting the MultiGet.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10997

Test Plan:
1. New unit test in db_basic_test
2. asan_crash

Reviewed By: pdillinger

Differential Revision: D41587244

Pulled By: anand1976

fbshipit-source-id: 900920cd3fba47cb0fc744a62facc5ffe2eccb64

HISTORY.md
db/db_basic_test.cc
db/table_cache.cc
db/version_set.cc

index 92ea10fe6360f19df03737bb94c55697835f36a8..a454a64bc15344b3cbd0c6cf505eab6294469826 100644 (file)
@@ -1,4 +1,8 @@
 # Rocksdb Change Log
+## 7.9.1 (12/8/2022)
+### Bug Fixes
+* Fixed a memory leak in MultiGet with async_io read option, caused by IO errors during table file open
+
 ## 7.9.0 (11/21/2022)
 ### Performance Improvements
 * Fixed an iterator performance regression for delete range users when scanning through a consecutive sequence of range tombstones (#10877).
index 622ea2f6ef166d0017bc0af45b6e8d594287fc2d..a28ac2b8856c30b37b777aeee2467f82e371d770 100644 (file)
@@ -2158,11 +2158,11 @@ class DBMultiGetAsyncIOTest : public DBBasicTest,
       : DBBasicTest(), statistics_(ROCKSDB_NAMESPACE::CreateDBStatistics()) {
     BlockBasedTableOptions bbto;
     bbto.filter_policy.reset(NewBloomFilterPolicy(10));
-    Options options = CurrentOptions();
-    options.disable_auto_compactions = true;
-    options.statistics = statistics_;
-    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
-    Reopen(options);
+    options_ = CurrentOptions();
+    options_.disable_auto_compactions = true;
+    options_.statistics = statistics_;
+    options_.table_factory.reset(NewBlockBasedTableFactory(bbto));
+    Reopen(options_);
     int num_keys = 0;
 
     // Put all keys in the bottommost level, and overwrite some keys
@@ -2227,8 +2227,12 @@ class DBMultiGetAsyncIOTest : public DBBasicTest,
 
   const std::shared_ptr<Statistics>& statistics() { return statistics_; }
 
+ protected:
+  void ReopenDB() { Reopen(options_); }
+
  private:
   std::shared_ptr<Statistics> statistics_;
+  Options options_;
 };
 
 TEST_P(DBMultiGetAsyncIOTest, GetFromL0) {
@@ -2305,6 +2309,69 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
   ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
 }
 
+TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
+  std::vector<std::string> key_strs;
+  std::vector<Slice> keys;
+  std::vector<PinnableSlice> values;
+  std::vector<Status> statuses;
+
+  key_strs.push_back(Key(33));
+  key_strs.push_back(Key(54));
+  key_strs.push_back(Key(102));
+  keys.push_back(key_strs[0]);
+  keys.push_back(key_strs[1]);
+  keys.push_back(key_strs[2]);
+  values.resize(keys.size());
+  statuses.resize(keys.size());
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "TableCache::GetTableReader:BeforeOpenFile", [&](void* status) {
+        static int count = 0;
+        count++;
+        // Fail the last table reader open, which is the 6th SST file
+        // since 3 overlapping L0 files + 3 L1 files containing the keys
+        if (count == 6) {
+          Status* s = static_cast<Status*>(status);
+          *s = Status::IOError();
+        }
+      });
+  // DB open will create table readers unless we reduce the table cache
+  // capacity.
+  // SanitizeOptions will set max_open_files to minimum of 20. Table cache
+  // is allocated with max_open_files - 10 as capacity. So override
+  // max_open_files to 11 so table cache capacity will become 1. This will
+  // prevent file open during DB open and force the file to be opened
+  // during MultiGet
+  SyncPoint::GetInstance()->SetCallBack(
+      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
+        int* max_open_files = (int*)arg;
+        *max_open_files = 11;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ReopenDB();
+
+  ReadOptions ro;
+  ro.async_io = true;
+  ro.optimize_multiget_for_io = GetParam();
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data());
+  SyncPoint::GetInstance()->DisableProcessing();
+  ASSERT_EQ(values.size(), 3);
+  ASSERT_EQ(statuses[0], Status::OK());
+  ASSERT_EQ(statuses[1], Status::OK());
+  ASSERT_EQ(statuses[2], Status::IOError());
+
+  HistogramData multiget_io_batch_size;
+
+  statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
+
+  // A batch of 3 async IOs is expected, one for each overlapping file in L1
+  ASSERT_EQ(multiget_io_batch_size.count, 1);
+  ASSERT_EQ(multiget_io_batch_size.max, 2);
+  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
+}
+
 TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
   std::vector<std::string> key_strs;
   std::vector<Slice> keys;
index a9ea14348f08c164acc49c293d6223d03da69111..c44c4bb84dea59f869b4170aee2399dcd7eb0556 100644 (file)
@@ -127,6 +127,8 @@ Status TableCache::GetTableReader(
   FileOptions fopts = file_options;
   fopts.temperature = file_temperature;
   Status s = PrepareIOFromReadOptions(ro, ioptions_.clock, fopts.io_options);
+  TEST_SYNC_POINT_CALLBACK("TableCache::GetTableReader:BeforeOpenFile",
+                           const_cast<Status*>(&s));
   if (s.ok()) {
     s = ioptions_.fs->NewRandomAccessFile(fname, fopts, &file, nullptr);
   }
index 082e55217cfebd5610f0175c7ff99ccf67eb6a6e..427af6e25f3c36827a3be29f4fae89d43c751800 100644 (file)
@@ -2539,16 +2539,19 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
           }
           f = fp.GetNextFileInLevel();
         }
-        if (s.ok() && mget_tasks.size() > 0) {
+        if (mget_tasks.size() > 0) {
           RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT,
                      mget_tasks.size());
           // Collect all results so far
           std::vector<Status> statuses = folly::coro::blockingWait(
               folly::coro::collectAllRange(std::move(mget_tasks))
                   .scheduleOn(&range->context()->executor()));
-          for (Status stat : statuses) {
-            if (!stat.ok()) {
-              s = stat;
+          if (s.ok()) {
+            for (Status stat : statuses) {
+              if (!stat.ok()) {
+                s = std::move(stat);
+                break;
+              }
             }
           }
 
@@ -2794,6 +2797,9 @@ Status Version::MultiGetAsync(
     unsigned int num_tasks_queued = 0;
     to_process.pop_front();
     if (batch->IsSearchEnded() || batch->GetRange().empty()) {
+      // If to_process is empty, i.e no more batches to look at, then we need
+      // schedule the enqueued coroutines and wait for them. Otherwise, we
+      // skip this batch and move to the next one in to_process.
       if (!to_process.empty()) {
         continue;
       }
@@ -2802,9 +2808,6 @@ Status Version::MultiGetAsync(
       // to_process
       s = ProcessBatch(options, batch, mget_tasks, blob_ctxs, batches, waiting,
                        to_process, num_tasks_queued, mget_stats);
-      if (!s.ok()) {
-        break;
-      }
       // If ProcessBatch didn't enqueue any coroutine tasks, it means all
       // keys were filtered out. So put the batch back in to_process to
       // lookup in the next level
@@ -2815,8 +2818,10 @@ Status Version::MultiGetAsync(
         waiting.emplace_back(idx);
       }
     }
-    if (to_process.empty()) {
-      if (s.ok() && mget_tasks.size() > 0) {
+    // If ProcessBatch() returned an error, then schedule the enqueued
+    // coroutines and wait for them, then abort the MultiGet.
+    if (to_process.empty() || !s.ok()) {
+      if (mget_tasks.size() > 0) {
         assert(waiting.size());
         RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT, mget_tasks.size());
         // Collect all results so far
@@ -2824,10 +2829,12 @@ Status Version::MultiGetAsync(
             folly::coro::collectAllRange(std::move(mget_tasks))
                 .scheduleOn(&range->context()->executor()));
         mget_tasks.clear();
-        for (Status stat : statuses) {
-          if (!stat.ok()) {
-            s = stat;
-            break;
+        if (s.ok()) {
+          for (Status stat : statuses) {
+            if (!stat.ok()) {
+              s = std::move(stat);
+              break;
+            }
           }
         }
 
@@ -2850,6 +2857,9 @@ Status Version::MultiGetAsync(
         assert(!s.ok() || waiting.size() == 0);
       }
     }
+    if (!s.ok()) {
+      break;
+    }
   }
 
   uint64_t num_levels = 0;