From cf4414684dd2ca5f2a565449be4686849695f62f Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 8 Mar 2010 15:34:24 -0800 Subject: [PATCH] messenger: don't use signal SIGUSRx anymore Was used internally to wake up blocking messenger threads. --- src/msg/SimpleMessenger.cc | 71 +++++++++++--------------------------- src/msg/SimpleMessenger.h | 1 - src/msg/tcp.cc | 29 ++++++++++++++-- 3 files changed, 48 insertions(+), 53 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 32be1979b845b..4613ecb899810 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include @@ -58,11 +57,6 @@ static ostream& _prefix(SimpleMessenger *messenger) { * Accepter */ -void noop_signal_handler(int s) -{ - //dout(0) << "blah_handler got " << s << dendl; -} - int SimpleMessenger::Accepter::bind(int64_t force_nonce) { // bind to a socket @@ -151,14 +145,6 @@ int SimpleMessenger::Accepter::start() { dout(1) << "accepter.start" << dendl; - // set a harmless handle for SIGUSR1 (we'll use it to stop the accepter) - struct sigaction sa; - memset(&sa, 0, sizeof(sa)); - sa.sa_handler = noop_signal_handler; - sa.sa_flags = 0; - sigemptyset(&sa.sa_mask); - sigaction(SIGUSR1, &sa, NULL); - // start thread create(); @@ -169,26 +155,24 @@ void *SimpleMessenger::Accepter::entry() { dout(10) << "accepter starting" << dendl; - fd_set fds; int errors = 0; - sigset_t sigmask, sigempty; - sigemptyset(&sigmask); - sigaddset(&sigmask, SIGUSR1); - sigemptyset(&sigempty); - - // block SIGUSR1 - pthread_sigmask(SIG_BLOCK, &sigmask, NULL); - char buf[80]; + struct pollfd pfd; + pfd.fd = listen_sd; + pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP; while (!done) { - FD_ZERO(&fds); - FD_SET(listen_sd, &fds); - dout(20) << "accepter calling select" << dendl; - int r = ::pselect(listen_sd+1, &fds, 0, &fds, 0, &sigempty); // unblock SIGUSR1 inside select() - dout(20) << "accepter select got " << r << dendl; - + dout(20) << "accepter calling poll" << dendl; + int r = poll(&pfd, 1, -1); + if (r < 0) + break; + dout(20) << "accepter poll got " << r << dendl; + + if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP)) + break; + + dout(10) << "pfd.revents=" << pfd.revents << dendl; if (done) break; // accept @@ -227,9 +211,6 @@ void *SimpleMessenger::Accepter::entry() } } - // unblock SIGUSR1 - pthread_sigmask(SIG_UNBLOCK, &sigmask, NULL); - dout(20) << "accepter closing" << dendl; // don't close socket, in case we start up again? blech. if (listen_sd >= 0) { @@ -244,8 +225,12 @@ void *SimpleMessenger::Accepter::entry() void SimpleMessenger::Accepter::stop() { done = true; - dout(10) << "stop sending SIGUSR1" << dendl; - this->kill(SIGUSR1); + dout(10) << "stop accepter" << dendl; + if (listen_sd) { + ::shutdown(listen_sd, SHUT_RDWR); + ::close(listen_sd); + listen_sd = -1; + } join(); done = false; } @@ -852,9 +837,7 @@ int SimpleMessenger::Pipe::accept() replace: dout(10) << "accept replacing " << existing << dendl; - existing->state = STATE_CLOSED; - existing->cond.Signal(); - existing->reader_thread.kill(SIGUSR2); + existing->stop(); existing->unregister_pipe(); // steal queue and out_seq @@ -1427,13 +1410,10 @@ void SimpleMessenger::Pipe::stop() state = STATE_CLOSED; cond.Signal(); if (sd >= 0) { + ::shutdown(sd, SHUT_RDWR); ::close(sd); sd = -1; } - if (reader_running) - reader_thread.kill(SIGUSR2); - if (writer_running) - writer_thread.kill(SIGUSR2); } @@ -2204,15 +2184,6 @@ int SimpleMessenger::start(bool nodaemon) if (g_conf.kill_after) g_timer.add_event_after(g_conf.kill_after, new C_Die); - // set noop handlers for SIGUSR2, SIGPIPE - struct sigaction sa; - memset(&sa, 0, sizeof(sa)); - sa.sa_handler = noop_signal_handler; - sa.sa_flags = 0; - sigemptyset(&sa.sa_mask); - sigaction(SIGUSR2, &sa, NULL); - sigaction(SIGPIPE, &sa, NULL); // mask SIGPIPE too. FIXME: i'm quite certain this is a roundabout way to do that. - // go! if (did_bind) accepter.start(); diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 7a9ee73e8e2c8..38799a4cb0f7a 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -216,7 +216,6 @@ private: assert(!reader_joining); reader_joining = true; cond.Signal(); - reader_thread.kill(SIGUSR2); pipe_lock.Unlock(); reader_thread.join(); pipe_lock.Lock(); diff --git a/src/msg/tcp.cc b/src/msg/tcp.cc index 8d97a80ebebe2..f5b3b210f9697 100644 --- a/src/msg/tcp.cc +++ b/src/msg/tcp.cc @@ -1,6 +1,7 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab +#include #include "tcp.h" /****************** @@ -8,8 +9,22 @@ */ int tcp_read(int sd, char *buf, int len) { + struct pollfd pfd; + pfd.fd = sd; + pfd.events = POLLIN | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR; while (len > 0) { - int got = ::recv( sd, buf, len, 0 ); + if (poll(&pfd, 1, -1) < 0) + return -1; + + if (!(pfd.revents & POLLIN)) + return -1; + + /* + * although we turn on the MSG_DONTWAIT flag, we don't expect + * receivng an EAGAIN, as we polled on the socket, so there + * should be data waiting for us. + */ + int got = ::recv( sd, buf, len, MSG_DONTWAIT ); if (got <= 0) { //char buf[100]; //generic_dout(0) << "tcp_read socket " << sd << " returned " << got @@ -24,10 +39,20 @@ int tcp_read(int sd, char *buf, int len) { } int tcp_write(int sd, const char *buf, int len) { + struct pollfd pfd; + pfd.fd = sd; + pfd.events = POLLOUT | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR; + + if (poll(&pfd, 1, -1) < 0) + return -1; + + if (!(pfd.revents & POLLOUT)) + return -1; + //generic_dout(DBL) << "tcp_write writing " << len << dendl; assert(len > 0); while (len > 0) { - int did = ::send( sd, buf, len, 0 ); + int did = ::send( sd, buf, len, MSG_NOSIGNAL ); if (did < 0) { //generic_dout(1) << "tcp_write error did = " << did << " errno " << errno << " " << strerror(errno) << dendl; //generic_derr(1) << "tcp_write error did = " << did << " errno " << errno << " " << strerror(errno) << dendl; -- 2.39.5