#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
+#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE {
const Env::IOPriority pri = Env::IO_TOTAL) const = 0;
// Total # of requests that are pending for bytes in rate limiter
- // For convenience, this function is implemented by the RateLimiter returned
- // by NewGenericRateLimiter but is not required by RocksDB. The default
- // implementation indicates "not supported".
- virtual int64_t GetTotalPendingRequests(
+ // For convenience, this function is supported by the RateLimiter returned
+ // by NewGenericRateLimiter but is not required by RocksDB.
+ //
+ // REQUIRED: total_pending_request != nullptr
+ virtual Status GetTotalPendingRequests(
+ int64_t* total_pending_requests,
const Env::IOPriority pri = Env::IO_TOTAL) const {
- assert(false);
+ assert(total_pending_requests != nullptr);
+ (void)total_pending_requests;
(void)pri;
- return -1;
+ return Status::NotSupported();
}
virtual int64_t GetBytesPerSecond() const = 0;
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/rate_limiter.h"
+#include "rocksdb/status.h"
#include "rocksdb/system_clock.h"
#include "util/mutexlock.h"
#include "util/random.h"
return total_requests_[pri];
}
- virtual int64_t GetTotalPendingRequests(
+ virtual Status GetTotalPendingRequests(
+ int64_t* total_pending_requests,
const Env::IOPriority pri = Env::IO_TOTAL) const override {
+ assert(total_pending_requests != nullptr);
MutexLock g(&request_mutex_);
if (pri == Env::IO_TOTAL) {
int64_t total_pending_requests_sum = 0;
for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
total_pending_requests_sum += static_cast<int64_t>(queue_[i].size());
}
- return total_pending_requests_sum;
+ *total_pending_requests = total_pending_requests_sum;
+ } else {
+ *total_pending_requests = static_cast<int64_t>(queue_[pri].size());
}
- return static_cast<int64_t>(queue_[pri].size());
+ return Status::OK();
}
virtual int64_t GetBytesPerSecond() const override {
std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
10 /* fairness */));
+ int64_t total_pending_requests = 0;
for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
- ASSERT_EQ(limiter->GetTotalPendingRequests(static_cast<Env::IOPriority>(i)),
- 0);
+ ASSERT_OK(limiter->GetTotalPendingRequests(
+ &total_pending_requests, static_cast<Env::IOPriority>(i)));
+ ASSERT_EQ(total_pending_requests, 0);
}
// This is a variable for making sure the following callback is called
// and the assertions in it are indeed excuted
// We temporarily unlock the mutex so that the following
// GetTotalPendingRequests() can acquire it
request_mutex->Unlock();
- EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 1);
- EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0);
- EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0);
- EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0);
- EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 1);
+ for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+ EXPECT_OK(limiter->GetTotalPendingRequests(
+ &total_pending_requests, static_cast<Env::IOPriority>(i)))
+ << "Failed to return total pending requests for priority level = "
+ << static_cast<Env::IOPriority>(i);
+ if (i == Env::IO_USER || i == Env::IO_TOTAL) {
+ EXPECT_EQ(total_pending_requests, 1)
+ << "Failed to correctly return total pending requests for "
+ "priority level = "
+ << static_cast<Env::IOPriority>(i);
+ } else {
+ EXPECT_EQ(total_pending_requests, 0)
+ << "Failed to correctly return total pending requests for "
+ "priority level = "
+ << static_cast<Env::IOPriority>(i);
+ }
+ }
// We lock the mutex again so that the request thread can resume running
// with the mutex locked
request_mutex->Lock();
limiter->Request(200, Env::IO_USER, nullptr /* stats */,
RateLimiter::OpType::kWrite);
ASSERT_EQ(nonzero_pending_requests_verified, true);
- EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 0);
- EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0);
- EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0);
- EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0);
- EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 0);
+ for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+ EXPECT_OK(limiter->GetTotalPendingRequests(&total_pending_requests,
+ static_cast<Env::IOPriority>(i)))
+ << "Failed to return total pending requests for priority level = "
+ << static_cast<Env::IOPriority>(i);
+ EXPECT_EQ(total_pending_requests, 0)
+ << "Failed to correctly return total pending requests for priority "
+ "level = "
+ << static_cast<Env::IOPriority>(i);
+ }
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearCallBack(
"GenericRateLimiter::Request:PostEnqueueRequest");