return total_requests_[pri];
}
+ virtual int64_t GetTotalPendingRequests(
+ const Env::IOPriority pri = Env::IO_TOTAL) const override {
+ MutexLock g(&request_mutex_);
+ if (pri == Env::IO_TOTAL) {
+ int64_t total_pending_requests_sum = 0;
+ for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+ total_pending_requests_sum += static_cast<int64_t>(queue_[i].size());
+ }
+ return total_pending_requests_sum;
+ }
+ return static_cast<int64_t>(queue_[pri].size());
+ }
+
virtual int64_t GetBytesPerSecond() const override {
return rate_bytes_per_sec_;
}
#include <limits>
#include "db/db_test_util.h"
+#include "port/port.h"
#include "rocksdb/system_clock.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
"Env::IO_TOTAL";
}
+TEST_F(RateLimiterTest, GetTotalPendingRequests) {
+ std::unique_ptr<RateLimiter> limiter(
+ NewGenericRateLimiter(20 /* rate_bytes_per_sec */));
+ for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+ ASSERT_EQ(limiter->GetTotalPendingRequests(static_cast<Env::IOPriority>(i)),
+ 0);
+ }
+ // This is a variable for making sure the following callback is called
+ // and the assertions in it are indeed excuted
+ bool nonzero_pending_requests_verified_ = false;
+ SyncPoint::GetInstance()->SetCallBack(
+ "GenericRateLimiter::Request:PostEnqueueRequest", [&](void* arg) {
+ port::Mutex* request_mutex = (port::Mutex*)arg;
+ // We temporarily unlock the mutex so that the following
+ // GetTotalPendingRequests() can acquire it
+ request_mutex->Unlock();
+ EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 1);
+ EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0);
+ EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0);
+ EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0);
+ EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 1);
+ // We lock the mutex again so that the request thread can resume running
+ // with the mutex locked
+ request_mutex->Lock();
+ nonzero_pending_requests_verified_ = true;
+ });
+
+ SyncPoint::GetInstance()->EnableProcessing();
+ limiter->Request(20, Env::IO_USER, nullptr /* stats */,
+ RateLimiter::OpType::kWrite);
+ ASSERT_EQ(nonzero_pending_requests_verified_, true);
+ EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 0);
+ EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0);
+ EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0);
+ EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0);
+ EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 0);
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearCallBack(
+ "GenericRateLimiter::Request:PostEnqueueRequest");
+}
+
TEST_F(RateLimiterTest, Modes) {
for (auto mode : {RateLimiter::Mode::kWritesOnly,
RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {