]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix table cache leak in MultiGet with async_io (#10997)
authoranand76 <anand76@devvm4702.ftw0.facebook.com>
Mon, 5 Dec 2022 06:58:25 +0000 (22:58 -0800)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Mon, 5 Dec 2022 06:58:25 +0000 (22:58 -0800)
Summary:
When MultiGet with the async_io option encounters an IO error in TableCache::GetTableReader, it may result in leakage of table cache handles due to queued coroutines being abandoned. This PR fixes it by ensuring any queued coroutines are run before aborting the MultiGet.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10997

Test Plan:
1. New unit test in db_basic_test
2. asan_crash

Reviewed By: pdillinger

Differential Revision: D41587244

Pulled By: anand1976

fbshipit-source-id: 900920cd3fba47cb0fc744a62facc5ffe2eccb64

HISTORY.md
db/db_basic_test.cc
db/table_cache.cc
db/version_set.cc

index 4d8a724846e33e290538caf351d6605a99a8515b..b13011eba325d55bb1b3c24c92abcef220a29ea2 100644 (file)
@@ -5,6 +5,7 @@
 
 ### Bug Fixes
 * Fixed a regression in iterator where range tombstones after `iterate_upper_bound` is processed.
+* Fixed a memory leak in MultiGet with async_io read option, caused by IO errors during table file open
 
 ## 7.9.0 (11/21/2022)
 ### Performance Improvements
index 622ea2f6ef166d0017bc0af45b6e8d594287fc2d..a28ac2b8856c30b37b777aeee2467f82e371d770 100644 (file)
@@ -2158,11 +2158,11 @@ class DBMultiGetAsyncIOTest : public DBBasicTest,
       : DBBasicTest(), statistics_(ROCKSDB_NAMESPACE::CreateDBStatistics()) {
     BlockBasedTableOptions bbto;
     bbto.filter_policy.reset(NewBloomFilterPolicy(10));
-    Options options = CurrentOptions();
-    options.disable_auto_compactions = true;
-    options.statistics = statistics_;
-    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
-    Reopen(options);
+    options_ = CurrentOptions();
+    options_.disable_auto_compactions = true;
+    options_.statistics = statistics_;
+    options_.table_factory.reset(NewBlockBasedTableFactory(bbto));
+    Reopen(options_);
     int num_keys = 0;
 
     // Put all keys in the bottommost level, and overwrite some keys
@@ -2227,8 +2227,12 @@ class DBMultiGetAsyncIOTest : public DBBasicTest,
 
   const std::shared_ptr<Statistics>& statistics() { return statistics_; }
 
+ protected:
+  void ReopenDB() { Reopen(options_); }
+
  private:
   std::shared_ptr<Statistics> statistics_;
+  Options options_;
 };
 
 TEST_P(DBMultiGetAsyncIOTest, GetFromL0) {
@@ -2305,6 +2309,69 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
   ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
 }
 
+TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
+  std::vector<std::string> key_strs;
+  std::vector<Slice> keys;
+  std::vector<PinnableSlice> values;
+  std::vector<Status> statuses;
+
+  key_strs.push_back(Key(33));
+  key_strs.push_back(Key(54));
+  key_strs.push_back(Key(102));
+  keys.push_back(key_strs[0]);
+  keys.push_back(key_strs[1]);
+  keys.push_back(key_strs[2]);
+  values.resize(keys.size());
+  statuses.resize(keys.size());
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "TableCache::GetTableReader:BeforeOpenFile", [&](void* status) {
+        static int count = 0;
+        count++;
+        // Fail the last table reader open, which is the 6th SST file
+        // since 3 overlapping L0 files + 3 L1 files containing the keys
+        if (count == 6) {
+          Status* s = static_cast<Status*>(status);
+          *s = Status::IOError();
+        }
+      });
+  // DB open will create table readers unless we reduce the table cache
+  // capacity.
+  // SanitizeOptions will set max_open_files to minimum of 20. Table cache
+  // is allocated with max_open_files - 10 as capacity. So override
+  // max_open_files to 11 so table cache capacity will become 1. This will
+  // prevent file open during DB open and force the file to be opened
+  // during MultiGet
+  SyncPoint::GetInstance()->SetCallBack(
+      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
+        int* max_open_files = (int*)arg;
+        *max_open_files = 11;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ReopenDB();
+
+  ReadOptions ro;
+  ro.async_io = true;
+  ro.optimize_multiget_for_io = GetParam();
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data());
+  SyncPoint::GetInstance()->DisableProcessing();
+  ASSERT_EQ(values.size(), 3);
+  ASSERT_EQ(statuses[0], Status::OK());
+  ASSERT_EQ(statuses[1], Status::OK());
+  ASSERT_EQ(statuses[2], Status::IOError());
+
+  HistogramData multiget_io_batch_size;
+
+  statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
+
+  // A batch of 3 async IOs is expected, one for each overlapping file in L1
+  ASSERT_EQ(multiget_io_batch_size.count, 1);
+  ASSERT_EQ(multiget_io_batch_size.max, 2);
+  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
+}
+
 TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
   std::vector<std::string> key_strs;
   std::vector<Slice> keys;
