*
*/
+#include "include/compat.h"
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <sys/uio.h>
#include "common/debug.h"
#include "common/errno.h"
+#include "common/safe_io.h"
#define dout_subsys ceph_subsys_ms
return 0;
}
+int Accepter::create_selfpipe(int *pipe_rd, int *pipe_wr) {
+ int selfpipe[2];
+ int ret = ::pipe2(selfpipe, (O_CLOEXEC|O_NONBLOCK));
+ if (ret < 0 ) {
+ lderr(msgr->cct) << __func__ << " unable to create the selfpipe: "
+ << cpp_strerror(errno) << dendl;
+ return -errno;
+ }
+ *pipe_rd = selfpipe[0];
+ *pipe_wr = selfpipe[1];
+ return 0;
+}
+
int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
{
const md_config_t *conf = msgr->cct->_conf;
if (msgr->cct->_conf->ms_tcp_rcvbuf) {
int size = msgr->cct->_conf->ms_tcp_rcvbuf;
- rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size));
+ rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_RCVBUF,
+ (void*)&size, sizeof(size));
if (rc < 0) {
rc = -errno;
lderr(msgr->cct) << "accepter.bind failed to set SO_RCVBUF to " << size
msgr->init_local_connection();
- ldout(msgr->cct,1) << "accepter.bind my_inst.addr is " << msgr->get_myaddr()
+ rc = create_selfpipe(&shutdown_rd_fd, &shutdown_wr_fd);
+ if (rc < 0) {
+ lderr(msgr->cct) << __func__ << " unable to create signalling pipe " << listen_addr
+ << ": " << cpp_strerror(rc) << dendl;
+ return rc;
+ }
+
+ ldout(msgr->cct,1) << __func__ << " my_inst.addr is " << msgr->get_myaddr()
<< " need_addr=" << msgr->get_need_addr() << dendl;
return 0;
}
ldout(msgr->cct,10) << "accepter starting" << dendl;
int errors = 0;
+ int ch;
- struct pollfd pfd;
- pfd.fd = listen_sd;
- pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+ struct pollfd pfd[2];
+ memset(pfd, 0, sizeof(pfd));
+
+ pfd[0].fd = listen_sd;
+ pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+ pfd[1].fd = shutdown_rd_fd;
+ pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
while (!done) {
- ldout(msgr->cct,20) << "accepter calling poll" << dendl;
- int r = poll(&pfd, 1, -1);
- if (r < 0)
+ ldout(msgr->cct,20) << __func__ << " calling poll for sd:" << listen_sd << dendl;
+ int r = poll(pfd, 2, -1);
+ if (r < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ ldout(msgr->cct,1) << __func__ << " poll got error"
+ << " errno " << errno << " " << cpp_strerror(errno) << dendl;
break;
- ldout(msgr->cct,20) << "accepter poll got " << r << dendl;
+ }
+ ldout(msgr->cct,10) << __func__ << " poll returned oke: " << r << dendl;
+ ldout(msgr->cct,20) << __func__ << " pfd.revents[0]=" << pfd[0].revents << dendl;
+ ldout(msgr->cct,20) << __func__ << " pfd.revents[1]=" << pfd[1].revents << dendl;
- if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))
+ if (pfd[0].revents & (POLLERR | POLLNVAL | POLLHUP)) {
+ ldout(msgr->cct,1) << __func__ << " poll got errors in revents "
+ << pfd[0].revents << dendl;
break;
-
- ldout(msgr->cct,10) << "pfd.revents=" << pfd.revents << dendl;
+ }
+ if (pfd[1].revents & (POLLIN | POLLERR | POLLNVAL | POLLHUP)) {
+ // We got "signaled" to exit the poll
+ // clean the selfpipe
+ if (::read(shutdown_rd_fd, &ch, 1) == -1) {
+ if (errno != EAGAIN)
+ ldout(msgr->cct,1) << __func__ << " Cannot read selfpipe: "
+ << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+ }
+ break;
+ }
if (done) break;
// accept
}
}
- ldout(msgr->cct,20) << "accepter closing" << dendl;
- // don't close socket, in case we start up again? blech.
- if (listen_sd >= 0) {
- ::close(listen_sd);
- listen_sd = -1;
+ ldout(msgr->cct,20) << __func__ << " closing" << dendl;
+ // socket is closed right after the thread has joined.
+ // closing it here might race
+ if (shutdown_rd_fd >= 0) {
+ ::close(shutdown_rd_fd);
+ shutdown_rd_fd = -1;
}
ldout(msgr->cct,10) << "accepter stopping" << dendl;
return 0;
done = true;
ldout(msgr->cct,10) << "stop accepter" << dendl;
- if (listen_sd >= 0) {
- ::shutdown(listen_sd, SHUT_RDWR);
+ if (shutdown_wr_fd < 0)
+ return;
+
+ // Send a byte to the shutdown pipe that the thread is listening to
+ char buf[1] = { 0x0 };
+ int ret = safe_write(shutdown_wr_fd, buf, 1);
+ if (ret < 0) {
+ ldout(msgr->cct,1) << __func__ << "close failed: "
+ << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+ } else {
+ ldout(msgr->cct,15) << __func__ << " signaled poll" << dendl;
}
+ VOID_TEMP_FAILURE_RETRY(close(shutdown_wr_fd));
+ shutdown_wr_fd = -1;
// wait for thread to stop before closing the socket, to avoid
// racing against fd re-use.
if (is_started()) {
+ ldout(msgr->cct,5) << __func__ << " wait for thread to join." << dendl;
join();
}
if (listen_sd >= 0) {
- ::close(listen_sd);
+ if (::close(listen_sd) < 0) {
+ ldout(msgr->cct,1) << __func__ << "close listen_sd failed: "
+ << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+ }
listen_sd = -1;
}
+ if (shutdown_rd_fd >= 0) {
+ if (::close(shutdown_rd_fd) < 0) {
+ ldout(msgr->cct,1) << __func__ << "close shutdown_rd_fd failed: "
+ << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+ }
+ shutdown_rd_fd = -1;
+ }
done = false;
}