]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
SIGPIPE suppression for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL 6416/head
authorRohan Mars <code@rohanmars.com>
Tue, 27 Oct 2015 03:34:08 +0000 (20:34 -0700)
committerRohan Mars <code@rohanmars.com>
Fri, 13 Nov 2015 16:18:35 +0000 (08:18 -0800)
Signed-off-by: Rohan Mars <code@rohanmars.com>
COPYING
src/include/sock_compat.h
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/net_handler.cc
src/msg/simple/Pipe.cc
src/msg/simple/Pipe.h

diff --git a/COPYING b/COPYING
index 5efc838319c48d01f580ab8586753fa482108bfb..189b1acb28a30158268627a50bd5b731a33bc7d4 100644 (file)
--- a/COPYING
+++ b/COPYING
@@ -149,3 +149,8 @@ Files: src/include/timegm.h
   Copyright (C) Copyright Howard Hinnant
   Copyright (C) Copyright 2010-2011 Vicente J. Botet Escriba
   License: Boost Software License, Version 1.0
+
+Files: src/msg/async/AsyncConnection.cc, src/msg/simple/Pipe.cc (sigpipe suppression)
+  Copyright (C) 2010 Tomash Brechko.  All rights reserved.
+  License: GPL3
+
index 5faacc343edc67a7c8ae774f0e50cab46984080c..56eb92bd6e3c20de3f4fa65895a0c7f6c2d5bee9 100644 (file)
 # define MSG_MORE 0
 #endif
 
-/*
- * On BSD SO_NOSIGPIPE can be set via setsockopt to block SIGPIPE.
- */
-#ifndef MSG_NOSIGNAL
-# define MSG_NOSIGNAL 0
-# ifdef SO_NOSIGPIPE
-#  define CEPH_USE_SO_NOSIGPIPE
-# else
-#  error "Cannot block SIGPIPE!"
-# endif
-#endif
-
 #endif
index 42c55adad58c1ce19f63514243f9d97a9e7dee2a..63da55e57b1e25875c33e0782c9cac32bfe85433 100644 (file)
@@ -229,12 +229,82 @@ int AsyncConnection::read_bulk(int fd, char *buf, int len)
   return nread;
 }
 
+/* 
+ SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
+  http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html 
+  http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html 
+*/
+void AsyncConnection::suppress_sigpipe()
+{
+#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
+  /*
+    We want to ignore possible SIGPIPE that we can generate on write.
+    SIGPIPE is delivered *synchronously* and *only* to the thread
+    doing the write.  So if it is reported as already pending (which
+    means the thread blocks it), then we do nothing: if we generate
+    SIGPIPE, it will be merged with the pending one (there's no
+    queuing), and that suits us well.  If it is not pending, we block
+    it in this thread (and we avoid changing signal action, because it
+    is per-process).
+  */
+  sigset_t pending;
+  sigemptyset(&pending);
+  sigpending(&pending);
+  sigpipe_pending = sigismember(&pending, SIGPIPE);
+  if (!sigpipe_pending) {
+    sigset_t blocked;
+    sigemptyset(&blocked);
+    pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked);
+
+    /* Maybe is was blocked already?  */
+    sigpipe_unblock = ! sigismember(&blocked, SIGPIPE);
+  }
+#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
+}
+
+
+void AsyncConnection::restore_sigpipe()
+{
+#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
+  /*
+    If SIGPIPE was pending already we do nothing.  Otherwise, if it
+    become pending (i.e., we generated it), then we sigwait() it (thus
+    clearing pending status).  Then we unblock SIGPIPE, but only if it
+    were us who blocked it.
+  */
+  if (!sigpipe_pending) {
+    sigset_t pending;
+    sigemptyset(&pending);
+    sigpending(&pending);
+    if (sigismember(&pending, SIGPIPE)) {
+      /*
+        Protect ourselves from a situation when SIGPIPE was sent
+        by the user to the whole process, and was delivered to
+        other thread before we had a chance to wait for it.
+      */
+      static const struct timespec nowait = { 0, 0 };
+      TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait));
+    }
+
+    if (sigpipe_unblock)
+      pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL);
+  }
+#endif  /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
+}
+
 // return the length of msg needed to be sent,
 // < 0 means error occured
 int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more)
 {
+  suppress_sigpipe();
+
   while (len > 0) {
-    int r = ::sendmsg(sd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
+    int r;
+#if defined(MSG_NOSIGNAL)
+    r = ::sendmsg(sd, &msg, MSG_NOSIGNAL);
+#else
+    r = ::sendmsg(sd, &msg, 0);
+#endif /* defined(MSG_NOSIGNAL) */
 
     if (r == 0) {
       ldout(async_msgr->cct, 10) << __func__ << " sendmsg got r==0!" << dendl;
@@ -266,6 +336,7 @@ int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more)
         break;
       }
     }
+    restore_sigpipe();
   }
   return len;
 }
index 64c2921d904dd44cfd126b7bf349f0f185fd5c52..43c3e566975140322a707e3eb71958e6c9678baf 100644 (file)
@@ -18,6 +18,7 @@
 #define CEPH_MSG_ASYNCCONNECTION_H
 
 #include <pthread.h>
+#include <signal.h>
 #include <climits>
 #include <list>
 #include <map>
@@ -45,6 +46,8 @@ class AsyncMessenger;
 class AsyncConnection : public Connection {
 
   int read_bulk(int fd, char *buf, int len);
+  void suppress_sigpipe();
+  void restore_sigpipe();
   int do_sendmsg(struct msghdr &msg, int len, bool more);
   int try_send(bufferlist &bl, bool send=true) {
     Mutex::Locker l(write_lock);
@@ -291,6 +294,12 @@ class AsyncConnection : public Connection {
   EventCenter *center;
   ceph::shared_ptr<AuthSessionHandler> session_security;
 
+#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
+  sigset_t sigpipe_mask;
+  bool sigpipe_pending;
+  bool sigpipe_unblock;
+#endif
+
  public:
   // used by eventcallback
   void handle_write();
index 2639fdc3b2b9f61f9093c8a849c189110c06d9ff..3d4810a65e97101a4bae1f0f961fa7ac0a867549 100644 (file)
@@ -92,7 +92,7 @@ void NetHandler::set_socket_options(int sd)
   }
 
   // block ESIGPIPE
-#ifdef CEPH_USE_SO_NOSIGPIPE
+#ifdef SO_NOSIGPIPE
   int val = 1;
   int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val));
   if (r) {
index de94c15bf045e9fe16fd26889103aa3ecfb7dd27..b61685b59815d3b5bd6867a7d66eea1df32e2ddf 100644 (file)
@@ -827,7 +827,7 @@ void Pipe::set_socket_options()
   }
 
   // block ESIGPIPE
-#ifdef CEPH_USE_SO_NOSIGPIPE
+#if defined(SO_NOSIGPIPE)
   int val = 1;
   int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val));
   if (r) {
@@ -847,6 +847,7 @@ void Pipe::set_socket_options()
                          << ": " << cpp_strerror(errno) << dendl;
     }
 #endif
+#if defined(SO_PRIORITY) 
     // setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0.
     // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT
     // We need to call setsockopt(SO_PRIORITY) after it.
@@ -857,6 +858,7 @@ void Pipe::set_socket_options()
       ldout(msgr->cct,0) << "couldn't set SO_PRIORITY to " << prio
                          << ": " << cpp_strerror(errno) << dendl;
     }
+#endif
   }
 }
 
@@ -2120,8 +2122,73 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
   return ret;
 }
 
