From: Yehuda Sadeh Date: Thu, 12 Dec 2013 17:14:16 +0000 (-0800) Subject: msg: prefetch data when doing recv() X-Git-Tag: v0.88~170^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0e0a8d4e6a2fe790ea82b3e8596d576d49f07ace;p=ceph.git msg: prefetch data when doing recv() Since that when we're reading the header we do lot's of small IOs, we can really benefit from doing bigger reads. Set a limit on the buffer size, so that it should mainly affect header reads and not the actual data transfer of large IOs. Signed-off-by: Yehuda Sadeh --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index b4486699c932f..2fa71758f9c49 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -115,6 +115,7 @@ OPTION(perf, OPT_BOOL, true) // enable internal perf counters OPTION(ms_tcp_nodelay, OPT_BOOL, true) OPTION(ms_tcp_rcvbuf, OPT_INT, 0) +OPTION(ms_tcp_prefetch_max_size, OPT_INT, 4096) // max prefetch size, we limit this to avoid extra memcpy OPTION(ms_initial_backoff, OPT_DOUBLE, .2) OPTION(ms_max_backoff, OPT_DOUBLE, 15.0) OPTION(ms_nocrc, OPT_BOOL, false) diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index cbd04bc6e7731..814da5e18b1f9 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -79,6 +79,8 @@ Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con) delay_thread(NULL), msgr(r), conn_id(r->dispatch_queue.get_id()), + recv_ofs(0), + recv_len(0), sd(-1), port(0), peer_type(-1), pipe_lock("SimpleMessenger::Pipe::pipe_lock"), @@ -108,6 +110,9 @@ Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con) msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms if (msgr->timeout == 0) msgr->timeout = -1; + + recv_max_prefetch = msgr->cct->_conf->ms_tcp_prefetch_max_size; + recv_buf = new char[recv_max_prefetch]; } Pipe::~Pipe() @@ -115,6 +120,7 @@ Pipe::~Pipe() assert(out_q.empty()); assert(sent.empty()); delete delay_thread; + delete[] recv_buf; } void Pipe::handle_ack(uint64_t seq) @@ -270,6 +276,8 @@ int Pipe::accept() // used for reading in the remote acked seq on connect uint64_t newly_acked_seq = 0; + recv_reset(); + set_socket_options(); // announce myself. @@ -844,6 +852,7 @@ int Pipe::connect() goto fail; } + recv_reset(); // connect! ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl; rc = ::connect(sd, (sockaddr*)&peer_addr.addr, peer_addr.addr_size()); @@ -1706,6 +1715,7 @@ void Pipe::writer() m->set_connection(connection_state.get()); uint64_t features = connection_state->get_features(); + if (m->empty_payload()) ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features << " " << m << " " << *m << dendl; @@ -1741,7 +1751,7 @@ void Pipe::writer() blist.append(m->get_middle()); blist.append(m->get_data()); - pipe_lock.Unlock(); + pipe_lock.Unlock(); ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl; int rc = write_message(header, footer, blist); @@ -2314,6 +2324,9 @@ int Pipe::tcp_read_wait() pfd.events |= POLLRDHUP; #endif + if (has_pending_data()) + return 0; + if (poll(&pfd, 1, msgr->timeout) <= 0) return -1; @@ -2330,19 +2343,80 @@ int Pipe::tcp_read_wait() return 0; } -int Pipe::tcp_read_nonblocking(char *buf, int len) +int Pipe::do_recv(char *buf, size_t len, int flags) { again: - int got = ::recv( sd, buf, len, MSG_DONTWAIT ); + int got = ::recv( sd, buf, len, flags ); if (got < 0) { if (errno == EAGAIN || errno == EINTR) { goto again; - } else { - ldout(msgr->cct, 10) << "tcp_read_nonblocking socket " << sd << " returned " + } + ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned " << got << " " << cpp_strerror(errno) << dendl; - return -1; + return -1; + } + if (got == 0) { + return -1; + } + return got; +} + +int Pipe::buffered_recv(char *buf, size_t len, int flags) +{ + int left = len; + int total_recv = 0; + if (recv_len > recv_ofs) { + int to_read = MIN(recv_len - recv_ofs, left); + memcpy(buf, &recv_buf[recv_ofs], to_read); + recv_ofs += to_read; + left -= to_read; + if (left == 0) { + return to_read; } - } else if (got == 0) { + buf += to_read; + total_recv += to_read; + } + + /* nothing left in the prefetch buffer */ + + if (len > (size_t)recv_max_prefetch) { + /* this was a large read, we don't prefetch for these */ + int ret = do_recv(buf, left, flags ); + if (ret < 0) { + if (total_recv > 0) + return total_recv; + return ret; + } + total_recv += ret; + return total_recv; + } + + + int got = do_recv(recv_buf, recv_max_prefetch, flags); + if (got <= 0) { + if (total_recv > 0) + return total_recv; + + return got; + } + + recv_len = got; + got = MIN(left, got); + memcpy(buf, recv_buf, got); + recv_ofs = got; + total_recv += got; + return total_recv; +} + +int Pipe::tcp_read_nonblocking(char *buf, int len) +{ + int got = buffered_recv(buf, len, MSG_DONTWAIT ); + if (got < 0) { + ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned " + << got << " " << cpp_strerror(errno) << dendl; + return -1; + } + if (got == 0) { /* poll() said there was data, but we didn't read any - peer * sent a FIN. Maybe POLLRDHUP signals this, but this is * standard socket behavior as documented by Stevens. diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index cc2044c3d44fb..7c32bc090d8aa 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -131,6 +131,11 @@ class DispatchQueue; return static_cast(RefCountedObject::get()); } + char *recv_buf; + int recv_max_prefetch; + int recv_ofs; + int recv_len; + enum { STATE_ACCEPTING, STATE_CONNECTING, @@ -157,7 +162,9 @@ class DispatchQueue; return get_state_name(state); } + private: int sd; + public: int port; int peer_type; entity_addr_t peer_addr; @@ -300,10 +307,19 @@ class DispatchQueue; void discard_out_queue(); void shutdown_socket() { + recv_reset(); if (sd >= 0) ::shutdown(sd, SHUT_RDWR); } + void recv_reset() { + recv_len = 0; + recv_ofs = 0; + } + int do_recv(char *buf, size_t len, int flags); + int buffered_recv(char *buf, size_t len, int flags); + bool has_pending_data() { return recv_len > recv_ofs; } + /** * do a blocking read of len bytes from socket *