]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
When closing BlobDB, should first wait for all background tasks (#5005)
authorSiying Dong <siying.d@fb.com>
Fri, 22 Feb 2019 01:23:05 +0000 (17:23 -0800)
committerSagar Vemuri <svemuri@fb.com>
Tue, 26 Mar 2019 18:35:26 +0000 (11:35 -0700)
Summary:
When closing a BlobDB, it only waits for background tasks
to finish as the last thing, but the background task may access
some variables that are destroyed. The fix is to introduce a
shutdown function in the timer queue and call the function as
the first thing when destorying BlobDB.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5005

Differential Revision: D14170342

Pulled By: siying

fbshipit-source-id: 081e6a2d99b9765d5956cf6cdfc290c07270c233

util/timer_queue.h
utilities/blob_db/blob_db_impl.cc
utilities/blob_db/blob_db_test.cc

index f068ffefbf3e57c5d5b25a047b57fc707c9e142e..bd8a4f85048661cc215e2f95240e35174509e3c8 100644 (file)
@@ -22,8 +22,6 @@
 
 #pragma once
 
-#include "port/port.h"
-
 #include <assert.h>
 #include <chrono>
 #include <condition_variable>
@@ -33,6 +31,9 @@
 #include <utility>
 #include <vector>
 
+#include "port/port.h"
+#include "util/sync_point.h"
+
 // Allows execution of handlers at a specified time in the future
 // Guarantees:
 //  - All handlers are executed ONCE, even if cancelled (aborted parameter will
@@ -48,7 +49,13 @@ class TimerQueue {
  public:
   TimerQueue() : m_th(&TimerQueue::run, this) {}
 
-  ~TimerQueue() {
+  ~TimerQueue() { shutdown(); }
+
+  // This function is not thread-safe.
+  void shutdown() {
+    if (closed_) {
+      return;
+    }
     cancelAll();
     // Abusing the timer queue to trigger the shutdown.
     add(0, [this](bool) {
@@ -56,6 +63,7 @@ class TimerQueue {
       return std::make_pair(false, 0);
     });
     m_th.join();
+    closed_ = true;
   }
 
   // Adds a new timer
@@ -67,6 +75,7 @@ class TimerQueue {
     WorkItem item;
     Clock::time_point tp = Clock::now();
     item.end = tp + std::chrono::milliseconds(milliseconds);
+    TEST_SYNC_POINT_CALLBACK("TimeQueue::Add:item.end", &item.end);
     item.period = milliseconds;
     item.handler = std::move(handler);
 
@@ -217,4 +226,5 @@ class TimerQueue {
     std::vector<WorkItem>& getContainer() { return this->c; }
   } m_items;
   rocksdb::port::Thread m_th;
+  bool closed_ = false;
 };
index d1f6c87efe289697e285c6648a8c18a90158f6a5..7c938c16e4c9ba84f4b0975d07a8fbbfe4aa3a3f 100644 (file)
@@ -94,6 +94,7 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
 }
 
 BlobDBImpl::~BlobDBImpl() {
+  tqueue_.shutdown();
   // CancelAllBackgroundWork(db_, true);
   Status s __attribute__((__unused__)) = Close();
   assert(s.ok());
@@ -1308,6 +1309,9 @@ std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
     return std::make_pair(false, -1);
   }
 
+  TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0");
+  TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1");
+
   std::vector<std::shared_ptr<BlobFile>> process_files;
   uint64_t now = EpochNow();
   {
@@ -1322,6 +1326,10 @@ std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
     }
   }
 
+  TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2");
+  TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3");
+  TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr);
+
   SequenceNumber seq = GetLatestSequenceNumber();
   {
     MutexLock l(&write_mutex_);
index bc86d04b3b8b0c2cac85f08a96b54f54a205e31a..f9b4063c7aa4dfc6981100e1f5a9aabd2e409633 100644 (file)
@@ -6,6 +6,7 @@
 #ifndef ROCKSDB_LITE
 
 #include <algorithm>
+#include <chrono>
 #include <cstdlib>
 #include <map>
 #include <memory>
@@ -72,6 +73,12 @@ class BlobDBTest : public testing::Test {
     Open(bdb_options, options);
   }
 
+  void Close() {
+    assert(blob_db_ != nullptr);
+    delete blob_db_;
+    blob_db_ = nullptr;
+  }
+
   void Destroy() {
     if (blob_db_) {
       Options options = blob_db_->GetOptions();
@@ -1542,6 +1549,68 @@ TEST_F(BlobDBTest, DisableFileDeletions) {
   }
 }
 
+TEST_F(BlobDBTest, ShutdownWait) {
+  BlobDBOptions bdb_options;
+  bdb_options.ttl_range_secs = 100;
+  bdb_options.min_blob_size = 0;
+  bdb_options.disable_background_tasks = false;
+  Options options;
+  options.env = mock_env_.get();
+
+  SyncPoint::GetInstance()->LoadDependency({
+      {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"},
+      {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"},
+      {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"},
+      {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
+  });
+  // Force all tasks to be scheduled immediately.
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "TimeQueue::Add:item.end", [&](void *arg) {
+        std::chrono::steady_clock::time_point *tp =
+            static_cast<std::chrono::steady_clock::time_point *>(arg);
+        *tp =
+            std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
+      });
+
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
+        // Sleep 3 ms to increase the chance of data race.
+        // We've synced up the code so that EvictExpiredFiles()
+        // is called concurrently with ~BlobDBImpl().
+        // ~BlobDBImpl() is supposed to wait for all background
+        // task to shutdown before doing anything else. In order
+        // to use the same test to reproduce a bug of the waiting
+        // logic, we wait a little bit here, so that TSAN can
+        // catch the data race.
+        // We should improve the test if we find a better way.
+        Env::Default()->SleepForMicroseconds(3000);
+      });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  Open(bdb_options, options);
+  mock_env_->set_current_time(50);
+  std::map<std::string, std::string> data;
+  ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
+  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+  ASSERT_EQ(1, blob_files.size());
+  auto blob_file = blob_files[0];
+  ASSERT_FALSE(blob_file->Immutable());
+  ASSERT_FALSE(blob_file->Obsolete());
+  VerifyDB(data);
+
+  TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
+  mock_env_->set_current_time(250);
+  // The key should expired now.
+  TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");
+
+  TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
+  TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
+  Close();
+
+  SyncPoint::GetInstance()->DisableProcessing();
+}
+
 }  //  namespace blob_db
 }  //  namespace rocksdb