From: Haomai Wang Date: Thu, 12 Jan 2017 07:14:42 +0000 (+0800) Subject: msg: add ms_bind_before_connect to bind before connect X-Git-Tag: v12.0.0~112^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6e4ed291afc39c145e501a2dfee8e02d41103ad0;p=ceph.git msg: add ms_bind_before_connect to bind before connect Signed-off-by: Zengran Zhang Signed-off-by: Haomai Wang --- diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index 4aab15d4515e..49f7b2e62505 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -573,7 +573,7 @@ int main(int argc, const char **argv) r = ms_hb_front_server->bind(hb_front_addr); if (r < 0) exit(1); - r = ms_hb_front_client->client_bind(hb_back_addr); + r = ms_hb_front_client->client_bind(hb_front_addr); if (r < 0) exit(1); diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 85121c1180e4..ba8b360992b5 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -198,6 +198,7 @@ OPTION(ms_bind_retry_delay, OPT_INT, 5) // Delay between attemps to bind OPTION(ms_bind_retry_count, OPT_INT, 6) // If binding fails, how many times do we retry to bind OPTION(ms_bind_retry_delay, OPT_INT, 6) // Delay between attemps to bind #endif +OPTION(ms_bind_before_connect, OPT_BOOL, true) OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10) OPTION(ms_tcp_read_timeout, OPT_U64, 900) OPTION(ms_pq_max_tokens_per_priority, OPT_U64, 16777216) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index fc4b75197cf2..3abe25388e2d 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -861,6 +861,7 @@ ssize_t AsyncConnection::_process_connection() SocketOptions opts; opts.priority = async_msgr->get_socket_priority(); + opts.connect_bind_addr = msgr->get_myaddr(); r = worker->connect(get_peer_addr(), opts, &cs); if (r < 0) goto fail; diff --git a/src/msg/async/PosixStack.cc b/src/msg/async/PosixStack.cc index 7668ee7919f3..67495c60c988 100644 --- a/src/msg/async/PosixStack.cc +++ b/src/msg/async/PosixStack.cc @@ -339,9 +339,9 @@ int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, C int sd; if (opts.nonblock) { - sd = net.nonblock_connect(addr); + sd = net.nonblock_connect(addr, opts.connect_bind_addr); } else { - sd = net.connect(addr); + sd = net.connect(addr, opts.connect_bind_addr); } if (sd < 0) { diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h index b1cb5c3cf5da..c16308cc9c49 100644 --- a/src/msg/async/Stack.h +++ b/src/msg/async/Stack.h @@ -42,6 +42,7 @@ struct SocketOptions { bool nodelay = true; int rcbuf_size = 0; int priority = -1; + entity_addr_t connect_bind_addr; }; /// \cond internal diff --git a/src/msg/async/net_handler.cc b/src/msg/async/net_handler.cc index 3ab1855da9f6..690799406d8e 100644 --- a/src/msg/async/net_handler.cc +++ b/src/msg/async/net_handler.cc @@ -150,7 +150,7 @@ void NetHandler::set_priority(int sd, int prio) } } -int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock) +int NetHandler::generic_connect(const entity_addr_t& addr, const entity_addr_t &bind_addr, bool nonblock) { int ret; int s = create_socket(addr.get_family()); @@ -167,6 +167,18 @@ int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock) set_socket_options(s, cct->_conf->ms_tcp_nodelay, cct->_conf->ms_tcp_rcvbuf); + { + entity_addr_t addr = bind_addr; + if (cct->_conf->ms_bind_before_connect && (!addr.is_blank_ip())) { + addr.set_port(0); + ret = ::bind(s, addr.get_sockaddr(), addr.get_sockaddr_len()); + if (ret < 0) { + ret = -errno; + ldout(cct, 2) << __func__ << " client bind error " << ", " << cpp_strerror(ret) << dendl; + return ret; + } + } + } ret = ::connect(s, addr.get_sockaddr(), addr.get_sockaddr_len()); if (ret < 0) { @@ -195,14 +207,14 @@ int NetHandler::reconnect(const entity_addr_t &addr, int sd) return 0; } -int NetHandler::connect(const entity_addr_t &addr) +int NetHandler::connect(const entity_addr_t &addr, const entity_addr_t& bind_addr) { - return generic_connect(addr, false); + return generic_connect(addr, bind_addr, false); } -int NetHandler::nonblock_connect(const entity_addr_t &addr) +int NetHandler::nonblock_connect(const entity_addr_t &addr, const entity_addr_t& bind_addr) { - return generic_connect(addr, true); + return generic_connect(addr, bind_addr, true); } diff --git a/src/msg/async/net_handler.h b/src/msg/async/net_handler.h index 311276dba8ba..e9e701f48813 100644 --- a/src/msg/async/net_handler.h +++ b/src/msg/async/net_handler.h @@ -20,7 +20,7 @@ namespace ceph { class NetHandler { - int generic_connect(const entity_addr_t& addr, bool nonblock); + int generic_connect(const entity_addr_t& addr, const entity_addr_t& bind_addr, bool nonblock); CephContext *cct; public: @@ -29,7 +29,7 @@ namespace ceph { int set_nonblock(int sd); void set_close_on_exec(int sd); int set_socket_options(int sd, bool nodelay, int size); - int connect(const entity_addr_t &addr); + int connect(const entity_addr_t &addr, const entity_addr_t& bind_addr); /** * Try to reconnect the socket. @@ -39,7 +39,7 @@ namespace ceph { * < 0 need to goto fail */ int reconnect(const entity_addr_t &addr, int sd); - int nonblock_connect(const entity_addr_t &addr); + int nonblock_connect(const entity_addr_t &addr, const entity_addr_t& bind_addr); void set_priority(int sd, int priority); }; } diff --git a/src/msg/simple/Pipe.cc b/src/msg/simple/Pipe.cc index c47f7346246d..fb8bed2fa292 100644 --- a/src/msg/simple/Pipe.cc +++ b/src/msg/simple/Pipe.cc @@ -989,6 +989,18 @@ int Pipe::connect() set_socket_options(); + { + entity_addr_t addr2bind = msgr->get_myaddr(); + if (msgr->cct->_conf->ms_bind_before_connect && (!addr2bind.is_blank_ip())) { + addr2bind.set_port(0); + int r = ::bind(sd , addr2bind.get_sockaddr(), addr2bind.get_sockaddr_len()); + if (r < 0) { + ldout(msgr->cct,2) << "client bind error " << ", " << cpp_strerror(errno) << dendl; + goto fail; + } + } + } + // connect! ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl; rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());