From: Willem Jan Withagen Date: Tue, 13 Sep 2016 09:56:47 +0000 (+0200) Subject: msg/simple/Accepter.cc: replace shutdown() with selfpipe event in poll() (FreeBSD) X-Git-Tag: v11.1.0~694^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fff44962d6c25ccca035ead48756672051d8c1b2;p=ceph.git msg/simple/Accepter.cc: replace shutdown() with selfpipe event in poll() (FreeBSD) In FreeBSD a shutdown on the socket here can generate: accepter.stop shutdown failed: errno 57 (57) Socket is not connected But that does not trigger an event with the poll on the socket. Closing the socket does. Signed-off-by: Willem Jan Withagen --- diff --git a/src/msg/simple/Accepter.cc b/src/msg/simple/Accepter.cc index 8aef0987382f..77b90f3ee949 100644 --- a/src/msg/simple/Accepter.cc +++ b/src/msg/simple/Accepter.cc @@ -12,6 +12,7 @@ * */ +#include "include/compat.h" #include #include #include @@ -27,6 +28,7 @@ #include "common/debug.h" #include "common/errno.h" +#include "common/safe_io.h" #define dout_subsys ceph_subsys_ms @@ -50,6 +52,19 @@ static int set_close_on_exec(int fd) return 0; } +int Accepter::create_selfpipe(int *pipe_rd, int *pipe_wr) { + int selfpipe[2]; + int ret = ::pipe2(selfpipe, (O_CLOEXEC|O_NONBLOCK)); + if (ret < 0 ) { + lderr(msgr->cct) << __func__ << " unable to create the selfpipe: " + << cpp_strerror(errno) << dendl; + return -errno; + } + *pipe_rd = selfpipe[0]; + *pipe_wr = selfpipe[1]; + return 0; +} + int Accepter::bind(const entity_addr_t &bind_addr, const set& avoid_ports) { const md_config_t *conf = msgr->cct->_conf; @@ -170,7 +185,8 @@ int Accepter::bind(const entity_addr_t &bind_addr, const set& avoid_ports) if (msgr->cct->_conf->ms_tcp_rcvbuf) { int size = msgr->cct->_conf->ms_tcp_rcvbuf; - rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size)); + rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_RCVBUF, + (void*)&size, sizeof(size)); if (rc < 0) { rc = -errno; lderr(msgr->cct) << "accepter.bind failed to set SO_RCVBUF to " << size @@ -209,7 +225,14 @@ int Accepter::bind(const entity_addr_t &bind_addr, const set& avoid_ports) msgr->init_local_connection(); - ldout(msgr->cct,1) << "accepter.bind my_inst.addr is " << msgr->get_myaddr() + rc = create_selfpipe(&shutdown_rd_fd, &shutdown_wr_fd); + if (rc < 0) { + lderr(msgr->cct) << __func__ << " unable to create signalling pipe " << listen_addr + << ": " << cpp_strerror(rc) << dendl; + return rc; + } + + ldout(msgr->cct,1) << __func__ << " my_inst.addr is " << msgr->get_myaddr() << " need_addr=" << msgr->get_need_addr() << dendl; return 0; } @@ -250,21 +273,45 @@ void *Accepter::entry() ldout(msgr->cct,10) << "accepter starting" << dendl; int errors = 0; + int ch; - struct pollfd pfd; - pfd.fd = listen_sd; - pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP; + struct pollfd pfd[2]; + memset(pfd, 0, sizeof(pfd)); + + pfd[0].fd = listen_sd; + pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; + pfd[1].fd = shutdown_rd_fd; + pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; while (!done) { - ldout(msgr->cct,20) << "accepter calling poll" << dendl; - int r = poll(&pfd, 1, -1); - if (r < 0) + ldout(msgr->cct,20) << __func__ << " calling poll for sd:" << listen_sd << dendl; + int r = poll(pfd, 2, -1); + if (r < 0) { + if (errno == EINTR) { + continue; + } + ldout(msgr->cct,1) << __func__ << " poll got error" + << " errno " << errno << " " << cpp_strerror(errno) << dendl; break; - ldout(msgr->cct,20) << "accepter poll got " << r << dendl; + } + ldout(msgr->cct,10) << __func__ << " poll returned oke: " << r << dendl; + ldout(msgr->cct,20) << __func__ << " pfd.revents[0]=" << pfd[0].revents << dendl; + ldout(msgr->cct,20) << __func__ << " pfd.revents[1]=" << pfd[1].revents << dendl; - if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP)) + if (pfd[0].revents & (POLLERR | POLLNVAL | POLLHUP)) { + ldout(msgr->cct,1) << __func__ << " poll got errors in revents " + << pfd[0].revents << dendl; break; - - ldout(msgr->cct,10) << "pfd.revents=" << pfd.revents << dendl; + } + if (pfd[1].revents & (POLLIN | POLLERR | POLLNVAL | POLLHUP)) { + // We got "signaled" to exit the poll + // clean the selfpipe + if (::read(shutdown_rd_fd, &ch, 1) == -1) { + if (errno != EAGAIN) + ldout(msgr->cct,1) << __func__ << " Cannot read selfpipe: " + << " errno " << errno << " " << cpp_strerror(errno) << dendl; + } + break; + } if (done) break; // accept @@ -289,11 +336,12 @@ void *Accepter::entry() } } - ldout(msgr->cct,20) << "accepter closing" << dendl; - // don't close socket, in case we start up again? blech. - if (listen_sd >= 0) { - ::close(listen_sd); - listen_sd = -1; + ldout(msgr->cct,20) << __func__ << " closing" << dendl; + // socket is closed right after the thread has joined. + // closing it here might race + if (shutdown_rd_fd >= 0) { + ::close(shutdown_rd_fd); + shutdown_rd_fd = -1; } ldout(msgr->cct,10) << "accepter stopping" << dendl; return 0; @@ -304,20 +352,42 @@ void Accepter::stop() done = true; ldout(msgr->cct,10) << "stop accepter" << dendl; - if (listen_sd >= 0) { - ::shutdown(listen_sd, SHUT_RDWR); + if (shutdown_wr_fd < 0) + return; + + // Send a byte to the shutdown pipe that the thread is listening to + char buf[1] = { 0x0 }; + int ret = safe_write(shutdown_wr_fd, buf, 1); + if (ret < 0) { + ldout(msgr->cct,1) << __func__ << "close failed: " + << " errno " << errno << " " << cpp_strerror(errno) << dendl; + } else { + ldout(msgr->cct,15) << __func__ << " signaled poll" << dendl; } + VOID_TEMP_FAILURE_RETRY(close(shutdown_wr_fd)); + shutdown_wr_fd = -1; // wait for thread to stop before closing the socket, to avoid // racing against fd re-use. if (is_started()) { + ldout(msgr->cct,5) << __func__ << " wait for thread to join." << dendl; join(); } if (listen_sd >= 0) { - ::close(listen_sd); + if (::close(listen_sd) < 0) { + ldout(msgr->cct,1) << __func__ << "close listen_sd failed: " + << " errno " << errno << " " << cpp_strerror(errno) << dendl; + } listen_sd = -1; } + if (shutdown_rd_fd >= 0) { + if (::close(shutdown_rd_fd) < 0) { + ldout(msgr->cct,1) << __func__ << "close shutdown_rd_fd failed: " + << " errno " << errno << " " << cpp_strerror(errno) << dendl; + } + shutdown_rd_fd = -1; + } done = false; } diff --git a/src/msg/simple/Accepter.h b/src/msg/simple/Accepter.h index f1be9c00f2b6..7824c3a16f08 100644 --- a/src/msg/simple/Accepter.h +++ b/src/msg/simple/Accepter.h @@ -29,9 +29,15 @@ class Accepter : public Thread { bool done; int listen_sd; uint64_t nonce; + int shutdown_rd_fd; + int shutdown_wr_fd; + int create_selfpipe(int *pipe_rd, int *pipe_wr); public: - Accepter(SimpleMessenger *r, uint64_t n) : msgr(r), done(false), listen_sd(-1), nonce(n) {} + Accepter(SimpleMessenger *r, uint64_t n) + : msgr(r), done(false), listen_sd(-1), nonce(n), + shutdown_rd_fd(-1), shutdown_wr_fd(-1) + {} void *entry() override; void stop();