]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: use provided rx buffer if present
authorSage Weil <sage@newdream.net>
Mon, 15 Nov 2010 04:26:52 +0000 (20:26 -0800)
committerSage Weil <sage@newdream.net>
Mon, 15 Nov 2010 04:39:07 +0000 (20:39 -0800)
This changes the read path so that we hold the Connection::lock mutex while
reading data off the socket.  This ensures that we are reading into a
buffer we are allowed to use, and allows users to revoke a previously
posted buffer.  If that happens, switch over to a newly allocated buffer.

Note that currently the final result bufferlist may contain part of the
provided buffer and part of a newly allocated buffer.  This is okay as long
as we will always read the same data into the buffer.  And in practice, if
the rx buffer is revoked then the message itself will be thrown out anyway.

Signed-off-by: Sage Weil <sage@newdream.net>
src/msg/SimpleMessenger.cc
src/msg/tcp.cc
src/msg/tcp.h

index 423c4304711665bd3b8c0ca0a193055648211942..64a7ea736d3cc3c7b63089c2b58e48c50bc951df 100644 (file)
@@ -1706,6 +1706,29 @@ void SimpleMessenger::Pipe::unlock_maybe_reap()
   }
 }
 
+static void alloc_aligned_buffer(bufferlist& data, int len, int off)
+{
+  // create a buffer to read into that matches the data alignment
+  int left = len;
+  int head = 0;
+  if (off & ~PAGE_MASK) {
+    // head
+    head = MIN(PAGE_SIZE - (off & ~PAGE_MASK), (unsigned)left);
+    bufferptr bp = buffer::create(head);
+    data.push_back(bp);
+    left -= head;
+  }
+  int middle = left & PAGE_MASK;
+  if (middle > 0) {
+    bufferptr bp = buffer::create_page_aligned(middle);
+    data.push_back(bp);
+    left -= middle;
+  }
+  if (left) {
+    bufferptr bp = buffer::create(left);
+    data.push_back(bp);
+  }
+}
 
 int SimpleMessenger::Pipe::read_message(Message **pm)
 {
@@ -1796,36 +1819,55 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
   data_len = le32_to_cpu(header.data_len);
   data_off = le32_to_cpu(header.data_off);
   if (data_len) {
-    int left = data_len;
-    if (data_off & ~PAGE_MASK) {
-      // head
-      int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
-                    (unsigned)left);
-      bufferptr bp = buffer::create(head);
-      if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
+    unsigned offset = 0;
+    unsigned left = data_len;
+
+    bufferlist newbuf, rxbuf;
+    bufferlist::iterator blp;
+    int rxbuf_version = 0;
+       
+    while (left > 0) {
+      // wait for data
+      if (tcp_wait(sd, messenger->timeout) < 0)
        goto out_dethrottle;
-      data.push_back(bp);
-      left -= head;
-      dout(20) << "reader got data head " << head << dendl;
-    }
 
-    // middle
-    int middle = left & PAGE_MASK;
-    if (middle > 0) {
-      bufferptr bp = buffer::create_page_aligned(middle);
-      if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
-       goto out_dethrottle;
-      data.push_back(bp);
-      left -= middle;
-      dout(20) << "reader got data page-aligned middle " << middle << dendl;
-    }
-
-    if (left) {
-      bufferptr bp = buffer::create(left);
-      if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
+      // get a buffer
+      connection_state->lock.Lock();
+      map<tid_t,pair<bufferlist,int> >::iterator p = connection_state->rx_buffers.find(header.tid);
+      if (p != connection_state->rx_buffers.end()) {
+       if (rxbuf.length() == 0 || p->second.second != rxbuf_version) {
+         dout(10) << "reader seleting rx buffer v " << p->second.second
+                  << " at offset " << offset
+                  << " len " << p->second.first.length() << dendl;
+         rxbuf = p->second.first;
+         rxbuf_version = p->second.second;
+         // make sure it's big enough
+         if (rxbuf.length() < data_len)
+           rxbuf.push_back(buffer::create(data_len - rxbuf.length()));
+         blp = p->second.first.begin();
+         blp.advance(offset);
+       }
+      } else {
+       if (!newbuf.length()) {
+         dout(20) << "reader allocating new rx buffer at offset " << offset << dendl;
+         alloc_aligned_buffer(newbuf, data_len, data_off);
+         blp = newbuf.begin();
+         blp.advance(offset);
+       }
+      }
+      bufferptr bp = blp.get_current_ptr();
+      int read = MIN(bp.length(), left);
+      dout(20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl;
+      int got = tcp_read_nonblocking(sd, bp.c_str(), read);
+      dout(30) << "reader read " << got << " of " << read << dendl;
+      connection_state->lock.Unlock();
+      if (got < 0)
        goto out_dethrottle;
-      data.push_back(bp);
-      dout(20) << "reader got data tail " << left << dendl;
+      assert(got > 0);  // hmm.. right?
+      blp.advance(got);
+      data.append(bp, 0, got);
+      offset += got;
+      left -= got;
     }
   }
 
index bab1a7651ff45edbe93bd3f4f56987469008cc40..f67db85079543cab6457e4eef18a2ddaf3370f3a 100644 (file)
@@ -7,8 +7,8 @@
 /******************
  * tcp crap
  */
-
-int tcp_read(int sd, char *buf, int len, int timeout) {
+int tcp_read(int sd, char *buf, int len, int timeout) 
+{
   if (sd < 0)
     return -1;
   struct pollfd pfd;
@@ -40,7 +40,30 @@ int tcp_read(int sd, char *buf, int len, int timeout) {
   return len;
 }
 
-int tcp_write(int sd, const char *buf, int len) {
+int tcp_wait(int sd, int timeout) 
+{
+  if (sd < 0)
+    return -1;
+  struct pollfd pfd;
+  pfd.fd = sd;
+  pfd.events = POLLIN | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR;
+
+  if (poll(&pfd, 1, timeout) <= 0)
+    return -1;
+
+  if (!(pfd.revents & POLLIN))
+    return -1;
+
+  return 0;
+}
+
+int tcp_read_nonblocking(int sd, char *buf, int len)
+{
+  return ::recv(sd, buf, len, MSG_DONTWAIT);
+}
+
+int tcp_write(int sd, const char *buf, int len)
+{
   if (sd < 0)
     return -1;
   struct pollfd pfd;
index 89548aa10e9e46750e01f8a156d3de348b1c8b20..31ae967747b418f3072c26b8ed9e05c393537a9b 100644 (file)
@@ -26,6 +26,8 @@ inline ostream& operator<<(ostream& out, const sockaddr_storage &ss)
 }
 
 extern int tcp_read(int sd, char *buf, int len, int timeout=-1);
+extern int tcp_wait(int sd, int timeout);
+extern int tcp_read_nonblocking(int sd, char *buf, int len);
 extern int tcp_write(int sd, const char *buf, int len);
 
 inline bool operator==(const sockaddr_in& a, const sockaddr_in& b) {