From 57d756332d6a42f7065f26004d6662772c8ea61d Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 4 Jun 2018 16:36:46 -0500 Subject: [PATCH] msg/async: bind to multiple addresses Signed-off-by: Sage Weil --- src/msg/async/AsyncMessenger.cc | 202 ++++++++++++++++++-------------- src/msg/async/AsyncMessenger.h | 14 ++- 2 files changed, 119 insertions(+), 97 deletions(-) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 352b907d26e..a8407e18315 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -59,92 +59,91 @@ Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c) : msgr(r), net(c), worker(w), listen_handler(new C_processor_accept(this)) {} -int Processor::bind(const entity_addr_t &bind_addr, +int Processor::bind(const entity_addrvec_t &bind_addrs, const set& avoid_ports, - entity_addr_t* bound_addr) + entity_addrvec_t* bound_addrs) { const md_config_t *conf = msgr->cct->_conf; - // bind to a socket - ldout(msgr->cct, 10) << __func__ << dendl; - - int family; - switch (bind_addr.get_family()) { - case AF_INET: - case AF_INET6: - family = bind_addr.get_family(); - break; - - default: - // bind_addr is empty - family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET; - } + // bind to socket(s) + ldout(msgr->cct, 10) << __func__ << " " << bind_addrs << dendl; SocketOptions opts; opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay; opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf; - // use whatever user specified (if anything) - entity_addr_t listen_addr = bind_addr; - if (listen_addr.get_type() == entity_addr_t::TYPE_NONE) { - listen_addr.set_type(entity_addr_t::TYPE_LEGACY); - } - listen_addr.set_family(family); + listen_sockets.resize(bind_addrs.v.size()); + *bound_addrs = bind_addrs; - /* bind to port */ - int r = -1; - listen_sockets.resize(1); + for (unsigned k = 0; k < bind_addrs.v.size(); ++k) { + auto& listen_addr = bound_addrs->v[k]; - for (int i = 0; i < conf->ms_bind_retry_count; i++) { - if (i > 0) { - lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in " - << conf->ms_bind_retry_delay << " seconds " << dendl; - sleep(conf->ms_bind_retry_delay); - } + /* bind to port */ + int r = -1; - if (listen_addr.get_port()) { - worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() { - r = worker->listen(listen_addr, opts, &listen_sockets[0]); - }, false); - if (r < 0) { - lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr - << ": " << cpp_strerror(r) << dendl; - continue; + for (int i = 0; i < conf->ms_bind_retry_count; i++) { + if (i > 0) { + lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in " + << conf->ms_bind_retry_delay << " seconds " << dendl; + sleep(conf->ms_bind_retry_delay); } - } else { - // try a range of ports - for (int port = msgr->cct->_conf->ms_bind_port_min; port <= msgr->cct->_conf->ms_bind_port_max; port++) { - if (avoid_ports.count(port)) - continue; - - listen_addr.set_port(port); - worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() { - r = worker->listen(listen_addr, opts, &listen_sockets[0]); - }, false); - if (r == 0) - break; + + if (listen_addr.get_port()) { + worker->center.submit_to(worker->center.get_id(), + [this, k, &listen_addr, &opts, &r]() { + r = worker->listen(listen_addr, opts, &listen_sockets[k]); + }, false); + if (r < 0) { + lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr + << ": " << cpp_strerror(r) << dendl; + continue; + } + } else { + // try a range of ports + for (int port = msgr->cct->_conf->ms_bind_port_min; + port <= msgr->cct->_conf->ms_bind_port_max; + port++) { + if (avoid_ports.count(port)) + continue; + + listen_addr.set_port(port); + worker->center.submit_to(worker->center.get_id(), + [this, k, &listen_addr, &opts, &r]() { + r = worker->listen(listen_addr, opts, &listen_sockets[k]); + }, false); + if (r == 0) + break; + } + if (r < 0) { + lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr + << " on any port in range " + << msgr->cct->_conf->ms_bind_port_min + << "-" << msgr->cct->_conf->ms_bind_port_max << ": " + << cpp_strerror(r) << dendl; + listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again. + continue; + } + ldout(msgr->cct, 10) << __func__ << " bound on random port " + << listen_addr << dendl; } - if (r < 0) { - lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr - << " on any port in range " << msgr->cct->_conf->ms_bind_port_min - << "-" << msgr->cct->_conf->ms_bind_port_max << ": " - << cpp_strerror(r) << dendl; - listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again. - continue; + if (r == 0) { + break; } - ldout(msgr->cct, 10) << __func__ << " bound on random port " << listen_addr << dendl; } - if (r == 0) - break; - } - // It seems that binding completely failed, return with that exit status - if (r < 0) { - lderr(msgr->cct) << __func__ << " was unable to bind after " << conf->ms_bind_retry_count - << " attempts: " << cpp_strerror(r) << dendl; - return r; + + // It seems that binding completely failed, return with that exit status + if (r < 0) { + lderr(msgr->cct) << __func__ << " was unable to bind after " + << conf->ms_bind_retry_count + << " attempts: " << cpp_strerror(r) << dendl; + for (unsigned j = 0; j < k; ++j) { + // clean up previous bind + listen_sockets[j].abort_accept(); + } + return r; + } } - ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl; - *bound_addr = listen_addr; + ldout(msgr->cct, 10) << __func__ << " bound to " << *bound_addrs << dendl; return 0; } @@ -305,7 +304,7 @@ void AsyncMessenger::ready() stack->ready(); if (pending_bind) { - int err = bind(pending_bind_addr); + int err = bindv(pending_bind_addrs); if (err) { lderr(cct) << __func__ << " postponed bind failed" << dendl; ceph_abort(); @@ -337,8 +336,24 @@ int AsyncMessenger::shutdown() return 0; } - int AsyncMessenger::bind(const entity_addr_t &bind_addr) +{ + ldout(cct,10) << __func__ << " " << bind_addr << dendl; + // old bind() can take entity_addr_t(). new bindv() can take a + // 0.0.0.0-like address but needs type and family to be set. + auto a = bind_addr; + if (a == entity_addr_t()) { + a.set_type(entity_addr_t::TYPE_LEGACY); + if (cct->_conf->ms_bind_ipv6) { + a.set_family(AF_INET6); + } else { + a.set_family(AF_INET); + } + } + return bindv(entity_addrvec_t(a)); +} + +int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs) { lock.Lock(); @@ -348,11 +363,11 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr) return -1; } - ldout(cct,10) << __func__ << " bind " << bind_addr << dendl; + ldout(cct,10) << __func__ << " " << bind_addrs << dendl; if (!stack->is_ready()) { ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl; - pending_bind_addr = bind_addr; + pending_bind_addrs = bind_addrs; pending_bind = true; lock.Unlock(); return 0; @@ -362,10 +377,10 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr) // bind to a socket set avoid_ports; - entity_addr_t bound_addr; + entity_addrvec_t bound_addrs; unsigned i = 0; for (auto &&p : processors) { - int r = p->bind(bind_addr, avoid_ports, &bound_addr); + int r = p->bind(bind_addrs, avoid_ports, &bound_addrs); if (r) { // Note: this is related to local tcp listen table problem. // Posix(default kernel implementation) backend shares listen table @@ -381,7 +396,7 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr) } ++i; } - _finish_bind(bind_addr, bound_addr); + _finish_bind(bind_addrs, bound_addrs); return 0; } @@ -397,25 +412,27 @@ int AsyncMessenger::rebind(const set& avoid_ports) // adjust the nonce; we want our entity_addr_t to be truly unique. nonce += 1000000; ldout(cct, 10) << __func__ << " new nonce " << nonce - << " and addr " << get_myaddr() << dendl; + << " and addr " << get_myaddrs() << dendl; - entity_addr_t bound_addr; - entity_addr_t bind_addr = get_myaddr(); - bind_addr.set_port(0); + entity_addrvec_t bound_addrs; + entity_addrvec_t bind_addrs = get_myaddrs(); set new_avoid(avoid_ports); - new_avoid.insert(bind_addr.get_port()); - ldout(cct, 10) << __func__ << " will try " << bind_addr + for (auto& a : bind_addrs.v) { + new_avoid.insert(a.get_port()); + a.set_port(0); + } + ldout(cct, 10) << __func__ << " will try " << bind_addrs << " and avoid ports " << new_avoid << dendl; unsigned i = 0; for (auto &&p : processors) { - int r = p->bind(bind_addr, avoid_ports, &bound_addr); + int r = p->bind(bind_addrs, avoid_ports, &bound_addrs); if (r) { assert(i == 0); return r; } ++i; } - _finish_bind(bind_addr, bound_addr); + _finish_bind(bind_addrs, bound_addrs); for (auto &&p : processors) { p->start(); } @@ -441,15 +458,18 @@ int AsyncMessenger::client_bind(const entity_addr_t &bind_addr) return 0; } -void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr, - const entity_addr_t& listen_addr) +void AsyncMessenger::_finish_bind(const entity_addrvec_t& bind_addrs, + const entity_addrvec_t& listen_addrs) { - set_myaddrs(entity_addrvec_t(bind_addr)); - if (bind_addr != entity_addr_t()) - learned_addr(bind_addr); + set_myaddrs(bind_addrs); + for (auto& a : bind_addrs.v) { + if (!a.is_blank_ip()) { + learned_addr(a); + } + } - if (get_myaddr().get_port() == 0) { - set_myaddrs(entity_addrvec_t(listen_addr)); + if (get_myaddrs().front().get_port() == 0) { + set_myaddrs(listen_addrs); } for (auto& a : my_addrs.v) { a.set_nonce(nonce); diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 0d4301e8942..16b9b9b4ba6 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -57,9 +57,9 @@ class Processor { ~Processor() { delete listen_handler; }; void stop(); - int bind(const entity_addr_t &bind_addr, + int bind(const entity_addrvec_t &bind_addrs, const set& avoid_ports, - entity_addr_t* bound_addr); + entity_addrvec_t* bound_addrs); void start(); void accept(); }; @@ -119,6 +119,8 @@ public: int rebind(const set& avoid_ports) override; int client_bind(const entity_addr_t& bind_addr) override; + int bindv(const entity_addrvec_t& bind_addrs) override; + /** @} Configuration functions */ /** @@ -211,8 +213,8 @@ private: const entity_addr_t& dest_addr, int dest_type); int _send_message(Message *m, const entity_inst_t& dest); - void _finish_bind(const entity_addr_t& bind_addr, - const entity_addr_t& listen_addr); + void _finish_bind(const entity_addrvec_t& bind_addrs, + const entity_addrvec_t& listen_addrs); private: static const uint64_t ReapDeadConnectionThreshold = 5; @@ -238,10 +240,10 @@ private: bool need_addr; /** - * set to bind address if bind was called before NetworkStack was ready to + * set to bind addresses if bind was called before NetworkStack was ready to * bind */ - entity_addr_t pending_bind_addr; + entity_addrvec_t pending_bind_addrs; /** * false; set to true if a pending bind exists -- 2.39.5