uint64_t delay = write_controller_.GetDelay(env_, num_bytes);
if (delay > 0) {
if (write_options.no_slowdown) {
- return Status::Incomplete();
+ return Status::Incomplete("Write stall");
}
TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
+ // Notify write_thread_ about the stall so it can setup a barrier and
+ // fail any pending writers with no_slowdown
+ write_thread_.BeginWriteStall();
+ TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
mutex_.Unlock();
// We will delay the write until we have slept for delay ms or
// we don't need a delay anymore
env_->SleepForMicroseconds(kDelayInterval);
}
mutex_.Lock();
+ write_thread_.EndWriteStall();
}
// Don't wait if there's a background error, even if its a soft error. We
// indefinitely
while (error_handler_.GetBGError().ok() && write_controller_.IsStopped()) {
if (write_options.no_slowdown) {
- return Status::Incomplete();
+ return Status::Incomplete("Write stall");
}
delayed = true;
+
+ // Notify write_thread_ about the stall so it can setup a barrier and
+ // fail any pending writers with no_slowdown
+ write_thread_.BeginWriteStall();
TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
bg_cv_.Wait();
+ write_thread_.EndWriteStall();
}
}
assert(!delayed || !write_options.no_slowdown);
}
}
+TEST_F(DBTest, MixedSlowdownOptions) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ options.write_buffer_size = 100000;
+ CreateAndReopenWithCF({"pikachu"}, options);
+ std::vector<port::Thread> threads;
+ std::atomic<int> thread_num(0);
+
+ std::function<void()> write_slowdown_func = [&]() {
+ int a = thread_num.fetch_add(1);
+ std::string key = "foo" + std::to_string(a);
+ WriteOptions wo;
+ wo.no_slowdown = false;
+ ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+ };
+ std::function<void()> write_no_slowdown_func = [&]() {
+ int a = thread_num.fetch_add(1);
+ std::string key = "foo" + std::to_string(a);
+ WriteOptions wo;
+ wo.no_slowdown = true;
+ ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+ };
+ // Use a small number to ensure a large delay that is still effective
+ // when we do Put
+ // TODO(myabandeh): this is time dependent and could potentially make
+ // the test flaky
+ auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
+ std::atomic<int> sleep_count(0);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::DelayWrite:BeginWriteStallDone",
+ [&](void* /*arg*/) {
+ sleep_count.fetch_add(1);
+ if (threads.empty()) {
+ for (int i = 0; i < 2; ++i) {
+ threads.emplace_back(write_slowdown_func);
+ }
+ for (int i = 0; i < 2; ++i) {
+ threads.emplace_back(write_no_slowdown_func);
+ }
+ }
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ WriteOptions wo;
+ wo.sync = false;
+ wo.disableWAL = false;
+ wo.no_slowdown = false;
+ dbfull()->Put(wo, "foo", "bar");
+ // We need the 2nd write to trigger delay. This is because delay is
+ // estimated based on the last write size which is 0 for the first write.
+ ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
+ token.reset();
+
+ for (auto& t : threads) {
+ t.join();
+ }
+ ASSERT_GE(sleep_count.load(), 1);
+
+ wo.no_slowdown = true;
+ ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
+}
+
+TEST_F(DBTest, MixedSlowdownOptionsInQueue) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ options.write_buffer_size = 100000;
+ CreateAndReopenWithCF({"pikachu"}, options);
+ std::vector<port::Thread> threads;
+ std::atomic<int> thread_num(0);
+
+ std::function<void()> write_no_slowdown_func = [&]() {
+ int a = thread_num.fetch_add(1);
+ std::string key = "foo" + std::to_string(a);
+ WriteOptions wo;
+ wo.no_slowdown = true;
+ ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+ };
+ // Use a small number to ensure a large delay that is still effective
+ // when we do Put
+ // TODO(myabandeh): this is time dependent and could potentially make
+ // the test flaky
+ auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
+ std::atomic<int> sleep_count(0);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::DelayWrite:Sleep",
+ [&](void* /*arg*/) {
+ sleep_count.fetch_add(1);
+ if (threads.empty()) {
+ for (int i = 0; i < 2; ++i) {
+ threads.emplace_back(write_no_slowdown_func);
+ }
+ // Sleep for 2s to allow the threads to insert themselves into the
+ // write queue
+ env_->SleepForMicroseconds(3000000ULL);
+ }
+ });
+ std::atomic<int> wait_count(0);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::DelayWrite:Wait",
+ [&](void* /*arg*/) { wait_count.fetch_add(1); });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ WriteOptions wo;
+ wo.sync = false;
+ wo.disableWAL = false;
+ wo.no_slowdown = false;
+ dbfull()->Put(wo, "foo", "bar");
+ // We need the 2nd write to trigger delay. This is because delay is
+ // estimated based on the last write size which is 0 for the first write.
+ ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
+ token.reset();
+
+ for (auto& t : threads) {
+ t.join();
+ }
+ ASSERT_EQ(sleep_count.load(), 1);
+ ASSERT_GE(wait_count.load(), 0);
+}
+
+TEST_F(DBTest, MixedSlowdownOptionsStop) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ options.write_buffer_size = 100000;
+ CreateAndReopenWithCF({"pikachu"}, options);
+ std::vector<port::Thread> threads;
+ std::atomic<int> thread_num(0);
+
+ std::function<void()> write_slowdown_func = [&]() {
+ int a = thread_num.fetch_add(1);
+ std::string key = "foo" + std::to_string(a);
+ WriteOptions wo;
+ wo.no_slowdown = false;
+ ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+ };
+ std::function<void()> write_no_slowdown_func = [&]() {
+ int a = thread_num.fetch_add(1);
+ std::string key = "foo" + std::to_string(a);
+ WriteOptions wo;
+ wo.no_slowdown = true;
+ ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+ };
+ std::function<void()> wakeup_writer = [&]() {
+ dbfull()->mutex_.Lock();
+ dbfull()->bg_cv_.SignalAll();
+ dbfull()->mutex_.Unlock();
+ };
+ // Use a small number to ensure a large delay that is still effective
+ // when we do Put
+ // TODO(myabandeh): this is time dependent and could potentially make
+ // the test flaky
+ auto token = dbfull()->TEST_write_controler().GetStopToken();
+ std::atomic<int> wait_count(0);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::DelayWrite:Wait",
+ [&](void* /*arg*/) {
+ wait_count.fetch_add(1);
+ if (threads.empty()) {
+ for (int i = 0; i < 2; ++i) {
+ threads.emplace_back(write_slowdown_func);
+ }
+ for (int i = 0; i < 2; ++i) {
+ threads.emplace_back(write_no_slowdown_func);
+ }
+ // Sleep for 2s to allow the threads to insert themselves into the
+ // write queue
+ env_->SleepForMicroseconds(3000000ULL);
+ }
+ token.reset();
+ threads.emplace_back(wakeup_writer);
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ WriteOptions wo;
+ wo.sync = false;
+ wo.disableWAL = false;
+ wo.no_slowdown = false;
+ dbfull()->Put(wo, "foo", "bar");
+ // We need the 2nd write to trigger delay. This is because delay is
+ // estimated based on the last write size which is 0 for the first write.
+ ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
+ token.reset();
+
+ for (auto& t : threads) {
+ t.join();
+ }
+ ASSERT_GE(wait_count.load(), 1);
+
+ wo.no_slowdown = true;
+ ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
+}
#ifndef ROCKSDB_LITE
TEST_F(DBTest, LevelLimitReopen) {
enable_pipelined_write_(db_options.enable_pipelined_write),
newest_writer_(nullptr),
newest_memtable_writer_(nullptr),
- last_sequence_(0) {}
+ last_sequence_(0),
+ write_stall_dummy_(),
+ stall_mu_(),
+ stall_cv_(&stall_mu_) {}
uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
// We're going to block. Lazily create the mutex. We guarantee
assert(w->state == STATE_INIT);
Writer* writers = newest_writer->load(std::memory_order_relaxed);
while (true) {
+ // If write stall in effect, and w->no_slowdown is not true,
+ // block here until stall is cleared. If its true, then return
+ // immediately
+ if (writers == &write_stall_dummy_) {
+ if (w->no_slowdown) {
+ w->status = Status::Incomplete("Write stall");
+ SetState(w, STATE_COMPLETED);
+ return false;
+ }
+ // Since no_slowdown is false, wait here to be notified of the write
+ // stall clearing
+ {
+ MutexLock lock(&stall_mu_);
+ writers = newest_writer->load(std::memory_order_relaxed);
+ if (writers == &write_stall_dummy_) {
+ stall_cv_.Wait();
+ // Load newest_writers_ again since it may have changed
+ writers = newest_writer->load(std::memory_order_relaxed);
+ continue;
+ }
+ }
+ }
w->link_older = writers;
if (newest_writer->compare_exchange_weak(writers, w)) {
return (writers == nullptr);
SetState(w, STATE_COMPLETED);
}
+void WriteThread::BeginWriteStall() {
+ LinkOne(&write_stall_dummy_, &newest_writer_);
+
+ // Walk writer list until w->write_group != nullptr. The current write group
+ // will not have a mix of slowdown/no_slowdown, so its ok to stop at that
+ // point
+ Writer* w = write_stall_dummy_.link_older;
+ Writer* prev = &write_stall_dummy_;
+ while (w != nullptr && w->write_group == nullptr) {
+ if (w->no_slowdown) {
+ prev->link_older = w->link_older;
+ w->status = Status::Incomplete("Write stall");
+ SetState(w, STATE_COMPLETED);
+ w = prev->link_older;
+ } else {
+ prev = w;
+ w = w->link_older;
+ }
+ }
+}
+
+void WriteThread::EndWriteStall() {
+ MutexLock lock(&stall_mu_);
+
+ assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
+ newest_writer_.exchange(write_stall_dummy_.link_older);
+
+ // Wake up writers
+ stall_cv_.SignalAll();
+}
+
static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
void WriteThread::JoinBatchGroup(Writer* w) {
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
assert(w->batch != nullptr);
bool linked_as_leader = LinkOne(w, &newest_writer_);
+
if (linked_as_leader) {
SetState(w, STATE_GROUP_LEADER);
}