]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg: prefetch data when doing recv() 2457/head
authorYehuda Sadeh <yehuda@inktank.com>
Thu, 12 Dec 2013 17:14:16 +0000 (09:14 -0800)
committerSage Weil <sage@redhat.com>
Thu, 11 Sep 2014 22:46:51 +0000 (15:46 -0700)
Since that when we're reading the header we do lot's of small IOs, we
can really benefit from doing bigger reads.
Set a limit on the buffer size, so that it should mainly affect header
reads and not the actual data transfer of large IOs.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/common/config_opts.h
src/msg/Pipe.cc
src/msg/Pipe.h

index b4486699c932f2e137a91d7d845e2617907b281c..2fa71758f9c49a340530b968a4fd86a55c242dcf 100644 (file)
@@ -115,6 +115,7 @@ OPTION(perf, OPT_BOOL, true)       // enable internal perf counters
 
 OPTION(ms_tcp_nodelay, OPT_BOOL, true)
 OPTION(ms_tcp_rcvbuf, OPT_INT, 0)
+OPTION(ms_tcp_prefetch_max_size, OPT_INT, 4096) // max prefetch size, we limit this to avoid extra memcpy
 OPTION(ms_initial_backoff, OPT_DOUBLE, .2)
 OPTION(ms_max_backoff, OPT_DOUBLE, 15.0)
 OPTION(ms_nocrc, OPT_BOOL, false)
index cbd04bc6e7731d6e5c5b0548f88a2cc7557bddac..814da5e18b1f95523a70f6a320d70a415f8ada10 100644 (file)
@@ -79,6 +79,8 @@ Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con)
     delay_thread(NULL),
     msgr(r),
     conn_id(r->dispatch_queue.get_id()),
+    recv_ofs(0),
+    recv_len(0),
     sd(-1), port(0),
     peer_type(-1),
     pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
@@ -108,6 +110,9 @@ Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con)
   msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
   if (msgr->timeout == 0)
     msgr->timeout = -1;
+
+  recv_max_prefetch = msgr->cct->_conf->ms_tcp_prefetch_max_size;
+  recv_buf = new char[recv_max_prefetch];
 }
 
 Pipe::~Pipe()
@@ -115,6 +120,7 @@ Pipe::~Pipe()
   assert(out_q.empty());
   assert(sent.empty());
   delete delay_thread;
+  delete[] recv_buf;
 }
 
 void Pipe::handle_ack(uint64_t seq)
@@ -270,6 +276,8 @@ int Pipe::accept()
   // used for reading in the remote acked seq on connect
   uint64_t newly_acked_seq = 0;
 
+  recv_reset();
+
   set_socket_options();
 
   // announce myself.
@@ -844,6 +852,7 @@ int Pipe::connect()
     goto fail;
   }
 
+  recv_reset();
   // connect!
   ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
   rc = ::connect(sd, (sockaddr*)&peer_addr.addr, peer_addr.addr_size());
@@ -1706,6 +1715,7 @@ void Pipe::writer()
        m->set_connection(connection_state.get());
 
        uint64_t features = connection_state->get_features();
+
        if (m->empty_payload())
          ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features
                              << " " << m << " " << *m << dendl;
@@ -1741,7 +1751,7 @@ void Pipe::writer()
        blist.append(m->get_middle());
        blist.append(m->get_data());
 
-       pipe_lock.Unlock();
+        pipe_lock.Unlock();
 
         ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl;
        int rc = write_message(header, footer, blist);
@@ -2314,6 +2324,9 @@ int Pipe::tcp_read_wait()
   pfd.events |= POLLRDHUP;
 #endif
 
+  if (has_pending_data())
+    return 0;
+
   if (poll(&pfd, 1, msgr->timeout) <= 0)
     return -1;
 
@@ -2330,19 +2343,80 @@ int Pipe::tcp_read_wait()
   return 0;
 }
 
