git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix queue manipulation in WriteThread::BeginWriteStall() (#6322)
author: anand76 <anand76@devvm1373.frc2.facebook.com>
Thu, 23 Jan 2020 21:59:48 +0000 (13:59 -0800)
committer: anand76 <anand76@devvm1373.frc2.facebook.com>
Fri, 24 Jan 2020 18:08:01 +0000 (10:08 -0800)
Summary:
When there is a write stall, the active write group leader calls ```BeginWriteStall()``` to walk the queue of writers and remove any with the ```no_slowdown``` option set. There was a bug in the code which updated the back pointer but not the forward pointer (```link_newer```), corrupting the list and causing some threads to wait forever. This PR fixes it.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6322

Test Plan: Add a unit test in db_write_test

Differential Revision: D19538313

Pulled By: anand1976

fbshipit-source-id: 6fbed819e594913f435886606f5d36f74f235c3a

HISTORY.md
db/db_write_test.cc
db/write_thread.cc

index 7b430709d372163d40a9dc44efe3b8c32ffcbcad..5ff60f4bc5bb9e44517f8f9dae7178be691a87f0 100644 (file)
@@ -1,4 +1,8 @@
 # Rocksdb Change Log
+## 6.6.3 (01/24/2020)
+### Bug Fixes
+* Fix a bug that can cause write threads to hang when a slowdown/stall happens and there is a mix of writers with WriteOptions::no_slowdown set/unset.
+
 ## 6.6.2 (01/13/2020)
 ### Bug Fixes
 * Fixed a bug where non-L0 compaction input files were not considered to compute the `creation_time` of new compaction outputs.
index 9eca823c2b787f90712f7ddf32ce756a6c91918f..d74dff03d87a70f808e406d8735e3989e1e05d21 100644 (file)
@@ -39,6 +39,97 @@ TEST_P(DBWriteTest, SyncAndDisableWAL) {
   ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
 }
 
+TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
+  Options options = GetOptions();
+  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+  port::Mutex mutex;
+  port::CondVar cv(&mutex);
+
+  Reopen(options);
+
+  std::function<void()> write_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = false;
+    dbfull()->Put(wo, key, "bar");
+  };
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    dbfull()->Put(wo, key, "bar");
+  };
+  std::function<void(void *)> unblock_main_thread_func = [&](void *) {
+    mutex.Lock();
+    cv.SignalAll();
+    mutex.Unlock();
+  };
+
+  // Create 3 L0 files and schedule 4th without waiting
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  Flush();
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  Flush();
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  Flush();
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+    "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+    {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
+      "DBImpl::BackgroundCallFlush:start"},
+     {"DBWriteTest::WriteThreadHangOnWriteStall:2",
+      "DBImpl::WriteImpl:BeforeLeaderEnters"},
+     // Make compaction start wait for the write stall to be detected and
+     // implemented by a write group leader
+     {"DBWriteTest::WriteThreadHangOnWriteStall:3",
+      "BackgroundCallCompaction:0"}});
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Schedule creation of 4th L0 file without waiting. This will seal the
+  // memtable and then wait for a sync point before writing the file. We need
+  // to do it this way because SwitchMemtable() needs to enter the
+  // write_thread
+  FlushOptions fopt;
+  fopt.wait = false;
+  dbfull()->Flush(fopt);
+
+  // Create a mix of slowdown/no_slowdown write threads
+  mutex.Lock();
+  // First leader
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  // Second leader. Will stall writes
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_no_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_no_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  mutex.Unlock();
+
+  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
+  dbfull()->TEST_WaitForFlushMemTable(nullptr);
+  // This would have triggered a write stall. Unblock the write group leader
+  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
+  // The leader is going to create missing newer links. When the leader finishes,
+  // the next leader is going to delay writes and fail writers with no_slowdown
+
+  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
 TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
   constexpr int kNumThreads = 5;
   std::unique_ptr<FaultInjectionTestEnv> mock_env(
index 1ded68fde3bdcbb0405081ccaf7a5613348dc334..145f1f0298807d381947201f554ecb1a0180caa8 100644 (file)
@@ -344,6 +344,9 @@ void WriteThread::BeginWriteStall() {
       prev->link_older = w->link_older;
       w->status = Status::Incomplete("Write stall");
       SetState(w, STATE_COMPLETED);
+      if (prev->link_older) {
+        prev->link_older->link_newer = prev;
+      }
       w = prev->link_older;
     } else {
       prev = w;
@@ -355,7 +358,11 @@ void WriteThread::BeginWriteStall() {
 void WriteThread::EndWriteStall() {
   MutexLock lock(&stall_mu_);
 
+  // Unlink write_stall_dummy_ from the write queue. This will unblock
+  // pending write threads to enqueue themselves
   assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
+  assert(write_stall_dummy_.link_older != nullptr);
+  write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
   newest_writer_.exchange(write_stall_dummy_.link_older);
 
   // Wake up writers