From 7c411d26bfc84c09776da41ebfe2121cfa25a42f Mon Sep 17 00:00:00 2001 From: sageweil Date: Sun, 9 Sep 2007 17:56:56 +0000 Subject: [PATCH] sendmsg, tcp_nodelay, again git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1795 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/Makefile | 2 +- trunk/ceph/config.cc | 1 + trunk/ceph/config.h | 1 + trunk/ceph/msg/SimpleMessenger.cc | 108 +++++++++++++++++++++++------- trunk/ceph/msg/SimpleMessenger.h | 1 + 5 files changed, 89 insertions(+), 24 deletions(-) diff --git a/trunk/ceph/Makefile b/trunk/ceph/Makefile index 8b4f83925b524..70c62b685119d 100644 --- a/trunk/ceph/Makefile +++ b/trunk/ceph/Makefile @@ -13,7 +13,7 @@ # on issdm, it's /usr/local/mpich2/bin. # Hook for extra -I options, etc. -EXTRA_CFLAGS = -pg -g #-I${HOME}/include -L${HOME}/lib +EXTRA_CFLAGS = -O3 -g #-I${HOME}/include -L${HOME}/lib # base CFLAGS = -Wall -I. -D_FILE_OFFSET_BITS=64 -D_REENTRANT -D_THREAD_SAFE ${EXTRA_CFLAGS} diff --git a/trunk/ceph/config.cc b/trunk/ceph/config.cc index 04bf0406fe7e4..66461f8308b72 100644 --- a/trunk/ceph/config.cc +++ b/trunk/ceph/config.cc @@ -121,6 +121,7 @@ md_config_t g_conf = { clock_tare: false, // --- messenger --- + ms_tcp_nodelay: true, ms_single_dispatch: false, ms_requeue_on_sender_fail: false, diff --git a/trunk/ceph/config.h b/trunk/ceph/config.h index f6104ea030219..62565bb4b0001 100644 --- a/trunk/ceph/config.h +++ b/trunk/ceph/config.h @@ -111,6 +111,7 @@ struct md_config_t { bool tcp_multi_dispatch; */ + bool ms_tcp_nodelay; bool ms_single_dispatch; bool ms_requeue_on_sender_fail; diff --git a/trunk/ceph/msg/SimpleMessenger.cc b/trunk/ceph/msg/SimpleMessenger.cc index ded228ffebc78..93d5e686ec37e 100644 --- a/trunk/ceph/msg/SimpleMessenger.cc +++ b/trunk/ceph/msg/SimpleMessenger.cc @@ -19,6 +19,8 @@ #include #include #include +#include +#include #include "config.h" @@ -507,6 +509,14 @@ void Rank::Pipe::writer() } } + // disable Nagle algorithm? + if (g_conf.ms_tcp_nodelay) { + int flag = 1; + int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); + if (r < 0) + dout(0) << "pipe(" << peer_addr << ' ' << this << ").writer couldn't set TCP_NODELAY: " << strerror(errno) << dendl; + } + // loop. lock.Lock(); while (!q.empty() || !done) { @@ -644,6 +654,40 @@ Message *Rank::Pipe::read_message() } +int Rank::Pipe::do_sendmsg(Message *m, struct msghdr *msg, int len) +{ + while (len > 0) { + int r = ::sendmsg(sd, msg, 0); + if (r < 0) { + assert(r == -1); + derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error on sendmsg for " << *m + << " to " << m->get_dest() + << ", " << strerror(errno) + << dendl; + need_to_send_close = false; + return -1; + } + len -= r; + if (len == 0) break; + + // hrmph. trim r bytes off the front of our message. + while (r > 0) { + if (msg->msg_iov[0].iov_len >= (size_t)r) { + // lose this whole item + r -= msg->msg_iov[0].iov_len; + msg->msg_iov++; + msg->msg_iovlen--; + } else { + // partial! + msg->msg_iov[0].iov_base = (void*)((long)msg->msg_iov[0].iov_base + r); + msg->msg_iov[0].iov_len -= r; + break; + } + } + } + return 0; +} + int Rank::Pipe::write_message(Message *m) { @@ -668,22 +712,28 @@ int Rank::Pipe::write_message(Message *m) << " in " << env->nchunks << dendl; + // set up msghdr and iovecs + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + struct iovec msgvec[1 + blist.buffers().size() + env->nchunks*2]; // conservative upper bound + msg.msg_iov = msgvec; + int msglen = 0; + // send envelope - int r = tcp_write( sd, (char*)env, sizeof(*env) ); - if (r < 0) { - derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error sending envelope for " << *m - << " to " << m->get_dest() << dendl; - need_to_send_close = false; - return -1; - } - + msgvec[0].iov_base = (char*)env; + msgvec[0].iov_len = sizeof(*env); + msglen += sizeof(*env); + msg.msg_iovlen++; + // payload list::const_iterator pb = blist.buffers().begin(); list::const_iterator pc = m->get_chunk_payload_at().begin(); int b_off = 0; // carry-over buffer offset, if any int bl_pos = 0; // blist pos int nchunks = env->nchunks; - while (nchunks) { + int32_t chunksizes[nchunks]; + + for (int curchunk=0; curchunk < nchunks; curchunk++) { // start a chunk int32_t size = blist.length() - bl_pos; if (pc != m->get_chunk_payload_at().end()) { @@ -693,14 +743,14 @@ int Rank::Pipe::write_message(Message *m) pc++; } assert(size > 0); - dout(30) << "pipe(" << peer_addr << ' ' << this << ").writer chunk pos " << bl_pos << " size " << size << dendl; - - r = tcp_write(sd, (char*)&size, sizeof(size)); - if (r < 0) { - derr(10) << "pipe(" << peer_addr << ' ' << this << ").writer error sending chunk len for " << *m << " to " << m->get_dest() << dendl; - need_to_send_close = false; - return -1; - } + dout(30) << "chunk " << curchunk << " pos " << bl_pos << " size " << size << dendl; + + // chunk size + chunksizes[curchunk] = size; + msgvec[msg.msg_iovlen].iov_base = &chunksizes[curchunk]; + msgvec[msg.msg_iovlen].iov_len = sizeof(int32_t); + msglen += sizeof(int32_t); + msg.msg_iovlen++; // chunk contents int left = size; @@ -712,13 +762,22 @@ int Rank::Pipe::write_message(Message *m) << " buffer len " << pb->length() << " writing " << donow << dendl; - r = tcp_write(sd, pb->c_str()+b_off, donow); - if (r < 0) { - derr(10) << "pipe(" << peer_addr << ' ' << this << ").writer error sending data chunk for " << *m << " to " << m->get_dest() << dendl; - need_to_send_close = false; - return -1; + + if (msg.msg_iovlen >= IOV_MAX-1) { + if (do_sendmsg(m, &msg, msglen)) + return -1; + + // and restart the iov + msg.msg_iov = msgvec; + msg.msg_iovlen = 0; + msglen = 0; } + msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off); + msgvec[msg.msg_iovlen].iov_len = donow; + msglen += donow; + msg.msg_iovlen++; + left -= donow; assert(left >= 0); b_off += donow; @@ -729,10 +788,13 @@ int Rank::Pipe::write_message(Message *m) b_off = 0; } assert(left == 0); - nchunks--; } assert(pb == blist.buffers().end()); + // send + if (do_sendmsg(m, &msg, msglen)) + return -1; + return 0; } diff --git a/trunk/ceph/msg/SimpleMessenger.h b/trunk/ceph/msg/SimpleMessenger.h index 29df9a4066e91..87aa9793144b3 100644 --- a/trunk/ceph/msg/SimpleMessenger.h +++ b/trunk/ceph/msg/SimpleMessenger.h @@ -86,6 +86,7 @@ private: Message *read_message(); int write_message(Message *m); + int do_sendmsg(Message *m, struct msghdr *msg, int len); void fail(list& ls); // threads -- 2.39.5