]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Lock existing's lock in advance avoid existing's state changed
authorHaomai Wang <haomaiwang@gmail.com>
Fri, 13 Mar 2015 04:21:32 +0000 (12:21 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Fri, 13 Mar 2015 06:10:22 +0000 (14:10 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc

index bda361a4440b888703a5feb358f09bd09d6f377e..8db6849302b80ae5d4772f75957ecf7d3835b7d6 100644 (file)
@@ -272,6 +272,7 @@ int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more)
 // else return < 0 means error
 int AsyncConnection::_try_send(bufferlist send_bl, bool send)
 {
+  assert(lock.is_locked());
   if (send_bl.length()) {
     if (outcoming_bl.length())
       outcoming_bl.claim_append(send_bl);
@@ -1587,11 +1588,27 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   if (existing == this)
     existing = NULL;
   if (existing) {
+    // There is no possible that existing connection will acquire this
+    // connection's lock
+    existing->lock.Lock();
+
+    if (existing->replacing || existing->state == STATE_CLOSED) {
+      ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
+                                << " state=" << get_state_name(existing->state) << dendl;
+      reply.global_seq = existing->peer_global_seq;
+      r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
+      existing->lock.Unlock();
+      if (r < 0)
+        goto fail;
+      return 0;
+    }
+
     if (connect.global_seq < existing->peer_global_seq) {
       ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
                            << ".gseq " << existing->peer_global_seq << " > "
                            << connect.global_seq << ", RETRY_GLOBAL" << dendl;
       reply.global_seq = existing->peer_global_seq;  // so we can send it below..
+      existing->lock.Unlock();
       return _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
     } else {
       ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
@@ -1625,6 +1642,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
                            << existing->connect_seq << " > " << connect.connect_seq
                            << ", RETRY_SESSION" << dendl;
       reply.connect_seq = existing->connect_seq + 1;
+      existing->lock.Unlock();
       return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
     }
 
@@ -1639,6 +1657,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
                              << ".cseq " << existing->connect_seq << " == "
                              << connect.connect_seq << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
         reply.connect_seq = existing->connect_seq + 1;
+        existing->lock.Unlock();
         return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
       }
 
@@ -1657,6 +1676,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
         assert(peer_addr > async_msgr->get_myaddr());
         // make sure our outgoing connection will follow through
         existing->send_keepalive();
+        existing->lock.Unlock();
         return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply);
       }
     }
@@ -1668,6 +1688,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
       ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
                           << connect.connect_seq << ", " << existing << ".cseq = "
                           << existing->connect_seq << "), sending RESETSESSION" << dendl;
+      existing->lock.Unlock();
       return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
     }
 
@@ -1700,20 +1721,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     t.sleep();
   }
 
-  // There is no possible that existing connection will acquire this lock
-  existing->lock.Lock();
-
-  if (existing->replacing || existing->state == STATE_CLOSED) {
-    ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
-                              << " state=" << get_state_name(existing->state) << dendl;
-    reply.global_seq = existing->peer_global_seq;
-    r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
-    existing->lock.Unlock();
-    if (r < 0)
-      goto fail;
-    return 0;
-  }
-
   if (existing->policy.lossy) {
     // disconnect from the Connection
     existing->center->dispatch_event_external(existing->reset_handler);