From 2754d60f6615024c76f09d22d2480a9b69369a12 Mon Sep 17 00:00:00 2001 From: Changcheng Liu Date: Fri, 2 Aug 2019 11:23:08 +0800 Subject: [PATCH] msg/async/rdma: implement function to prefetch buffers The original RDMAConnectedSocketImpl::read reads data from buffers and prefetches data into buffers for the next round of reading. This makes the logic a little complex and the code hard to read. In this patch: 1) A private RDMAConnectedSocketImpl::buffer_prefetch API is added to prefetch data into buffers at the head of read_buffers. 2) Drop one call to notify() to reduce context switches. There is no need to notify the upper layer to read data since the current read operation hasn't finished yet. 3) Simplify the RDMAConnectedSocketImpl::read implementation. Signed-off-by: Changcheng Liu --- src/msg/async/rdma/RDMAConnectedSocketImpl.cc | 69 +++++++------------ src/msg/async/rdma/RDMAStack.h | 1 + 2 files changed, 27 insertions(+), 43 deletions(-) diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index f06f179fdd8..8fb486834ab 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -271,31 +271,37 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) return -EAGAIN; } ssize_t read = 0; - if (!buffers.empty()) - read = read_buffers(buf,len); + read = read_buffers(buf,len); + if (is_server && connected == 0) { + ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl; + connected = 1; //if so, we don't need the last handshake + cleanup(); + submit(false); + } + + if (!buffers.empty()) { + notify(); + } + + if (read == 0 && error) + return -error; + return read == 0 ? 
-EAGAIN : read; +} + +void RDMAConnectedSocketImpl::buffer_prefetch(void) +{ std::vector cqe; get_wc(cqe); - if (cqe.empty()) { - if (!buffers.empty()) { - notify(); - } - if (read > 0) { - return read; - } - if (error) { - return -error; - } else { - return -EAGAIN; - } - } + if(cqe.empty()) + return; - ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl; - for (size_t i = 0; i < cqe.size(); ++i) { + for(size_t i = 0; i < cqe.size(); ++i) { ibv_wc* response = &cqe[i]; ceph_assert(response->status == IBV_WC_SUCCESS); Chunk* chunk = reinterpret_cast(response->wr_id); chunk->prepare_read(response->byte_len); + if (chunk->get_size() == 0) { chunk->reset_read_chunk(); dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin); @@ -305,42 +311,18 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) } dispatcher->post_chunk_to_pool(chunk); continue; - } - ldout(cct, 25) << __func__ << " chunk " << chunk << " has bytes " << chunk->get_size() << dendl; - worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len); - - read += chunk->read(buf + read, len - read); - - if (chunk->get_size()) { + } else { buffers.push_back(chunk); ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl; - } else { - chunk->reset_read_chunk(); - dispatcher->post_chunk_to_pool(chunk); - update_post_backlog(); } } - worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size()); - if (is_server && connected == 0) { - ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl; - connected = 1; //if so, we don't need the last handshake - cleanup(); - submit(false); - } - - if (!buffers.empty()) { - notify(); - } - - if (read == 0 && error) - return -error; - return read == 0 ? 
-EAGAIN : read; } ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) { size_t read_size = 0, tmp = 0; + buffer_prefetch(); auto pchunk = buffers.begin(); while (pchunk != buffers.end()) { tmp = (*pchunk)->read(buf + read_size, len - read_size); @@ -363,6 +345,7 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) buffers.erase(buffers.begin(), pchunk); ldout(cct, 25) << __func__ << " got " << read_size << " bytes, buffers size: " << buffers.size() << dendl; + worker->perf_logger->inc(l_msgr_rdma_rx_bytes, read_size); return read_size; } diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index 1642d17ad4d..13a80f68a5c 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -207,6 +207,7 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl { int post_backlog = 0; void notify(); + void buffer_prefetch(void); ssize_t read_buffers(char* buf, size_t len); int post_work_request(std::vector&); size_t tx_copy_chunk(std::vector &tx_buffers, size_t req_copy_len, -- 2.39.5