From: Sage Weil Date: Mon, 15 Nov 2010 04:26:52 +0000 (-0800) Subject: msgr: use provided rx buffer if present X-Git-Tag: v0.24~185^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7cb2d50850cd8028750b4f9ebb805f2325e47bb2;p=ceph.git msgr: use provided rx buffer if present This changes the read path so that we hold the Connection::lock mutex while reading data off the socket. This ensures that we are reading into a buffer we are allowed to use, and allows users to revoke a previously posted buffer. If that happens, switch over to a newly allocated buffer. Note that currently the final result bufferlist may contain part of the provided buffer and part of a newly allocated buffer. This is okay as long as we will always read the same data into the buffer. And in practice, if the rx buffer is revoked then the message itself will be thrown out anyway. Signed-off-by: Sage Weil --- diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 423c43047116..64a7ea736d3c 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -1706,6 +1706,29 @@ void SimpleMessenger::Pipe::unlock_maybe_reap() } } +static void alloc_aligned_buffer(bufferlist& data, int len, int off) +{ + // create a buffer to read into that matches the data alignment + int left = len; + int head = 0; + if (off & ~PAGE_MASK) { + // head + head = MIN(PAGE_SIZE - (off & ~PAGE_MASK), (unsigned)left); + bufferptr bp = buffer::create(head); + data.push_back(bp); + left -= head; + } + int middle = left & PAGE_MASK; + if (middle > 0) { + bufferptr bp = buffer::create_page_aligned(middle); + data.push_back(bp); + left -= middle; + } + if (left) { + bufferptr bp = buffer::create(left); + data.push_back(bp); + } +} int SimpleMessenger::Pipe::read_message(Message **pm) { @@ -1796,36 +1819,55 @@ int SimpleMessenger::Pipe::read_message(Message **pm) data_len = le32_to_cpu(header.data_len); data_off = le32_to_cpu(header.data_off); if (data_len) { - int left = data_len; - if (data_off & ~PAGE_MASK) { - // head - int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK), - (unsigned)left); - bufferptr bp = buffer::create(head); - if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0) + unsigned offset = 0; + unsigned left = data_len; + + bufferlist newbuf, rxbuf; + bufferlist::iterator blp; + int rxbuf_version = 0; + + while (left > 0) { + // wait for data + if (tcp_wait(sd, messenger->timeout) < 0) goto out_dethrottle; - data.push_back(bp); - left -= head; - dout(20) << "reader got data head " << head << dendl; - } - // middle - int middle = left & PAGE_MASK; - if (middle > 0) { - bufferptr bp = buffer::create_page_aligned(middle); - if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0) - goto out_dethrottle; - data.push_back(bp); - left -= middle; - dout(20) << "reader got data page-aligned middle " << middle << dendl; - } - - if (left) { - bufferptr bp = buffer::create(left); - if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0) + // get a buffer + connection_state->lock.Lock(); + map >::iterator p = connection_state->rx_buffers.find(header.tid); + if (p != connection_state->rx_buffers.end()) { + if (rxbuf.length() == 0 || p->second.second != rxbuf_version) { + dout(10) << "reader seleting rx buffer v " << p->second.second + << " at offset " << offset + << " len " << p->second.first.length() << dendl; + rxbuf = p->second.first; + rxbuf_version = p->second.second; + // make sure it's big enough + if (rxbuf.length() < data_len) + rxbuf.push_back(buffer::create(data_len - rxbuf.length())); + blp = p->second.first.begin(); + blp.advance(offset); + } + } else { + if (!newbuf.length()) { + dout(20) << "reader allocating new rx buffer at offset " << offset << dendl; + alloc_aligned_buffer(newbuf, data_len, data_off); + blp = newbuf.begin(); + blp.advance(offset); + } + } + bufferptr bp = blp.get_current_ptr(); + int read = MIN(bp.length(), left); + dout(20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl; + int got = tcp_read_nonblocking(sd, bp.c_str(), read); + dout(30) << "reader read " << got << " of " << read << dendl; + connection_state->lock.Unlock(); + if (got < 0) goto out_dethrottle; - data.push_back(bp); - dout(20) << "reader got data tail " << left << dendl; + assert(got > 0); // hmm.. right? + blp.advance(got); + data.append(bp, 0, got); + offset += got; + left -= got; } } diff --git a/src/msg/tcp.cc b/src/msg/tcp.cc index bab1a7651ff4..f67db8507954 100644 --- a/src/msg/tcp.cc +++ b/src/msg/tcp.cc @@ -7,8 +7,8 @@ /****************** * tcp crap */ - -int tcp_read(int sd, char *buf, int len, int timeout) { +int tcp_read(int sd, char *buf, int len, int timeout) +{ if (sd < 0) return -1; struct pollfd pfd; @@ -40,7 +40,30 @@ int tcp_read(int sd, char *buf, int len, int timeout) { return len; } -int tcp_write(int sd, const char *buf, int len) { +int tcp_wait(int sd, int timeout) +{ + if (sd < 0) + return -1; + struct pollfd pfd; + pfd.fd = sd; + pfd.events = POLLIN | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR; + + if (poll(&pfd, 1, timeout) <= 0) + return -1; + + if (!(pfd.revents & POLLIN)) + return -1; + + return 0; +} + +int tcp_read_nonblocking(int sd, char *buf, int len) +{ + return ::recv(sd, buf, len, MSG_DONTWAIT); +} + +int tcp_write(int sd, const char *buf, int len) +{ if (sd < 0) return -1; struct pollfd pfd; diff --git a/src/msg/tcp.h b/src/msg/tcp.h index 89548aa10e9e..31ae967747b4 100644 --- a/src/msg/tcp.h +++ b/src/msg/tcp.h @@ -26,6 +26,8 @@ inline ostream& operator<<(ostream& out, const sockaddr_storage &ss) } extern int tcp_read(int sd, char *buf, int len, int timeout=-1); +extern int tcp_wait(int sd, int timeout); +extern int tcp_read_nonblocking(int sd, char *buf, int len); extern int tcp_write(int sd, const char *buf, int len); inline bool operator==(const sockaddr_in& a, const sockaddr_in& b) {