git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: define handle_rx_event to handle recv-completion-queue
author      Changcheng Liu <changcheng.liu@aliyun.com>
            Thu, 20 Jun 2019 08:16:29 +0000 (16:16 +0800)
committer   Changcheng Liu <changcheng.liu@aliyun.com>
            Fri, 23 Aug 2019 02:45:22 +0000 (10:45 +0800)
1. define handle_rx_event to let the dispatcher handle the
   receive completion queue
2. simplify the RDMADispatcher::polling implementation

Signed-off-by: Changcheng Liu <changcheng.liu@aliyun.com>
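
With the handler factored out, the receive path of RDMADispatcher::polling() collapses to the same shape the transmit path already had: drain the completion queue, then delegate. A minimal sketch of the resulting loop body follows; only the rx_ret/handle_rx_event call site is confirmed by the diff below, while the poll_cq(MAX_COMPLETIONS, wc) helpers are assumptions about the surrounding code not shown in these hunks (rearm and idle handling elided):

    // Inside RDMADispatcher::polling() -- sketch only, not the full loop.
    int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);   // assumed helper
    if (tx_ret > 0)
      handle_tx_event(wc, tx_ret);                      // existing tx handler

    int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);   // assumed helper
    if (rx_ret > 0) {
      ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
                     << " responses." << dendl;
      handle_rx_event(wc, rx_ret);                      // new in this commit
    }
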
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index eb8db72b134d7748c654979b2873f0d914014d02..f19910a01793ceba974f341fd284ef72b6f94b7e 100644 (file)
@@ -254,7 +254,6 @@ void RDMADispatcher::polling()
   std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
   std::vector<ibv_wc> tx_cqe;
   ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
-  RDMAConnectedSocketImpl *conn = nullptr;
   uint64_t last_inactive = Cycles::rdtsc();
   bool rearmed = false;
   int r = 0;
@@ -271,43 +270,7 @@ void RDMADispatcher::polling()
     if (rx_ret > 0) {
       ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
                      << " responses."<< dendl;
-      perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
-      perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);
-
-      std::lock_guard l{lock};//make sure connected socket alive when pass wc
-
-      for (int i = 0; i < rx_ret; ++i) {
-        ibv_wc* response = &wc[i];
-        Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
-
-        if (response->status == IBV_WC_SUCCESS) {
-          ceph_assert(wc[i].opcode == IBV_WC_RECV);
-          conn = get_conn_lockless(response->qp_num);
-          if (!conn) {
-            ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
-            get_stack()->get_infiniband().post_chunk_to_pool(chunk);
-            perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
-          } else {
-            conn->post_chunks_to_rq(1);
-            polled[conn].push_back(*response);
-          }
-        } else {
-          perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
-          ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
-              << ") status(" << response->status << ":"
-              << get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
-          if (response->status != IBV_WC_WR_FLUSH_ERR) {
-            conn = get_conn_lockless(response->qp_num);
-            if (conn && conn->is_connected())
-              conn->fault();
-          }
-          get_stack()->get_infiniband().post_chunk_to_pool(chunk);
-          perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
-        }
-      }
-      for (auto &&i : polled)
-        i.first->pass_wc(std::move(i.second));
-      polled.clear();
+      handle_rx_event(wc, rx_ret);
     }
 
     if (!tx_ret && !rx_ret) {
@@ -523,6 +486,50 @@ void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
   notify_pending_workers();
 }
 
+void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number)
+{
+
+  perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_number);
+  perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_number);
+
+  std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
+  RDMAConnectedSocketImpl *conn = nullptr;
+  std::lock_guard l{lock};//make sure connected socket alive when pass wc
+
+  for (int i = 0; i < rx_number; ++i) {
+    ibv_wc* response = &cqe[i];
+    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+
+    if (response->status == IBV_WC_SUCCESS) {
+      ceph_assert(response->opcode == IBV_WC_RECV);
+      conn = get_conn_lockless(response->qp_num);
+      if (!conn) {
+        ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back." << dendl;
+        get_stack()->get_infiniband().post_chunk_to_pool(chunk);
+        perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+      } else {
+        conn->post_chunks_to_rq(1);
+        polled[conn].push_back(*response);
+      }
+    } else {
+      perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
+      ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
+                    << ") status(" << response->status << ":"
+                    << get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
+      if (response->status != IBV_WC_WR_FLUSH_ERR) {
+        conn = get_conn_lockless(response->qp_num);
+        if (conn && conn->is_connected())
+          conn->fault();
+      }
+      get_stack()->get_infiniband().post_chunk_to_pool(chunk);
+      perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+    }
+  }
+
+  for (auto &i : polled)
+    i.first->pass_wc(std::move(i.second));
+  polled.clear();
+}
 
 RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
   : Worker(c, i), stack(nullptr),
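
The new handle_rx_event is built directly on libibverbs completion semantics: each ibv_wc returned by a poll carries the wr_id cookie that was posted with the receive work request (a Chunk* here), the qp_num that maps the completion back to its connection, and a status. A self-contained sketch of that underlying pattern, independent of the Ceph types (drain_rx_cq and the batch size of 32 are illustrative, not part of this commit):

    #include <infiniband/verbs.h>

    // Drain up to 32 receive completions from rx_cq (non-blocking).
    // Same decision tree as handle_rx_event above; the Ceph-specific
    // steps (socket lookup, chunk recycling) appear as comments.
    static void drain_rx_cq(struct ibv_cq *rx_cq)
    {
      struct ibv_wc wc[32];
      int n = ibv_poll_cq(rx_cq, 32, wc);  // #completions, 0, or <0 on error
      for (int i = 0; i < n; i++) {
        if (wc[i].status == IBV_WC_SUCCESS) {
          // wc[i].opcode == IBV_WC_RECV; wc[i].wr_id is the cookie given to
          // ibv_post_recv, wc[i].qp_num identifies the queue pair. Hand the
          // buffer to the owning connection and post a replacement recv.
        } else if (wc[i].status != IBV_WC_WR_FLUSH_ERR) {
          // Genuine error: fault the owning connection. Flush errors are
          // expected while a QP drains after entering the error state and
          // need no fault, only buffer recycling.
        }
        // Any buffer not handed off to a connection must go back to the
        // pool, or the receive queue eventually starves.
      }
    }

Note how the diff keeps the buffer accounting symmetric: l_msgr_rdma_rx_bufs_in_use is incremented once per completion up front and decremented whenever a chunk returns to the pool instead of being passed to a socket, because a completion consumes a posted receive buffer whether or not it succeeded.
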
index 948d1fce2a0e37a2a0b2c03eee556a3dcc8702f9..8033a5b91febf626a925e9275804666117bba41b 100644 (file)
@@ -118,6 +118,7 @@ class RDMADispatcher {
   void notify_pending_workers();
   void handle_tx_event(ibv_wc *cqe, int n);
   void post_tx_buffer(std::vector<Chunk*> &chunks);
+  void handle_rx_event(ibv_wc *cqe, int rx_number);
 
   std::atomic<uint64_t> inflight = {0};