session_security.reset();
}
+ if (delay_state)
+ assert(delay_state->ready());
dispatch_queue->queue_connect(this);
async_msgr->ms_deliver_handle_fast_connect(this);
state = STATE_OPEN;
memset(&connect_msg, 0, sizeof(connect_msg));
+ if (delay_state)
+ assert(delay_state->ready());
// make sure no pending tick timer
center->delete_time_event(last_tick_id);
last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
{
Mutex::Locker l(delay_lock);
register_time_events.erase(id);
+ if (stop_dispatch)
+ return ;
if (delay_queue.empty())
return ;
utime_t release = delay_queue.front().first;
}
void AsyncConnection::DelayedDelivery::flush() {
+ stop_dispatch = true;
EventCenter::submit_to(
center->get_id(), [this] () mutable {
Mutex::Locker l(delay_lock);
for (auto i : register_time_events)
center->delete_time_event(i);
register_time_events.clear();
+ stop_dispatch = false;
}, true);
}
EventCenter *center;
DispatchQueue *dispatch_queue;
uint64_t conn_id;
+ std::atomic_bool stop_dispatch;
public:
explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
DispatchQueue *q, uint64_t cid)
: delay_lock("AsyncConnection::DelayedDelivery::delay_lock"),
- msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid) { }
+ msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
+ stop_dispatch(false) { }
~DelayedDelivery() {
assert(register_time_events.empty());
assert(delay_queue.empty());
register_time_events.insert(center->create_time_event(delay_period*1000000, this));
}
void discard() {
+ stop_dispatch = true;
EventCenter::submit_to(center->get_id(), [this] () mutable {
Mutex::Locker l(delay_lock);
while (!delay_queue.empty()) {
for (auto i : register_time_events)
center->delete_time_event(i);
register_time_events.clear();
+ stop_dispatch = false;
}, true);
}
+ bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
void flush();
} *delay_state;