]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Handle mixed slowdown/no_slowdown writer properly (#4475)
authorAnand Ananthabhotla <anand76@devvm1373.frc2.facebook.com>
Wed, 10 Oct 2018 05:50:59 +0000 (22:50 -0700)
committerAnand Ananthabhotla <anand76@devvm1373.frc2.facebook.com>
Fri, 19 Oct 2018 23:29:57 +0000 (16:29 -0700)
Summary:
There is a bug when the write queue leader is blocked on a write
delay/stop, and the queue has writers with WriteOptions::no_slowdown set
to true. They are not woken up until the write stall is cleared.

The fix introduces a dummy writer inserted at the tail to indicate a
write stall and prevent further inserts into the queue, and a condition
variable that writers who can tolerate slowdown wait on before adding
themselves to the queue. The leader calls WriteThread::BeginWriteStall()
to add the dummy writer and then walk the queue to fail any writers with
no_slowdown set. Once the stall clears, the leader calls
WriteThread::EndWriteStall() to remove the dummy writer and signal the
condition variable.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4475

Differential Revision: D10285827

Pulled By: anand1976

fbshipit-source-id: 747465e5e7f07a829b1fb0bc1afcd7b93f4ab1a9

HISTORY.md
db/db_impl.h
db/db_impl_write.cc
db/db_test.cc
db/write_thread.cc
db/write_thread.h

index 6dc6a51102689c1442eec3db729dd0868e2de5f2..d94696b27044bbffe8c643778cfc1701fb273571 100644 (file)
@@ -11,6 +11,9 @@
 ### New Features
 * Introduced CacheAllocator, which lets the user specify custom allocator for memory in block cache.
 
+### Bug Fixes
+* Fix corner case where a write group leader blocked due to write stall blocks other writers in queue with WriteOptions::no_slowdown set.
+
 ## 5.17.0 (10/05/2018)
 ### Public API Change
 * `OnTableFileCreated` will now be called for empty files generated during compaction. In that case, `TableFileCreationInfo::file_path` will be "(nil)" and `TableFileCreationInfo::file_size` will be zero.
index dceffedffdcdbba83de188f3d83fa9313876f621..2da8eca608f3da8654d6b6115045ead25c4ec2a8 100644 (file)
@@ -813,6 +813,7 @@ class DBImpl : public DB {
   friend struct SuperVersion;
   friend class CompactedDBImpl;
   friend class DBTest_ConcurrentFlushWAL_Test;
+  friend class DBTest_MixedSlowdownOptionsStop_Test;
 #ifndef NDEBUG
   friend class DBTest2_ReadCallbackTest_Test;
   friend class WriteCallbackTest_WriteWithCallbackTest_Test;
index ff786e1130c1d1c79d151be7a3d030e8d836fae7..29b54bfd1e2dec8fe9074ad35fbcd71c16ff61a4 100644 (file)
@@ -1162,10 +1162,14 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
     uint64_t delay = write_controller_.GetDelay(env_, num_bytes);
     if (delay > 0) {
       if (write_options.no_slowdown) {
-        return Status::Incomplete();
+        return Status::Incomplete("Write stall");
       }
       TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
 
+      // Notify write_thread_ about the stall so it can setup a barrier and
+      // fail any pending writers with no_slowdown
+      write_thread_.BeginWriteStall();
+      TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
       mutex_.Unlock();
       // We will delay the write until we have slept for delay ms or
       // we don't need a delay anymore
@@ -1182,6 +1186,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
         env_->SleepForMicroseconds(kDelayInterval);
       }
       mutex_.Lock();
+      write_thread_.EndWriteStall();
     }
 
     // Don't wait if there's a background error, even if its a soft error. We
@@ -1190,11 +1195,16 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
     // indefinitely
     while (error_handler_.GetBGError().ok() && write_controller_.IsStopped()) {
       if (write_options.no_slowdown) {
-        return Status::Incomplete();
+        return Status::Incomplete("Write stall");
       }
       delayed = true;
+
+      // Notify write_thread_ about the stall so it can setup a barrier and
+      // fail any pending writers with no_slowdown
+      write_thread_.BeginWriteStall();
       TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
       bg_cv_.Wait();
+      write_thread_.EndWriteStall();
     }
   }
   assert(!delayed || !write_options.no_slowdown);
index f331f8e77b7ae45ebe5d6abf2e592fb918f094ee..682a382d7a7a9faf300b09bcecd6e4ba2d2dae50 100644 (file)
@@ -262,6 +262,196 @@ TEST_F(DBTest, SkipDelay) {
   }
 }
 
+TEST_F(DBTest, MixedSlowdownOptions) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.write_buffer_size = 100000;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+
+  std::function<void()> write_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = false;
+    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+  };
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+  };
+  // Use a small number to ensure a large delay that is still effective
+  // when we do Put
+  // TODO(myabandeh): this is time dependent and could potentially make
+  // the test flaky
+  auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
+  std::atomic<int> sleep_count(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::DelayWrite:BeginWriteStallDone",
+      [&](void* /*arg*/) {
+        sleep_count.fetch_add(1);
+        if (threads.empty()) {
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_slowdown_func);
+          }
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_no_slowdown_func);
+          }
+        }
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  WriteOptions wo;
+  wo.sync = false;
+  wo.disableWAL = false;
+  wo.no_slowdown = false;
+  dbfull()->Put(wo, "foo", "bar");
+  // We need the 2nd write to trigger delay. This is because delay is
+  // estimated based on the last write size which is 0 for the first write.
+  ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
+          token.reset();
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  ASSERT_GE(sleep_count.load(), 1);
+
+  wo.no_slowdown = true;
+  ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
+}
+
+TEST_F(DBTest, MixedSlowdownOptionsInQueue) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.write_buffer_size = 100000;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+  };
+  // Use a small number to ensure a large delay that is still effective
+  // when we do Put
+  // TODO(myabandeh): this is time dependent and could potentially make
+  // the test flaky
+  auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
+  std::atomic<int> sleep_count(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::DelayWrite:Sleep",
+      [&](void* /*arg*/) {
+        sleep_count.fetch_add(1);
+        if (threads.empty()) {
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_no_slowdown_func);
+          }
+          // Sleep for 2s to allow the threads to insert themselves into the
+          // write queue
+          env_->SleepForMicroseconds(3000000ULL);
+        }
+      });
+  std::atomic<int> wait_count(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::DelayWrite:Wait",
+      [&](void* /*arg*/) { wait_count.fetch_add(1); });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  WriteOptions wo;
+  wo.sync = false;
+  wo.disableWAL = false;
+  wo.no_slowdown = false;
+  dbfull()->Put(wo, "foo", "bar");
+  // We need the 2nd write to trigger delay. This is because delay is
+  // estimated based on the last write size which is 0 for the first write.
+  ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
+          token.reset();
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  ASSERT_EQ(sleep_count.load(), 1);
+  ASSERT_GE(wait_count.load(), 0);
+}
+
+TEST_F(DBTest, MixedSlowdownOptionsStop) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.write_buffer_size = 100000;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+
+  std::function<void()> write_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = false;
+    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+  };
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+  };
+  std::function<void()> wakeup_writer = [&]() {
+    dbfull()->mutex_.Lock();
+    dbfull()->bg_cv_.SignalAll();
+    dbfull()->mutex_.Unlock();
+  };
+  // Use a small number to ensure a large delay that is still effective
+  // when we do Put
+  // TODO(myabandeh): this is time dependent and could potentially make
+  // the test flaky
+  auto token = dbfull()->TEST_write_controler().GetStopToken();
+  std::atomic<int> wait_count(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::DelayWrite:Wait",
+      [&](void* /*arg*/) {
+        wait_count.fetch_add(1);
+        if (threads.empty()) {
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_slowdown_func);
+          }
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_no_slowdown_func);
+          }
+          // Sleep for 2s to allow the threads to insert themselves into the
+          // write queue
+          env_->SleepForMicroseconds(3000000ULL);
+        }
+        token.reset();
+        threads.emplace_back(wakeup_writer);
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  WriteOptions wo;
+  wo.sync = false;
+  wo.disableWAL = false;
+  wo.no_slowdown = false;
+  dbfull()->Put(wo, "foo", "bar");
+  // We need the 2nd write to trigger delay. This is because delay is
+  // estimated based on the last write size which is 0 for the first write.
+  ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
+          token.reset();
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  ASSERT_GE(wait_count.load(), 1);
+
+  wo.no_slowdown = true;
+  ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
+}
 #ifndef ROCKSDB_LITE
 
 TEST_F(DBTest, LevelLimitReopen) {
index 6eb140b6bc60a74c748fb3032a1d868578afa0db..5ea7715c6923a7cf07c0281eb631fccef47b2d09 100644 (file)
@@ -24,7 +24,10 @@ WriteThread::WriteThread(const ImmutableDBOptions& db_options)
       enable_pipelined_write_(db_options.enable_pipelined_write),
       newest_writer_(nullptr),
       newest_memtable_writer_(nullptr),
-      last_sequence_(0) {}
+      last_sequence_(0),
+      write_stall_dummy_(),
+      stall_mu_(),
+      stall_cv_(&stall_mu_) {}
 
 uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
   // We're going to block.  Lazily create the mutex.  We guarantee
@@ -219,6 +222,28 @@ bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
   assert(w->state == STATE_INIT);
   Writer* writers = newest_writer->load(std::memory_order_relaxed);
   while (true) {
+    // If write stall in effect, and w->no_slowdown is not true,
+    // block here until stall is cleared. If its true, then return
+    // immediately
+    if (writers == &write_stall_dummy_) {
+      if (w->no_slowdown) {
+        w->status = Status::Incomplete("Write stall");
+        SetState(w, STATE_COMPLETED);
+        return false;
+      }
+      // Since no_slowdown is false, wait here to be notified of the write
+      // stall clearing
+      {
+        MutexLock lock(&stall_mu_);
+        writers = newest_writer->load(std::memory_order_relaxed);
+        if (writers == &write_stall_dummy_) {
+          stall_cv_.Wait();
+          // Load newest_writers_ again since it may have changed
+          writers = newest_writer->load(std::memory_order_relaxed);
+          continue;
+        }
+      }
+    }
     w->link_older = writers;
     if (newest_writer->compare_exchange_weak(writers, w)) {
       return (writers == nullptr);
@@ -303,12 +328,44 @@ void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
   SetState(w, STATE_COMPLETED);
 }
 
+void WriteThread::BeginWriteStall() {
+  LinkOne(&write_stall_dummy_, &newest_writer_);
+
+  // Walk writer list until w->write_group != nullptr. The current write group
+  // will not have a mix of slowdown/no_slowdown, so its ok to stop at that
+  // point
+  Writer* w = write_stall_dummy_.link_older;
+  Writer* prev = &write_stall_dummy_;
+  while (w != nullptr && w->write_group == nullptr) {
+    if (w->no_slowdown) {
+      prev->link_older = w->link_older;
+      w->status = Status::Incomplete("Write stall");
+      SetState(w, STATE_COMPLETED);
+      w = prev->link_older;
+    } else {
+      prev = w;
+      w = w->link_older;
+    }
+  }
+}
+
+void WriteThread::EndWriteStall() {
+  MutexLock lock(&stall_mu_);
+
+  assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
+  newest_writer_.exchange(write_stall_dummy_.link_older);
+
+  // Wake up writers
+  stall_cv_.SignalAll();
+}
+
 static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
 void WriteThread::JoinBatchGroup(Writer* w) {
   TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
   assert(w->batch != nullptr);
 
   bool linked_as_leader = LinkOne(w, &newest_writer_);
+
   if (linked_as_leader) {
     SetState(w, STATE_GROUP_LEADER);
   }
index 31190199b4d3db0b6a1bc7505cae71c5bc57b787..a3802c996b0747a3b5aa9bcec7d1104fd5d029e8 100644 (file)
@@ -342,6 +342,13 @@ class WriteThread {
     return last_sequence_;
   }
 
+  // Insert a dummy writer at the tail of the write queue to indicate a write
+  // stall, and fail any writers in the queue with no_slowdown set to true
+  void BeginWriteStall();
+
+  // Remove the dummy writer and wake up waiting writers
+  void EndWriteStall();
+
  private:
   // See AwaitState.
   const uint64_t max_yield_usec_;
@@ -365,6 +372,17 @@ class WriteThread {
   // is not necessary visible to reads because the writer can be ongoing.
   SequenceNumber last_sequence_;
 
+  // A dummy writer to indicate a write stall condition. This will be inserted
+  // at the tail of the writer queue by the leader, so newer writers can just
+  // check for this and bail
+  Writer write_stall_dummy_;
+
+  // Mutex and condvar for writers to block on a write stall. During a write
+  // stall, writers with no_slowdown set to false will wait on this rather
+  // on the writer queue
+  port::Mutex stall_mu_;
+  port::CondVar stall_cv_;
+
   // Waits for w->state & goal_mask using w->StateMutex().  Returns
   // the state that satisfies goal_mask.
   uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);