}
}
-class C_flush_messages : public EventCallback {
- std::deque<std::pair<utime_t, Message*> > delay_queue;
- AsyncMessenger *msgr;
- DispatchQueue *dispatch_queue;
- uint64_t conn_id;
- public:
- C_flush_messages(std::deque<std::pair<utime_t, Message*> > &&q, AsyncMessenger *m,
- DispatchQueue *dq, uint64_t cid)
- : delay_queue(std::move(q)), msgr(m), dispatch_queue(dq), conn_id(cid) {}
- void do_request(int id) {
+void AsyncConnection::DelayedDelivery::flush() {
+ EventCenter::submit_to(
+ center->get_id(), [this] () mutable {
+ Mutex::Locker l(delay_lock);
while (!delay_queue.empty()) {
Message *m = delay_queue.front().second;
if (msgr->ms_can_fast_dispatch(m)) {
}
delay_queue.pop_front();
}
- delete this;
- }
-};
-
-void AsyncConnection::DelayedDelivery::flush() {
- Mutex::Locker l(delay_lock);
- center->dispatch_event_external(
- new C_flush_messages(std::move(delay_queue), msgr, dispatch_queue, conn_id));
- for (auto i : register_time_events)
- center->delete_time_event(i);
- register_time_events.clear();
+ for (auto i : register_time_events)
+ center->delete_time_event(i);
+ register_time_events.clear();
+ }, true);
}
void AsyncConnection::send_keepalive()
register_time_events.insert(center->create_time_event(delay_period*1000000, this));
}
void discard() {
- Mutex::Locker l(delay_lock);
- while (!delay_queue.empty()) {
- Message *m = delay_queue.front().second;
- dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
- m->put();
- delay_queue.pop_front();
- }
- for (auto i : register_time_events)
- center->delete_time_event(i);
- register_time_events.clear();
+ EventCenter::submit_to(center->get_id(), [this] () mutable {
+ Mutex::Locker l(delay_lock);
+ while (!delay_queue.empty()) {
+ Message *m = delay_queue.front().second;
+ dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
+ m->put();
+ delay_queue.pop_front();
+ }
+ for (auto i : register_time_events)
+ center->delete_time_event(i);
+ register_time_events.clear();
+ }, true);
}
void flush();
} *delay_state;