]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Add public API RateLimiter::GetTotalPendingRequests() (#8890)
authorHui Xiao <huixiao@fb.com>
Fri, 10 Sep 2021 15:35:59 +0000 (08:35 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Fri, 10 Sep 2021 15:37:04 +0000 (08:37 -0700)
Summary:
Context/Summary:
As users requested, a public API RateLimiter::GetTotalPendingRequests() is added to expose the total number of pending requests for bytes in the rate limiter, which is the size of the request queue of that priority (or of all priorities, if IO_TOTAL is interested) at the time when this API is called.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8890

Test Plan:
- Passing added new unit tests
- Passing existing unit tests

Reviewed By: ajkr

Differential Revision: D30815500

Pulled By: hx235

fbshipit-source-id: 2dfa990f651c1c47378b6215c751ad76a5824300

HISTORY.md
include/rocksdb/rate_limiter.h
util/rate_limiter.cc
util/rate_limiter.h
util/rate_limiter_test.cc

index 821e58b243cc1061e9c4da08c0c378e0d5b33bbc..ec65434d37937690344a77f2addad22b111296cd 100644 (file)
@@ -19,6 +19,7 @@
 
 ### Public API change
 * Remove obsolete implementation details FullKey and ParseFullKey from public API
+* Add a public API RateLimiter::GetTotalPendingRequests() for the total number of requests that are pending for bytes in the rate limiter.
 
 ## 6.24.0 (2021-08-20)
 ### Bug Fixes
index 0ee89f5c8091b063fabe445db44f172a7b22d161..518db7aa6a6d44f5b602296887ea2a1978628aa1 100644 (file)
@@ -89,6 +89,10 @@ class RateLimiter {
   virtual int64_t GetTotalRequests(
       const Env::IOPriority pri = Env::IO_TOTAL) const = 0;
 
+  // Total # of requests that are pending for bytes in rate limiter
+  virtual int64_t GetTotalPendingRequests(
+      const Env::IOPriority pri = Env::IO_TOTAL) const = 0;
+
   virtual int64_t GetBytesPerSecond() const = 0;
 
   virtual bool IsRateLimited(OpType op_type) {
index 7d7936e5f144c5c4fdf96d29f213b2e228306be4..2faafdbbb4df8cbfc4a2a4ec8e80e85e1070f595 100644 (file)
@@ -143,7 +143,8 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
   // Request cannot be satisfied at this moment, enqueue
   Req r(bytes, &request_mutex_);
   queue_[pri].push_back(&r);
-
+  TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostEnqueueRequest",
+                           &request_mutex_);
   // A thread representing a queued request coordinates with other such threads.
   // There are two main duties.
   //
index f3f593c366f6d9d85728ab742a0d5843771a4b9e..7596374b06c014c8c726b34815d1c8ef11daf16e 100644 (file)
@@ -72,6 +72,19 @@ class GenericRateLimiter : public RateLimiter {
     return total_requests_[pri];
   }
 
+  virtual int64_t GetTotalPendingRequests(
+      const Env::IOPriority pri = Env::IO_TOTAL) const override {
+    MutexLock g(&request_mutex_);
+    if (pri == Env::IO_TOTAL) {
+      int64_t total_pending_requests_sum = 0;
+      for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+        total_pending_requests_sum += static_cast<int64_t>(queue_[i].size());
+      }
+      return total_pending_requests_sum;
+    }
+    return static_cast<int64_t>(queue_[pri].size());
+  }
+
   virtual int64_t GetBytesPerSecond() const override {
     return rate_bytes_per_sec_;
   }
index 27bbc2d3176d42a5967c341dc1e05d650abb7e02..cae82da34ceae320d5ffa23a5b2321e340dde968 100644 (file)
@@ -15,6 +15,7 @@
 #include <limits>
 
 #include "db/db_test_util.h"
+#include "port/port.h"
 #include "rocksdb/system_clock.h"
 #include "test_util/sync_point.h"
 #include "test_util/testharness.h"
@@ -89,6 +90,47 @@ TEST_F(RateLimiterTest, GetTotalRequests) {
          "Env::IO_TOTAL";
 }
 
+TEST_F(RateLimiterTest, GetTotalPendingRequests) {
+  std::unique_ptr<RateLimiter> limiter(
+      NewGenericRateLimiter(20 /* rate_bytes_per_sec */));
+  for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+    ASSERT_EQ(limiter->GetTotalPendingRequests(static_cast<Env::IOPriority>(i)),
+              0);
+  }
+  // This is a variable for making sure the following callback is called
+  // and the assertions in it are indeed excuted
+  bool nonzero_pending_requests_verified_ = false;
+  SyncPoint::GetInstance()->SetCallBack(
+      "GenericRateLimiter::Request:PostEnqueueRequest", [&](void* arg) {
+        port::Mutex* request_mutex = (port::Mutex*)arg;
+        // We temporarily unlock the mutex so that the following
+        // GetTotalPendingRequests() can acquire it
+        request_mutex->Unlock();
+        EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 1);
+        EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0);
+        EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0);
+        EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0);
+        EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 1);
+        // We lock the mutex again so that the request thread can resume running
+        // with the mutex locked
+        request_mutex->Lock();
+        nonzero_pending_requests_verified_ = true;
+      });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+  limiter->Request(20, Env::IO_USER, nullptr /* stats */,
+                   RateLimiter::OpType::kWrite);
+  ASSERT_EQ(nonzero_pending_requests_verified_, true);
+  EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 0);
+  EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0);
+  EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0);
+  EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0);
+  EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 0);
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearCallBack(
+      "GenericRateLimiter::Request:PostEnqueueRequest");
+}
+
 TEST_F(RateLimiterTest, Modes) {
   for (auto mode : {RateLimiter::Mode::kWritesOnly,
                     RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {