From: Haomai Wang Date: Wed, 14 Jan 2015 03:14:16 +0000 (+0800) Subject: AsyncConnection: Fix replacing cause original state lossy X-Git-Tag: v0.93~247^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=296e5457be31bd7d22e467ed0a8f4d4cea8df77f;p=ceph.git AsyncConnection: Fix replacing cause original state lossy Because AsyncConnection won't enter "open" tag from "replace" tag, the codes which set reply_tag won't be used when enter "open" tag. It will cause server side discard out_q and lose state. Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index bae0511ff52b..157ec77f7034 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -1489,9 +1489,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a int r = 0; ceph_msg_connect_reply reply; bufferlist reply_bl; - uint64_t existing_seq = -1; bool is_reset_from_peer = false; - char reply_tag = 0; memset(&reply, 0, sizeof(reply)); reply.protocol_version = async_msgr->get_proto_version(peer_type, false); @@ -1663,11 +1661,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a assert(0); replace: - // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence - if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) { - reply_tag = CEPH_MSGR_TAG_SEQ; - existing_seq = existing->in_seq; - } ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl; if (async_msgr->cct->_conf->ms_inject_internal_delays) { @@ -1687,20 +1680,15 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a reply.connect_seq = existing->connect_seq + 1; r = _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply); existing->lock.Unlock(); - if (r < 0) { + if (r < 0) goto fail; - } - return r; + return 0; } - // Here we use "_stop" instead of "mark_down" because "mark_down" is a async - // operation, but now we need ensure all variables in `existing` is cleaned up - // and we will reuse it next. - existing->_stop(true); if (existing->policy.lossy) { // disconnect from the Connection center->dispatch_event_external(EventCallbackRef(new C_handle_reset(async_msgr, existing))); - existing->discard_out_queue(); + existing->_stop(); } else { // queue a reset on the new connection, which we're dumping for the old center->dispatch_event_external(reset_handler); @@ -1710,28 +1698,32 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a if (is_reset_from_peer) existing->in_seq = 0; - // Clean up output buffer - existing->outcoming_bl.clear(); - existing->requeue_sent(); - reply.connect_seq = connect.connect_seq + 1; - if (_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) { - existing->lock.Unlock(); - goto fail; - } - // Now existing connection will be alive and the current connection will // exchange socket with existing connection because we want to maintain // original "connection_state" + existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE); center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler); - existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE); - center->create_file_event(existing->sd, EVENT_READABLE, read_handler); + reply.connect_seq = connect.connect_seq + 1; + + // Clean up output buffer + existing->outcoming_bl.clear(); + existing->requeue_sent(); swap(existing->sd, sd); - existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG; existing->open_write = false; existing->replacing = true; + existing->state_offset = 0; + existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG; + // there should exist any buffer + assert(recv_start == recv_end); + + if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) { + // handle error + existing->center->dispatch_event_external(existing->write_handler); + } + _stop(); existing->lock.Unlock(); return 0; @@ -1739,14 +1731,24 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a existing->lock.Unlock(); open: - replacing = false; connect_seq = connect.connect_seq + 1; peer_global_seq = connect.global_seq; ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = " - << connect_seq << ", sending READY" << dendl; + << connect_seq << ", sending READY" << dendl; + + int next_state; + + // if it is a hard reset from peer(in_seq == 0), we don't need a round-trip to negotiate in/out sequence + if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && in_seq) { + reply.tag = CEPH_MSGR_TAG_SEQ; + next_state = STATE_ACCEPTING_WAIT_SEQ; + } else { + reply.tag = CEPH_MSGR_TAG_READY; + next_state = STATE_ACCEPTING_READY; + discard_requeued_up_to(0); + } // send READY reply - reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY); reply.features = policy.features_supported; reply.global_seq = async_msgr->get_global_seq(); reply.connect_seq = connect_seq; @@ -1767,18 +1769,12 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a if (reply.authorizer_len) reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length()); - int next_state; + if (reply.tag == CEPH_MSGR_TAG_SEQ) + reply_bl.append((char*)&in_seq, sizeof(in_seq)); - if (reply_tag == CEPH_MSGR_TAG_SEQ) { - reply_bl.append((char*)&existing_seq, sizeof(existing_seq)); - next_state = STATE_ACCEPTING_WAIT_SEQ; - } else { - next_state = STATE_ACCEPTING_READY; - discard_requeued_up_to(0); - } - - // if replacing, this con is alreadly accepted. lock.Unlock(); + // Because "replacing" will prevent other connections preempt this addr, + // it's safe that here we don't acquire Connection's lock r = async_msgr->accept_conn(this); if (async_msgr->cct->_conf->ms_inject_internal_delays) { @@ -1790,6 +1786,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a } lock.Lock(); + replacing = false; if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr << " just fail later one(this)" << dendl; @@ -1806,6 +1803,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a center->dispatch_event_external(accept_handler); async_msgr->ms_deliver_handle_fast_accept(this); + r = _try_send(reply_bl); if (r < 0) goto fail_registered; @@ -1978,7 +1976,6 @@ void AsyncConnection::fault() if (policy.lossy && state != STATE_CONNECTING) { ldout(async_msgr->cct, 10) << __func__ << " on lossy channel, failing" << dendl; center->dispatch_event_external(reset_handler); - discard_out_queue(); _stop(); return ; } @@ -1993,6 +1990,7 @@ void AsyncConnection::fault() requeue_sent(); recv_start = recv_end = 0; state_offset = 0; + replacing = false; outcoming_bl.clear(); if (policy.standby && !is_queued()) { ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl; @@ -2044,17 +2042,15 @@ void AsyncConnection::was_session_reset() } // *note: `async` is true only happen when replacing connection process -void AsyncConnection::_stop(bool replacing) +void AsyncConnection::_stop() { assert(lock.is_locked()); ldout(async_msgr->cct, 10) << __func__ << dendl; if (sd > 0) center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); - if (!replacing) { - discard_out_queue(); - async_msgr->unregister_conn(this); - } + discard_out_queue(); + async_msgr->unregister_conn(this); if (async_msgr->cct->_conf->ms_inject_internal_delays) { ldout(msgr->cct, 10) << __func__ << " sleep for " @@ -2065,10 +2061,10 @@ void AsyncConnection::_stop(bool replacing) t.sleep(); } + state = STATE_CLOSED; shutdown_socket(); open_write = false; state_offset = 0; - state = STATE_CLOSED; if (sd > 0) ::close(sd); sd = -1; diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 690f5863725d..515b1d700d2a 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -53,7 +53,7 @@ class AsyncConnection : public Connection { int read_until(uint64_t needed, char *p); int _process_connection(); void _connect(); - void _stop(bool discard=true); + void _stop(); int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r); int handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl); void was_session_reset(); diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 0a6e9089be21..44b4da1d5188 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -389,6 +389,7 @@ public: AsyncConnectionRef existing = conns[conn->peer_addr]; // lazy delete, see "deleted_conns" + // If conn already in, we will return 0 Mutex::Locker l(deleted_lock); if (deleted_conns.count(existing)) { deleted_conns.erase(existing);