]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix DelayWrite() calls for two_write_queues (#11130)
author: Peter Dillinger <peterd@fb.com>
Wed, 25 Jan 2023 22:18:27 +0000 (14:18 -0800)
committer: Peter Dillinger <peterd@fb.com>
Wed, 25 Jan 2023 22:25:33 +0000 (14:25 -0800)
Summary:
PR https://github.com/facebook/rocksdb/issues/11020 fixed a case where it was easy to deadlock the DB with LockWAL() but introduced a bug showing up as a rare assertion failure in the stress test. Specifically, `assert(w->state == STATE_INIT)` in `WriteThread::LinkOne()` called from `BeginWriteStall()`, `DelayWrite()`, `WriteImplWALOnly()`. I haven't been able to generate a unit test that reproduces this failure but I believe the root cause is that DelayWrite() was never meant to be re-entrant, only called from the DB's write_thread_ leader. https://github.com/facebook/rocksdb/issues/11020 introduced a call to DelayWrite() from the nonmem_write_thread_ group leader.

This fix is to make DelayWrite() apply to the specific write queue that it is being called from (inject a dummy write stall entry to the head of the appropriate write queue). WriteController is re-entrant, based on polling and state changes signalled with bg_cv_, so can manage stalling two queues. The only anticipated complication (called out by Andrew in the previous PR) is that we don't want timed write delays being injected in parallel for the two queues, because that diminishes the intended throttling effect. Thus, we only allow timed delays for the primary write queue.

HISTORY not updated because this is intended for the same release where the bug was introduced.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11130

Test Plan:
Although I was not able to reproduce the assertion failure, I was able to reproduce a distinct flaw with what I believe is the same root cause: a kind of deadlock if both write queues need to wake up from stopped writes. Only one will be waiting on bg_cv_ (the other waiting in `LinkOne()` for the write queue to open up), so a single SignalAll() will only unblock one of the queues, with the other re-instating the stop until another signal on bg_cv_. A simple unit test is added for this case.

Will also run crash_test_with_multiops_wc_txn for a while looking for issues.

Reviewed By: ajkr

Differential Revision: D42749330

Pulled By: pdillinger

fbshipit-source-id: 4317dd899a93d57c26fd5af7143038f82d4d4d1b

db/db_impl/db_impl.h
db/db_impl/db_impl_write.cc
utilities/transactions/transaction_test.cc

index 9fcd9efea24c29b15980b23ffc7088f9ec06234b..29b703e248bcde5eaeb9901a348878ec2f351e4f 100644 (file)
@@ -1865,7 +1865,8 @@ class DBImpl : public DB {
 
   // num_bytes: for slowdown case, delay time is calculated based on
   //            `num_bytes` going through.
-  Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options);
+  Status DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
+                    const WriteOptions& write_options);
 
   // Begin stalling of writes when memory usage increases beyond a certain
   // threshold.