+/* 
+ SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
+  http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html 
+  http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html 
+*/
+void Pipe::suppress_sigpipe()
+{
+#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
+  /*
+    We want to ignore possible SIGPIPE that we can generate on write.
+    SIGPIPE is delivered *synchronously* and *only* to the thread
+    doing the write.  So if it is reported as already pending (which
+    means the thread blocks it), then we do nothing: if we generate
+    SIGPIPE, it will be merged with the pending one (there's no
+    queuing), and that suits us well.  If it is not pending, we block
+    it in this thread (and we avoid changing signal action, because it
+    is per-process).
+  */
+  sigset_t pending;
+  sigemptyset(&pending);
+  sigpending(&pending);
+  sigpipe_pending = sigismember(&pending, SIGPIPE);
+  if (!sigpipe_pending) {
+    sigset_t blocked;
+    sigemptyset(&blocked);
+    pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked);
+
+    /* Maybe is was blocked already?  */
+    sigpipe_unblock = ! sigismember(&blocked, SIGPIPE);
+  }
+#endif  /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
+}
+
+
+void Pipe::restore_sigpipe()
+{
+#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
+  /*
+    If SIGPIPE was pending already we do nothing.  Otherwise, if it
+    become pending (i.e., we generated it), then we sigwait() it (thus
+    clearing pending status).  Then we unblock SIGPIPE, but only if it
+    were us who blocked it.
+  */
+  if (!sigpipe_pending) {
+    sigset_t pending;
+    sigemptyset(&pending);
+    sigpending(&pending);
+    if (sigismember(&pending, SIGPIPE)) {
+      /*
+        Protect ourselves from a situation when SIGPIPE was sent
+        by the user to the whole process, and was delivered to
+        other thread before we had a chance to wait for it.
+      */
+      static const struct timespec nowait = { 0, 0 };
+      TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait));
+    }
+
+    if (sigpipe_unblock)
+      pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL);
+  }
+#endif  /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
+}
+
+
 int Pipe::do_sendmsg(struct msghdr *msg, int len, bool more)
 {
+  suppress_sigpipe();
   while (len > 0) {
     if (0) { // sanity
       int l = 0;
@@ -2130,16 +2197,23 @@ int Pipe::do_sendmsg(struct msghdr *msg, int len, bool more)
       assert(l == len);
     }
 
-    int r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
+    int r;
+#if defined(MSG_NOSIGNAL)
+    r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
+#else
+    r = ::sendmsg(sd, msg, (more ? MSG_MORE : 0));
+#endif
     if (r == 0) 
       ldout(msgr->cct,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl;
     if (r < 0) { 
       ldout(msgr->cct,1) << "do_sendmsg error " << cpp_strerror(errno) << dendl;
+      restore_sigpipe();
       return -1;
     }
     if (state == STATE_CLOSED) {
       ldout(msgr->cct,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl;
       errno = EINTR;
+      restore_sigpipe();
       return -1; // close enough
     }
 
@@ -2164,6 +2238,7 @@ int Pipe::do_sendmsg(struct msghdr *msg, int len, bool more)
       }
     }
   }
+  restore_sigpipe();
   return 0;
 }
 
@@ -2527,8 +2602,15 @@ int Pipe::tcp_write(const char *buf, int len)
 
   //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
   assert(len > 0);
+  suppress_sigpipe();
+
   while (len > 0) {
-    int did = ::send( sd, buf, len, MSG_NOSIGNAL );
+    int did;
+#if defined(MSG_NOSIGNAL)
+    did = ::send( sd, buf, len, MSG_NOSIGNAL );
+#else
+    did = ::send( sd, buf, len, 0);
+#endif
     if (did < 0) {
       //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
       //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
@@ -2538,5 +2620,7 @@ int Pipe::tcp_write(const char *buf, int len)
     buf += did;
     //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;
   }
+  restore_sigpipe();
+
   return 0;
 }
index 0c1671a396d39927bb69e96ea7b10a25dd2f3726..ce24a2a1ae75aba0e0e44c98154b8b76fc179118 100644 (file)
@@ -179,6 +179,11 @@ class DispatchQueue;
   private:
     int sd;
     struct iovec msgvec[IOV_MAX];
+#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
+    sigset_t sigpipe_mask;
+    bool sigpipe_pending;
+    bool sigpipe_unblock;
+#endif
 
   public:
     int port;
@@ -247,6 +252,10 @@ class DispatchQueue;
     int write_keepalive();
     int write_keepalive2(char tag, const utime_t &t);
 
+    void suppress_sigpipe();
+    void restore_sigpipe();
+
+
     void fault(bool reader=false);
 
     void was_session_reset();