]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
dmclock: pickup latest dmclock subtree
authorKefu Chai <kchai@redhat.com>
Thu, 19 Oct 2017 05:19:17 +0000 (13:19 +0800)
committerKefu Chai <kchai@redhat.com>
Thu, 19 Oct 2017 05:20:29 +0000 (13:20 +0800)
to include the change of https://github.com/ceph/dmclock/pull/41
Merge commit '060a7777cb6fe3f052259e1324490cdbf7b49980'

Signed-off-by: Kefu Chai <kchai@redhat.com>
1  2 
src/dmclock/sim/src/test_dmclock_main.cc
src/dmclock/src/dmclock_server.h
src/dmclock/test/test_dmclock_server.cc

index f59b735465ba20f13f2ecb82a110e7e74fe1d87e,0000000000000000000000000000000000000000..ce9a31e404e84398293526cdd0d69d4e0a3473af
mode 100644,000000..100644
--- /dev/null
@@@ -1,329 -1,0 +1,329 @@@
-     auto client_info_f = [=](const ClientId& c) -> test::dmc::ClientInfo {
-       return client_info[ret_client_group_f(c)];
 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 +// vim: ts=8 sw=2 smarttab
 +
 +/*
 + * Copyright (C) 2016 Red Hat Inc.
 + */
 +
 +
 +#include "test_dmclock.h"
 +#include "config.h"
 +
 +#ifdef PROFILE
 +#include "profile.h"
 +#endif
 +
 +
 +namespace dmc = crimson::dmclock;
 +namespace test = crimson::test_dmc;
 +namespace sim = crimson::qos_simulation;
 +
 +using namespace std::placeholders;
 +
 +
 +namespace crimson {
 +    namespace test_dmc {
 +        void server_data(std::ostream& out,
 +                         test::MySim* sim,
 +                         test::MySim::ServerFilter server_disp_filter,
 +                         int head_w, int data_w, int data_prec);
 +
 +        void client_data(std::ostream& out,
 +                         test::MySim* sim,
 +                         test::MySim::ClientFilter client_disp_filter,
 +                         int head_w, int data_w, int data_prec);
 +    }
 +}
 +
 +
 +int main(int argc, char* argv[]) {
 +    std::vector<const char*> args;
 +    for (int i = 1; i < argc; ++i) {
 +      args.push_back(argv[i]);
 +    }
 +
 +    std::string conf_file_list;
 +    sim::ceph_argparse_early_args(args, &conf_file_list);
 +
 +    sim::sim_config_t g_conf;
 +    std::vector<sim::cli_group_t> &cli_group = g_conf.cli_group;
 +    std::vector<sim::srv_group_t> &srv_group = g_conf.srv_group;
 +
 +    if (!conf_file_list.empty()) {
 +      int ret;
 +      ret = sim::parse_config_file(conf_file_list, g_conf);
 +      if (ret) {
 +      // error
 +      _exit(1);
 +      }
 +    } else {
 +      // default simulation parameter
 +      g_conf.client_groups = 2;
 +
 +      sim::srv_group_t st;
 +      srv_group.push_back(st);
 +
 +      sim::cli_group_t ct1(99, 0);
 +      cli_group.push_back(ct1);
 +
 +      sim::cli_group_t ct2(1, 10);
 +      cli_group.push_back(ct2);
 +    }
 +
 +    const uint server_groups = g_conf.server_groups;
 +    const uint client_groups = g_conf.client_groups;
 +    const bool server_random_selection = g_conf.server_random_selection;
 +    const bool server_soft_limit = g_conf.server_soft_limit;
 +    const double anticipation_timeout = g_conf.anticipation_timeout;
 +    uint server_total_count = 0;
 +    uint client_total_count = 0;
 +
 +    for (uint i = 0; i < client_groups; ++i) {
 +      client_total_count += cli_group[i].client_count;
 +    }
 +
 +    for (uint i = 0; i < server_groups; ++i) {
 +      server_total_count += srv_group[i].server_count;
 +    }
 +
 +    std::vector<test::dmc::ClientInfo> client_info;
 +    for (uint i = 0; i < client_groups; ++i) {
 +      client_info.push_back(test::dmc::ClientInfo 
 +                        { cli_group[i].client_reservation,
 +                          cli_group[i].client_weight,
 +                          cli_group[i].client_limit } );
 +    }
 +
 +    auto ret_client_group_f = [&](const ClientId& c) -> uint {
 +      uint group_max = 0;
 +      uint i = 0;
 +      for (; i < client_groups; ++i) {
 +      group_max += cli_group[i].client_count;
 +      if (c < group_max) {
 +        break;
 +      }
 +      }
 +      return i;
 +    };
 +
 +    auto ret_server_group_f = [&](const ServerId& s) -> uint {
 +      uint group_max = 0;
 +      uint i = 0;
 +      for (; i < server_groups; ++i) {
 +      group_max += srv_group[i].server_count;
 +      if (s < group_max) {
 +        break;
 +      }
 +      }
 +      return i;
 +    };
 +
++    auto client_info_f = [=](const ClientId& c) -> const test::dmc::ClientInfo* {
++      return &client_info[ret_client_group_f(c)];
 +    };
 +
 +    auto client_disp_filter = [=] (const ClientId& i) -> bool {
 +        return i < 3 || i >= (client_total_count - 3);
 +    };
 +
 +    auto server_disp_filter = [=] (const ServerId& i) -> bool {
 +        return i < 3 || i >= (server_total_count - 3);
 +    };
 +
 +
 +    test::MySim *simulation;
 +  
 +
 +    // lambda to post a request to the identified server; called by client
 +    test::SubmitFunc server_post_f =
 +        [&simulation](const ServerId& server,
 +                      sim::TestRequest&& request,
 +                      const ClientId& client_id,
 +                      const test::dmc::ReqParams& req_params) {
 +        test::DmcServer& s = simulation->get_server(server);
 +        s.post(std::move(request), client_id, req_params);
 +    };
 +
 +    std::vector<std::vector<sim::CliInst>> cli_inst;
 +    for (uint i = 0; i < client_groups; ++i) {
 +      if (cli_group[i].client_wait == std::chrono::seconds(0)) {
 +      cli_inst.push_back(
 +          { { sim::req_op, 
 +              (uint32_t)cli_group[i].client_total_ops,
 +              (double)cli_group[i].client_iops_goal, 
 +              (uint16_t)cli_group[i].client_outstanding_ops } } );
 +      } else {
 +      cli_inst.push_back(
 +          { { sim::wait_op, cli_group[i].client_wait },
 +            { sim::req_op, 
 +              (uint32_t)cli_group[i].client_total_ops,
 +              (double)cli_group[i].client_iops_goal, 
 +              (uint16_t)cli_group[i].client_outstanding_ops } } );
 +      }
 +    }
 +
 +    simulation = new test::MySim();
 +
 +    test::DmcServer::ClientRespFunc client_response_f =
 +        [&simulation](ClientId client_id,
 +                      const sim::TestResponse& resp,
 +                      const ServerId& server_id,
 +                      const dmc::PhaseType& phase) {
 +        simulation->get_client(client_id).receive_response(resp,
 +                                                           server_id,
 +                                                           phase);
 +    };
 +
 +    test::CreateQueueF create_queue_f =
 +        [&](test::DmcQueue::CanHandleRequestFunc can_f,
 +            test::DmcQueue::HandleRequestFunc handle_f) -> test::DmcQueue* {
 +        return new test::DmcQueue(client_info_f,
 +                                  can_f,
 +                                  handle_f,
 +                                  server_soft_limit,
 +                                  anticipation_timeout);
 +    };
 +
 + 
 +    auto create_server_f = [&](ServerId id) -> test::DmcServer* {
 +      uint i = ret_server_group_f(id);
 +      return new test::DmcServer(id,
 +                                 srv_group[i].server_iops,
 +                               srv_group[i].server_threads,
 +                               client_response_f,
 +                               test::dmc_server_accumulate_f,
 +                               create_queue_f);
 +    };
 +
 +    auto create_client_f = [&](ClientId id) -> test::DmcClient* {
 +      uint i = ret_client_group_f(id);
 +      test::MySim::ClientBasedServerSelectFunc server_select_f;
 +      uint client_server_select_range = cli_group[i].client_server_select_range;
 +      if (!server_random_selection) {
 +      server_select_f = simulation->make_server_select_alt_range(client_server_select_range);
 +      } else {
 +      server_select_f = simulation->make_server_select_ran_range(client_server_select_range);
 +      }
 +      return new test::DmcClient(id,
 +                               server_post_f,
 +                               std::bind(server_select_f, _1, id),
 +                               test::dmc_client_accumulate_f,
 +                               cli_inst[i]);
 +    };
 +
 +#if 1
 +    std::cout << "[global]" << std::endl << g_conf << std::endl;
 +    for (uint i = 0; i < client_groups; ++i) {
 +      std::cout << std::endl << "[client." << i << "]" << std::endl;
 +      std::cout << cli_group[i] << std::endl;
 +    }
 +    for (uint i = 0; i < server_groups; ++i) {
 +      std::cout << std::endl << "[server." << i << "]" << std::endl;
 +      std::cout << srv_group[i] << std::endl;
 +    }
 +    std::cout << std::endl;
 +#endif
 +
 +    simulation->add_servers(server_total_count, create_server_f);
 +    simulation->add_clients(client_total_count, create_client_f);
 +
 +    simulation->run();
 +    simulation->display_stats(std::cout,
 +                              &test::server_data, &test::client_data,
 +                              server_disp_filter, client_disp_filter);
 +
 +    delete simulation;
 +} // main
 +
 +
 +void test::client_data(std::ostream& out,
 +                     test::MySim* sim,
 +                     test::MySim::ClientFilter client_disp_filter,
 +                     int head_w, int data_w, int data_prec) {
 +    // report how many ops were done by reservation and proportion for
 +    // each client
 +
 +    int total_r = 0;
 +    out << std::setw(head_w) << "res_ops:";
 +    for (uint i = 0; i < sim->get_client_count(); ++i) {
 +        const auto& client = sim->get_client(i);
 +        auto r = client.get_accumulator().reservation_count;
 +        total_r += r;
 +        if (!client_disp_filter(i)) continue;
 +        out << " " << std::setw(data_w) << r;
 +    }
 +    out << " " << std::setw(data_w) << std::setprecision(data_prec) <<
 +        std::fixed << total_r << std::endl;
 +
 +    int total_p = 0;
 +    out << std::setw(head_w) << "prop_ops:";
 +    for (uint i = 0; i < sim->get_client_count(); ++i) {
 +        const auto& client = sim->get_client(i);
 +        auto p = client.get_accumulator().proportion_count;
 +        total_p += p;
 +        if (!client_disp_filter(i)) continue;
 +        out << " " << std::setw(data_w) << p;
 +    }
 +    out << " " << std::setw(data_w) << std::setprecision(data_prec) <<
 +        std::fixed << total_p << std::endl;
 +}
 +
 +
 +void test::server_data(std::ostream& out,
 +                     test::MySim* sim,
 +                     test::MySim::ServerFilter server_disp_filter,
 +                     int head_w, int data_w, int data_prec) {
 +    out << std::setw(head_w) << "res_ops:";
 +    int total_r = 0;
 +    for (uint i = 0; i < sim->get_server_count(); ++i) {
 +        const auto& server = sim->get_server(i);
 +        auto rc = server.get_accumulator().reservation_count;
 +        total_r += rc;
 +        if (!server_disp_filter(i)) continue;
 +        out << " " << std::setw(data_w) << rc;
 +    }
 +    out << " " << std::setw(data_w) << std::setprecision(data_prec) <<
 +        std::fixed << total_r << std::endl;
 +
 +    out << std::setw(head_w) << "prop_ops:";
 +    int total_p = 0;
 +    for (uint i = 0; i < sim->get_server_count(); ++i) {
 +        const auto& server = sim->get_server(i);
 +        auto pc = server.get_accumulator().proportion_count;
 +        total_p += pc;
 +        if (!server_disp_filter(i)) continue;
 +        out << " " << std::setw(data_w) << pc;
 +    }
 +    out << " " << std::setw(data_w) << std::setprecision(data_prec) <<
 +        std::fixed << total_p << std::endl;
 +
 +    const auto& q = sim->get_server(0).get_priority_queue();
 +    out << std::endl <<
 +      " k-way heap: " << q.get_heap_branching_factor() << std::endl
 +      << std::endl;
 +
 +#ifdef PROFILE
 +    crimson::ProfileCombiner<std::chrono::nanoseconds> art_combiner;
 +    crimson::ProfileCombiner<std::chrono::nanoseconds> rct_combiner;
 +    for (uint i = 0; i < sim->get_server_count(); ++i) {
 +      const auto& q = sim->get_server(i).get_priority_queue();
 +      const auto& art = q.add_request_timer;
 +      art_combiner.combine(art);
 +      const auto& rct = q.request_complete_timer;
 +      rct_combiner.combine(rct);
 +    }
 +    out << "Server add_request_timer: count:" << art_combiner.get_count() <<
 +      ", mean:" << art_combiner.get_mean() <<
 +      ", std_dev:" << art_combiner.get_std_dev() <<
 +      ", low:" << art_combiner.get_low() <<
 +      ", high:" << art_combiner.get_high() << std::endl;
 +    out << "Server request_complete_timer: count:" << rct_combiner.get_count() <<
 +      ", mean:" << rct_combiner.get_mean() <<
 +      ", std_dev:" << rct_combiner.get_std_dev() <<
 +      ", low:" << rct_combiner.get_low() <<
 +      ", high:" << rct_combiner.get_high() << std::endl;
 +    out << "Server combined mean: " <<
 +      (art_combiner.get_mean() + rct_combiner.get_mean()) <<
 +      std::endl;
 +#endif
 +}
index 8f0e1925e05c3ae084860ea41d60d3143441299b,0000000000000000000000000000000000000000..b3178c52a0909101022545e66f16c617bf6571e2
mode 100644,000000..100644
--- /dev/null
@@@ -1,1652 -1,0 +1,1658 @@@
-       ClientInfo            info;
 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 +// vim: ts=8 sw=2 smarttab
 +
 +/*
 + * Copyright (C) 2017 Red Hat Inc.
 + */
 +
 +
 +#pragma once
 +
 +/* COMPILATION OPTIONS
 + *
 + * By default we include an optimization over the originally published
 + * dmclock algorithm using not the values of rho and delta that were
 + * sent in with a request but instead the most recent rho and delta
 + * values from the requests's client. To restore the algorithm's
 + * original behavior, define DO_NOT_DELAY_TAG_CALC (i.e., compiler
 + * argument -DDO_NOT_DELAY_TAG_CALC).
 + *
 + * The prop_heap does not seem to be necessary. The only thing it
 + * would help with is quickly finding the mininum proportion/prioity
 + * when an idle client became active. To have the code maintain the
 + * proportional heap, define USE_PROP_HEAP (i.e., compiler argument
 + * -DUSE_PROP_HEAP).
 + */
 +
 +#include <assert.h>
 +
 +#include <cmath>
 +#include <memory>
 +#include <map>
 +#include <deque>
 +#include <queue>
 +#include <atomic>
 +#include <mutex>
 +#include <condition_variable>
 +#include <thread>
 +#include <iostream>
 +#include <sstream>
 +#include <limits>
 +
 +#include <boost/variant.hpp>
 +
 +#include "indirect_intrusive_heap.h"
 +#include "run_every.h"
 +#include "dmclock_util.h"
 +#include "dmclock_recs.h"
 +
 +#ifdef PROFILE
 +#include "profile.h"
 +#endif
 +
 +
 +namespace crimson {
 +
 +  namespace dmclock {
 +
 +    namespace c = crimson;
 +
 +    constexpr double max_tag = std::numeric_limits<double>::is_iec559 ?
 +      std::numeric_limits<double>::infinity() :
 +      std::numeric_limits<double>::max();
 +    constexpr double min_tag = std::numeric_limits<double>::is_iec559 ?
 +      -std::numeric_limits<double>::infinity() :
 +      std::numeric_limits<double>::lowest();
 +    constexpr uint tag_modulo = 1000000;
 +
 +    struct ClientInfo {
 +      double reservation;  // minimum
 +      double weight;       // proportional
 +      double limit;        // maximum
 +
 +      // multiplicative inverses of above, which we use in calculations
 +      // and don't want to recalculate repeatedly
 +      double reservation_inv;
 +      double weight_inv;
 +      double limit_inv;
 +
 +      // order parameters -- min, "normal", max
 +      ClientInfo(double _reservation, double _weight, double _limit) :
 +      reservation(_reservation),
 +      weight(_weight),
 +      limit(_limit),
 +      reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation),
 +      weight_inv(     0.0 == weight      ? 0.0 : 1.0 / weight),
 +      limit_inv(      0.0 == limit       ? 0.0 : 1.0 / limit)
 +      {
 +      // empty
 +      }
 +
 +
 +      friend std::ostream& operator<<(std::ostream& out,
 +                                    const ClientInfo& client) {
 +      out <<
 +        "{ ClientInfo:: r:" << client.reservation <<
 +        " w:" << std::fixed << client.weight <<
 +        " l:" << std::fixed << client.limit <<
 +        " 1/r:" << std::fixed << client.reservation_inv <<
 +        " 1/w:" << std::fixed << client.weight_inv <<
 +        " 1/l:" << std::fixed << client.limit_inv <<
 +        " }";
 +      return out;
 +      }
 +    }; // class ClientInfo
 +
 +
 +    struct RequestTag {
 +      double reservation;
 +      double proportion;
 +      double limit;
 +      bool   ready; // true when within limit
 +      Time   arrival;
 +
 +      RequestTag(const RequestTag& prev_tag,
 +               const ClientInfo& client,
 +               const uint32_t delta,
 +               const uint32_t rho,
 +               const Time time,
 +               const double cost = 0.0,
 +               const double anticipation_timeout = 0.0) :
 +      ready(false),
 +      arrival(time)
 +      {
 +      Time max_time = time;
 +      if (time - anticipation_timeout < prev_tag.arrival)
 +        max_time -= anticipation_timeout;
 +      
 +      reservation = cost + tag_calc(max_time,
 +                                    prev_tag.reservation,
 +                                    client.reservation_inv,
 +                                    rho,
 +                                    true);
 +      proportion = tag_calc(max_time,
 +                            prev_tag.proportion,
 +                            client.weight_inv,
 +                            delta,
 +                            true);
 +      limit = tag_calc(max_time,
 +                       prev_tag.limit,
 +                       client.limit_inv,
 +                       delta,
 +                       false);
 +
 +      assert(reservation < max_tag || proportion < max_tag);
 +      }
 +
 +      RequestTag(const RequestTag& prev_tag,
 +               const ClientInfo& client,
 +               const ReqParams req_params,
 +               const Time time,
 +               const double cost = 0.0,
 +               const double anticipation_timeout = 0.0) :
 +      RequestTag(prev_tag, client, req_params.delta, req_params.rho, time,
 +                 cost, anticipation_timeout)
 +      { /* empty */ }
 +
 +      RequestTag(double _res, double _prop, double _lim, const Time _arrival) :
 +      reservation(_res),
 +      proportion(_prop),
 +      limit(_lim),
 +      ready(false),
 +      arrival(_arrival)
 +      {
 +      assert(reservation < max_tag || proportion < max_tag);
 +      }
 +
 +      RequestTag(const RequestTag& other) :
 +      reservation(other.reservation),
 +      proportion(other.proportion),
 +      limit(other.limit),
 +      ready(other.ready),
 +      arrival(other.arrival)
 +      {
 +      // empty
 +      }
 +
 +      static std::string format_tag_change(double before, double after) {
 +      if (before == after) {
 +        return std::string("same");
 +      } else {
 +        std::stringstream ss;
 +        ss << format_tag(before) << "=>" << format_tag(after);
 +        return ss.str();
 +      }
 +      }
 +
 +      static std::string format_tag(double value) {
 +      if (max_tag == value) {
 +        return std::string("max");
 +      } else if (min_tag == value) {
 +        return std::string("min");
 +      } else {
 +        return format_time(value, tag_modulo);
 +      }
 +      }
 +
 +    private:
 +
 +      static double tag_calc(const Time time,
 +                           double prev,
 +                           double increment,
 +                           uint32_t dist_req_val,
 +                           bool extreme_is_high) {
 +      if (0.0 == increment) {
 +        return extreme_is_high ? max_tag : min_tag;
 +      } else {
 +        if (0 != dist_req_val) {
 +          increment *= dist_req_val;
 +        }
 +        return std::max(time, prev + increment);
 +      }
 +      }
 +
 +      friend std::ostream& operator<<(std::ostream& out,
 +                                    const RequestTag& tag) {
 +      out <<
 +        "{ RequestTag:: ready:" << (tag.ready ? "true" : "false") <<
 +        " r:" << format_tag(tag.reservation) <<
 +        " p:" << format_tag(tag.proportion) <<
 +        " l:" << format_tag(tag.limit) <<
 +#if 0 // try to resolve this to make sure Time is operator<<'able.
 +#ifndef DO_NOT_DELAY_TAG_CALC
 +        " arrival:" << tag.arrival <<
 +#endif
 +#endif
 +        " }";
 +      return out;
 +      }
 +    }; // class RequestTag
 +
 +
 +    // C is client identifier type, R is request type,
 +    // U1 determines whether to use client information function dynamically,
 +    // B is heap branching factor
 +    template<typename C, typename R, bool U1, uint B>
 +    class PriorityQueueBase {
 +      // we don't want to include gtest.h just for FRIEND_TEST
 +      friend class dmclock_server_client_idle_erase_Test;
 +
 +    public:
 +
 +      using RequestRef = std::unique_ptr<R>;
 +
 +    protected:
 +
 +      using TimePoint = decltype(std::chrono::steady_clock::now());
 +      using Duration = std::chrono::milliseconds;
 +      using MarkPoint = std::pair<TimePoint,Counter>;
 +
 +      enum class ReadyOption {ignore, lowers, raises};
 +
 +      // forward decl for friend decls
 +      template<double RequestTag::*, ReadyOption, bool>
 +      struct ClientCompare;
 +
 +      class ClientReq {
 +      friend PriorityQueueBase;
 +
 +      RequestTag tag;
 +      C          client_id;
 +      RequestRef request;
 +
 +      public:
 +
 +      ClientReq(const RequestTag& _tag,
 +                const C&          _client_id,
 +                RequestRef&&      _request) :
 +        tag(_tag),
 +        client_id(_client_id),
 +        request(std::move(_request))
 +      {
 +        // empty
 +      }
 +
 +      friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) {
 +        out << "{ ClientReq:: tag:" << c.tag << " client:" <<
 +          c.client_id << " }";
 +        return out;
 +      }
 +      }; // class ClientReq
 +
 +    public:
 +
 +      // NOTE: ClientRec is in the "public" section for compatibility
 +      // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1
 +      // ClientRec could be "protected" with no issue. [See comments
 +      // associated with function submit_top_request.]
 +      class ClientRec {
 +      friend PriorityQueueBase<C,R,U1,B>;
 +
 +      C                     client;
 +      RequestTag            prev_tag;
 +      std::deque<ClientReq> requests;
 +
 +      // amount added from the proportion tag as a result of
 +      // an idle client becoming unidle
 +      double                prop_delta = 0.0;
 +
 +      c::IndIntruHeapData   reserv_heap_data {};
 +      c::IndIntruHeapData   lim_heap_data {};
 +      c::IndIntruHeapData   ready_heap_data {};
 +#if USE_PROP_HEAP
 +      c::IndIntruHeapData   prop_heap_data {};
 +#endif
 +
 +      public:
 +
-                 const ClientInfo& _info,
++      const ClientInfo*     info;
 +      bool                  idle;
 +      Counter               last_tick;
 +      uint32_t              cur_rho;
 +      uint32_t              cur_delta;
 +
 +      ClientRec(C _client,
-       using ClientInfoFunc = std::function<ClientInfo(const C&)>;
++                const ClientInfo* _info,
 +                Counter current_tick) :
 +        client(_client),
 +        prev_tag(0.0, 0.0, 0.0, TimeZero),
 +        info(_info),
 +        idle(true),
 +        last_tick(current_tick),
 +        cur_rho(1),
 +        cur_delta(1)
 +      {
 +        // empty
 +      }
 +
 +      inline const RequestTag& get_req_tag() const {
 +        return prev_tag;
 +      }
 +
 +      static inline void assign_unpinned_tag(double& lhs, const double rhs) {
 +        if (rhs != max_tag && rhs != min_tag) {
 +          lhs = rhs;
 +        }
 +      }
 +
 +      inline void update_req_tag(const RequestTag& _prev,
 +                                 const Counter& _tick) {
 +        assign_unpinned_tag(prev_tag.reservation, _prev.reservation);
 +        assign_unpinned_tag(prev_tag.limit, _prev.limit);
 +        assign_unpinned_tag(prev_tag.proportion, _prev.proportion);
 +        prev_tag.arrival = _prev.arrival;
 +        last_tick = _tick;
 +      }
 +
 +      inline void add_request(const RequestTag& tag,
 +                              const C&          client_id,
 +                              RequestRef&&      request) {
 +        requests.emplace_back(ClientReq(tag, client_id, std::move(request)));
 +      }
 +
 +      inline const ClientReq& next_request() const {
 +        return requests.front();
 +      }
 +
 +      inline ClientReq& next_request() {
 +        return requests.front();
 +      }
 +
 +      inline void pop_request() {
 +        requests.pop_front();
 +      }
 +
 +      inline bool has_request() const {
 +        return !requests.empty();
 +      }
 +
 +      inline size_t request_count() const {
 +        return requests.size();
 +      }
 +
 +      // NB: because a deque is the underlying structure, this
 +      // operation might be expensive
 +      bool remove_by_req_filter_fw(std::function<bool(R&&)> filter_accum) {
 +        bool any_removed = false;
 +        for (auto i = requests.begin();
 +             i != requests.end();
 +             /* no inc */) {
 +          if (filter_accum(std::move(*i->request))) {
 +            any_removed = true;
 +            i = requests.erase(i);
 +          } else {
 +            ++i;
 +          }
 +        }
 +        return any_removed;
 +      }
 +
 +      // NB: because a deque is the underlying structure, this
 +      // operation might be expensive
 +      bool remove_by_req_filter_bw(std::function<bool(R&&)> filter_accum) {
 +        bool any_removed = false;
 +        for (auto i = requests.rbegin();
 +             i != requests.rend();
 +             /* no inc */) {
 +          if (filter_accum(std::move(*i->request))) {
 +            any_removed = true;
 +            i = decltype(i){ requests.erase(std::next(i).base()) };
 +          } else {
 +            ++i;
 +          }
 +        }
 +        return any_removed;
 +      }
 +
 +      inline bool
 +      remove_by_req_filter(std::function<bool(R&&)> filter_accum,
 +                           bool visit_backwards) {
 +        if (visit_backwards) {
 +          return remove_by_req_filter_bw(filter_accum);
 +        } else {
 +          return remove_by_req_filter_fw(filter_accum);
 +        }
 +      }
 +
 +      friend std::ostream&
 +      operator<<(std::ostream& out,
 +                 const typename PriorityQueueBase<C,R,U1,B>::ClientRec& e) {
 +        out << "{ ClientRec::" <<
 +          " client:" << e.client <<
 +          " prev_tag:" << e.prev_tag <<
 +          " req_count:" << e.requests.size() <<
 +          " top_req:";
 +        if (e.has_request()) {
 +          out << e.next_request();
 +        } else {
 +          out << "none";
 +        }
 +        out << " }";
 +
 +        return out;
 +      }
 +      }; // class ClientRec
 +
 +      using ClientRecRef = std::shared_ptr<ClientRec>;
 +
 +      // when we try to get the next request, we'll be in one of three
 +      // situations -- we'll have one to return, have one that can
 +      // fire in the future, or not have any
 +      enum class NextReqType { returning, future, none };
 +
 +      // specifies which queue next request will get popped from
 +      enum class HeapId { reservation, ready };
 +
 +      // this is returned from next_req to tell the caller the situation
 +      struct NextReq {
 +      NextReqType type;
 +      union {
 +        HeapId    heap_id;
 +        Time      when_ready;
 +      };
 +
 +      inline explicit NextReq() :
 +        type(NextReqType::none)
 +      { }
 +
 +      inline NextReq(HeapId _heap_id) :
 +        type(NextReqType::returning),
 +        heap_id(_heap_id)
 +      { }
 +
 +      inline NextReq(Time _when_ready) :
 +        type(NextReqType::future),
 +        when_ready(_when_ready)
 +      { }
 +
 +      // calls to this are clearer than calls to the default
 +      // constructor
 +      static inline NextReq none() {
 +        return NextReq();
 +      }
 +      };
 +
 +
 +      // a function that can be called to look up client information
-       inline const ClientInfo get_cli_info(ClientRec& client) const {
++      using ClientInfoFunc = std::function<const ClientInfo*(const C&)>;
 +
 +
 +      bool empty() const {
 +      DataGuard g(data_mtx);
 +      return (resv_heap.empty() || ! resv_heap.top().has_request());
 +      }
 +
 +
 +      size_t client_count() const {
 +      DataGuard g(data_mtx);
 +      return resv_heap.size();
 +      }
 +
 +
 +      size_t request_count() const {
 +      DataGuard g(data_mtx);
 +      size_t total = 0;
 +      for (auto i = resv_heap.cbegin(); i != resv_heap.cend(); ++i) {
 +        total += i->request_count();
 +      }
 +      return total;
 +      }
 +
 +
 +      bool remove_by_req_filter(std::function<bool(R&&)> filter_accum,
 +                              bool visit_backwards = false) {
 +      bool any_removed = false;
 +      DataGuard g(data_mtx);
 +      for (auto i : client_map) {
 +        bool modified =
 +          i.second->remove_by_req_filter(filter_accum, visit_backwards);
 +        if (modified) {
 +          resv_heap.adjust(*i.second);
 +          limit_heap.adjust(*i.second);
 +          ready_heap.adjust(*i.second);
 +#if USE_PROP_HEAP
 +          prop_heap.adjust(*i.second);
 +#endif
 +          any_removed = true;
 +        }
 +      }
 +      return any_removed;
 +      }
 +
 +
 +      // use as a default value when no accumulator is provide
 +      static void request_sink(R&& req) {
 +      // do nothing
 +      }
 +
 +
 +      void remove_by_client(const C& client,
 +                          bool reverse = false,
 +                          std::function<void (R&&)> accum = request_sink) {
 +      DataGuard g(data_mtx);
 +
 +      auto i = client_map.find(client);
 +
 +      if (i == client_map.end()) return;
 +
 +      if (reverse) {
 +        for (auto j = i->second->requests.rbegin();
 +             j != i->second->requests.rend();
 +             ++j) {
 +          accum(std::move(*j->request));
 +        }
 +      } else {
 +        for (auto j = i->second->requests.begin();
 +             j != i->second->requests.end();
 +             ++j) {
 +          accum(std::move(*j->request));
 +        }
 +      }
 +
 +      i->second->requests.clear();
 +
 +      resv_heap.adjust(*i->second);
 +      limit_heap.adjust(*i->second);
 +      ready_heap.adjust(*i->second);
 +#if USE_PROP_HEAP
 +      prop_heap.adjust(*i->second);
 +#endif
 +      }
 +
 +
 +      uint get_heap_branching_factor() const {
 +      return B;
 +      }
 +
 +
 +      void update_client_info(const C& client_id) {
 +      DataGuard g(data_mtx);
 +      auto client_it = client_map.find(client_id);
 +      if (client_map.end() != client_it) {
 +        ClientRec& client = (*client_it->second);
 +        client.info = client_info_f(client_id);
 +      }
 +      }
 +
 +
 +      void update_client_infos() {
 +      DataGuard g(data_mtx);
 +      for (auto i : client_map) {
 +        i.second->info = client_info_f(i.second->client);
 +      }
 +      }
 +
 +
 +      friend std::ostream& operator<<(std::ostream& out,
 +                                    const PriorityQueueBase& q) {
 +      std::lock_guard<decltype(q.data_mtx)> guard(q.data_mtx);
 +
 +      out << "{ PriorityQueue::";
 +      for (const auto& c : q.client_map) {
 +        out << "  { client:" << c.first << ", record:" << *c.second <<
 +          " }";
 +      }
 +      if (!q.resv_heap.empty()) {
 +        const auto& resv = q.resv_heap.top();
 +        out << " { reservation_top:" << resv << " }";
 +        const auto& ready = q.ready_heap.top();
 +        out << " { ready_top:" << ready << " }";
 +        const auto& limit = q.limit_heap.top();
 +        out << " { limit_top:" << limit << " }";
 +      } else {
 +        out << " HEAPS-EMPTY";
 +      }
 +      out << " }";
 +
 +      return out;
 +      }
 +
 +      // for debugging
 +      void display_queues(std::ostream& out,
 +                        bool show_res = true,
 +                        bool show_lim = true,
 +                        bool show_ready = true,
 +                        bool show_prop = true) const {
 +      auto filter = [](const ClientRec& e)->bool { return true; };
 +      DataGuard g(data_mtx);
 +      if (show_res) {
 +        resv_heap.display_sorted(out << "RESER:", filter);
 +      }
 +      if (show_lim) {
 +        limit_heap.display_sorted(out << "LIMIT:", filter);
 +      }
 +      if (show_ready) {
 +        ready_heap.display_sorted(out << "READY:", filter);
 +      }
 +#if USE_PROP_HEAP
 +      if (show_prop) {
 +        prop_heap.display_sorted(out << "PROPO:", filter);
 +      }
 +#endif
 +      } // display_queues
 +
 +
 +    protected:
 +
 +      // The ClientCompare functor is essentially doing a precedes?
 +      // operator, returning true if and only if the first parameter
 +      // must precede the second parameter. If the second must precede
 +      // the first, or if they are equivalent, false should be
 +      // returned. The reason for this behavior is that it will be
 +      // called to test if two items are out of order and if true is
 +      // returned it will reverse the items. Therefore false is the
 +      // default return when it doesn't matter to prevent unnecessary
 +      // re-ordering.
 +      //
 +      // The template is supporting variations in sorting based on the
 +      // heap in question and allowing these variations to be handled
 +      // at compile-time.
 +      //
 +      // tag_field determines which tag is being used for comparison
 +      //
 +      // ready_opt determines how the ready flag influences the sort
 +      //
 +      // use_prop_delta determines whether the proportional delta is
 +      // added in for comparison
 +      template<double RequestTag::*tag_field,
 +             ReadyOption ready_opt,
 +             bool use_prop_delta>
 +      struct ClientCompare {
 +      bool operator()(const ClientRec& n1, const ClientRec& n2) const {
 +        if (n1.has_request()) {
 +          if (n2.has_request()) {
 +            const auto& t1 = n1.next_request().tag;
 +            const auto& t2 = n2.next_request().tag;
 +            if (ReadyOption::ignore == ready_opt || t1.ready == t2.ready) {
 +              // if we don't care about ready or the ready values are the same
 +              if (use_prop_delta) {
 +                return (t1.*tag_field + n1.prop_delta) <
 +                  (t2.*tag_field + n2.prop_delta);
 +              } else {
 +                return t1.*tag_field < t2.*tag_field;
 +              }
 +            } else if (ReadyOption::raises == ready_opt) {
 +              // use_ready == true && the ready fields are different
 +              return t1.ready;
 +            } else {
 +              return t2.ready;
 +            }
 +          } else {
 +            // n1 has request but n2 does not
 +            return true;
 +          }
 +        } else if (n2.has_request()) {
 +          // n2 has request but n1 does not
 +          return false;
 +        } else {
 +          // both have none; keep stable w false
 +          return false;
 +        }
 +      }
 +      };
 +
 +      ClientInfoFunc        client_info_f;
 +      static constexpr bool is_dynamic_cli_info_f = U1;
 +
 +      mutable std::mutex data_mtx;
 +      using DataGuard = std::lock_guard<decltype(data_mtx)>;
 +
 +      // stable mapping between client ids and client queues
 +      std::map<C,ClientRecRef> client_map;
 +
 +      c::IndIntruHeap<ClientRecRef,
 +                    ClientRec,
 +                    &ClientRec::reserv_heap_data,
 +                    ClientCompare<&RequestTag::reservation,
 +                                  ReadyOption::ignore,
 +                                  false>,
 +                    B> resv_heap;
 +#if USE_PROP_HEAP
 +      c::IndIntruHeap<ClientRecRef,
 +                    ClientRec,
 +                    &ClientRec::prop_heap_data,
 +                    ClientCompare<&RequestTag::proportion,
 +                                  ReadyOption::ignore,
 +                                  true>,
 +                    B> prop_heap;
 +#endif
 +      c::IndIntruHeap<ClientRecRef,
 +                    ClientRec,
 +                    &ClientRec::lim_heap_data,
 +                    ClientCompare<&RequestTag::limit,
 +                                  ReadyOption::lowers,
 +                                  false>,
 +                    B> limit_heap;
 +      c::IndIntruHeap<ClientRecRef,
 +                    ClientRec,
 +                    &ClientRec::ready_heap_data,
 +                    ClientCompare<&RequestTag::proportion,
 +                                  ReadyOption::raises,
 +                                  true>,
 +                    B> ready_heap;
 +
 +      // if all reservations are met and all other requestes are under
 +      // limit, this will allow the request next in terms of
 +      // proportion to still get issued
 +      bool             allow_limit_break;
 +      double           anticipation_timeout;
 +
 +      std::atomic_bool finishing;
 +
 +      // every request creates a tick
 +      Counter tick = 0;
 +
 +      // performance data collection
 +      size_t reserv_sched_count = 0;
 +      size_t prop_sched_count = 0;
 +      size_t limit_break_sched_count = 0;
 +
 +      Duration                  idle_age;
 +      Duration                  erase_age;
 +      Duration                  check_time;
 +      std::deque<MarkPoint>     clean_mark_points;
 +
 +      // NB: All threads declared at end, so they're destructed first!
 +
 +      std::unique_ptr<RunEvery> cleaning_job;
 +
 +
 +      // COMMON constructor that others feed into; we can accept three
 +      // different variations of durations
 +      template<typename Rep, typename Per>
 +      PriorityQueueBase(ClientInfoFunc _client_info_f,
 +                      std::chrono::duration<Rep,Per> _idle_age,
 +                      std::chrono::duration<Rep,Per> _erase_age,
 +                      std::chrono::duration<Rep,Per> _check_time,
 +                      bool _allow_limit_break,
 +                      double _anticipation_timeout) :
 +      client_info_f(_client_info_f),
 +      allow_limit_break(_allow_limit_break),
 +      anticipation_timeout(_anticipation_timeout),
 +      finishing(false),
 +      idle_age(std::chrono::duration_cast<Duration>(_idle_age)),
 +      erase_age(std::chrono::duration_cast<Duration>(_erase_age)),
 +      check_time(std::chrono::duration_cast<Duration>(_check_time))
 +      {
 +      assert(_erase_age >= _idle_age);
 +      assert(_check_time < _idle_age);
 +      cleaning_job =
 +        std::unique_ptr<RunEvery>(
 +          new RunEvery(check_time,
 +                       std::bind(&PriorityQueueBase::do_clean, this)));
 +      }
 +
 +
 +      ~PriorityQueueBase() {
 +      finishing = true;
 +      }
 +
 +
-         ClientInfo info = client_info_f(client_id);
++      inline const ClientInfo* get_cli_info(ClientRec& client) const {
 +      if (is_dynamic_cli_info_f) {
 +        client.info = client_info_f(client.client);
 +      }
 +      return client.info;
 +      }
 +
 +
 +      // data_mtx must be held by caller
 +      void do_add_request(RequestRef&& request,
 +                        const C& client_id,
 +                        const ReqParams& req_params,
 +                        const Time time,
 +                        const double cost = 0.0) {
 +      ++tick;
 +
 +      // this pointer will help us create a reference to a shared
 +      // pointer, no matter which of two codepaths we take
 +      ClientRec* temp_client;
 +
 +      auto client_it = client_map.find(client_id);
 +      if (client_map.end() != client_it) {
 +        temp_client = &(*client_it->second); // address of obj of shared_ptr
 +      } else {
-                          get_cli_info(client),
++        const ClientInfo* info = client_info_f(client_id);
 +        ClientRecRef client_rec =
 +          std::make_shared<ClientRec>(client_id, info, tick);
 +        resv_heap.push(client_rec);
 +#if USE_PROP_HEAP
 +        prop_heap.push(client_rec);
 +#endif
 +        limit_heap.push(client_rec);
 +        ready_heap.push(client_rec);
 +        client_map[client_id] = client_rec;
 +        temp_client = &(*client_rec); // address of obj of shared_ptr
 +      }
 +
 +      // for convenience, we'll create a reference to the shared pointer
 +      ClientRec& client = *temp_client;
 +
 +      if (client.idle) {
 +        // We need to do an adjustment so that idle clients compete
 +        // fairly on proportional tags since those tags may have
 +        // drifted from real-time. Either use the lowest existing
 +        // proportion tag -- O(1) -- or the client with the lowest
 +        // previous proportion tag -- O(n) where n = # clients.
 +        //
 +        // So we don't have to maintain a propotional queue that
 +        // keeps the minimum on proportional tag alone (we're
 +        // instead using a ready queue), we'll have to check each
 +        // client.
 +        //
 +        // The alternative would be to maintain a proportional queue
 +        // (define USE_PROP_TAG) and do an O(1) operation here.
 +
 +        // Was unable to confirm whether equality testing on
 +        // std::numeric_limits<double>::max() is guaranteed, so
 +        // we'll use a compile-time calculated trigger that is one
 +        // third the max, which should be much larger than any
 +        // expected organic value.
 +        constexpr double lowest_prop_tag_trigger =
 +          std::numeric_limits<double>::max() / 3.0;
 +
 +        double lowest_prop_tag = std::numeric_limits<double>::max();
 +        for (auto const &c : client_map) {
 +          // don't use ourselves (or anything else that might be
 +          // listed as idle) since we're now in the map
 +          if (!c.second->idle) {
 +            double p;
 +            // use either lowest proportion tag or previous proportion tag
 +            if (c.second->has_request()) {
 +              p = c.second->next_request().tag.proportion +
 +                c.second->prop_delta;
 +            } else {
 +              p = c.second->get_req_tag().proportion + c.second->prop_delta;
 +            }
 +
 +            if (p < lowest_prop_tag) {
 +              lowest_prop_tag = p;
 +            }
 +          }
 +        }
 +
 +        // if this conditional does not fire, it
 +        if (lowest_prop_tag < lowest_prop_tag_trigger) {
 +          client.prop_delta = lowest_prop_tag - time;
 +        }
 +        client.idle = false;
 +      } // if this client was idle
 +
 +#ifndef DO_NOT_DELAY_TAG_CALC
 +      RequestTag tag(0, 0, 0, time);
 +
 +      if (!client.has_request()) {
++        const ClientInfo* client_info = get_cli_info(client);
++        assert(client_info);
 +        tag = RequestTag(client.get_req_tag(),
-                      get_cli_info(client),
++                         *client_info,
 +                         req_params,
 +                         time,
 +                         cost,
 +                           anticipation_timeout);
 +
 +        // copy tag to previous tag for client
 +        client.update_req_tag(tag, tick);
 +      }
 +#else
++      const ClientInfo* client_info = get_cli_info(client);
++      assert(client_info);
 +      RequestTag tag(client.get_req_tag(),
-         next_first.tag = RequestTag(tag, get_cli_info(top),
++                     *client_info,
 +                     req_params,
 +                     time,
 +                     cost,
 +                     anticipation_timeout);
 +
 +      // copy tag to previous tag for client
 +      client.update_req_tag(tag, tick);
 +#endif
 +
 +      client.add_request(tag, client.client, std::move(request));
 +      if (1 == client.requests.size()) {
 +        // NB: can the following 4 calls to adjust be changed
 +        // promote? Can adding a request ever demote a client in the
 +        // heaps?
 +        resv_heap.adjust(client);
 +        limit_heap.adjust(client);
 +        ready_heap.adjust(client);
 +#if USE_PROP_HEAP
 +        prop_heap.adjust(client);
 +#endif
 +      }
 +
 +      client.cur_rho = req_params.rho;
 +      client.cur_delta = req_params.delta;
 +
 +      resv_heap.adjust(client);
 +      limit_heap.adjust(client);
 +      ready_heap.adjust(client);
 +#if USE_PROP_HEAP
 +      prop_heap.adjust(client);
 +#endif
 +      } // add_request
 +
 +
 +      // data_mtx should be held when called; top of heap should have
 +      // a ready request
 +      template<typename C1, IndIntruHeapData ClientRec::*C2, typename C3>
 +      void pop_process_request(IndIntruHeap<C1, ClientRec, C2, C3, B>& heap,
 +                             std::function<void(const C& client,
 +                                                RequestRef& request)> process) {
 +      // gain access to data
 +      ClientRec& top = heap.top();
 +
 +      RequestRef request = std::move(top.next_request().request);
 +#ifndef DO_NOT_DELAY_TAG_CALC
 +      RequestTag tag = top.next_request().tag;
 +#endif
 +
 +      // pop request and adjust heaps
 +      top.pop_request();
 +
 +#ifndef DO_NOT_DELAY_TAG_CALC
 +      if (top.has_request()) {
 +        ClientReq& next_first = top.next_request();
-         r.tag.reservation -= client.info.reservation_inv;
++        const ClientInfo* client_info = get_cli_info(top);
++        assert(client_info);
++        next_first.tag = RequestTag(tag, *client_info,
 +                                    top.cur_delta, top.cur_rho,
 +                                    next_first.tag.arrival,
 +                                      0.0, anticipation_timeout);
 +
 +        // copy tag to previous tag for client
 +        top.update_req_tag(next_first.tag, tick);
 +      }
 +#endif
 +
 +      resv_heap.demote(top);
 +      limit_heap.adjust(top);
 +#if USE_PROP_HEAP
 +      prop_heap.demote(top);
 +#endif
 +      ready_heap.demote(top);
 +
 +      // process
 +      process(top.client, request);
 +      } // pop_process_request
 +
 +
 +      // data_mtx should be held when called
 +      void reduce_reservation_tags(ClientRec& client) {
 +      for (auto& r : client.requests) {
-       client.prev_tag.reservation -= client.info.reservation_inv;
++        r.tag.reservation -= client.info->reservation_inv;
 +
 +#ifndef DO_NOT_DELAY_TAG_CALC
 +        // reduce only for front tag. because next tags' value are invalid
 +        break;
 +#endif
 +      }
 +      // don't forget to update previous tag
++      client.prev_tag.reservation -= client.info->reservation_inv;
 +      resv_heap.promote(client);
 +      }
 +
 +
 +      // data_mtx should be held when called
 +      void reduce_reservation_tags(const C& client_id) {
 +      auto client_it = client_map.find(client_id);
 +
 +      // means the client was cleaned from map; should never happen
 +      // as long as cleaning times are long enough
 +      assert(client_map.end() != client_it);
 +      reduce_reservation_tags(*client_it->second);
 +      }
 +
 +
 +      // data_mtx should be held when called
 +      NextReq do_next_request(Time now) {
 +      // if reservation queue is empty, all are empty (i.e., no
 +      // active clients)
 +      if(resv_heap.empty()) {
 +        return NextReq::none();
 +      }
 +
 +      // try constraint (reservation) based scheduling
 +
 +      auto& reserv = resv_heap.top();
 +      if (reserv.has_request() &&
 +          reserv.next_request().tag.reservation <= now) {
 +        return NextReq(HeapId::reservation);
 +      }
 +
 +      // no existing reservations before now, so try weight-based
 +      // scheduling
 +
 +      // all items that are within limit are eligible based on
 +      // priority
 +      auto limits = &limit_heap.top();
 +      while (limits->has_request() &&
 +             !limits->next_request().tag.ready &&
 +             limits->next_request().tag.limit <= now) {
 +        limits->next_request().tag.ready = true;
 +        ready_heap.promote(*limits);
 +        limit_heap.demote(*limits);
 +
 +        limits = &limit_heap.top();
 +      }
 +
 +      auto& readys = ready_heap.top();
 +      if (readys.has_request() &&
 +          readys.next_request().tag.ready &&
 +          readys.next_request().tag.proportion < max_tag) {
 +        return NextReq(HeapId::ready);
 +      }
 +
 +      // if nothing is schedulable by reservation or
 +      // proportion/weight, and if we allow limit break, try to
 +      // schedule something with the lowest proportion tag or
 +      // alternatively lowest reservation tag.
 +      if (allow_limit_break) {
 +        if (readys.has_request() &&
 +            readys.next_request().tag.proportion < max_tag) {
 +          return NextReq(HeapId::ready);
 +        } else if (reserv.has_request() &&
 +                   reserv.next_request().tag.reservation < max_tag) {
 +          return NextReq(HeapId::reservation);
 +        }
 +      }
 +
 +      // nothing scheduled; make sure we re-run when next
 +      // reservation item or next limited item comes up
 +
 +      Time next_call = TimeMax;
 +      if (resv_heap.top().has_request()) {
 +        next_call =
 +          min_not_0_time(next_call,
 +                         resv_heap.top().next_request().tag.reservation);
 +      }
 +      if (limit_heap.top().has_request()) {
 +        const auto& next = limit_heap.top().next_request();
 +        assert(!next.tag.ready || max_tag == next.tag.proportion);
 +        next_call = min_not_0_time(next_call, next.tag.limit);
 +      }
 +      if (next_call < TimeMax) {
 +        return NextReq(next_call);
 +      } else {
 +        return NextReq::none();
 +      }
 +      } // do_next_request
 +
 +
 +      // if possible is not zero and less than current then return it;
 +      // otherwise return current; the idea is we're trying to find
 +      // the minimal time but ignoring zero
 +      static inline const Time& min_not_0_time(const Time& current,
 +                                             const Time& possible) {
 +      return TimeZero == possible ? current : std::min(current, possible);
 +      }
 +
 +
 +      /*
 +       * This is being called regularly by RunEvery. Every time it's
 +       * called it notes the time and delta counter (mark point) in a
 +       * deque. It also looks at the deque to find the most recent
 +       * mark point that is older than clean_age. It then walks the
 +       * map and delete all server entries that were last used before
 +       * that mark point.
 +       */
 +      void do_clean() {
 +      TimePoint now = std::chrono::steady_clock::now();
 +      DataGuard g(data_mtx);
 +      clean_mark_points.emplace_back(MarkPoint(now, tick));
 +
 +      // first erase the super-old client records
 +
 +      Counter erase_point = 0;
 +      auto point = clean_mark_points.front();
 +      while (point.first <= now - erase_age) {
 +        erase_point = point.second;
 +        clean_mark_points.pop_front();
 +        point = clean_mark_points.front();
 +      }
 +
 +      Counter idle_point = 0;
 +      for (auto i : clean_mark_points) {
 +        if (i.first <= now - idle_age) {
 +          idle_point = i.second;
 +        } else {
 +          break;
 +        }
 +      }
 +
 +      if (erase_point > 0 || idle_point > 0) {
 +        for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
 +          auto i2 = i++;
 +          if (erase_point && i2->second->last_tick <= erase_point) {
 +            delete_from_heaps(i2->second);
 +            client_map.erase(i2);
 +          } else if (idle_point && i2->second->last_tick <= idle_point) {
 +            i2->second->idle = true;
 +          }
 +        } // for
 +      } // if
 +      } // do_clean
 +
 +
 +      // data_mtx must be held by caller
 +      template<IndIntruHeapData ClientRec::*C1,typename C2>
 +      void delete_from_heap(ClientRecRef& client,
 +                          c::IndIntruHeap<ClientRecRef,ClientRec,C1,C2,B>& heap) {
 +      auto i = heap.rfind(client);
 +      heap.remove(i);
 +      }
 +
 +
 +      // data_mtx must be held by caller
 +      void delete_from_heaps(ClientRecRef& client) {
 +      delete_from_heap(client, resv_heap);
 +#if USE_PROP_HEAP
 +      delete_from_heap(client, prop_heap);
 +#endif
 +      delete_from_heap(client, limit_heap);
 +      delete_from_heap(client, ready_heap);
 +      }
 +    }; // class PriorityQueueBase
 +
 +
 +    template<typename C, typename R, bool U1=false, uint B=2>
 +    class PullPriorityQueue : public PriorityQueueBase<C,R,U1,B> {
 +      using super = PriorityQueueBase<C,R,U1,B>;
 +
 +    public:
 +
 +      // When a request is pulled, this is the return type.
 +      struct PullReq {
 +      struct Retn {
 +        C                           client;
 +        typename super::RequestRef  request;
 +        PhaseType                   phase;
 +      };
 +
 +      typename super::NextReqType   type;
 +      boost::variant<Retn,Time>     data;
 +
 +      bool is_none() const { return type == super::NextReqType::none; }
 +
 +      bool is_retn() const { return type == super::NextReqType::returning; }
 +      Retn& get_retn() {
 +        return boost::get<Retn>(data);
 +      }
 +
 +      bool is_future() const { return type == super::NextReqType::future; }
 +      Time getTime() const { return boost::get<Time>(data); }
 +      };
 +
 +
 +#ifdef PROFILE
 +      ProfileTimer<std::chrono::nanoseconds> pull_request_timer;
 +      ProfileTimer<std::chrono::nanoseconds> add_request_timer;
 +#endif
 +
 +      template<typename Rep, typename Per>
 +      PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
 +                      std::chrono::duration<Rep,Per> _idle_age,
 +                      std::chrono::duration<Rep,Per> _erase_age,
 +                      std::chrono::duration<Rep,Per> _check_time,
 +                      bool _allow_limit_break = false,
 +                      double _anticipation_timeout = 0.0) :
 +      super(_client_info_f,
 +            _idle_age, _erase_age, _check_time,
 +            _allow_limit_break, _anticipation_timeout)
 +      {
 +      // empty
 +      }
 +
 +
 +      // pull convenience constructor
 +      PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
 +                      bool _allow_limit_break = false) :
 +      PullPriorityQueue(_client_info_f,
 +                        std::chrono::minutes(10),
 +                        std::chrono::minutes(15),
 +                        std::chrono::minutes(6),
 +                        _allow_limit_break)
 +      {
 +      // empty
 +      }
 +
 +
 +      inline void add_request(R&& request,
 +                            const C& client_id,
 +                            const ReqParams& req_params,
 +                            double addl_cost = 0.0) {
 +      add_request(typename super::RequestRef(new R(std::move(request))),
 +                  client_id,
 +                  req_params,
 +                  get_time(),
 +                  addl_cost);
 +      }
 +
 +
 +      inline void add_request(R&& request,
 +                            const C& client_id,
 +                            double addl_cost = 0.0) {
 +      static const ReqParams null_req_params;
 +      add_request(typename super::RequestRef(new R(std::move(request))),
 +                  client_id,
 +                  null_req_params,
 +                  get_time(),
 +                  addl_cost);
 +      }
 +
 +
 +
 +      inline void add_request_time(R&& request,
 +                                 const C& client_id,
 +                                 const ReqParams& req_params,
 +                                 const Time time,
 +                                 double addl_cost = 0.0) {
 +      add_request(typename super::RequestRef(new R(std::move(request))),
 +                  client_id,
 +                  req_params,
 +                  time,
 +                  addl_cost);
 +      }
 +
 +
 +      inline void add_request(typename super::RequestRef&& request,
 +                            const C& client_id,
 +                            const ReqParams& req_params,
 +                            double addl_cost = 0.0) {
 +      add_request(request, req_params, client_id, get_time(), addl_cost);
 +      }
 +
 +
 +      inline void add_request(typename super::RequestRef&& request,
 +                            const C& client_id,
 +                            double addl_cost = 0.0) {
 +      static const ReqParams null_req_params;
 +      add_request(request, null_req_params, client_id, get_time(), addl_cost);
 +      }
 +
 +
 +      // this does the work; the versions above provide alternate interfaces
 +      void add_request(typename super::RequestRef&& request,
 +                     const C&                     client_id,
 +                     const ReqParams&             req_params,
 +                     const Time                   time,
 +                     double                       addl_cost = 0.0) {
 +      typename super::DataGuard g(this->data_mtx);
 +#ifdef PROFILE
 +      add_request_timer.start();
 +#endif
 +      super::do_add_request(std::move(request),
 +                            client_id,
 +                            req_params,
 +                            time,
 +                            addl_cost);
 +      // no call to schedule_request for pull version
 +#ifdef PROFILE
 +      add_request_timer.stop();
 +#endif
 +      }
 +
 +
 +      inline PullReq pull_request() {
 +      return pull_request(get_time());
 +      }
 +
 +
 +      PullReq pull_request(Time now) {
 +      PullReq result;
 +      typename super::DataGuard g(this->data_mtx);
 +#ifdef PROFILE
 +      pull_request_timer.start();
 +#endif
 +
 +      typename super::NextReq next = super::do_next_request(now);
 +      result.type = next.type;
 +      switch(next.type) {
 +      case super::NextReqType::none:
 +        return result;
 +      case super::NextReqType::future:
 +        result.data = next.when_ready;
 +        return result;
 +      case super::NextReqType::returning:
 +        // to avoid nesting, break out and let code below handle this case
 +        break;
 +      default:
 +        assert(false);
 +      }
 +
 +      // we'll only get here if we're returning an entry
 +
 +      auto process_f =
 +        [&] (PullReq& pull_result, PhaseType phase) ->
 +        std::function<void(const C&,
 +                           typename super::RequestRef&)> {
 +        return [&pull_result, phase](const C& client,
 +                                     typename super::RequestRef& request) {
 +          pull_result.data =
 +          typename PullReq::Retn{client, std::move(request), phase};
 +        };
 +      };
 +
 +      switch(next.heap_id) {
 +      case super::HeapId::reservation:
 +        super::pop_process_request(this->resv_heap,
 +                                   process_f(result, PhaseType::reservation));
 +        ++this->reserv_sched_count;
 +        break;
 +      case super::HeapId::ready:
 +        super::pop_process_request(this->ready_heap,
 +                                   process_f(result, PhaseType::priority));
 +        { // need to use retn temporarily
 +          auto& retn = boost::get<typename PullReq::Retn>(result.data);
 +          super::reduce_reservation_tags(retn.client);
 +        }
 +        ++this->prop_sched_count;
 +        break;
 +      default:
 +        assert(false);
 +      }
 +
 +#ifdef PROFILE
 +      pull_request_timer.stop();
 +#endif
 +      return result;
 +      } // pull_request
 +
 +
 +    protected:
 +
 +
 +      // data_mtx should be held when called; unfortunately this
 +      // function has to be repeated in both push & pull
 +      // specializations
 +      typename super::NextReq next_request() {
 +      return next_request(get_time());
 +      }
 +    }; // class PullPriorityQueue
 +
 +
 +    // PUSH version
 +    template<typename C, typename R, bool U1=false, uint B=2>
 +    class PushPriorityQueue : public PriorityQueueBase<C,R,U1,B> {
 +
 +    protected:
 +
 +      using super = PriorityQueueBase<C,R,U1,B>;
 +
 +    public:
 +
 +      // a function to see whether the server can handle another request
 +      using CanHandleRequestFunc = std::function<bool(void)>;
 +
 +      // a function to submit a request to the server; the second
 +      // parameter is a callback when it's completed
 +      using HandleRequestFunc =
 +      std::function<void(const C&,typename super::RequestRef,PhaseType)>;
 +
 +    protected:
 +
 +      CanHandleRequestFunc can_handle_f;
 +      HandleRequestFunc    handle_f;
 +      // for handling timed scheduling
 +      std::mutex  sched_ahead_mtx;
 +      std::condition_variable sched_ahead_cv;
 +      Time sched_ahead_when = TimeZero;
 +
 +#ifdef PROFILE
 +    public:
 +      ProfileTimer<std::chrono::nanoseconds> add_request_timer;
 +      ProfileTimer<std::chrono::nanoseconds> request_complete_timer;
 +    protected:
 +#endif
 +
 +      // NB: threads declared last, so constructed last and destructed first
 +
 +      std::thread sched_ahead_thd;
 +
 +    public:
 +
 +      // push full constructor
 +      template<typename Rep, typename Per>
 +      PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
 +                      CanHandleRequestFunc _can_handle_f,
 +                      HandleRequestFunc _handle_f,
 +                      std::chrono::duration<Rep,Per> _idle_age,
 +                      std::chrono::duration<Rep,Per> _erase_age,
 +                      std::chrono::duration<Rep,Per> _check_time,
 +                      bool _allow_limit_break = false,
 +                      double anticipation_timeout = 0.0) :
 +      super(_client_info_f,
 +            _idle_age, _erase_age, _check_time,
 +            _allow_limit_break, anticipation_timeout)
 +      {
 +      can_handle_f = _can_handle_f;
 +      handle_f = _handle_f;
 +      sched_ahead_thd = std::thread(&PushPriorityQueue::run_sched_ahead, this);
 +      }
 +
 +
 +      // push convenience constructor
 +      PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
 +                      CanHandleRequestFunc _can_handle_f,
 +                      HandleRequestFunc _handle_f,
 +                      bool _allow_limit_break = false,
 +                      double _anticipation_timeout = 0.0) :
 +      PushPriorityQueue(_client_info_f,
 +                        _can_handle_f,
 +                        _handle_f,
 +                        std::chrono::minutes(10),
 +                        std::chrono::minutes(15),
 +                        std::chrono::minutes(6),
 +                        _allow_limit_break,
 +                        _anticipation_timeout)
 +      {
 +      // empty
 +      }
 +
 +
 +      ~PushPriorityQueue() {
 +      this->finishing = true;
 +      sched_ahead_cv.notify_one();
 +      sched_ahead_thd.join();
 +      }
 +
 +    public:
 +
 +      inline void add_request(R&& request,
 +                            const C& client_id,
 +                            const ReqParams& req_params,
 +                            double addl_cost = 0.0) {
 +      add_request(typename super::RequestRef(new R(std::move(request))),
 +                  client_id,
 +                  req_params,
 +                  get_time(),
 +                  addl_cost);
 +      }
 +
 +
 +      inline void add_request(typename super::RequestRef&& request,
 +                            const C& client_id,
 +                            const ReqParams& req_params,
 +                            double addl_cost = 0.0) {
 +      add_request(request, req_params, client_id, get_time(), addl_cost);
 +      }
 +
 +
 +      inline void add_request_time(const R& request,
 +                                 const C& client_id,
 +                                 const ReqParams& req_params,
 +                                 const Time time,
 +                                 double addl_cost = 0.0) {
 +      add_request(typename super::RequestRef(new R(request)),
 +                  client_id,
 +                  req_params,
 +                  time,
 +                  addl_cost);
 +      }
 +
 +
 +      void add_request(typename super::RequestRef&& request,
 +                     const C& client_id,
 +                     const ReqParams& req_params,
 +                     const Time time,
 +                     double addl_cost = 0.0) {
 +      typename super::DataGuard g(this->data_mtx);
 +#ifdef PROFILE
 +      add_request_timer.start();
 +#endif
 +      super::do_add_request(std::move(request),
 +                            client_id,
 +                            req_params,
 +                            time,
 +                            addl_cost);
 +      schedule_request();
 +#ifdef PROFILE
 +      add_request_timer.stop();
 +#endif
 +      }
 +
 +
 +      void request_completed() {
 +      typename super::DataGuard g(this->data_mtx);
 +#ifdef PROFILE
 +      request_complete_timer.start();
 +#endif
 +      schedule_request();
 +#ifdef PROFILE
 +      request_complete_timer.stop();
 +#endif
 +      }
 +
 +    protected:
 +
 +      // data_mtx should be held when called; furthermore, the heap
 +      // should not be empty and the top element of the heap should
 +      // not be already handled
 +      //
 +      // NOTE: the use of "super::ClientRec" in either the template
 +      // construct or as a parameter to submit_top_request generated
 +      // a compiler error in g++ 4.8.4, when ClientRec was
 +      // "protected" rather than "public". By g++ 6.3.1 this was not
 +      // an issue. But for backwards compatibility
 +      // PriorityQueueBase::ClientRec is public.
 +      template<typename C1,
 +             IndIntruHeapData super::ClientRec::*C2,
 +             typename C3,
 +             uint B4>
 +      C submit_top_request(IndIntruHeap<C1,typename super::ClientRec,C2,C3,B4>& heap,
 +                         PhaseType phase) {
 +      C client_result;
 +      super::pop_process_request(heap,
 +                                 [this, phase, &client_result]
 +                                 (const C& client,
 +                                  typename super::RequestRef& request) {
 +                                   client_result = client;
 +                                   handle_f(client, std::move(request), phase);
 +                                 });
 +      return client_result;
 +      }
 +
 +
 +      // data_mtx should be held when called
 +      void submit_request(typename super::HeapId heap_id) {
 +      C client;
 +      switch(heap_id) {
 +      case super::HeapId::reservation:
 +        // don't need to note client
 +        (void) submit_top_request(this->resv_heap, PhaseType::reservation);
 +        // unlike the other two cases, we do not reduce reservation
 +        // tags here
 +        ++this->reserv_sched_count;
 +        break;
 +      case super::HeapId::ready:
 +        client = submit_top_request(this->ready_heap, PhaseType::priority);
 +        super::reduce_reservation_tags(client);
 +        ++this->prop_sched_count;
 +        break;
 +      default:
 +        assert(false);
 +      }
 +      } // submit_request
 +
 +
 +      // data_mtx should be held when called; unfortunately this
 +      // function has to be repeated in both push & pull
 +      // specializations
 +      typename super::NextReq next_request() {
 +      return next_request(get_time());
 +      }
 +
 +
 +      // data_mtx should be held when called; overrides member
 +      // function in base class to add check for whether a request can
 +      // be pushed to the server
 +      typename super::NextReq next_request(Time now) {
 +      if (!can_handle_f()) {
 +        typename super::NextReq result;
 +        result.type = super::NextReqType::none;
 +        return result;
 +      } else {
 +        return super::do_next_request(now);
 +      }
 +      } // next_request
 +
 +
 +      // data_mtx should be held when called
 +      void schedule_request() {
 +      typename super::NextReq next_req = next_request();
 +      switch (next_req.type) {
 +      case super::NextReqType::none:
 +        return;
 +      case super::NextReqType::future:
 +        sched_at(next_req.when_ready);
 +        break;
 +      case super::NextReqType::returning:
 +        submit_request(next_req.heap_id);
 +        break;
 +      default:
 +        assert(false);
 +      }
 +      }
 +
 +
 +      // this is the thread that handles running schedule_request at
 +      // future times when nothing can be scheduled immediately
 +      void run_sched_ahead() {
 +      std::unique_lock<std::mutex> l(sched_ahead_mtx);
 +
 +      while (!this->finishing) {
 +        if (TimeZero == sched_ahead_when) {
 +          sched_ahead_cv.wait(l);
 +        } else {
 +          Time now;
 +          while (!this->finishing && (now = get_time()) < sched_ahead_when) {
 +            long microseconds_l = long(1 + 1000000 * (sched_ahead_when - now));
 +            auto microseconds = std::chrono::microseconds(microseconds_l);
 +            sched_ahead_cv.wait_for(l, microseconds);
 +          }
 +          sched_ahead_when = TimeZero;
 +          if (this->finishing) return;
 +
 +          l.unlock();
 +          if (!this->finishing) {
 +            typename super::DataGuard g(this->data_mtx);
 +            schedule_request();
 +          }
 +          l.lock();
 +        }
 +      }
 +      }
 +
 +
 +      void sched_at(Time when) {
 +      std::lock_guard<std::mutex> l(sched_ahead_mtx);
 +      if (this->finishing) return;
 +      if (TimeZero == sched_ahead_when || when < sched_ahead_when) {
 +        sched_ahead_when = when;
 +        sched_ahead_cv.notify_one();
 +      }
 +      }
 +    }; // class PushPriorityQueue
 +
 +  } // namespace dmclock
 +} // namespace crimson
