From: Haomai Wang Date: Sat, 6 Dec 2014 12:29:42 +0000 (+0800) Subject: AsyncConnection: Add loopback connection support X-Git-Tag: v0.91~37^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1bf39db0489c01a856330d3972b99c64dc401377;p=ceph.git AsyncConnection: Add loopback connection support Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 815c767775f4..d7d1281cfd7c 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -139,6 +139,14 @@ class C_handle_signal : public EventCallback { } }; +class C_local_deliver : public EventCallback { + AsyncConnectionRef conn; + public: + C_local_deliver(AsyncConnectionRef c): conn(c) {} + void do_request(int id) { + conn->local_deliver(); + } +}; static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) { @@ -180,6 +188,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente signal_handler.reset(new C_handle_signal(this)); connect_handler.reset(new C_deliver_connect(async_msgr, this)); accept_handler.reset(new C_deliver_accept(async_msgr, this)); + local_deliver_handler.reset(new C_local_deliver(this)); memset(msgvec, 0, sizeof(msgvec)); } @@ -1723,6 +1732,10 @@ int AsyncConnection::send_message(Message *m) ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state) << " policy.server is false" << dendl; _connect(); + } else if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection + ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl; + local_messages.push_back(m); + center->dispatch_event_external(local_deliver_handler); } else if (sd > 0 && !open_write) { center->dispatch_event_external(write_handler); } @@ -2109,3 +2122,24 @@ void AsyncConnection::handle_write() fail: fault(); } + +void AsyncConnection::local_deliver() +{ + ldout(async_msgr->cct, 10) << __func__ << dendl; + Mutex::Locker l(lock); + while (!local_messages.empty()) { + Message *m = local_messages.back(); + local_messages.pop_back(); + m->set_connection(this); + m->set_recv_stamp(ceph_clock_now(async_msgr->cct)); + ldout(async_msgr->cct, 10) << __func__ << " " << *m << " local deliver " << dendl; + async_msgr->ms_fast_preprocess(m); + lock.Unlock(); + if (async_msgr->ms_can_fast_dispatch(m)) { + async_msgr->ms_fast_dispatch(m); + } else { + msgr->ms_deliver_dispatch(m); + } + lock.Lock(); + } +} diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 3ee416daeb88..e2e60fb2943f 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -245,6 +245,7 @@ class AsyncConnection : public Connection { Messenger::Policy policy; map > out_q; // priority queue for outbound msgs list sent; + list local_messages; // local deliver Mutex lock; utime_t backoff; // backoff time bool open_write; @@ -258,6 +259,7 @@ class AsyncConnection : public Connection { EventCallbackRef fast_accept_handler; EventCallbackRef stop_handler; EventCallbackRef signal_handler; + EventCallbackRef local_deliver_handler; bool keepalive; struct iovec msgvec[IOV_LEN]; Mutex stop_lock; // used to protect `mark_down_cond` @@ -310,6 +312,7 @@ class AsyncConnection : public Connection { Mutex::Locker l(stop_lock); stop_cond.Signal(); } + void local_deliver(); }; /* AsyncConnection */ typedef boost::intrusive_ptr AsyncConnectionRef; diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index e7660e79b6fe..95d7111db051 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -576,21 +576,8 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con, // local? if (my_inst.addr == dest_addr) { // local - ldout(cct, 20) << __func__ << " " << *m << " local" << dendl; - m->set_connection(local_connection.get()); - m->set_recv_stamp(ceph_clock_now(cct)); - ms_fast_preprocess(m); - if (ms_can_fast_dispatch(m)) { - ms_fast_dispatch(m); - } else { - if (m->get_priority() >= CEPH_MSG_PRIO_LOW) { - ms_fast_dispatch(m); - } else { - ms_deliver_dispatch(m); - } - } - - return; + static_cast(local_connection.get())->send_message(m); + return ; } // remote, no existing connection. @@ -602,6 +589,8 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con, m->put(); } else { ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addr << ", new connection." << dendl; + con = create_connect(dest_addr, dest_type); + con->send_message(m); } }