]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: change all exception deliver to DispatchQueue
authorHaomai Wang <haomai@xsky.com>
Fri, 6 May 2016 08:40:35 +0000 (16:40 +0800)
committerHaomai Wang <haomai@xsky.com>
Fri, 6 May 2016 16:25:41 +0000 (00:25 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index 8f4410063f877ce16b96801f4e91955c761ef8a8..b9090720b6f3582e0fdaa70b3833b6f1d5c4abf9 100644 (file)
@@ -77,51 +77,6 @@ class C_handle_write : public EventCallback {
   }
 };
 
-class C_handle_reset : public EventCallback {
-  AsyncMessenger *msgr;
-  AsyncConnectionRef conn;
-
- public:
-  C_handle_reset(AsyncMessenger *m, AsyncConnectionRef c): msgr(m), conn(c) {}
-  void do_request(int id) {
-    msgr->ms_deliver_handle_reset(conn.get());
-  }
-};
-
-class C_handle_remote_reset : public EventCallback {
-  AsyncMessenger *msgr;
-  AsyncConnectionRef conn;
-
- public:
-  C_handle_remote_reset(AsyncMessenger *m, AsyncConnectionRef c): msgr(m), conn(c) {}
-  void do_request(int id) {
-    msgr->ms_deliver_handle_remote_reset(conn.get());
-  }
-};
-
-class C_deliver_connect : public EventCallback {
-  AsyncMessenger *msgr;
-  AsyncConnectionRef conn;
-
- public:
-  C_deliver_connect(AsyncMessenger *msgr, AsyncConnectionRef c): msgr(msgr), conn(c) {}
-  void do_request(int id) {
-    msgr->ms_deliver_handle_connect(conn.get());
-  }
-};
-
-class C_deliver_accept : public EventCallback {
-  AsyncMessenger *msgr;
-  AsyncConnectionRef conn;
-
- public:
-  C_deliver_accept(AsyncMessenger *msgr, AsyncConnectionRef c): msgr(msgr), conn(c) {}
-  void do_request(int id) {
-    msgr->ms_deliver_handle_accept(conn.get());
-    delete this;
-  }
-};
-
 class C_clean_handler : public EventCallback {
   AsyncConnectionRef conn;
  public:
@@ -166,9 +121,6 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
 {
   read_handler = new C_handle_read(this);
   write_handler = new C_handle_write(this);
-  reset_handler = new C_handle_reset(async_msgr, this);
-  remote_reset_handler = new C_handle_remote_reset(async_msgr, this);
-  connect_handler = new C_deliver_connect(async_msgr, this);
   wakeup_handler = new C_time_wakeup(this);
   memset(msgvec, 0, sizeof(msgvec));
   // double recv_max_prefetch see "read_until"
@@ -1346,7 +1298,7 @@ ssize_t AsyncConnection::_process_connection()
           session_security.reset();
         }
 
-        center->dispatch_event_external(connect_handler);
+        dispatch_queue->queue_connect(this);
         async_msgr->ms_deliver_handle_fast_connect(this);
 
         // message may in queue between last _try_send and connection ready
@@ -1807,14 +1759,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
   inject_delay();
   if (existing->policy.lossy) {
     // disconnect from the Connection
-    existing->center->dispatch_event_external(existing->reset_handler);
+    existing->dispatch_queue->queue_reset(this);
     ldout(async_msgr->cct, 1) << __func__ << " replacing on lossy channel, failing existing" << dendl;
     existing->_stop();
   } else {
     assert(can_write == WriteStatus::NOWRITE);
     existing->write_lock.Lock(true);
     // queue a reset on the new connection, which we're dumping for the old
-    center->dispatch_event_external(reset_handler);
+    dispatch_queue->queue_reset(this);
 
     // reset the in_seq if this is a hard reset from peer,
     // otherwise we respect our original connection's value
@@ -1936,7 +1888,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     goto fail_registered;
 
   // notify
-  center->dispatch_event_external(EventCallbackRef(new C_deliver_accept(async_msgr, this)));
+  dispatch_queue->queue_accept(this);
   async_msgr->ms_deliver_handle_fast_accept(this);
   once_ready = true;
 
@@ -1997,7 +1949,7 @@ int AsyncConnection::send_message(Message *m)
     ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
     Mutex::Locker l(write_lock);
     if (can_write != WriteStatus::CLOSED) {
-      dispatch_queue.local_delivery(m, m->get_priority());
+      dispatch_queue->local_delivery(m, m->get_priority());
     } else {
       ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
                                  << " Drop message " << m << dendl;
@@ -2135,7 +2087,7 @@ void AsyncConnection::fault()
 
   if (policy.lossy && !(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY)) {
     ldout(async_msgr->cct, 1) << __func__ << " on lossy channel, failing" << dendl;
-    center->dispatch_event_external(reset_handler);
+    dispatch_queue->queue_reset(this);
     _stop();
     return ;
   }
@@ -2164,7 +2116,7 @@ void AsyncConnection::fault()
       state >=STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
     ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half "
                               << " accept state just closed" << dendl;
-    center->dispatch_event_external(reset_handler);
+    dispatch_queue->queue_reset(this);
 
     write_lock.Unlock();
     _stop();
@@ -2217,7 +2169,7 @@ void AsyncConnection::was_session_reset()
   dispatch_queue->discard_queue(conn_id);
   discard_out_queue();
 
-  center->dispatch_event_external(remote_reset_handler);
+  dispatch_queue->queue_remote_reset(this);
 
   if (randomize_out_seq()) {
     ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl;
index 730508d90e2605b06c0ff94de2972b1003c4c493..fcbaf5e35d5a2f93926f1dd1f5da7b4e65fd0607 100644 (file)
@@ -307,9 +307,6 @@ class AsyncConnection : public Connection {
   utime_t backoff;         // backoff time
   EventCallbackRef read_handler;
   EventCallbackRef write_handler;
-  EventCallbackRef reset_handler;
-  EventCallbackRef remote_reset_handler;
-  EventCallbackRef connect_handler;
   EventCallbackRef wakeup_handler;
   struct iovec msgvec[ASYNC_IOV_MAX];
   char *recv_buf;
@@ -368,16 +365,13 @@ class AsyncConnection : public Connection {
   void stop() {
     lock.Lock();
     if (state != STATE_CLOSED)
-      center->dispatch_event_external(reset_handler);
+      dispatch_queue->queue_reset(this);
     lock.Unlock();
     mark_down();
   }
   void cleanup_handler() {
     delete read_handler;
     delete write_handler;
-    delete reset_handler;
-    delete remote_reset_handler;
-    delete connect_handler;
     delete wakeup_handler;
     if (delay_state) {
       delete delay_state;