if (!delay_state &&
async_msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(peer_type)) != string::npos) {
ldout(msgr->cct, 1) << __func__ << " setting up a delay queue" << dendl;
- delay_state = new DelayedDelivery(async_msgr, center);
+ delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue, conn_id);
}
}
delay_queue.pop_front();
}
if (msgr->ms_can_fast_dispatch(m)) {
- msgr->ms_fast_dispatch(m);
+ dispatch_queue->fast_dispatch(m);
} else {
- msgr->ms_deliver_dispatch(m);
+ dispatch_queue->enqueue(m, m->get_priority(), conn_id);
}
}
class C_flush_messages : public EventCallback {
std::deque<std::pair<utime_t, Message*> > delay_queue;
AsyncMessenger *msgr;
+ DispatchQueue *dispatch_queue;
+ uint64_t conn_id;
public:
- C_flush_messages(std::deque<std::pair<utime_t, Message*> > &&q, AsyncMessenger *m): delay_queue(std::move(q)), msgr(m) {}
+ C_flush_messages(std::deque<std::pair<utime_t, Message*> > &&q, AsyncMessenger *m,
+ DispatchQueue *dq, uint64_t cid)
+ : delay_queue(std::move(q)), msgr(m), dispatch_queue(dq), conn_id(cid) {}
void do_request(int id) {
while (!delay_queue.empty()) {
Message *m = delay_queue.front().second;
if (msgr->ms_can_fast_dispatch(m)) {
- msgr->ms_fast_dispatch(m);
+ dispatch_queue->fast_dispatch(m);
} else {
- msgr->ms_deliver_dispatch(m);
+ dispatch_queue->enqueue(m, m->get_priority(), conn_id);
}
delay_queue.pop_front();
}
void AsyncConnection::DelayedDelivery::flush() {
Mutex::Locker l(delay_lock);
- center->dispatch_event_external(new C_flush_messages(std::move(delay_queue), msgr));
+ center->dispatch_event_external(
+ new C_flush_messages(std::move(delay_queue), msgr, dispatch_queue, conn_id));
for (auto i : register_time_events)
center->delete_time_event(i);
register_time_events.clear();
AsyncMessenger *msgr;
EventCenter *center;
DispatchQueue *dispatch_queue;
+ uint64_t conn_id;
public:
explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
- DispatchQueue *q)
+ DispatchQueue *q, uint64_t cid)
: delay_lock("AsyncConnection::DelayedDelivery::delay_lock"),
- msgr(omsgr), center(c), dispatch_queue(q) { }
+ msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid) { }
~DelayedDelivery() {
assert(register_time_events.empty());
assert(delay_queue.empty());