]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: bind to multiple addresses
authorSage Weil <sage@redhat.com>
Mon, 4 Jun 2018 21:36:46 +0000 (16:36 -0500)
committerSage Weil <sage@redhat.com>
Tue, 3 Jul 2018 18:01:23 +0000 (13:01 -0500)
Signed-off-by: Sage Weil <sage@redhat.com>
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index 352b907d26e503a3ff78ea3ac85bb2378f065ad9..a8407e18315f77d52b1f1d97882c8689b3800ad7 100644 (file)
@@ -59,92 +59,91 @@ Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
   : msgr(r), net(c), worker(w),
     listen_handler(new C_processor_accept(this)) {}
 
-int Processor::bind(const entity_addr_t &bind_addr,
+int Processor::bind(const entity_addrvec_t &bind_addrs,
                    const set<int>& avoid_ports,
-                   entity_addr_t* bound_addr)
+                   entity_addrvec_t* bound_addrs)
 {
   const md_config_t *conf = msgr->cct->_conf;
-  // bind to a socket
-  ldout(msgr->cct, 10) << __func__ << dendl;
-
-  int family;
-  switch (bind_addr.get_family()) {
-    case AF_INET:
-    case AF_INET6:
-      family = bind_addr.get_family();
-      break;
-
-    default:
-      // bind_addr is empty
-      family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET;
-  }
+  // bind to socket(s)
+  ldout(msgr->cct, 10) << __func__ << " " << bind_addrs << dendl;
 
   SocketOptions opts;
   opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
   opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
 
-  // use whatever user specified (if anything)
-  entity_addr_t listen_addr = bind_addr;
-  if (listen_addr.get_type() == entity_addr_t::TYPE_NONE) {
-    listen_addr.set_type(entity_addr_t::TYPE_LEGACY);
-  }
-  listen_addr.set_family(family);
+  listen_sockets.resize(bind_addrs.v.size());
+  *bound_addrs = bind_addrs;
 
-  /* bind to port */
-  int r = -1;
-  listen_sockets.resize(1);
+  for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
+    auto& listen_addr = bound_addrs->v[k];
 
-  for (int i = 0; i < conf->ms_bind_retry_count; i++) {
-    if (i > 0) {
-      lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in "
-                       << conf->ms_bind_retry_delay << " seconds " << dendl;
-      sleep(conf->ms_bind_retry_delay);
-    }
+    /* bind to port */
+    int r = -1;
 
-    if (listen_addr.get_port()) {
-      worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
-        r = worker->listen(listen_addr, opts, &listen_sockets[0]);
-      }, false);
-      if (r < 0) {
-        lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
-                         << ": " << cpp_strerror(r) << dendl;
-        continue;
+    for (int i = 0; i < conf->ms_bind_retry_count; i++) {
+      if (i > 0) {
+       lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in "
+                        << conf->ms_bind_retry_delay << " seconds " << dendl;
+       sleep(conf->ms_bind_retry_delay);
       }
-    } else {
-      // try a range of ports
-      for (int port = msgr->cct->_conf->ms_bind_port_min; port <= msgr->cct->_conf->ms_bind_port_max; port++) {
-        if (avoid_ports.count(port))
-          continue;
-
-        listen_addr.set_port(port);
-        worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
-          r = worker->listen(listen_addr, opts, &listen_sockets[0]);
-        }, false);
-        if (r == 0)
-          break;
+
+      if (listen_addr.get_port()) {
+       worker->center.submit_to(worker->center.get_id(),
+                                [this, k, &listen_addr, &opts, &r]() {
+           r = worker->listen(listen_addr, opts, &listen_sockets[k]);
+         }, false);
+       if (r < 0) {
+         lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
+                          << ": " << cpp_strerror(r) << dendl;
+         continue;
+       }
+      } else {
+       // try a range of ports
+       for (int port = msgr->cct->_conf->ms_bind_port_min;
+            port <= msgr->cct->_conf->ms_bind_port_max;
+            port++) {
+         if (avoid_ports.count(port))
+           continue;
+
+         listen_addr.set_port(port);
+         worker->center.submit_to(worker->center.get_id(),
+                                  [this, k, &listen_addr, &opts, &r]() {
+             r = worker->listen(listen_addr, opts, &listen_sockets[k]);
+           }, false);
+         if (r == 0)
+           break;
+       }
+       if (r < 0) {
+         lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
+                          << " on any port in range "
+                          << msgr->cct->_conf->ms_bind_port_min
+                          << "-" << msgr->cct->_conf->ms_bind_port_max << ": "
+                          << cpp_strerror(r) << dendl;
+         listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again.
+         continue;
+       }
+       ldout(msgr->cct, 10) << __func__ << " bound on random port "
+                            << listen_addr << dendl;
       }
-      if (r < 0) {
-        lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
-                         << " on any port in range " << msgr->cct->_conf->ms_bind_port_min
-                         << "-" << msgr->cct->_conf->ms_bind_port_max << ": "
-                         << cpp_strerror(r) << dendl;
-        listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again.
-        continue;
+      if (r == 0) {
+       break;
       }
-      ldout(msgr->cct, 10) << __func__ << " bound on random port " << listen_addr << dendl;
     }
-    if (r == 0)
-      break;
-  }
-  // It seems that binding completely failed, return with that exit status
-  if (r < 0) {
-    lderr(msgr->cct) << __func__ << " was unable to bind after " << conf->ms_bind_retry_count
-                     << " attempts: " << cpp_strerror(r) << dendl;
-    return r;
+
+    // It seems that binding completely failed, return with that exit status
+    if (r < 0) {
+      lderr(msgr->cct) << __func__ << " was unable to bind after "
+                      << conf->ms_bind_retry_count
+                      << " attempts: " << cpp_strerror(r) << dendl;
+      for (unsigned j = 0; j < k; ++j) {
+       // clean up previous bind
+       listen_sockets[j].abort_accept();
+      }
+      return r;
+    }
   }
 
-  ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl;
-  *bound_addr = listen_addr;
+  ldout(msgr->cct, 10) << __func__ << " bound to " << *bound_addrs << dendl;
   return 0;
 }
 
