]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/simple/Accepter.cc: replace shutdown() with selfpipe event in poll() (FreeBSD)
authorWillem Jan Withagen <wjw@digiware.nl>
Tue, 13 Sep 2016 09:56:47 +0000 (11:56 +0200)
committerWillem Jan Withagen <wjw@digiware.nl>
Thu, 22 Sep 2016 17:33:58 +0000 (19:33 +0200)
In FreeBSD a shutdown on the socket here can generate:
accepter.stop shutdown failed: errno 57 (57) Socket is not connected
But that does not trigger an event with the poll on the socket.

Closing the socket does.

Signed-off-by: Willem Jan Withagen <wjw@digiware.nl>
src/msg/simple/Accepter.cc
src/msg/simple/Accepter.h

index 8aef0987382fa875df326b81541db6f4b1c68be6..77b90f3ee9499d5325b42607f5e76f9f96915553 100644 (file)
@@ -12,6 +12,7 @@
  * 
  */
 
+#include "include/compat.h"
 #include <sys/socket.h>
 #include <netinet/tcp.h>
 #include <sys/uio.h>
@@ -27,6 +28,7 @@
 
 #include "common/debug.h"
 #include "common/errno.h"
+#include "common/safe_io.h"
 
 #define dout_subsys ceph_subsys_ms
 
@@ -50,6 +52,19 @@ static int set_close_on_exec(int fd)
   return 0;
 }
 
+int Accepter::create_selfpipe(int *pipe_rd, int *pipe_wr) {
+  int selfpipe[2];
+  int ret = ::pipe2(selfpipe, (O_CLOEXEC|O_NONBLOCK));
+  if (ret < 0 ) {
+    lderr(msgr->cct) << __func__ << " unable to create the selfpipe: "
+                    << cpp_strerror(errno) << dendl;
+    return -errno;
+  }
+  *pipe_rd = selfpipe[0];
+  *pipe_wr = selfpipe[1];
+  return 0;
+}
+
 int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
 {
   const md_config_t *conf = msgr->cct->_conf;
@@ -170,7 +185,8 @@ int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
   
   if (msgr->cct->_conf->ms_tcp_rcvbuf) {
     int size = msgr->cct->_conf->ms_tcp_rcvbuf;
-    rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size));
+    rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_RCVBUF, 
+                       (void*)&size, sizeof(size));
     if (rc < 0)  {
       rc = -errno;
       lderr(msgr->cct) << "accepter.bind failed to set SO_RCVBUF to " << size
@@ -209,7 +225,14 @@ int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
 
   msgr->init_local_connection();
 
-  ldout(msgr->cct,1) << "accepter.bind my_inst.addr is " << msgr->get_myaddr()
+  rc = create_selfpipe(&shutdown_rd_fd, &shutdown_wr_fd);
+  if (rc < 0) {
+    lderr(msgr->cct) <<  __func__ << " unable to create signalling pipe " << listen_addr
+                    << ": " << cpp_strerror(rc) << dendl;
+    return rc;
+  }
+
+  ldout(msgr->cct,1) <<  __func__ << " my_inst.addr is " << msgr->get_myaddr()
                     << " need_addr=" << msgr->get_need_addr() << dendl;
   return 0;
 }
@@ -250,21 +273,45 @@ void *Accepter::entry()
   ldout(msgr->cct,10) << "accepter starting" << dendl;
   
   int errors = 0;
+  int ch;
 
