]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: add a mechanism for Solaris to avoid dying on SIGPIPE
authorGreg Farnum <gfarnum@redhat.com>
Tue, 3 Oct 2017 22:54:06 +0000 (15:54 -0700)
committerGreg Farnum <gfarnum@redhat.com>
Wed, 4 Oct 2017 14:10:23 +0000 (07:10 -0700)
This is fairly clean: we define an RAII object in the Messenger.h on
Solaris, and "declare" it with a macro in the implementations. There's
no code duplication and on Linux it's just entirely compiled out.

Signed-off-by: Greg Farnum <gfarnum@redhat.com>
src/include/sock_compat.h
src/msg/Messenger.h
src/msg/async/PosixStack.cc
src/msg/simple/Pipe.cc

index 5faacc343edc67a7c8ae774f0e50cab46984080c..f9dc24b1ddc6bdba7210a23cb292b2422a7a9489 100644 (file)
@@ -19,7 +19,8 @@
 # ifdef SO_NOSIGPIPE
 #  define CEPH_USE_SO_NOSIGPIPE
 # else
-#  error "Cannot block SIGPIPE!"
+#  define CEPH_USE_SIGPIPE_BLOCKER
+#  warning "Using SIGPIPE blocking instead of suppression; this is not well-tested upstream!"
 # endif
 #endif
 
index 7c1a0d1fad5febe9ee507215aa37910508f4c9bc..c6dbcc17694d4e4ea594b4612313f753885bda55 100644 (file)
@@ -31,6 +31,7 @@ using namespace std;
 
 #include <errno.h>
 #include <sstream>
+#include <signal.h>
 
 #define SOCKET_PRIORITY_MIN_DELAY 6
 
@@ -558,11 +559,53 @@ protected:
   /**
    * @} // Subclass Interfacing
    */
+public:
+#ifdef CEPH_USE_SIGPIPE_BLOCKER
+  /**
+   * We need to disable SIGPIPE on all platforms, and if they
+   * don't give us a better mechanism (read: are on Solaris) that
+   * means blocking the signal whenever we do a send or sendmsg...
+   * That means any implementations must invoke MSGR_SIGPIPE_STOPPER in-scope
+   * whenever doing so. On most systems that's blank, but on systems where
+   * it's needed we construct an RAII object to plug and un-plug the SIGPIPE.
+   * See http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
+   */
+  struct sigpipe_stopper {
+    bool blocked;
+    sigset_t existing_mask;
+    sigset_t pipe_mask;
+    sigpipe_stopper() {
+      sigemptyset(&pipe_mask);
+      sigaddset(&pipe_mask, SIGPIPE);
+      sigset_t signals;
+      sigemptyset(&signals);
+      sigpending(&signals);
+      if (sigismember(&signals, SIGPIPE)) {
+       blocked = false;
+      } else {
+       blocked = true;
+       int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask);
+       assert(r == 0);
+      }
+    }
+    ~sigpipe_stopper() {
+      if (blocked) {
+       struct timespec nowait{0};
+       int r = sigtimedwait(&pipe_mask, 0, &nowait);
+       assert(r == EAGAIN || r == 0);
+       r = pthread_sigmask(SIG_SETMASK, &existing_mask, 0);
+       assert(r == 0);
+      }
+    }
+  };
+#  define MSGR_SIGPIPE_STOPPER Messenger::sigpipe_stopper stopper();
+#else
+#  define MSGR_SIGPIPE_STOPPER
+#endif
   /**
    * @defgroup Dispatcher Interfacing
    * @{
    */
-public:
   /**
    * Determine whether a message can be fast-dispatched. We will
    * query each Dispatcher in sequence to determine if they are
index c91acb9b195e50cf1c5d3692daa8ee7e62ef31f1..e0104fdc4bcee134115cfa1349ec067e625a8a97 100644 (file)
 
 #include "include/buffer.h"
 #include "include/str_list.h"
-#include "include/sock_compat.h"
 #include "common/errno.h"
 #include "common/strtol.h"
 #include "common/dout.h"
+#include "msg/Messenger.h"
+#include "include/sock_compat.h"
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
@@ -77,6 +78,7 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
   {
     size_t sent = 0;
     while (1) {
+      MSGR_SIGPIPE_STOPPER;
       ssize_t r;
       r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
       if (r < 0) {
index 2f7115b447321efe4f7817259b30101b7b530e9a..1bb6faa52dda06b8c53e4a7f3930141d44323b3d 100644 (file)
@@ -2263,6 +2263,7 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
 
 int Pipe::do_sendmsg(struct msghdr *msg, unsigned len, bool more)
 {
+  MSGR_SIGPIPE_STOPPER;
   while (len > 0) {
     int r;
     r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
@@ -2666,6 +2667,7 @@ int Pipe::tcp_write(const char *buf, unsigned len)
   //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
   assert(len > 0);
   while (len > 0) {
+    MSGR_SIGPIPE_STOPPER;
     int did = ::send( sd, buf, len, MSG_NOSIGNAL );
     if (did < 0) {
       //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;