From 060c5c8e3a551b6950b8e48b33a97fe0289bd04f Mon Sep 17 00:00:00 2001 From: Changcheng Liu Date: Thu, 20 Jun 2019 16:16:29 +0800 Subject: [PATCH] msg/async/rdma: define handle_rx_event to handle recv-completion-queue 1. define handle_rx_event to let dispatch handle receive-completion-queue 2. simplify RDMADispatcher::polling implementation Signed-off-by: Changcheng Liu --- src/msg/async/rdma/RDMAStack.cc | 83 ++++++++++++++++++--------------- src/msg/async/rdma/RDMAStack.h | 1 + 2 files changed, 46 insertions(+), 38 deletions(-) diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index eb8db72b134..f19910a0179 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -254,7 +254,6 @@ void RDMADispatcher::polling() std::map > polled; std::vector tx_cqe; ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl; - RDMAConnectedSocketImpl *conn = nullptr; uint64_t last_inactive = Cycles::rdtsc(); bool rearmed = false; int r = 0; @@ -271,43 +270,7 @@ void RDMADispatcher::polling() if (rx_ret > 0) { ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret << " responses."<< dendl; - perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret); - perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret); - - std::lock_guard l{lock};//make sure connected socket alive when pass wc - - for (int i = 0; i < rx_ret; ++i) { - ibv_wc* response = &wc[i]; - Chunk* chunk = reinterpret_cast(response->wr_id); - - if (response->status == IBV_WC_SUCCESS) { - ceph_assert(wc[i].opcode == IBV_WC_RECV); - conn = get_conn_lockless(response->qp_num); - if (!conn) { - ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? 
" << r << dendl; - get_stack()->get_infiniband().post_chunk_to_pool(chunk); - perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); - } else { - conn->post_chunks_to_rq(1); - polled[conn].push_back(*response); - } - } else { - perf_logger->inc(l_msgr_rdma_rx_total_wc_errors); - ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk - << ") status(" << response->status << ":" - << get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl; - if (response->status != IBV_WC_WR_FLUSH_ERR) { - conn = get_conn_lockless(response->qp_num); - if (conn && conn->is_connected()) - conn->fault(); - } - get_stack()->get_infiniband().post_chunk_to_pool(chunk); - perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); - } - } - for (auto &&i : polled) - i.first->pass_wc(std::move(i.second)); - polled.clear(); + handle_rx_event(wc, rx_ret); } if (!tx_ret && !rx_ret) { @@ -523,6 +486,50 @@ void RDMADispatcher::post_tx_buffer(std::vector &chunks) notify_pending_workers(); } +void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number) +{ + + perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_number); + perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_number); + + std::map > polled; + RDMAConnectedSocketImpl *conn = nullptr; + std::lock_guard l{lock};//make sure connected socket alive when pass wc + + for (int i = 0; i < rx_number; ++i) { + ibv_wc* response = &cqe[i]; + Chunk* chunk = reinterpret_cast(response->wr_id); + + if (response->status == IBV_WC_SUCCESS) { + ceph_assert(response->opcode == IBV_WC_RECV); + conn = get_conn_lockless(response->qp_num); + if (!conn) { + ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back." 
<< dendl; + get_stack()->get_infiniband().post_chunk_to_pool(chunk); + perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); + } else { + conn->post_chunks_to_rq(1); + polled[conn].push_back(*response); + } + } else { + perf_logger->inc(l_msgr_rdma_rx_total_wc_errors); + ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk + << ") status(" << response->status << ":" + << get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl; + if (response->status != IBV_WC_WR_FLUSH_ERR) { + conn = get_conn_lockless(response->qp_num); + if (conn && conn->is_connected()) + conn->fault(); + } + get_stack()->get_infiniband().post_chunk_to_pool(chunk); + perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); + } + } + + for (auto &i : polled) + i.first->pass_wc(std::move(i.second)); + polled.clear(); +} RDMAWorker::RDMAWorker(CephContext *c, unsigned i) : Worker(c, i), stack(nullptr), diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index 948d1fce2a0..8033a5b91fe 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -118,6 +118,7 @@ class RDMADispatcher { void notify_pending_workers(); void handle_tx_event(ibv_wc *cqe, int n); void post_tx_buffer(std::vector &chunks); + void handle_rx_event(ibv_wc *cqe, int rx_number); std::atomic inflight = {0}; -- 2.39.5