]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Return Status::NotSupported() in RateLimiter::GetTotalPendingRequests default impl...
authorHui Xiao <huixiao@fb.com>
Thu, 23 Sep 2021 02:35:05 +0000 (19:35 -0700)
committerPeter Dillinger <peterd@fb.com>
Thu, 23 Sep 2021 04:41:04 +0000 (21:41 -0700)
Summary:
Context:
After more discussion, a fix in https://github.com/facebook/rocksdb/issues/8938 might turn out to be too restrictive for the case where `GetTotalPendingRequests` might be invoked on RateLimiter classes that does not support the recently added API `RateLimiter::GetTotalPendingRequests` (https://github.com/facebook/rocksdb/issues/8890) due to the `assert(false)` in https://github.com/facebook/rocksdb/issues/8938. Furthermore, sentinel value like `-1` proposed in https://github.com/facebook/rocksdb/issues/8938 is easy to be ignored and unchecked. Therefore we decided to adopt `Status::NotSupported()`, which is also a convention of adding new API to public header in RocksDB.
- Changed return value type of  `RateLimiter::GetTotalPendingRequests` in related declaration/definition
- Passed in pointer argument to hold the output instead of returning it as before
- Adapted to the changes above in calling `RateLimiter::GetTotalPendingRequests` in test
- Minor improvement to `TEST_F(RateLimiterTest, GetTotalPendingRequests)`:  added failure message for assertion and replaced repetitive statements with a loop

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8950

Reviewed By: ajkr, pdillinger

Differential Revision: D31128450

Pulled By: hx235

fbshipit-source-id: 282ac9c4f3dacaa0aec6d0a993161f77ad47a040

HISTORY.md
include/rocksdb/rate_limiter.h
util/rate_limiter.h
util/rate_limiter_test.cc

index 27fc91226cbb683c54241312f0c7afdae6a6800e..7b756745a4c3ea1dc928612538635be8f184d4f7 100644 (file)
@@ -27,7 +27,7 @@
 * Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file.
 * Batch blob read requests for `DB::MultiGet` using `MultiRead`.
 * Add support for fallback to local compaction, the user can return `CompactionServiceJobStatus::kUseLocal` to instruct RocksDB to run the compaction locally instead of waiting for the remote compaction result.
-* Add built-in rate limiter's implementation for `RateLimiter::GetTotalPendingRequests()` for the total number of requests that are pending for bytes in the rate limiter.
+* Add built-in rate limiter's implementation of `RateLimiter::GetTotalPendingRequest(int64_t* total_pending_requests, const Env::IOPriority pri)` for the total number of requests that are pending for bytes in the rate limiter.
 
 ### Public API change
 * Remove obsolete implementation details FullKey and ParseFullKey from public API
index 1e91047de080c39471b920dcf5690731fc7b661d..6bc0e7a4d54f92ae3db11049c3174cc97bb3f5d3 100644 (file)
@@ -11,6 +11,7 @@
 
 #include "rocksdb/env.h"
 #include "rocksdb/statistics.h"
+#include "rocksdb/status.h"
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -90,14 +91,17 @@ class RateLimiter {
       const Env::IOPriority pri = Env::IO_TOTAL) const = 0;
 
   // Total # of requests that are pending for bytes in rate limiter
