]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
WriteOptions.low_pri which can throttle low pri writes if needed
authorSiying Dong <siying.d@fb.com>
Mon, 5 Jun 2017 21:42:34 +0000 (14:42 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Mon, 5 Jun 2017 22:02:35 +0000 (15:02 -0700)
Summary:
If ReadOptions.low_pri=true and compaction is behind, the write will either return immediate or be slowed down based on ReadOptions.no_slowdown.
Closes https://github.com/facebook/rocksdb/pull/2369

Differential Revision: D5127619

Pulled By: siying

fbshipit-source-id: d30e1cff515890af0eff32dfb869d2e4c9545eb0

HISTORY.md
db/column_family.cc
db/db_impl.cc
db/db_impl.h
db/db_impl_write.cc
db/db_test.cc
db/db_test2.cc
db/write_controller.cc
db/write_controller.h
include/rocksdb/options.h
util/rate_limiter.cc

index 7d1dad42be954103f8c4ba8fe10ed5a78e1d6572..5a5d63ea5a8b0b933f3f6174bceeddb61fba597a 100644 (file)
@@ -10,6 +10,7 @@
 * Change ticker/histogram statistics implementations to use core-local storage. This improves aggregation speed compared to our previous thread-local approach, particularly for applications with many threads.
 * Users can pass a cache object to write buffer manager, so that they can cap memory usage for memtable and block cache using one single limit.
 * Flush will be triggered when 7/8 of the limit introduced by write_buffer_manager or db_write_buffer_size is triggered, so that the hard threshold is hard to hit.
+* Introduce WriteOptions.low_pri. If it is true, low priority writes will be throttled if the compaction is behind.
 
 ## 5.5.0 (05/17/2017)
 ### New Features
index bf4d8a72db9dbcf9f52263829f46161728f7d21c..72186b706a0ff53f61b1211ab43f32eb9db50e7d 100644 (file)
@@ -760,6 +760,12 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
         uint64_t write_rate = write_controller->delayed_write_rate();
         write_controller->set_delayed_write_rate(static_cast<uint64_t>(
             static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
+        // Set the low pri limit to be 1/4 the delayed write rate.
+        // Note we don't reset this value even after delay condition is relased.
+        // Low-pri rate will continue to apply if there is a compaction
+        // pressure.
+        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
+                                                                    4);
       }
     }
     prev_compaction_needed_bytes_ = compaction_needed_bytes;
index 37eb0f203254f1568ea669ae8127906af84da870..0f412a754bc3eac5f098d706b5e73100eedd97c0 100644 (file)
@@ -134,6 +134,8 @@ void DumpSupportInfo(Logger* logger) {
   ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %d",
                    crc32c::IsFastCrc32Supported());
 }
+
+int64_t kDefaultLowPriThrottledRate = 2 * 1024 * 1024;
 } // namespace
 
 DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
@@ -159,6 +161,11 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
       write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
       write_thread_(immutable_db_options_),
       write_controller_(mutable_db_options_.delayed_write_rate),
+      // Use delayed_write_rate as a base line to determine the initial
+      // low pri write rate limit. It may be adjusted later.
+      low_pri_write_rate_limiter_(NewGenericRateLimiter(std::min(
+          static_cast<int64_t>(mutable_db_options_.delayed_write_rate / 8),
+          kDefaultLowPriThrottledRate))),
       last_batch_group_size_(0),
       unscheduled_flushes_(0),
       unscheduled_compactions_(0),
index a35312f04a225770be0ca482cc6d6e31bf715860..fa3a294dfd24bca06d91cafecf9ea4688d8877d3 100644 (file)
@@ -722,6 +722,9 @@ class DBImpl : public DB {
   //            `num_bytes` going through.
   Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options);
 
+  Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
+                                      WriteBatch* my_batch);
+
   Status ScheduleFlushes(WriteContext* context);
 
   Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
@@ -941,6 +944,8 @@ class DBImpl : public DB {
 
   WriteController write_controller_;
 
+  unique_ptr<RateLimiter> low_pri_write_rate_limiter_;
+
   // Size of the last batch group. In slowdown mode, next write needs to
   // sleep if it uses up the quota.
   uint64_t last_batch_group_size_;
index 3fa103ff7ea9deb019e94ec23914dc8a495ef8f9..477fc7469a1c90d478893800659b8aac7a34f431 100644 (file)
@@ -66,13 +66,19 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
     return Status::Corruption("Batch is nullptr!");
   }
 
