}
} else {
InstrumentedMutexLock lock(&mutex_);
- Status status = DelayWrite(/*num_bytes=*/0ull, write_options);
+ Status status =
+ DelayWrite(/*num_bytes=*/0ull, *write_thread, write_options);
if (!status.ok()) {
WriteThread::WriteGroup write_group;
write_thread->EnterAsBatchGroupLeader(&w, &write_group);
// might happen for smaller writes but larger writes can go through.
// Can optimize it if it is an issue.
InstrumentedMutexLock l(&mutex_);
- status = DelayWrite(last_batch_group_size_, write_options);
+ status = DelayWrite(last_batch_group_size_, write_thread_, write_options);
PERF_TIMER_START(write_pre_and_post_process_time);
}
}
// REQUIRES: mutex_ is held
-// REQUIRES: this thread is currently at the front of the writer queue
-Status DBImpl::DelayWrite(uint64_t num_bytes,
+// REQUIRES: this thread is currently at the leader for write_thread
+Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
const WriteOptions& write_options) {
mutex_.AssertHeld();
uint64_t time_delayed = 0;
{
StopWatch sw(immutable_db_options_.clock, stats_, WRITE_STALL,
&time_delayed);
- uint64_t delay =
- write_controller_.GetDelay(immutable_db_options_.clock, num_bytes);
+ // To avoid parallel timed delays (bad throttling), only support them
+ // on the primary write queue.
+ uint64_t delay;
+ if (&write_thread == &write_thread_) {
+ delay =
+ write_controller_.GetDelay(immutable_db_options_.clock, num_bytes);
+ } else {
+ assert(num_bytes == 0);
+ delay = 0;
+ }
TEST_SYNC_POINT("DBImpl::DelayWrite:Start");
if (delay > 0) {
if (write_options.no_slowdown) {
}
TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
- // Notify write_thread_ about the stall so it can setup a barrier and
+ // Notify write_thread about the stall so it can setup a barrier and
// fail any pending writers with no_slowdown
- write_thread_.BeginWriteStall();
+ write_thread.BeginWriteStall();
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
// We will delay the write until we have slept for `delay` microseconds
immutable_db_options_.clock->SleepForMicroseconds(kDelayInterval);
}
mutex_.Lock();
- write_thread_.EndWriteStall();
+ write_thread.EndWriteStall();
}
// Don't wait if there's a background error, even if its a soft error. We
}
delayed = true;
- // Notify write_thread_ about the stall so it can setup a barrier and
+ // Notify write_thread about the stall so it can setup a barrier and
// fail any pending writers with no_slowdown
- write_thread_.BeginWriteStall();
+ write_thread.BeginWriteStall();
TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
bg_cv_.Wait();
- write_thread_.EndWriteStall();
+ write_thread.EndWriteStall();
}
}
assert(!delayed || !write_options.no_slowdown);
SyncPoint::GetInstance()->DisableProcessing();
}
+TEST_P(TransactionTest, StallTwoWriteQueues) {
+ // There was a two_write_queues bug in which both write thread leaders (for
+ // each queue) would attempt to own the stopping of writes in the primary
+ // write queue. This nearly worked but could lead to some broken assertions
+ // and a kind of deadlock in the test below. (Would resume if someone
+ // eventually signalled bg_cv_ again.)
+ if (!options.two_write_queues) {
+ ROCKSDB_GTEST_BYPASS("Test only needed with two_write_queues");
+ return;
+ }
+
+ // Stop writes
+ ASSERT_OK(db->LockWAL());
+
+ WriteOptions wopts;
+ wopts.sync = true;
+ wopts.disableWAL = false;
+
+ // Create one write thread that blocks in the primary write queue and one
+ // that blocks in the nonmem queue.
+ bool t1_completed = false;
+ bool t2_completed = false;
+ port::Thread t1{[&]() {
+ ASSERT_OK(db->Put(wopts, "x", "y"));
+ t1_completed = true;
+ }};
+ port::Thread t2{[&]() {
+ std::unique_ptr<Transaction> txn0{db->BeginTransaction(wopts, {})};
+ ASSERT_OK(txn0->SetName("xid"));
+ ASSERT_OK(txn0->Prepare()); // nonmem
+ ASSERT_OK(txn0->Commit());
+ t2_completed = true;
+ }};
+
+ // Sleep long enough to that above threads can usually reach a waiting point,
+ // to usually reveal deadlock if the bug is present.
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ // Ensure proper test setup
+ ASSERT_FALSE(t1_completed);
+ ASSERT_FALSE(t2_completed);
+
+ // Resume writes
+ ASSERT_OK(db->UnlockWAL());
+
+ // Wait for writes to finish
+ t1.join();
+ t2.join();
+ // Ensure proper test setup
+ ASSERT_TRUE(t1_completed);
+ ASSERT_TRUE(t2_completed);
+}
+
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {