]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Add rate-limiting support to batched MultiGet() (#10159)
authorHui Xiao <huixiao@fb.com>
Fri, 17 Jun 2022 23:40:47 +0000 (16:40 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Fri, 17 Jun 2022 23:40:47 +0000 (16:40 -0700)
Summary:
**Context/Summary:**
https://github.com/facebook/rocksdb/pull/9424 added rate-limiting support for user reads, which does not include batched `MultiGet()`s that call `RandomAccessFileReader::MultiRead()`. The reason is that it's harder (compared with RandomAccessFileReader::Read()) to implement the ideal rate-limiting where we first call `RateLimiter::RequestToken()` for allowed bytes to multi-read and then consume those bytes by satisfying as many requests in `MultiRead()` as possible. For example, it can be tricky to decide whether we want partially fulfilled requests within one `MultiRead()` or not.

However, due to a recent urgent user request, we decide to pursue an elementary (but a conditionally ineffective) solution where we accumulate enough rate limiter requests toward the total bytes needed by one `MultiRead()` before doing that `MultiRead()`. This is not ideal when the total bytes are huge as we will actually consume a huge bandwidth from rate-limiter causing a burst on disk. This is not what we ultimately want with rate limiter. Therefore a follow-up work is noted through TODO comments.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10159

Test Plan:
- Modified existing unit test `DBRateLimiterOnReadTest/DBRateLimiterOnReadTest.NewMultiGet`
- Traced the underlying system calls `io_uring_enter` and verified they are 10 seconds apart from each other correctly under the setting of  `strace -ftt -e trace=io_uring_enter ./db_bench -benchmarks=multireadrandom -db=/dev/shm/testdb2 -readonly -num=50 -threads=1 -multiread_batched=1 -batch_size=100 -duration=10 -rate_limiter_bytes_per_sec=200 -rate_limiter_refill_period_us=1000000 -rate_limit_bg_reads=1 -disable_auto_compactions=1 -rate_limit_user_ops=1` where each `MultiRead()` read about 2000 bytes (inspected by debugger) and the rate limiter grants 200 bytes per seconds.
- Stress test:
   - Verified `./db_stress (-test_cf_consistency=1/test_batches_snapshots=1) -use_multiget=1 -cache_size=1048576 -rate_limiter_bytes_per_sec=10241024 -rate_limit_bg_reads=1 -rate_limit_user_ops=1` work

Reviewed By: ajkr, anand1976

Differential Revision: D37135172

Pulled By: hx235

fbshipit-source-id: 73b8e8f14761e5d4b77235dfe5d41f4eea968bcd

HISTORY.md
db/db_rate_limiter_test.cc
db_stress_tool/batched_ops_stress.cc
db_stress_tool/cf_consistency_stress.cc
db_stress_tool/no_batched_ops_stress.cc
file/random_access_file_reader.cc
include/rocksdb/options.h
tools/db_bench_tool.cc
utilities/backup/backup_engine.cc

index b5cb8cd251f0b376354732a93f43a158e8d74445..a115c051a2ca4a4a1770d387ef04f1e40f185b38 100644 (file)
@@ -36,6 +36,7 @@
 * Add a new column family option `blob_file_starting_level` to enable writing blob files during flushes and compactions starting from the specified LSM tree level.
 * Add support for timestamped snapshots (#9879)
 * Provide support for AbortIO in posix to cancel submitted asynchronous requests using io_uring.
+* Add support for rate-limiting batched `MultiGet()` APIs
 
 ### Behavior changes
 * DB::Open(), DB::OpenAsSecondary() will fail if a Logger cannot be created (#9984)
index f30af19740d97afb6113319c9ec130dc457eca8d..e44cc047dcd2e510572e6f66975ab5a8502a40bc 100644 (file)
@@ -139,8 +139,6 @@ TEST_P(DBRateLimiterOnReadTest, Get) {
 }
 
 TEST_P(DBRateLimiterOnReadTest, NewMultiGet) {
-  // The new void-returning `MultiGet()` APIs use `MultiRead()`, which does not
-  // yet support rate limiting.
   if (use_direct_io_ && !IsDirectIOSupported()) {
     return;
   }
@@ -149,6 +147,7 @@ TEST_P(DBRateLimiterOnReadTest, NewMultiGet) {
   ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
 
   const int kNumKeys = kNumFiles * kNumKeysPerFile;
+  int64_t expected = 0;
   {
     std::vector<std::string> key_bufs;
     key_bufs.reserve(kNumKeys);
@@ -160,13 +159,19 @@ TEST_P(DBRateLimiterOnReadTest, NewMultiGet) {
     }
     std::vector<Status> statuses(kNumKeys);
     std::vector<PinnableSlice> values(kNumKeys);
+    const int64_t prev_total_rl_req = options_.rate_limiter->GetTotalRequests();
     db_->MultiGet(GetReadOptions(), dbfull()->DefaultColumnFamily(), kNumKeys,
                   keys.data(), values.data(), statuses.data());
+    const int64_t cur_total_rl_req = options_.rate_limiter->GetTotalRequests();
     for (int i = 0; i < kNumKeys; ++i) {
-      ASSERT_TRUE(statuses[i].IsNotSupported());
+      ASSERT_TRUE(statuses[i].ok());
     }
+    ASSERT_GT(cur_total_rl_req, prev_total_rl_req);
+    ASSERT_EQ(cur_total_rl_req - prev_total_rl_req,
+              options_.rate_limiter->GetTotalRequests(Env::IO_USER));
   }
-  ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
+  expected += kNumKeys;
+  ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
 }
 
 TEST_P(DBRateLimiterOnReadTest, OldMultiGet) {
index 13f3aba5cc6a8a5e249dd6eb6c2dc087e966c068..52287c0aee70869325142604fe767c5b52456ea9 100644 (file)
@@ -190,6 +190,8 @@ class BatchedOpsStressTest : public StressTest {
       std::vector<Status> statuses(num_prefixes);
       ReadOptions readoptionscopy = readoptions;
       readoptionscopy.snapshot = db_->GetSnapshot();
+      readoptionscopy.rate_limiter_priority =
+          FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
       std::vector<std::string> key_str;
       key_str.reserve(num_prefixes);
       key_slices.reserve(num_prefixes);
index b7cc4c376c7a61ac80ddd9c9bae0ac00bf2ce29a..4f6530590b3b1077f7ef437f88e8cb09e43afe11 100644 (file)
@@ -214,12 +214,15 @@ class CfConsistencyStressTest : public StressTest {
     std::vector<PinnableSlice> values(num_keys);
     std::vector<Status> statuses(num_keys);
     ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
+    ReadOptions readoptionscopy = read_opts;
+    readoptionscopy.rate_limiter_priority =
+        FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
 
     for (size_t i = 0; i < num_keys; ++i) {
       key_str.emplace_back(Key(rand_keys[i]));
       keys.emplace_back(key_str.back());
     }
-    db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
+    db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
                   statuses.data());
     for (auto s : statuses) {
       if (s.ok()) {
index 08cd9b91a3971532f7b202ece115a8812b071ddb..16e238501edc070bb962aa3fe4b813c80716fca8 100644 (file)
@@ -391,6 +391,8 @@ class NonBatchedOpsStressTest : public StressTest {
     if (do_consistency_check) {
       readoptionscopy.snapshot = db_->GetSnapshot();
     }
+    readoptionscopy.rate_limiter_priority =
+        FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
 
     // To appease clang analyzer
     const bool use_txn = FLAGS_use_txn;
index e74b78cc4eb6070022e3c2d2ec83f9bd87b0ef57..d02b7b5f6eafa2ec1e1a3b93700931d0da919b17 100644 (file)
@@ -270,9 +270,6 @@ bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
 IOStatus RandomAccessFileReader::MultiRead(
     const IOOptions& opts, FSReadRequest* read_reqs, size_t num_reqs,
     AlignedBuf* aligned_buf, Env::IOPriority rate_limiter_priority) const {
-  if (rate_limiter_priority != Env::IO_TOTAL) {
-    return IOStatus::NotSupported("Unable to rate limit MultiRead()");
-  }
   (void)aligned_buf;  // suppress warning of unused variable in LITE mode
   assert(num_reqs > 0);
 
@@ -359,6 +356,30 @@ IOStatus RandomAccessFileReader::MultiRead(
 
     {
       IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
+      if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
+        // TODO: ideally we should call `RateLimiter::RequestToken()` for
+        // allowed bytes to multi-read and then consume those bytes by
+        // satisfying as many requests in `MultiRead()` as possible, instead of
+        // what we do here, which can cause burst when the
+        // `total_multi_read_size` is big.
+        size_t total_multi_read_size = 0;
+        assert(fs_reqs != nullptr);
+        for (size_t i = 0; i < num_fs_reqs; ++i) {
+          FSReadRequest& req = fs_reqs[i];
+          total_multi_read_size += req.len;
+        }
+        size_t remaining_bytes = total_multi_read_size;
+        size_t request_bytes = 0;
+        while (remaining_bytes > 0) {
+          request_bytes = std::min(
+              static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()),
+              remaining_bytes);
+          rate_limiter_->Request(request_bytes, rate_limiter_priority,
+                                 nullptr /* stats */,
+                                 RateLimiter::OpType::kRead);
+          remaining_bytes -= request_bytes;
+        }
+      }
       io_s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr);
     }
 
index 72a2f7de11d5bfcd217d6f41bf020e851bd5baee..542955e9050e408e83fce5ecf5a9f4bea8e5eaf0 100644 (file)
@@ -1641,10 +1641,6 @@ struct ReadOptions {
   // is a `PlainTableFactory`) and cuckoo tables (these can exist when
   // `ColumnFamilyOptions::table_factory` is a `CuckooTableFactory`).
   //
-  // The new `DB::MultiGet()` APIs (i.e., the ones returning `void`) will return
-  // `Status::NotSupported` when that operation requires file read(s) and
-  // `rate_limiter_priority != Env::IO_TOTAL`.
-  //
   // The bytes charged to rate limiter may not exactly match the file read bytes
   // since there are some seemingly insignificant reads, like for file
   // headers/footers, that we currently do not charge to rate limiter.
index 9cebbbca97381f06e888573bb0d0d5ff1c63ce08..5b20143fda567a6194a098393fffb1de6abd44bf 100644 (file)
@@ -4562,6 +4562,9 @@ class Benchmark {
         options.rate_limiter.reset(NewGenericRateLimiter(
             FLAGS_rate_limiter_bytes_per_sec,
             FLAGS_rate_limiter_refill_period_us, 10 /* fairness */,
+            // TODO: replace this with a more general FLAG for deciding
+            // RateLimiter::Mode as now we also rate-limit foreground reads e.g,
+            // Get()/MultiGet()
             FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
                                       : RateLimiter::Mode::kWritesOnly,
             FLAGS_rate_limiter_auto_tuned));
index a14dbc8800b8dbb0bf799e69d73ae73a11641ae6..37a8cfff6fbcfca011367cb9ae5f69e6de60c1bf 100644 (file)
@@ -296,6 +296,9 @@ class BackupEngineImpl {
     }
   };
 
+  // TODO: deprecate this function once we migrate all BackupEngine's rate
+  // limiting to lower-level ones (i.e, ones in file access wrapper level like
+  // `WritableFileWriter`)
   static void LoopRateLimitRequestHelper(const size_t total_bytes_to_request,
                                          RateLimiter* rate_limiter,
                                          const Env::IOPriority pri,