From: Haomai Wang Date: Tue, 13 Jan 2015 15:52:27 +0000 (+0800) Subject: AsyncConnection: Don't discard out_q and unregister when replacing X-Git-Tag: v0.93~247^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2bc16752c41f82eafce81f8c8d91e7f6c203e28f;p=ceph.git AsyncConnection: Don't discard out_q and unregister when replacing Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index b7d3b374aff..bae0511ff52 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -188,6 +188,8 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente AsyncConnection::~AsyncConnection() { + assert(out_q.empty()); + assert(sent.empty()); assert(!authorizer); if (recv_buf) delete recv_buf; @@ -1555,6 +1557,8 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a goto fail; } + if (existing == this) + existing = NULL; if (existing) { if (connect.global_seq < existing->peer_global_seq) { ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing @@ -1692,10 +1696,11 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a // Here we use "_stop" instead of "mark_down" because "mark_down" is a async // operation, but now we need ensure all variables in `existing` is cleaned up // and we will reuse it next. - existing->_stop(); + existing->_stop(true); if (existing->policy.lossy) { // disconnect from the Connection center->dispatch_event_external(EventCallbackRef(new C_handle_reset(async_msgr, existing))); + existing->discard_out_queue(); } else { // queue a reset on the new connection, which we're dumping for the old center->dispatch_event_external(reset_handler); @@ -1726,7 +1731,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a swap(existing->sd, sd); existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG; existing->open_write = false; - existing->discard_out_queue(); existing->replacing = true; _stop(); existing->lock.Unlock(); @@ -1974,6 +1978,7 @@ void AsyncConnection::fault() if (policy.lossy && state != STATE_CONNECTING) { ldout(async_msgr->cct, 10) << __func__ << " on lossy channel, failing" << dendl; center->dispatch_event_external(reset_handler); + discard_out_queue(); _stop(); return ; } @@ -2039,14 +2044,17 @@ void AsyncConnection::was_session_reset() } // *note: `async` is true only happen when replacing connection process -void AsyncConnection::_stop() +void AsyncConnection::_stop(bool replacing) { assert(lock.is_locked()); ldout(async_msgr->cct, 10) << __func__ << dendl; if (sd > 0) center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); - async_msgr->unregister_conn(this); + if (!replacing) { + discard_out_queue(); + async_msgr->unregister_conn(this); + } if (async_msgr->cct->_conf->ms_inject_internal_delays) { ldout(msgr->cct, 10) << __func__ << " sleep for " @@ -2058,7 +2066,6 @@ void AsyncConnection::_stop() } shutdown_socket(); - discard_out_queue(); open_write = false; state_offset = 0; state = STATE_CLOSED; diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 515b1d700d2..690f5863725 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -53,7 +53,7 @@ class AsyncConnection : public Connection { int read_until(uint64_t needed, char *p); int _process_connection(); void _connect(); - void _stop(); + void _stop(bool discard=true); int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r); int handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl); void was_session_reset(); diff --git a/src/msg/async/EventEpoll.cc b/src/msg/async/EventEpoll.cc index 65e31615fcf..c33dd3d51f8 100644 --- a/src/msg/async/EventEpoll.cc +++ b/src/msg/async/EventEpoll.cc @@ -47,7 +47,7 @@ int EpollDriver::init(int nevent) int EpollDriver::add_event(int fd, int cur_mask, int add_mask) { ldout(cct, 20) << __func__ << " add event fd=" << fd << " cur_mask=" << cur_mask - << " add_mask=" << add_mask << dendl; + << " add_mask=" << add_mask << " to " << epfd << dendl; struct epoll_event ee; /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */ @@ -63,8 +63,8 @@ int EpollDriver::add_event(int fd, int cur_mask, int add_mask) ee.data.u64 = 0; /* avoid valgrind warning */ ee.data.fd = fd; if (epoll_ctl(epfd, op, fd, &ee) == -1) { - lderr(cct) << __func__ << " unable to add event: " - << cpp_strerror(errno) << dendl; + lderr(cct) << __func__ << " epoll_ctl: add fd=" << fd << " failed. " + << cpp_strerror(errno) << dendl; return -errno; } @@ -74,7 +74,7 @@ int EpollDriver::add_event(int fd, int cur_mask, int add_mask) void EpollDriver::del_event(int fd, int cur_mask, int delmask) { ldout(cct, 20) << __func__ << " del event fd=" << fd << " cur_mask=" << cur_mask - << " delmask=" << delmask << dendl; + << " delmask=" << delmask << " to " << epfd << dendl; struct epoll_event ee; int mask = cur_mask & (~delmask);