+  Status status;
+  if (write_options.low_pri) {
+    status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
+    if (!status.ok()) {
+      return status;
+    }
+  }
+
   if (immutable_db_options_.enable_pipelined_write) {
     return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
                               log_ref, disable_memtable);
   }
 
-  Status status;
-
   PERF_TIMER_GUARD(write_pre_and_post_process_time);
   WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                         disable_memtable);
@@ -742,6 +748,34 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
   return bg_error_;
 }
 
+Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
+                                            WriteBatch* my_batch) {
+  assert(write_options.low_pri);
+  // This is called outside the DB mutex. Although it is safe to make the call,
+  // the consistency condition is not guaranteed to hold. It's OK to live with
+  // it in this case.
+  // If we need to speed compaction, it means the compaction is left behind
+  // and we start to limit low pri writes to a limit.
+  if (write_controller_.NeedSpeedupCompaction()) {
+    if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) {
+      // For 2PC, we only rate limit prepare, not commit.
+      return Status::OK();
+    }
+    if (write_options.no_slowdown) {
+      return Status::Incomplete();
+    } else {
+      assert(my_batch != nullptr);
+      // Rate limit those writes. The reason that we don't completely wait
+      // is that in case the write is heavy, low pri writes may never have
+      // a chance to run. Now we guarantee we are still slowly making
+      // progress.
+      write_controller_.low_pri_rate_limiter()->Request(my_batch->GetDataSize(),
+                                                        Env::IO_HIGH, nullptr);
+    }
+  }
+  return Status::OK();
+}
+
 Status DBImpl::ScheduleFlushes(WriteContext* context) {
   ColumnFamilyData* cfd;
   while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
index 93e5fe609b0740db4a076cb4eec84fbc8c3befdf..52ee7306842a471b8eb9972bca1af9bf8f9e2bb4 100644 (file)
@@ -5155,7 +5155,6 @@ TEST_F(DBTest, PauseBackgroundWorkTest) {
   // now it's done
   ASSERT_TRUE(done.load());
 }
-
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
index 714342a8b1afc36a8803fbd8b4b92ff9f2d40a19..42403f2aae82fbafbe991e424708efa50b7254b2 100644 (file)
@@ -2186,6 +2186,56 @@ TEST_F(DBTest2, MemtableOnlyIterator) {
   ASSERT_EQ(1, count);
   delete it;
 }
+
+TEST_F(DBTest2, LowPriWrite) {
+  Options options = CurrentOptions();
+  // Compaction pressure should trigger since 6 files
+  options.level0_file_num_compaction_trigger = 4;
+  options.level0_slowdown_writes_trigger = 12;
+  options.level0_stop_writes_trigger = 30;
+  options.delayed_write_rate = 8 * 1024 * 1024;
+  Reopen(options);
+
+  std::atomic<int> rate_limit_count(0);
+
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "GenericRateLimiter::Request:1", [&](void* arg) {
+        rate_limit_count.fetch_add(1);
+        int64_t* rate_bytes_per_sec = static_cast<int64_t*>(arg);
+        ASSERT_EQ(1024 * 1024, *rate_bytes_per_sec);
+      });
+  // Block compaction
+  rocksdb::SyncPoint::GetInstance()->LoadDependency({
+      {"DBTest.LowPriWrite:0", "DBImpl::BGWorkCompaction"},
+  });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+  WriteOptions wo;
+  for (int i = 0; i < 6; i++) {
+    wo.low_pri = false;
+    Put("", "", wo);
+    wo.low_pri = true;
+    Put("", "", wo);
+    Flush();
+  }
+  ASSERT_EQ(0, rate_limit_count.load());
+  wo.low_pri = true;
+  Put("", "", wo);
+  ASSERT_EQ(1, rate_limit_count.load());
+  wo.low_pri = false;
+  Put("", "", wo);
+  ASSERT_EQ(1, rate_limit_count.load());
+
+  TEST_SYNC_POINT("DBTest.LowPriWrite:0");
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+  dbfull()->TEST_WaitForCompact();
+  wo.low_pri = true;
+  Put("", "", wo);
+  ASSERT_EQ(1, rate_limit_count.load());
+  wo.low_pri = false;
+  Put("", "", wo);
+  ASSERT_EQ(1, rate_limit_count.load());
+}
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
index 3437127d873e2362227dab779681754182ead655..699044ec20a7d9db4816957c567c62290d0b103c 100644 (file)
@@ -36,17 +36,19 @@ WriteController::GetCompactionPressureToken() {
       new CompactionPressureToken(this));
 }
 