-  // For convenience, this function is implemented by the RateLimiter returned
-  // by NewGenericRateLimiter but is not required by RocksDB. The default
-  // implementation indicates "not supported".
-  virtual int64_t GetTotalPendingRequests(
+  // For convenience, this function is supported by the RateLimiter returned
+  // by NewGenericRateLimiter but is not required by RocksDB.
+  //
+  // REQUIRED: total_pending_request != nullptr
+  virtual Status GetTotalPendingRequests(
+      int64_t* total_pending_requests,
       const Env::IOPriority pri = Env::IO_TOTAL) const {
-    assert(false);
+    assert(total_pending_requests != nullptr);
+    (void)total_pending_requests;
     (void)pri;
-    return -1;
+    return Status::NotSupported();
   }
 
   virtual int64_t GetBytesPerSecond() const = 0;
index 7596374b06c014c8c726b34815d1c8ef11daf16e..1640ef07648faee8c854208dc9058336561b43c9 100644 (file)
@@ -17,6 +17,7 @@
 #include "port/port.h"
 #include "rocksdb/env.h"
 #include "rocksdb/rate_limiter.h"
+#include "rocksdb/status.h"
 #include "rocksdb/system_clock.h"
 #include "util/mutexlock.h"
 #include "util/random.h"
@@ -72,17 +73,21 @@ class GenericRateLimiter : public RateLimiter {
     return total_requests_[pri];
   }
 
-  virtual int64_t GetTotalPendingRequests(
+  virtual Status GetTotalPendingRequests(
+      int64_t* total_pending_requests,
       const Env::IOPriority pri = Env::IO_TOTAL) const override {
+    assert(total_pending_requests != nullptr);
     MutexLock g(&request_mutex_);
     if (pri == Env::IO_TOTAL) {
       int64_t total_pending_requests_sum = 0;
       for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
         total_pending_requests_sum += static_cast<int64_t>(queue_[i].size());
       }
-      return total_pending_requests_sum;
+      *total_pending_requests = total_pending_requests_sum;
+    } else {
+      *total_pending_requests = static_cast<int64_t>(queue_[pri].size());
     }
-    return static_cast<int64_t>(queue_[pri].size());
+    return Status::OK();
   }
 
   virtual int64_t GetBytesPerSecond() const override {
index f99394475a123728612a5d3ad5f32d9bd7e77db7..5ea3da4759b7436f4ad476276289701fd40bddb0 100644 (file)
@@ -100,9 +100,11 @@ TEST_F(RateLimiterTest, GetTotalPendingRequests) {
   std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
       200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
       10 /* fairness */));
+  int64_t total_pending_requests = 0;
   for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
-    ASSERT_EQ(limiter->GetTotalPendingRequests(static_cast<Env::IOPriority>(i)),
-              0);
+    ASSERT_OK(limiter->GetTotalPendingRequests(
+        &total_pending_requests, static_cast<Env::IOPriority>(i)));
+    ASSERT_EQ(total_pending_requests, 0);
   }
   // This is a variable for making sure the following callback is called
   // and the assertions in it are indeed excuted
@@ -113,11 +115,23 @@ TEST_F(RateLimiterTest, GetTotalPendingRequests) {
         // We temporarily unlock the mutex so that the following
         // GetTotalPendingRequests() can acquire it
         request_mutex->Unlock();
-        EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 1);
-        EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0);
-        EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0);
-        EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0);
-        EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 1);
+        for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+          EXPECT_OK(limiter->GetTotalPendingRequests(
+              &total_pending_requests, static_cast<Env::IOPriority>(i)))
+              << "Failed to return total pending requests for priority level = "
+              << static_cast<Env::IOPriority>(i);
+          if (i == Env::IO_USER || i == Env::IO_TOTAL) {
+            EXPECT_EQ(total_pending_requests, 1)
+                << "Failed to correctly return total pending requests for "
+                   "priority level = "
+                << static_cast<Env::IOPriority>(i);
+          } else {
+            EXPECT_EQ(total_pending_requests, 0)
+                << "Failed to correctly return total pending requests for "
+                   "priority level = "
+                << static_cast<Env::IOPriority>(i);
+          }
+        }
         // We lock the mutex again so that the request thread can resume running
         // with the mutex locked
         request_mutex->Lock();
@@ -128,11 +142,16 @@ TEST_F(RateLimiterTest, GetTotalPendingRequests) {
   limiter->Request(200, Env::IO_USER, nullptr /* stats */,
                    RateLimiter::OpType::kWrite);
   ASSERT_EQ(nonzero_pending_requests_verified, true);
-  EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 0);
-  EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0);
-  EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0);
-  EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0);
-  EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 0);
+  for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+    EXPECT_OK(limiter->GetTotalPendingRequests(&total_pending_requests,
+                                               static_cast<Env::IOPriority>(i)))
+        << "Failed to return total pending requests for priority level = "
+        << static_cast<Env::IOPriority>(i);
+    EXPECT_EQ(total_pending_requests, 0)
+        << "Failed to correctly return total pending requests for priority "
+           "level = "
+        << static_cast<Env::IOPriority>(i);
+  }
   SyncPoint::GetInstance()->DisableProcessing();
   SyncPoint::GetInstance()->ClearCallBack(
       "GenericRateLimiter::Request:PostEnqueueRequest");