--- /dev/null
- auto client_info_f = [=](const ClientId& c) -> test::dmc::ClientInfo {
- return client_info[ret_client_group_f(c)];
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ */
+
+
+#include "test_dmclock.h"
+#include "config.h"
+
+#ifdef PROFILE
+#include "profile.h"
+#endif
+
+
+namespace dmc = crimson::dmclock;
+namespace test = crimson::test_dmc;
+namespace sim = crimson::qos_simulation;
+
+using namespace std::placeholders;
+
+
+namespace crimson {
+ namespace test_dmc {
+ void server_data(std::ostream& out,
+ test::MySim* sim,
+ test::MySim::ServerFilter server_disp_filter,
+ int head_w, int data_w, int data_prec);
+
+ void client_data(std::ostream& out,
+ test::MySim* sim,
+ test::MySim::ClientFilter client_disp_filter,
+ int head_w, int data_w, int data_prec);
+ }
+}
+
+
+int main(int argc, char* argv[]) {
+ std::vector<const char*> args;
+ for (int i = 1; i < argc; ++i) {
+ args.push_back(argv[i]);
+ }
+
+ std::string conf_file_list;
+ sim::ceph_argparse_early_args(args, &conf_file_list);
+
+ sim::sim_config_t g_conf;
+ std::vector<sim::cli_group_t> &cli_group = g_conf.cli_group;
+ std::vector<sim::srv_group_t> &srv_group = g_conf.srv_group;
+
+ if (!conf_file_list.empty()) {
+ int ret;
+ ret = sim::parse_config_file(conf_file_list, g_conf);
+ if (ret) {
+ // error
+ _exit(1);
+ }
+ } else {
+ // default simulation parameter
+ g_conf.client_groups = 2;
+
+ sim::srv_group_t st;
+ srv_group.push_back(st);
+
+ sim::cli_group_t ct1(99, 0);
+ cli_group.push_back(ct1);
+
+ sim::cli_group_t ct2(1, 10);
+ cli_group.push_back(ct2);
+ }
+
+ const uint server_groups = g_conf.server_groups;
+ const uint client_groups = g_conf.client_groups;
+ const bool server_random_selection = g_conf.server_random_selection;
+ const bool server_soft_limit = g_conf.server_soft_limit;
+ const double anticipation_timeout = g_conf.anticipation_timeout;
+ uint server_total_count = 0;
+ uint client_total_count = 0;
+
+ for (uint i = 0; i < client_groups; ++i) {
+ client_total_count += cli_group[i].client_count;
+ }
+
+ for (uint i = 0; i < server_groups; ++i) {
+ server_total_count += srv_group[i].server_count;
+ }
+
+ std::vector<test::dmc::ClientInfo> client_info;
+ for (uint i = 0; i < client_groups; ++i) {
+ client_info.push_back(test::dmc::ClientInfo
+ { cli_group[i].client_reservation,
+ cli_group[i].client_weight,
+ cli_group[i].client_limit } );
+ }
+
+ auto ret_client_group_f = [&](const ClientId& c) -> uint {
+ uint group_max = 0;
+ uint i = 0;
+ for (; i < client_groups; ++i) {
+ group_max += cli_group[i].client_count;
+ if (c < group_max) {
+ break;
+ }
+ }
+ return i;
+ };
+
+ auto ret_server_group_f = [&](const ServerId& s) -> uint {
+ uint group_max = 0;
+ uint i = 0;
+ for (; i < server_groups; ++i) {
+ group_max += srv_group[i].server_count;
+ if (s < group_max) {
+ break;
+ }
+ }
+ return i;
+ };
+
++ auto client_info_f = [=](const ClientId& c) -> const test::dmc::ClientInfo* {
++ return &client_info[ret_client_group_f(c)];
+ };
+
+ auto client_disp_filter = [=] (const ClientId& i) -> bool {
+ return i < 3 || i >= (client_total_count - 3);
+ };
+
+ auto server_disp_filter = [=] (const ServerId& i) -> bool {
+ return i < 3 || i >= (server_total_count - 3);
+ };
+
+
+ test::MySim *simulation;
+
+
+ // lambda to post a request to the identified server; called by client
+ test::SubmitFunc server_post_f =
+ [&simulation](const ServerId& server,
+ sim::TestRequest&& request,
+ const ClientId& client_id,
+ const test::dmc::ReqParams& req_params) {
+ test::DmcServer& s = simulation->get_server(server);
+ s.post(std::move(request), client_id, req_params);
+ };
+
+ std::vector<std::vector<sim::CliInst>> cli_inst;
+ for (uint i = 0; i < client_groups; ++i) {
+ if (cli_group[i].client_wait == std::chrono::seconds(0)) {
+ cli_inst.push_back(
+ { { sim::req_op,
+ (uint32_t)cli_group[i].client_total_ops,
+ (double)cli_group[i].client_iops_goal,
+ (uint16_t)cli_group[i].client_outstanding_ops } } );
+ } else {
+ cli_inst.push_back(
+ { { sim::wait_op, cli_group[i].client_wait },
+ { sim::req_op,
+ (uint32_t)cli_group[i].client_total_ops,
+ (double)cli_group[i].client_iops_goal,
+ (uint16_t)cli_group[i].client_outstanding_ops } } );
+ }
+ }
+
+ simulation = new test::MySim();
+
+ test::DmcServer::ClientRespFunc client_response_f =
+ [&simulation](ClientId client_id,
+ const sim::TestResponse& resp,
+ const ServerId& server_id,
+ const dmc::PhaseType& phase) {
+ simulation->get_client(client_id).receive_response(resp,
+ server_id,
+ phase);
+ };
+
+ test::CreateQueueF create_queue_f =
+ [&](test::DmcQueue::CanHandleRequestFunc can_f,
+ test::DmcQueue::HandleRequestFunc handle_f) -> test::DmcQueue* {
+ return new test::DmcQueue(client_info_f,
+ can_f,
+ handle_f,
+ server_soft_limit,
+ anticipation_timeout);
+ };
+
+
+ auto create_server_f = [&](ServerId id) -> test::DmcServer* {
+ uint i = ret_server_group_f(id);
+ return new test::DmcServer(id,
+ srv_group[i].server_iops,
+ srv_group[i].server_threads,
+ client_response_f,
+ test::dmc_server_accumulate_f,
+ create_queue_f);
+ };
+
+ auto create_client_f = [&](ClientId id) -> test::DmcClient* {
+ uint i = ret_client_group_f(id);
+ test::MySim::ClientBasedServerSelectFunc server_select_f;
+ uint client_server_select_range = cli_group[i].client_server_select_range;
+ if (!server_random_selection) {
+ server_select_f = simulation->make_server_select_alt_range(client_server_select_range);
+ } else {
+ server_select_f = simulation->make_server_select_ran_range(client_server_select_range);
+ }
+ return new test::DmcClient(id,
+ server_post_f,
+ std::bind(server_select_f, _1, id),
+ test::dmc_client_accumulate_f,
+ cli_inst[i]);
+ };
+
+#if 1
+ std::cout << "[global]" << std::endl << g_conf << std::endl;
+ for (uint i = 0; i < client_groups; ++i) {
+ std::cout << std::endl << "[client." << i << "]" << std::endl;
+ std::cout << cli_group[i] << std::endl;
+ }
+ for (uint i = 0; i < server_groups; ++i) {
+ std::cout << std::endl << "[server." << i << "]" << std::endl;
+ std::cout << srv_group[i] << std::endl;
+ }
+ std::cout << std::endl;
+#endif
+
+ simulation->add_servers(server_total_count, create_server_f);
+ simulation->add_clients(client_total_count, create_client_f);
+
+ simulation->run();
+ simulation->display_stats(std::cout,
+ &test::server_data, &test::client_data,
+ server_disp_filter, client_disp_filter);
+
+ delete simulation;
+} // main
+
+
+void test::client_data(std::ostream& out,
+ test::MySim* sim,
+ test::MySim::ClientFilter client_disp_filter,
+ int head_w, int data_w, int data_prec) {
+ // report how many ops were done by reservation and proportion for
+ // each client
+
+ int total_r = 0;
+ out << std::setw(head_w) << "res_ops:";
+ for (uint i = 0; i < sim->get_client_count(); ++i) {
+ const auto& client = sim->get_client(i);
+ auto r = client.get_accumulator().reservation_count;
+ total_r += r;
+ if (!client_disp_filter(i)) continue;
+ out << " " << std::setw(data_w) << r;
+ }
+ out << " " << std::setw(data_w) << std::setprecision(data_prec) <<
+ std::fixed << total_r << std::endl;
+
+ int total_p = 0;
+ out << std::setw(head_w) << "prop_ops:";
+ for (uint i = 0; i < sim->get_client_count(); ++i) {
+ const auto& client = sim->get_client(i);
+ auto p = client.get_accumulator().proportion_count;
+ total_p += p;
+ if (!client_disp_filter(i)) continue;
+ out << " " << std::setw(data_w) << p;
+ }
+ out << " " << std::setw(data_w) << std::setprecision(data_prec) <<
+ std::fixed << total_p << std::endl;
+}
+
+
+void test::server_data(std::ostream& out,
+ test::MySim* sim,
+ test::MySim::ServerFilter server_disp_filter,
+ int head_w, int data_w, int data_prec) {
+ out << std::setw(head_w) << "res_ops:";
+ int total_r = 0;
+ for (uint i = 0; i < sim->get_server_count(); ++i) {
+ const auto& server = sim->get_server(i);
+ auto rc = server.get_accumulator().reservation_count;
+ total_r += rc;
+ if (!server_disp_filter(i)) continue;
+ out << " " << std::setw(data_w) << rc;
+ }
+ out << " " << std::setw(data_w) << std::setprecision(data_prec) <<
+ std::fixed << total_r << std::endl;
+
+ out << std::setw(head_w) << "prop_ops:";
+ int total_p = 0;
+ for (uint i = 0; i < sim->get_server_count(); ++i) {
+ const auto& server = sim->get_server(i);
+ auto pc = server.get_accumulator().proportion_count;
+ total_p += pc;
+ if (!server_disp_filter(i)) continue;
+ out << " " << std::setw(data_w) << pc;
+ }
+ out << " " << std::setw(data_w) << std::setprecision(data_prec) <<
+ std::fixed << total_p << std::endl;
+
+ const auto& q = sim->get_server(0).get_priority_queue();
+ out << std::endl <<
+ " k-way heap: " << q.get_heap_branching_factor() << std::endl
+ << std::endl;
+
+#ifdef PROFILE
+ crimson::ProfileCombiner<std::chrono::nanoseconds> art_combiner;
+ crimson::ProfileCombiner<std::chrono::nanoseconds> rct_combiner;
+ for (uint i = 0; i < sim->get_server_count(); ++i) {
+ const auto& q = sim->get_server(i).get_priority_queue();
+ const auto& art = q.add_request_timer;
+ art_combiner.combine(art);
+ const auto& rct = q.request_complete_timer;
+ rct_combiner.combine(rct);
+ }
+ out << "Server add_request_timer: count:" << art_combiner.get_count() <<
+ ", mean:" << art_combiner.get_mean() <<
+ ", std_dev:" << art_combiner.get_std_dev() <<
+ ", low:" << art_combiner.get_low() <<
+ ", high:" << art_combiner.get_high() << std::endl;
+ out << "Server request_complete_timer: count:" << rct_combiner.get_count() <<
+ ", mean:" << rct_combiner.get_mean() <<
+ ", std_dev:" << rct_combiner.get_std_dev() <<
+ ", low:" << rct_combiner.get_low() <<
+ ", high:" << rct_combiner.get_high() << std::endl;
+ out << "Server combined mean: " <<
+ (art_combiner.get_mean() + rct_combiner.get_mean()) <<
+ std::endl;
+#endif
+}
--- /dev/null
- ClientInfo info;
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2017 Red Hat Inc.
+ */
+
+
+#pragma once
+
+/* COMPILATION OPTIONS
+ *
+ * By default we include an optimization over the originally published
+ * dmclock algorithm using not the values of rho and delta that were
+ * sent in with a request but instead the most recent rho and delta
+ * values from the requests's client. To restore the algorithm's
+ * original behavior, define DO_NOT_DELAY_TAG_CALC (i.e., compiler
+ * argument -DDO_NOT_DELAY_TAG_CALC).
+ *
+ * The prop_heap does not seem to be necessary. The only thing it
+ * would help with is quickly finding the mininum proportion/prioity
+ * when an idle client became active. To have the code maintain the
+ * proportional heap, define USE_PROP_HEAP (i.e., compiler argument
+ * -DUSE_PROP_HEAP).
+ */
+
+#include <assert.h>
+
+#include <cmath>
+#include <memory>
+#include <map>
+#include <deque>
+#include <queue>
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+#include <thread>
+#include <iostream>
+#include <sstream>
+#include <limits>
+
+#include <boost/variant.hpp>
+
+#include "indirect_intrusive_heap.h"
+#include "run_every.h"
+#include "dmclock_util.h"
+#include "dmclock_recs.h"
+
+#ifdef PROFILE
+#include "profile.h"
+#endif
+
+
+namespace crimson {
+
+ namespace dmclock {
+
+ namespace c = crimson;
+
+ constexpr double max_tag = std::numeric_limits<double>::is_iec559 ?
+ std::numeric_limits<double>::infinity() :
+ std::numeric_limits<double>::max();
+ constexpr double min_tag = std::numeric_limits<double>::is_iec559 ?
+ -std::numeric_limits<double>::infinity() :
+ std::numeric_limits<double>::lowest();
+ constexpr uint tag_modulo = 1000000;
+
+ struct ClientInfo {
+ double reservation; // minimum
+ double weight; // proportional
+ double limit; // maximum
+
+ // multiplicative inverses of above, which we use in calculations
+ // and don't want to recalculate repeatedly
+ double reservation_inv;
+ double weight_inv;
+ double limit_inv;
+
+ // order parameters -- min, "normal", max
+ ClientInfo(double _reservation, double _weight, double _limit) :
+ reservation(_reservation),
+ weight(_weight),
+ limit(_limit),
+ reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation),
+ weight_inv( 0.0 == weight ? 0.0 : 1.0 / weight),
+ limit_inv( 0.0 == limit ? 0.0 : 1.0 / limit)
+ {
+ // empty
+ }
+
+
+ friend std::ostream& operator<<(std::ostream& out,
+ const ClientInfo& client) {
+ out <<
+ "{ ClientInfo:: r:" << client.reservation <<
+ " w:" << std::fixed << client.weight <<
+ " l:" << std::fixed << client.limit <<
+ " 1/r:" << std::fixed << client.reservation_inv <<
+ " 1/w:" << std::fixed << client.weight_inv <<
+ " 1/l:" << std::fixed << client.limit_inv <<
+ " }";
+ return out;
+ }
+ }; // class ClientInfo
+
+
+ struct RequestTag {
+ double reservation;
+ double proportion;
+ double limit;
+ bool ready; // true when within limit
+ Time arrival;
+
+ RequestTag(const RequestTag& prev_tag,
+ const ClientInfo& client,
+ const uint32_t delta,
+ const uint32_t rho,
+ const Time time,
+ const double cost = 0.0,
+ const double anticipation_timeout = 0.0) :
+ ready(false),
+ arrival(time)
+ {
+ Time max_time = time;
+ if (time - anticipation_timeout < prev_tag.arrival)
+ max_time -= anticipation_timeout;
+
+ reservation = cost + tag_calc(max_time,
+ prev_tag.reservation,
+ client.reservation_inv,
+ rho,
+ true);
+ proportion = tag_calc(max_time,
+ prev_tag.proportion,
+ client.weight_inv,
+ delta,
+ true);
+ limit = tag_calc(max_time,
+ prev_tag.limit,
+ client.limit_inv,
+ delta,
+ false);
+
+ assert(reservation < max_tag || proportion < max_tag);
+ }
+
+ RequestTag(const RequestTag& prev_tag,
+ const ClientInfo& client,
+ const ReqParams req_params,
+ const Time time,
+ const double cost = 0.0,
+ const double anticipation_timeout = 0.0) :
+ RequestTag(prev_tag, client, req_params.delta, req_params.rho, time,
+ cost, anticipation_timeout)
+ { /* empty */ }
+
+ RequestTag(double _res, double _prop, double _lim, const Time _arrival) :
+ reservation(_res),
+ proportion(_prop),
+ limit(_lim),
+ ready(false),
+ arrival(_arrival)
+ {
+ assert(reservation < max_tag || proportion < max_tag);
+ }
+
+ RequestTag(const RequestTag& other) :
+ reservation(other.reservation),
+ proportion(other.proportion),
+ limit(other.limit),
+ ready(other.ready),
+ arrival(other.arrival)
+ {
+ // empty
+ }
+
+ static std::string format_tag_change(double before, double after) {
+ if (before == after) {
+ return std::string("same");
+ } else {
+ std::stringstream ss;
+ ss << format_tag(before) << "=>" << format_tag(after);
+ return ss.str();
+ }
+ }
+
+ static std::string format_tag(double value) {
+ if (max_tag == value) {
+ return std::string("max");
+ } else if (min_tag == value) {
+ return std::string("min");
+ } else {
+ return format_time(value, tag_modulo);
+ }
+ }
+
+ private:
+
+ static double tag_calc(const Time time,
+ double prev,
+ double increment,
+ uint32_t dist_req_val,
+ bool extreme_is_high) {
+ if (0.0 == increment) {
+ return extreme_is_high ? max_tag : min_tag;
+ } else {
+ if (0 != dist_req_val) {
+ increment *= dist_req_val;
+ }
+ return std::max(time, prev + increment);
+ }
+ }
+
+ friend std::ostream& operator<<(std::ostream& out,
+ const RequestTag& tag) {
+ out <<
+ "{ RequestTag:: ready:" << (tag.ready ? "true" : "false") <<
+ " r:" << format_tag(tag.reservation) <<
+ " p:" << format_tag(tag.proportion) <<
+ " l:" << format_tag(tag.limit) <<
+#if 0 // try to resolve this to make sure Time is operator<<'able.
+#ifndef DO_NOT_DELAY_TAG_CALC
+ " arrival:" << tag.arrival <<
+#endif
+#endif
+ " }";
+ return out;
+ }
+ }; // class RequestTag
+
+
+ // C is client identifier type, R is request type,
+ // U1 determines whether to use client information function dynamically,
+ // B is heap branching factor
+ template<typename C, typename R, bool U1, uint B>
+ class PriorityQueueBase {
+ // we don't want to include gtest.h just for FRIEND_TEST
+ friend class dmclock_server_client_idle_erase_Test;
+
+ public:
+
+ using RequestRef = std::unique_ptr<R>;
+
+ protected:
+
+ using TimePoint = decltype(std::chrono::steady_clock::now());
+ using Duration = std::chrono::milliseconds;
+ using MarkPoint = std::pair<TimePoint,Counter>;
+
+ enum class ReadyOption {ignore, lowers, raises};
+
+ // forward decl for friend decls
+ template<double RequestTag::*, ReadyOption, bool>
+ struct ClientCompare;
+
+ class ClientReq {
+ friend PriorityQueueBase;
+
+ RequestTag tag;
+ C client_id;
+ RequestRef request;
+
+ public:
+
+ ClientReq(const RequestTag& _tag,
+ const C& _client_id,
+ RequestRef&& _request) :
+ tag(_tag),
+ client_id(_client_id),
+ request(std::move(_request))
+ {
+ // empty
+ }
+
+ friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) {
+ out << "{ ClientReq:: tag:" << c.tag << " client:" <<
+ c.client_id << " }";
+ return out;
+ }
+ }; // class ClientReq
+
+ public:
+
+ // NOTE: ClientRec is in the "public" section for compatibility
+ // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1
+ // ClientRec could be "protected" with no issue. [See comments
+ // associated with function submit_top_request.]
+ class ClientRec {
+ friend PriorityQueueBase<C,R,U1,B>;
+
+ C client;
+ RequestTag prev_tag;
+ std::deque<ClientReq> requests;
+
+ // amount added from the proportion tag as a result of
+ // an idle client becoming unidle
+ double prop_delta = 0.0;
+
+ c::IndIntruHeapData reserv_heap_data {};
+ c::IndIntruHeapData lim_heap_data {};
+ c::IndIntruHeapData ready_heap_data {};
+#if USE_PROP_HEAP
+ c::IndIntruHeapData prop_heap_data {};
+#endif
+
+ public:
+
- const ClientInfo& _info,
++ const ClientInfo* info;
+ bool idle;
+ Counter last_tick;
+ uint32_t cur_rho;
+ uint32_t cur_delta;
+
+ ClientRec(C _client,
- using ClientInfoFunc = std::function<ClientInfo(const C&)>;
++ const ClientInfo* _info,
+ Counter current_tick) :
+ client(_client),
+ prev_tag(0.0, 0.0, 0.0, TimeZero),
+ info(_info),
+ idle(true),
+ last_tick(current_tick),
+ cur_rho(1),
+ cur_delta(1)
+ {
+ // empty
+ }
+
+ inline const RequestTag& get_req_tag() const {
+ return prev_tag;
+ }
+
+ static inline void assign_unpinned_tag(double& lhs, const double rhs) {
+ if (rhs != max_tag && rhs != min_tag) {
+ lhs = rhs;
+ }
+ }
+
+ inline void update_req_tag(const RequestTag& _prev,
+ const Counter& _tick) {
+ assign_unpinned_tag(prev_tag.reservation, _prev.reservation);
+ assign_unpinned_tag(prev_tag.limit, _prev.limit);
+ assign_unpinned_tag(prev_tag.proportion, _prev.proportion);
+ prev_tag.arrival = _prev.arrival;
+ last_tick = _tick;
+ }
+
+ inline void add_request(const RequestTag& tag,
+ const C& client_id,
+ RequestRef&& request) {
+ requests.emplace_back(ClientReq(tag, client_id, std::move(request)));
+ }
+
+ inline const ClientReq& next_request() const {
+ return requests.front();
+ }
+
+ inline ClientReq& next_request() {
+ return requests.front();
+ }
+
+ inline void pop_request() {
+ requests.pop_front();
+ }
+
+ inline bool has_request() const {
+ return !requests.empty();
+ }
+
+ inline size_t request_count() const {
+ return requests.size();
+ }
+
+ // NB: because a deque is the underlying structure, this
+ // operation might be expensive
+ bool remove_by_req_filter_fw(std::function<bool(R&&)> filter_accum) {
+ bool any_removed = false;
+ for (auto i = requests.begin();
+ i != requests.end();
+ /* no inc */) {
+ if (filter_accum(std::move(*i->request))) {
+ any_removed = true;
+ i = requests.erase(i);
+ } else {
+ ++i;
+ }
+ }
+ return any_removed;
+ }
+
+ // NB: because a deque is the underlying structure, this
+ // operation might be expensive
+ bool remove_by_req_filter_bw(std::function<bool(R&&)> filter_accum) {
+ bool any_removed = false;
+ for (auto i = requests.rbegin();
+ i != requests.rend();
+ /* no inc */) {
+ if (filter_accum(std::move(*i->request))) {
+ any_removed = true;
+ i = decltype(i){ requests.erase(std::next(i).base()) };
+ } else {
+ ++i;
+ }
+ }
+ return any_removed;
+ }
+
+ inline bool
+ remove_by_req_filter(std::function<bool(R&&)> filter_accum,
+ bool visit_backwards) {
+ if (visit_backwards) {
+ return remove_by_req_filter_bw(filter_accum);
+ } else {
+ return remove_by_req_filter_fw(filter_accum);
+ }
+ }
+
+ friend std::ostream&
+ operator<<(std::ostream& out,
+ const typename PriorityQueueBase<C,R,U1,B>::ClientRec& e) {
+ out << "{ ClientRec::" <<
+ " client:" << e.client <<
+ " prev_tag:" << e.prev_tag <<
+ " req_count:" << e.requests.size() <<
+ " top_req:";
+ if (e.has_request()) {
+ out << e.next_request();
+ } else {
+ out << "none";
+ }
+ out << " }";
+
+ return out;
+ }
+ }; // class ClientRec
+
+ using ClientRecRef = std::shared_ptr<ClientRec>;
+
+ // when we try to get the next request, we'll be in one of three
+ // situations -- we'll have one to return, have one that can
+ // fire in the future, or not have any
+ enum class NextReqType { returning, future, none };
+
+ // specifies which queue next request will get popped from
+ enum class HeapId { reservation, ready };
+
+ // this is returned from next_req to tell the caller the situation
+ struct NextReq {
+ NextReqType type;
+ union {
+ HeapId heap_id;
+ Time when_ready;
+ };
+
+ inline explicit NextReq() :
+ type(NextReqType::none)
+ { }
+
+ inline NextReq(HeapId _heap_id) :
+ type(NextReqType::returning),
+ heap_id(_heap_id)
+ { }
+
+ inline NextReq(Time _when_ready) :
+ type(NextReqType::future),
+ when_ready(_when_ready)
+ { }
+
+ // calls to this are clearer than calls to the default
+ // constructor
+ static inline NextReq none() {
+ return NextReq();
+ }
+ };
+
+
+ // a function that can be called to look up client information
- inline const ClientInfo get_cli_info(ClientRec& client) const {
++ using ClientInfoFunc = std::function<const ClientInfo*(const C&)>;
+
+
+ bool empty() const {
+ DataGuard g(data_mtx);
+ return (resv_heap.empty() || ! resv_heap.top().has_request());
+ }
+
+
+ size_t client_count() const {
+ DataGuard g(data_mtx);
+ return resv_heap.size();
+ }
+
+
+ size_t request_count() const {
+ DataGuard g(data_mtx);
+ size_t total = 0;
+ for (auto i = resv_heap.cbegin(); i != resv_heap.cend(); ++i) {
+ total += i->request_count();
+ }
+ return total;
+ }
+
+
+ bool remove_by_req_filter(std::function<bool(R&&)> filter_accum,
+ bool visit_backwards = false) {
+ bool any_removed = false;
+ DataGuard g(data_mtx);
+ for (auto i : client_map) {
+ bool modified =
+ i.second->remove_by_req_filter(filter_accum, visit_backwards);
+ if (modified) {
+ resv_heap.adjust(*i.second);
+ limit_heap.adjust(*i.second);
+ ready_heap.adjust(*i.second);
+#if USE_PROP_HEAP
+ prop_heap.adjust(*i.second);
+#endif
+ any_removed = true;
+ }
+ }
+ return any_removed;
+ }
+
+
+ // use as a default value when no accumulator is provide
+ static void request_sink(R&& req) {
+ // do nothing
+ }
+
+
+ void remove_by_client(const C& client,
+ bool reverse = false,
+ std::function<void (R&&)> accum = request_sink) {
+ DataGuard g(data_mtx);
+
+ auto i = client_map.find(client);
+
+ if (i == client_map.end()) return;
+
+ if (reverse) {
+ for (auto j = i->second->requests.rbegin();
+ j != i->second->requests.rend();
+ ++j) {
+ accum(std::move(*j->request));
+ }
+ } else {
+ for (auto j = i->second->requests.begin();
+ j != i->second->requests.end();
+ ++j) {
+ accum(std::move(*j->request));
+ }
+ }
+
+ i->second->requests.clear();
+
+ resv_heap.adjust(*i->second);
+ limit_heap.adjust(*i->second);
+ ready_heap.adjust(*i->second);
+#if USE_PROP_HEAP
+ prop_heap.adjust(*i->second);
+#endif
+ }
+
+
+ uint get_heap_branching_factor() const {
+ return B;
+ }
+
+
+ void update_client_info(const C& client_id) {
+ DataGuard g(data_mtx);
+ auto client_it = client_map.find(client_id);
+ if (client_map.end() != client_it) {
+ ClientRec& client = (*client_it->second);
+ client.info = client_info_f(client_id);
+ }
+ }
+
+
+ void update_client_infos() {
+ DataGuard g(data_mtx);
+ for (auto i : client_map) {
+ i.second->info = client_info_f(i.second->client);
+ }
+ }
+
+
+ friend std::ostream& operator<<(std::ostream& out,
+ const PriorityQueueBase& q) {
+ std::lock_guard<decltype(q.data_mtx)> guard(q.data_mtx);
+
+ out << "{ PriorityQueue::";
+ for (const auto& c : q.client_map) {
+ out << " { client:" << c.first << ", record:" << *c.second <<
+ " }";
+ }
+ if (!q.resv_heap.empty()) {
+ const auto& resv = q.resv_heap.top();
+ out << " { reservation_top:" << resv << " }";
+ const auto& ready = q.ready_heap.top();
+ out << " { ready_top:" << ready << " }";
+ const auto& limit = q.limit_heap.top();
+ out << " { limit_top:" << limit << " }";
+ } else {
+ out << " HEAPS-EMPTY";
+ }
+ out << " }";
+
+ return out;
+ }
+
+ // for debugging
+ void display_queues(std::ostream& out,
+ bool show_res = true,
+ bool show_lim = true,
+ bool show_ready = true,
+ bool show_prop = true) const {
+ auto filter = [](const ClientRec& e)->bool { return true; };
+ DataGuard g(data_mtx);
+ if (show_res) {
+ resv_heap.display_sorted(out << "RESER:", filter);
+ }
+ if (show_lim) {
+ limit_heap.display_sorted(out << "LIMIT:", filter);
+ }
+ if (show_ready) {
+ ready_heap.display_sorted(out << "READY:", filter);
+ }
+#if USE_PROP_HEAP
+ if (show_prop) {
+ prop_heap.display_sorted(out << "PROPO:", filter);
+ }
+#endif
+ } // display_queues
+
+
+ protected:
+
+ // The ClientCompare functor is essentially doing a precedes?
+ // operator, returning true if and only if the first parameter
+ // must precede the second parameter. If the second must precede
+ // the first, or if they are equivalent, false should be
+ // returned. The reason for this behavior is that it will be
+ // called to test if two items are out of order and if true is
+ // returned it will reverse the items. Therefore false is the
+ // default return when it doesn't matter to prevent unnecessary
+ // re-ordering.
+ //
+ // The template is supporting variations in sorting based on the
+ // heap in question and allowing these variations to be handled
+ // at compile-time.
+ //
+ // tag_field determines which tag is being used for comparison
+ //
+ // ready_opt determines how the ready flag influences the sort
+ //
+ // use_prop_delta determines whether the proportional delta is
+ // added in for comparison
+ template<double RequestTag::*tag_field,
+ ReadyOption ready_opt,
+ bool use_prop_delta>
+ struct ClientCompare {
+ bool operator()(const ClientRec& n1, const ClientRec& n2) const {
+ if (n1.has_request()) {
+ if (n2.has_request()) {
+ const auto& t1 = n1.next_request().tag;
+ const auto& t2 = n2.next_request().tag;
+ if (ReadyOption::ignore == ready_opt || t1.ready == t2.ready) {
+ // if we don't care about ready or the ready values are the same
+ if (use_prop_delta) {
+ return (t1.*tag_field + n1.prop_delta) <
+ (t2.*tag_field + n2.prop_delta);
+ } else {
+ return t1.*tag_field < t2.*tag_field;
+ }
+ } else if (ReadyOption::raises == ready_opt) {
+ // use_ready == true && the ready fields are different
+ return t1.ready;
+ } else {
+ return t2.ready;
+ }
+ } else {
+ // n1 has request but n2 does not
+ return true;
+ }
+ } else if (n2.has_request()) {
+ // n2 has request but n1 does not
+ return false;
+ } else {
+ // both have none; keep stable w false
+ return false;
+ }
+ }
+ };
+
+ ClientInfoFunc client_info_f;
+ static constexpr bool is_dynamic_cli_info_f = U1;
+
+ mutable std::mutex data_mtx;
+ using DataGuard = std::lock_guard<decltype(data_mtx)>;
+
+ // stable mapping between client ids and client queues
+ std::map<C,ClientRecRef> client_map;
+
+ c::IndIntruHeap<ClientRecRef,
+ ClientRec,
+ &ClientRec::reserv_heap_data,
+ ClientCompare<&RequestTag::reservation,
+ ReadyOption::ignore,
+ false>,
+ B> resv_heap;
+#if USE_PROP_HEAP
+ c::IndIntruHeap<ClientRecRef,
+ ClientRec,
+ &ClientRec::prop_heap_data,
+ ClientCompare<&RequestTag::proportion,
+ ReadyOption::ignore,
+ true>,
+ B> prop_heap;
+#endif
+ c::IndIntruHeap<ClientRecRef,
+ ClientRec,
+ &ClientRec::lim_heap_data,
+ ClientCompare<&RequestTag::limit,
+ ReadyOption::lowers,
+ false>,
+ B> limit_heap;
+ c::IndIntruHeap<ClientRecRef,
+ ClientRec,
+ &ClientRec::ready_heap_data,
+ ClientCompare<&RequestTag::proportion,
+ ReadyOption::raises,
+ true>,
+ B> ready_heap;
+
+ // if all reservations are met and all other requestes are under
+ // limit, this will allow the request next in terms of
+ // proportion to still get issued
+ bool allow_limit_break;
+ double anticipation_timeout;
+
+ std::atomic_bool finishing;
+
+ // every request creates a tick
+ Counter tick = 0;
+
+ // performance data collection
+ size_t reserv_sched_count = 0;
+ size_t prop_sched_count = 0;
+ size_t limit_break_sched_count = 0;
+
+ Duration idle_age;
+ Duration erase_age;
+ Duration check_time;
+ std::deque<MarkPoint> clean_mark_points;
+
+ // NB: All threads declared at end, so they're destructed first!
+
+ std::unique_ptr<RunEvery> cleaning_job;
+
+
+ // COMMON constructor that others feed into; we can accept three
+ // different variations of durations
+ template<typename Rep, typename Per>
+ PriorityQueueBase(ClientInfoFunc _client_info_f,
+ std::chrono::duration<Rep,Per> _idle_age,
+ std::chrono::duration<Rep,Per> _erase_age,
+ std::chrono::duration<Rep,Per> _check_time,
+ bool _allow_limit_break,
+ double _anticipation_timeout) :
+ client_info_f(_client_info_f),
+ allow_limit_break(_allow_limit_break),
+ anticipation_timeout(_anticipation_timeout),
+ finishing(false),
+ idle_age(std::chrono::duration_cast<Duration>(_idle_age)),
+ erase_age(std::chrono::duration_cast<Duration>(_erase_age)),
+ check_time(std::chrono::duration_cast<Duration>(_check_time))
+ {
+ assert(_erase_age >= _idle_age);
+ assert(_check_time < _idle_age);
+ cleaning_job =
+ std::unique_ptr<RunEvery>(
+ new RunEvery(check_time,
+ std::bind(&PriorityQueueBase::do_clean, this)));
+ }
+
+
+ ~PriorityQueueBase() {
+ finishing = true;
+ }
+
+
- ClientInfo info = client_info_f(client_id);
++ inline const ClientInfo* get_cli_info(ClientRec& client) const {
+ if (is_dynamic_cli_info_f) {
+ client.info = client_info_f(client.client);
+ }
+ return client.info;
+ }
+
+
+ // data_mtx must be held by caller
+ void do_add_request(RequestRef&& request,
+ const C& client_id,
+ const ReqParams& req_params,
+ const Time time,
+ const double cost = 0.0) {
+ ++tick;
+
+ // this pointer will help us create a reference to a shared
+ // pointer, no matter which of two codepaths we take
+ ClientRec* temp_client;
+
+ auto client_it = client_map.find(client_id);
+ if (client_map.end() != client_it) {
+ temp_client = &(*client_it->second); // address of obj of shared_ptr
+ } else {
- get_cli_info(client),
++ const ClientInfo* info = client_info_f(client_id);
+ ClientRecRef client_rec =
+ std::make_shared<ClientRec>(client_id, info, tick);
+ resv_heap.push(client_rec);
+#if USE_PROP_HEAP
+ prop_heap.push(client_rec);
+#endif
+ limit_heap.push(client_rec);
+ ready_heap.push(client_rec);
+ client_map[client_id] = client_rec;
+ temp_client = &(*client_rec); // address of obj of shared_ptr
+ }
+
+ // for convenience, we'll create a reference to the shared pointer
+ ClientRec& client = *temp_client;
+
+ if (client.idle) {
+ // We need to do an adjustment so that idle clients compete
+ // fairly on proportional tags since those tags may have
+ // drifted from real-time. Either use the lowest existing
+ // proportion tag -- O(1) -- or the client with the lowest
+ // previous proportion tag -- O(n) where n = # clients.
+ //
+ // So we don't have to maintain a propotional queue that
+ // keeps the minimum on proportional tag alone (we're
+ // instead using a ready queue), we'll have to check each
+ // client.
+ //
+ // The alternative would be to maintain a proportional queue
+ // (define USE_PROP_TAG) and do an O(1) operation here.
+
+ // Was unable to confirm whether equality testing on
+ // std::numeric_limits<double>::max() is guaranteed, so
+ // we'll use a compile-time calculated trigger that is one
+ // third the max, which should be much larger than any
+ // expected organic value.
+ constexpr double lowest_prop_tag_trigger =
+ std::numeric_limits<double>::max() / 3.0;
+
+ double lowest_prop_tag = std::numeric_limits<double>::max();
+ for (auto const &c : client_map) {
+ // don't use ourselves (or anything else that might be
+ // listed as idle) since we're now in the map
+ if (!c.second->idle) {
+ double p;
+ // use either lowest proportion tag or previous proportion tag
+ if (c.second->has_request()) {
+ p = c.second->next_request().tag.proportion +
+ c.second->prop_delta;
+ } else {
+ p = c.second->get_req_tag().proportion + c.second->prop_delta;
+ }
+
+ if (p < lowest_prop_tag) {
+ lowest_prop_tag = p;
+ }
+ }
+ }
+
+ // if this conditional does not fire, it
+ if (lowest_prop_tag < lowest_prop_tag_trigger) {
+ client.prop_delta = lowest_prop_tag - time;
+ }
+ client.idle = false;
+ } // if this client was idle
+
+#ifndef DO_NOT_DELAY_TAG_CALC
+ RequestTag tag(0, 0, 0, time);
+
+ if (!client.has_request()) {
++ const ClientInfo* client_info = get_cli_info(client);
++ assert(client_info);
+ tag = RequestTag(client.get_req_tag(),
- get_cli_info(client),
++ *client_info,
+ req_params,
+ time,
+ cost,
+ anticipation_timeout);
+
+ // copy tag to previous tag for client
+ client.update_req_tag(tag, tick);
+ }
+#else
++ const ClientInfo* client_info = get_cli_info(client);
++ assert(client_info);
+ RequestTag tag(client.get_req_tag(),
- next_first.tag = RequestTag(tag, get_cli_info(top),
++ *client_info,
+ req_params,
+ time,
+ cost,
+ anticipation_timeout);
+
+ // copy tag to previous tag for client
+ client.update_req_tag(tag, tick);
+#endif
+
+ client.add_request(tag, client.client, std::move(request));
+ if (1 == client.requests.size()) {
+ // NB: can the following 4 calls to adjust be changed
+ // promote? Can adding a request ever demote a client in the
+ // heaps?
+ resv_heap.adjust(client);
+ limit_heap.adjust(client);
+ ready_heap.adjust(client);
+#if USE_PROP_HEAP
+ prop_heap.adjust(client);
+#endif
+ }
+
+ client.cur_rho = req_params.rho;
+ client.cur_delta = req_params.delta;
+
+ resv_heap.adjust(client);
+ limit_heap.adjust(client);
+ ready_heap.adjust(client);
+#if USE_PROP_HEAP
+ prop_heap.adjust(client);
+#endif
+ } // add_request
+
+
+ // data_mtx should be held when called; top of heap should have
+ // a ready request
+ template<typename C1, IndIntruHeapData ClientRec::*C2, typename C3>
+ void pop_process_request(IndIntruHeap<C1, ClientRec, C2, C3, B>& heap,
+ std::function<void(const C& client,
+ RequestRef& request)> process) {
+ // gain access to data
+ ClientRec& top = heap.top();
+
+ RequestRef request = std::move(top.next_request().request);
+#ifndef DO_NOT_DELAY_TAG_CALC
+ RequestTag tag = top.next_request().tag;
+#endif
+
+ // pop request and adjust heaps
+ top.pop_request();
+
+#ifndef DO_NOT_DELAY_TAG_CALC
+ if (top.has_request()) {
+ ClientReq& next_first = top.next_request();
- r.tag.reservation -= client.info.reservation_inv;
++ const ClientInfo* client_info = get_cli_info(top);
++ assert(client_info);
++ next_first.tag = RequestTag(tag, *client_info,
+ top.cur_delta, top.cur_rho,
+ next_first.tag.arrival,
+ 0.0, anticipation_timeout);
+
+ // copy tag to previous tag for client
+ top.update_req_tag(next_first.tag, tick);
+ }
+#endif
+
+ resv_heap.demote(top);
+ limit_heap.adjust(top);
+#if USE_PROP_HEAP
+ prop_heap.demote(top);
+#endif
+ ready_heap.demote(top);
+
+ // process
+ process(top.client, request);
+ } // pop_process_request
+
+
+ // data_mtx should be held when called
+ void reduce_reservation_tags(ClientRec& client) {
+ for (auto& r : client.requests) {
- client.prev_tag.reservation -= client.info.reservation_inv;
++ r.tag.reservation -= client.info->reservation_inv;
+
+#ifndef DO_NOT_DELAY_TAG_CALC
+ // reduce only for front tag. because next tags' value are invalid
+ break;
+#endif
+ }
+ // don't forget to update previous tag
++ client.prev_tag.reservation -= client.info->reservation_inv;
+ resv_heap.promote(client);
+ }
+
+
+ // data_mtx should be held when called
+ void reduce_reservation_tags(const C& client_id) {
+ auto client_it = client_map.find(client_id);
+
+ // means the client was cleaned from map; should never happen
+ // as long as cleaning times are long enough
+ assert(client_map.end() != client_it);
+ reduce_reservation_tags(*client_it->second);
+ }
+
+
+ // data_mtx should be held when called
+ NextReq do_next_request(Time now) {
+ // if reservation queue is empty, all are empty (i.e., no
+ // active clients)
+ if(resv_heap.empty()) {
+ return NextReq::none();
+ }
+
+ // try constraint (reservation) based scheduling
+
+ auto& reserv = resv_heap.top();
+ if (reserv.has_request() &&
+ reserv.next_request().tag.reservation <= now) {
+ return NextReq(HeapId::reservation);
+ }
+
+ // no existing reservations before now, so try weight-based
+ // scheduling
+
+ // all items that are within limit are eligible based on
+ // priority
+ auto limits = &limit_heap.top();
+ while (limits->has_request() &&
+ !limits->next_request().tag.ready &&
+ limits->next_request().tag.limit <= now) {
+ limits->next_request().tag.ready = true;
+ ready_heap.promote(*limits);
+ limit_heap.demote(*limits);
+
+ limits = &limit_heap.top();
+ }
+
+ auto& readys = ready_heap.top();
+ if (readys.has_request() &&
+ readys.next_request().tag.ready &&
+ readys.next_request().tag.proportion < max_tag) {
+ return NextReq(HeapId::ready);
+ }
+
+ // if nothing is schedulable by reservation or
+ // proportion/weight, and if we allow limit break, try to
+ // schedule something with the lowest proportion tag or
+ // alternatively lowest reservation tag.
+ if (allow_limit_break) {
+ if (readys.has_request() &&
+ readys.next_request().tag.proportion < max_tag) {
+ return NextReq(HeapId::ready);
+ } else if (reserv.has_request() &&
+ reserv.next_request().tag.reservation < max_tag) {
+ return NextReq(HeapId::reservation);
+ }
+ }
+
+ // nothing scheduled; make sure we re-run when next
+ // reservation item or next limited item comes up
+
+ Time next_call = TimeMax;
+ if (resv_heap.top().has_request()) {
+ next_call =
+ min_not_0_time(next_call,
+ resv_heap.top().next_request().tag.reservation);
+ }
+ if (limit_heap.top().has_request()) {
+ const auto& next = limit_heap.top().next_request();
+ assert(!next.tag.ready || max_tag == next.tag.proportion);
+ next_call = min_not_0_time(next_call, next.tag.limit);
+ }
+ if (next_call < TimeMax) {
+ return NextReq(next_call);
+ } else {
+ return NextReq::none();
+ }
+ } // do_next_request
+
+
+ // if possible is not zero and less than current then return it;
+ // otherwise return current; the idea is we're trying to find
+ // the minimal time but ignoring zero
+ static inline const Time& min_not_0_time(const Time& current,
+ const Time& possible) {
+ return TimeZero == possible ? current : std::min(current, possible);
+ }
+
+
+ /*
+ * This is being called regularly by RunEvery. Every time it's
+ * called it notes the time and delta counter (mark point) in a
+ * deque. It also looks at the deque to find the most recent
+ * mark point that is older than clean_age. It then walks the
+ * map and delete all server entries that were last used before
+ * that mark point.
+ */
+ void do_clean() {
+ TimePoint now = std::chrono::steady_clock::now();
+ DataGuard g(data_mtx);
+ clean_mark_points.emplace_back(MarkPoint(now, tick));
+
+ // first erase the super-old client records
+
+ Counter erase_point = 0;
+ auto point = clean_mark_points.front();
+ while (point.first <= now - erase_age) {
+ erase_point = point.second;
+ clean_mark_points.pop_front();
+ point = clean_mark_points.front();
+ }
+
+ Counter idle_point = 0;
+ for (auto i : clean_mark_points) {
+ if (i.first <= now - idle_age) {
+ idle_point = i.second;
+ } else {
+ break;
+ }
+ }
+
+ if (erase_point > 0 || idle_point > 0) {
+ for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
+ auto i2 = i++;
+ if (erase_point && i2->second->last_tick <= erase_point) {
+ delete_from_heaps(i2->second);
+ client_map.erase(i2);
+ } else if (idle_point && i2->second->last_tick <= idle_point) {
+ i2->second->idle = true;
+ }
+ } // for
+ } // if
+ } // do_clean
+
+
+ // data_mtx must be held by caller
+ template<IndIntruHeapData ClientRec::*C1,typename C2>
+ void delete_from_heap(ClientRecRef& client,
+ c::IndIntruHeap<ClientRecRef,ClientRec,C1,C2,B>& heap) {
+ auto i = heap.rfind(client);
+ heap.remove(i);
+ }
+
+
+ // data_mtx must be held by caller
+ void delete_from_heaps(ClientRecRef& client) {
+ delete_from_heap(client, resv_heap);
+#if USE_PROP_HEAP
+ delete_from_heap(client, prop_heap);
+#endif
+ delete_from_heap(client, limit_heap);
+ delete_from_heap(client, ready_heap);
+ }
+ }; // class PriorityQueueBase
+
+
+ template<typename C, typename R, bool U1=false, uint B=2>
+ class PullPriorityQueue : public PriorityQueueBase<C,R,U1,B> {
+ using super = PriorityQueueBase<C,R,U1,B>;
+
+ public:
+
+ // When a request is pulled, this is the return type.
+ struct PullReq {
+ struct Retn {
+ C client;
+ typename super::RequestRef request;
+ PhaseType phase;
+ };
+
+ typename super::NextReqType type;
+ boost::variant<Retn,Time> data;
+
+ bool is_none() const { return type == super::NextReqType::none; }
+
+ bool is_retn() const { return type == super::NextReqType::returning; }
+ Retn& get_retn() {
+ return boost::get<Retn>(data);
+ }
+
+ bool is_future() const { return type == super::NextReqType::future; }
+ Time getTime() const { return boost::get<Time>(data); }
+ };
+
+
+#ifdef PROFILE
+ ProfileTimer<std::chrono::nanoseconds> pull_request_timer;
+ ProfileTimer<std::chrono::nanoseconds> add_request_timer;
+#endif
+
+ template<typename Rep, typename Per>
+ PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
+ std::chrono::duration<Rep,Per> _idle_age,
+ std::chrono::duration<Rep,Per> _erase_age,
+ std::chrono::duration<Rep,Per> _check_time,
+ bool _allow_limit_break = false,
+ double _anticipation_timeout = 0.0) :
+ super(_client_info_f,
+ _idle_age, _erase_age, _check_time,
+ _allow_limit_break, _anticipation_timeout)
+ {
+ // empty
+ }
+
+
+ // pull convenience constructor
+ PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
+ bool _allow_limit_break = false) :
+ PullPriorityQueue(_client_info_f,
+ std::chrono::minutes(10),
+ std::chrono::minutes(15),
+ std::chrono::minutes(6),
+ _allow_limit_break)
+ {
+ // empty
+ }
+
+
+ inline void add_request(R&& request,
+ const C& client_id,
+ const ReqParams& req_params,
+ double addl_cost = 0.0) {
+ add_request(typename super::RequestRef(new R(std::move(request))),
+ client_id,
+ req_params,
+ get_time(),
+ addl_cost);
+ }
+
+
+ inline void add_request(R&& request,
+ const C& client_id,
+ double addl_cost = 0.0) {
+ static const ReqParams null_req_params;
+ add_request(typename super::RequestRef(new R(std::move(request))),
+ client_id,
+ null_req_params,
+ get_time(),
+ addl_cost);
+ }
+
+
+
+ inline void add_request_time(R&& request,
+ const C& client_id,
+ const ReqParams& req_params,
+ const Time time,
+ double addl_cost = 0.0) {
+ add_request(typename super::RequestRef(new R(std::move(request))),
+ client_id,
+ req_params,
+ time,
+ addl_cost);
+ }
+
+
+ inline void add_request(typename super::RequestRef&& request,
+ const C& client_id,
+ const ReqParams& req_params,
+ double addl_cost = 0.0) {
+ add_request(request, req_params, client_id, get_time(), addl_cost);
+ }
+
+
+ inline void add_request(typename super::RequestRef&& request,
+ const C& client_id,
+ double addl_cost = 0.0) {
+ static const ReqParams null_req_params;
+ add_request(request, null_req_params, client_id, get_time(), addl_cost);
+ }
+
+
+ // this does the work; the versions above provide alternate interfaces
+ void add_request(typename super::RequestRef&& request,
+ const C& client_id,
+ const ReqParams& req_params,
+ const Time time,
+ double addl_cost = 0.0) {
+ typename super::DataGuard g(this->data_mtx);
+#ifdef PROFILE
+ add_request_timer.start();
+#endif
+ super::do_add_request(std::move(request),
+ client_id,
+ req_params,
+ time,
+ addl_cost);
+ // no call to schedule_request for pull version
+#ifdef PROFILE
+ add_request_timer.stop();
+#endif
+ }
+
+
+ inline PullReq pull_request() {
+ return pull_request(get_time());
+ }
+
+
+ PullReq pull_request(Time now) {
+ PullReq result;
+ typename super::DataGuard g(this->data_mtx);
+#ifdef PROFILE
+ pull_request_timer.start();
+#endif
+
+ typename super::NextReq next = super::do_next_request(now);
+ result.type = next.type;
+ switch(next.type) {
+ case super::NextReqType::none:
+ return result;
+ case super::NextReqType::future:
+ result.data = next.when_ready;
+ return result;
+ case super::NextReqType::returning:
+ // to avoid nesting, break out and let code below handle this case
+ break;
+ default:
+ assert(false);
+ }
+
+ // we'll only get here if we're returning an entry
+
+ auto process_f =
+ [&] (PullReq& pull_result, PhaseType phase) ->
+ std::function<void(const C&,
+ typename super::RequestRef&)> {
+ return [&pull_result, phase](const C& client,
+ typename super::RequestRef& request) {
+ pull_result.data =
+ typename PullReq::Retn{client, std::move(request), phase};
+ };
+ };
+
+ switch(next.heap_id) {
+ case super::HeapId::reservation:
+ super::pop_process_request(this->resv_heap,
+ process_f(result, PhaseType::reservation));
+ ++this->reserv_sched_count;
+ break;
+ case super::HeapId::ready:
+ super::pop_process_request(this->ready_heap,
+ process_f(result, PhaseType::priority));
+ { // need to use retn temporarily
+ auto& retn = boost::get<typename PullReq::Retn>(result.data);
+ super::reduce_reservation_tags(retn.client);
+ }
+ ++this->prop_sched_count;
+ break;
+ default:
+ assert(false);
+ }
+
+#ifdef PROFILE
+ pull_request_timer.stop();
+#endif
+ return result;
+ } // pull_request
+
+
+ protected:
+
+
+ // data_mtx should be held when called; unfortunately this
+ // function has to be repeated in both push & pull
+ // specializations
+ typename super::NextReq next_request() {
+ return next_request(get_time());
+ }
+ }; // class PullPriorityQueue
+
+
+ // PUSH version
+ template<typename C, typename R, bool U1=false, uint B=2>
+ class PushPriorityQueue : public PriorityQueueBase<C,R,U1,B> {
+
+ protected:
+
+ using super = PriorityQueueBase<C,R,U1,B>;
+
+ public:
+
+ // a function to see whether the server can handle another request
+ using CanHandleRequestFunc = std::function<bool(void)>;
+
+ // a function to submit a request to the server; the second
+ // parameter is a callback when it's completed
+ using HandleRequestFunc =
+ std::function<void(const C&,typename super::RequestRef,PhaseType)>;
+
+ protected:
+
+ CanHandleRequestFunc can_handle_f;
+ HandleRequestFunc handle_f;
+ // for handling timed scheduling
+ std::mutex sched_ahead_mtx;
+ std::condition_variable sched_ahead_cv;
+ Time sched_ahead_when = TimeZero;
+
+#ifdef PROFILE
+ public:
+ ProfileTimer<std::chrono::nanoseconds> add_request_timer;
+ ProfileTimer<std::chrono::nanoseconds> request_complete_timer;
+ protected:
+#endif
+
+ // NB: threads declared last, so constructed last and destructed first
+
+ std::thread sched_ahead_thd;
+
+ public:
+
+ // push full constructor
+ template<typename Rep, typename Per>
+ PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
+ CanHandleRequestFunc _can_handle_f,
+ HandleRequestFunc _handle_f,
+ std::chrono::duration<Rep,Per> _idle_age,
+ std::chrono::duration<Rep,Per> _erase_age,
+ std::chrono::duration<Rep,Per> _check_time,
+ bool _allow_limit_break = false,
+ double anticipation_timeout = 0.0) :
+ super(_client_info_f,
+ _idle_age, _erase_age, _check_time,
+ _allow_limit_break, anticipation_timeout)
+ {
+ can_handle_f = _can_handle_f;
+ handle_f = _handle_f;
+ sched_ahead_thd = std::thread(&PushPriorityQueue::run_sched_ahead, this);
+ }
+
+
+ // push convenience constructor
+ PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
+ CanHandleRequestFunc _can_handle_f,
+ HandleRequestFunc _handle_f,
+ bool _allow_limit_break = false,
+ double _anticipation_timeout = 0.0) :
+ PushPriorityQueue(_client_info_f,
+ _can_handle_f,
+ _handle_f,
+ std::chrono::minutes(10),
+ std::chrono::minutes(15),
+ std::chrono::minutes(6),
+ _allow_limit_break,
+ _anticipation_timeout)
+ {
+ // empty
+ }
+
+
+ ~PushPriorityQueue() {
+ this->finishing = true;
+ sched_ahead_cv.notify_one();
+ sched_ahead_thd.join();
+ }
+
+ public:
+
+ inline void add_request(R&& request,
+ const C& client_id,
+ const ReqParams& req_params,
+ double addl_cost = 0.0) {
+ add_request(typename super::RequestRef(new R(std::move(request))),
+ client_id,
+ req_params,
+ get_time(),
+ addl_cost);
+ }
+
+
+ inline void add_request(typename super::RequestRef&& request,
+ const C& client_id,
+ const ReqParams& req_params,
+ double addl_cost = 0.0) {
+ add_request(request, req_params, client_id, get_time(), addl_cost);
+ }
+
+
+ inline void add_request_time(const R& request,
+ const C& client_id,
+ const ReqParams& req_params,
+ const Time time,
+ double addl_cost = 0.0) {
+ add_request(typename super::RequestRef(new R(request)),
+ client_id,
+ req_params,
+ time,
+ addl_cost);
+ }
+
+
+ void add_request(typename super::RequestRef&& request,
+ const C& client_id,
+ const ReqParams& req_params,
+ const Time time,
+ double addl_cost = 0.0) {
+ typename super::DataGuard g(this->data_mtx);
+#ifdef PROFILE
+ add_request_timer.start();
+#endif
+ super::do_add_request(std::move(request),
+ client_id,
+ req_params,
+ time,
+ addl_cost);
+ schedule_request();
+#ifdef PROFILE
+ add_request_timer.stop();
+#endif
+ }
+
+
+ void request_completed() {
+ typename super::DataGuard g(this->data_mtx);
+#ifdef PROFILE
+ request_complete_timer.start();
+#endif
+ schedule_request();
+#ifdef PROFILE
+ request_complete_timer.stop();
+#endif
+ }
+
+ protected:
+
+ // data_mtx should be held when called; furthermore, the heap
+ // should not be empty and the top element of the heap should
+ // not be already handled
+ //
+ // NOTE: the use of "super::ClientRec" in either the template
+ // construct or as a parameter to submit_top_request generated
+ // a compiler error in g++ 4.8.4, when ClientRec was
+ // "protected" rather than "public". By g++ 6.3.1 this was not
+ // an issue. But for backwards compatibility
+ // PriorityQueueBase::ClientRec is public.
+ template<typename C1,
+ IndIntruHeapData super::ClientRec::*C2,
+ typename C3,
+ uint B4>
+ C submit_top_request(IndIntruHeap<C1,typename super::ClientRec,C2,C3,B4>& heap,
+ PhaseType phase) {
+ C client_result;
+ super::pop_process_request(heap,
+ [this, phase, &client_result]
+ (const C& client,
+ typename super::RequestRef& request) {
+ client_result = client;
+ handle_f(client, std::move(request), phase);
+ });
+ return client_result;
+ }
+
+
+ // data_mtx should be held when called
+ void submit_request(typename super::HeapId heap_id) {
+ C client;
+ switch(heap_id) {
+ case super::HeapId::reservation:
+ // don't need to note client
+ (void) submit_top_request(this->resv_heap, PhaseType::reservation);
+ // unlike the other two cases, we do not reduce reservation
+ // tags here
+ ++this->reserv_sched_count;
+ break;
+ case super::HeapId::ready:
+ client = submit_top_request(this->ready_heap, PhaseType::priority);
+ super::reduce_reservation_tags(client);
+ ++this->prop_sched_count;
+ break;
+ default:
+ assert(false);
+ }
+ } // submit_request
+
+
+ // data_mtx should be held when called; unfortunately this
+ // function has to be repeated in both push & pull
+ // specializations
+ typename super::NextReq next_request() {
+ return next_request(get_time());
+ }
+
+
+ // data_mtx should be held when called; overrides member
+ // function in base class to add check for whether a request can
+ // be pushed to the server
+ typename super::NextReq next_request(Time now) {
+ if (!can_handle_f()) {
+ typename super::NextReq result;
+ result.type = super::NextReqType::none;
+ return result;
+ } else {
+ return super::do_next_request(now);
+ }
+ } // next_request
+
+
+ // data_mtx should be held when called
+ void schedule_request() {
+ typename super::NextReq next_req = next_request();
+ switch (next_req.type) {
+ case super::NextReqType::none:
+ return;
+ case super::NextReqType::future:
+ sched_at(next_req.when_ready);
+ break;
+ case super::NextReqType::returning:
+ submit_request(next_req.heap_id);
+ break;
+ default:
+ assert(false);
+ }
+ }
+
+
+ // this is the thread that handles running schedule_request at
+ // future times when nothing can be scheduled immediately
+ void run_sched_ahead() {
+ std::unique_lock<std::mutex> l(sched_ahead_mtx);
+
+ while (!this->finishing) {
+ if (TimeZero == sched_ahead_when) {
+ sched_ahead_cv.wait(l);
+ } else {
+ Time now;
+ while (!this->finishing && (now = get_time()) < sched_ahead_when) {
+ long microseconds_l = long(1 + 1000000 * (sched_ahead_when - now));
+ auto microseconds = std::chrono::microseconds(microseconds_l);
+ sched_ahead_cv.wait_for(l, microseconds);
+ }
+ sched_ahead_when = TimeZero;
+ if (this->finishing) return;
+
+ l.unlock();
+ if (!this->finishing) {
+ typename super::DataGuard g(this->data_mtx);
+ schedule_request();
+ }
+ l.lock();
+ }
+ }
+ }
+
+
+ void sched_at(Time when) {
+ std::lock_guard<std::mutex> l(sched_ahead_mtx);
+ if (this->finishing) return;
+ if (TimeZero == sched_ahead_when || when < sched_ahead_when) {
+ sched_ahead_when = when;
+ sched_ahead_cv.notify_one();
+ }
+ }
+ }; // class PushPriorityQueue
+
+ } // namespace dmclock
+} // namespace crimson
--- /dev/null
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
- if (client1 == c) return ci1;
- else if (client2 == c) return ci2;
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ */
+
+
+#include <memory>
+#include <chrono>
+#include <iostream>
+#include <list>
+#include <vector>
+
+
+#include "dmclock_server.h"
+#include "dmclock_util.h"
+#include "gtest/gtest.h"
+
+// process control to prevent core dumps during gtest death tests
+#include "dmcPrCtl.h"
+
+
+namespace dmc = crimson::dmclock;
+
+
+// we need a request object; an empty one will do
+struct Request {
+};
+
+
+namespace crimson {
+ namespace dmclock {
+
+ /*
+ * Allows us to test the code provided with the mutex provided locked.
+ */
+ static void test_locked(std::mutex& mtx, std::function<void()> code) {
+ std::unique_lock<std::mutex> l(mtx);
+ code();
+ }
+
+
+ TEST(dmclock_server, bad_tag_deathtest) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 17;
+ ClientId client2 = 18;
+
+ double reservation = 0.0;
+ double weight = 0.0;
+
+ dmc::ClientInfo ci1(reservation, weight, 0.0);
+ dmc::ClientInfo ci2(reservation, weight, 1.0);
+
- return ci1; // must return
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ if (client1 == c) return &ci1;
++ else if (client2 == c) return &ci2;
+ else {
+ ADD_FAILURE() << "got request from neither of two clients";
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo { return ci; };
++ return nullptr;
+ }
+ };
+
+ QueueRef pq(new Queue(client_info_f, false));
+ ReqParams req_params(1,1);
+
+ // Disable coredumps
+ PrCtl unset_dumpable;
+
+ EXPECT_DEATH_IF_SUPPORTED(pq->add_request(Request{}, client1, req_params),
+ "Assertion.*reservation.*max_tag.*"
+ "proportion.*max_tag") <<
+ "we should fail if a client tries to generate a reservation tag "
+ "where reservation and proportion are both 0";
+
+
+ EXPECT_DEATH_IF_SUPPORTED(pq->add_request(Request{}, client2, req_params),
+ "Assertion.*reservation.*max_tag.*"
+ "proportion.*max_tag") <<
+ "we should fail if a client tries to generate a reservation tag "
+ "where reservation and proportion are both 0";
+ }
+
+
+ TEST(dmclock_server, client_idle_erase) {
+ using ClientId = int;
+ using Queue = dmc::PushPriorityQueue<ClientId,Request>;
+ int client = 17;
+ double reservation = 100.0;
+
+ dmc::ClientInfo ci(reservation, 1.0, 0.0);
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo { return ci; };
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ return &ci;
++ };
+ auto server_ready_f = [] () -> bool { return true; };
+ auto submit_req_f = [] (const ClientId& c,
+ std::unique_ptr<Request> req,
+ dmc::PhaseType phase) {
+ // empty; do nothing
+ };
+
+ Queue pq(client_info_f,
+ server_ready_f,
+ submit_req_f,
+ std::chrono::seconds(3),
+ std::chrono::seconds(5),
+ std::chrono::seconds(2),
+ false);
+
+ auto lock_pq = [&](std::function<void()> code) {
+ test_locked(pq.data_mtx, code);
+ };
+
+
+ /* The timeline should be as follows:
+ *
+ * 0 seconds : request created
+ *
+ * 1 seconds : map is size 1, idle is false
+ *
+ * 2 seconds : clean notes first mark; +2 is base for further calcs
+ *
+ * 4 seconds : clean does nothing except makes another mark
+ *
+ * 5 seconds : when we're secheduled to idle (+2 + 3)
+ *
+ * 6 seconds : clean idles client
+ *
+ * 7 seconds : when we're secheduled to erase (+2 + 5)
+ *
+ * 7 seconds : verified client is idle
+ *
+ * 8 seconds : clean erases client info
+ *
+ * 9 seconds : verified client is erased
+ */
+
+ lock_pq([&] () {
+ EXPECT_EQ(0u, pq.client_map.size()) <<
+ "client map initially has size 0";
+ });
+
+ Request req;
+ dmc::ReqParams req_params(1, 1);
+ pq.add_request_time(req, client, req_params, dmc::get_time());
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ lock_pq([&] () {
+ EXPECT_EQ(1u, pq.client_map.size()) <<
+ "client map has 1 after 1 client";
+ EXPECT_FALSE(pq.client_map.at(client)->idle) <<
+ "initially client map entry shows not idle.";
+ });
+
+ std::this_thread::sleep_for(std::chrono::seconds(6));
+
+ lock_pq([&] () {
+ EXPECT_TRUE(pq.client_map.at(client)->idle) <<
+ "after idle age client map entry shows idle.";
+ });
+
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+
+ lock_pq([&] () {
+ EXPECT_EQ(0u, pq.client_map.size()) <<
+ "client map loses its entry after erase age";
+ });
+ } // TEST
+
+
+#if 0
+ TEST(dmclock_server, reservation_timing) {
+ using ClientId = int;
+ // NB? PUSH OR PULL
+ using Queue = std::unique_ptr<dmc::PriorityQueue<ClientId,Request>>;
+ using std::chrono::steady_clock;
+
+ int client = 17;
+
+ std::vector<dmc::Time> times;
+ std::mutex times_mtx;
+ using Guard = std::lock_guard<decltype(times_mtx)>;
+
+ // reservation every second
+ dmc::ClientInfo ci(1.0, 0.0, 0.0);
+ Queue pq;
+
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
- return info1;
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ return &ci;
++ };
+ auto server_ready_f = [] () -> bool { return true; };
+ auto submit_req_f = [&] (const ClientId& c,
+ std::unique_ptr<Request> req,
+ dmc::PhaseType phase) {
+ {
+ Guard g(times_mtx);
+ times.emplace_back(dmc::get_time());
+ }
+ std::thread complete([&](){ pq->request_completed(); });
+ complete.detach();
+ };
+
+ // NB? PUSH OR PULL
+ pq = Queue(new dmc::PriorityQueue<ClientId,Request>(client_info_f,
+ server_ready_f,
+ submit_req_f,
+ false));
+
+ Request req;
+ ReqParams<ClientId> req_params(client, 1, 1);
+
+ for (int i = 0; i < 5; ++i) {
+ pq->add_request_time(req, req_params, dmc::get_time());
+ }
+
+ {
+ Guard g(times_mtx);
+ std::this_thread::sleep_for(std::chrono::milliseconds(5500));
+ EXPECT_EQ(5, times.size()) <<
+ "after 5.5 seconds, we should have 5 requests times at 1 second apart";
+ }
+ } // TEST
+#endif
+
+
+ TEST(dmclock_server, remove_by_req_filter) {
+ struct MyReq {
+ int id;
+
+ MyReq(int _id) :
+ id(_id)
+ {
+ // empty
+ }
+ }; // MyReq
+
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,MyReq>;
+
+ ClientId client1 = 17;
+ ClientId client2 = 98;
+
+ dmc::ClientInfo info1(0.0, 1.0, 0.0);
+
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
- return info1;
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ return &info1;
+ };
+
+ Queue pq(client_info_f, true);
+
+ EXPECT_EQ(0u, pq.client_count());
+ EXPECT_EQ(0u, pq.request_count());
+
+ ReqParams req_params(1,1);
+
+ pq.add_request(MyReq(1), client1, req_params);
+ pq.add_request(MyReq(11), client1, req_params);
+ pq.add_request(MyReq(2), client2, req_params);
+ pq.add_request(MyReq(0), client2, req_params);
+ pq.add_request(MyReq(13), client2, req_params);
+ pq.add_request(MyReq(2), client2, req_params);
+ pq.add_request(MyReq(13), client2, req_params);
+ pq.add_request(MyReq(98), client2, req_params);
+ pq.add_request(MyReq(44), client1, req_params);
+
+ EXPECT_EQ(2u, pq.client_count());
+ EXPECT_EQ(9u, pq.request_count());
+
+ pq.remove_by_req_filter([](const MyReq& r) -> bool {return 1 == r.id % 2;});
+
+ EXPECT_EQ(5u, pq.request_count());
+
+ std::list<MyReq> capture;
+ pq.remove_by_req_filter(
+ [&capture] (const MyReq& r) -> bool {
+ if (0 == r.id % 2) {
+ capture.push_front(r);
+ return true;
+ } else {
+ return false;
+ }
+ },
+ true);
+
+ EXPECT_EQ(0u, pq.request_count());
+ EXPECT_EQ(5u, capture.size());
+ int total = 0;
+ for (auto i : capture) {
+ total += i.id;
+ }
+ EXPECT_EQ(146, total) << " sum of captured items should be 146";
+ } // TEST
+
+
+ TEST(dmclock_server, remove_by_req_filter_ordering_forwards_visit) {
+ struct MyReq {
+ int id;
+
+ MyReq(int _id) :
+ id(_id)
+ {
+ // empty
+ }
+ }; // MyReq
+
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,MyReq>;
+
+ ClientId client1 = 17;
+
+ dmc::ClientInfo info1(0.0, 1.0, 0.0);
+
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
- return info1;
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ return &info1;
+ };
+
+ Queue pq(client_info_f, true);
+
+ EXPECT_EQ(0u, pq.client_count());
+ EXPECT_EQ(0u, pq.request_count());
+
+ ReqParams req_params(1,1);
+
+ pq.add_request(MyReq(1), client1, req_params);
+ pq.add_request(MyReq(2), client1, req_params);
+ pq.add_request(MyReq(3), client1, req_params);
+ pq.add_request(MyReq(4), client1, req_params);
+ pq.add_request(MyReq(5), client1, req_params);
+ pq.add_request(MyReq(6), client1, req_params);
+
+ EXPECT_EQ(1u, pq.client_count());
+ EXPECT_EQ(6u, pq.request_count());
+
+ // remove odd ids in forward order and append to end
+
+ std::vector<MyReq> capture;
+ pq.remove_by_req_filter(
+ [&capture] (const MyReq& r) -> bool {
+ if (1 == r.id % 2) {
+ capture.push_back(r);
+ return true;
+ } else {
+ return false;
+ }
+ },
+ false);
+
+ EXPECT_EQ(3u, pq.request_count());
+ EXPECT_EQ(3u, capture.size());
+ EXPECT_EQ(1, capture[0].id) << "items should come out in forward order";
+ EXPECT_EQ(3, capture[1].id) << "items should come out in forward order";
+ EXPECT_EQ(5, capture[2].id) << "items should come out in forward order";
+
+ // remove even ids in reverse order but insert at front so comes
+ // out forwards
+
+ std::vector<MyReq> capture2;
+ pq.remove_by_req_filter(
+ [&capture2] (const MyReq& r) -> bool {
+ if (0 == r.id % 2) {
+ capture2.insert(capture2.begin(), r);
+ return true;
+ } else {
+ return false;
+ }
+ },
+ false);
+
+ EXPECT_EQ(0u, pq.request_count());
+ EXPECT_EQ(3u, capture2.size());
+ EXPECT_EQ(6, capture2[0].id) << "items should come out in reverse order";
+ EXPECT_EQ(4, capture2[1].id) << "items should come out in reverse order";
+ EXPECT_EQ(2, capture2[2].id) << "items should come out in reverse order";
+ } // TEST
+
+
+ TEST(dmclock_server, remove_by_req_filter_ordering_backwards_visit) {
+ struct MyReq {
+ int id;
+
+ MyReq(int _id) :
+ id(_id)
+ {
+ // empty
+ }
+ }; // MyReq
+
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,MyReq>;
+
+ ClientId client1 = 17;
+
+ dmc::ClientInfo info1(0.0, 1.0, 0.0);
+
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
- return info1;
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ return &info1;
+ };
+
+ Queue pq(client_info_f, true);
+
+ EXPECT_EQ(0u, pq.client_count());
+ EXPECT_EQ(0u, pq.request_count());
+
+ ReqParams req_params(1,1);
+
+ pq.add_request(MyReq(1), client1, req_params);
+ pq.add_request(MyReq(2), client1, req_params);
+ pq.add_request(MyReq(3), client1, req_params);
+ pq.add_request(MyReq(4), client1, req_params);
+ pq.add_request(MyReq(5), client1, req_params);
+ pq.add_request(MyReq(6), client1, req_params);
+
+ EXPECT_EQ(1u, pq.client_count());
+ EXPECT_EQ(6u, pq.request_count());
+
+ // now remove odd ids in forward order
+
+ std::vector<MyReq> capture;
+ pq.remove_by_req_filter(
+ [&capture] (const MyReq& r) -> bool {
+ if (1 == r.id % 2) {
+ capture.insert(capture.begin(), r);
+ return true;
+ } else {
+ return false;
+ }
+ },
+ true);
+
+ EXPECT_EQ(3u, pq.request_count());
+ EXPECT_EQ(3u, capture.size());
+ EXPECT_EQ(1, capture[0].id) << "items should come out in forward order";
+ EXPECT_EQ(3, capture[1].id) << "items should come out in forward order";
+ EXPECT_EQ(5, capture[2].id) << "items should come out in forward order";
+
+ // now remove even ids in reverse order
+
+ std::vector<MyReq> capture2;
+ pq.remove_by_req_filter(
+ [&capture2] (const MyReq& r) -> bool {
+ if (0 == r.id % 2) {
+ capture2.push_back(r);
+ return true;
+ } else {
+ return false;
+ }
+ },
+ true);
+
+ EXPECT_EQ(0u, pq.request_count());
+ EXPECT_EQ(3u, capture2.size());
+ EXPECT_EQ(6, capture2[0].id) << "items should come out in reverse order";
+ EXPECT_EQ(4, capture2[1].id) << "items should come out in reverse order";
+ EXPECT_EQ(2, capture2[2].id) << "items should come out in reverse order";
+ } // TEST
+
+
+ TEST(dmclock_server, remove_by_client) {
+ struct MyReq {
+ int id;
+
+ MyReq(int _id) :
+ id(_id)
+ {
+ // empty
+ }
+ }; // MyReq
+
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,MyReq>;
+
+ ClientId client1 = 17;
+ ClientId client2 = 98;
+
+ dmc::ClientInfo info1(0.0, 1.0, 0.0);
+
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
- if (client1 == c) return info1;
- else if (client2 == c) return info2;
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ return &info1;
+ };
+
+ Queue pq(client_info_f, true);
+
+ EXPECT_EQ(0u, pq.client_count());
+ EXPECT_EQ(0u, pq.request_count());
+
+ ReqParams req_params(1,1);
+
+ pq.add_request(MyReq(1), client1, req_params);
+ pq.add_request(MyReq(11), client1, req_params);
+ pq.add_request(MyReq(2), client2, req_params);
+ pq.add_request(MyReq(0), client2, req_params);
+ pq.add_request(MyReq(13), client2, req_params);
+ pq.add_request(MyReq(2), client2, req_params);
+ pq.add_request(MyReq(13), client2, req_params);
+ pq.add_request(MyReq(98), client2, req_params);
+ pq.add_request(MyReq(44), client1, req_params);
+
+ EXPECT_EQ(2u, pq.client_count());
+ EXPECT_EQ(9u, pq.request_count());
+
+ std::list<MyReq> removed;
+
+ pq.remove_by_client(client1,
+ true,
+ [&removed] (const MyReq& r) {
+ removed.push_front(r);
+ });
+
+ EXPECT_EQ(3u, removed.size());
+ EXPECT_EQ(1, removed.front().id);
+ removed.pop_front();
+ EXPECT_EQ(11, removed.front().id);
+ removed.pop_front();
+ EXPECT_EQ(44, removed.front().id);
+ removed.pop_front();
+
+ EXPECT_EQ(6u, pq.request_count());
+
+ Queue::PullReq pr = pq.pull_request();
+ EXPECT_TRUE(pr.is_retn());
+ EXPECT_EQ(2, pr.get_retn().request->id);
+
+ pr = pq.pull_request();
+ EXPECT_TRUE(pr.is_retn());
+ EXPECT_EQ(0, pr.get_retn().request->id);
+
+ pq.remove_by_client(client2);
+ EXPECT_EQ(0u, pq.request_count()) <<
+ "after second client removed, none left";
+ } // TEST
+
+
+ TEST(dmclock_server_pull, pull_weight) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 17;
+ ClientId client2 = 98;
+
+ dmc::ClientInfo info1(0.0, 1.0, 0.0);
+ dmc::ClientInfo info2(0.0, 2.0, 0.0);
+
+ QueueRef pq;
+
- return info1;
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ if (client1 == c) return &info1;
++ else if (client2 == c) return &info2;
+ else {
+ ADD_FAILURE() << "client info looked up for non-existant client";
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
- if (client1 == c) return info1;
- else if (client2 == c) return info2;
++ return nullptr;
+ }
+ };
+
+ pq = QueueRef(new Queue(client_info_f, false));
+
+ ReqParams req_params(1,1);
+
+ auto now = dmc::get_time();
+
+ for (int i = 0; i < 5; ++i) {
+ pq->add_request(Request{}, client1, req_params);
+ pq->add_request(Request{}, client2, req_params);
+ now += 0.0001;
+ }
+
+ int c1_count = 0;
+ int c2_count = 0;
+ for (int i = 0; i < 6; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
+
+ EXPECT_EQ(2, c1_count) <<
+ "one-third of request should have come from first client";
+ EXPECT_EQ(4, c2_count) <<
+ "two-thirds of request should have come from second client";
+ }
+
+
+ TEST(dmclock_server_pull, pull_reservation) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 52;
+ ClientId client2 = 8;
+
+ dmc::ClientInfo info1(2.0, 0.0, 0.0);
+ dmc::ClientInfo info2(1.0, 0.0, 0.0);
+
- return info1;
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ if (client1 == c) return &info1;
++ else if (client2 == c) return &info2;
+ else {
+ ADD_FAILURE() << "client info looked up for non-existant client";
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
- if (client1 == c) return info1;
- else if (client2 == c) return info2;
++ return nullptr;
+ }
+ };
+
+ QueueRef pq(new Queue(client_info_f, false));
+
+ ReqParams req_params(1,1);
+
+ // make sure all times are well before now
+ auto old_time = dmc::get_time() - 100.0;
+
+ for (int i = 0; i < 5; ++i) {
+ pq->add_request_time(Request{}, client1, req_params, old_time);
+ pq->add_request_time(Request{}, client2, req_params, old_time);
+ old_time += 0.001;
+ }
+
+ int c1_count = 0;
+ int c2_count = 0;
+
+ for (int i = 0; i < 6; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::reservation, retn.phase);
+ }
+
+ EXPECT_EQ(4, c1_count) <<
+ "two-thirds of request should have come from first client";
+ EXPECT_EQ(2, c2_count) <<
+ "one-third of request should have come from second client";
+ } // dmclock_server_pull.pull_reservation
+
+
+ TEST(dmclock_server_pull, update_client_info) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request,false>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 17;
+ ClientId client2 = 98;
+
+ dmc::ClientInfo info1(0.0, 100.0, 0.0);
+ dmc::ClientInfo info2(0.0, 200.0, 0.0);
+
+ QueueRef pq;
+
- return info1;
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ if (client1 == c) return &info1;
++ else if (client2 == c) return &info2;
+ else {
+ ADD_FAILURE() << "client info looked up for non-existant client";
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
- if (client1 == c) return info1[cli_info_group];
- else if (client2 == c) return info2[cli_info_group];
++ return nullptr;
+ }
+ };
+
+ pq = QueueRef(new Queue(client_info_f, false));
+
+ ReqParams req_params(1,1);
+
+ auto now = dmc::get_time();
+
+ for (int i = 0; i < 5; ++i) {
+ pq->add_request(Request{}, client1, req_params);
+ pq->add_request(Request{}, client2, req_params);
+ now += 0.0001;
+ }
+
+ int c1_count = 0;
+ int c2_count = 0;
+ for (int i = 0; i < 10; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (i > 5) continue;
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
+
+ EXPECT_EQ(2, c1_count) <<
+ "before: one-third of request should have come from first client";
+ EXPECT_EQ(4, c2_count) <<
+ "before: two-thirds of request should have come from second client";
+
+ std::chrono::seconds dura(1);
+ std::this_thread::sleep_for(dura);
+
+ info1 = dmc::ClientInfo(0.0, 200.0, 0.0);
+ pq->update_client_info(17);
+
+ now = dmc::get_time();
+
+ for (int i = 0; i < 5; ++i) {
+ pq->add_request(Request{}, client1, req_params);
+ pq->add_request(Request{}, client2, req_params);
+ now += 0.0001;
+ }
+
+ c1_count = 0;
+ c2_count = 0;
+ for (int i = 0; i < 6; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
+
+ EXPECT_EQ(3, c1_count) <<
+ "after: one-third of request should have come from first client";
+ EXPECT_EQ(3, c2_count) <<
+ "after: two-thirds of request should have come from second client";
+ }
+
+
+ TEST(dmclock_server_pull, dynamic_cli_info_f) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request,true>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 17;
+ ClientId client2 = 98;
+
+ std::vector<dmc::ClientInfo> info1;
+ std::vector<dmc::ClientInfo> info2;
+
+ info1.push_back(dmc::ClientInfo(0.0, 100.0, 0.0));
+ info1.push_back(dmc::ClientInfo(0.0, 150.0, 0.0));
+
+ info2.push_back(dmc::ClientInfo(0.0, 200.0, 0.0));
+ info2.push_back(dmc::ClientInfo(0.0, 50.0, 0.0));
+
+ uint cli_info_group = 0;
+
+ QueueRef pq;
+
- return info1[0];
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ if (client1 == c) return &info1[cli_info_group];
++ else if (client2 == c) return &info2[cli_info_group];
+ else {
+ ADD_FAILURE() << "client info looked up for non-existant client";
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
- if (client1 == c) return info1;
- else if (client2 == c) return info2;
++ return nullptr;
+ }
+ };
+
+ pq = QueueRef(new Queue(client_info_f, false));
+
+ ReqParams req_params(1,1);
+
+ auto now = dmc::get_time();
+
+ for (int i = 0; i < 5; ++i) {
+ pq->add_request(Request{}, client1, req_params);
+ pq->add_request(Request{}, client2, req_params);
+ now += 0.0001;
+ }
+
+ int c1_count = 0;
+ int c2_count = 0;
+ for (int i = 0; i < 10; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (i > 5) continue;
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
+
+ EXPECT_EQ(2, c1_count) <<
+ "before: one-third of request should have come from first client";
+ EXPECT_EQ(4, c2_count) <<
+ "before: two-thirds of request should have come from second client";
+
+ std::chrono::seconds dura(1);
+ std::this_thread::sleep_for(dura);
+
+ cli_info_group = 1;
+
+ now = dmc::get_time();
+
+ for (int i = 0; i < 6; ++i) {
+ pq->add_request(Request{}, client1, req_params);
+ pq->add_request(Request{}, client2, req_params);
+ now += 0.0001;
+ }
+
+ c1_count = 0;
+ c2_count = 0;
+ for (int i = 0; i < 8; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
+
+ EXPECT_EQ(6, c1_count) <<
+ "after: one-third of request should have come from first client";
+ EXPECT_EQ(2, c2_count) <<
+ "after: two-thirds of request should have come from second client";
+ }
+
+
+ // This test shows what happens when a request can be ready (under
+ // limit) but not schedulable since proportion tag is 0. We expect
+ // to get some future and none responses.
+ TEST(dmclock_server_pull, ready_and_under_limit) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 52;
+ ClientId client2 = 8;
+
+ dmc::ClientInfo info1(1.0, 0.0, 0.0);
+ dmc::ClientInfo info2(1.0, 0.0, 0.0);
+
- return info1;
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ if (client1 == c) return &info1;
++ else if (client2 == c) return &info2;
+ else {
+ ADD_FAILURE() << "client info looked up for non-existant client";
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
- return info;
++ return nullptr;
+ }
+ };
+
+ QueueRef pq(new Queue(client_info_f, false));
+
+ ReqParams req_params(1,1);
+
+ // make sure all times are well before now
+ auto start_time = dmc::get_time() - 100.0;
+
+ // add six requests; for same client reservations spaced one apart
+ for (int i = 0; i < 3; ++i) {
+ pq->add_request_time(Request{}, client1, req_params, start_time);
+ pq->add_request_time(Request{}, client2, req_params, start_time);
+ }
+
+ Queue::PullReq pr = pq->pull_request(start_time + 0.5);
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ pr = pq->pull_request(start_time + 0.5);
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ pr = pq->pull_request(start_time + 0.5);
+ EXPECT_EQ(Queue::NextReqType::future, pr.type) <<
+ "too soon for next reservation";
+
+ pr = pq->pull_request(start_time + 1.5);
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ pr = pq->pull_request(start_time + 1.5);
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ pr = pq->pull_request(start_time + 1.5);
+ EXPECT_EQ(Queue::NextReqType::future, pr.type) <<
+ "too soon for next reservation";
+
+ pr = pq->pull_request(start_time + 2.5);
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ pr = pq->pull_request(start_time + 2.5);
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ pr = pq->pull_request(start_time + 2.5);
+ EXPECT_EQ(Queue::NextReqType::none, pr.type) << "no more requests left";
+ }
+
+
+ TEST(dmclock_server_pull, pull_none) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ dmc::ClientInfo info(1.0, 1.0, 1.0);
+
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
- return info;
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ return &info;
+ };
+
+ QueueRef pq(new Queue(client_info_f, false));
+
+ // Request req;
+ ReqParams req_params(1,1);
+
+ auto now = dmc::get_time();
+
+ Queue::PullReq pr = pq->pull_request(now + 100);
+
+ EXPECT_EQ(Queue::NextReqType::none, pr.type);
+ }
+
+
+ TEST(dmclock_server_pull, pull_future) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 52;
+ // ClientId client2 = 8;
+
+ dmc::ClientInfo info(1.0, 0.0, 1.0);
+
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
- return info;
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ return &info;
+ };
+
+ QueueRef pq(new Queue(client_info_f, false));
+
+ ReqParams req_params(1,1);
+
+ // make sure all times are well before now
+ auto now = dmc::get_time();
+
+ pq->add_request_time(Request{}, client1, req_params, now + 100);
+ Queue::PullReq pr = pq->pull_request(now);
+
+ EXPECT_EQ(Queue::NextReqType::future, pr.type);
+
+ Time when = boost::get<Time>(pr.data);
+ EXPECT_EQ(now + 100, when);
+ }
+
+
+ TEST(dmclock_server_pull, pull_future_limit_break_weight) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 52;
+ // ClientId client2 = 8;
+
+ dmc::ClientInfo info(0.0, 1.0, 1.0);
+
- auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
- return info;
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ return &info;
+ };
+
+ QueueRef pq(new Queue(client_info_f, true));
+
+ ReqParams req_params(1,1);
+
+ // make sure all times are well before now
+ auto now = dmc::get_time();
+
+ pq->add_request_time(Request{}, client1, req_params, now + 100);
+ Queue::PullReq pr = pq->pull_request(now);
+
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+ EXPECT_EQ(client1, retn.client);
+ }
+
+
+ TEST(dmclock_server_pull, pull_future_limit_break_reservation) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 52;
+ // ClientId client2 = 8;
+
+ dmc::ClientInfo info(1.0, 0.0, 1.0);
+
++ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++ return &info;
+ };
+
+ QueueRef pq(new Queue(client_info_f, true));
+
+ ReqParams req_params(1,1);
+
+ // make sure all times are well before now
+ auto now = dmc::get_time();
+
+ pq->add_request_time(Request{}, client1, req_params, now + 100);
+ Queue::PullReq pr = pq->pull_request(now);
+
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+ EXPECT_EQ(client1, retn.client);
+ }
+ } // namespace dmclock
+} // namespace crimson