return nread;
}
+/*
+ SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
+ http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html
+ http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
+*/
+void AsyncConnection::suppress_sigpipe()
+{
+#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
+ /*
+ We want to ignore possible SIGPIPE that we can generate on write.
+ SIGPIPE is delivered *synchronously* and *only* to the thread
+ doing the write. So if it is reported as already pending (which
+ means the thread blocks it), then we do nothing: if we generate
+ SIGPIPE, it will be merged with the pending one (there's no
+ queuing), and that suits us well. If it is not pending, we block
+ it in this thread (and we avoid changing signal action, because it
+ is per-process).
+ */
+ sigset_t pending;
+ sigemptyset(&pending);
+ sigpending(&pending);
+ sigpipe_pending = sigismember(&pending, SIGPIPE);
+ if (!sigpipe_pending) {
+ sigset_t blocked;
+ sigemptyset(&blocked);
+ pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked);
+
+ /* Maybe is was blocked already? */
+ sigpipe_unblock = ! sigismember(&blocked, SIGPIPE);
+ }
+#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
+}
+
+
+void AsyncConnection::restore_sigpipe()
+{
+#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
+ /*
+ If SIGPIPE was pending already we do nothing. Otherwise, if it
+ become pending (i.e., we generated it), then we sigwait() it (thus
+ clearing pending status). Then we unblock SIGPIPE, but only if it
+ were us who blocked it.
+ */
+ if (!sigpipe_pending) {
+ sigset_t pending;
+ sigemptyset(&pending);
+ sigpending(&pending);
+ if (sigismember(&pending, SIGPIPE)) {
+ /*
+ Protect ourselves from a situation when SIGPIPE was sent
+ by the user to the whole process, and was delivered to
+ other thread before we had a chance to wait for it.
+ */
+ static const struct timespec nowait = { 0, 0 };
+ TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait));
+ }
+
+ if (sigpipe_unblock)
+ pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL);
+ }
+#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
+}
+
// return the length of msg needed to be sent,
// < 0 means error occured
int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more)
{
+ suppress_sigpipe();
+
while (len > 0) {
- int r = ::sendmsg(sd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
+ int r;
+#if defined(MSG_NOSIGNAL)
+ r = ::sendmsg(sd, &msg, MSG_NOSIGNAL);
+#else
+ r = ::sendmsg(sd, &msg, 0);
+#endif /* defined(MSG_NOSIGNAL) */
if (r == 0) {
ldout(async_msgr->cct, 10) << __func__ << " sendmsg got r==0!" << dendl;
break;
}
}
+ restore_sigpipe();
}
return len;
}
}
// block ESIGPIPE
-#ifdef CEPH_USE_SO_NOSIGPIPE
+#if defined(SO_NOSIGPIPE)
int val = 1;
int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val));
if (r) {
<< ": " << cpp_strerror(errno) << dendl;
}
#endif
+#if defined(SO_PRIORITY)
// setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0.
// See http://goo.gl/QWhvsD and http://goo.gl/laTbjT
// We need to call setsockopt(SO_PRIORITY) after it.
ldout(msgr->cct,0) << "couldn't set SO_PRIORITY to " << prio
<< ": " << cpp_strerror(errno) << dendl;
}
+#endif
}
}
return ret;
}
+/*
+ SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
+ http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html
+ http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
+*/
+void Pipe::suppress_sigpipe()
+{
+#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
+ /*
+ We want to ignore possible SIGPIPE that we can generate on write.
+ SIGPIPE is delivered *synchronously* and *only* to the thread
+ doing the write. So if it is reported as already pending (which
+ means the thread blocks it), then we do nothing: if we generate
+ SIGPIPE, it will be merged with the pending one (there's no
+ queuing), and that suits us well. If it is not pending, we block
+ it in this thread (and we avoid changing signal action, because it
+ is per-process).
+ */
+ sigset_t pending;
+ sigemptyset(&pending);
+ sigpending(&pending);
+ sigpipe_pending = sigismember(&pending, SIGPIPE);
+ if (!sigpipe_pending) {
+ sigset_t blocked;
+ sigemptyset(&blocked);
+ pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked);
+
+ /* Maybe is was blocked already? */
+ sigpipe_unblock = ! sigismember(&blocked, SIGPIPE);
+ }
+#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
+}
+
+
+void Pipe::restore_sigpipe()
+{
+#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
+ /*
+ If SIGPIPE was pending already we do nothing. Otherwise, if it
+ become pending (i.e., we generated it), then we sigwait() it (thus
+ clearing pending status). Then we unblock SIGPIPE, but only if it
+ were us who blocked it.
+ */
+ if (!sigpipe_pending) {
+ sigset_t pending;
+ sigemptyset(&pending);
+ sigpending(&pending);
+ if (sigismember(&pending, SIGPIPE)) {
+ /*
+ Protect ourselves from a situation when SIGPIPE was sent
+ by the user to the whole process, and was delivered to
+ other thread before we had a chance to wait for it.
+ */
+ static const struct timespec nowait = { 0, 0 };
+ TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait));
+ }
+
+ if (sigpipe_unblock)
+ pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL);
+ }
+#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
+}
+
+
int Pipe::do_sendmsg(struct msghdr *msg, int len, bool more)
{
+ suppress_sigpipe();
while (len > 0) {
if (0) { // sanity
int l = 0;
assert(l == len);
}
- int r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
+ int r;
+#if defined(MSG_NOSIGNAL)
+ r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
+#else
+ r = ::sendmsg(sd, msg, (more ? MSG_MORE : 0));
+#endif
if (r == 0)
ldout(msgr->cct,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl;
if (r < 0) {
ldout(msgr->cct,1) << "do_sendmsg error " << cpp_strerror(errno) << dendl;
+ restore_sigpipe();
return -1;
}
if (state == STATE_CLOSED) {
ldout(msgr->cct,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl;
errno = EINTR;
+ restore_sigpipe();
return -1; // close enough
}
}
}
}
+ restore_sigpipe();
return 0;
}
//lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
assert(len > 0);
+ suppress_sigpipe();
+
while (len > 0) {
- int did = ::send( sd, buf, len, MSG_NOSIGNAL );
+ int did;
+#if defined(MSG_NOSIGNAL)
+ did = ::send( sd, buf, len, MSG_NOSIGNAL );
+#else
+ did = ::send( sd, buf, len, 0);
+#endif
if (did < 0) {
//lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
//lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
buf += did;
//lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;
}
+ restore_sigpipe();
+
return 0;
}