]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
AsyncConnection: make local message deliver via DispatchQueue
authorHaomai Wang <haomai@xsky.com>
Fri, 6 May 2016 08:34:48 +0000 (16:34 +0800)
committerHaomai Wang <haomai@xsky.com>
Fri, 6 May 2016 16:24:56 +0000 (00:24 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index 6c96987af44db9288c0e50d7b015253f3c217ffa..8f4410063f877ce16b96801f4e91955c761ef8a8 100644 (file)
@@ -122,16 +122,6 @@ class C_deliver_accept : public EventCallback {
   }
 };
 
-class C_local_deliver : public EventCallback {
-  AsyncConnectionRef conn;
- public:
-  explicit C_local_deliver(AsyncConnectionRef c): conn(c) {}
-  void do_request(int id) {
-    conn->local_deliver();
-  }
-};
-
-
 class C_clean_handler : public EventCallback {
   AsyncConnectionRef conn;
  public:
@@ -179,7 +169,6 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
   reset_handler = new C_handle_reset(async_msgr, this);
   remote_reset_handler = new C_handle_remote_reset(async_msgr, this);
   connect_handler = new C_deliver_connect(async_msgr, this);
-  local_deliver_handler = new C_local_deliver(this);
   wakeup_handler = new C_time_wakeup(this);
   memset(msgvec, 0, sizeof(msgvec));
   // double recv_max_prefetch see "read_until"
@@ -2008,8 +1997,7 @@ int AsyncConnection::send_message(Message *m)
     ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
     Mutex::Locker l(write_lock);
     if (can_write != WriteStatus::CLOSED) {
-      local_messages.push_back(m);
-      center->dispatch_event_external(local_deliver_handler);
+      dispatch_queue.local_delivery(m, m->get_priority());
     } else {
       ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
                                  << " Drop message " << m << dendl;
@@ -2595,24 +2583,3 @@ void AsyncConnection::wakeup_from(uint64_t id)
   lock.Unlock();
   process();
 }
-
-void AsyncConnection::local_deliver()
-{
-  ldout(async_msgr->cct, 10) << __func__ << dendl;
-  Mutex::Locker l(write_lock);
-  while (!local_messages.empty()) {
-    Message *m = local_messages.front();
-    local_messages.pop_front();
-    m->set_connection(this);
-    m->set_recv_stamp(ceph_clock_now(async_msgr->cct));
-    ldout(async_msgr->cct, 10) << __func__ << " " << *m << " local deliver " << dendl;
-    async_msgr->ms_fast_preprocess(m);
-    write_lock.Unlock();
-    if (async_msgr->ms_can_fast_dispatch(m)) {
-      async_msgr->ms_fast_dispatch(m);
-    } else {
-      msgr->ms_deliver_dispatch(m);
-    }
-    write_lock.Lock();
-  }
-}
index 79d50447385f054df50477909a8fea11bbad5c2b..730508d90e2605b06c0ff94de2972b1003c4c493 100644 (file)
@@ -300,7 +300,6 @@ class AsyncConnection : public Connection {
   bool open_write;
   map<int, list<pair<bufferlist, Message*> > > out_q;  // priority queue for outbound msgs
   list<Message*> sent; // the first bufferlist need to inject seq
-  list<Message*> local_messages;    // local deliver
   bufferlist outcoming_bl;
   bool keepalive;
 
@@ -311,7 +310,6 @@ class AsyncConnection : public Connection {
   EventCallbackRef reset_handler;
   EventCallbackRef remote_reset_handler;
   EventCallbackRef connect_handler;
-  EventCallbackRef local_deliver_handler;
   EventCallbackRef wakeup_handler;
   struct iovec msgvec[ASYNC_IOV_MAX];
   char *recv_buf;
@@ -380,7 +378,6 @@ class AsyncConnection : public Connection {
     delete reset_handler;
     delete remote_reset_handler;
     delete connect_handler;
-    delete local_deliver_handler;
     delete wakeup_handler;
     if (delay_state) {
       delete delay_state;