@@ -305,7 +304,7 @@ void AsyncMessenger::ready()
 
   stack->ready();
   if (pending_bind) {
-    int err = bind(pending_bind_addr);
+    int err = bindv(pending_bind_addrs);
     if (err) {
       lderr(cct) << __func__ << " postponed bind failed" << dendl;
       ceph_abort();
@@ -337,8 +336,24 @@ int AsyncMessenger::shutdown()
   return 0;
 }
 
-
 int AsyncMessenger::bind(const entity_addr_t &bind_addr)
+{
+  ldout(cct,10) << __func__ << " " << bind_addr << dendl;
+  // old bind() can take entity_addr_t(). new bindv() can take a
+  // 0.0.0.0-like address but needs type and family to be set.
+  auto a = bind_addr;
+  if (a == entity_addr_t()) {
+    a.set_type(entity_addr_t::TYPE_LEGACY);
+    if (cct->_conf->ms_bind_ipv6) {
+      a.set_family(AF_INET6);
+    } else {
+      a.set_family(AF_INET);
+    }
+  }
+  return bindv(entity_addrvec_t(a));
+}
+
+int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
 {
   lock.Lock();
 
@@ -348,11 +363,11 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr)
     return -1;
   }
 
-  ldout(cct,10) << __func__ << " bind " << bind_addr << dendl;
+  ldout(cct,10) << __func__ << " " << bind_addrs << dendl;
 
   if (!stack->is_ready()) {
     ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
-    pending_bind_addr = bind_addr;
+    pending_bind_addrs = bind_addrs;
     pending_bind = true;
     lock.Unlock();
     return 0;
@@ -362,10 +377,10 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr)
 
   // bind to a socket
   set<int> avoid_ports;
-  entity_addr_t bound_addr;
+  entity_addrvec_t bound_addrs;
   unsigned i = 0;
   for (auto &&p : processors) {
-    int r = p->bind(bind_addr, avoid_ports, &bound_addr);
+    int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
     if (r) {
       // Note: this is related to local tcp listen table problem.
       // Posix(default kernel implementation) backend shares listen table
@@ -381,7 +396,7 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr)
     }
     ++i;
   }
-  _finish_bind(bind_addr, bound_addr);
+  _finish_bind(bind_addrs, bound_addrs);
   return 0;
 }
 
