ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
}
+TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
+ Options options = GetOptions();
+ options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
+ std::vector<port::Thread> threads;
+ std::atomic<int> thread_num(0);
+ port::Mutex mutex;
+ port::CondVar cv(&mutex);
+
+ Reopen(options);
+
+ std::function<void()> write_slowdown_func = [&]() {
+ int a = thread_num.fetch_add(1);
+ std::string key = "foo" + std::to_string(a);
+ WriteOptions wo;
+ wo.no_slowdown = false;
+ dbfull()->Put(wo, key, "bar");
+ };
+ std::function<void()> write_no_slowdown_func = [&]() {
+ int a = thread_num.fetch_add(1);
+ std::string key = "foo" + std::to_string(a);
+ WriteOptions wo;
+ wo.no_slowdown = true;
+ dbfull()->Put(wo, key, "bar");
+ };
+ std::function<void(void *)> unblock_main_thread_func = [&](void *) {
+ mutex.Lock();
+ cv.SignalAll();
+ mutex.Unlock();
+ };
+
+ // Create 3 L0 files and schedule 4th without waiting
+ Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+ Flush();
+ Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+ Flush();
+ Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+ Flush();
+ Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
+ "DBImpl::BackgroundCallFlush:start"},
+ {"DBWriteTest::WriteThreadHangOnWriteStall:2",
+ "DBImpl::WriteImpl:BeforeLeaderEnters"},
+ // Make compaction start wait for the write stall to be detected and
+ // implemented by a write group leader
+ {"DBWriteTest::WriteThreadHangOnWriteStall:3",
+ "BackgroundCallCompaction:0"}});
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ // Schedule creation of 4th L0 file without waiting. This will seal the
+ // memtable and then wait for a sync point before writing the file. We need
+ // to do it this way because SwitchMemtable() needs to enter the
+ // write_thread
+ FlushOptions fopt;
+ fopt.wait = false;
+ dbfull()->Flush(fopt);
+
+ // Create a mix of slowdown/no_slowdown write threads
+ mutex.Lock();
+ // First leader
+ threads.emplace_back(write_slowdown_func);
+ cv.Wait();
+ // Second leader. Will stall writes
+ threads.emplace_back(write_slowdown_func);
+ cv.Wait();
+ threads.emplace_back(write_no_slowdown_func);
+ cv.Wait();
+ threads.emplace_back(write_slowdown_func);
+ cv.Wait();
+ threads.emplace_back(write_no_slowdown_func);
+ cv.Wait();
+ threads.emplace_back(write_slowdown_func);
+ cv.Wait();
+ mutex.Unlock();
+
+ TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
+ dbfull()->TEST_WaitForFlushMemTable(nullptr);
+ // This would have triggered a write stall. Unblock the write group leader
+ TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
+ // The leader is going to create missing newer links. When the leader finishes,
+ // the next leader is going to delay writes and fail writers with no_slowdown
+
+ TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
+ for (auto& t : threads) {
+ t.join();
+ }
+}
+
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
constexpr int kNumThreads = 5;
std::unique_ptr<FaultInjectionTestEnv> mock_env(
prev->link_older = w->link_older;
w->status = Status::Incomplete("Write stall");
SetState(w, STATE_COMPLETED);
+ if (prev->link_older) {
+ prev->link_older->link_newer = prev;
+ }
w = prev->link_older;
} else {
prev = w;
void WriteThread::EndWriteStall() {
MutexLock lock(&stall_mu_);
+ // Unlink write_stall_dummy_ from the write queue. This will unblock
+ // pending write threads to enqueue themselves
assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
+ assert(write_stall_dummy_.link_older != nullptr);
+ write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
newest_writer_.exchange(write_stall_dummy_.link_older);
// Wake up writers