From: Haomai Wang Date: Sat, 6 Dec 2014 11:47:01 +0000 (+0800) Subject: AsyncMessenger: Fix accept connection replacing process X-Git-Tag: v0.91~37^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fca14dc69286940d41911454499a1af98dfa92ad;p=ceph.git AsyncMessenger: Fix accept connection replacing process Original there exists bugs that we don't modify existing file event, it will let remaining alive connection's event won't be called by EventCenter. Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 68f34d613e52..815c767775f4 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -169,7 +169,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente state(STATE_NONE), state_after_send(0), sd(-1), lock("AsyncConnection::lock"), open_write(false), keepalive(false), stop_lock("AsyncConnection::stop_lock"), - got_bad_auth(false), authorizer(NULL), + got_bad_auth(false), authorizer(NULL), replacing(false), state_buffer(4096), state_offset(0), net(cct), center(c) { read_handler.reset(new C_handle_read(this)); @@ -1544,7 +1544,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a << " > " << existing->connect_seq << dendl; goto replace; } // existing - else if (policy.resetcheck && connect.connect_seq > 0) { + else if (!replacing && policy.resetcheck && connect.connect_seq > 0) { // we reset, and they are opening a new session ldout(async_msgr->cct, 0) << __func__ << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl; @@ -1597,10 +1597,17 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a if (_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) goto fail; - uint64_t s = existing->sd; - existing->sd = sd; - sd = s; + // Now existing connection will be alive and the current connection will + // exchange socket with existing connection because we want to maintain + // original "connection_state" + center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); + center->create_file_event(sd, EVENT_READABLE, existing->read_handler); + + swap(existing->sd, sd); existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG; + existing->open_write = false; + existing->discard_out_queue(); + existing->replacing = true; _stop(); existing->lock.Unlock(); return 0; @@ -1608,6 +1615,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a existing->lock.Unlock(); open: + replacing = false; connect_seq = connect.connect_seq + 1; peer_global_seq = connect.global_seq; ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = " @@ -1777,6 +1785,7 @@ void AsyncConnection::discard_out_queue() (*r)->put(); } out_q.clear(); + outcoming_bl.clear(); } int AsyncConnection::randomize_out_seq() @@ -1855,7 +1864,6 @@ void AsyncConnection::was_session_reset() { ldout(async_msgr->cct,10) << __func__ << "was_session_reset" << dendl; discard_out_queue(); - outcoming_bl.clear(); center->dispatch_event_external(remote_reset_handler); @@ -1871,15 +1879,16 @@ void AsyncConnection::was_session_reset() void AsyncConnection::_stop() { ldout(async_msgr->cct, 10) << __func__ << dendl; - center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); + if (sd > 0) + center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); shutdown_socket(); discard_out_queue(); - outcoming_bl.clear(); open_write = false; state = STATE_CLOSED; - ::close(sd); + if (sd > 0) + ::close(sd); sd = -1; - async_msgr->unregister_conn(peer_addr); + async_msgr->unregister_conn(this); // Here we need to dispatch "signal" event, because we want to ensure signal // it after all events called by this "_stop" has be done. center->dispatch_event_external(signal_handler); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 5ccd7904c7fa..3ee416daeb88 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -109,8 +109,8 @@ class AsyncConnection : public Connection { ostream& _conn_prefix(std::ostream *_dout); bool is_connected() { - // FIXME? - return state != STATE_CLOSED; + Mutex::Locker l(lock); + return state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE; } // Only call when AsyncConnection first construct @@ -146,7 +146,7 @@ class AsyncConnection : public Connection { void mark_down() { Mutex::Locker l(stop_lock); if (center->get_owner() == pthread_self()) { - _stop(); + stop(); } else { center->dispatch_event_external(stop_handler); stop_cond.Wait(stop_lock); @@ -281,6 +281,12 @@ class AsyncConnection : public Connection { // Accepting state entity_addr_t socket_addr; CryptoKey session_key; + bool replacing; // when replacing process happened, we will reply connect + // side with RETRY tag and accept side will clear replaced + // connection. So when connect side reissue connect_msg, + // there won't exists conflicting connection so we use + // "replacing" to skip RESETSESSION to avoid detect wrong + // presentation // used only for local state, it will be overwrite when state transition bufferptr state_buffer; diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 032538aa6985..e7660e79b6fe 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -352,7 +352,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, processor(this, _nonce), lock("AsyncMessenger::lock"), nonce(_nonce), did_bind(false), - global_seq(0), + global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"), cluster_protocol(0), stopped(true) { ceph_spin_init(&global_seq_lock); @@ -471,7 +471,7 @@ void AsyncMessenger::wait() while (!conns.empty()) { AsyncConnectionRef p = conns.begin()->second; - _stop_conn(p); + p->mark_down(); } } lock.Unlock(); @@ -658,7 +658,7 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr) AsyncConnectionRef p = _lookup_conn(addr); if (p) { ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl; - _stop_conn(p); + p->mark_down(); p->get(); ms_deliver_handle_reset(p.get()); } else { diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index aff5ed3d2b8d..4d01e2665464 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -293,6 +293,20 @@ private: // FIXME clear up set accepting_conns; + /** + * list of connection are closed which need to be clean up + * + * Because AsyncMessenger and AsyncConnection follow a lock rule that + * we can lock AsyncMesenger::lock firstly then lock AsyncConnection::lock + * but can't reversed. This rule is aimed to avoid dead lock. + * So if AsyncConnection want to unregister itself from AsyncMessenger, + * we pick up this idea that just queue itself to this set and do lazy + * deleted for AsyncConnection. "_lookup_conn" must ensure not return a + * AsyncConnection in this set. + */ + Mutex deleted_lock; + set deleted_conns; + /// internal cluster protocol version, if any, for talking to entities of the same type. int cluster_protocol; @@ -305,16 +319,15 @@ private: if (p == conns.end()) return NULL; - assert(p->second->is_connected()); - return p->second; - } - - void _stop_conn(AsyncConnectionRef c) { - assert(lock.is_locked()); - if (c) { - c->mark_down(); - conns.erase(c->peer_addr); + // lazy delete, see "deleted_conns" + Mutex::Locker l(deleted_lock); + if (deleted_conns.count(p->second)) { + deleted_conns.erase(p->second); + conns.erase(p); + return NULL; } + + return p->second; } void _init_local_connection() { @@ -324,7 +337,6 @@ private: ms_deliver_handle_fast_connect(local_connection.get()); } - public: /// con used for sending messages to ourselves @@ -399,10 +411,12 @@ public: /** * Unregister connection from `conns` + * + * See "deleted_conns" */ - void unregister_conn(const entity_addr_t &addr) { - Mutex::Locker l(lock); - conns.erase(addr); + void unregister_conn(AsyncConnectionRef conn) { + Mutex::Locker l(deleted_lock); + deleted_conns.insert(conn); } /** * @} // AsyncMessenger Internals