From d2eaeea081a729f9c5b4fb0386c717a21becdb1d Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Sun, 8 Feb 2015 15:47:39 +0800 Subject: [PATCH] AsyncConnection: fix incorrect condition for exchanging in_seq Originally we use "in_seq==0" to judge whether need to exchange in_seq, it's wrong when peer side already receive message and need to reply new in_seq to this side. Now use "is_reset_from_peer" to indicate whether not need to exchange Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 14 ++++++++------ src/msg/async/AsyncConnection.h | 1 + src/test/msgr/test_async_driver.cc | 2 +- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index e6014a682047..19647dbe92d8 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -181,7 +181,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), recv_start(0), recv_end(0), stop_lock("AsyncConnection::stop_lock"), got_bad_auth(false), authorizer(NULL), replacing(false), once_session_reset(false), - state_buffer(NULL), state_offset(0), net(cct), center(c) + is_reset_from_peer(false), state_buffer(NULL), state_offset(0), net(cct), center(c) { read_handler.reset(new C_handle_read(this)); write_handler.reset(new C_handle_write(this)); @@ -1510,7 +1510,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a int r = 0; ceph_msg_connect_reply reply; bufferlist reply_bl; - bool is_reset_from_peer = false; memset(&reply, 0, sizeof(reply)); reply.protocol_version = async_msgr->get_proto_version(peer_type, false); @@ -1717,8 +1716,9 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a // reset the in_seq if this is a hard reset from peer, // otherwise we respect our original connection's value - if (is_reset_from_peer) - existing->in_seq = 0; + if (is_reset_from_peer) { + existing->is_reset_from_peer = true; + } // Now existing connection will be alive and the current connection will // exchange socket with existing connection because we want to maintain @@ -1761,14 +1761,16 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a int next_state; - // if it is a hard reset from peer(in_seq == 0), we don't need a round-trip to negotiate in/out sequence - if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && in_seq) { + // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence + if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) { reply.tag = CEPH_MSGR_TAG_SEQ; next_state = STATE_ACCEPTING_WAIT_SEQ; } else { reply.tag = CEPH_MSGR_TAG_READY; next_state = STATE_ACCEPTING_READY; discard_requeued_up_to(0); + is_reset_from_peer = false; + in_seq = 0; } // send READY reply diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 25a21b4ff2a8..76436f2788e7 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -267,6 +267,7 @@ class AsyncConnection : public Connection { // "replacing" to skip RESETSESSION to avoid detect wrong // presentation bool once_session_reset; + bool is_reset_from_peer; atomic_t stopping; // used only for local state, it will be overwrite when state transition diff --git a/src/test/msgr/test_async_driver.cc b/src/test/msgr/test_async_driver.cc index 369085109d90..91af54f79795 100644 --- a/src/test/msgr/test_async_driver.cc +++ b/src/test/msgr/test_async_driver.cc @@ -247,7 +247,7 @@ class FakeEvent : public EventCallback { TEST(EventCenterTest, FileEventExpansion) { vector sds; - EventCenter center; + EventCenter center(g_ceph_context); center.init(100); EventCallbackRef e(new FakeEvent()); for (int i = 0; i < 10000; i++) { -- 2.47.3