g_conf.server_random_selection = stobool(val);
if (!cf.read("global", "server_soft_limit", val))
g_conf.server_soft_limit = stobool(val);
+ if (!cf.read("global", "anticipation_timeout", val))
+ g_conf.anticipation_timeout = stod(val);
for (uint i = 0; i < g_conf.server_groups; i++) {
srv_group_t st;
uint client_groups;
bool server_random_selection;
bool server_soft_limit;
+ double anticipation_timeout;
std::vector<cli_group_t> cli_group;
std::vector<srv_group_t> srv_group;
sim_config_t(uint _server_groups = 1,
uint _client_groups = 1,
bool _server_random_selection = false,
- bool _server_soft_limit = true) :
+ bool _server_soft_limit = true,
+ double _anticipation_timeout = 0.0) :
server_groups(_server_groups),
client_groups(_client_groups),
server_random_selection(_server_random_selection),
- server_soft_limit(_server_soft_limit)
+ server_soft_limit(_server_soft_limit),
+ anticipation_timeout(_anticipation_timeout)
{
srv_group.reserve(server_groups);
cli_group.reserve(client_groups);
"server_groups = " << sim_config.server_groups << "\n" <<
"client_groups = " << sim_config.client_groups << "\n" <<
"server_random_selection = " << sim_config.server_random_selection << "\n" <<
- "server_soft_limit = " << sim_config.server_soft_limit;
+ "server_soft_limit = " << sim_config.server_soft_limit << "\n" <<
+ std::fixed << std::setprecision(3) <<
+ "anticipation_timeout = " << sim_config.anticipation_timeout;
return out;
}
}; // class sim_config_t
};
using DmcQueue = dmc::PushPriorityQueue<ClientId,sim::TestRequest>;
+ using DmcServiceTracker = dmc::ServiceTracker<ServerId,dmc::BorrowingTracker>;
using DmcServer = sim::SimulatedServer<DmcQueue,
dmc::ReqParams,
dmc::PhaseType,
DmcAccum>;
- using DmcClient = sim::SimulatedClient<dmc::ServiceTracker<ServerId>,
+ using DmcClient = sim::SimulatedClient<DmcServiceTracker,
dmc::ReqParams,
dmc::PhaseType,
DmcAccum>;
const uint client_groups = g_conf.client_groups;
const bool server_random_selection = g_conf.server_random_selection;
const bool server_soft_limit = g_conf.server_soft_limit;
+ const double anticipation_timeout = g_conf.anticipation_timeout;
uint server_total_count = 0;
uint client_total_count = 0;
test::CreateQueueF create_queue_f =
[&](test::DmcQueue::CanHandleRequestFunc can_f,
test::DmcQueue::HandleRequestFunc handle_f) -> test::DmcQueue* {
- return new test::DmcQueue(client_info_f, can_f, handle_f, server_soft_limit);
+ return new test::DmcQueue(client_info_f,
+ can_f,
+ handle_f,
+ server_soft_limit,
+ anticipation_timeout);
};
void test::client_data(std::ostream& out,
- test::MySim* sim,
- test::MySim::ClientFilter client_disp_filter,
- int head_w, int data_w, int data_prec) {
+ test::MySim* sim,
+ test::MySim::ClientFilter client_disp_filter,
+ int head_w, int data_w, int data_prec) {
// report how many ops were done by reservation and proportion for
// each client
void test::server_data(std::ostream& out,
- test::MySim* sim,
- test::MySim::ServerFilter server_disp_filter,
- int head_w, int data_w, int data_prec) {
+ test::MySim* sim,
+ test::MySim::ServerFilter server_disp_filter,
+ int head_w, int data_w, int data_prec) {
out << std::setw(head_w) << "res_ops:";
int total_r = 0;
for (uint i = 0; i < sim->get_server_count(); ++i) {
namespace crimson {
namespace dmclock {
- struct ServerInfo {
+
+ // OrigTracker is a best-effort implementation of the the original
+ // dmClock calculations of delta and rho. It adheres to an
+ // interface, implemented via a template type, that allows it to
+ // be replaced with an alternative. The interface consists of the
+ // static create, prepare_req, resp_update, and get_last_delta
+ // functions.
+ class OrigTracker {
Counter delta_prev_req;
Counter rho_prev_req;
uint32_t my_delta;
uint32_t my_rho;
- ServerInfo(Counter _delta_prev_req,
- Counter _rho_prev_req) :
- delta_prev_req(_delta_prev_req),
- rho_prev_req(_rho_prev_req),
+ public:
+
+ OrigTracker(Counter global_delta,
+ Counter global_rho) :
+ delta_prev_req(global_delta),
+ rho_prev_req(global_rho),
my_delta(0),
my_rho(0)
- {
- // empty
+ { /* empty */ }
+
+ static inline OrigTracker create(Counter the_delta, Counter the_rho) {
+ return OrigTracker(the_delta, the_rho);
}
- inline void req_update(Counter delta, Counter rho) {
- delta_prev_req = delta;
- rho_prev_req = rho;
+ inline ReqParams prepare_req(Counter& the_delta, Counter& the_rho) {
+ Counter delta_out = 1 + the_delta - delta_prev_req - my_delta;
+ Counter rho_out = 1 + the_rho - rho_prev_req - my_rho;
+ delta_prev_req = the_delta;
+ rho_prev_req = the_rho;
my_delta = 0;
my_rho = 0;
+ return ReqParams(uint32_t(delta_out), uint32_t(rho_out));
}
- inline void resp_update(PhaseType phase) {
+ inline void resp_update(PhaseType phase,
+ Counter& the_delta,
+ Counter& the_rho) {
+ ++the_delta;
++my_delta;
- if (phase == PhaseType::reservation) ++my_rho;
+ if (phase == PhaseType::reservation) {
+ ++the_rho;
+ ++my_rho;
+ }
+ }
+
+ inline Counter get_last_delta() const {
+ return delta_prev_req;
}
- };
+ }; // struct OrigTracker
+
+
+ // BorrowingTracker always returns a positive delta and rho. If
+ // not enough responses have come in to allow that, we will borrow
+ // a future response and repay it later.
+ class BorrowingTracker {
+ Counter delta_prev_req;
+ Counter rho_prev_req;
+ Counter delta_borrow;
+ Counter rho_borrow;
+
+ public:
+
+ BorrowingTracker(Counter global_delta, Counter global_rho) :
+ delta_prev_req(global_delta),
+ rho_prev_req(global_rho),
+ delta_borrow(0),
+ rho_borrow(0)
+ { /* empty */ }
+
+ static inline BorrowingTracker create(Counter the_delta,
+ Counter the_rho) {
+ return BorrowingTracker(the_delta, the_rho);
+ }
+
+ inline Counter calc_with_borrow(const Counter& global,
+ const Counter& previous,
+ Counter& borrow) {
+ Counter result = global - previous;
+ if (0 == result) {
+ // if no replies have come in, borrow one from the future
+ ++borrow;
+ return 1;
+ } else if (result > borrow) {
+ // if we can give back all of what we borrowed, do so
+ result -= borrow;
+ borrow = 0;
+ return result;
+ } else {
+ // can only return part of what was borrowed in order to
+ // return positive
+ borrow = borrow - result + 1;
+ return 1;
+ }
+ }
+
+ inline ReqParams prepare_req(Counter& the_delta, Counter& the_rho) {
+ Counter delta_out =
+ calc_with_borrow(the_delta, delta_prev_req, delta_borrow);
+ Counter rho_out =
+ calc_with_borrow(the_rho, rho_prev_req, rho_borrow);
+ delta_prev_req = the_delta;
+ rho_prev_req = the_rho;
+ return ReqParams(uint32_t(delta_out), uint32_t(rho_out));
+ }
+
+ inline void resp_update(PhaseType phase,
+ Counter& the_delta,
+ Counter& the_rho) {
+ ++the_delta;
+ if (phase == PhaseType::reservation) {
+ ++the_rho;
+ }
+ }
+
+ inline Counter get_last_delta() const {
+ return delta_prev_req;
+ }
+ }; // struct BorrowingTracker
// S is server identifier type
- template<typename S>
+ // T is the server info class that adheres to ServerTrackerIfc interface
+ template<typename S, typename T = BorrowingTracker>
class ServiceTracker {
// we don't want to include gtest.h just for FRIEND_TEST
friend class dmclock_client_server_erase_Test;
Counter delta_counter; // # reqs completed
Counter rho_counter; // # reqs completed via reservation
- std::map<S,ServerInfo> server_map;
+ std::map<S,T> server_map;
mutable std::mutex data_mtx; // protects Counters and map
using DataGuard = std::lock_guard<decltype(data_mtx)>;
// clean config
std::deque<MarkPoint> clean_mark_points;
- Duration clean_age; // age at which ServerInfo cleaned
+ Duration clean_age; // age at which server tracker cleaned
// NB: All threads declared at end, so they're destructed firs!
// this code can only run if a request did not precede the
// response or if the record was cleaned up b/w when
// the request was made and now
- ServerInfo si(delta_counter, rho_counter);
- si.resp_update(phase);
- server_map.emplace(server_id, si);
- } else {
- it->second.resp_update(phase);
- }
-
- ++delta_counter;
- if (PhaseType::reservation == phase) {
- ++rho_counter;
+ auto i = server_map.emplace(server_id,
+ T::create(delta_counter, rho_counter));
+ it = i.first;
}
+ it->second.resp_update(phase, delta_counter, rho_counter);
}
-
/*
* Returns the ReqParams for the given server.
*/
DataGuard g(data_mtx);
auto it = server_map.find(server);
if (server_map.end() == it) {
- server_map.emplace(server, ServerInfo(delta_counter, rho_counter));
+ server_map.emplace(server,
+ T::create(delta_counter, rho_counter));
return ReqParams(1, 1);
} else {
- Counter delta =
- 1 + delta_counter - it->second.delta_prev_req - it->second.my_delta;
- Counter rho =
- 1 + rho_counter - it->second.rho_prev_req - it->second.my_rho;
-
- it->second.req_update(delta_counter, rho_counter);
-
- return ReqParams(uint32_t(delta), uint32_t(rho));
+ return it->second.prepare_req(delta_counter, rho_counter);
}
}
i != server_map.end();
/* empty */) {
auto i2 = i++;
- if (i2->second.delta_prev_req <= earliest) {
+ if (i2->second.get_last_delta() <= earliest) {
server_map.erase(i2);
}
}
double proportion;
double limit;
bool ready; // true when within limit
-#ifndef DO_NOT_DELAY_TAG_CALC
Time arrival;
-#endif
RequestTag(const RequestTag& prev_tag,
const ClientInfo& client,
const uint32_t delta,
const uint32_t rho,
const Time time,
- const double cost = 0.0) :
- reservation(cost + tag_calc(time,
- prev_tag.reservation,
- client.reservation_inv,
- rho,
- true)),
- proportion(tag_calc(time,
- prev_tag.proportion,
- client.weight_inv,
- delta,
- true)),
- limit(tag_calc(time,
- prev_tag.limit,
- client.limit_inv,
- delta,
- false)),
- ready(false)
-#ifndef DO_NOT_DELAY_TAG_CALC
- , arrival(time)
-#endif
+ const double cost = 0.0,
+ const double anticipation_timeout = 0.0) :
+ ready(false),
+ arrival(time)
{
+ Time max_time = time;
+ if (time - anticipation_timeout < prev_tag.arrival)
+ max_time -= anticipation_timeout;
+
+ reservation = cost + tag_calc(max_time,
+ prev_tag.reservation,
+ client.reservation_inv,
+ rho,
+ true);
+ proportion = tag_calc(max_time,
+ prev_tag.proportion,
+ client.weight_inv,
+ delta,
+ true);
+ limit = tag_calc(max_time,
+ prev_tag.limit,
+ client.limit_inv,
+ delta,
+ false);
+
assert(reservation < max_tag || proportion < max_tag);
}
const ClientInfo& client,
const ReqParams req_params,
const Time time,
- const double cost = 0.0) :
- RequestTag(prev_tag, client, req_params.delta, req_params.rho, time, cost)
+ const double cost = 0.0,
+ const double anticipation_timeout = 0.0) :
+ RequestTag(prev_tag, client, req_params.delta, req_params.rho, time,
+ cost, anticipation_timeout)
{ /* empty */ }
RequestTag(double _res, double _prop, double _lim, const Time _arrival) :
reservation(_res),
proportion(_prop),
limit(_lim),
- ready(false)
-#ifndef DO_NOT_DELAY_TAG_CALC
- , arrival(_arrival)
-#endif
+ ready(false),
+ arrival(_arrival)
{
assert(reservation < max_tag || proportion < max_tag);
}
reservation(other.reservation),
proportion(other.proportion),
limit(other.limit),
- ready(other.ready)
-#ifndef DO_NOT_DELAY_TAG_CALC
- , arrival(other.arrival)
-#endif
+ ready(other.ready),
+ arrival(other.arrival)
{
// empty
}
// an idle client becoming unidle
double prop_delta = 0.0;
- c::IndIntruHeapData reserv_heap_data;
- c::IndIntruHeapData lim_heap_data;
- c::IndIntruHeapData ready_heap_data;
+ c::IndIntruHeapData reserv_heap_data {};
+ c::IndIntruHeapData lim_heap_data {};
+ c::IndIntruHeapData ready_heap_data {};
#if USE_PROP_HEAP
- c::IndIntruHeapData prop_heap_data;
+ c::IndIntruHeapData prop_heap_data {};
#endif
public:
assign_unpinned_tag(prev_tag.reservation, _prev.reservation);
assign_unpinned_tag(prev_tag.limit, _prev.limit);
assign_unpinned_tag(prev_tag.proportion, _prev.proportion);
+ prev_tag.arrival = _prev.arrival;
last_tick = _tick;
}
HeapId heap_id;
Time when_ready;
};
+
+ inline explicit NextReq() :
+ type(NextReqType::none)
+ { }
+
+ inline NextReq(HeapId _heap_id) :
+ type(NextReqType::returning),
+ heap_id(_heap_id)
+ { }
+
+ inline NextReq(Time _when_ready) :
+ type(NextReqType::future),
+ when_ready(_when_ready)
+ { }
+
+ // calls to this are clearer than calls to the default
+ // constructor
+ static inline NextReq none() {
+ return NextReq();
+ }
};
// limit, this will allow the request next in terms of
// proportion to still get issued
bool allow_limit_break;
+ double anticipation_timeout;
std::atomic_bool finishing;
std::chrono::duration<Rep,Per> _idle_age,
std::chrono::duration<Rep,Per> _erase_age,
std::chrono::duration<Rep,Per> _check_time,
- bool _allow_limit_break) :
+ bool _allow_limit_break,
+ double _anticipation_timeout) :
client_info_f(_client_info_f),
allow_limit_break(_allow_limit_break),
+ anticipation_timeout(_anticipation_timeout),
finishing(false),
idle_age(std::chrono::duration_cast<Duration>(_idle_age)),
erase_age(std::chrono::duration_cast<Duration>(_erase_age)),
get_cli_info(client),
req_params,
time,
- cost);
+ cost,
+ anticipation_timeout);
// copy tag to previous tag for client
client.update_req_tag(tag, tick);
}
#else
- RequestTag tag(client.get_req_tag(), get_cli_info(client), req_params, time, cost);
+ RequestTag tag(client.get_req_tag(),
+ get_cli_info(client),
+ req_params,
+ time,
+ cost,
+ anticipation_timeout);
+
// copy tag to previous tag for client
client.update_req_tag(tag, tick);
#endif
ClientReq& next_first = top.next_request();
next_first.tag = RequestTag(tag, get_cli_info(top),
top.cur_delta, top.cur_rho,
- next_first.tag.arrival);
+ next_first.tag.arrival,
+ 0.0, anticipation_timeout);
// copy tag to previous tag for client
top.update_req_tag(next_first.tag, tick);
// data_mtx should be held when called
NextReq do_next_request(Time now) {
- NextReq result{};
-
- // if reservation queue is empty, all are empty (i.e., no active clients)
+ // if reservation queue is empty, all are empty (i.e., no
+ // active clients)
if(resv_heap.empty()) {
- result.type = NextReqType::none;
- return result;
+ return NextReq::none();
}
// try constraint (reservation) based scheduling
auto& reserv = resv_heap.top();
if (reserv.has_request() &&
reserv.next_request().tag.reservation <= now) {
- result.type = NextReqType::returning;
- result.heap_id = HeapId::reservation;
- return result;
+ return NextReq(HeapId::reservation);
}
// no existing reservations before now, so try weight-based
if (readys.has_request() &&
readys.next_request().tag.ready &&
readys.next_request().tag.proportion < max_tag) {
- result.type = NextReqType::returning;
- result.heap_id = HeapId::ready;
- return result;
+ return NextReq(HeapId::ready);
}
// if nothing is schedulable by reservation or
if (allow_limit_break) {
if (readys.has_request() &&
readys.next_request().tag.proportion < max_tag) {
- result.type = NextReqType::returning;
- result.heap_id = HeapId::ready;
- return result;
+ return NextReq(HeapId::ready);
} else if (reserv.has_request() &&
reserv.next_request().tag.reservation < max_tag) {
- result.type = NextReqType::returning;
- result.heap_id = HeapId::reservation;
- return result;
+ return NextReq(HeapId::reservation);
}
}
next_call = min_not_0_time(next_call, next.tag.limit);
}
if (next_call < TimeMax) {
- result.type = NextReqType::future;
- result.when_ready = next_call;
- return result;
+ return NextReq(next_call);
} else {
- result.type = NextReqType::none;
- return result;
+ return NextReq::none();
}
} // do_next_request
std::chrono::duration<Rep,Per> _idle_age,
std::chrono::duration<Rep,Per> _erase_age,
std::chrono::duration<Rep,Per> _check_time,
- bool _allow_limit_break = false) :
+ bool _allow_limit_break = false,
+ double _anticipation_timeout = 0.0) :
super(_client_info_f,
_idle_age, _erase_age, _check_time,
- _allow_limit_break)
+ _allow_limit_break, _anticipation_timeout)
{
// empty
}
std::chrono::duration<Rep,Per> _idle_age,
std::chrono::duration<Rep,Per> _erase_age,
std::chrono::duration<Rep,Per> _check_time,
- bool _allow_limit_break = false) :
+ bool _allow_limit_break = false,
+ double anticipation_timeout = 0.0) :
super(_client_info_f,
_idle_age, _erase_age, _check_time,
- _allow_limit_break)
+ _allow_limit_break, anticipation_timeout)
{
can_handle_f = _can_handle_f;
handle_f = _handle_f;
PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
CanHandleRequestFunc _can_handle_f,
HandleRequestFunc _handle_f,
- bool _allow_limit_break = false) :
+ bool _allow_limit_break = false,
+ double _anticipation_timeout = 0.0) :
PushPriorityQueue(_client_info_f,
_can_handle_f,
_handle_f,
std::chrono::minutes(10),
std::chrono::minutes(15),
std::chrono::minutes(6),
- _allow_limit_break)
+ _allow_limit_break,
+ _anticipation_timeout)
{
// empty
}
dmc::ServiceTracker<ServerId> st(std::chrono::seconds(2),
std::chrono::seconds(3));
-
auto rp1 = st.get_req_params(server1);
EXPECT_EQ(1u, rp1.delta) <<
"rho should be 1 with no intervening reservation responses by" <<
"other servers";
+ // RESPONSE
st.track_resp(server1, dmc::PhaseType::priority);
auto rp3 = st.get_req_params(server1);
"rho should be 1 with no intervening reservation responses by" <<
"other servers";
+ // RESPONSE
st.track_resp(server2, dmc::PhaseType::priority);
auto rp4 = st.get_req_params(server1);
- EXPECT_EQ(2u, rp4.delta) <<
+ EXPECT_EQ(1u, rp4.delta) <<
"delta should be 2 with one intervening priority response by " <<
"another server";
EXPECT_EQ(1u, rp4.rho) <<
"rho should be 1 with no intervening reservation responses by" <<
"other servers";
+ // RESPONSE
st.track_resp(server2, dmc::PhaseType::reservation);
auto rp6 = st.get_req_params(server1);
- EXPECT_EQ(2u, rp6.delta) <<
+ EXPECT_EQ(1u, rp6.delta) <<
"delta should be 2 with one intervening reservation response by " <<
"another server";
- EXPECT_EQ(2u, rp6.rho) <<
+ EXPECT_EQ(1u, rp6.rho) <<
"rho should be 2 with one intervening reservation responses by " <<
"another server";
- // auto rp6_b = st.get_req_params(server2);
-
st.track_resp(server2, dmc::PhaseType::reservation);
st.track_resp(server1, dmc::PhaseType::priority);
st.track_resp(server2, dmc::PhaseType::priority);
auto rp7 = st.get_req_params(server1);
EXPECT_EQ(5u, rp7.delta) <<
- "delta should be 5 with fourintervening responses by " <<
+ "delta should be 5 with four intervening responses by " <<
"another server";
- EXPECT_EQ(3u, rp7.rho) <<
- "rho should be 3 with two intervening reservation responses by " <<
+ EXPECT_EQ(1u, rp7.rho) <<
+ "rho should be 1 with two intervening reservation responses by " <<
"another server";
auto rp7b = st.get_req_params(server2);
- EXPECT_EQ(4u, rp7b.delta) <<
- "delta should be 4 with three intervening responses by " <<
+ EXPECT_EQ(9u, rp7b.delta) <<
+ "delta should be 9 with three intervening responses by " <<
"another server";
- EXPECT_EQ(2u, rp7b.rho) <<
- "rho should be 2 with one intervening reservation responses by " <<
+ EXPECT_EQ(4u, rp7b.rho) <<
+ "rho should be 4 with one intervening reservation responses by " <<
"another server";
auto rp8 = st.get_req_params(server1);
"rho should be 1 with no intervening reservation responses by " <<
"another server";
} // TEST
+
+
+ // NB: the BorrowingTracker has not been fully tested and the
+ // expected values below have not yet been compared with the
+ // theoretically correct values.
+ TEST(dmclock_client, orig_tracker_delta_rho_values) {
+ using ServerId = int;
+
+ ServerId server1 = 101;
+ ServerId server2 = 7;
+
+ dmc::ServiceTracker<ServerId,OrigTracker>
+ st(std::chrono::seconds(2), std::chrono::seconds(3));
+
+ auto rp1 = st.get_req_params(server1);
+
+ EXPECT_EQ(1u, rp1.delta);
+ EXPECT_EQ(1u, rp1.rho);
+
+ auto rp2 = st.get_req_params(server1);
+
+ EXPECT_EQ(1u, rp2.delta);
+ EXPECT_EQ(1u, rp2.rho);
+
+ st.track_resp(server1, dmc::PhaseType::priority);
+
+ auto rp3 = st.get_req_params(server1);
+
+ EXPECT_EQ(1u, rp3.delta);
+ EXPECT_EQ(1u, rp3.rho);
+
+ st.track_resp(server2, dmc::PhaseType::priority);
+
+ auto rp4 = st.get_req_params(server1);
+
+ EXPECT_EQ(2u, rp4.delta);
+ EXPECT_EQ(1u, rp4.rho);
+
+ auto rp5 = st.get_req_params(server1);
+
+ EXPECT_EQ(1u, rp5.delta);
+ EXPECT_EQ(1u, rp5.rho);
+
+ st.track_resp(server2, dmc::PhaseType::reservation);
+
+ auto rp6 = st.get_req_params(server1);
+
+ EXPECT_EQ(2u, rp6.delta);
+ EXPECT_EQ(2u, rp6.rho);
+
+ // auto rp6_b = st.get_req_params(server2);
+
+ st.track_resp(server2, dmc::PhaseType::reservation);
+ st.track_resp(server1, dmc::PhaseType::priority);
+ st.track_resp(server2, dmc::PhaseType::priority);
+ st.track_resp(server2, dmc::PhaseType::reservation);
+ st.track_resp(server1, dmc::PhaseType::reservation);
+ st.track_resp(server1, dmc::PhaseType::priority);
+ st.track_resp(server2, dmc::PhaseType::priority);
+
+ auto rp7 = st.get_req_params(server1);
+
+ EXPECT_EQ(5u, rp7.delta);
+ EXPECT_EQ(3u, rp7.rho);
+
+ auto rp7b = st.get_req_params(server2);
+
+ EXPECT_EQ(4u, rp7b.delta);
+ EXPECT_EQ(2u, rp7b.rho);
+
+ auto rp8 = st.get_req_params(server1);
+
+ EXPECT_EQ(1u, rp8.delta);
+ EXPECT_EQ(1u, rp8.rho);
+
+ auto rp8b = st.get_req_params(server2);
+ EXPECT_EQ(1u, rp8b.delta);
+ EXPECT_EQ(1u, rp8b.rho);
+ } // TEST
+
} // namespace dmclock
} // namespace crimson