}
};
-class C_handle_reset : public EventCallback {
- AsyncMessenger *msgr;
- AsyncConnectionRef conn;
-
- public:
- C_handle_reset(AsyncMessenger *m, AsyncConnectionRef c): msgr(m), conn(c) {}
- void do_request(int id) {
- msgr->ms_deliver_handle_reset(conn.get());
- }
-};
-
-class C_handle_remote_reset : public EventCallback {
- AsyncMessenger *msgr;
- AsyncConnectionRef conn;
-
- public:
- C_handle_remote_reset(AsyncMessenger *m, AsyncConnectionRef c): msgr(m), conn(c) {}
- void do_request(int id) {
- msgr->ms_deliver_handle_remote_reset(conn.get());
- }
-};
-
-class C_deliver_connect : public EventCallback {
- AsyncMessenger *msgr;
- AsyncConnectionRef conn;
-
- public:
- C_deliver_connect(AsyncMessenger *msgr, AsyncConnectionRef c): msgr(msgr), conn(c) {}
- void do_request(int id) {
- msgr->ms_deliver_handle_connect(conn.get());
- }
-};
-
-class C_deliver_accept : public EventCallback {
- AsyncMessenger *msgr;
- AsyncConnectionRef conn;
-
- public:
- C_deliver_accept(AsyncMessenger *msgr, AsyncConnectionRef c): msgr(msgr), conn(c) {}
- void do_request(int id) {
- msgr->ms_deliver_handle_accept(conn.get());
- delete this;
- }
-};
-
class C_clean_handler : public EventCallback {
AsyncConnectionRef conn;
public:
{
read_handler = new C_handle_read(this);
write_handler = new C_handle_write(this);
- reset_handler = new C_handle_reset(async_msgr, this);
- remote_reset_handler = new C_handle_remote_reset(async_msgr, this);
- connect_handler = new C_deliver_connect(async_msgr, this);
wakeup_handler = new C_time_wakeup(this);
memset(msgvec, 0, sizeof(msgvec));
// double recv_max_prefetch see "read_until"
session_security.reset();
}
- center->dispatch_event_external(connect_handler);
+ dispatch_queue->queue_connect(this);
async_msgr->ms_deliver_handle_fast_connect(this);
// message may in queue between last _try_send and connection ready
inject_delay();
if (existing->policy.lossy) {
// disconnect from the Connection
- existing->center->dispatch_event_external(existing->reset_handler);
+ existing->dispatch_queue->queue_reset(this);
ldout(async_msgr->cct, 1) << __func__ << " replacing on lossy channel, failing existing" << dendl;
existing->_stop();
} else {
assert(can_write == WriteStatus::NOWRITE);
existing->write_lock.Lock(true);
// queue a reset on the new connection, which we're dumping for the old
- center->dispatch_event_external(reset_handler);
+ dispatch_queue->queue_reset(this);
// reset the in_seq if this is a hard reset from peer,
// otherwise we respect our original connection's value
goto fail_registered;
// notify
- center->dispatch_event_external(EventCallbackRef(new C_deliver_accept(async_msgr, this)));
+ dispatch_queue->queue_accept(this);
async_msgr->ms_deliver_handle_fast_accept(this);
once_ready = true;
ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
Mutex::Locker l(write_lock);
if (can_write != WriteStatus::CLOSED) {
- dispatch_queue.local_delivery(m, m->get_priority());
+ dispatch_queue->local_delivery(m, m->get_priority());
} else {
ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
<< " Drop message " << m << dendl;
if (policy.lossy && !(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY)) {
ldout(async_msgr->cct, 1) << __func__ << " on lossy channel, failing" << dendl;
- center->dispatch_event_external(reset_handler);
+ dispatch_queue->queue_reset(this);
_stop();
return ;
}
state >=STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half "
<< " accept state just closed" << dendl;
- center->dispatch_event_external(reset_handler);
+ dispatch_queue->queue_reset(this);
write_lock.Unlock();
_stop();
dispatch_queue->discard_queue(conn_id);
discard_out_queue();
- center->dispatch_event_external(remote_reset_handler);
+ dispatch_queue->queue_remote_reset(this);
if (randomize_out_seq()) {
ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl;