]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: implement function to prefetch buffers
authorChangcheng Liu <changcheng.liu@aliyun.com>
Fri, 2 Aug 2019 03:23:08 +0000 (11:23 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 23 Aug 2019 06:36:05 +0000 (14:36 +0800)
The original RDMAConnectedSocketImpl::read read date from buffers and
prefertch data into buffers for next round of reading. It makes the
logical a little complex and the code isn't smooth to be read.
In this patch:
1) RDMAConnectedSocketImpl::buffer_prefetch private API is added to
prefetch data into buffers at the head of read_buffers.
2) reduce one time of calling notify() to reduce context switches.
It's really not needed to notify upper layer to read data since current
read operation hasn't finished yet.
3) Simplify RDMAConnectedSocketImpl::read implementation.

Signed-off-by: Changcheng Liu <changcheng.liu@aliyun.com>
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAStack.h

index f06f179fdd8fbd5bf177bacc8951b44284bc90d2..8fb486834ab86b54e80c0a7fff418811be010430 100644 (file)
@@ -271,31 +271,37 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
     return -EAGAIN;
   }
   ssize_t read = 0;
-  if (!buffers.empty())
-    read = read_buffers(buf,len);
+  read = read_buffers(buf,len);
 
+  if (is_server && connected == 0) {
+    ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
+    connected = 1; //if so, we don't need the last handshake
+    cleanup();
+    submit(false);
+  }
+
+  if (!buffers.empty()) {
+    notify();
+  }
+
+  if (read == 0 && error)
+    return -error;
+  return read == 0 ? -EAGAIN : read;
+}
+
+void RDMAConnectedSocketImpl::buffer_prefetch(void)
+{
   std::vector<ibv_wc> cqe;
   get_wc(cqe);
-  if (cqe.empty()) {
-    if (!buffers.empty()) {
-      notify();
-    }
-    if (read > 0) {
-      return read;
-    }
-    if (error) {
-      return -error;
-    } else {
-      return -EAGAIN;
-    }
-  }
+  if(cqe.empty())
+    return;
 
-  ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
-  for (size_t i = 0; i < cqe.size(); ++i) {
+  for(size_t i = 0; i < cqe.size(); ++i) {
     ibv_wc* response = &cqe[i];
     ceph_assert(response->status == IBV_WC_SUCCESS);
     Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
     chunk->prepare_read(response->byte_len);
+
     if (chunk->get_size() == 0) {
       chunk->reset_read_chunk();
       dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
@@ -305,42 +311,18 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
       }
       dispatcher->post_chunk_to_pool(chunk);
       continue;
-    }
-    ldout(cct, 25) << __func__ << " chunk " << chunk << " has bytes " << chunk->get_size() << dendl;
-    worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
-
-    read += chunk->read(buf + read, len - read);
-
-    if (chunk->get_size()) {
+    } else {
       buffers.push_back(chunk);
       ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
-    } else {
-      chunk->reset_read_chunk();
-      dispatcher->post_chunk_to_pool(chunk);
-      update_post_backlog();
     }
   }
-
   worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
-  if (is_server && connected == 0) {
-    ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
-    connected = 1; //if so, we don't need the last handshake
-    cleanup();
-    submit(false);
-  }
-
-  if (!buffers.empty()) {
-    notify();
-  }
-
-  if (read == 0 && error)
-    return -error;
-  return read == 0 ? -EAGAIN : read;
 }
 
 ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
 {
   size_t read_size = 0, tmp = 0;
+  buffer_prefetch();
   auto pchunk = buffers.begin();
   while (pchunk != buffers.end()) {
     tmp = (*pchunk)->read(buf + read_size, len - read_size);
@@ -363,6 +345,7 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
 
   buffers.erase(buffers.begin(), pchunk);
   ldout(cct, 25) << __func__ << " got " << read_size  << " bytes, buffers size: " << buffers.size() << dendl;
+  worker->perf_logger->inc(l_msgr_rdma_rx_bytes, read_size);
   return read_size;
 }
 
index 1642d17ad4d29b99033b460894cca557a82c8a30..13a80f68a5cf39ac7267104c98b03038c9805f34 100644 (file)
@@ -207,6 +207,7 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
   int post_backlog = 0;
 
   void notify();
+  void buffer_prefetch(void);
   ssize_t read_buffers(char* buf, size_t len);
   int post_work_request(std::vector<Chunk*>&);
   size_t tx_copy_chunk(std::vector<Chunk*> &tx_buffers, size_t req_copy_len,