}
}
+void AsyncConnection::DelayedDelivery::discard() {
+ stop_dispatch = true;
+ center->submit_to(center->get_id(),
+ [this]() mutable {
+ std::lock_guard<std::mutex> l(delay_lock);
+ while (!delay_queue.empty()) {
+ Message *m = delay_queue.front();
+ dispatch_queue->dispatch_throttle_release(
+ m->get_dispatch_throttle_size());
+ m->put();
+ delay_queue.pop_front();
+ }
+ for (auto i : register_time_events)
+ center->delete_time_event(i);
+ register_time_events.clear();
+ stop_dispatch = false;
+ },
+ true);
+}
+
void AsyncConnection::DelayedDelivery::flush() {
stop_dispatch = true;
center->submit_to(
lock.unlock();
}
+void AsyncConnection::stop(bool queue_reset) {
+ lock.lock();
+ bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
+ _stop();
+ lock.unlock();
+ if (need_queue_reset) dispatch_queue->queue_reset(this);
+}
+
void AsyncConnection::wakeup_from(uint64_t id)
{
lock.lock();
#include "Stack.h"
class AsyncMessenger;
+class DispatchQueue;
class Worker;
static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
delay_queue.push_back(m);
register_time_events.insert(center->create_time_event(delay_period*1000000, this));
}
- void discard() {
- stop_dispatch = true;
- center->submit_to(center->get_id(), [this] () mutable {
- std::lock_guard<std::mutex> l(delay_lock);
- while (!delay_queue.empty()) {
- Message *m = delay_queue.front();
- dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
- m->put();
- delay_queue.pop_front();
- }
- for (auto i : register_time_events)
- center->delete_time_event(i);
- register_time_events.clear();
- stop_dispatch = false;
- }, true);
- }
+ void discard();
bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
void flush();
} *delay_state;
void wakeup_from(uint64_t id);
void tick(uint64_t id);
void local_deliver();
- void stop(bool queue_reset) {
- lock.lock();
- bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
- _stop();
- lock.unlock();
- if (need_queue_reset)
- dispatch_queue->queue_reset(this);
- }
+ void stop(bool queue_reset);
void cleanup() {
shutdown_socket();
delete read_handler;