}
};
-class C_local_deliver : public EventCallback {
- AsyncConnectionRef conn;
- public:
- explicit C_local_deliver(AsyncConnectionRef c): conn(c) {}
- void do_request(int id) {
- conn->local_deliver();
- }
-};
-
-
class C_clean_handler : public EventCallback {
AsyncConnectionRef conn;
public:
reset_handler = new C_handle_reset(async_msgr, this);
remote_reset_handler = new C_handle_remote_reset(async_msgr, this);
connect_handler = new C_deliver_connect(async_msgr, this);
- local_deliver_handler = new C_local_deliver(this);
wakeup_handler = new C_time_wakeup(this);
memset(msgvec, 0, sizeof(msgvec));
// double recv_max_prefetch see "read_until"
ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
Mutex::Locker l(write_lock);
if (can_write != WriteStatus::CLOSED) {
- local_messages.push_back(m);
- center->dispatch_event_external(local_deliver_handler);
+ dispatch_queue.local_delivery(m, m->get_priority());
} else {
ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
<< " Drop message " << m << dendl;
lock.Unlock();
process();
}
-
-void AsyncConnection::local_deliver()
-{
- ldout(async_msgr->cct, 10) << __func__ << dendl;
- Mutex::Locker l(write_lock);
- while (!local_messages.empty()) {
- Message *m = local_messages.front();
- local_messages.pop_front();
- m->set_connection(this);
- m->set_recv_stamp(ceph_clock_now(async_msgr->cct));
- ldout(async_msgr->cct, 10) << __func__ << " " << *m << " local deliver " << dendl;
- async_msgr->ms_fast_preprocess(m);
- write_lock.Unlock();
- if (async_msgr->ms_can_fast_dispatch(m)) {
- async_msgr->ms_fast_dispatch(m);
- } else {
- msgr->ms_deliver_dispatch(m);
- }
- write_lock.Lock();
- }
-}
bool open_write;
map<int, list<pair<bufferlist, Message*> > > out_q; // priority queue for outbound msgs
list<Message*> sent; // the first bufferlist need to inject seq
- list<Message*> local_messages; // local deliver
bufferlist outcoming_bl;
bool keepalive;
EventCallbackRef reset_handler;
EventCallbackRef remote_reset_handler;
EventCallbackRef connect_handler;
- EventCallbackRef local_deliver_handler;
EventCallbackRef wakeup_handler;
struct iovec msgvec[ASYNC_IOV_MAX];
char *recv_buf;
delete reset_handler;
delete remote_reset_handler;
delete connect_handler;
- delete local_deliver_handler;
delete wakeup_handler;
if (delay_state) {
delete delay_state;