]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Improve rate limiter implementation's readability (#8596)
authorhx235 <83968999+hx235@users.noreply.github.com>
Wed, 4 Aug 2021 17:42:49 +0000 (10:42 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Wed, 4 Aug 2021 17:43:47 +0000 (10:43 -0700)
Summary:
Context:
As need for new feature of resource management using RocksDB's rate limiter like [https://github.com/facebook/rocksdb/issues/8595](https://github.com/facebook/rocksdb/pull/8595) arises, it is about time to re-learn our rate limiter and make this learning process easier for others by improving its readability. The comment/assertion/one extra else-branch are added based on my best understanding toward the rate_limiter.cc and rate_limiter_test.cc up to date after giving it a hard read.
- Add code comments/assertion/one extra else-branch (that is not affecting existing behavior, see PR comment) to describe how leader-election works under multi-thread settings in GenericRateLimiter::Request()
- Add code comments to describe a non-obvious trick during clean-up of rate limiter destructor
- Add code comments to explain more about the starvation being fixed in GenericRateLimiter::Refill() through partial byte-granting
- Add code comments to the rate limiter's setup in a complicated unit test in rate_limiter_test

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8596

Test Plan: - passed existing rate_limiter_test.cc

Reviewed By: ajkr

Differential Revision: D29982590

Pulled By: hx235

fbshipit-source-id: c3592986bb5b0c90d8229fe44f425251ec7e8a0a

util/rate_limiter.cc
util/rate_limiter.h
util/rate_limiter_test.cc

index cefd9e299146d58a77b0a0ec7b501a02a43ef95a..70bdac0265d3963efdbb5714c89b0c5c278f1756 100644 (file)
@@ -118,6 +118,9 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
   }
 
   if (stop_) {
+    // It is now in the clean-up of ~GenericRateLimiter().
+    // Therefore any new incoming request will exit from here
+    // and not get satiesfied.
     return;
   }
 
@@ -138,77 +141,125 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
 
   do {
     bool timedout = false;
-    // Leader election, candidates can be:
-    // (1) a new incoming request,
-    // (2) a previous leader, whose quota has not been not assigned yet due
-    //     to lower priority
-    // (3) a previous waiter at the front of queue, who got notified by
-    //     previous leader
+
+    // Leader election:
+    //  Leader request's duty:
+    //  (1) Waiting for the next refill time;
+    //  (2) Refilling the bytes and granting requests.
+    //
+    //  If the following three conditions are all true for a request,
+    //  then the request is selected as a leader:
+    //  (1) The request thread acquired the request_mutex_ and is running;
+    //  (2) There is currently no leader;
+    //  (3) The request sits at the front of a queue.
+    //
+    //  If not selected as a leader, the request thread will wait
+    //  for one of the following signals to wake up and
+    //  compete for the request_mutex_:
+    //  (1) Signal from the previous leader to exit since its requested bytes
+    //      are fully granted;
+    //  (2) Signal from the previous leader to particpate in next-round
+    //      leader election;
+    //  (3) Signal from rate limiter's destructor as part of the clean-up.
+    //
+    //  Therefore, a leader request can only be one of the following types:
+    //  (1) a new incoming request placed at the front of a queue;
+    //  (2) a previous leader request whose quota has not been not fully
+    //      granted yet due to its lower priority, hence still at
+    //      the front of a queue;
+    //  (3) a waiting request at the front of a queue, which got
+    //      signaled by the previous leader to participate in leader election.
     if (leader_ == nullptr &&
         ((!queue_[Env::IO_HIGH].empty() &&
             &r == queue_[Env::IO_HIGH].front()) ||
          (!queue_[Env::IO_LOW].empty() &&
             &r == queue_[Env::IO_LOW].front()))) {
       leader_ = &r;
+
       int64_t delta = next_refill_us_ - NowMicrosMonotonic();
       delta = delta > 0 ? delta : 0;
       if (delta == 0) {
         timedout = true;
       } else {
+        // The leader request thread waits till next_refill_us_
         int64_t wait_until = clock_->NowMicros() + delta;
         RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
         ++num_drains_;
         timedout = r.cv.TimedWait(wait_until);
       }
     } else {
-      // Not at the front of queue or an leader has already been elected
       r.cv.Wait();
     }
 
-    // request_mutex_ is held from now on
     if (stop_) {
+      // It is now in the clean-up of ~GenericRateLimiter().
+      // Therefore any woken-up request will exit here,
+      // might or might not has been satiesfied.
       --requests_to_wait_;
       exit_cv_.Signal();
       return;
     }
 
-    // Make sure the waken up request is always the header of its queue
+    // Assertion: request thread running through this point is one of the
+    // following in terms of the request type and quota granting situation:
+    // (1) a leader request that is not fully granted with quota and about
+    //     to carry out its leader's work;
+    // (2) a non-leader request that got fully granted with quota and is
+    //     running to exit;
+    // (3) a non-leader request that is not fully granted with quota and
+    //     is running to particpate in next-round leader election.
+    assert((&r == leader_ && !r.granted) || (&r != leader_ && r.granted) ||
+           (&r != leader_ && !r.granted));
+
+    // Assertion: request thread running through this point is one of the
+    // following in terms of its position in queue:
+    // (1) a request got popped off the queue because it is fully granted
+    //     with bytes;
+    // (2) a request sits at the front of its queue.
     assert(r.granted ||
            (!queue_[Env::IO_HIGH].empty() &&
             &r == queue_[Env::IO_HIGH].front()) ||
            (!queue_[Env::IO_LOW].empty() &&
             &r == queue_[Env::IO_LOW].front()));
-    assert(leader_ == nullptr ||
-           (!queue_[Env::IO_HIGH].empty() &&
-            leader_ == queue_[Env::IO_HIGH].front()) ||
-           (!queue_[Env::IO_LOW].empty() &&
-            leader_ == queue_[Env::IO_LOW].front()));
 
     if (leader_ == &r) {
-      // Waken up from TimedWait()
+      // The leader request thread is now running.
+      // It might or might not has been TimedWait().
       if (timedout) {
-        // Time to do refill!
-        Refill();
+        // Time for the leader to do refill and grant bytes to requests
+        RefillBytesAndGrantRequests();
 
-        // Re-elect a new leader regardless. This is to simplify the
-        // election handling.
+        // The leader request retires after refilling and granting bytes
+        // regardless. This is to simplify the election handling.
         leader_ = nullptr;
 
-        // Notify the header of queue if current leader is going away
         if (r.granted) {
-          // Current leader already got granted with quota. Notify header
-          // of waiting queue to participate next round of election.
+          // The leader request (that was just retired)
+          // already got fully granted with quota and will soon exit
+
+          // Assertion: the fully granted leader request is popped off its queue
           assert((queue_[Env::IO_HIGH].empty() ||
                     &r != queue_[Env::IO_HIGH].front()) &&
                  (queue_[Env::IO_LOW].empty() ||
                     &r != queue_[Env::IO_LOW].front()));
+
+          // If there is any remaining requests, the leader request (that was
+          // just retired) makes sure there exists at least one leader candidate
+          // by signaling a front request of a queue to particpate in
+          // next-round leader election
           if (!queue_[Env::IO_HIGH].empty()) {
             queue_[Env::IO_HIGH].front()->cv.Signal();
           } else if (!queue_[Env::IO_LOW].empty()) {
             queue_[Env::IO_LOW].front()->cv.Signal();
           }
-          // Done
+
+          // The leader request (that was just retired) exits
           break;
+        } else {
+          // The leader request (that was just retired) is not fully granted
+          // with quota. It will particpate in leader election and claim back
+          // the leader position immediately.
+          assert(!r.granted);
         }
       } else {
         // Spontaneous wake up, need to continue to wait
@@ -216,20 +267,24 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
         leader_ = nullptr;
       }
     } else {
-      // Waken up by previous leader:
-      // (1) if requested quota is granted, it is done.
-      // (2) if requested quota is not granted, this means current thread
-      // was picked as a new leader candidate (previous leader got quota).
-      // It needs to participate leader election because a new request may
-      // come in before this thread gets waken up. So it may actually need
-      // to do Wait() again.
-      assert(!timedout);
+      // The non-leader request thread is running.
+      // It is one of the following request types:
+      // (1) The request got fully granted with quota and signaled to run to
+      //     exit by the previous leader;
+      // (2) The request is not fully granted with quota and signaled to run to
+      //     particpate in next-round leader election by the previous leader.
+      //     It might or might not become the next-round leader because a new
+      //     request may come in and acquire the request_mutex_ before this
+      //     request thread does after it was signaled. The new request might
+      //     sit at front of a queue and hence become the next-round leader
+      //     instead.
+      assert(&r != leader_);
     }
   } while (!r.granted);
 }
 
