]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Make RateLimiter not Customizable (#10378)
authorAndrew Kryczka <andrewkr@fb.com>
Mon, 18 Jul 2022 21:48:42 +0000 (14:48 -0700)
committerAndrew Kryczka <andrewkr@fb.com>
Tue, 19 Jul 2022 15:48:58 +0000 (08:48 -0700)
Summary:
(PR created for informational/testing purposes only.)

- Fixes lost dynamic updates to GenericRateLimiter bandwidth using `SetBytesPerSecond()`
- Benefit over #10374 is eliminating race conditions with Configurable framework.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10378

Reviewed By: pdillinger

Differential Revision: D37914865

fbshipit-source-id: d4f566d60ec9726d26932388c61671adf0ee0f30

HISTORY.md
db/db_test.cc
include/rocksdb/rate_limiter.h
options/customizable_test.cc
options/db_options.cc
util/rate_limiter.cc
util/rate_limiter.h
util/rate_limiter_test.cc
utilities/backup/backup_engine.cc

index a58995c502d6da1699a40b6df1e40f0ac819b913..24e760302bc947f7af12e76ffa2640a24e20c40e 100644 (file)
@@ -1,4 +1,11 @@
 # Rocksdb Change Log
+## Unreleased
+### Public API changes
+* Removed Customizable support for RateLimiter and removed its CreateFromString() and Type() functions.
+
+### Bug Fixes
+* Fix a bug where `GenericRateLimiter` could revert the bandwidth set dynamically using `SetBytesPerSecond()` when a user configures a structure enclosing it, e.g., using `GetOptionsFromString()` to configure an `Options` that references an existing `RateLimiter` object.
+
 ## 7.4.3 (07/13/2022)
 ### Behavior Changes
 * For track_and_verify_wals_in_manifest, revert to the original behavior before #10087: syncing of live WAL file is not tracked, and we track only the synced sizes of **closed** WALs. (PR #10330).
index a47e8fdb4832e6b8418c70c97fc77f0820058e12..25371d68673b8118bdc544925311477b46838b36 100644 (file)
@@ -4092,9 +4092,6 @@ class MockedRateLimiterWithNoOptionalAPIImpl : public RateLimiter {
 
   ~MockedRateLimiterWithNoOptionalAPIImpl() override {}
 
-  const char* Name() const override {
-    return "MockedRateLimiterWithNoOptionalAPI";
-  }
   void SetBytesPerSecond(int64_t bytes_per_second) override {
     (void)bytes_per_second;
   }
index 203d73dcfa252a50b609d1d168124740b1f90417..9cad6edf4aaee81dd562826f2b26e701ef611826 100644 (file)
@@ -9,7 +9,6 @@
 
 #pragma once
 
-#include "rocksdb/customizable.h"
 #include "rocksdb/env.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/status.h"
@@ -19,7 +18,7 @@ namespace ROCKSDB_NAMESPACE {
 // Exceptions MUST NOT propagate out of overridden functions into RocksDB,
 // because RocksDB is not exception-safe. This could cause undefined behavior
 // including data loss, unreported corruption, deadlocks, and more.
-class RateLimiter : public Customizable {
+class RateLimiter {
  public:
   enum class OpType {
     kRead,
@@ -32,20 +31,11 @@ class RateLimiter : public Customizable {
     kAllIo,
   };
 
-  static const char* Type() { return "RateLimiter"; }
-  static Status CreateFromString(const ConfigOptions& options,
-                                 const std::string& value,
-                                 std::shared_ptr<RateLimiter>* result);
-
   // For API compatibility, default to rate-limiting writes only.
-  explicit RateLimiter(Mode mode = Mode::kWritesOnly);
+  explicit RateLimiter(Mode mode = Mode::kWritesOnly) : mode_(mode) {}
 
   virtual ~RateLimiter() {}
 
-  // Deprecated. Will be removed in a major release. Derived classes
-  // should implement this method.
-  virtual const char* Name() const override { return ""; }
-
   // This API allows user to dynamically change rate limiter's bytes per second.
   // REQUIRED: bytes_per_second > 0
   virtual void SetBytesPerSecond(int64_t bytes_per_second) = 0;
@@ -135,7 +125,7 @@ class RateLimiter : public Customizable {
   Mode GetMode() { return mode_; }
 
  private:
-  Mode mode_;
+  const Mode mode_;
 };
 
 // Create a RateLimiter object, which can be shared among RocksDB instances to
index 61c0eb36876b973326cf3e6a49d9a0d7bb8574a5..e7ab2cd08f9469bf47e7b0b995592b0b24dd2273 100644 (file)
@@ -27,7 +27,6 @@
 #include "rocksdb/filter_policy.h"
 #include "rocksdb/flush_block_policy.h"
 #include "rocksdb/memory_allocator.h"
-#include "rocksdb/rate_limiter.h"
 #include "rocksdb/secondary_cache.h"
 #include "rocksdb/slice_transform.h"
 #include "rocksdb/sst_partitioner.h"
@@ -42,7 +41,6 @@
 #include "test_util/testharness.h"
 #include "test_util/testutil.h"
 #include "util/file_checksum_helper.h"
-#include "util/rate_limiter.h"
 #include "util/string_util.h"
 #include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h"
 #include "utilities/memory_allocators.h"
@@ -1472,21 +1470,6 @@ class MockFileChecksumGenFactory : public FileChecksumGenFactory {
   }
 };
 
-class MockRateLimiter : public RateLimiter {
- public:
-  static const char* kClassName() { return "MockRateLimiter"; }
-  const char* Name() const override { return kClassName(); }
-  void SetBytesPerSecond(int64_t /*bytes_per_second*/) override {}
-  int64_t GetBytesPerSecond() const override { return 0; }
-  int64_t GetSingleBurstBytes() const override { return 0; }
-  int64_t GetTotalBytesThrough(const Env::IOPriority /*pri*/) const override {
-    return 0;
-  }
-  int64_t GetTotalRequests(const Env::IOPriority /*pri*/) const override {
-    return 0;
-  }
-};
-
 class MockFilterPolicy : public FilterPolicy {
  public:
   static const char* kClassName() { return "MockFilterPolicy"; }
@@ -1618,14 +1601,6 @@ static int RegisterLocalObjects(ObjectLibrary& library,
         return guard->get();
       });
 
-  library.AddFactory<RateLimiter>(
-      MockRateLimiter::kClassName(),
-      [](const std::string& /*uri*/, std::unique_ptr<RateLimiter>* guard,
-         std::string* /* errmsg */) {
-        guard->reset(new MockRateLimiter());
-        return guard->get();
-      });
-
   library.AddFactory<const FilterPolicy>(
       MockFilterPolicy::kClassName(),
       [](const std::string& /*uri*/, std::unique_ptr<const FilterPolicy>* guard,
@@ -2149,37 +2124,6 @@ TEST_F(LoadCustomizableTest, LoadMemoryAllocatorTest) {
   }
 }
 
-TEST_F(LoadCustomizableTest, LoadRateLimiterTest) {
-#ifndef ROCKSDB_LITE
-  ASSERT_OK(TestSharedBuiltins<RateLimiter>(MockRateLimiter::kClassName(),
-                                            GenericRateLimiter::kClassName()));
-#else
-  ASSERT_OK(TestSharedBuiltins<RateLimiter>(MockRateLimiter::kClassName(), ""));
-#endif  // ROCKSDB_LITE
-
-  std::shared_ptr<RateLimiter> result;
-  ASSERT_OK(RateLimiter::CreateFromString(
-      config_options_, std::string(GenericRateLimiter::kClassName()) + ":1234",
-      &result));
-  ASSERT_NE(result, nullptr);
-  ASSERT_TRUE(result->IsInstanceOf(GenericRateLimiter::kClassName()));
-#ifndef ROCKSDB_LITE
-  ASSERT_OK(GetDBOptionsFromString(
-      config_options_, db_opts_,
-      std::string("rate_limiter=") + GenericRateLimiter::kClassName(),
-      &db_opts_));
-  ASSERT_NE(db_opts_.rate_limiter, nullptr);
-  if (RegisterTests("Test")) {
-    ExpectCreateShared<RateLimiter>(MockRateLimiter::kClassName());
-    ASSERT_OK(GetDBOptionsFromString(
-        config_options_, db_opts_,
-        std::string("rate_limiter=") + MockRateLimiter::kClassName(),
-        &db_opts_));
-    ASSERT_NE(db_opts_.rate_limiter, nullptr);
-  }
-#endif  // ROCKSDB_LITE
-}
-
 TEST_F(LoadCustomizableTest, LoadFilterPolicyTest) {
   const std::string kAutoBloom = BloomFilterPolicy::kClassName();
   const std::string kAutoRibbon = RibbonFilterPolicy::kClassName();
index e8846b222431b0e1748ea555a44fa5a88e178485..5039f9fb8246ae81c549385e26dc777343498a09 100644 (file)
@@ -422,12 +422,11 @@ static std::unordered_map<std::string, OptionTypeInfo>
         {"db_host_id",
          {offsetof(struct ImmutableDBOptions, db_host_id), OptionType::kString,
           OptionVerificationType::kNormal, OptionTypeFlags::kCompareNever}},
+        // Temporarily deprecated due to race conditions (examples in PR 10375).
         {"rate_limiter",
-         OptionTypeInfo::AsCustomSharedPtr<RateLimiter>(
-             offsetof(struct ImmutableDBOptions, rate_limiter),
-             OptionVerificationType::kNormal,
-             OptionTypeFlags::kCompareNever | OptionTypeFlags::kAllowNull)},
-
+         {offsetof(struct ImmutableDBOptions, rate_limiter),
+          OptionType::kUnknown, OptionVerificationType::kDeprecated,
+          OptionTypeFlags::kDontSerialize | OptionTypeFlags::kCompareNever}},
         // The following properties were handled as special cases in ParseOption
         // This means that the properties could be read from the options file
         // but never written to the file or compared to each other.
index f369e3220bdce2ae2c8b1d75494217b8b0a6fa29..3e3fe1787bf2b4ddc0d1bb0bebd0921c29b44961 100644 (file)
 
 #include "monitoring/statistics.h"
 #include "port/port.h"
-#include "rocksdb/convenience.h"
 #include "rocksdb/system_clock.h"
-#include "rocksdb/utilities/customizable_util.h"
-#include "rocksdb/utilities/object_registry.h"
-#include "rocksdb/utilities/options_type.h"
 #include "test_util/sync_point.h"
 #include "util/aligned_buffer.h"
-#include "util/string_util.h"
 
 namespace ROCKSDB_NAMESPACE {
 size_t RateLimiter::RequestToken(size_t bytes, size_t alignment,
@@ -50,68 +45,33 @@ struct GenericRateLimiter::Req {
   bool granted;
 };
 
-static std::unordered_map<std::string, OptionTypeInfo>
-    generic_rate_limiter_type_info = {
-#ifndef ROCKSDB_LITE
-        {"rate_bytes_per_sec",
-         {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
-                   max_bytes_per_sec),
-          OptionType::kInt64T}},
-        {"refill_period_us",
-         {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
-                   refill_period_us),
-          OptionType::kInt64T}},
-        {"fairness",
-         {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
-                   fairness),
-          OptionType::kInt32T}},
-        {"auto_tuned",
-         {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
-                   auto_tuned),
-          OptionType::kBoolean}},
-        {"clock",
-         OptionTypeInfo::AsCustomSharedPtr<SystemClock>(
-             offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
-                      clock),
-             OptionVerificationType::kByNameAllowFromNull,
-             OptionTypeFlags::kAllowNull)},
-#endif  // ROCKSDB_LITE
-};
-
 GenericRateLimiter::GenericRateLimiter(
     int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness,
     RateLimiter::Mode mode, const std::shared_ptr<SystemClock>& clock,
     bool auto_tuned)
     : RateLimiter(mode),
-      options_(rate_bytes_per_sec, refill_period_us, fairness, clock,
-               auto_tuned),
+      refill_period_us_(refill_period_us),
+      rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2
+                                     : rate_bytes_per_sec),
+      refill_bytes_per_period_(
+          CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)),
+      clock_(clock),
       stop_(false),
       exit_cv_(&request_mutex_),
       requests_to_wait_(0),
       available_bytes_(0),
+      next_refill_us_(NowMicrosMonotonic()),
+      fairness_(fairness > 100 ? 100 : fairness),
       rnd_((uint32_t)time(nullptr)),
       wait_until_refill_pending_(false),
-      num_drains_(0) {
-  RegisterOptions(&options_, &generic_rate_limiter_type_info);
+      auto_tuned_(auto_tuned),
+      num_drains_(0),
+      max_bytes_per_sec_(rate_bytes_per_sec),
+      tuned_time_(NowMicrosMonotonic()) {
   for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
     total_requests_[i] = 0;
     total_bytes_through_[i] = 0;
   }
-  Initialize();
-}
-void GenericRateLimiter::Initialize() {
-  if (options_.clock == nullptr) {
-    options_.clock = SystemClock::Default();
-  }
-  options_.fairness = std::min(options_.fairness, 100);
-  next_refill_us_ = NowMicrosMonotonic();
-  tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic());
-  if (options_.auto_tuned) {
-    rate_bytes_per_sec_ = options_.max_bytes_per_sec / 2;
-  } else {
-    rate_bytes_per_sec_ = options_.max_bytes_per_sec;
-  }
-  refill_bytes_per_period_ = CalculateRefillBytesPerPeriod(rate_bytes_per_sec_);
 }
 
 GenericRateLimiter::~GenericRateLimiter() {
@@ -135,18 +95,6 @@ GenericRateLimiter::~GenericRateLimiter() {
   }
 }
 
