r = ms_hb_front_server->bind(hb_front_addr);
if (r < 0)
exit(1);
- r = ms_hb_front_client->client_bind(hb_back_addr);
+ r = ms_hb_front_client->client_bind(hb_front_addr);
if (r < 0)
exit(1);
OPTION(ms_bind_retry_count, OPT_INT, 6) // If binding fails, how many times do we retry to bind
OPTION(ms_bind_retry_delay, OPT_INT, 6) // Delay between attemps to bind
#endif
+OPTION(ms_bind_before_connect, OPT_BOOL, true)
OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10)
OPTION(ms_tcp_read_timeout, OPT_U64, 900)
OPTION(ms_pq_max_tokens_per_priority, OPT_U64, 16777216)
SocketOptions opts;
opts.priority = async_msgr->get_socket_priority();
+ opts.connect_bind_addr = msgr->get_myaddr();
r = worker->connect(get_peer_addr(), opts, &cs);
if (r < 0)
goto fail;
int sd;
if (opts.nonblock) {
- sd = net.nonblock_connect(addr);
+ sd = net.nonblock_connect(addr, opts.connect_bind_addr);
} else {
- sd = net.connect(addr);
+ sd = net.connect(addr, opts.connect_bind_addr);
}
if (sd < 0) {
bool nodelay = true;
int rcbuf_size = 0;
int priority = -1;
+ entity_addr_t connect_bind_addr;
};
/// \cond internal
}
}
-int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock)
+int NetHandler::generic_connect(const entity_addr_t& addr, const entity_addr_t &bind_addr, bool nonblock)
{
int ret;
int s = create_socket(addr.get_family());
set_socket_options(s, cct->_conf->ms_tcp_nodelay, cct->_conf->ms_tcp_rcvbuf);
+ {
+ entity_addr_t addr = bind_addr;
+ if (cct->_conf->ms_bind_before_connect && (!addr.is_blank_ip())) {
+ addr.set_port(0);
+ ret = ::bind(s, addr.get_sockaddr(), addr.get_sockaddr_len());
+ if (ret < 0) {
+ ret = -errno;
+ ldout(cct, 2) << __func__ << " client bind error " << ", " << cpp_strerror(ret) << dendl;
+ return ret;
+ }
+ }
+ }
ret = ::connect(s, addr.get_sockaddr(), addr.get_sockaddr_len());
if (ret < 0) {
return 0;
}
-int NetHandler::connect(const entity_addr_t &addr)
+int NetHandler::connect(const entity_addr_t &addr, const entity_addr_t& bind_addr)
{
- return generic_connect(addr, false);
+ return generic_connect(addr, bind_addr, false);
}
-int NetHandler::nonblock_connect(const entity_addr_t &addr)
+int NetHandler::nonblock_connect(const entity_addr_t &addr, const entity_addr_t& bind_addr)
{
- return generic_connect(addr, true);
+ return generic_connect(addr, bind_addr, true);
}
namespace ceph {
class NetHandler {
- int generic_connect(const entity_addr_t& addr, bool nonblock);
+ int generic_connect(const entity_addr_t& addr, const entity_addr_t& bind_addr, bool nonblock);
CephContext *cct;
public:
int set_nonblock(int sd);
void set_close_on_exec(int sd);
int set_socket_options(int sd, bool nodelay, int size);
- int connect(const entity_addr_t &addr);
+ int connect(const entity_addr_t &addr, const entity_addr_t& bind_addr);
/**
* Try to reconnect the socket.
* < 0 need to goto fail
*/
int reconnect(const entity_addr_t &addr, int sd);
- int nonblock_connect(const entity_addr_t &addr);
+ int nonblock_connect(const entity_addr_t &addr, const entity_addr_t& bind_addr);
void set_priority(int sd, int priority);
};
}
set_socket_options();
+ {
+ entity_addr_t addr2bind = msgr->get_myaddr();
+ if (msgr->cct->_conf->ms_bind_before_connect && (!addr2bind.is_blank_ip())) {
+ addr2bind.set_port(0);
+ int r = ::bind(sd , addr2bind.get_sockaddr(), addr2bind.get_sockaddr_len());
+ if (r < 0) {
+ ldout(msgr->cct,2) << "client bind error " << ", " << cpp_strerror(errno) << dendl;
+ goto fail;
+ }
+ }
+ }
+
// connect!
ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());