]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: fix Tx buffer leakage which can introduce "heartbeat no
authorownedu <yanlei_cv@aliyun.com>
Sat, 30 Sep 2017 02:14:39 +0000 (10:14 +0800)
committerownedu <yanlei_cv@aliyun.com>
Sat, 30 Sep 2017 02:14:39 +0000 (10:14 +0800)
reply" due to out of Tx buffers, this can be reproduced by marking some
OSDs down in a big Ceph cluster, say 300+ OSDs.

rootcause: when RDMAStack wants to delete faulty connections there are
chances that those QPs still have inflight CQEs, thus inflight Tx
buffers; without waiting for them to complete, Tx buffer pool will run
out of buffers finally.

fix: ideally the best way to fix this bug is to destroy QPs gracefully
such as to_dead(), we now just reply on the number of Tx WQE and CQE to
avoid buffer leakage; RDMAStack polling is always running so we are safe
to simply bypass some QPs that are not in 'complete' state.

Signed-off-by: Yan Lei <yongyou.yl@alibaba-inc.com>
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index db2245dd1e1dd37708fce86380926fc9980e6802..e774226b416d12f182666b636f881ac9634e9a28 100644 (file)
@@ -166,7 +166,9 @@ Infiniband::QueuePair::QueuePair(
   max_send_wr(tx_queue_len),
   max_recv_wr(rx_queue_len),
   q_key(q_key),
-  dead(false)
+  dead(false),
+  tx_wr(0),
+  tx_wc(0)
 {
   initial_psn = lrand48() & 0xffffff;
   if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
index fff706df4f823081d66fcc6ffeb251709e83c659..7ebfbca73e281fe1c1de2ea21b930b86c94b8643 100644 (file)
@@ -23,6 +23,7 @@
 
 #include <infiniband/verbs.h>
 
+#include <atomic>
 #include <string>
 #include <vector>
 
@@ -464,6 +465,16 @@ class Infiniband {
      * Return true if the queue pair is in an error state, false otherwise.
      */
     bool is_error() const;
+    /**
+     * Add Tx work request and completion counters.
+     */
+    void add_tx_wr(uint32_t amt) { tx_wr += amt; }
+    void add_tx_wc(uint32_t amt) { tx_wc += amt; }
+    /**
+     * Get Tx work request and completion counter values.
+     */
+    uint32_t get_tx_wr() const { return tx_wr; }
+    uint32_t get_tx_wc() const { return tx_wc; }
     ibv_qp* get_qp() const { return qp; }
     Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }
     Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }
@@ -486,6 +497,8 @@ class Infiniband {
     uint32_t     max_recv_wr;
     uint32_t     q_key;
     bool dead;
+    std::atomic<uint32_t> tx_wr; // atomic counter for successful Tx WQEs
+    std::atomic<uint32_t> tx_wc; // atomic counter for successful Tx CQEs
   };
 
  public:
index 8f2ac4a59c147480a13555a08d79c075185f53de..c9427f5bba6c6a6c2fe85630a8b18f4337a76200 100644 (file)
@@ -537,6 +537,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
   ibv_send_wr iswr[tx_buffers.size()];
   uint32_t current_swr = 0;
   ibv_send_wr* pre_wr = NULL;
+  uint32_t num = 0; 
 
   memset(iswr, 0, sizeof(iswr));
   memset(isge, 0, sizeof(isge));
@@ -558,6 +559,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
       ldout(cct, 20) << __func__ << " send_inline." << dendl;
       }*/
 
+    num++;
     worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
     if (pre_wr)
       pre_wr->next = &iswr[current_swr];
@@ -575,6 +577,8 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
     worker->perf_logger->inc(l_msgr_rdma_tx_failed);
     return -errno;
   }
+  // Update the Tx WQE counter
+  qp->add_tx_wr(num);
   worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
   ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
   return 0;
@@ -595,6 +599,8 @@ void RDMAConnectedSocketImpl::fin() {
     worker->perf_logger->inc(l_msgr_rdma_tx_failed);
     return ;
   }
+  // Update the Tx WQE counter
+  qp->add_tx_wr(1);
 }
 
 void RDMAConnectedSocketImpl::cleanup() {
index 1b00f8f0c997462ed50b9a8851e8fdea2fcb4830..031faf289a38e5e1c7bb58f2ea07f6d149ad761d 100644 (file)
@@ -244,15 +244,18 @@ void RDMADispatcher::polling()
       perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
       if (num_dead_queue_pair) {
         Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
-        while (!dead_queue_pairs.empty()) {
-          ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.back() << dendl;
-          delete dead_queue_pairs.back();
+        for (auto idx = 0; idx < dead_queue_pairs.size(); idx++) {
+          // Bypass QPs that do not collect all Tx completions yet.
+          if (dead_queue_pairs.at(idx)->get_tx_wc() != dead_queue_pairs.at(idx)->get_tx_wr())
+            continue;
+          ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.at(idx) << dendl;
+          delete dead_queue_pairs.at(idx);
+          dead_queue_pairs.erase(dead_queue_pairs.begin() + idx);
           perf_logger->dec(l_msgr_rdma_active_queue_pair);
-          dead_queue_pairs.pop_back();
           --num_dead_queue_pair;
         }
       }
-      if (!num_qp_conn && done)
+      if (!num_qp_conn && done && dead_queue_pairs.empty())
         break;
 
       uint64_t now = Cycles::rdtsc();
@@ -333,6 +336,22 @@ RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
   return it->second.second;
 }
 
+Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
+{
+  Mutex::Locker l(lock);
+  // Try to find the QP in qp_conns firstly.
+  auto it = qp_conns.find(qp);
+  if (it == qp_conns.end()) {
+    // Try again in dead_queue_pairs.
+    for(auto dead_qp = dead_queue_pairs.begin(); dead_qp != dead_queue_pairs.end(); dead_qp++) {
+      if ((*dead_qp)->get_local_qp_number() == qp)
+        return *dead_qp;
+    }
+    return nullptr;
+  }
+  return it->second.first;
+}
+
 void RDMADispatcher::erase_qpn_lockless(uint32_t qpn)
 {
   auto it = qp_conns.find(qpn);
@@ -361,6 +380,11 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
                    << " len: " << response->byte_len << " , addr:" << chunk
                    << " " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
 
+    // Update the Tx CQE counter.
+    QueuePair *qp = get_qp(response->qp_num);
+    if (qp)
+      qp->add_tx_wc(1);
+
     if (response->status != IBV_WC_SUCCESS) {
       perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
       if (response->status == IBV_WC_RETRY_EXC_ERR) {
index 74cc94a5a4e6c8836f57fb488227e49ba28b2bfb..9673428db892c2b0d48b4b7657bd747fb5f4ea00 100644 (file)
@@ -108,6 +108,7 @@ class RDMADispatcher {
   }
   RDMAStack* get_stack() { return stack; }
   RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
+  QueuePair* get_qp(uint32_t qp);
   void erase_qpn_lockless(uint32_t qpn);
   void erase_qpn(uint32_t qpn);
   Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }