From: Jianhui Yuan Date: Fri, 13 Nov 2015 07:36:36 +0000 (+0800) Subject: msg/async: support of non-block connect in async messenger X-Git-Tag: v10.0.1~77^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F5848%2Fhead;p=ceph.git msg/async: support of non-block connect in async messenger Fixes: #12802 Signed-off-by: Jianhui Yuan --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 6f38bbb5c856..f78e620004f4 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -971,16 +971,28 @@ int AsyncConnection::_process_connection() ::close(sd); } - sd = net.connect(get_peer_addr()); + sd = net.nonblock_connect(get_peer_addr()); if (sd < 0) { goto fail; } - r = net.set_nonblock(sd); + + center->create_file_event(sd, EVENT_READABLE, read_handler); + state = STATE_CONNECTING_RE; + break; + } + + case STATE_CONNECTING_RE: + { + r = net.reconnect(get_peer_addr(), sd); if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " reconnect failed " << dendl; goto fail; + } else if (r > 0) { + break; } - center->create_file_event(sd, EVENT_READABLE, read_handler); + net.set_socket_options(sd); + state = STATE_CONNECTING_WAIT_BANNER; break; } @@ -2132,6 +2144,7 @@ void AsyncConnection::fault() if (backoff > async_msgr->cct->_conf->ms_max_backoff) backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff); } + state = STATE_CONNECTING; ldout(async_msgr->cct, 10) << __func__ << " waiting " << backoff << dendl; } @@ -2424,7 +2437,7 @@ void AsyncConnection::handle_write() ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state) << " policy.server is false" << dendl; _connect(); - } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CLOSED) { + } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) { r = _try_send(bl); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl; diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 64c2921d904d..06d046f297c1 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -160,6 +160,7 @@ class AsyncConnection : public Connection { STATE_OPEN_TAG_CLOSE, STATE_WAIT_SEND, STATE_CONNECTING, + STATE_CONNECTING_RE, STATE_CONNECTING_WAIT_BANNER, STATE_CONNECTING_WAIT_IDENTIFY_PEER, STATE_CONNECTING_SEND_CONNECT_MSG, @@ -196,6 +197,7 @@ class AsyncConnection : public Connection { "STATE_OPEN_TAG_CLOSE", "STATE_WAIT_SEND", "STATE_CONNECTING", + "STATE_CONNECTING_RE", "STATE_CONNECTING_WAIT_BANNER", "STATE_CONNECTING_WAIT_IDENTIFY_PEER", "STATE_CONNECTING_SEND_CONNECT_MSG", diff --git a/src/msg/async/net_handler.cc b/src/msg/async/net_handler.cc index 2639fdc3b2b9..59e564f458a1 100644 --- a/src/msg/async/net_handler.cc +++ b/src/msg/async/net_handler.cc @@ -132,6 +132,20 @@ int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock) return s; } +int NetHandler::reconnect(const entity_addr_t &addr, int sd) +{ + int ret = ::connect(sd, (sockaddr*)&addr.addr, addr.addr_size()); + + if (ret < 0 && errno != EISCONN) { + ldout(cct, 10) << __func__ << " reconnect: " << strerror(errno) << dendl; + if (errno == EINPROGRESS || errno == EALREADY) + return 1; + return -errno; + } + + return 0; +} + int NetHandler::connect(const entity_addr_t &addr) { return generic_connect(addr, false); diff --git a/src/msg/async/net_handler.h b/src/msg/async/net_handler.h index 0179ddae8d7f..64423dc5bf36 100644 --- a/src/msg/async/net_handler.h +++ b/src/msg/async/net_handler.h @@ -30,6 +30,15 @@ namespace ceph { int set_nonblock(int sd); void set_socket_options(int sd); int connect(const entity_addr_t &addr); + + /** + * Try to reconnect the socket. + * + * @return 0 success + * > 0 just break, and wait for event + * < 0 need to goto fail + */ + int reconnect(const entity_addr_t &addr, int sd); int nonblock_connect(const entity_addr_t &addr); }; }