#include <errno.h>
#include <sstream>
+#include <signal.h>
#define SOCKET_PRIORITY_MIN_DELAY 6
/**
* @} // Subclass Interfacing
*/
+public:
+#ifdef CEPH_USE_SIGPIPE_BLOCKER
+ /**
+ * We need to disable SIGPIPE on all platforms, and if they
+ * don't give us a better mechanism (read: are on Solaris) that
+ * means blocking the signal whenever we do a send or sendmsg...
+ * That means any implementations must invoke MSGR_SIGPIPE_STOPPER in-scope
+ * whenever doing so. On most systems that's blank, but on systems where
+ * it's needed we construct an RAII object to plug and un-plug the SIGPIPE.
+ * See http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
+ */
+ struct sigpipe_stopper {
+ bool blocked;
+ sigset_t existing_mask;
+ sigset_t pipe_mask;
+ sigpipe_stopper() {
+ sigemptyset(&pipe_mask);
+ sigaddset(&pipe_mask, SIGPIPE);
+ sigset_t signals;
+ sigemptyset(&signals);
+ sigpending(&signals);
+ if (sigismember(&signals, SIGPIPE)) {
+ blocked = false;
+ } else {
+ blocked = true;
+ int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask);
+ assert(r == 0);
+ }
+ }
+ ~sigpipe_stopper() {
+ if (blocked) {
+ struct timespec nowait{0};
+ int r = sigtimedwait(&pipe_mask, 0, &nowait);
+ assert(r == EAGAIN || r == 0);
+ r = pthread_sigmask(SIG_SETMASK, &existing_mask, 0);
+ assert(r == 0);
+ }
+ }
+ };
+# define MSGR_SIGPIPE_STOPPER Messenger::sigpipe_stopper stopper();
+#else
+# define MSGR_SIGPIPE_STOPPER
+#endif
/**
* @defgroup Dispatcher Interfacing
* @{
*/
-public:
/**
* Determine whether a message can be fast-dispatched. We will
* query each Dispatcher in sequence to determine if they are
#include "include/buffer.h"
#include "include/str_list.h"
-#include "include/sock_compat.h"
#include "common/errno.h"
#include "common/strtol.h"
#include "common/dout.h"
+#include "msg/Messenger.h"
+#include "include/sock_compat.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
{
size_t sent = 0;
while (1) {
+ MSGR_SIGPIPE_STOPPER;
ssize_t r;
r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
if (r < 0) {
int Pipe::do_sendmsg(struct msghdr *msg, unsigned len, bool more)
{
+ MSGR_SIGPIPE_STOPPER;
while (len > 0) {
int r;
r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
//lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
assert(len > 0);
while (len > 0) {
+ MSGR_SIGPIPE_STOPPER;
int did = ::send( sd, buf, len, MSG_NOSIGNAL );
if (did < 0) {
//lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;