-bool WriteController::IsStopped() const { return total_stopped_ > 0; }
+bool WriteController::IsStopped() const {
+  return total_stopped_.load(std::memory_order_relaxed) > 0;
+}
 // This is inside DB mutex, so we can't sleep and need to minimize
 // frequency to get time.
 // If it turns out to be a performance issue, we can redesign the thread
 // synchronization model here.
 // The function trust caller will sleep micros returned.
 uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
-  if (total_stopped_ > 0) {
+  if (total_stopped_.load(std::memory_order_relaxed) > 0) {
     return 0;
   }
-  if (total_delayed_.load() == 0) {
+  if (total_delayed_.load(std::memory_order_relaxed) == 0) {
     return 0;
   }
 
index 8e4c20826897f3216e9f345f96aad74db90ca7c3..c344a74d89004e4c6d914ef80e5497b4c676da5b 100644 (file)
@@ -11,6 +11,7 @@
 
 #include <atomic>
 #include <memory>
+#include "rocksdb/rate_limiter.h"
 
 namespace rocksdb {
 
@@ -23,12 +24,15 @@ class WriteControllerToken;
 // to be called while holding DB mutex
 class WriteController {
  public:
-  explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u)
+  explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u,
+                           int64_t low_pri_rate_bytes_per_sec = 1024 * 1024)
       : total_stopped_(0),
         total_delayed_(0),
         total_compaction_pressure_(0),
         bytes_left_(0),
-        last_refill_time_(0) {
+        last_refill_time_(0),
+        low_pri_rate_limiter_(
+            NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) {
     set_max_delayed_write_rate(_delayed_write_rate);
   }
   ~WriteController() = default;
@@ -80,6 +84,8 @@ class WriteController {
 
   uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; }
 
+  RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); }
+
  private:
   uint64_t NowMicrosMonotonic(Env* env);
 
@@ -88,15 +94,17 @@ class WriteController {
   friend class DelayWriteToken;
   friend class CompactionPressureToken;
 
-  int total_stopped_;
+  std::atomic<int> total_stopped_;
   std::atomic<int> total_delayed_;
-  int total_compaction_pressure_;
+  std::atomic<int> total_compaction_pressure_;
   uint64_t bytes_left_;
   uint64_t last_refill_time_;
   // write rate set when initialization or by `DBImpl::SetDBOptions`
   uint64_t max_delayed_write_rate_;
   // current write rate
   uint64_t delayed_write_rate_;
+
+  std::unique_ptr<RateLimiter> low_pri_rate_limiter_;
 };
 
 class WriteControllerToken {
index 75f7a7a2af5496f04df642123eef3c08ef6fa992..283085a531291b8809099227e4435da2c610e772 100644 (file)
@@ -1079,11 +1079,21 @@ struct WriteOptions {
   // immediately with Status::Incomplete().
   bool no_slowdown;
 
+  // If true, this write request is of lower priority if compaction is
+  // behind. In this case, no_slowdown = true, the request will be cancelled
+  // immediately with Status::Incomplete() returned. Otherwise, it will be
+  // slowed down. The slowdown value is determined by RocksDB to guarantee
+  // it introduces minimum impacts to high priority writes.
+  //
+  // Default: false
+  bool low_pri;
+
   WriteOptions()
       : sync(false),
         disableWAL(false),
         ignore_missing_column_families(false),
-        no_slowdown(false) {}
+        no_slowdown(false),
+        low_pri(false) {}
 };
 
 // Options that control flush operations
index 8175745ee8299eb7525bc0b0dc311bf81d9ff95b..064764cb63bc75bce02a271e1d641125704ca9c7 100644 (file)
@@ -79,6 +79,8 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
                                  Statistics* stats) {
   assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));
   TEST_SYNC_POINT("GenericRateLimiter::Request");
+  TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1",
+                           &rate_bytes_per_sec_);
   MutexLock g(&request_mutex_);
   if (stop_) {
     return;