constexpr uint tag_modulo = 1000000;
struct ClientInfo {
- const double reservation; // minimum
- const double weight; // proportional
- const double limit; // maximum
+ double reservation; // minimum
+ double weight; // proportional
+ double limit; // maximum
// multiplicative inverses of above, which we use in calculations
// and don't want to recalculate repeatedly
- const double reservation_inv;
- const double weight_inv;
- const double limit_inv;
+ double reservation_inv;
+ double weight_inv;
+ double limit_inv;
// order parameters -- min, "normal", max
ClientInfo(double _reservation, double _weight, double _limit) :
}; // class RequestTag
- // C is client identifier type, R is request type, B is heap
- // branching factor
- template<typename C, typename R, uint B>
+ // C is client identifier type, R is request type,
+ // U1 determines whether to use client information function dynamically,
+ // B is heap branching factor
+ template<typename C, typename R, bool U1, uint B>
class PriorityQueueBase {
// we don't want to include gtest.h just for FRIEND_TEST
friend class dmclock_server_client_idle_erase_Test;
// ClientRec could be "protected" with no issue. [See comments
// associated with function submit_top_request.]
class ClientRec {
- friend PriorityQueueBase<C,R,B>;
+ friend PriorityQueueBase<C,R,U1,B>;
C client;
RequestTag prev_tag;
friend std::ostream&
operator<<(std::ostream& out,
- const typename PriorityQueueBase<C,R,B>::ClientRec& e) {
+ const typename PriorityQueueBase<C,R,U1,B>::ClientRec& e) {
out << "{ ClientRec::" <<
" client:" << e.client <<
" prev_tag:" << e.prev_tag <<
}
+ void update_client_info(const C& client_id) {
+ DataGuard g(data_mtx);
+ auto client_it = client_map.find(client_id);
+ if (client_map.end() != client_it) {
+ ClientRec& client = (*client_it->second);
+ client.info = client_info_f(client_id);
+ }
+ }
+
+
+ void update_client_infos() {
+ DataGuard g(data_mtx);
+ for (auto i : client_map) {
+ i.second->info = client_info_f(i.second->client);
+ }
+ }
+
+
friend std::ostream& operator<<(std::ostream& out,
const PriorityQueueBase& q) {
std::lock_guard<decltype(q.data_mtx)> guard(q.data_mtx);
}
};
- ClientInfoFunc client_info_f;
+ ClientInfoFunc client_info_f;
+ static constexpr bool is_dynamic_cli_info_f = U1;
mutable std::mutex data_mtx;
using DataGuard = std::lock_guard<decltype(data_mtx)>;
}
+ inline const ClientInfo get_cli_info(ClientRec& client) const {
+ if (is_dynamic_cli_info_f) {
+ client.info = client_info_f(client.client);
+ }
+ return client.info;
+ }
+
+
// data_mtx must be held by caller
void do_add_request(RequestRef&& request,
const C& client_id,
if (!client.has_request()) {
tag = RequestTag(client.get_req_tag(),
- client.info,
+ get_cli_info(client),
req_params,
time,
cost);
client.update_req_tag(tag, tick);
}
#else
- RequestTag tag(client.get_req_tag(), client.info, req_params, time, cost);
+ RequestTag tag(client.get_req_tag(), get_cli_info(client), req_params, time, cost);
// copy tag to previous tag for client
client.update_req_tag(tag, tick);
#endif
#ifndef DO_NOT_DELAY_TAG_CALC
if (top.has_request()) {
ClientReq& next_first = top.next_request();
- next_first.tag = RequestTag(tag, top.info,
+ next_first.tag = RequestTag(tag, get_cli_info(top),
top.cur_delta, top.cur_rho,
next_first.tag.arrival);
// data_mtx should be held when called
NextReq do_next_request(Time now) {
- NextReq result;
+ NextReq result{};
// if reservation queue is empty, all are empty (i.e., no active clients)
if(resv_heap.empty()) {
}; // class PriorityQueueBase
- template<typename C, typename R, uint B=2>
- class PullPriorityQueue : public PriorityQueueBase<C,R,B> {
- using super = PriorityQueueBase<C,R,B>;
+ template<typename C, typename R, bool U1=false, uint B=2>
+ class PullPriorityQueue : public PriorityQueueBase<C,R,U1,B> {
+ using super = PriorityQueueBase<C,R,U1,B>;
public:
// PUSH version
- template<typename C, typename R, uint B=2>
- class PushPriorityQueue : public PriorityQueueBase<C,R,B> {
+ template<typename C, typename R, bool U1=false, uint B=2>
+ class PushPriorityQueue : public PriorityQueueBase<C,R,U1,B> {
protected:
- using super = PriorityQueueBase<C,R,B>;
+ using super = PriorityQueueBase<C,R,U1,B>;
public:
} // dmclock_server_pull.pull_reservation
+ TEST(dmclock_server_pull, update_client_info) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request,false>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 17;
+ ClientId client2 = 98;
+
+ dmc::ClientInfo info1(0.0, 100.0, 0.0);
+ dmc::ClientInfo info2(0.0, 200.0, 0.0);
+
+ QueueRef pq;
+
+ auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
+ if (client1 == c) return info1;
+ else if (client2 == c) return info2;
+ else {
+ ADD_FAILURE() << "client info looked up for non-existant client";
+ return info1;
+ }
+ };
+
+ pq = QueueRef(new Queue(client_info_f, false));
+
+ ReqParams req_params(1,1);
+
+ auto now = dmc::get_time();
+
+ for (int i = 0; i < 5; ++i) {
+ pq->add_request(Request{}, client1, req_params);
+ pq->add_request(Request{}, client2, req_params);
+ now += 0.0001;
+ }
+
+ int c1_count = 0;
+ int c2_count = 0;
+ for (int i = 0; i < 10; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (i > 5) continue;
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
+
+ EXPECT_EQ(2, c1_count) <<
+ "before: one-third of request should have come from first client";
+ EXPECT_EQ(4, c2_count) <<
+ "before: two-thirds of request should have come from second client";
+
+ std::chrono::seconds dura(1);
+ std::this_thread::sleep_for(dura);
+
+ info1 = dmc::ClientInfo(0.0, 200.0, 0.0);
+ pq->update_client_info(17);
+
+ now = dmc::get_time();
+
+ for (int i = 0; i < 5; ++i) {
+ pq->add_request(Request{}, client1, req_params);
+ pq->add_request(Request{}, client2, req_params);
+ now += 0.0001;
+ }
+
+ c1_count = 0;
+ c2_count = 0;
+ for (int i = 0; i < 6; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
+
+ EXPECT_EQ(3, c1_count) <<
+ "after: one-third of request should have come from first client";
+ EXPECT_EQ(3, c2_count) <<
+ "after: two-thirds of request should have come from second client";
+ }
+
+
+ TEST(dmclock_server_pull, dynamic_cli_info_f) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request,true>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 17;
+ ClientId client2 = 98;
+
+ std::vector<dmc::ClientInfo> info1;
+ std::vector<dmc::ClientInfo> info2;
+
+ info1.push_back(dmc::ClientInfo(0.0, 100.0, 0.0));
+ info1.push_back(dmc::ClientInfo(0.0, 150.0, 0.0));
+
+ info2.push_back(dmc::ClientInfo(0.0, 200.0, 0.0));
+ info2.push_back(dmc::ClientInfo(0.0, 50.0, 0.0));
+
+ uint cli_info_group = 0;
+
+ QueueRef pq;
+
+ auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
+ if (client1 == c) return info1[cli_info_group];
+ else if (client2 == c) return info2[cli_info_group];
+ else {
+ ADD_FAILURE() << "client info looked up for non-existant client";
+ return info1[0];
+ }
+ };
+
+ pq = QueueRef(new Queue(client_info_f, false));
+
+ ReqParams req_params(1,1);
+
+ auto now = dmc::get_time();
+
+ for (int i = 0; i < 5; ++i) {
+ pq->add_request(Request{}, client1, req_params);
+ pq->add_request(Request{}, client2, req_params);
+ now += 0.0001;
+ }
+
+ int c1_count = 0;
+ int c2_count = 0;
+ for (int i = 0; i < 10; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (i > 5) continue;
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
+
+ EXPECT_EQ(2, c1_count) <<
+ "before: one-third of request should have come from first client";
+ EXPECT_EQ(4, c2_count) <<
+ "before: two-thirds of request should have come from second client";
+
+ std::chrono::seconds dura(1);
+ std::this_thread::sleep_for(dura);
+
+ cli_info_group = 1;
+
+ now = dmc::get_time();
+
+ for (int i = 0; i < 6; ++i) {
+ pq->add_request(Request{}, client1, req_params);
+ pq->add_request(Request{}, client2, req_params);
+ now += 0.0001;
+ }
+
+ c1_count = 0;
+ c2_count = 0;
+ for (int i = 0; i < 8; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
+
+ EXPECT_EQ(6, c1_count) <<
+ "after: one-third of request should have come from first client";
+ EXPECT_EQ(2, c2_count) <<
+ "after: two-thirds of request should have come from second client";
+ }
+
+
// This test shows what happens when a request can be ready (under
// limit) but not schedulable since proportion tag is 0. We expect
// to get some future and none responses.