-void GenericRateLimiter::Refill() {
-  TEST_SYNC_POINT("GenericRateLimiter::Refill");
+void GenericRateLimiter::RefillBytesAndGrantRequests() {
+  TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests");
   next_refill_us_ = NowMicrosMonotonic() + refill_period_us_;
   // Carry over the left over quota from the last period
   auto refill_bytes_per_period =
@@ -245,7 +300,10 @@ void GenericRateLimiter::Refill() {
     while (!queue->empty()) {
       auto* next_req = queue->front();
       if (available_bytes_ < next_req->request_bytes) {
-        // avoid starvation
+        // Grant partial request_bytes to avoid starvation of requests
+        // that become asking for more bytes than available_bytes_
+        // due to dynamically reduced rate limiter's bytes_per_second that
+        // leads to reduced refill_bytes_per_period hence available_bytes_
         next_req->request_bytes -= available_bytes_;
         available_bytes_ = 0;
         break;
@@ -257,7 +315,7 @@ void GenericRateLimiter::Refill() {
 
       next_req->granted = true;
       if (next_req != leader_) {
-        // Quota granted, signal the thread
+        // Quota granted, signal the thread to exit
         next_req->cv.Signal();
       }
     }
index ec391162b348145ed2a7d821bdf923cc98060b3b..58342a097b9b4a6041584246bca09aaf5fba675f 100644 (file)
@@ -70,7 +70,7 @@ class GenericRateLimiter : public RateLimiter {
   }
 
  private:
-  void Refill();
+  void RefillBytesAndGrantRequests();
   int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec);
   Status Tune();
 
index a979dfd5cb0491997e17f4f20c49bfcba34a77a2..04625964c6e21ce245d3a45eeeb613aed30a8b34 100644 (file)
@@ -64,8 +64,11 @@ TEST_F(RateLimiterTest, Rate) {
   auto* env = Env::Default();
   struct Arg {
     Arg(int32_t _target_rate, int _burst)
-        : limiter(NewGenericRateLimiter(_target_rate, 100 * 1000, 10)),
-          request_size(_target_rate / 10),
+        : limiter(NewGenericRateLimiter(_target_rate /* rate_bytes_per_sec */,
+                                        100 * 1000 /* refill_period_us */,
+                                        10 /* fairness */)),
+          request_size(_target_rate /
+                       10 /* refill period here is 1/10 second */),
           burst(_burst) {}
     std::unique_ptr<RateLimiter> limiter;
     int32_t request_size;
@@ -175,7 +178,7 @@ TEST_F(RateLimiterTest, LimitChangeTest) {
           {{"GenericRateLimiter::Request",
             "RateLimiterTest::LimitChangeTest:changeLimitStart"},
            {"RateLimiterTest::LimitChangeTest:changeLimitEnd",
-            "GenericRateLimiter::Refill"}});
+            "GenericRateLimiter::RefillBytesAndGrantRequests"}});
       Arg arg(target, Env::IO_HIGH, limiter);
       // The idea behind is to start a request first, then before it refills,
       // update limit to a different value (2X/0.5X). No starvation should
@@ -209,11 +212,12 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
       true /* auto_tuned */));
 
   // Use callback to advance time because we need to advance (1) after Request()
-  // has determined the bytes are not available; and (2) before Refill()
-  // computes the next refill time (ensuring refill time in the future allows
-  // the next request to drain the rate limiter).
+  // has determined the bytes are not available; and (2) before
+  // RefillBytesAndGrantRequests() computes the next refill time (ensuring
+  // refill time in the future allows the next request to drain the rate
+  // limiter).
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-      "GenericRateLimiter::Refill", [&](void* /*arg*/) {
+      "GenericRateLimiter::RefillBytesAndGrantRequests", [&](void* /*arg*/) {
         special_env.SleepForMicroseconds(static_cast<int>(
             std::chrono::microseconds(kTimePerRefill).count()));
       });