From 131deb39769c1187c334ee84f552d3be01f1751b Mon Sep 17 00:00:00 2001 From: Rohan Mars Date: Mon, 26 Oct 2015 20:34:08 -0700 Subject: [PATCH] SIGPIPE suppression for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL Signed-off-by: Rohan Mars --- COPYING | 5 ++ src/include/sock_compat.h | 12 ----- src/msg/async/AsyncConnection.cc | 73 +++++++++++++++++++++++++- src/msg/async/AsyncConnection.h | 9 ++++ src/msg/async/net_handler.cc | 2 +- src/msg/simple/Pipe.cc | 90 ++++++++++++++++++++++++++++++-- src/msg/simple/Pipe.h | 9 ++++ 7 files changed, 183 insertions(+), 17 deletions(-) diff --git a/COPYING b/COPYING index 5efc838319c48..189b1acb28a30 100644 --- a/COPYING +++ b/COPYING @@ -149,3 +149,8 @@ Files: src/include/timegm.h Copyright (C) Copyright Howard Hinnant Copyright (C) Copyright 2010-2011 Vicente J. Botet Escriba License: Boost Software License, Version 1.0 + +Files: src/msg/async/AsyncConnection.cc, src/msg/simple/Pipe.cc (sigpipe suppression) + Copyright (C) 2010 Tomash Brechko. All rights reserved. + License: GPL3 + diff --git a/src/include/sock_compat.h b/src/include/sock_compat.h index 5faacc343edc6..56eb92bd6e3c2 100644 --- a/src/include/sock_compat.h +++ b/src/include/sock_compat.h @@ -11,16 +11,4 @@ # define MSG_MORE 0 #endif -/* - * On BSD SO_NOSIGPIPE can be set via setsockopt to block SIGPIPE. - */ -#ifndef MSG_NOSIGNAL -# define MSG_NOSIGNAL 0 -# ifdef SO_NOSIGPIPE -# define CEPH_USE_SO_NOSIGPIPE -# else -# error "Cannot block SIGPIPE!" -# endif -#endif - #endif diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 42c55adad58c1..63da55e57b1e2 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -229,12 +229,82 @@ int AsyncConnection::read_bulk(int fd, char *buf, int len) return nread; } +/* + SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL + http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html + http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html +*/ +void AsyncConnection::suppress_sigpipe() +{ +#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) + /* + We want to ignore possible SIGPIPE that we can generate on write. + SIGPIPE is delivered *synchronously* and *only* to the thread + doing the write. So if it is reported as already pending (which + means the thread blocks it), then we do nothing: if we generate + SIGPIPE, it will be merged with the pending one (there's no + queuing), and that suits us well. If it is not pending, we block + it in this thread (and we avoid changing signal action, because it + is per-process). + */ + sigset_t pending; + sigemptyset(&pending); + sigpending(&pending); + sigpipe_pending = sigismember(&pending, SIGPIPE); + if (!sigpipe_pending) { + sigset_t blocked; + sigemptyset(&blocked); + pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked); + + /* Maybe is was blocked already? */ + sigpipe_unblock = ! sigismember(&blocked, SIGPIPE); + } +#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */ +} + + +void AsyncConnection::restore_sigpipe() +{ +#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) + /* + If SIGPIPE was pending already we do nothing. Otherwise, if it + become pending (i.e., we generated it), then we sigwait() it (thus + clearing pending status). Then we unblock SIGPIPE, but only if it + were us who blocked it. + */ + if (!sigpipe_pending) { + sigset_t pending; + sigemptyset(&pending); + sigpending(&pending); + if (sigismember(&pending, SIGPIPE)) { + /* + Protect ourselves from a situation when SIGPIPE was sent + by the user to the whole process, and was delivered to + other thread before we had a chance to wait for it. + */ + static const struct timespec nowait = { 0, 0 }; + TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait)); + } + + if (sigpipe_unblock) + pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL); + } +#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */ +} + // return the length of msg needed to be sent, // < 0 means error occured int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more) { + suppress_sigpipe(); + while (len > 0) { - int r = ::sendmsg(sd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); + int r; +#if defined(MSG_NOSIGNAL) + r = ::sendmsg(sd, &msg, MSG_NOSIGNAL); +#else + r = ::sendmsg(sd, &msg, 0); +#endif /* defined(MSG_NOSIGNAL) */ if (r == 0) { ldout(async_msgr->cct, 10) << __func__ << " sendmsg got r==0!" << dendl; @@ -266,6 +336,7 @@ int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more) break; } } + restore_sigpipe(); } return len; } diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 64c2921d904dd..43c3e56697514 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -18,6 +18,7 @@ #define CEPH_MSG_ASYNCCONNECTION_H #include +#include #include #include #include @@ -45,6 +46,8 @@ class AsyncMessenger; class AsyncConnection : public Connection { int read_bulk(int fd, char *buf, int len); + void suppress_sigpipe(); + void restore_sigpipe(); int do_sendmsg(struct msghdr &msg, int len, bool more); int try_send(bufferlist &bl, bool send=true) { Mutex::Locker l(write_lock); @@ -291,6 +294,12 @@ class AsyncConnection : public Connection { EventCenter *center; ceph::shared_ptr session_security; +#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) + sigset_t sigpipe_mask; + bool sigpipe_pending; + bool sigpipe_unblock; +#endif + public: // used by eventcallback void handle_write(); diff --git a/src/msg/async/net_handler.cc b/src/msg/async/net_handler.cc index 2639fdc3b2b9f..3d4810a65e971 100644 --- a/src/msg/async/net_handler.cc +++ b/src/msg/async/net_handler.cc @@ -92,7 +92,7 @@ void NetHandler::set_socket_options(int sd) } // block ESIGPIPE -#ifdef CEPH_USE_SO_NOSIGPIPE +#ifdef SO_NOSIGPIPE int val = 1; int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val)); if (r) { diff --git a/src/msg/simple/Pipe.cc b/src/msg/simple/Pipe.cc index de94c15bf045e..b61685b59815d 100644 --- a/src/msg/simple/Pipe.cc +++ b/src/msg/simple/Pipe.cc @@ -827,7 +827,7 @@ void Pipe::set_socket_options() } // block ESIGPIPE -#ifdef CEPH_USE_SO_NOSIGPIPE +#if defined(SO_NOSIGPIPE) int val = 1; int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val)); if (r) { @@ -847,6 +847,7 @@ void Pipe::set_socket_options() << ": " << cpp_strerror(errno) << dendl; } #endif +#if defined(SO_PRIORITY) // setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0. // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT // We need to call setsockopt(SO_PRIORITY) after it. @@ -857,6 +858,7 @@ void Pipe::set_socket_options() ldout(msgr->cct,0) << "couldn't set SO_PRIORITY to " << prio << ": " << cpp_strerror(errno) << dendl; } +#endif } } @@ -2120,8 +2122,73 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler) return ret; } +/* + SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL + http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html + http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html +*/ +void Pipe::suppress_sigpipe() +{ +#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) + /* + We want to ignore possible SIGPIPE that we can generate on write. + SIGPIPE is delivered *synchronously* and *only* to the thread + doing the write. So if it is reported as already pending (which + means the thread blocks it), then we do nothing: if we generate + SIGPIPE, it will be merged with the pending one (there's no + queuing), and that suits us well. If it is not pending, we block + it in this thread (and we avoid changing signal action, because it + is per-process). + */ + sigset_t pending; + sigemptyset(&pending); + sigpending(&pending); + sigpipe_pending = sigismember(&pending, SIGPIPE); + if (!sigpipe_pending) { + sigset_t blocked; + sigemptyset(&blocked); + pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked); + + /* Maybe is was blocked already? */ + sigpipe_unblock = ! sigismember(&blocked, SIGPIPE); + } +#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */ +} + + +void Pipe::restore_sigpipe() +{ +#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) + /* + If SIGPIPE was pending already we do nothing. Otherwise, if it + become pending (i.e., we generated it), then we sigwait() it (thus + clearing pending status). Then we unblock SIGPIPE, but only if it + were us who blocked it. + */ + if (!sigpipe_pending) { + sigset_t pending; + sigemptyset(&pending); + sigpending(&pending); + if (sigismember(&pending, SIGPIPE)) { + /* + Protect ourselves from a situation when SIGPIPE was sent + by the user to the whole process, and was delivered to + other thread before we had a chance to wait for it. + */ + static const struct timespec nowait = { 0, 0 }; + TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait)); + } + + if (sigpipe_unblock) + pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL); + } +#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */ +} + + int Pipe::do_sendmsg(struct msghdr *msg, int len, bool more) { + suppress_sigpipe(); while (len > 0) { if (0) { // sanity int l = 0; @@ -2130,16 +2197,23 @@ int Pipe::do_sendmsg(struct msghdr *msg, int len, bool more) assert(l == len); } - int r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); + int r; +#if defined(MSG_NOSIGNAL) + r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); +#else + r = ::sendmsg(sd, msg, (more ? MSG_MORE : 0)); +#endif if (r == 0) ldout(msgr->cct,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl; if (r < 0) { ldout(msgr->cct,1) << "do_sendmsg error " << cpp_strerror(errno) << dendl; + restore_sigpipe(); return -1; } if (state == STATE_CLOSED) { ldout(msgr->cct,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl; errno = EINTR; + restore_sigpipe(); return -1; // close enough } @@ -2164,6 +2238,7 @@ int Pipe::do_sendmsg(struct msghdr *msg, int len, bool more) } } } + restore_sigpipe(); return 0; } @@ -2527,8 +2602,15 @@ int Pipe::tcp_write(const char *buf, int len) //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl; assert(len > 0); + suppress_sigpipe(); + while (len > 0) { - int did = ::send( sd, buf, len, MSG_NOSIGNAL ); + int did; +#if defined(MSG_NOSIGNAL) + did = ::send( sd, buf, len, MSG_NOSIGNAL ); +#else + did = ::send( sd, buf, len, 0); +#endif if (did < 0) { //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl; //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl; @@ -2538,5 +2620,7 @@ int Pipe::tcp_write(const char *buf, int len) buf += did; //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl; } + restore_sigpipe(); + return 0; } diff --git a/src/msg/simple/Pipe.h b/src/msg/simple/Pipe.h index 0c1671a396d39..ce24a2a1ae75a 100644 --- a/src/msg/simple/Pipe.h +++ b/src/msg/simple/Pipe.h @@ -179,6 +179,11 @@ class DispatchQueue; private: int sd; struct iovec msgvec[IOV_MAX]; +#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) + sigset_t sigpipe_mask; + bool sigpipe_pending; + bool sigpipe_unblock; +#endif public: int port; @@ -247,6 +252,10 @@ class DispatchQueue; int write_keepalive(); int write_keepalive2(char tag, const utime_t &t); + void suppress_sigpipe(); + void restore_sigpipe(); + + void fault(bool reader=false); void was_session_reset(); -- 2.39.5