::close(sd);
}
- sd = net.connect(get_peer_addr());
+ sd = net.nonblock_connect(get_peer_addr());
if (sd < 0) {
goto fail;
}
- r = net.set_nonblock(sd);
+
+ center->create_file_event(sd, EVENT_READABLE, read_handler);
+ state = STATE_CONNECTING_RE;
+ break;
+ }
+
+ case STATE_CONNECTING_RE:
+ {
+ r = net.reconnect(get_peer_addr(), sd);
if (r < 0) {
+ ldout(async_msgr->cct, 1) << __func__ << " reconnect failed " << dendl;
goto fail;
+ } else if (r > 0) {
+ break;
}
- center->create_file_event(sd, EVENT_READABLE, read_handler);
+ net.set_socket_options(sd);
+
state = STATE_CONNECTING_WAIT_BANNER;
break;
}
if (backoff > async_msgr->cct->_conf->ms_max_backoff)
backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
}
+
state = STATE_CONNECTING;
ldout(async_msgr->cct, 10) << __func__ << " waiting " << backoff << dendl;
}
ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
<< " policy.server is false" << dendl;
_connect();
- } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CLOSED) {
+ } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
r = _try_send(bl);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
STATE_OPEN_TAG_CLOSE,
STATE_WAIT_SEND,
STATE_CONNECTING,
+ STATE_CONNECTING_RE,
STATE_CONNECTING_WAIT_BANNER,
STATE_CONNECTING_WAIT_IDENTIFY_PEER,
STATE_CONNECTING_SEND_CONNECT_MSG,
"STATE_OPEN_TAG_CLOSE",
"STATE_WAIT_SEND",
"STATE_CONNECTING",
+ "STATE_CONNECTING_RE",
"STATE_CONNECTING_WAIT_BANNER",
"STATE_CONNECTING_WAIT_IDENTIFY_PEER",
"STATE_CONNECTING_SEND_CONNECT_MSG",
return s;
}
+int NetHandler::reconnect(const entity_addr_t &addr, int sd)
+{
+ int ret = ::connect(sd, (sockaddr*)&addr.addr, addr.addr_size());
+
+ if (ret < 0 && errno != EISCONN) {
+ ldout(cct, 10) << __func__ << " reconnect: " << strerror(errno) << dendl;
+ if (errno == EINPROGRESS || errno == EALREADY)
+ return 1;
+ return -errno;
+ }
+
+ return 0;
+}
+
int NetHandler::connect(const entity_addr_t &addr)
{
return generic_connect(addr, false);
int set_nonblock(int sd);
void set_socket_options(int sd);
int connect(const entity_addr_t &addr);
+
+ /**
+ * Try to reconnect the socket.
+ *
+ * @return 0 success
+ * > 0 just break, and wait for event
+ * < 0 need to goto fail
+ */
+ int reconnect(const entity_addr_t &addr, int sd);
int nonblock_connect(const entity_addr_t &addr);
};
}