}
}
+// Append freshly allocated buffers totalling `len` bytes to `data`, laid
+// out to match a payload that starts at byte offset `off`:
+//   1. an unaligned head that runs up to the next page boundary (only
+//      when `off` is not already page-aligned),
+//   2. a page-aligned middle covering every whole page that remains,
+//   3. an unaligned tail holding whatever is left.
+// Nothing is read here; the buffers are only allocated and queued.
+static void alloc_aligned_buffer(bufferlist& data, int len, int off)
+{
+  int remaining = len;
+
+  // 1. head: bytes needed to reach the next page boundary, capped at len
+  if (off & ~PAGE_MASK) {
+    int head = MIN(PAGE_SIZE - (off & ~PAGE_MASK), (unsigned)remaining);
+    bufferptr head_bp(buffer::create(head));
+    data.push_back(head_bp);
+    remaining -= head;
+  }
+
+  // 2. middle: the largest page multiple that still fits
+  int middle = remaining & PAGE_MASK;
+  if (middle > 0) {
+    bufferptr middle_bp(buffer::create_page_aligned(middle));
+    data.push_back(middle_bp);
+    remaining -= middle;
+  }
+
+  // 3. tail: sub-page remainder, no alignment requested
+  if (remaining != 0) {
+    bufferptr tail_bp(buffer::create(remaining));
+    data.push_back(tail_bp);
+  }
+}
int SimpleMessenger::Pipe::read_message(Message **pm)
{
data_len = le32_to_cpu(header.data_len);
data_off = le32_to_cpu(header.data_off);
if (data_len) {
- int left = data_len;
- if (data_off & ~PAGE_MASK) {
- // head
- int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
- (unsigned)left);
- bufferptr bp = buffer::create(head);
- if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
+ unsigned offset = 0;
+ unsigned left = data_len;
+
+ bufferlist newbuf, rxbuf;
+ bufferlist::iterator blp;
+ int rxbuf_version = 0;
+
+ while (left > 0) {
+ // wait for data
+ if (tcp_wait(sd, messenger->timeout) < 0)
goto out_dethrottle;
- data.push_back(bp);
- left -= head;
- dout(20) << "reader got data head " << head << dendl;
- }
- // middle
- int middle = left & PAGE_MASK;
- if (middle > 0) {
- bufferptr bp = buffer::create_page_aligned(middle);
- if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
- goto out_dethrottle;
- data.push_back(bp);
- left -= middle;
- dout(20) << "reader got data page-aligned middle " << middle << dendl;
- }
-
- if (left) {
- bufferptr bp = buffer::create(left);
- if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
+ // get a buffer
+ connection_state->lock.Lock();
+ map<tid_t,pair<bufferlist,int> >::iterator p = connection_state->rx_buffers.find(header.tid);
+ if (p != connection_state->rx_buffers.end()) {
+ if (rxbuf.length() == 0 || p->second.second != rxbuf_version) {
+ dout(10) << "reader seleting rx buffer v " << p->second.second
+ << " at offset " << offset
+ << " len " << p->second.first.length() << dendl;
+ rxbuf = p->second.first;
+ rxbuf_version = p->second.second;
+ // make sure it's big enough
+ if (rxbuf.length() < data_len)
+ rxbuf.push_back(buffer::create(data_len - rxbuf.length()));
+ blp = p->second.first.begin();
+ blp.advance(offset);
+ }
+ } else {
+ if (!newbuf.length()) {
+ dout(20) << "reader allocating new rx buffer at offset " << offset << dendl;
+ alloc_aligned_buffer(newbuf, data_len, data_off);
+ blp = newbuf.begin();
+ blp.advance(offset);
+ }
+ }
+ bufferptr bp = blp.get_current_ptr();
+ int read = MIN(bp.length(), left);
+ dout(20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl;
+ int got = tcp_read_nonblocking(sd, bp.c_str(), read);
+ dout(30) << "reader read " << got << " of " << read << dendl;
+ connection_state->lock.Unlock();
+ if (got < 0)
goto out_dethrottle;
- data.push_back(bp);
- dout(20) << "reader got data tail " << left << dendl;
+ assert(got > 0); // hmm.. right?
+ blp.advance(got);
+ data.append(bp, 0, got);
+ offset += got;
+ left -= got;
}
}
/******************
* tcp crap
*/
-
-int tcp_read(int sd, char *buf, int len, int timeout) {
+int tcp_read(int sd, char *buf, int len, int timeout)
+{
if (sd < 0)
return -1;
struct pollfd pfd;
return len;
}
-int tcp_write(int sd, const char *buf, int len) {
+// Wait until descriptor `sd` is readable.
+//
+// `timeout` is handed to poll() unchanged, so it uses poll()'s units and
+// conventions.
+//
+// Returns 0 when poll() reports POLLIN for `sd`. Returns -1 when `sd` is
+// negative, when poll() fails or times out, or when poll() reports only
+// events other than POLLIN (hangup, error, invalid descriptor).
+int tcp_wait(int sd, int timeout)
+{
+  if (sd < 0)
+    return -1;
+
+  struct pollfd waiter;
+  waiter.fd = sd;
+  waiter.events = POLLIN | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR;
+
+  const int ready = poll(&waiter, 1, timeout);
+  if (ready <= 0)
+    return -1;  // negative: poll() error; zero: timed out
+
+  // Only readable data counts as success.
+  return (waiter.revents & POLLIN) ? 0 : -1;
+}
+
+// Read up to `len` bytes from `sd` into `buf` without blocking
+// (recv() with MSG_DONTWAIT).
+//
+// Returns the number of bytes read (> 0) on success, or -1 on failure.
+// Failure covers a negative `sd`, any recv() error, and -- for a non-empty
+// request -- recv() returning 0, which on a stream socket means the peer
+// has shut the connection down. Reporting that as -1 lets callers that
+// require forward progress take their normal error path instead of
+// seeing a zero-byte "success".
+// NOTE(review): assumes `sd` is a stream (TCP) socket, as the name
+// suggests -- confirm against callers.
+int tcp_read_nonblocking(int sd, char *buf, int len)
+{
+  // same invalid-descriptor guard as the other tcp_* helpers
+  if (sd < 0)
+    return -1;
+
+  int got = ::recv(sd, buf, len, MSG_DONTWAIT);
+
+  // A zero-length request legitimately yields 0; otherwise 0 is EOF.
+  if (got == 0 && len > 0)
+    return -1;
+
+  return got;
+}
+
+int tcp_write(int sd, const char *buf, int len)
+{
if (sd < 0)
return -1;
struct pollfd pfd;