]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Enhance replace process
authorHaomai Wang <haomaiwang@gmail.com>
Tue, 13 Jan 2015 03:54:54 +0000 (11:54 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Thu, 15 Jan 2015 19:07:12 +0000 (03:07 +0800)
Make handle_connect_msg follow lock rule: unlock any lock before acquire
messenger's lock. Otherwise, deadlock will happen.

Enhance lock condition check because connection's state maybe change while
unlock itself and lock again.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.h

index 5673c179e16fcfae51d34fbd70e625a3a329acc6..b588c4fe6e82dae83b460384829e4380740ae4ba 100644 (file)
@@ -1469,7 +1469,7 @@ int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_co
 int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl,
                                         bufferlist &authorizer_reply)
 {
-  int r;
+  int r = 0;
   ceph_msg_connect_reply reply;
   bufferlist reply_bl;
   uint64_t existing_seq = -1;
@@ -1521,7 +1521,16 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   ldout(async_msgr->cct, 10) << __func__ << " accept:  setting up session_security." << dendl;
 
   // existing?
+  lock.Unlock();
   AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr);
+  lock.Lock();
+  if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
+    ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down, state="
+                              << get_state_name(state) << dendl;
+    assert(state == STATE_CLOSED);
+    goto fail;
+  }
+
   if (existing) {
     if (connect.global_seq < existing->peer_global_seq) {
       ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
@@ -1633,18 +1642,21 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   }
   ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl;
 
-  // In order to avoid dead lock, here need to lock in ordering.
-  // It may be another thread access this connection between unlock and lock
-  // call, this is rely to EventCenter to guarantee only one thread can access
-  // one connection.
-  lock.Unlock();
-  if (existing->sd > sd) {
-    existing->lock.Lock();
-    lock.Lock();
-  } else {
-    lock.Lock();
-    existing->lock.Lock();
+  // There is no possible that existing connection will acquire this lock
+  existing->lock.Lock();
+
+  if (existing->replacing || existing->state == STATE_CLOSED) {
+    ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
+                              << " state=" << get_state_name(existing->state) << dendl;
+    reply.connect_seq = existing->connect_seq + 1;
+    r = _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
+    existing->lock.Unlock();
+    if (r < 0) {
+      goto fail;
+    }
+    return r;
   }
+
   // Here we use "_stop" instead of "mark_down" because "mark_down" is a async
   // operation, but now we need ensure all variables in `existing` is cleaned up
   // and we will reuse it next.
@@ -1665,14 +1677,19 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     existing->outcoming_bl.clear();
     existing->requeue_sent();
     reply.connect_seq = connect.connect_seq + 1;
-    if (_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0)
+    if (_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) {
+      existing->lock.Unlock();
       goto fail;
+    }
 
     // Now existing connection will be alive and the current connection will
     // exchange socket with existing connection because we want to maintain
     // original "connection_state"
     center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
-    center->create_file_event(sd, EVENT_READABLE, existing->read_handler);
+    existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler);
+
+    existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE);
+    center->create_file_event(existing->sd, EVENT_READABLE, read_handler);
 
     swap(existing->sd, sd);
     existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
@@ -1709,13 +1726,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
       get_auth_session_handler(async_msgr->cct, connect.authorizer_protocol,
                                session_key, get_features()));
 
-  // notify
-  center->dispatch_event_external(accept_handler);
-  async_msgr->ms_deliver_handle_fast_accept(this);
-
-  // ok!
-  async_msgr->accept_conn(this);
-
   reply_bl.append((char*)&reply, sizeof(reply));
 
   if (reply.authorizer_len)
@@ -1731,10 +1741,29 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     discard_requeued_up_to(0);
   }
 
-  r = _try_send(reply_bl);
+  // if replacing, this con is alreadly accepted.
+  lock.Unlock();
+  r = async_msgr->accept_conn(this);
+  lock.Lock();
   if (r < 0) {
+    ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr
+                              << " just fail later one(this)" << dendl;
     goto fail;
   }
+  if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
+    ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down, state="
+                              << get_state_name(state) << dendl;
+    assert(state == STATE_CLOSED);
+    goto fail;
+  }
+
+  // notify
+  center->dispatch_event_external(accept_handler);
+  async_msgr->ms_deliver_handle_fast_accept(this);
+
+  r = _try_send(reply_bl);
+  if (r < 0)
+    goto fail;
 
   if (r == 0) {
     state = next_state;
index d6c1771495ddd7982078a7f691e56926db4131b3..515b1d700d2a042ca3b6fae7dfce9e1c154d80c3 100644 (file)
@@ -157,7 +157,6 @@ class AsyncConnection : public Connection {
     STATE_CONNECTING_WAIT_ACK_SEQ,
     STATE_CONNECTING_READY,
     STATE_ACCEPTING,
-    STATE_ACCEPTING_HANDLE_CONNECT,
     STATE_ACCEPTING_WAIT_BANNER_ADDR,
     STATE_ACCEPTING_WAIT_CONNECT_MSG,
     STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH,
@@ -194,7 +193,6 @@ class AsyncConnection : public Connection {
                                         "STATE_CONNECTING_WAIT_ACK_SEQ",
                                         "STATE_CONNECTING_READY",
                                         "STATE_ACCEPTING",
-                                        "STATE_ACCEPTING_HANDLE_CONNECT",
                                         "STATE_ACCEPTING_WAIT_BANNER_ADDR",
                                         "STATE_ACCEPTING_WAIT_CONNECT_MSG",
                                         "STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH",
@@ -202,8 +200,7 @@ class AsyncConnection : public Connection {
                                         "STATE_ACCEPTING_READY",
                                         "STATE_STANDBY",
                                         "STATE_CLOSED",
-                                        "STATE_WAIT",
-                                        "STATE_FAULT"};
+                                        "STATE_WAIT"};
       return statenames[state];
   }
 
index 972934ad8efb670f2650c5fa6ec0cb22c08cc356..0a6e9089be21069324e5ac1e54baeea645df1a4d 100644 (file)
@@ -383,10 +383,23 @@ public:
     return _lookup_conn(k);
   }
 
-  void accept_conn(AsyncConnectionRef conn) {
+  int accept_conn(AsyncConnectionRef conn) {
     Mutex::Locker l(lock);
+    if (conns.count(conn->peer_addr)) {
+      AsyncConnectionRef existing = conns[conn->peer_addr];
+
+      // lazy delete, see "deleted_conns"
+      Mutex::Locker l(deleted_lock);
+      if (deleted_conns.count(existing)) {
+        deleted_conns.erase(existing);
+        existing->put();
+      } else if (conn != existing) {
+        return -1;
+      }
+    }
     conns[conn->peer_addr] = conn;
     accepting_conns.erase(conn);
+    return 0;
   }
 
   void learned_addr(const entity_addr_t &peer_addr_for_me);