From: hx235 <83968999+hx235@users.noreply.github.com> Date: Wed, 4 Aug 2021 17:42:49 +0000 (-0700) Subject: Improve rate limiter implementation's readability (#8596) X-Git-Tag: v6.24.2~58 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=dbe3810c74a0103919ccbcccee52feba7e652866;p=rocksdb.git Improve rate limiter implementation's readability (#8596) Summary: Context: As need for new feature of resource management using RocksDB's rate limiter like [https://github.com/facebook/rocksdb/issues/8595](https://github.com/facebook/rocksdb/pull/8595) arises, it is about time to re-learn our rate limiter and make this learning process easier for others by improving its readability. The comment/assertion/one extra else-branch are added based on my best understanding toward the rate_limiter.cc and rate_limiter_test.cc up to date after giving it a hard read. - Add code comments/assertion/one extra else-branch (that is not affecting existing behavior, see PR comment) to describe how leader-election works under multi-thread settings in GenericRateLimiter::Request() - Add code comments to describe a non-obvious trick during clean-up of rate limiter destructor - Add code comments to explain more about the starvation being fixed in GenericRateLimiter::Refill() through partial byte-granting - Add code comments to the rate limiter's setup in a complicated unit test in rate_limiter_test Pull Request resolved: https://github.com/facebook/rocksdb/pull/8596 Test Plan: - passed existing rate_limiter_test.cc Reviewed By: ajkr Differential Revision: D29982590 Pulled By: hx235 fbshipit-source-id: c3592986bb5b0c90d8229fe44f425251ec7e8a0a --- diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index cefd9e299..70bdac026 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -118,6 +118,9 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, } if (stop_) { + // It is now in the clean-up of ~GenericRateLimiter(). 
+ // Therefore any new incoming request will exit from here + // and not get satisfied. return; } @@ -138,77 +141,125 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, do { bool timedout = false; - // Leader election, candidates can be: - // (1) a new incoming request, - // (2) a previous leader, whose quota has not been not assigned yet due - // to lower priority - // (3) a previous waiter at the front of queue, who got notified by - // previous leader + + // Leader election: + // Leader request's duty: + // (1) Waiting for the next refill time; + // (2) Refilling the bytes and granting requests. + // + // If the following three conditions are all true for a request, + // then the request is selected as a leader: + // (1) The request thread acquired the request_mutex_ and is running; + // (2) There is currently no leader; + // (3) The request sits at the front of a queue. + // + // If not selected as a leader, the request thread will wait + // for one of the following signals to wake up and + // compete for the request_mutex_: + // (1) Signal from the previous leader to exit since its requested bytes + // are fully granted; + // (2) Signal from the previous leader to participate in next-round + // leader election; + // (3) Signal from rate limiter's destructor as part of the clean-up. + // + // Therefore, a leader request can only be one of the following types: + // (1) a new incoming request placed at the front of a queue; + // (2) a previous leader request whose quota has not been fully + // granted yet due to its lower priority, hence still at + // the front of a queue; + // (3) a waiting request at the front of a queue, which got + // signaled by the previous leader to participate in leader election. 
if (leader_ == nullptr && ((!queue_[Env::IO_HIGH].empty() && &r == queue_[Env::IO_HIGH].front()) || (!queue_[Env::IO_LOW].empty() && &r == queue_[Env::IO_LOW].front()))) { leader_ = &r; + int64_t delta = next_refill_us_ - NowMicrosMonotonic(); delta = delta > 0 ? delta : 0; if (delta == 0) { timedout = true; } else { + // The leader request thread waits till next_refill_us_ int64_t wait_until = clock_->NowMicros() + delta; RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS); ++num_drains_; timedout = r.cv.TimedWait(wait_until); } } else { - // Not at the front of queue or an leader has already been elected r.cv.Wait(); } - // request_mutex_ is held from now on if (stop_) { + // It is now in the clean-up of ~GenericRateLimiter(). + // Therefore any woken-up request will exit here, + // might or might not have been satisfied. --requests_to_wait_; exit_cv_.Signal(); return; } - // Make sure the waken up request is always the header of its queue + // Assertion: request thread running through this point is one of the + // following in terms of the request type and quota granting situation: + // (1) a leader request that is not fully granted with quota and about + // to carry out its leader's work; + // (2) a non-leader request that got fully granted with quota and is + // running to exit; + // (3) a non-leader request that is not fully granted with quota and + // is running to participate in next-round leader election. + assert((&r == leader_ && !r.granted) || (&r != leader_ && r.granted) || + (&r != leader_ && !r.granted)); + + // Assertion: request thread running through this point is one of the + // following in terms of its position in queue: + // (1) a request got popped off the queue because it is fully granted + // with bytes; + // (2) a request sits at the front of its queue. 
assert(r.granted || (!queue_[Env::IO_HIGH].empty() && &r == queue_[Env::IO_HIGH].front()) || (!queue_[Env::IO_LOW].empty() && &r == queue_[Env::IO_LOW].front())); - assert(leader_ == nullptr || - (!queue_[Env::IO_HIGH].empty() && - leader_ == queue_[Env::IO_HIGH].front()) || - (!queue_[Env::IO_LOW].empty() && - leader_ == queue_[Env::IO_LOW].front())); if (leader_ == &r) { - // Waken up from TimedWait() + // The leader request thread is now running. + // It might or might not have gone through TimedWait(). if (timedout) { - // Time to do refill! - Refill(); + // Time for the leader to do refill and grant bytes to requests + RefillBytesAndGrantRequests(); - // Re-elect a new leader regardless. This is to simplify the - // election handling. + // The leader request retires after refilling and granting bytes + // regardless. This is to simplify the election handling. leader_ = nullptr; - // Notify the header of queue if current leader is going away if (r.granted) { - // Current leader already got granted with quota. Notify header - // of waiting queue to participate next round of election. 
+ // The leader request (that was just retired) + // already got fully granted with quota and will soon exit + + // Assertion: the fully granted leader request is popped off its queue assert((queue_[Env::IO_HIGH].empty() || &r != queue_[Env::IO_HIGH].front()) && (queue_[Env::IO_LOW].empty() || &r != queue_[Env::IO_LOW].front())); + + // If there are any remaining requests, the leader request (that was + // just retired) makes sure there exists at least one leader candidate + // by signaling a front request of a queue to participate in + // next-round leader election if (!queue_[Env::IO_HIGH].empty()) { queue_[Env::IO_HIGH].front()->cv.Signal(); } else if (!queue_[Env::IO_LOW].empty()) { queue_[Env::IO_LOW].front()->cv.Signal(); } - // Done + + // The leader request (that was just retired) exits break; + } else { + // The leader request (that was just retired) is not fully granted + // with quota. It will participate in leader election and claim back + // the leader position immediately. + assert(!r.granted); } } else { // Spontaneous wake up, need to continue to wait @@ -216,20 +267,24 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, leader_ = nullptr; } } else { - // Waken up by previous leader: - // (1) if requested quota is granted, it is done. - // (2) if requested quota is not granted, this means current thread - // was picked as a new leader candidate (previous leader got quota). - // It needs to participate leader election because a new request may - // come in before this thread gets waken up. So it may actually need - // to do Wait() again. - assert(!timedout); + // The non-leader request thread is running. + // It is one of the following request types: + // (1) The request got fully granted with quota and signaled to run to + // exit by the previous leader; + // (2) The request is not fully granted with quota and signaled to run to + // participate in next-round leader election by the previous leader. 
+ // It might or might not become the next-round leader because a new + // request may come in and acquire the request_mutex_ before this + // request thread does after it was signaled. The new request might + // sit at the front of a queue and hence become the next-round leader + // instead. + assert(&r != leader_); } } while (!r.granted); } -void GenericRateLimiter::Refill() { - TEST_SYNC_POINT("GenericRateLimiter::Refill"); +void GenericRateLimiter::RefillBytesAndGrantRequests() { + TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests"); next_refill_us_ = NowMicrosMonotonic() + refill_period_us_; // Carry over the left over quota from the last period auto refill_bytes_per_period = @@ -245,7 +300,10 @@ void GenericRateLimiter::Refill() { while (!queue->empty()) { auto* next_req = queue->front(); if (available_bytes_ < next_req->request_bytes) { - // avoid starvation + // Grant partial request_bytes to avoid starvation of requests + // that end up asking for more bytes than available_bytes_ + // due to dynamically reduced rate limiter's bytes_per_second that + // leads to reduced refill_bytes_per_period hence available_bytes_ next_req->request_bytes -= available_bytes_; available_bytes_ = 0; break; @@ -257,7 +315,7 @@ void GenericRateLimiter::Refill() { next_req->granted = true; if (next_req != leader_) { - // Quota granted, signal the thread + // Quota granted, signal the thread to exit next_req->cv.Signal(); } } diff --git a/util/rate_limiter.h b/util/rate_limiter.h index ec391162b..58342a097 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -70,7 +70,7 @@ class GenericRateLimiter : public RateLimiter { } private: - void Refill(); + void RefillBytesAndGrantRequests(); int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec); Status Tune(); diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index a979dfd5c..04625964c 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -64,8 +64,11 @@ 
TEST_F(RateLimiterTest, Rate) { auto* env = Env::Default(); struct Arg { Arg(int32_t _target_rate, int _burst) - : limiter(NewGenericRateLimiter(_target_rate, 100 * 1000, 10)), - request_size(_target_rate / 10), + : limiter(NewGenericRateLimiter(_target_rate /* rate_bytes_per_sec */, + 100 * 1000 /* refill_period_us */, + 10 /* fairness */)), + request_size(_target_rate / + 10 /* refill period here is 1/10 second */), burst(_burst) {} std::unique_ptr limiter; int32_t request_size; @@ -175,7 +178,7 @@ TEST_F(RateLimiterTest, LimitChangeTest) { {{"GenericRateLimiter::Request", "RateLimiterTest::LimitChangeTest:changeLimitStart"}, {"RateLimiterTest::LimitChangeTest:changeLimitEnd", - "GenericRateLimiter::Refill"}}); + "GenericRateLimiter::RefillBytesAndGrantRequests"}}); Arg arg(target, Env::IO_HIGH, limiter); // The idea behind is to start a request first, then before it refills, // update limit to a different value (2X/0.5X). No starvation should @@ -209,11 +212,12 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) { true /* auto_tuned */)); // Use callback to advance time because we need to advance (1) after Request() - // has determined the bytes are not available; and (2) before Refill() - // computes the next refill time (ensuring refill time in the future allows - // the next request to drain the rate limiter). + // has determined the bytes are not available; and (2) before + // RefillBytesAndGrantRequests() computes the next refill time (ensuring + // refill time in the future allows the next request to drain the rate + // limiter). ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "GenericRateLimiter::Refill", [&](void* /*arg*/) { + "GenericRateLimiter::RefillBytesAndGrantRequests", [&](void* /*arg*/) { special_env.SleepForMicroseconds(static_cast( std::chrono::microseconds(kTimePerRefill).count())); });