const Time& time, Cost cost,
CompletionToken&& token)
{
- boost::asio::async_completion<CompletionToken, Signature> init(token);
-
- auto ex1 = get_executor();
- auto& handler = init.completion_handler;
-
- // allocate the Request and add it to the queue
- auto completion = Completion::create(ex1, std::move(handler),
- Request{client, time, cost});
- // cast to unique_ptr<Request>
- auto req = RequestRef{std::move(completion)};
- int r = queue.add_request(std::move(req), client, params, time, cost);
- if (r == 0) {
- // schedule an immediate call to process() on the executor
- schedule(crimson::dmclock::TimeZero);
- if (auto c = counters(client)) {
- c->inc(queue_counters::l_qlen);
- c->inc(queue_counters::l_cost, cost);
- }
- } else {
- // post the error code
- boost::system::error_code ec(r, boost::system::system_category());
- // cast back to Completion
- auto completion = static_cast<Completion*>(req.release());
- async::post(std::unique_ptr<Completion>{completion},
- ec, PhaseType::priority);
- if (auto c = counters(client)) {
- c->inc(queue_counters::l_limit);
- c->inc(queue_counters::l_limit_cost, cost);
- }
- }
-
- return init.result.get();
+ return boost::asio::async_initiate<CompletionToken, Signature>(
+ [this] (auto handler, auto ex, const client_id& client,
+ const ReqParams& params, const Time& time, Cost cost) {
+ // allocate the Request and add it to the queue
+ auto completion = Completion::create(ex, std::move(handler),
+ Request{client, time, cost});
+ // cast to unique_ptr<Request>
+ auto req = RequestRef{std::move(completion)};
+ int r = queue.add_request(std::move(req), client, params, time, cost);
+ if (r == 0) {
+ // schedule an immediate call to process() on the executor
+ schedule(crimson::dmclock::TimeZero);
+ if (auto c = counters(client)) {
+ c->inc(queue_counters::l_qlen);
+ c->inc(queue_counters::l_cost, cost);
+ }
+ } else {
+ // post the error code
+ boost::system::error_code ec(r, boost::system::system_category());
+ // cast back to Completion
+ auto completion = static_cast<Completion*>(req.release());
+ async::post(std::unique_ptr<Completion>{completion},
+ ec, PhaseType::priority);
+ if (auto c = counters(client)) {
+ c->inc(queue_counters::l_limit);
+ c->inc(queue_counters::l_limit_cost, cost);
+ }
+ }
+ }, token, get_executor(), client, params, time, cost);
}
class SimpleThrottler : public md_config_obs_t, public dmclock::Scheduler {