index cbeab046fd0ffa42572db09cafd6f9765aa98353..08407ba85e10f8419ee94cce01ccb1172da63963 100644 (file)
@@ -926,7 +926,8 @@ Status DBImpl::WriteImplWALOnly(
     }
   } else {
     InstrumentedMutexLock lock(&mutex_);
-    Status status = DelayWrite(/*num_bytes=*/0ull, write_options);
+    Status status =
+        DelayWrite(/*num_bytes=*/0ull, *write_thread, write_options);
     if (!status.ok()) {
       WriteThread::WriteGroup write_group;
       write_thread->EnterAsBatchGroupLeader(&w, &write_group);
@@ -1201,7 +1202,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
     // might happen for smaller writes but larger writes can go through.
     // Can optimize it if it is an issue.
     InstrumentedMutexLock l(&mutex_);
-    status = DelayWrite(last_batch_group_size_, write_options);
+    status = DelayWrite(last_batch_group_size_, write_thread_, write_options);
     PERF_TIMER_START(write_pre_and_post_process_time);
   }
 
@@ -1768,8 +1769,8 @@ uint64_t DBImpl::GetMaxTotalWalSize() const {
 }
 
 // REQUIRES: mutex_ is held
-// REQUIRES: this thread is currently at the front of the writer queue
-Status DBImpl::DelayWrite(uint64_t num_bytes,
+// REQUIRES: this thread is currently the leader for write_thread
+Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
                           const WriteOptions& write_options) {
   mutex_.AssertHeld();
   uint64_t time_delayed = 0;
@@ -1777,8 +1778,16 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
   {
     StopWatch sw(immutable_db_options_.clock, stats_, WRITE_STALL,
                  &time_delayed);
-    uint64_t delay =
-        write_controller_.GetDelay(immutable_db_options_.clock, num_bytes);
+    // To avoid parallel timed delays (bad throttling), only support them
+    // on the primary write queue.
+    uint64_t delay;
+    if (&write_thread == &write_thread_) {
+      delay =
+          write_controller_.GetDelay(immutable_db_options_.clock, num_bytes);
+    } else {
+      assert(num_bytes == 0);
+      delay = 0;
+    }
     TEST_SYNC_POINT("DBImpl::DelayWrite:Start");
     if (delay > 0) {
       if (write_options.no_slowdown) {
@@ -1786,9 +1795,9 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
       }
       TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
 
-      // Notify write_thread_ about the stall so it can setup a barrier and
+      // Notify write_thread about the stall so it can setup a barrier and
       // fail any pending writers with no_slowdown
-      write_thread_.BeginWriteStall();
+      write_thread.BeginWriteStall();
       mutex_.Unlock();
       TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
       // We will delay the write until we have slept for `delay` microseconds
@@ -1808,7 +1817,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
         immutable_db_options_.clock->SleepForMicroseconds(kDelayInterval);
       }
       mutex_.Lock();
-      write_thread_.EndWriteStall();
+      write_thread.EndWriteStall();
     }
 
     // Don't wait if there's a background error, even if its a soft error. We
@@ -1822,12 +1831,12 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
       }
       delayed = true;
 
-      // Notify write_thread_ about the stall so it can setup a barrier and
+      // Notify write_thread about the stall so it can setup a barrier and
       // fail any pending writers with no_slowdown
-      write_thread_.BeginWriteStall();
+      write_thread.BeginWriteStall();
       TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
       bg_cv_.Wait();
-      write_thread_.EndWriteStall();
+      write_thread.EndWriteStall();
     }
   }
   assert(!delayed || !write_options.no_slowdown);
index e035e1f889022e69c38fa45e30f157183675f956..d74a4b8b116f3723122ced9713ab2423dba32ed4 100644 (file)
@@ -6589,6 +6589,58 @@ TEST_P(TransactionTest, LockWal) {
   SyncPoint::GetInstance()->DisableProcessing();
 }
 
+TEST_P(TransactionTest, StallTwoWriteQueues) {
+  // There was a two_write_queues bug in which both write thread leaders (for
+  // each queue) would attempt to own the stopping of writes in the primary
+  // write queue. This nearly worked but could lead to some broken assertions
+  // and a kind of deadlock in the test below. (Would resume if someone
+  // eventually signalled bg_cv_ again.)
+  if (!options.two_write_queues) {
+    ROCKSDB_GTEST_BYPASS("Test only needed with two_write_queues");
+    return;
+  }
+
+  // Stop writes
+  ASSERT_OK(db->LockWAL());
+
+  WriteOptions wopts;
+  wopts.sync = true;
+  wopts.disableWAL = false;
+
+  // Create one write thread that blocks in the primary write queue and one
+  // that blocks in the nonmem queue.
+  bool t1_completed = false;
+  bool t2_completed = false;
+  port::Thread t1{[&]() {
+    ASSERT_OK(db->Put(wopts, "x", "y"));
+    t1_completed = true;
+  }};
+  port::Thread t2{[&]() {
+    std::unique_ptr<Transaction> txn0{db->BeginTransaction(wopts, {})};
+    ASSERT_OK(txn0->SetName("xid"));
+    ASSERT_OK(txn0->Prepare());  // nonmem
+    ASSERT_OK(txn0->Commit());
+    t2_completed = true;
+  }};
+
+  // Sleep long enough so that the above threads can usually reach a waiting point,
+  // to usually reveal deadlock if the bug is present.
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  // Ensure proper test setup
+  ASSERT_FALSE(t1_completed);
+  ASSERT_FALSE(t2_completed);
+
+  // Resume writes
+  ASSERT_OK(db->UnlockWAL());
+
+  // Wait for writes to finish
+  t1.join();
+  t2.join();
+  // Ensure proper test setup
+  ASSERT_TRUE(t1_completed);
+  ASSERT_TRUE(t2_completed);
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {