From: Haomai Wang Date: Wed, 14 Jan 2015 18:32:25 +0000 (+0800) Subject: AsyncConnection: Fix deadlock if socket failed when replacing X-Git-Tag: v0.93~247^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7bb7b1ec1e3f8b4141dedc3b88aa2aac476876d4;p=ceph.git AsyncConnection: Fix deadlock if socket failed when replacing If client reconnect a already mark_down endpoint, server-side will detect remote reset happen, so it will reset existing connection. Meanwhile, retry tag is received by client-side connection and it will try to reconnect. Again, client-side connection will send connect_msg with connect_seq(1). But it will met server-side connection's connect_seq(0), it will make server-side reply with reset tag. So this connection will loop in reset and retry tag. One solution is that we close server-side connection if connect_seq ==0 and no message in queue. But it will trigger another problem: 1. client try to connect a already mark_down endpoint 2. client->send_message 3. server-side accept new socket, replace old one and reply retry tag 4. client plus one to connect_seq but socket failure happen 5. server-side connection detected and close because of connect_seq==0 and no message 6. client reconnect, server-side has no existing connection and met "connect.connect_seq > 0". So server-side will reply to RESET tag 7. client discard all messages in queue. So we lose a message never delivered This solution add a new "once_session_reset" flag to indicate whether "existing" reset. Because server-side's connect_seq is 0 only when it never successfully or ever session reset. We only need to reply RESET tag if ever session reset. Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 16830e360605..d9803fabaae6 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -170,7 +170,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente port(-1), lock("AsyncConnection::lock"), open_write(false), keepalive(false), recv_buf(NULL), recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), recv_start(0), recv_end(0), stop_lock("AsyncConnection::stop_lock"), - got_bad_auth(false), authorizer(NULL), replacing(false), + got_bad_auth(false), authorizer(NULL), replacing(false), once_session_reset(false), state_buffer(NULL), state_offset(0), net(cct), center(c) { read_handler.reset(new C_handle_read(this)); @@ -1635,10 +1635,11 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a assert(connect.connect_seq > existing->connect_seq); assert(connect.global_seq >= existing->peer_global_seq); if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other - existing->connect_seq == 0) { + existing->connect_seq == 0 && once_session_reset) { ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq " << connect.connect_seq << ", " << existing << ".cseq = " << existing->connect_seq << "), sending RESETSESSION" << dendl; + once_session_reset = false; return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply); } @@ -2040,6 +2041,7 @@ void AsyncConnection::was_session_reset() in_seq = 0; connect_seq = 0; in_seq_acked = 0; + once_session_reset = true; } // *note: `async` is true only happen when replacing connection process diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 515b1d700d2a..cb53b6545ae8 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -266,6 +266,7 @@ class AsyncConnection : public Connection { // there won't exists conflicting connection so we use // "replacing" to skip RESETSESSION to avoid detect wrong // presentation + bool once_session_reset; atomic_t stopping; // used only for local state, it will be overwrite when state transition