]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Add loopback connection support
authorHaomai Wang <haomaiwang@gmail.com>
Sat, 6 Dec 2014 12:29:42 +0000 (20:29 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Sun, 7 Dec 2014 05:04:27 +0000 (13:04 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc

index 815c767775f4c58add8eeecd53b8728797106929..d7d1281cfd7c65acb6702418f13870fbb7a24dc3 100644 (file)
@@ -139,6 +139,14 @@ class C_handle_signal : public EventCallback {
   }
 };
 
+class C_local_deliver : public EventCallback {
+  AsyncConnectionRef conn;
+ public:
+  C_local_deliver(AsyncConnectionRef c): conn(c) {}
+  void do_request(int id) {
+    conn->local_deliver();
+  }
+};
 
 static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 {
@@ -180,6 +188,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente
   signal_handler.reset(new C_handle_signal(this));
   connect_handler.reset(new C_deliver_connect(async_msgr, this));
   accept_handler.reset(new C_deliver_accept(async_msgr, this));
+  local_deliver_handler.reset(new C_local_deliver(this));
   memset(msgvec, 0, sizeof(msgvec));
 }
 
@@ -1723,6 +1732,10 @@ int AsyncConnection::send_message(Message *m)
       ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
                                  << " policy.server is false" << dendl;
       _connect();
+    } else if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
+      ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
+      local_messages.push_back(m);
+      center->dispatch_event_external(local_deliver_handler);
     } else if (sd > 0 && !open_write) {
       center->dispatch_event_external(write_handler);
     }
@@ -2109,3 +2122,24 @@ void AsyncConnection::handle_write()
  fail:
   fault();
 }
+
+void AsyncConnection::local_deliver()
+{
+  ldout(async_msgr->cct, 10) << __func__ << dendl;
+  Mutex::Locker l(lock);
+  while (!local_messages.empty()) {
+    Message *m = local_messages.back();
+    local_messages.pop_back();
+    m->set_connection(this);
+    m->set_recv_stamp(ceph_clock_now(async_msgr->cct));
+    ldout(async_msgr->cct, 10) << __func__ << " " << *m << " local deliver " << dendl;
+    async_msgr->ms_fast_preprocess(m);
+    lock.Unlock();
+    if (async_msgr->ms_can_fast_dispatch(m)) {
+      async_msgr->ms_fast_dispatch(m);
+    } else {
+      msgr->ms_deliver_dispatch(m);
+    }
+    lock.Lock();
+  }
+}
index 3ee416daeb8837f8aeca83f4590db5000471f7c0..e2e60fb2943f82a7564c9987d9a6a7ac40a7dede 100644 (file)
@@ -245,6 +245,7 @@ class AsyncConnection : public Connection {
   Messenger::Policy policy;
   map<int, list<Message*> > out_q;  // priority queue for outbound msgs
   list<Message*> sent;
+  list<Message*> local_messages;    // local deliver
   Mutex lock;
   utime_t backoff;         // backoff time
   bool open_write;
@@ -258,6 +259,7 @@ class AsyncConnection : public Connection {
   EventCallbackRef fast_accept_handler;
   EventCallbackRef stop_handler;
   EventCallbackRef signal_handler;
+  EventCallbackRef local_deliver_handler;
   bool keepalive;
   struct iovec msgvec[IOV_LEN];
   Mutex stop_lock; // used to protect `mark_down_cond`
@@ -310,6 +312,7 @@ class AsyncConnection : public Connection {
     Mutex::Locker l(stop_lock);
     stop_cond.Signal();
   }
+  void local_deliver();
 }; /* AsyncConnection */
 
 typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
index e7660e79b6fef2f8bc84b974503eb6740a4b05e5..95d7111db051364855cf2356a5b65bc0f4583115 100644 (file)
@@ -576,21 +576,8 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
   // local?
   if (my_inst.addr == dest_addr) {
     // local
-    ldout(cct, 20) << __func__ << " " << *m << " local" << dendl;
-    m->set_connection(local_connection.get());
-    m->set_recv_stamp(ceph_clock_now(cct));
-    ms_fast_preprocess(m);
-    if (ms_can_fast_dispatch(m)) {
-      ms_fast_dispatch(m);
-    } else {
-      if (m->get_priority() >= CEPH_MSG_PRIO_LOW) {
-        ms_fast_dispatch(m);
-      } else {
-        ms_deliver_dispatch(m);
-      }
-    }
-
-    return;
+    static_cast<AsyncConnection*>(local_connection.get())->send_message(m);
+    return ;
   }
 
   // remote, no existing connection.
@@ -602,6 +589,8 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
     m->put();
   } else {
     ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addr << ", new connection." << dendl;
+    con = create_connect(dest_addr, dest_type);
+    con->send_message(m);
   }
 }