index a9ea14348f08c164acc49c293d6223d03da69111..c44c4bb84dea59f869b4170aee2399dcd7eb0556 100644 (file)
@@ -127,6 +127,8 @@ Status TableCache::GetTableReader(
   FileOptions fopts = file_options;
   fopts.temperature = file_temperature;
   Status s = PrepareIOFromReadOptions(ro, ioptions_.clock, fopts.io_options);
+  TEST_SYNC_POINT_CALLBACK("TableCache::GetTableReader:BeforeOpenFile",
+                           const_cast<Status*>(&s));
   if (s.ok()) {
     s = ioptions_.fs->NewRandomAccessFile(fname, fopts, &file, nullptr);
   }
index 3b9df2aec82536828bd81fee8b89f81e704ed60a..33cd7002271e2e643fa1c2c2c29a45173cfdce4b 100644 (file)
@@ -2539,16 +2539,19 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
           }
           f = fp.GetNextFileInLevel();
         }
-        if (s.ok() && mget_tasks.size() > 0) {
+        if (mget_tasks.size() > 0) {
           RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT,
                      mget_tasks.size());
           // Collect all results so far
           std::vector<Status> statuses = folly::coro::blockingWait(
               folly::coro::collectAllRange(std::move(mget_tasks))
                   .scheduleOn(&range->context()->executor()));
-          for (Status stat : statuses) {
-            if (!stat.ok()) {
-              s = stat;
+          if (s.ok()) {
+            for (Status stat : statuses) {
+              if (!stat.ok()) {
+                s = std::move(stat);
+                break;
+              }
             }
           }
 
@@ -2794,6 +2797,9 @@ Status Version::MultiGetAsync(
     unsigned int num_tasks_queued = 0;
     to_process.pop_front();
     if (batch->IsSearchEnded() || batch->GetRange().empty()) {
+      // If to_process is empty, i.e no more batches to look at, then we need
+      // schedule the enqueued coroutines and wait for them. Otherwise, we
+      // skip this batch and move to the next one in to_process.
       if (!to_process.empty()) {
         continue;
       }
@@ -2802,9 +2808,6 @@ Status Version::MultiGetAsync(
       // to_process
       s = ProcessBatch(options, batch, mget_tasks, blob_ctxs, batches, waiting,
                        to_process, num_tasks_queued, mget_stats);
-      if (!s.ok()) {
-        break;
-      }
       // If ProcessBatch didn't enqueue any coroutine tasks, it means all
       // keys were filtered out. So put the batch back in to_process to
       // lookup in the next level
@@ -2815,8 +2818,10 @@ Status Version::MultiGetAsync(
         waiting.emplace_back(idx);
       }
     }
-    if (to_process.empty()) {
-      if (s.ok() && mget_tasks.size() > 0) {
+    // If ProcessBatch() returned an error, then schedule the enqueued
+    // coroutines and wait for them, then abort the MultiGet.
+    if (to_process.empty() || !s.ok()) {
+      if (mget_tasks.size() > 0) {
         assert(waiting.size());
         RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT, mget_tasks.size());
         // Collect all results so far
@@ -2824,10 +2829,12 @@ Status Version::MultiGetAsync(
             folly::coro::collectAllRange(std::move(mget_tasks))
                 .scheduleOn(&range->context()->executor()));
         mget_tasks.clear();
-        for (Status stat : statuses) {
-          if (!stat.ok()) {
-            s = stat;
-            break;
+        if (s.ok()) {
+          for (Status stat : statuses) {
+            if (!stat.ok()) {
+              s = std::move(stat);
+              break;
+            }
           }
         }
 
@@ -2850,6 +2857,9 @@ Status Version::MultiGetAsync(
         assert(!s.ok() || waiting.size() == 0);
       }
     }
+    if (!s.ok()) {
+      break;
+    }
   }
 
   uint64_t num_levels = 0;