index 1d74e58683b82a12739429373676134f4aa4d916,0000000000000000000000000000000000000000..7e6e13463ba950bfc239e8c1e2846ce9425d4c43
mode 100644,000000..100644
--- /dev/null
@@@ -1,1010 -1,0 +1,1014 @@@
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
-       if (client1 == c) return ci1;
-       else if (client2 == c) return ci2;
 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 +// vim: ts=8 sw=2 smarttab
 +/*
 + * Copyright (C) 2016 Red Hat Inc.
 + */
 +
 +
 +#include <memory>
 +#include <chrono>
 +#include <iostream>
 +#include <list>
 +#include <vector>
 +
 +
 +#include "dmclock_server.h"
 +#include "dmclock_util.h"
 +#include "gtest/gtest.h"
 +
 +// process control to prevent core dumps during gtest death tests
 +#include "dmcPrCtl.h"
 +
 +
 +namespace dmc = crimson::dmclock;
 +
 +
 +// we need a request object; an empty one will do
 +struct Request {
 +};
 +
 +
 +namespace crimson {
 +  namespace dmclock {
 +
 +    /*
 +     * Allows us to test the code provided with the mutex provided locked.
 +     */
 +    static void test_locked(std::mutex& mtx, std::function<void()> code) {
 +      std::unique_lock<std::mutex> l(mtx);
 +      code();
 +    }
 +
 +
 +    TEST(dmclock_server, bad_tag_deathtest) {
 +      using ClientId = int;
 +      using Queue = dmc::PullPriorityQueue<ClientId,Request>;
 +      using QueueRef = std::unique_ptr<Queue>;
 +
 +      ClientId client1 = 17;
 +      ClientId client2 = 18;
 +
 +      double reservation = 0.0;
 +      double weight = 0.0;
 +
 +      dmc::ClientInfo ci1(reservation, weight, 0.0);
 +      dmc::ClientInfo ci2(reservation, weight, 1.0);
 +
-         return ci1; // must return
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      if (client1 == c) return &ci1;
++      else if (client2 == c) return &ci2;
 +      else {
 +        ADD_FAILURE() << "got request from neither of two clients";
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo { return ci; };
++        return nullptr;
 +      }
 +      };
 +
 +      QueueRef pq(new Queue(client_info_f, false));
 +      ReqParams req_params(1,1);
 +
 +      // Disable coredumps
 +      PrCtl unset_dumpable;
 +
 +      EXPECT_DEATH_IF_SUPPORTED(pq->add_request(Request{}, client1, req_params),
 +                              "Assertion.*reservation.*max_tag.*"
 +                              "proportion.*max_tag") <<
 +      "we should fail if a client tries to generate a reservation tag "
 +      "where reservation and proportion are both 0";
 +
 +
 +      EXPECT_DEATH_IF_SUPPORTED(pq->add_request(Request{}, client2, req_params),
 +                              "Assertion.*reservation.*max_tag.*"
 +                              "proportion.*max_tag") <<
 +      "we should fail if a client tries to generate a reservation tag "
 +      "where reservation and proportion are both 0";
 +    }
 +
 +
 +    TEST(dmclock_server, client_idle_erase) {
 +      using ClientId = int;
 +      using Queue = dmc::PushPriorityQueue<ClientId,Request>;
 +      int client = 17;
 +      double reservation = 100.0;
 +
 +      dmc::ClientInfo ci(reservation, 1.0, 0.0);
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo { return ci; };
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      return &ci;
++      };
 +      auto server_ready_f = [] () -> bool { return true; };
 +      auto submit_req_f = [] (const ClientId& c,
 +                            std::unique_ptr<Request> req,
 +                            dmc::PhaseType phase) {
 +      // empty; do nothing
 +      };
 +
 +      Queue pq(client_info_f,
 +             server_ready_f,
 +             submit_req_f,
 +             std::chrono::seconds(3),
 +             std::chrono::seconds(5),
 +             std::chrono::seconds(2),
 +             false);
 +
 +      auto lock_pq = [&](std::function<void()> code) {
 +      test_locked(pq.data_mtx, code);
 +      };
 +
 +
 +      /* The timeline should be as follows:
 +       *
 +       *     0 seconds : request created
 +       *
 +       *     1 seconds : map is size 1, idle is false
 +       *
 +       * 2 seconds : clean notes first mark; +2 is base for further calcs
 +       *
 +       * 4 seconds : clean does nothing except makes another mark
 +       *
 +       *   5 seconds : when we're secheduled to idle (+2 + 3)
 +       *
 +       * 6 seconds : clean idles client
 +       *
 +       *   7 seconds : when we're secheduled to erase (+2 + 5)
 +       *
 +       *     7 seconds : verified client is idle
 +       *
 +       * 8 seconds : clean erases client info
 +       *
 +       *     9 seconds : verified client is erased
 +       */
 +
 +      lock_pq([&] () {
 +        EXPECT_EQ(0u, pq.client_map.size()) <<
 +          "client map initially has size 0";
 +      });
 +
 +      Request req;
 +      dmc::ReqParams req_params(1, 1);
 +      pq.add_request_time(req, client, req_params, dmc::get_time());
 +
 +      std::this_thread::sleep_for(std::chrono::seconds(1));
 +
 +      lock_pq([&] () {
 +        EXPECT_EQ(1u, pq.client_map.size()) <<
 +          "client map has 1 after 1 client";
 +        EXPECT_FALSE(pq.client_map.at(client)->idle) <<
 +          "initially client map entry shows not idle.";
 +      });
 +
 +      std::this_thread::sleep_for(std::chrono::seconds(6));
 +
 +      lock_pq([&] () {
 +        EXPECT_TRUE(pq.client_map.at(client)->idle) <<
 +          "after idle age client map entry shows idle.";
 +      });
 +
 +      std::this_thread::sleep_for(std::chrono::seconds(2));
 +
 +      lock_pq([&] () {
 +        EXPECT_EQ(0u, pq.client_map.size()) <<
 +          "client map loses its entry after erase age";
 +      });
 +    } // TEST
 +
 +
 +#if 0
 +    TEST(dmclock_server, reservation_timing) {
 +      using ClientId = int;
 +      // NB? PUSH OR PULL
 +      using Queue = std::unique_ptr<dmc::PriorityQueue<ClientId,Request>>;
 +      using std::chrono::steady_clock;
 +
 +      int client = 17;
 +
 +      std::vector<dmc::Time> times;
 +      std::mutex times_mtx;
 +      using Guard = std::lock_guard<decltype(times_mtx)>;
 +
 +      // reservation every second
 +      dmc::ClientInfo ci(1.0, 0.0, 0.0);
 +      Queue pq;
 +
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
-       return info1;
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      return &ci;
++      };
 +      auto server_ready_f = [] () -> bool { return true; };
 +      auto submit_req_f = [&] (const ClientId& c,
 +                             std::unique_ptr<Request> req,
 +                             dmc::PhaseType phase) {
 +      {
 +        Guard g(times_mtx);
 +        times.emplace_back(dmc::get_time());
 +      }
 +      std::thread complete([&](){ pq->request_completed(); });
 +      complete.detach();
 +      };
 +
 +      // NB? PUSH OR PULL
 +      pq = Queue(new dmc::PriorityQueue<ClientId,Request>(client_info_f,
 +                                                        server_ready_f,
 +                                                        submit_req_f,
 +                                                        false));
 +
 +      Request req;
 +      ReqParams<ClientId> req_params(client, 1, 1);
 +
 +      for (int i = 0; i < 5; ++i) {
 +      pq->add_request_time(req, req_params, dmc::get_time());
 +      }
 +
 +      {
 +      Guard g(times_mtx);
 +      std::this_thread::sleep_for(std::chrono::milliseconds(5500));
 +      EXPECT_EQ(5, times.size()) <<
 +        "after 5.5 seconds, we should have 5 requests times at 1 second apart";
 +      }
 +    } // TEST
 +#endif
 +
 +
 +    TEST(dmclock_server, remove_by_req_filter) {
 +      struct MyReq {
 +      int id;
 +
 +      MyReq(int _id) :
 +        id(_id)
 +      {
 +        // empty
 +      }
 +      }; // MyReq
 +
 +      using ClientId = int;
 +      using Queue = dmc::PullPriorityQueue<ClientId,MyReq>;
 +
 +      ClientId client1 = 17;
 +      ClientId client2 = 98;
 +
 +      dmc::ClientInfo info1(0.0, 1.0, 0.0);
 +
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
-       return info1;
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      return &info1;
 +      };
 +
 +      Queue pq(client_info_f, true);
 +
 +      EXPECT_EQ(0u, pq.client_count());
 +      EXPECT_EQ(0u, pq.request_count());
 +
 +      ReqParams req_params(1,1);
 +
 +      pq.add_request(MyReq(1), client1, req_params);
 +      pq.add_request(MyReq(11), client1, req_params);
 +      pq.add_request(MyReq(2), client2, req_params);
 +      pq.add_request(MyReq(0), client2, req_params);
 +      pq.add_request(MyReq(13), client2, req_params);
 +      pq.add_request(MyReq(2), client2, req_params);
 +      pq.add_request(MyReq(13), client2, req_params);
 +      pq.add_request(MyReq(98), client2, req_params);
 +      pq.add_request(MyReq(44), client1, req_params);
 +
 +      EXPECT_EQ(2u, pq.client_count());
 +      EXPECT_EQ(9u, pq.request_count());
 +
 +      pq.remove_by_req_filter([](const MyReq& r) -> bool {return 1 == r.id % 2;});
 +
 +      EXPECT_EQ(5u, pq.request_count());
 +
 +      std::list<MyReq> capture;
 +      pq.remove_by_req_filter(
 +      [&capture] (const MyReq& r) -> bool {
 +        if (0 == r.id % 2) {
 +          capture.push_front(r);
 +          return true;
 +        } else {
 +          return false;
 +        }
 +      },
 +      true);
 +
 +      EXPECT_EQ(0u, pq.request_count());
 +      EXPECT_EQ(5u, capture.size());
 +      int total = 0;
 +      for (auto i : capture) {
 +      total += i.id;
 +      }
 +      EXPECT_EQ(146, total) << " sum of captured items should be 146";
 +    } // TEST
 +
 +
 +    TEST(dmclock_server, remove_by_req_filter_ordering_forwards_visit) {
 +      struct MyReq {
 +      int id;
 +
 +      MyReq(int _id) :
 +        id(_id)
 +      {
 +        // empty
 +      }
 +      }; // MyReq
 +
 +      using ClientId = int;
 +      using Queue = dmc::PullPriorityQueue<ClientId,MyReq>;
 +
 +      ClientId client1 = 17;
 +
 +      dmc::ClientInfo info1(0.0, 1.0, 0.0);
 +
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
-       return info1;
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      return &info1;
 +      };
 +
 +      Queue pq(client_info_f, true);
 +
 +      EXPECT_EQ(0u, pq.client_count());
 +      EXPECT_EQ(0u, pq.request_count());
 +
 +      ReqParams req_params(1,1);
 +
 +      pq.add_request(MyReq(1), client1, req_params);
 +      pq.add_request(MyReq(2), client1, req_params);
 +      pq.add_request(MyReq(3), client1, req_params);
 +      pq.add_request(MyReq(4), client1, req_params);
 +      pq.add_request(MyReq(5), client1, req_params);
 +      pq.add_request(MyReq(6), client1, req_params);
 +
 +      EXPECT_EQ(1u, pq.client_count());
 +      EXPECT_EQ(6u, pq.request_count());
 +
 +      // remove odd ids in forward order and append to end
 +
 +      std::vector<MyReq> capture;
 +      pq.remove_by_req_filter(
 +      [&capture] (const MyReq& r) -> bool {
 +        if (1 == r.id % 2) {
 +          capture.push_back(r);
 +          return true;
 +        } else {
 +          return false;
 +        }
 +      },
 +      false);
 +
 +      EXPECT_EQ(3u, pq.request_count());
 +      EXPECT_EQ(3u, capture.size());
 +      EXPECT_EQ(1, capture[0].id) << "items should come out in forward order";
 +      EXPECT_EQ(3, capture[1].id) << "items should come out in forward order";
 +      EXPECT_EQ(5, capture[2].id) << "items should come out in forward order";
 +
 +      // remove even ids in reverse order but insert at front so comes
 +      // out forwards
 +
 +      std::vector<MyReq> capture2;
 +      pq.remove_by_req_filter(
 +      [&capture2] (const MyReq& r) -> bool {
 +        if (0 == r.id % 2) {
 +          capture2.insert(capture2.begin(), r);
 +          return true;
 +        } else {
 +          return false;
 +        }
 +      },
 +      false);
 +
 +      EXPECT_EQ(0u, pq.request_count());
 +      EXPECT_EQ(3u, capture2.size());
 +      EXPECT_EQ(6, capture2[0].id) << "items should come out in reverse order";
 +      EXPECT_EQ(4, capture2[1].id) << "items should come out in reverse order";
 +      EXPECT_EQ(2, capture2[2].id) << "items should come out in reverse order";
 +    } // TEST
 +
 +
 +    TEST(dmclock_server, remove_by_req_filter_ordering_backwards_visit) {
 +      struct MyReq {
 +      int id;
 +
 +      MyReq(int _id) :
 +        id(_id)
 +      {
 +        // empty
 +      }
 +      }; // MyReq
 +
 +      using ClientId = int;
 +      using Queue = dmc::PullPriorityQueue<ClientId,MyReq>;
 +
 +      ClientId client1 = 17;
 +
 +      dmc::ClientInfo info1(0.0, 1.0, 0.0);
 +
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
-       return info1;
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      return &info1;
 +      };
 +
 +      Queue pq(client_info_f, true);
 +
 +      EXPECT_EQ(0u, pq.client_count());
 +      EXPECT_EQ(0u, pq.request_count());
 +
 +      ReqParams req_params(1,1);
 +
 +      pq.add_request(MyReq(1), client1, req_params);
 +      pq.add_request(MyReq(2), client1, req_params);
 +      pq.add_request(MyReq(3), client1, req_params);
 +      pq.add_request(MyReq(4), client1, req_params);
 +      pq.add_request(MyReq(5), client1, req_params);
 +      pq.add_request(MyReq(6), client1, req_params);
 +
 +      EXPECT_EQ(1u, pq.client_count());
 +      EXPECT_EQ(6u, pq.request_count());
 +
 +      // now remove odd ids in forward order
 +
 +      std::vector<MyReq> capture;
 +      pq.remove_by_req_filter(
 +      [&capture] (const MyReq& r) -> bool {
 +        if (1 == r.id % 2) {
 +          capture.insert(capture.begin(), r);
 +          return true;
 +        } else {
 +          return false;
 +        }
 +      },
 +      true);
 +
 +      EXPECT_EQ(3u, pq.request_count());
 +      EXPECT_EQ(3u, capture.size());
 +      EXPECT_EQ(1, capture[0].id) << "items should come out in forward order";
 +      EXPECT_EQ(3, capture[1].id) << "items should come out in forward order";
 +      EXPECT_EQ(5, capture[2].id) << "items should come out in forward order";
 +
 +      // now remove even ids in reverse order
 +
 +      std::vector<MyReq> capture2;
 +      pq.remove_by_req_filter(
 +      [&capture2] (const MyReq& r) -> bool {
 +        if (0 == r.id % 2) {
 +          capture2.push_back(r);
 +          return true;
 +        } else {
 +          return false;
 +        }
 +      },
 +      true);
 +
 +      EXPECT_EQ(0u, pq.request_count());
 +      EXPECT_EQ(3u, capture2.size());
 +      EXPECT_EQ(6, capture2[0].id) << "items should come out in reverse order";
 +      EXPECT_EQ(4, capture2[1].id) << "items should come out in reverse order";
 +      EXPECT_EQ(2, capture2[2].id) << "items should come out in reverse order";
 +    } // TEST
 +
 +
 +    TEST(dmclock_server, remove_by_client) {
 +      struct MyReq {
 +      int id;
 +
 +      MyReq(int _id) :
 +        id(_id)
 +      {
 +        // empty
 +      }
 +      }; // MyReq
 +
 +      using ClientId = int;
 +      using Queue = dmc::PullPriorityQueue<ClientId,MyReq>;
 +
 +      ClientId client1 = 17;
 +      ClientId client2 = 98;
 +
 +      dmc::ClientInfo info1(0.0, 1.0, 0.0);
 +
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
-       if (client1 == c) return info1;
-       else if (client2 == c) return info2;
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      return &info1;
 +      };
 +
 +      Queue pq(client_info_f, true);
 +
 +      EXPECT_EQ(0u, pq.client_count());
 +      EXPECT_EQ(0u, pq.request_count());
 +
 +      ReqParams req_params(1,1);
 +
 +      pq.add_request(MyReq(1), client1, req_params);
 +      pq.add_request(MyReq(11), client1, req_params);
 +      pq.add_request(MyReq(2), client2, req_params);
 +      pq.add_request(MyReq(0), client2, req_params);
 +      pq.add_request(MyReq(13), client2, req_params);
 +      pq.add_request(MyReq(2), client2, req_params);
 +      pq.add_request(MyReq(13), client2, req_params);
 +      pq.add_request(MyReq(98), client2, req_params);
 +      pq.add_request(MyReq(44), client1, req_params);
 +
 +      EXPECT_EQ(2u, pq.client_count());
 +      EXPECT_EQ(9u, pq.request_count());
 +
 +      std::list<MyReq> removed;
 +
 +      pq.remove_by_client(client1,
 +                        true,
 +                        [&removed] (const MyReq& r) {
 +                          removed.push_front(r);
 +                        });
 +
 +      EXPECT_EQ(3u, removed.size());
 +      EXPECT_EQ(1, removed.front().id);
 +      removed.pop_front();
 +      EXPECT_EQ(11, removed.front().id);
 +      removed.pop_front();
 +      EXPECT_EQ(44, removed.front().id);
 +      removed.pop_front();
 +
 +      EXPECT_EQ(6u, pq.request_count());
 +
 +      Queue::PullReq pr = pq.pull_request();
 +      EXPECT_TRUE(pr.is_retn());
 +      EXPECT_EQ(2, pr.get_retn().request->id);
 +
 +      pr = pq.pull_request();
 +      EXPECT_TRUE(pr.is_retn());
 +      EXPECT_EQ(0, pr.get_retn().request->id);
 +
 +      pq.remove_by_client(client2);
 +      EXPECT_EQ(0u, pq.request_count()) <<
 +      "after second client removed, none left";
 +    } // TEST
 +
 +
 +    TEST(dmclock_server_pull, pull_weight) {
 +      using ClientId = int;
 +      using Queue = dmc::PullPriorityQueue<ClientId,Request>;
 +      using QueueRef = std::unique_ptr<Queue>;
 +
 +      ClientId client1 = 17;
 +      ClientId client2 = 98;
 +
 +      dmc::ClientInfo info1(0.0, 1.0, 0.0);
 +      dmc::ClientInfo info2(0.0, 2.0, 0.0);
 +
 +      QueueRef pq;
 +
-         return info1;
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      if (client1 == c) return &info1;
++      else if (client2 == c) return &info2;
 +      else {
 +        ADD_FAILURE() << "client info looked up for non-existant client";
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
-       if (client1 == c) return info1;
-       else if (client2 == c) return info2;
++        return nullptr;
 +      }
 +      };
 +
 +      pq = QueueRef(new Queue(client_info_f, false));
 +
 +      ReqParams req_params(1,1);
 +
 +      auto now = dmc::get_time();
 +
 +      for (int i = 0; i < 5; ++i) {
 +      pq->add_request(Request{}, client1, req_params);
 +      pq->add_request(Request{}, client2, req_params);
 +      now += 0.0001;
 +      }
 +
 +      int c1_count = 0;
 +      int c2_count = 0;
 +      for (int i = 0; i < 6; ++i) {
 +      Queue::PullReq pr = pq->pull_request();
 +      EXPECT_EQ(Queue::NextReqType::returning, pr.type);
 +      auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
 +
 +      if (client1 == retn.client) ++c1_count;
 +      else if (client2 == retn.client) ++c2_count;
 +      else ADD_FAILURE() << "got request from neither of two clients";
 +
 +      EXPECT_EQ(PhaseType::priority, retn.phase);
 +      }
 +
 +      EXPECT_EQ(2, c1_count) <<
 +      "one-third of request should have come from first client";
 +      EXPECT_EQ(4, c2_count) <<
 +      "two-thirds of request should have come from second client";
 +    }
 +
 +
 +    TEST(dmclock_server_pull, pull_reservation) {
 +      using ClientId = int;
 +      using Queue = dmc::PullPriorityQueue<ClientId,Request>;
 +      using QueueRef = std::unique_ptr<Queue>;
 +
 +      ClientId client1 = 52;
 +      ClientId client2 = 8;
 +
 +      dmc::ClientInfo info1(2.0, 0.0, 0.0);
 +      dmc::ClientInfo info2(1.0, 0.0, 0.0);
 +
-         return info1;
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      if (client1 == c) return &info1;
++      else if (client2 == c) return &info2;
 +      else {
 +        ADD_FAILURE() << "client info looked up for non-existant client";
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
-       if (client1 == c) return info1;
-       else if (client2 == c) return info2;
++        return nullptr;
 +      }
 +      };
 +
 +      QueueRef pq(new Queue(client_info_f, false));
 +
 +      ReqParams req_params(1,1);
 +
 +      // make sure all times are well before now
 +      auto old_time = dmc::get_time() - 100.0;
 +
 +      for (int i = 0; i < 5; ++i) {
 +      pq->add_request_time(Request{}, client1, req_params, old_time);
 +      pq->add_request_time(Request{}, client2, req_params, old_time);
 +      old_time += 0.001;
 +      }
 +
 +      int c1_count = 0;
 +      int c2_count = 0;
 +
 +      for (int i = 0; i < 6; ++i) {
 +      Queue::PullReq pr = pq->pull_request();
 +      EXPECT_EQ(Queue::NextReqType::returning, pr.type);
 +      auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
 +
 +      if (client1 == retn.client) ++c1_count;
 +      else if (client2 == retn.client) ++c2_count;
 +      else ADD_FAILURE() << "got request from neither of two clients";
 +
 +      EXPECT_EQ(PhaseType::reservation, retn.phase);
 +      }
 +
 +      EXPECT_EQ(4, c1_count) <<
 +      "two-thirds of request should have come from first client";
 +      EXPECT_EQ(2, c2_count) <<
 +      "one-third of request should have come from second client";
 +    } // dmclock_server_pull.pull_reservation
 +
 +
 +    TEST(dmclock_server_pull, update_client_info) {
 +      using ClientId = int;
 +      using Queue = dmc::PullPriorityQueue<ClientId,Request,false>;
 +      using QueueRef = std::unique_ptr<Queue>;
 +
 +      ClientId client1 = 17;
 +      ClientId client2 = 98;
 +
 +      dmc::ClientInfo info1(0.0, 100.0, 0.0);
 +      dmc::ClientInfo info2(0.0, 200.0, 0.0);
 +
 +      QueueRef pq;
 +
-         return info1;
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      if (client1 == c) return &info1;
++      else if (client2 == c) return &info2;
 +      else {
 +        ADD_FAILURE() << "client info looked up for non-existant client";
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
-       if (client1 == c) return info1[cli_info_group];
-       else if (client2 == c) return info2[cli_info_group];
++        return nullptr;
 +      }
 +      };
 +
 +      pq = QueueRef(new Queue(client_info_f, false));
 +
 +      ReqParams req_params(1,1);
 +
 +      auto now = dmc::get_time();
 +
 +      for (int i = 0; i < 5; ++i) {
 +      pq->add_request(Request{}, client1, req_params);
 +      pq->add_request(Request{}, client2, req_params);
 +      now += 0.0001;
 +      }
 +
 +      int c1_count = 0;
 +      int c2_count = 0;
 +      for (int i = 0; i < 10; ++i) {
 +      Queue::PullReq pr = pq->pull_request();
 +      EXPECT_EQ(Queue::NextReqType::returning, pr.type);
 +      auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
 +
 +      if (i > 5) continue;
 +      if (client1 == retn.client) ++c1_count;
 +      else if (client2 == retn.client) ++c2_count;
 +      else ADD_FAILURE() << "got request from neither of two clients";
 +
 +      EXPECT_EQ(PhaseType::priority, retn.phase);
 +      }
 +
 +      EXPECT_EQ(2, c1_count) <<
 +      "before: one-third of request should have come from first client";
 +      EXPECT_EQ(4, c2_count) <<
 +      "before: two-thirds of request should have come from second client";
 +
 +      std::chrono::seconds dura(1);
 +      std::this_thread::sleep_for(dura);
 +
 +      info1 = dmc::ClientInfo(0.0, 200.0, 0.0);
 +      pq->update_client_info(17);
 +
 +      now = dmc::get_time();
 +
 +      for (int i = 0; i < 5; ++i) {
 +      pq->add_request(Request{}, client1, req_params);
 +      pq->add_request(Request{}, client2, req_params);
 +      now += 0.0001;
 +      }
 +
 +      c1_count = 0;
 +      c2_count = 0;
 +      for (int i = 0; i < 6; ++i) {
 +      Queue::PullReq pr = pq->pull_request();
 +      EXPECT_EQ(Queue::NextReqType::returning, pr.type);
 +      auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
 +
 +      if (client1 == retn.client) ++c1_count;
 +      else if (client2 == retn.client) ++c2_count;
 +      else ADD_FAILURE() << "got request from neither of two clients";
 +
 +      EXPECT_EQ(PhaseType::priority, retn.phase);
 +      }
 +
 +      EXPECT_EQ(3, c1_count) <<
 +      "after: one-third of request should have come from first client";
 +      EXPECT_EQ(3, c2_count) <<
 +      "after: two-thirds of request should have come from second client";
 +    }
 +
 +
 +    TEST(dmclock_server_pull, dynamic_cli_info_f) {
 +      using ClientId = int;
 +      using Queue = dmc::PullPriorityQueue<ClientId,Request,true>;
 +      using QueueRef = std::unique_ptr<Queue>;
 +
 +      ClientId client1 = 17;
 +      ClientId client2 = 98;
 +
 +      std::vector<dmc::ClientInfo> info1;
 +      std::vector<dmc::ClientInfo> info2;
 +
 +      info1.push_back(dmc::ClientInfo(0.0, 100.0, 0.0));
 +      info1.push_back(dmc::ClientInfo(0.0, 150.0, 0.0));
 +
 +      info2.push_back(dmc::ClientInfo(0.0, 200.0, 0.0));
 +      info2.push_back(dmc::ClientInfo(0.0, 50.0, 0.0));
 +
 +      uint cli_info_group = 0;
 +
 +      QueueRef pq;
 +
-         return info1[0];
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      if (client1 == c) return &info1[cli_info_group];
++      else if (client2 == c) return &info2[cli_info_group];
 +      else {
 +        ADD_FAILURE() << "client info looked up for non-existant client";
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
-       if (client1 == c) return info1;
-       else if (client2 == c) return info2;
++        return nullptr;
 +      }
 +      };
 +
 +      pq = QueueRef(new Queue(client_info_f, false));
 +
 +      ReqParams req_params(1,1);
 +
 +      auto now = dmc::get_time();
 +
 +      for (int i = 0; i < 5; ++i) {
 +      pq->add_request(Request{}, client1, req_params);
 +      pq->add_request(Request{}, client2, req_params);
 +      now += 0.0001;
 +      }
 +
 +      int c1_count = 0;
 +      int c2_count = 0;
 +      for (int i = 0; i < 10; ++i) {
 +      Queue::PullReq pr = pq->pull_request();
 +      EXPECT_EQ(Queue::NextReqType::returning, pr.type);
 +      auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
 +
 +      if (i > 5) continue;
 +      if (client1 == retn.client) ++c1_count;
 +      else if (client2 == retn.client) ++c2_count;
 +      else ADD_FAILURE() << "got request from neither of two clients";
 +
 +      EXPECT_EQ(PhaseType::priority, retn.phase);
 +      }
 +
 +      EXPECT_EQ(2, c1_count) <<
 +      "before: one-third of request should have come from first client";
 +      EXPECT_EQ(4, c2_count) <<
 +      "before: two-thirds of request should have come from second client";
 +
 +      std::chrono::seconds dura(1);
 +      std::this_thread::sleep_for(dura);
 +
 +      cli_info_group = 1;
 + 
 +      now = dmc::get_time();
 +
 +      for (int i = 0; i < 6; ++i) {
 +      pq->add_request(Request{}, client1, req_params);
 +      pq->add_request(Request{}, client2, req_params);
 +      now += 0.0001;
 +      }
 +
 +      c1_count = 0;
 +      c2_count = 0;
 +      for (int i = 0; i < 8; ++i) {
 +      Queue::PullReq pr = pq->pull_request();
 +      EXPECT_EQ(Queue::NextReqType::returning, pr.type);
 +      auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
 +
 +      if (client1 == retn.client) ++c1_count;
 +      else if (client2 == retn.client) ++c2_count;
 +      else ADD_FAILURE() << "got request from neither of two clients";
 +
 +      EXPECT_EQ(PhaseType::priority, retn.phase);
 +      }
 +
 +      EXPECT_EQ(6, c1_count) <<
 +      "after: one-third of request should have come from first client";
 +      EXPECT_EQ(2, c2_count) <<
 +      "after: two-thirds of request should have come from second client";
 +    }
 +
 +
 +    // This test shows what happens when a request can be ready (under
 +    // limit) but not schedulable since proportion tag is 0. We expect
 +    // to get some future and none responses.
 +    TEST(dmclock_server_pull, ready_and_under_limit) {
 +      using ClientId = int;
 +      using Queue = dmc::PullPriorityQueue<ClientId,Request>;
 +      using QueueRef = std::unique_ptr<Queue>;
 +
 +      ClientId client1 = 52;
 +      ClientId client2 = 8;
 +
 +      dmc::ClientInfo info1(1.0, 0.0, 0.0);
 +      dmc::ClientInfo info2(1.0, 0.0, 0.0);
 +
-         return info1;
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      if (client1 == c) return &info1;
++      else if (client2 == c) return &info2;
 +      else {
 +        ADD_FAILURE() << "client info looked up for non-existant client";
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
-       return info;
++        return nullptr;
 +      }
 +      };
 +
 +      QueueRef pq(new Queue(client_info_f, false));
 +
 +      ReqParams req_params(1,1);
 +
 +      // make sure all times are well before now
 +      auto start_time = dmc::get_time() - 100.0;
 +
 +      // add six requests; for same client reservations spaced one apart
 +      for (int i = 0; i < 3; ++i) {
 +      pq->add_request_time(Request{}, client1, req_params, start_time);
 +      pq->add_request_time(Request{}, client2, req_params, start_time);
 +      }
 +
 +      Queue::PullReq pr = pq->pull_request(start_time + 0.5);
 +      EXPECT_EQ(Queue::NextReqType::returning, pr.type);
 +
 +      pr = pq->pull_request(start_time + 0.5);
 +      EXPECT_EQ(Queue::NextReqType::returning, pr.type);
 +
 +      pr = pq->pull_request(start_time + 0.5);
 +      EXPECT_EQ(Queue::NextReqType::future, pr.type) <<
 +      "too soon for next reservation";
 +
 +      pr = pq->pull_request(start_time + 1.5);
 +      EXPECT_EQ(Queue::NextReqType::returning, pr.type);
 +
 +      pr = pq->pull_request(start_time + 1.5);
 +      EXPECT_EQ(Queue::NextReqType::returning, pr.type);
 +
 +      pr = pq->pull_request(start_time + 1.5);
 +      EXPECT_EQ(Queue::NextReqType::future, pr.type) <<
 +      "too soon for next reservation";
 +
 +      pr = pq->pull_request(start_time + 2.5);
 +      EXPECT_EQ(Queue::NextReqType::returning, pr.type);
 +
 +      pr = pq->pull_request(start_time + 2.5);
 +      EXPECT_EQ(Queue::NextReqType::returning, pr.type);
 +
 +      pr = pq->pull_request(start_time + 2.5);
 +      EXPECT_EQ(Queue::NextReqType::none, pr.type) << "no more requests left";
 +    }
 +
 +
 +    TEST(dmclock_server_pull, pull_none) {
 +      using ClientId = int;
 +      using Queue = dmc::PullPriorityQueue<ClientId,Request>;
 +      using QueueRef = std::unique_ptr<Queue>;
 +
 +      dmc::ClientInfo info(1.0, 1.0, 1.0);
 +
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
-       return info;
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      return &info;
 +      };
 +
 +      QueueRef pq(new Queue(client_info_f, false));
 +
 +      // Request req;
 +      ReqParams req_params(1,1);
 +
 +      auto now = dmc::get_time();
 +
 +      Queue::PullReq pr = pq->pull_request(now + 100);
 +
 +      EXPECT_EQ(Queue::NextReqType::none, pr.type);
 +    }
 +
 +
 +    TEST(dmclock_server_pull, pull_future) {
 +      using ClientId = int;
 +      using Queue = dmc::PullPriorityQueue<ClientId,Request>;
 +      using QueueRef = std::unique_ptr<Queue>;
 +
 +      ClientId client1 = 52;
 +      // ClientId client2 = 8;
 +
 +      dmc::ClientInfo info(1.0, 0.0, 1.0);
 +
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
-       return info;
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      return &info;
 +      };
 +
 +      QueueRef pq(new Queue(client_info_f, false));
 +
 +      ReqParams req_params(1,1);
 +
 +      // make sure all times are well before now
 +      auto now = dmc::get_time();
 +
 +      pq->add_request_time(Request{}, client1, req_params, now + 100);
 +      Queue::PullReq pr = pq->pull_request(now);
 +
 +      EXPECT_EQ(Queue::NextReqType::future, pr.type);
 +
 +      Time when = boost::get<Time>(pr.data);
 +      EXPECT_EQ(now + 100, when);
 +    }
 +
 +
 +    TEST(dmclock_server_pull, pull_future_limit_break_weight) {
 +      using ClientId = int;
 +      using Queue = dmc::PullPriorityQueue<ClientId,Request>;
 +      using QueueRef = std::unique_ptr<Queue>;
 +
 +      ClientId client1 = 52;
 +      // ClientId client2 = 8;
 +
 +      dmc::ClientInfo info(0.0, 1.0, 1.0);
 +
-       auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
-       return info;
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      return &info;
 +      };
 +
 +      QueueRef pq(new Queue(client_info_f, true));
 +
 +      ReqParams req_params(1,1);
 +
 +      // make sure all times are well before now
 +      auto now = dmc::get_time();
 +
 +      pq->add_request_time(Request{}, client1, req_params, now + 100);
 +      Queue::PullReq pr = pq->pull_request(now);
 +
 +      EXPECT_EQ(Queue::NextReqType::returning, pr.type);
 +
 +      auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
 +      EXPECT_EQ(client1, retn.client);
 +    }
 +
 +
 +    TEST(dmclock_server_pull, pull_future_limit_break_reservation) {
 +      using ClientId = int;
 +      using Queue = dmc::PullPriorityQueue<ClientId,Request>;
 +      using QueueRef = std::unique_ptr<Queue>;
 +
 +      ClientId client1 = 52;
 +      // ClientId client2 = 8;
 +
 +      dmc::ClientInfo info(1.0, 0.0, 1.0);
 +
++      auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
++      return &info;
 +      };
 +
 +      QueueRef pq(new Queue(client_info_f, true));
 +
 +      ReqParams req_params(1,1);
 +
 +      // make sure all times are well before now
 +      auto now = dmc::get_time();
 +
 +      pq->add_request_time(Request{}, client1, req_params, now + 100);
 +      Queue::PullReq pr = pq->pull_request(now);
 +
 +      EXPECT_EQ(Queue::NextReqType::returning, pr.type);
 +
 +      auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
 +      EXPECT_EQ(client1, retn.client);
 +    }
 +  } // namespace dmclock
 +} // namespace crimson