}
};
+class C_local_deliver : public EventCallback {
+ AsyncConnectionRef conn;
+ public:
+ C_local_deliver(AsyncConnectionRef c): conn(c) {}
+ void do_request(int id) {
+ conn->local_deliver();
+ }
+};
static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
{
signal_handler.reset(new C_handle_signal(this));
connect_handler.reset(new C_deliver_connect(async_msgr, this));
accept_handler.reset(new C_deliver_accept(async_msgr, this));
+ local_deliver_handler.reset(new C_local_deliver(this));
memset(msgvec, 0, sizeof(msgvec));
}
ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
<< " policy.server is false" << dendl;
_connect();
+ } else if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
+ ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
+ local_messages.push_back(m);
+ center->dispatch_event_external(local_deliver_handler);
} else if (sd > 0 && !open_write) {
center->dispatch_event_external(write_handler);
}
fail:
fault();
}
+
+void AsyncConnection::local_deliver()
+{
+ ldout(async_msgr->cct, 10) << __func__ << dendl;
+ Mutex::Locker l(lock);
+ while (!local_messages.empty()) {
+ Message *m = local_messages.back();
+ local_messages.pop_back();
+ m->set_connection(this);
+ m->set_recv_stamp(ceph_clock_now(async_msgr->cct));
+ ldout(async_msgr->cct, 10) << __func__ << " " << *m << " local deliver " << dendl;
+ async_msgr->ms_fast_preprocess(m);
+ lock.Unlock();
+ if (async_msgr->ms_can_fast_dispatch(m)) {
+ async_msgr->ms_fast_dispatch(m);
+ } else {
+ msgr->ms_deliver_dispatch(m);
+ }
+ lock.Lock();
+ }
+}
Messenger::Policy policy;
map<int, list<Message*> > out_q; // priority queue for outbound msgs
list<Message*> sent;
+ list<Message*> local_messages; // local deliver
Mutex lock;
utime_t backoff; // backoff time
bool open_write;
EventCallbackRef fast_accept_handler;
EventCallbackRef stop_handler;
EventCallbackRef signal_handler;
+ EventCallbackRef local_deliver_handler;
bool keepalive;
struct iovec msgvec[IOV_LEN];
Mutex stop_lock; // used to protect `mark_down_cond`
Mutex::Locker l(stop_lock);
stop_cond.Signal();
}
+ void local_deliver();
}; /* AsyncConnection */
typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
// local?
if (my_inst.addr == dest_addr) {
// local
- ldout(cct, 20) << __func__ << " " << *m << " local" << dendl;
- m->set_connection(local_connection.get());
- m->set_recv_stamp(ceph_clock_now(cct));
- ms_fast_preprocess(m);
- if (ms_can_fast_dispatch(m)) {
- ms_fast_dispatch(m);
- } else {
- if (m->get_priority() >= CEPH_MSG_PRIO_LOW) {
- ms_fast_dispatch(m);
- } else {
- ms_deliver_dispatch(m);
- }
- }
-
- return;
+ static_cast<AsyncConnection*>(local_connection.get())->send_message(m);
+ return ;
}
// remote, no existing connection.
m->put();
} else {
ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addr << ", new connection." << dendl;
+ con = create_connect(dest_addr, dest_type);
+ con->send_message(m);
}
}