]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: multiple listening ServerSockets
authorSage Weil <sage@redhat.com>
Mon, 4 Jun 2018 20:00:57 +0000 (15:00 -0500)
committerSage Weil <sage@redhat.com>
Tue, 3 Jul 2018 18:01:23 +0000 (13:01 -0500)
No functional change yet.

Signed-off-by: Sage Weil <sage@redhat.com>
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index f34e66357ab334af93b39950685d55a3eb3b22cf..352b907d26e503a3ff78ea3ac85bb2378f065ad9 100644 (file)
@@ -92,6 +92,7 @@ int Processor::bind(const entity_addr_t &bind_addr,
 
   /* bind to port */
   int r = -1;
+  listen_sockets.resize(1);
 
   for (int i = 0; i < conf->ms_bind_retry_count; i++) {
     if (i > 0) {
@@ -102,7 +103,7 @@ int Processor::bind(const entity_addr_t &bind_addr,
 
     if (listen_addr.get_port()) {
       worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
-        r = worker->listen(listen_addr, opts, &listen_socket);
+        r = worker->listen(listen_addr, opts, &listen_sockets[0]);
       }, false);
       if (r < 0) {
         lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
@@ -117,7 +118,7 @@ int Processor::bind(const entity_addr_t &bind_addr,
 
         listen_addr.set_port(port);
         worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
-          r = worker->listen(listen_addr, opts, &listen_socket);
+          r = worker->listen(listen_addr, opts, &listen_sockets[0]);
         }, false);
         if (r == 0)
           break;
@@ -152,50 +153,58 @@ void Processor::start()
   ldout(msgr->cct, 1) << __func__ << dendl;
 
   // start thread
-  if (listen_socket) {
-    worker->center.submit_to(worker->center.get_id(), [this]() {
-      worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler); }, false);
-  }
+  worker->center.submit_to(worker->center.get_id(), [this]() {
+      for (auto& l : listen_sockets) {
+       if (l) {
+         worker->center.create_file_event(l.fd(), EVENT_READABLE,
+                                          listen_handler); }
+      }
+    }, false);
 }
 
 void Processor::accept()
 {
-  ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd() << dendl;
   SocketOptions opts;
   opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
   opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
   opts.priority = msgr->get_socket_priority();
-  while (true) {
-    entity_addr_t addr;
-    ConnectedSocket cli_socket;
-    Worker *w = worker;
-    if (!msgr->get_stack()->support_local_listen_table())
-      w = msgr->get_stack()->get_worker();
-    else
-      ++w->references;
-    int r = listen_socket.accept(&cli_socket, opts, &addr, w);
-    if (r == 0) {
-      ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << cli_socket.fd() << dendl;
-
-      msgr->add_accept(w, std::move(cli_socket), addr);
-      continue;
-    } else {
-      if (r == -EINTR) {
-        continue;
-      } else if (r == -EAGAIN) {
-        break;
-      } else if (r == -EMFILE || r == -ENFILE) {
-        lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd()
-                         << " errno " << r << " " << cpp_strerror(r) << dendl;
-        break;
-      } else if (r == -ECONNABORTED) {
-        ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd()
-                            << " errno " << r << " " << cpp_strerror(r) << dendl;
-        continue;
+
+  for (auto& listen_socket : listen_sockets) {
+    ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd()
+                        << dendl;
+    while (true) {
+      entity_addr_t addr;
+      ConnectedSocket cli_socket;
+      Worker *w = worker;
+      if (!msgr->get_stack()->support_local_listen_table())
+       w = msgr->get_stack()->get_worker();
+      else
+       ++w->references;
+      int r = listen_socket.accept(&cli_socket, opts, &addr, w);
+      if (r == 0) {
+       ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd "
+                            << cli_socket.fd() << dendl;
+
+       msgr->add_accept(w, std::move(cli_socket), addr);
+       continue;
       } else {
-        lderr(msgr->cct) << __func__ << " no incoming connection?"
-                         << " errno " << r << " " << cpp_strerror(r) << dendl;
-        break;
+       if (r == -EINTR) {
+         continue;
+       } else if (r == -EAGAIN) {
+         break;
+       } else if (r == -EMFILE || r == -ENFILE) {
+         lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd()
+                          << " errno " << r << " " << cpp_strerror(r) << dendl;
+         break;
+       } else if (r == -ECONNABORTED) {
+         ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd()
+                             << " errno " << r << " " << cpp_strerror(r) << dendl;
+         continue;
+       } else {
+         lderr(msgr->cct) << __func__ << " no incoming connection?"
+                          << " errno " << r << " " << cpp_strerror(r) << dendl;
+         break;
+       }
       }
     }
   }
@@ -205,12 +214,14 @@ void Processor::stop()
 {
   ldout(msgr->cct,10) << __func__ << dendl;
 
-  if (listen_socket) {
-    worker->center.submit_to(worker->center.get_id(), [this]() {
-      worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
-      listen_socket.abort_accept();
+  worker->center.submit_to(worker->center.get_id(), [this]() {
+      for (auto& listen_socket : listen_sockets) {
+       if (listen_socket) {
+         worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
+         listen_socket.abort_accept();
+       }
+      }
     }, false);
-  }
 }
 
 
index 698d5e85f0380a40b8ae401a0d5a8f3c5a36f46f..0d4301e8942822500bffa3e2e6a6b08d1714c401 100644 (file)
@@ -47,7 +47,7 @@ class Processor {
   AsyncMessenger *msgr;
   NetHandler net;
   Worker *worker;
-  ServerSocket listen_socket;
+  vector<ServerSocket> listen_sockets;
   EventCallbackRef listen_handler;
 
   class C_processor_accept;