-  struct pollfd pfd;
-  pfd.fd = listen_sd;
-  pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+  struct pollfd pfd[2];
+  memset(pfd, 0, sizeof(pfd));
+
+  pfd[0].fd = listen_sd;
+  pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+  pfd[1].fd = shutdown_rd_fd;
+  pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
   while (!done) {
-    ldout(msgr->cct,20) << "accepter calling poll" << dendl;
-    int r = poll(&pfd, 1, -1);
-    if (r < 0)
+    ldout(msgr->cct,20) << __func__ << " calling poll for sd:" << listen_sd << dendl;
+    int r = poll(pfd, 2, -1);
+    if (r < 0) {
+      if (errno == EINTR) {
+        continue;
+      }
+      ldout(msgr->cct,1) << __func__ << " poll got error"  
+                         << " errno " << errno << " " << cpp_strerror(errno) << dendl;
       break;
-    ldout(msgr->cct,20) << "accepter poll got " << r << dendl;
+    }
+    ldout(msgr->cct,10) << __func__ << " poll returned oke: " << r << dendl;
+    ldout(msgr->cct,20) << __func__ <<  " pfd.revents[0]=" << pfd[0].revents << dendl;
+    ldout(msgr->cct,20) << __func__ <<  " pfd.revents[1]=" << pfd[1].revents << dendl;
 
-    if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))
+    if (pfd[0].revents & (POLLERR | POLLNVAL | POLLHUP)) {
+      ldout(msgr->cct,1) << __func__ << " poll got errors in revents "  
+                        <<  pfd[0].revents << dendl;
       break;
-
-    ldout(msgr->cct,10) << "pfd.revents=" << pfd.revents << dendl;
+    }
+    if (pfd[1].revents & (POLLIN | POLLERR | POLLNVAL | POLLHUP)) {
+      // We got "signaled" to exit the poll
+      // clean the selfpipe
+      if (::read(shutdown_rd_fd, &ch, 1) == -1) {
+        if (errno != EAGAIN)
+          ldout(msgr->cct,1) << __func__ << " Cannot read selfpipe: "
+                             << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+        }
+      break;
+    }
     if (done) break;
 
     // accept
@@ -289,11 +336,12 @@ void *Accepter::entry()
     }
   }
 
-  ldout(msgr->cct,20) << "accepter closing" << dendl;
-  // don't close socket, in case we start up again?  blech.
-  if (listen_sd >= 0) {
-    ::close(listen_sd);
-    listen_sd = -1;
+  ldout(msgr->cct,20) << __func__ << " closing" << dendl;
+  // socket is closed right after the thread has joined.
+  // closing it here might race
+  if (shutdown_rd_fd >= 0) {
+    ::close(shutdown_rd_fd);
+    shutdown_rd_fd = -1;
   }
   ldout(msgr->cct,10) << "accepter stopping" << dendl;
   return 0;
@@ -304,20 +352,42 @@ void Accepter::stop()
   done = true;
   ldout(msgr->cct,10) << "stop accepter" << dendl;
 
-  if (listen_sd >= 0) {
-    ::shutdown(listen_sd, SHUT_RDWR);
+  if (shutdown_wr_fd < 0)
+    return;
+
+  // Send a byte to the shutdown pipe that the thread is listening to
+  char buf[1] = { 0x0 };
+  int ret = safe_write(shutdown_wr_fd, buf, 1);
+  if (ret < 0) {
+    ldout(msgr->cct,1) << __func__ << "close failed: "
+             << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+  } else {
+    ldout(msgr->cct,15) << __func__ << " signaled poll" << dendl;
   }
+  VOID_TEMP_FAILURE_RETRY(close(shutdown_wr_fd));
+  shutdown_wr_fd = -1;
 
   // wait for thread to stop before closing the socket, to avoid
   // racing against fd re-use.
   if (is_started()) {
+    ldout(msgr->cct,5) << __func__ << " wait for thread to join." << dendl;
     join();
   }
 
   if (listen_sd >= 0) {
-    ::close(listen_sd);
+    if (::close(listen_sd) < 0) {
+      ldout(msgr->cct,1) << __func__ << "close listen_sd failed: "
+             << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+    }
     listen_sd = -1;
   }
+  if (shutdown_rd_fd >= 0) {
+    if (::close(shutdown_rd_fd) < 0) {
+      ldout(msgr->cct,1) << __func__ << "close shutdown_rd_fd failed: "
+             << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+    }
+    shutdown_rd_fd = -1;
+  }
   done = false;
 }
 
index f1be9c00f2b6ed0cd5b501c94cdb0c45269452b5..7824c3a16f081823f4d0428a15db2e363169f157 100644 (file)
@@ -29,9 +29,15 @@ class Accepter : public Thread {
   bool done;
   int listen_sd;
   uint64_t nonce;
+  int shutdown_rd_fd;
+  int shutdown_wr_fd;
+  int create_selfpipe(int *pipe_rd, int *pipe_wr);
 
 public:
-  Accepter(SimpleMessenger *r, uint64_t n) : msgr(r), done(false), listen_sd(-1), nonce(n) {}
+  Accepter(SimpleMessenger *r, uint64_t n) 
+    : msgr(r), done(false), listen_sd(-1), nonce(n),
+      shutdown_rd_fd(-1), shutdown_wr_fd(-1)
+    {}
     
   void *entry() override;
   void stop();