}
if (stop_) {
+ // It is now in the clean-up of ~GenericRateLimiter().
+ // Therefore any new incoming request will exit from here
+ // and not get satiesfied.
return;
}
do {
bool timedout = false;
- // Leader election, candidates can be:
- // (1) a new incoming request,
- // (2) a previous leader, whose quota has not been not assigned yet due
- // to lower priority
- // (3) a previous waiter at the front of queue, who got notified by
- // previous leader
+
+ // Leader election:
+ // Leader request's duty:
+ // (1) Waiting for the next refill time;
+ // (2) Refilling the bytes and granting requests.
+ //
+ // If the following three conditions are all true for a request,
+ // then the request is selected as a leader:
+ // (1) The request thread acquired the request_mutex_ and is running;
+ // (2) There is currently no leader;
+ // (3) The request sits at the front of a queue.
+ //
+ // If not selected as a leader, the request thread will wait
+ // for one of the following signals to wake up and
+ // compete for the request_mutex_:
+ // (1) Signal from the previous leader to exit since its requested bytes
+ // are fully granted;
+ // (2) Signal from the previous leader to particpate in next-round
+ // leader election;
+ // (3) Signal from rate limiter's destructor as part of the clean-up.
+ //
+ // Therefore, a leader request can only be one of the following types:
+ // (1) a new incoming request placed at the front of a queue;
+ // (2) a previous leader request whose quota has not been not fully
+ // granted yet due to its lower priority, hence still at
+ // the front of a queue;
+ // (3) a waiting request at the front of a queue, which got
+ // signaled by the previous leader to participate in leader election.
if (leader_ == nullptr &&
((!queue_[Env::IO_HIGH].empty() &&
&r == queue_[Env::IO_HIGH].front()) ||
(!queue_[Env::IO_LOW].empty() &&
&r == queue_[Env::IO_LOW].front()))) {
leader_ = &r;
+
int64_t delta = next_refill_us_ - NowMicrosMonotonic();
delta = delta > 0 ? delta : 0;
if (delta == 0) {
timedout = true;
} else {
+ // The leader request thread waits till next_refill_us_
int64_t wait_until = clock_->NowMicros() + delta;
RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
++num_drains_;
timedout = r.cv.TimedWait(wait_until);
}
} else {
- // Not at the front of queue or an leader has already been elected
r.cv.Wait();
}
- // request_mutex_ is held from now on
if (stop_) {
+ // It is now in the clean-up of ~GenericRateLimiter().
+ // Therefore any woken-up request will exit here,
+ // might or might not has been satiesfied.
--requests_to_wait_;
exit_cv_.Signal();
return;
}
- // Make sure the waken up request is always the header of its queue
+ // Assertion: request thread running through this point is one of the
+ // following in terms of the request type and quota granting situation:
+ // (1) a leader request that is not fully granted with quota and about
+ // to carry out its leader's work;
+ // (2) a non-leader request that got fully granted with quota and is
+ // running to exit;
+ // (3) a non-leader request that is not fully granted with quota and
+ // is running to particpate in next-round leader election.
+ assert((&r == leader_ && !r.granted) || (&r != leader_ && r.granted) ||
+ (&r != leader_ && !r.granted));
+
+ // Assertion: request thread running through this point is one of the
+ // following in terms of its position in queue:
+ // (1) a request got popped off the queue because it is fully granted
+ // with bytes;
+ // (2) a request sits at the front of its queue.
assert(r.granted ||
(!queue_[Env::IO_HIGH].empty() &&
&r == queue_[Env::IO_HIGH].front()) ||
(!queue_[Env::IO_LOW].empty() &&
&r == queue_[Env::IO_LOW].front()));
- assert(leader_ == nullptr ||
- (!queue_[Env::IO_HIGH].empty() &&
- leader_ == queue_[Env::IO_HIGH].front()) ||
- (!queue_[Env::IO_LOW].empty() &&
- leader_ == queue_[Env::IO_LOW].front()));
if (leader_ == &r) {
- // Waken up from TimedWait()
+ // The leader request thread is now running.
+ // It might or might not has been TimedWait().
if (timedout) {
- // Time to do refill!
- Refill();
+ // Time for the leader to do refill and grant bytes to requests
+ RefillBytesAndGrantRequests();
- // Re-elect a new leader regardless. This is to simplify the
- // election handling.
+ // The leader request retires after refilling and granting bytes
+ // regardless. This is to simplify the election handling.
leader_ = nullptr;
- // Notify the header of queue if current leader is going away
if (r.granted) {
- // Current leader already got granted with quota. Notify header
- // of waiting queue to participate next round of election.
+ // The leader request (that was just retired)
+ // already got fully granted with quota and will soon exit
+
+ // Assertion: the fully granted leader request is popped off its queue
assert((queue_[Env::IO_HIGH].empty() ||
&r != queue_[Env::IO_HIGH].front()) &&
(queue_[Env::IO_LOW].empty() ||
&r != queue_[Env::IO_LOW].front()));
+
+ // If there is any remaining requests, the leader request (that was
+ // just retired) makes sure there exists at least one leader candidate
+ // by signaling a front request of a queue to particpate in
+ // next-round leader election
if (!queue_[Env::IO_HIGH].empty()) {
queue_[Env::IO_HIGH].front()->cv.Signal();
} else if (!queue_[Env::IO_LOW].empty()) {
queue_[Env::IO_LOW].front()->cv.Signal();
}
- // Done
+
+ // The leader request (that was just retired) exits
break;
+ } else {
+ // The leader request (that was just retired) is not fully granted
+ // with quota. It will particpate in leader election and claim back
+ // the leader position immediately.
+ assert(!r.granted);
}
} else {
// Spontaneous wake up, need to continue to wait
leader_ = nullptr;
}
} else {
- // Waken up by previous leader:
- // (1) if requested quota is granted, it is done.
- // (2) if requested quota is not granted, this means current thread
- // was picked as a new leader candidate (previous leader got quota).
- // It needs to participate leader election because a new request may
- // come in before this thread gets waken up. So it may actually need
- // to do Wait() again.
- assert(!timedout);
+ // The non-leader request thread is running.
+ // It is one of the following request types:
+ // (1) The request got fully granted with quota and signaled to run to
+ // exit by the previous leader;
+ // (2) The request is not fully granted with quota and signaled to run to
+ // particpate in next-round leader election by the previous leader.
+ // It might or might not become the next-round leader because a new
+ // request may come in and acquire the request_mutex_ before this
+ // request thread does after it was signaled. The new request might
+ // sit at front of a queue and hence become the next-round leader
+ // instead.
+ assert(&r != leader_);
}
} while (!r.granted);
}
-void GenericRateLimiter::Refill() {
- TEST_SYNC_POINT("GenericRateLimiter::Refill");
+void GenericRateLimiter::RefillBytesAndGrantRequests() {
+ TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests");
next_refill_us_ = NowMicrosMonotonic() + refill_period_us_;
// Carry over the left over quota from the last period
auto refill_bytes_per_period =
while (!queue->empty()) {
auto* next_req = queue->front();
if (available_bytes_ < next_req->request_bytes) {
- // avoid starvation
+ // Grant partial request_bytes to avoid starvation of requests
+ // that become asking for more bytes than available_bytes_
+ // due to dynamically reduced rate limiter's bytes_per_second that
+ // leads to reduced refill_bytes_per_period hence available_bytes_
next_req->request_bytes -= available_bytes_;
available_bytes_ = 0;
break;
next_req->granted = true;
if (next_req != leader_) {
- // Quota granted, signal the thread
+ // Quota granted, signal the thread to exit
next_req->cv.Signal();
}
}
auto* env = Env::Default();
struct Arg {
Arg(int32_t _target_rate, int _burst)
- : limiter(NewGenericRateLimiter(_target_rate, 100 * 1000, 10)),
- request_size(_target_rate / 10),
+ : limiter(NewGenericRateLimiter(_target_rate /* rate_bytes_per_sec */,
+ 100 * 1000 /* refill_period_us */,
+ 10 /* fairness */)),
+ request_size(_target_rate /
+ 10 /* refill period here is 1/10 second */),
burst(_burst) {}
std::unique_ptr<RateLimiter> limiter;
int32_t request_size;
{{"GenericRateLimiter::Request",
"RateLimiterTest::LimitChangeTest:changeLimitStart"},
{"RateLimiterTest::LimitChangeTest:changeLimitEnd",
- "GenericRateLimiter::Refill"}});
+ "GenericRateLimiter::RefillBytesAndGrantRequests"}});
Arg arg(target, Env::IO_HIGH, limiter);
// The idea behind is to start a request first, then before it refills,
// update limit to a different value (2X/0.5X). No starvation should
true /* auto_tuned */));
// Use callback to advance time because we need to advance (1) after Request()
- // has determined the bytes are not available; and (2) before Refill()
- // computes the next refill time (ensuring refill time in the future allows
- // the next request to drain the rate limiter).
+ // has determined the bytes are not available; and (2) before
+ // RefillBytesAndGrantRequests() computes the next refill time (ensuring
+ // refill time in the future allows the next request to drain the rate
+ // limiter).
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "GenericRateLimiter::Refill", [&](void* /*arg*/) {
+ "GenericRateLimiter::RefillBytesAndGrantRequests", [&](void* /*arg*/) {
special_env.SleepForMicroseconds(static_cast<int>(
std::chrono::microseconds(kTimePerRefill).count()));
});