From 7cb2d50850cd8028750b4f9ebb805f2325e47bb2 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Sun, 14 Nov 2010 20:26:52 -0800 Subject: [PATCH] msgr: use provided rx buffer if present This changes the read path so that we hold the Connection::lock mutex while reading data off the socket. This ensures that we are reading into a buffer we are allowed to use, and allows users to revoke a previously posted buffer. If that happens, switch over to a newly allocated buffer. Note that currently the final result bufferlist may contain part of the provided buffer and part of a newly allocated buffer. This is okay as long as we will always read the same data into the buffer. And in practice, if the rx buffer is revoked then the message itself will be thrown out anyway. Signed-off-by: Sage Weil --- src/msg/SimpleMessenger.cc | 96 +++++++++++++++++++++++++++----------- src/msg/tcp.cc | 29 ++++++++++-- src/msg/tcp.h | 2 + 3 files changed, 97 insertions(+), 30 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 423c430471166..64a7ea736d3cc 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -1706,6 +1706,29 @@ void SimpleMessenger::Pipe::unlock_maybe_reap() } } +static void alloc_aligned_buffer(bufferlist& data, int len, int off) +{ + // create a buffer to read into that matches the data alignment + int left = len; + int head = 0; + if (off & ~PAGE_MASK) { + // head + head = MIN(PAGE_SIZE - (off & ~PAGE_MASK), (unsigned)left); + bufferptr bp = buffer::create(head); + data.push_back(bp); + left -= head; + } + int middle = left & PAGE_MASK; + if (middle > 0) { + bufferptr bp = buffer::create_page_aligned(middle); + data.push_back(bp); + left -= middle; + } + if (left) { + bufferptr bp = buffer::create(left); + data.push_back(bp); + } +} int SimpleMessenger::Pipe::read_message(Message **pm) { @@ -1796,36 +1819,55 @@ int SimpleMessenger::Pipe::read_message(Message **pm) data_len = le32_to_cpu(header.data_len); data_off = le32_to_cpu(header.data_off); if (data_len) { - int left = data_len; - if (data_off & ~PAGE_MASK) { - // head - int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK), - (unsigned)left); - bufferptr bp = buffer::create(head); - if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0) + unsigned offset = 0; + unsigned left = data_len; + + bufferlist newbuf, rxbuf; + bufferlist::iterator blp; + int rxbuf_version = 0; + + while (left > 0) { + // wait for data + if (tcp_wait(sd, messenger->timeout) < 0) goto out_dethrottle; - data.push_back(bp); - left -= head; - dout(20) << "reader got data head " << head << dendl; - } - // middle - int middle = left & PAGE_MASK; - if (middle > 0) { - bufferptr bp = buffer::create_page_aligned(middle); - if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0) - goto out_dethrottle; - data.push_back(bp); - left -= middle; - dout(20) << "reader got data page-aligned middle " << middle << dendl; - } - - if (left) { - bufferptr bp = buffer::create(left); - if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0) + // get a buffer + connection_state->lock.Lock(); + map >::iterator p = connection_state->rx_buffers.find(header.tid); + if (p != connection_state->rx_buffers.end()) { + if (rxbuf.length() == 0 || p->second.second != rxbuf_version) { + dout(10) << "reader seleting rx buffer v " << p->second.second + << " at offset " << offset + << " len " << p->second.first.length() << dendl; + rxbuf = p->second.first; + rxbuf_version = p->second.second; + // make sure it's big enough + if (rxbuf.length() < data_len) + rxbuf.push_back(buffer::create(data_len - rxbuf.length())); + blp = p->second.first.begin(); + blp.advance(offset); + } + } else { + if (!newbuf.length()) { + dout(20) << "reader allocating new rx buffer at offset " << offset << dendl; + alloc_aligned_buffer(newbuf, data_len, data_off); + blp = newbuf.begin(); + blp.advance(offset); + } + } + bufferptr bp = blp.get_current_ptr(); + int read = MIN(bp.length(), left); + dout(20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl; + int got = tcp_read_nonblocking(sd, bp.c_str(), read); + dout(30) << "reader read " << got << " of " << read << dendl; + connection_state->lock.Unlock(); + if (got < 0) goto out_dethrottle; - data.push_back(bp); - dout(20) << "reader got data tail " << left << dendl; + assert(got > 0); // hmm.. right? + blp.advance(got); + data.append(bp, 0, got); + offset += got; + left -= got; } } diff --git a/src/msg/tcp.cc b/src/msg/tcp.cc index bab1a7651ff45..f67db85079543 100644 --- a/src/msg/tcp.cc +++ b/src/msg/tcp.cc @@ -7,8 +7,8 @@ /****************** * tcp crap */ - -int tcp_read(int sd, char *buf, int len, int timeout) { +int tcp_read(int sd, char *buf, int len, int timeout) +{ if (sd < 0) return -1; struct pollfd pfd; @@ -40,7 +40,30 @@ int tcp_read(int sd, char *buf, int len, int timeout) { return len; } -int tcp_write(int sd, const char *buf, int len) { +int tcp_wait(int sd, int timeout) +{ + if (sd < 0) + return -1; + struct pollfd pfd; + pfd.fd = sd; + pfd.events = POLLIN | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR; + + if (poll(&pfd, 1, timeout) <= 0) + return -1; + + if (!(pfd.revents & POLLIN)) + return -1; + + return 0; +} + +int tcp_read_nonblocking(int sd, char *buf, int len) +{ + return ::recv(sd, buf, len, MSG_DONTWAIT); +} + +int tcp_write(int sd, const char *buf, int len) +{ if (sd < 0) return -1; struct pollfd pfd; diff --git a/src/msg/tcp.h b/src/msg/tcp.h index 89548aa10e9e4..31ae967747b41 100644 --- a/src/msg/tcp.h +++ b/src/msg/tcp.h @@ -26,6 +26,8 @@ inline ostream& operator<<(ostream& out, const sockaddr_storage &ss) } extern int tcp_read(int sd, char *buf, int len, int timeout=-1); +extern int tcp_wait(int sd, int timeout); +extern int tcp_read_nonblocking(int sd, char *buf, int len); extern int tcp_write(int sd, const char *buf, int len); inline bool operator==(const sockaddr_in& a, const sockaddr_in& b) { -- 2.39.5