@@ -397,25 +412,27 @@ int AsyncMessenger::rebind(const set<int>& avoid_ports)
   // adjust the nonce; we want our entity_addr_t to be truly unique.
   nonce += 1000000;
   ldout(cct, 10) << __func__ << " new nonce " << nonce
-                << " and addr " << get_myaddr() << dendl;
+                << " and addr " << get_myaddrs() << dendl;
 
-  entity_addr_t bound_addr;
-  entity_addr_t bind_addr = get_myaddr();
-  bind_addr.set_port(0);
+  entity_addrvec_t bound_addrs;
+  entity_addrvec_t bind_addrs = get_myaddrs();
   set<int> new_avoid(avoid_ports);
-  new_avoid.insert(bind_addr.get_port());
-  ldout(cct, 10) << __func__ << " will try " << bind_addr
+  for (auto& a : bind_addrs.v) {
+    new_avoid.insert(a.get_port());
+    a.set_port(0);
+  }
+  ldout(cct, 10) << __func__ << " will try " << bind_addrs
                 << " and avoid ports " << new_avoid << dendl;
   unsigned i = 0;
   for (auto &&p : processors) {
-    int r = p->bind(bind_addr, avoid_ports, &bound_addr);
+    int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
     if (r) {
       assert(i == 0);
       return r;
     }
     ++i;
   }
-  _finish_bind(bind_addr, bound_addr);
+  _finish_bind(bind_addrs, bound_addrs);
   for (auto &&p : processors) {
     p->start();
   }
@@ -441,15 +458,18 @@ int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
   return 0;
 }
 
-void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr,
-                                 const entity_addr_t& listen_addr)
+void AsyncMessenger::_finish_bind(const entity_addrvec_t& bind_addrs,
+                                 const entity_addrvec_t& listen_addrs)
 {
-  set_myaddrs(entity_addrvec_t(bind_addr));
-  if (bind_addr != entity_addr_t())
-    learned_addr(bind_addr);
+  set_myaddrs(bind_addrs);
+  for (auto& a : bind_addrs.v) {
+    if (!a.is_blank_ip()) {
+      learned_addr(a);
+    }
+  }
 
-  if (get_myaddr().get_port() == 0) {
-    set_myaddrs(entity_addrvec_t(listen_addr));
+  if (get_myaddrs().front().get_port() == 0) {
+    set_myaddrs(listen_addrs);
   }
   for (auto& a : my_addrs.v) {
     a.set_nonce(nonce);
index 0d4301e8942822500bffa3e2e6a6b08d1714c401..16b9b9b4ba68b9cc68be2ced80eb634691312fd3 100644 (file)
@@ -57,9 +57,9 @@ class Processor {
   ~Processor() { delete listen_handler; };
 
   void stop();
-  int bind(const entity_addr_t &bind_addr,
+  int bind(const entity_addrvec_t &bind_addrs,
           const set<int>& avoid_ports,
-          entity_addr_t* bound_addr);
+          entity_addrvec_t* bound_addrs);
   void start();
   void accept();
 };
@@ -119,6 +119,8 @@ public:
   int rebind(const set<int>& avoid_ports) override;
   int client_bind(const entity_addr_t& bind_addr) override;
 
+  int bindv(const entity_addrvec_t& bind_addrs) override;
+
   /** @} Configuration functions */
 
   /**
@@ -211,8 +213,8 @@ private:
                       const entity_addr_t& dest_addr, int dest_type);
 
   int _send_message(Message *m, const entity_inst_t& dest);
-  void _finish_bind(const entity_addr_t& bind_addr,
-                   const entity_addr_t& listen_addr);
+  void _finish_bind(const entity_addrvec_t& bind_addrs,
+                   const entity_addrvec_t& listen_addrs);
 
  private:
   static const uint64_t ReapDeadConnectionThreshold = 5;
@@ -238,10 +240,10 @@ private:
   bool need_addr;
 
   /**
-   * set to bind address if bind was called before NetworkStack was ready to
+   * set to bind addresses if bind was called before NetworkStack was ready to
    * bind
    */
-  entity_addr_t pending_bind_addr;
+  entity_addrvec_t pending_bind_addrs;
 
   /**
    * false; set to true if a pending bind exists