]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Use retry_global tag instead of retry_session
author Haomai Wang <haomaiwang@gmail.com>
Wed, 11 Feb 2015 09:37:34 +0000 (17:37 +0800)
committer Haomai Wang <haomaiwang@gmail.com>
Wed, 11 Feb 2015 09:37:34 +0000 (17:37 +0800)
Two QA lab test runs showed that retry_session is hard to use here: it causes
connect_seq to increase, which makes the reset check harder. Using retry_global
instead should avoid that side effect when replacing a connection.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc

index 55431f091a114db8221e687e80a1b64e36bc5efe..e043f778ead444e8dc8c43649d3615dd400035f4 100644 (file)
@@ -180,7 +180,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente
     port(-1), lock("AsyncConnection::lock"), open_write(false), keepalive(false), recv_buf(NULL),
     recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
     recv_start(0), recv_end(0), stop_lock("AsyncConnection::stop_lock"),
-    got_bad_auth(false), authorizer(NULL), replacing(false), allow_session_reset(true),
+    got_bad_auth(false), authorizer(NULL), replacing(false),
     is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c)
 {
   read_handler.reset(new C_handle_read(this));
@@ -1472,10 +1472,10 @@ int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_co
   }
   if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
     assert(reply.connect_seq > connect_seq);
-    connect_seq = reply.connect_seq;
     ldout(async_msgr->cct, 10) << __func__ << " connect got RETRY_SESSION "
-                         << connect_seq << " -> "
-                         << reply.connect_seq << dendl;
+                               << connect_seq << " -> "
+                               << reply.connect_seq << dendl;
+    connect_seq = reply.connect_seq;
     state = STATE_CONNECTING_SEND_CONNECT_MSG;
   }
   if (reply.tag == CEPH_MSGR_TAG_WAIT) {
@@ -1656,11 +1656,10 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     assert(connect.connect_seq > existing->connect_seq);
     assert(connect.global_seq >= existing->peer_global_seq);
     if (policy.resetcheck &&   // RESETSESSION only used by servers; peers do not reset each other
-        existing->connect_seq == 0 && allow_session_reset) {
+        existing->connect_seq == 0) {
       ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
                           << connect.connect_seq << ", " << existing << ".cseq = "
                           << existing->connect_seq << "), sending RESETSESSION" << dendl;
-      allow_session_reset = false;
       return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
     }
 
@@ -1699,8 +1698,8 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   if (existing->replacing || existing->state == STATE_CLOSED) {
     ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
                               << " state=" << get_state_name(existing->state) << dendl;
-    reply.connect_seq = connect.connect_seq + 1;
-    r = _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
+    reply.global_seq = existing->peer_global_seq;
+    r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
     existing->lock.Unlock();
     if (r < 0)
       goto fail;
@@ -1729,7 +1728,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
     existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler);
 
-    reply.connect_seq = connect.connect_seq + 1;
+    reply.global_seq = existing->peer_global_seq;
 
     // Clean up output buffer
     existing->outcoming_bl.clear();
@@ -1743,7 +1742,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     // there should exist any buffer
     assert(recv_start == recv_end);
 
-    if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) {
+    if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
       // handle error
       existing->center->dispatch_event_external(existing->write_handler);
     }
@@ -1812,9 +1811,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   }
 
   lock.Lock();
-  once_ready = true;
   replacing = false;
-  allow_session_reset = true;
   if (r < 0) {
     ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr
                               << " just fail later one(this)" << dendl;
@@ -1827,15 +1824,15 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     goto fail_registered;
   }
 
-  // notify
-  center->dispatch_event_external(accept_handler);
-  async_msgr->ms_deliver_handle_fast_accept(this);
-
-
   r = _try_send(reply_bl);
   if (r < 0)
     goto fail_registered;
 
+  // notify
+  center->dispatch_event_external(accept_handler);
+  async_msgr->ms_deliver_handle_fast_accept(this);
+  once_ready = true;
+
   if (r == 0) {
     state = next_state;
     ldout(async_msgr->cct, 2) << __func__ << " accept write reply msg done" << dendl;
@@ -2025,14 +2022,17 @@ void AsyncConnection::fault()
   state_offset = 0;
   replacing = false;
   outcoming_bl.clear();
+  if (!once_ready && !is_queued() &&
+    state >=STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
+  ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half "
+                            << "accept state just closed, state="
+                            << get_state_name(state) << dendl;
+   _stop();
+   return ;
+  }
   if (policy.standby && !is_queued()) {
-    if (!once_ready) {
-      // a half connection, close
-      _stop();
-    } else {
-      ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
-      state = STATE_STANDBY;
-    }
+    ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
+    state = STATE_STANDBY;
     return;
   }
 
@@ -2072,13 +2072,13 @@ void AsyncConnection::was_session_reset()
   center->dispatch_event_external(remote_reset_handler);
 
   if (randomize_out_seq()) {
-    lsubdout(async_msgr->cct,ms,15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
+    ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
   }
 
   in_seq = 0;
   connect_seq = 0;
   in_seq_acked = 0;
-  allow_session_reset = true;
+  once_ready = false;
 }
 
 void AsyncConnection::_stop()