-Status GenericRateLimiter::PrepareOptions(const ConfigOptions& options) {
-  if (options_.fairness <= 0) {
-    return Status::InvalidArgument("Fairness must be > 0");
-  } else if (options_.max_bytes_per_sec <= 0) {
-    return Status::InvalidArgument("max_bytes_per_sec must be > 0");
-  } else if (options_.refill_period_us <= 0) {
-    return Status::InvalidArgument("Refill_period_us must be > 0");
-  }
-  Initialize();
-  return RateLimiter::PrepareOptions(options);
-}
-
 // This API allows user to dynamically change rate limiter's bytes per second.
 void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
   assert(bytes_per_second > 0);
@@ -165,11 +113,11 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
                            &rate_bytes_per_sec_);
   MutexLock g(&request_mutex_);
 
-  if (options_.auto_tuned) {
+  if (auto_tuned_) {
     static const int kRefillsPerTune = 100;
     std::chrono::microseconds now(NowMicrosMonotonic());
-    if (now - tuned_time_ >= kRefillsPerTune * std::chrono::microseconds(
-                                                   options_.refill_period_us)) {
+    if (now - tuned_time_ >=
+        kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) {
       Status s = Tune();
       s.PermitUncheckedError();  //**TODO: What to do on error?
     }
@@ -213,7 +161,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
       } else {
         // Whichever thread reaches here first performs duty (1) as described
         // above.
-        int64_t wait_until = options_.clock->NowMicros() + time_until_refill_us;
+        int64_t wait_until = clock_->NowMicros() + time_until_refill_us;
         RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
         ++num_drains_;
         wait_until_refill_pending_ = true;
@@ -273,12 +221,12 @@ GenericRateLimiter::GeneratePriorityIterationOrder() {
   // first
   pri_iteration_order[0] = Env::IO_USER;
 
-  bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(options_.fairness);
+  bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_);
   TEST_SYNC_POINT_CALLBACK(
       "GenericRateLimiter::GeneratePriorityIterationOrder::"
       "PostRandomOneInFairnessForHighPri",
       &high_pri_iterated_after_mid_low_pri);
-  bool mid_pri_itereated_after_low_pri = rnd_.OneIn(options_.fairness);
+  bool mid_pri_itereated_after_low_pri = rnd_.OneIn(fairness_);
   TEST_SYNC_POINT_CALLBACK(
       "GenericRateLimiter::GeneratePriorityIterationOrder::"
       "PostRandomOneInFairnessForMidPri",
@@ -307,7 +255,7 @@ GenericRateLimiter::GeneratePriorityIterationOrder() {
 
 void GenericRateLimiter::RefillBytesAndGrantRequests() {
   TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests");
-  next_refill_us_ = NowMicrosMonotonic() + options_.refill_period_us;
+  next_refill_us_ = NowMicrosMonotonic() + refill_period_us_;
   // Carry over the left over quota from the last period
   auto refill_bytes_per_period =
       refill_bytes_per_period_.load(std::memory_order_relaxed);
@@ -348,12 +296,12 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() {
 int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
     int64_t rate_bytes_per_sec) {
   if (std::numeric_limits<int64_t>::max() / rate_bytes_per_sec <
-      options_.refill_period_us) {
+      refill_period_us_) {
     // Avoid unexpected result in the overflow case. The result now is still
     // inaccurate but is a number that is large enough.
     return std::numeric_limits<int64_t>::max() / 1000000;
   } else {
-    return rate_bytes_per_sec * options_.refill_period_us / 1000000;
+    return rate_bytes_per_sec * refill_period_us_ / 1000000;
   }
 }
 
@@ -368,11 +316,10 @@ Status GenericRateLimiter::Tune() {
   std::chrono::microseconds prev_tuned_time = tuned_time_;
   tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic());
 
-  int64_t elapsed_intervals =
-      (tuned_time_ - prev_tuned_time +
-       std::chrono::microseconds(options_.refill_period_us) -
-       std::chrono::microseconds(1)) /
-      std::chrono::microseconds(options_.refill_period_us);
+  int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time +
+                               std::chrono::microseconds(refill_period_us_) -
+                               std::chrono::microseconds(1)) /
+                              std::chrono::microseconds(refill_period_us_);
   // We tune every kRefillsPerTune intervals, so the overflow and division-by-
   // zero conditions should never happen.
   assert(num_drains_ <= std::numeric_limits<int64_t>::max() / 100);
@@ -382,13 +329,13 @@ Status GenericRateLimiter::Tune() {
   int64_t prev_bytes_per_sec = GetBytesPerSecond();
   int64_t new_bytes_per_sec;
   if (drained_pct == 0) {
-    new_bytes_per_sec = options_.max_bytes_per_sec / kAllowedRangeFactor;
+    new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor;
   } else if (drained_pct < kLowWatermarkPct) {
     // sanitize to prevent overflow
     int64_t sanitized_prev_bytes_per_sec =
         std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() / 100);
     new_bytes_per_sec =
-        std::max(options_.max_bytes_per_sec / kAllowedRangeFactor,
+        std::max(max_bytes_per_sec_ / kAllowedRangeFactor,
                  sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct));
   } else if (drained_pct > kHighWatermarkPct) {
     // sanitize to prevent overflow
@@ -396,7 +343,7 @@ Status GenericRateLimiter::Tune() {
         std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() /
                                          (100 + kAdjustFactorPct));
     new_bytes_per_sec =
-        std::min(options_.max_bytes_per_sec,
+        std::min(max_bytes_per_sec_,
                  sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100);
   } else {
     new_bytes_per_sec = prev_bytes_per_sec;
@@ -419,79 +366,7 @@ RateLimiter* NewGenericRateLimiter(
   std::unique_ptr<RateLimiter> limiter(
       new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness,
                              mode, SystemClock::Default(), auto_tuned));
-  Status s = limiter->PrepareOptions(ConfigOptions());
-  if (s.ok()) {
-    return limiter.release();
-  } else {
-    assert(false);
-    return nullptr;
-  }
-}
-namespace {
-#ifndef ROCKSDB_LITE
-static int RegisterBuiltinRateLimiters(ObjectLibrary& library,
-                                       const std::string& /*arg*/) {
-  library.AddFactory<RateLimiter>(
-      GenericRateLimiter::kClassName(),
-      [](const std::string& /*uri*/, std::unique_ptr<RateLimiter>* guard,
-         std::string* /*errmsg*/) {
-        guard->reset(
-            new GenericRateLimiter(std::numeric_limits<int64_t>::max()));
-        return guard->get();
-      });
-  size_t num_types;
-  return static_cast<int>(library.GetFactoryCount(&num_types));
-}
-
-static std::unordered_map<std::string, RateLimiter::Mode>
-    rate_limiter_mode_map = {
-        {"kReadsOnly", RateLimiter::Mode::kReadsOnly},
-        {"kWritesOnly", RateLimiter::Mode::kWritesOnly},
-        {"kAllIo", RateLimiter::Mode::kAllIo},
-};
-#endif  // ROCKSDB_LITE
-static bool LoadRateLimiter(const std::string& name,
-                            std::shared_ptr<RateLimiter>* limiter) {
-  auto plen = strlen(GenericRateLimiter::kClassName());
-  if (name.size() > plen + 2 && name[plen] == ':' &&
-      StartsWith(name, GenericRateLimiter::kClassName())) {
-    auto rate = ParseInt64(name.substr(plen + 1));
-    limiter->reset(new GenericRateLimiter(rate));
-    return true;
-  } else {
-    return false;
-  }
-}
-
-static std::unordered_map<std::string, OptionTypeInfo> rate_limiter_type_info =
-    {
-#ifndef ROCKSDB_LITE
-        {"mode",
-         OptionTypeInfo::Enum<RateLimiter::Mode>(0, &rate_limiter_mode_map)},
-#endif  // ROCKSDB_LITE
-};
-}  // namespace
-
-RateLimiter::RateLimiter(Mode mode) : mode_(mode) {
-  RegisterOptions("", &mode_, &rate_limiter_type_info);
-}
-
-Status RateLimiter::CreateFromString(const ConfigOptions& config_options,
-                                     const std::string& value,
-                                     std::shared_ptr<RateLimiter>* result) {
-  if (value.empty()) {
-    result->reset();
-    return Status::OK();
-  } else {
-#ifndef ROCKSDB_LITE
-    static std::once_flag once;
-    std::call_once(once, [&]() {
-      RegisterBuiltinRateLimiters(*(ObjectLibrary::Default().get()), "");
-    });
-#endif  // ROCKSDB_LITE
-    return LoadSharedObject<RateLimiter>(config_options, value, LoadRateLimiter,
-                                         result);
-  }
+  return limiter.release();
 }
 
 }  // namespace ROCKSDB_NAMESPACE
index 75751d3c5d84a645f3036ae4226d0bdf73d0456e..7f01864c5b86f8885c7c201326d1a6d414d92403 100644 (file)
@@ -26,38 +26,13 @@ namespace ROCKSDB_NAMESPACE {
 
 class GenericRateLimiter : public RateLimiter {
  public:
-  struct GenericRateLimiterOptions {
-    static const char* kName() { return "GenericRateLimiterOptions"; }
-    GenericRateLimiterOptions(int64_t _rate_bytes_per_sec,
-                              int64_t _refill_period_us, int32_t _fairness,
-                              const std::shared_ptr<SystemClock>& _clock,
-                              bool _auto_tuned)
-        : max_bytes_per_sec(_rate_bytes_per_sec),
-          refill_period_us(_refill_period_us),
-          clock(_clock),
-          fairness(_fairness > 100 ? 100 : _fairness),
-          auto_tuned(_auto_tuned) {}
-    int64_t max_bytes_per_sec;
-    int64_t refill_period_us;
-    std::shared_ptr<SystemClock> clock;
-    int32_t fairness;
-    bool auto_tuned;
-  };
-
- public:
-  explicit GenericRateLimiter(
-      int64_t refill_bytes, int64_t refill_period_us = 100 * 1000,
-      int32_t fairness = 10,
-      RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly,
-      const std::shared_ptr<SystemClock>& clock = nullptr,
-      bool auto_tuned = false);
+  GenericRateLimiter(int64_t refill_bytes, int64_t refill_period_us,
+                     int32_t fairness, RateLimiter::Mode mode,
+                     const std::shared_ptr<SystemClock>& clock,
+                     bool auto_tuned);
 
   virtual ~GenericRateLimiter();
 
-  static const char* kClassName() { return "GenericRateLimiter"; }
-  const char* Name() const override { return kClassName(); }
-  Status PrepareOptions(const ConfigOptions& options) override;
-
   // This API allows user to dynamically change rate limiter's bytes per second.
   virtual void SetBytesPerSecond(int64_t bytes_per_second) override;
 
@@ -120,25 +95,29 @@ class GenericRateLimiter : public RateLimiter {
     return rate_bytes_per_sec_;
   }
 
+  virtual void TEST_SetClock(std::shared_ptr<SystemClock> clock) {
+    MutexLock g(&request_mutex_);
+    clock_ = std::move(clock);
+    next_refill_us_ = NowMicrosMonotonic();
+  }
+
  private:
-  void Initialize();
   void RefillBytesAndGrantRequests();
   std::vector<Env::IOPriority> GeneratePriorityIterationOrder();
   int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec);
   Status Tune();
 
-  uint64_t NowMicrosMonotonic() {
-    return options_.clock->NowNanos() / std::milli::den;
-  }
+  uint64_t NowMicrosMonotonic() { return clock_->NowNanos() / std::milli::den; }
 
   // This mutex guard all internal states
   mutable port::Mutex request_mutex_;
 
-  GenericRateLimiterOptions options_;
+  const int64_t refill_period_us_;
 
   int64_t rate_bytes_per_sec_;
   // This variable can be changed dynamically.
   std::atomic<int64_t> refill_bytes_per_period_;
+  std::shared_ptr<SystemClock> clock_;
 
   bool stop_;
   port::CondVar exit_cv_;
@@ -149,13 +128,16 @@ class GenericRateLimiter : public RateLimiter {
   int64_t available_bytes_;
   int64_t next_refill_us_;
 
+  int32_t fairness_;
   Random rnd_;
 
   struct Req;
   std::deque<Req*> queue_[Env::IO_TOTAL];
   bool wait_until_refill_pending_;
 
+  bool auto_tuned_;
   int64_t num_drains_;
+  const int64_t max_bytes_per_sec_;
   std::chrono::microseconds tuned_time_;
 };
 
index cd809d183f543a434911bd042c5613ac77603314..5691ab26c318647fd636f4dc861ccd964f40a92f 100644 (file)
 #include <limits>
 
 #include "db/db_test_util.h"
-#include "options/options_parser.h"
 #include "port/port.h"
-#include "rocksdb/convenience.h"
 #include "rocksdb/system_clock.h"
-#include "rocksdb/utilities/options_type.h"
 #include "test_util/sync_point.h"
 #include "test_util/testharness.h"
 #include "util/random.h"
@@ -466,95 +463,6 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
   ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec);
 }
 
-TEST_F(RateLimiterTest, CreateGenericRateLimiterFromString) {
-  std::shared_ptr<RateLimiter> limiter;
-  ConfigOptions config_options;
-  std::string limiter_id = GenericRateLimiter::kClassName();
-  ASSERT_OK(RateLimiter::CreateFromString(config_options, limiter_id + ":1024",
-                                          &limiter));
-  ASSERT_NE(limiter, nullptr);
-  ASSERT_EQ(limiter->GetBytesPerSecond(), 1024U);
-#ifndef ROCKSDB_LITE
-  ASSERT_OK(RateLimiter::CreateFromString(
-      config_options, "rate_bytes_per_sec=2048;id=" + limiter_id, &limiter));
-  ASSERT_NE(limiter, nullptr);
-  ASSERT_EQ(limiter->GetBytesPerSecond(), 2048U);
-  ASSERT_NOK(RateLimiter::CreateFromString(
-      config_options, "rate_bytes_per_sec=0;id=" + limiter_id, &limiter));
-  ASSERT_NOK(RateLimiter::CreateFromString(
-      config_options, "rate_bytes_per_sec=2048;fairness=0;id=" + limiter_id,
-      &limiter));
-
-  ASSERT_OK(
-      RateLimiter::CreateFromString(config_options,
-                                    "rate_bytes_per_sec=2048;refill_period_us="
-                                    "1024;fairness=42;auto_tuned=true;"
-                                    "mode=kReadsOnly;id=" +
-                                        limiter_id,
-                                    &limiter));
-  ASSERT_NE(limiter, nullptr);
-  auto opts =
-      limiter->GetOptions<GenericRateLimiter::GenericRateLimiterOptions>();
-  ASSERT_NE(opts, nullptr);
-  ASSERT_EQ(opts->max_bytes_per_sec, 2048);
-  ASSERT_EQ(opts->refill_period_us, 1024);
-  ASSERT_EQ(opts->fairness, 42);
-  ASSERT_EQ(opts->auto_tuned, true);
-  ASSERT_TRUE(limiter->IsRateLimited(RateLimiter::OpType::kRead));
-  ASSERT_FALSE(limiter->IsRateLimited(RateLimiter::OpType::kWrite));
-#endif  // ROCKSDB_LITE
-}
-
-#ifndef ROCKSDB_LITE
-// This test is for a rate limiter that has no name (Name() returns "").
-// When the default Name() method is deprecated, this test should be removed.
-TEST_F(RateLimiterTest, NoNameRateLimiter) {
-  static std::unordered_map<std::string, OptionTypeInfo> dummy_limiter_options =
-      {
-          {"dummy",
-           {0, OptionType::kInt, OptionVerificationType::kNormal,
-            OptionTypeFlags::kNone}},
-      };
-  class NoNameRateLimiter : public RateLimiter {
-   public:
-    explicit NoNameRateLimiter(bool do_register) {
-      if (do_register) {
-        RegisterOptions("", &dummy, &dummy_limiter_options);
-      }
-    }
-    void SetBytesPerSecond(int64_t /*bytes_per_second*/) override {}
-    int64_t GetSingleBurstBytes() const override { return 0; }
-    int64_t GetTotalBytesThrough(const Env::IOPriority /*pri*/) const override {
-      return 0;
-    }
-    int64_t GetTotalRequests(const Env::IOPriority /*pri*/) const override {
-      return 0;
-    }
-    int64_t GetBytesPerSecond() const override { return 0; }
-
-   private:
-    int dummy;
-  };
-
-  ConfigOptions config_options;
-  DBOptions db_opts, copy;
-  db_opts.rate_limiter.reset(new NoNameRateLimiter(false));
-  ASSERT_EQ(db_opts.rate_limiter->GetId(), "");
-  ASSERT_EQ(db_opts.rate_limiter->ToString(config_options), "");
-  db_opts.rate_limiter.reset(new NoNameRateLimiter(true));
-  ASSERT_EQ(db_opts.rate_limiter->GetId(), "");
-  ASSERT_EQ(db_opts.rate_limiter->ToString(config_options), "");
-  std::string opt_str;
-  ASSERT_OK(GetStringFromDBOptions(config_options, db_opts, &opt_str));
-  ASSERT_OK(
-      GetDBOptionsFromString(config_options, DBOptions(), opt_str, &copy));
-  ASSERT_OK(
-      RocksDBOptionsParser::VerifyDBOptions(config_options, db_opts, copy));
-  ASSERT_EQ(copy.rate_limiter, nullptr);
-  ASSERT_NE(copy.rate_limiter, db_opts.rate_limiter);
-}
-#endif  // ROCKSDB_LITE
-
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
index 37a8cfff6fbcfca011367cb9ae5f69e6de60c1bf..ab2bd37758c62332a6c10bdc93d7e9753298c6cb 100644 (file)
@@ -186,54 +186,13 @@ class BackupEngineImpl {
       const std::shared_ptr<SystemClock>& backup_rate_limiter_clock,
       const std::shared_ptr<SystemClock>& restore_rate_limiter_clock) {
     if (backup_rate_limiter_clock) {
-      assert(options_.backup_rate_limiter->IsInstanceOf(
-          GenericRateLimiter::kClassName()));
-      auto* backup_rate_limiter_options =
-          options_.backup_rate_limiter
-              ->GetOptions<GenericRateLimiter::GenericRateLimiterOptions>();
-
-      assert(backup_rate_limiter_options);
-      RateLimiter::Mode backup_rate_limiter_mode;
-      if (!options_.backup_rate_limiter->IsRateLimited(
-              RateLimiter::OpType::kRead)) {
-        backup_rate_limiter_mode = RateLimiter::Mode::kWritesOnly;
-      } else if (!options_.backup_rate_limiter->IsRateLimited(
-                     RateLimiter::OpType::kWrite)) {
-        backup_rate_limiter_mode = RateLimiter::Mode::kReadsOnly;
-      } else {
-        backup_rate_limiter_mode = RateLimiter::Mode::kAllIo;
-      }
-      options_.backup_rate_limiter.reset(new GenericRateLimiter(
-          backup_rate_limiter_options->max_bytes_per_sec,
-          backup_rate_limiter_options->refill_period_us,
-          backup_rate_limiter_options->fairness, backup_rate_limiter_mode,
-          backup_rate_limiter_clock, backup_rate_limiter_options->auto_tuned));
+      static_cast<GenericRateLimiter*>(options_.backup_rate_limiter.get())
+          ->TEST_SetClock(backup_rate_limiter_clock);
     }
 
     if (restore_rate_limiter_clock) {
-      assert(options_.restore_rate_limiter->IsInstanceOf(
-          GenericRateLimiter::kClassName()));
-      auto* restore_rate_limiter_options =
-          options_.restore_rate_limiter
-              ->GetOptions<GenericRateLimiter::GenericRateLimiterOptions>();
-      assert(restore_rate_limiter_options);
-
-      RateLimiter::Mode restore_rate_limiter_mode;
-      if (!options_.restore_rate_limiter->IsRateLimited(
-              RateLimiter::OpType::kRead)) {
-        restore_rate_limiter_mode = RateLimiter::Mode::kWritesOnly;
-      } else if (!options_.restore_rate_limiter->IsRateLimited(
-                     RateLimiter::OpType::kWrite)) {
-        restore_rate_limiter_mode = RateLimiter::Mode::kReadsOnly;
-      } else {
-        restore_rate_limiter_mode = RateLimiter::Mode::kAllIo;
-      }
-      options_.restore_rate_limiter.reset(new GenericRateLimiter(
-          restore_rate_limiter_options->max_bytes_per_sec,
-          restore_rate_limiter_options->refill_period_us,
-          restore_rate_limiter_options->fairness, restore_rate_limiter_mode,
-          restore_rate_limiter_clock,
-          restore_rate_limiter_options->auto_tuned));
+      static_cast<GenericRateLimiter*>(options_.restore_rate_limiter.get())
+          ->TEST_SetClock(restore_rate_limiter_clock);
     }
   }