]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: fix incorrect condition for exchanging in_seq 3665/head
authorHaomai Wang <haomaiwang@gmail.com>
Sun, 8 Feb 2015 07:47:39 +0000 (15:47 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Sun, 8 Feb 2015 08:01:43 +0000 (16:01 +0800)
Originally we use "in_seq==0" to judge whether need to exchange in_seq,
it's wrong when peer side already receive message and need to reply new
in_seq to this side.

Now use "is_reset_from_peer" to indicate whether not need to exchange

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/test/msgr/test_async_driver.cc

index e6014a6820470f9531c46227fee2c14c13e52b3b..19647dbe92d87c3f06e89c4e1977a4b17782bf8a 100644 (file)
@@ -181,7 +181,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente
     recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
     recv_start(0), recv_end(0), stop_lock("AsyncConnection::stop_lock"),
     got_bad_auth(false), authorizer(NULL), replacing(false), once_session_reset(false),
-    state_buffer(NULL), state_offset(0), net(cct), center(c)
+    is_reset_from_peer(false), state_buffer(NULL), state_offset(0), net(cct), center(c)
 {
   read_handler.reset(new C_handle_read(this));
   write_handler.reset(new C_handle_write(this));
@@ -1510,7 +1510,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   int r = 0;
   ceph_msg_connect_reply reply;
   bufferlist reply_bl;
-  bool is_reset_from_peer = false;
 
   memset(&reply, 0, sizeof(reply));
   reply.protocol_version = async_msgr->get_proto_version(peer_type, false);
@@ -1717,8 +1716,9 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
 
     // reset the in_seq if this is a hard reset from peer,
     // otherwise we respect our original connection's value
-    if (is_reset_from_peer)
-      existing->in_seq = 0;
+    if (is_reset_from_peer) {
+      existing->is_reset_from_peer = true;
+    }
 
     // Now existing connection will be alive and the current connection will
     // exchange socket with existing connection because we want to maintain
@@ -1761,14 +1761,16 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
 
   int next_state;
 
-  // if it is a hard reset from peer(in_seq == 0), we don't need a round-trip to negotiate in/out sequence
-  if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && in_seq) {
+  // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
+  if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
     reply.tag = CEPH_MSGR_TAG_SEQ;
     next_state = STATE_ACCEPTING_WAIT_SEQ;
   } else {
     reply.tag = CEPH_MSGR_TAG_READY;
     next_state = STATE_ACCEPTING_READY;
     discard_requeued_up_to(0);
+    is_reset_from_peer = false;
+    in_seq = 0;
   }
 
   // send READY reply
index 25a21b4ff2a8a23866250887bfe02fd85cb1593f..76436f2788e7398d0fe16b47a6b3c0b2c244f305 100644 (file)
@@ -267,6 +267,7 @@ class AsyncConnection : public Connection {
                      // "replacing" to skip RESETSESSION to avoid detect wrong
                      // presentation
   bool once_session_reset;
+  bool is_reset_from_peer;
   atomic_t stopping;
 
   // used only for local state, it will be overwrite when state transition
index 369085109d909330223564db6ce61f1b8861ff85..91af54f79795bc9fdc0538a8b63d8c8f21f0acd0 100644 (file)
@@ -247,7 +247,7 @@ class FakeEvent : public EventCallback {
 
 TEST(EventCenterTest, FileEventExpansion) {
   vector<int> sds;
-  EventCenter center;
+  EventCenter center(g_ceph_context);
   center.init(100);
   EventCallbackRef e(new FakeEvent());
   for (int i = 0; i < 10000; i++) {