#include "global/global_init.h"
#include <gtest/gtest.h>
+#include <thread>
+#include <atomic>
+#include <chrono>
+#include <mutex>
+#include <list>
+#include <random>
+
class ThrottleTest : public ::testing::Test {
protected:
}
}
+std::pair<double, std::chrono::duration<double> > test_backoff(
+ double low_threshhold,
+ double high_threshhold,
+ double expected_throughput,
+ double high_multiple,
+ double max_multiple,
+ uint64_t max,
+ double put_delay_per_count,
+ unsigned getters,
+ unsigned putters)
+{
+ std::mutex l;
+ std::condition_variable c;
+ uint64_t total = 0;
+ std::list<uint64_t> in_queue;
+ bool stop = false;
+
+ auto wait_time = std::chrono::duration<double>(0);
+ uint64_t waits = 0;
+
+ uint64_t total_observed_total = 0;
+ uint64_t total_observations = 0;
+
+ BackoffThrottle throttle(5);
+ bool valid = throttle.set_params(
+ low_threshhold,
+ high_threshhold,
+ expected_throughput,
+ high_multiple,
+ max_multiple,
+ max,
+ 0);
+ assert(valid);
+
+ auto getter = [&]() {
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<> dis(0, 10);
+
+ std::unique_lock<std::mutex> g(l);
+ while (!stop) {
+ g.unlock();
+
+ uint64_t to_get = dis(gen);
+ auto waited = throttle.get(to_get);
+
+ g.lock();
+ wait_time += waited;
+ waits += to_get;
+ total += to_get;
+ in_queue.push_back(to_get);
+ c.notify_one();
+ }
+ };
+
+ auto putter = [&]() {
+ std::unique_lock<std::mutex> g(l);
+ while (!stop) {
+ while (in_queue.empty())
+ c.wait(g);
+
+ uint64_t c = in_queue.front();
+
+ total_observed_total += total;
+ total_observations++;
+ in_queue.pop_front();
+ assert(total <= max);
+
+ g.unlock();
+ std::this_thread::sleep_for(
+ c * std::chrono::duration<double>(put_delay_per_count*putters));
+ g.lock();
+
+ total -= c;
+ throttle.put(c);
+ }
+ };
+
+ vector<std::thread> gts(getters);
+ for (auto &&i: gts) i = std::thread(getter);
+
+ vector<std::thread> pts(putters);
+ for (auto &&i: pts) i = std::thread(putter);
+
+ std::this_thread::sleep_for(std::chrono::duration<double>(5));
+ {
+ std::unique_lock<std::mutex> g(l);
+ stop = true;
+ }
+ for (auto &&i: gts) i.join();
+ gts.clear();
+ for (auto &&i: pts) i.join();
+ pts.clear();
+
+ return make_pair(
+ ((double)total_observed_total)/((double)total_observations),
+ wait_time / waits);
+}
+
+TEST(BackoffThrottle, undersaturated)
+{
+ auto results = test_backoff(
+ 0.4,
+ 0.6,
+ 1000,
+ 2,
+ 10,
+ 100,
+ 0.0001,
+ 3,
+ 6);
+ ASSERT_LT(results.first, 45);
+ ASSERT_GT(results.first, 35);
+ ASSERT_LT(results.second.count(), 0.0002);
+ ASSERT_GT(results.second.count(), 0.00005);
+}
+
+TEST(BackoffThrottle, balanced)
+{
+ auto results = test_backoff(
+ 0.4,
+ 0.6,
+ 1000,
+ 2,
+ 10,
+ 100,
+ 0.001,
+ 7,
+ 2);
+ ASSERT_LT(results.first, 60);
+ ASSERT_GT(results.first, 40);
+ ASSERT_LT(results.second.count(), 0.002);
+ ASSERT_GT(results.second.count(), 0.0005);
+}
+
+TEST(BackoffThrottle, oversaturated)
+{
+ auto results = test_backoff(
+ 0.4,
+ 0.6,
+ 10000000,
+ 2,
+ 10,
+ 100,
+ 0.001,
+ 1,
+ 3);
+ ASSERT_LT(results.first, 101);
+ ASSERT_GT(results.first, 85);
+ ASSERT_LT(results.second.count(), 0.002);
+ ASSERT_GT(results.second.count(), 0.0005);
+}
+
int main(int argc, char **argv) {
vector<const char*> args;
argv_to_vec(argc, (const char **)argv, args);