From: Haomai Wang Date: Fri, 6 May 2016 08:40:35 +0000 (+0800) Subject: AsyncConnection: change all exception deliver to DispatchQueue X-Git-Tag: v11.0.0~138^2~12 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=bb3bdc2c8b329f70b14f16cf5862b6a344ee6fde;p=ceph.git AsyncConnection: change all exception deliver to DispatchQueue Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 8f4410063f87..b9090720b6f3 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -77,51 +77,6 @@ class C_handle_write : public EventCallback { } }; -class C_handle_reset : public EventCallback { - AsyncMessenger *msgr; - AsyncConnectionRef conn; - - public: - C_handle_reset(AsyncMessenger *m, AsyncConnectionRef c): msgr(m), conn(c) {} - void do_request(int id) { - msgr->ms_deliver_handle_reset(conn.get()); - } -}; - -class C_handle_remote_reset : public EventCallback { - AsyncMessenger *msgr; - AsyncConnectionRef conn; - - public: - C_handle_remote_reset(AsyncMessenger *m, AsyncConnectionRef c): msgr(m), conn(c) {} - void do_request(int id) { - msgr->ms_deliver_handle_remote_reset(conn.get()); - } -}; - -class C_deliver_connect : public EventCallback { - AsyncMessenger *msgr; - AsyncConnectionRef conn; - - public: - C_deliver_connect(AsyncMessenger *msgr, AsyncConnectionRef c): msgr(msgr), conn(c) {} - void do_request(int id) { - msgr->ms_deliver_handle_connect(conn.get()); - } -}; - -class C_deliver_accept : public EventCallback { - AsyncMessenger *msgr; - AsyncConnectionRef conn; - - public: - C_deliver_accept(AsyncMessenger *msgr, AsyncConnectionRef c): msgr(msgr), conn(c) {} - void do_request(int id) { - msgr->ms_deliver_handle_accept(conn.get()); - delete this; - } -}; - class C_clean_handler : public EventCallback { AsyncConnectionRef conn; public: @@ -166,9 +121,6 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu { read_handler = new C_handle_read(this); write_handler = new C_handle_write(this); - reset_handler = new C_handle_reset(async_msgr, this); - remote_reset_handler = new C_handle_remote_reset(async_msgr, this); - connect_handler = new C_deliver_connect(async_msgr, this); wakeup_handler = new C_time_wakeup(this); memset(msgvec, 0, sizeof(msgvec)); // double recv_max_prefetch see "read_until" @@ -1346,7 +1298,7 @@ ssize_t AsyncConnection::_process_connection() session_security.reset(); } - center->dispatch_event_external(connect_handler); + dispatch_queue->queue_connect(this); async_msgr->ms_deliver_handle_fast_connect(this); // message may in queue between last _try_send and connection ready @@ -1807,14 +1759,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis inject_delay(); if (existing->policy.lossy) { // disconnect from the Connection - existing->center->dispatch_event_external(existing->reset_handler); + existing->dispatch_queue->queue_reset(this); ldout(async_msgr->cct, 1) << __func__ << " replacing on lossy channel, failing existing" << dendl; existing->_stop(); } else { assert(can_write == WriteStatus::NOWRITE); existing->write_lock.Lock(true); // queue a reset on the new connection, which we're dumping for the old - center->dispatch_event_external(reset_handler); + dispatch_queue->queue_reset(this); // reset the in_seq if this is a hard reset from peer, // otherwise we respect our original connection's value @@ -1936,7 +1888,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis goto fail_registered; // notify - center->dispatch_event_external(EventCallbackRef(new C_deliver_accept(async_msgr, this))); + dispatch_queue->queue_accept(this); async_msgr->ms_deliver_handle_fast_accept(this); once_ready = true; @@ -1997,7 +1949,7 @@ int AsyncConnection::send_message(Message *m) ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl; Mutex::Locker l(write_lock); if (can_write != WriteStatus::CLOSED) { - dispatch_queue.local_delivery(m, m->get_priority()); + dispatch_queue->local_delivery(m, m->get_priority()); } else { ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed." << " Drop message " << m << dendl; @@ -2135,7 +2087,7 @@ void AsyncConnection::fault() if (policy.lossy && !(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY)) { ldout(async_msgr->cct, 1) << __func__ << " on lossy channel, failing" << dendl; - center->dispatch_event_external(reset_handler); + dispatch_queue->queue_reset(this); _stop(); return ; } @@ -2164,7 +2116,7 @@ void AsyncConnection::fault() state >=STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half " << " accept state just closed" << dendl; - center->dispatch_event_external(reset_handler); + dispatch_queue->queue_reset(this); write_lock.Unlock(); _stop(); @@ -2217,7 +2169,7 @@ void AsyncConnection::was_session_reset() dispatch_queue->discard_queue(conn_id); discard_out_queue(); - center->dispatch_event_external(remote_reset_handler); + dispatch_queue->queue_remote_reset(this); if (randomize_out_seq()) { ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl; diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 730508d90e26..fcbaf5e35d5a 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -307,9 +307,6 @@ class AsyncConnection : public Connection { utime_t backoff; // backoff time EventCallbackRef read_handler; EventCallbackRef write_handler; - EventCallbackRef reset_handler; - EventCallbackRef remote_reset_handler; - EventCallbackRef connect_handler; EventCallbackRef wakeup_handler; struct iovec msgvec[ASYNC_IOV_MAX]; char *recv_buf; @@ -368,16 +365,13 @@ class AsyncConnection : public Connection { void stop() { lock.Lock(); if (state != STATE_CLOSED) - center->dispatch_event_external(reset_handler); + dispatch_queue->queue_reset(this); lock.Unlock(); mark_down(); } void cleanup_handler() { delete read_handler; delete write_handler; - delete reset_handler; - delete remote_reset_handler; - delete connect_handler; delete wakeup_handler; if (delay_state) { delete delay_state;