-int Pipe::tcp_read_nonblocking(char *buf, int len)
+int Pipe::do_recv(char *buf, size_t len, int flags)
 {
 again:
-  int got = ::recv( sd, buf, len, MSG_DONTWAIT );
+  int got = ::recv( sd, buf, len, flags );
   if (got < 0) {
     if (errno == EAGAIN || errno == EINTR) {
       goto again;
-    } else {
-      ldout(msgr->cct, 10) << "tcp_read_nonblocking socket " << sd << " returned "
+    }
+    ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
                     << got << " " << cpp_strerror(errno) << dendl;
-      return -1;
+    return -1;
+  }
+  if (got == 0) {
+    return -1;
+  }
+  return got;
+}
+
+int Pipe::buffered_recv(char *buf, size_t len, int flags)
+{
+  int left = len;
+  int total_recv = 0;
+  if (recv_len > recv_ofs) {
+    int to_read = MIN(recv_len - recv_ofs, left);
+    memcpy(buf, &recv_buf[recv_ofs], to_read);
+    recv_ofs += to_read;
+    left -= to_read;
+    if (left == 0) {
+      return to_read;
     }
-  } else if (got == 0) {
+    buf += to_read;
+    total_recv += to_read;
+  }
+
+  /* nothing left in the prefetch buffer */
+
+  if (len > (size_t)recv_max_prefetch) {
+    /* this was a large read, we don't prefetch for these */
+    int ret = do_recv(buf, left, flags );
+    if (ret < 0) {
+      if (total_recv > 0)
+        return total_recv;
+      return ret;
+    }
+    total_recv += ret;
+    return total_recv;
+  }
+
+
+  int got = do_recv(recv_buf, recv_max_prefetch, flags);
+  if (got <= 0) {
+    if (total_recv > 0)
+      return total_recv;
+
+    return got;
+  }
+
+  recv_len = got;
+  got = MIN(left, got);
+  memcpy(buf, recv_buf, got);
+  recv_ofs = got;
+  total_recv += got;
+  return total_recv;
+}
+
+int Pipe::tcp_read_nonblocking(char *buf, int len)
+{
+  int got = buffered_recv(buf, len, MSG_DONTWAIT );
+  if (got < 0) {
+    ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
+                        << got << " " << cpp_strerror(errno) << dendl;
+    return -1;
+  }
+  if (got == 0) {
     /* poll() said there was data, but we didn't read any - peer
      * sent a FIN.  Maybe POLLRDHUP signals this, but this is
      * standard socket behavior as documented by Stevens.
index cc2044c3d44fb857c8bdd3fddf2ff71d66c15a1f..7c32bc090d8aa0104ef3a2b08533aee339c360aa 100644 (file)
@@ -131,6 +131,11 @@ class DispatchQueue;
       return static_cast<Pipe*>(RefCountedObject::get());
     }
 
+    char *recv_buf;
+    int recv_max_prefetch;
+    int recv_ofs;
+    int recv_len;
+
     enum {
       STATE_ACCEPTING,
       STATE_CONNECTING,
@@ -157,7 +162,9 @@ class DispatchQueue;
       return get_state_name(state);
     }
 
+  private:
     int sd;
+  public:
     int port;
     int peer_type;
     entity_addr_t peer_addr;
@@ -300,10 +307,19 @@ class DispatchQueue;
     void discard_out_queue();
 
     void shutdown_socket() {
+      recv_reset();
       if (sd >= 0)
         ::shutdown(sd, SHUT_RDWR);
     }
 
+    void recv_reset() {
+      recv_len = 0;
+      recv_ofs = 0;
+    }
+    int do_recv(char *buf, size_t len, int flags);
+    int buffered_recv(char *buf, size_t len, int flags);
+    bool has_pending_data() { return recv_len > recv_ofs; }
+
     /**
      * do a blocking read of len bytes from socket
      *