delay_thread(NULL),
msgr(r),
conn_id(r->dispatch_queue.get_id()),
+ recv_ofs(0),
+ recv_len(0),
sd(-1), port(0),
peer_type(-1),
pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
if (msgr->timeout == 0)
msgr->timeout = -1;
+
+ recv_max_prefetch = msgr->cct->_conf->ms_tcp_prefetch_max_size;
+ recv_buf = new char[recv_max_prefetch];
}
Pipe::~Pipe()
assert(out_q.empty());
assert(sent.empty());
delete delay_thread;
+ delete[] recv_buf;
}
void Pipe::handle_ack(uint64_t seq)
// used for reading in the remote acked seq on connect
uint64_t newly_acked_seq = 0;
+ recv_reset();
+
set_socket_options();
// announce myself.
goto fail;
}
+ recv_reset();
// connect!
ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
rc = ::connect(sd, (sockaddr*)&peer_addr.addr, peer_addr.addr_size());
m->set_connection(connection_state.get());
uint64_t features = connection_state->get_features();
+
if (m->empty_payload())
ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features
<< " " << m << " " << *m << dendl;
blist.append(m->get_middle());
blist.append(m->get_data());
- pipe_lock.Unlock();
+ pipe_lock.Unlock();
ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl;
int rc = write_message(header, footer, blist);
pfd.events |= POLLRDHUP;
#endif
+ if (has_pending_data())
+ return 0;
+
if (poll(&pfd, 1, msgr->timeout) <= 0)
return -1;
return 0;
}
-int Pipe::tcp_read_nonblocking(char *buf, int len)
+int Pipe::do_recv(char *buf, size_t len, int flags) /*
+ * Receive up to len bytes from socket sd into buf with ::recv(), passing
+ * flags through unchanged.
+ *
+ * Returns the positive byte count reported by ::recv().  Returns -1 when
+ * ::recv() fails with an errno other than EAGAIN/EINTR (the failure is
+ * reported through ldout(..., 10)), and also when ::recv() returns 0, so
+ * this function never returns 0 to its caller.
+ *
+ * NOTE(review): EAGAIN is retried in a tight loop.  With MSG_DONTWAIT and
+ * no data queued on the socket this looks like a busy-wait until data
+ * arrives -- confirm callers only get here after poll() reported the
+ * socket readable.
+ */
{
again:
- int got = ::recv( sd, buf, len, MSG_DONTWAIT );
+ int got = ::recv( sd, buf, len, flags );
if (got < 0) {
if (errno == EAGAIN || errno == EINTR) {
goto again;
- } else {
- ldout(msgr->cct, 10) << "tcp_read_nonblocking socket " << sd << " returned "
+ } // EAGAIN/EINTR were retried above; any other errno is reported as a failure
+ ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
<< got << " " << cpp_strerror(errno) << dendl;
- return -1;
+ return -1;
+ }
+ if (got == 0) { // ::recv() returned 0 (presumably the peer closed the connection)
+ return -1; // reported as -1, same as a hard error
+ }
+ return got; // got > 0: number of bytes placed in buf
+}
+
+int Pipe::buffered_recv(char *buf, size_t len, int flags)
+{
+ int left = len;
+ int total_recv = 0;
+ if (recv_len > recv_ofs) {
+ int to_read = MIN(recv_len - recv_ofs, left);
+ memcpy(buf, &recv_buf[recv_ofs], to_read);
+ recv_ofs += to_read;
+ left -= to_read;
+ if (left == 0) {
+ return to_read;
}
- } else if (got == 0) {
+ buf += to_read;
+ total_recv += to_read;
+ }
+
+ /* nothing left in the prefetch buffer */
+
+ if (len > (size_t)recv_max_prefetch) {
+ /* this was a large read, we don't prefetch for these */
+ int ret = do_recv(buf, left, flags );
+ if (ret < 0) {
+ if (total_recv > 0)
+ return total_recv;
+ return ret;
+ }
+ total_recv += ret;
+ return total_recv;
+ }
+
+
+ int got = do_recv(recv_buf, recv_max_prefetch, flags);
+ if (got <= 0) {
+ if (total_recv > 0)
+ return total_recv;
+
+ return got;
+ }
+
+ recv_len = got;
+ got = MIN(left, got);
+ memcpy(buf, recv_buf, got);
+ recv_ofs = got;
+ total_recv += got;
+ return total_recv;
+}
+
+int Pipe::tcp_read_nonblocking(char *buf, int len)
+{
+ int got = buffered_recv(buf, len, MSG_DONTWAIT );
+ if (got < 0) {
+ ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
+ << got << " " << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ if (got == 0) {
/* poll() said there was data, but we didn't read any - peer
* sent a FIN. Maybe POLLRDHUP signals this, but this is
* standard socket behavior as documented by Stevens.