]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: support of non-block connect in async messenger 5848/head
authorJianhui Yuan <zuiwanyuan@gmail.com>
Fri, 13 Nov 2015 07:36:36 +0000 (15:36 +0800)
committerJianhui Yuan <zuiwanyuan@gmail.com>
Fri, 13 Nov 2015 07:36:36 +0000 (15:36 +0800)
Fixes: #12802
Signed-off-by: Jianhui Yuan <zuiwanyuan@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/net_handler.cc
src/msg/async/net_handler.h

index 6f38bbb5c8564734ac7b4b78006717ca0cf56f04..f78e620004f40067dcfe970f5a6fc9896dc7c86a 100644 (file)
@@ -971,16 +971,28 @@ int AsyncConnection::_process_connection()
           ::close(sd);
         }
 
-        sd = net.connect(get_peer_addr());
+        sd = net.nonblock_connect(get_peer_addr());
         if (sd < 0) {
           goto fail;
         }
-        r = net.set_nonblock(sd);
+
+        center->create_file_event(sd, EVENT_READABLE, read_handler);
+        state = STATE_CONNECTING_RE;
+        break;
+      }
+
+    case STATE_CONNECTING_RE:
+      {
+        r = net.reconnect(get_peer_addr(), sd);
         if (r < 0) {
+          ldout(async_msgr->cct, 1) << __func__ << " reconnect failed " << dendl;
           goto fail;
+        } else if (r > 0) {
+          break;
         }
 
-        center->create_file_event(sd, EVENT_READABLE, read_handler);
+        net.set_socket_options(sd);
+
         state = STATE_CONNECTING_WAIT_BANNER;
         break;
       }
@@ -2132,6 +2144,7 @@ void AsyncConnection::fault()
       if (backoff > async_msgr->cct->_conf->ms_max_backoff)
         backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
     }
+
     state = STATE_CONNECTING;
     ldout(async_msgr->cct, 10) << __func__ << " waiting " << backoff << dendl;
   }
@@ -2424,7 +2437,7 @@ void AsyncConnection::handle_write()
       ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
                                  << " policy.server is false" << dendl;
       _connect();
-    } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CLOSED) {
+    } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
       r = _try_send(bl);
       if (r < 0) {
         ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
index 64c2921d904dd44cfd126b7bf349f0f185fd5c52..06d046f297c10282ba6f43e88bfc85e4f581c372 100644 (file)
@@ -160,6 +160,7 @@ class AsyncConnection : public Connection {
     STATE_OPEN_TAG_CLOSE,
     STATE_WAIT_SEND,
     STATE_CONNECTING,
+    STATE_CONNECTING_RE,
     STATE_CONNECTING_WAIT_BANNER,
     STATE_CONNECTING_WAIT_IDENTIFY_PEER,
     STATE_CONNECTING_SEND_CONNECT_MSG,
@@ -196,6 +197,7 @@ class AsyncConnection : public Connection {
                                         "STATE_OPEN_TAG_CLOSE",
                                         "STATE_WAIT_SEND",
                                         "STATE_CONNECTING",
+                                        "STATE_CONNECTING_RE",
                                         "STATE_CONNECTING_WAIT_BANNER",
                                         "STATE_CONNECTING_WAIT_IDENTIFY_PEER",
                                         "STATE_CONNECTING_SEND_CONNECT_MSG",
index 2639fdc3b2b9f61f9093c8a849c189110c06d9ff..59e564f458a19e8bfb8a3cf28a90321da147ce90 100644 (file)
@@ -132,6 +132,20 @@ int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock)
   return s;
 }
 
+int NetHandler::reconnect(const entity_addr_t &addr, int sd)
+{
+  int ret = ::connect(sd, (sockaddr*)&addr.addr, addr.addr_size());
+
+  if (ret < 0 && errno != EISCONN) {
+    ldout(cct, 10) << __func__ << " reconnect: " << strerror(errno) << dendl;
+    if (errno == EINPROGRESS || errno == EALREADY)
+      return 1;
+    return -errno;
+  }
+
+  return 0;
+}
+
 int NetHandler::connect(const entity_addr_t &addr)
 {
   return generic_connect(addr, false);
index 0179ddae8d7fc78e00a9ea073993f8d106c1f0a4..64423dc5bf369a663d88ebc158463089ab1ad28e 100644 (file)
@@ -30,6 +30,15 @@ namespace ceph {
     int set_nonblock(int sd);
     void set_socket_options(int sd);
     int connect(const entity_addr_t &addr);
+    
+    /**
+     * Try to reconnect the socket.
+     *
+     * @return    0         success
+     *            > 0       just break, and wait for event
+     *            < 0       need to goto fail
+     */
+    int reconnect(const entity_addr_t &addr, int sd);
     int nonblock_connect(const entity_addr_t &addr);
   };
 }