From: ownedu Date: Sat, 30 Sep 2017 02:14:39 +0000 (+0800) Subject: msg/async/rdma: fix Tx buffer leakage which can introduce "heartbeat no X-Git-Tag: v13.0.1~696^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=92c3499f7b621937c2b51816e30a2a209f6db476;p=ceph.git msg/async/rdma: fix Tx buffer leakage which can introduce "heartbeat no reply" due to out of Tx buffers, this can be reproduced by marking some OSDs down in a big Ceph cluster, say 300+ OSDs. root cause: when RDMAStack wants to delete faulty connections there are chances that those QPs still have inflight CQEs, thus inflight Tx buffers; without waiting for them to complete, Tx buffer pool will eventually run out of buffers. fix: ideally the best way to fix this bug is to destroy QPs gracefully, such as with to_dead(); for now we just rely on the number of Tx WQEs and CQEs to avoid buffer leakage; RDMAStack polling is always running so we are safe to simply bypass some QPs that are not in 'complete' state. Signed-off-by: Yan Lei --- diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index db2245dd1e1d..e774226b416d 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -166,7 +166,9 @@ Infiniband::QueuePair::QueuePair( max_send_wr(tx_queue_len), max_recv_wr(rx_queue_len), q_key(q_key), - dead(false) + dead(false), + tx_wr(0), + tx_wc(0) { initial_psn = lrand48() & 0xffffff; if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) { diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index fff706df4f82..7ebfbca73e28 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -23,6 +23,7 @@ #include +#include #include #include @@ -464,6 +465,16 @@ class Infiniband { * Return true if the queue pair is in an error state, false otherwise. */ bool is_error() const; + /** + * Add Tx work request and completion counters. 
+ */ + void add_tx_wr(uint32_t amt) { tx_wr += amt; } + void add_tx_wc(uint32_t amt) { tx_wc += amt; } + /** + * Get Tx work request and completion counter values. + */ + uint32_t get_tx_wr() const { return tx_wr; } + uint32_t get_tx_wc() const { return tx_wc; } ibv_qp* get_qp() const { return qp; } Infiniband::CompletionQueue* get_tx_cq() const { return txcq; } Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; } @@ -486,6 +497,8 @@ class Infiniband { uint32_t max_recv_wr; uint32_t q_key; bool dead; + std::atomic tx_wr; // atomic counter for successful Tx WQEs + std::atomic tx_wc; // atomic counter for successful Tx CQEs }; public: diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index 8f2ac4a59c14..c9427f5bba6c 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -537,6 +537,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector &tx_buffers) ibv_send_wr iswr[tx_buffers.size()]; uint32_t current_swr = 0; ibv_send_wr* pre_wr = NULL; + uint32_t num = 0; memset(iswr, 0, sizeof(iswr)); memset(isge, 0, sizeof(isge)); @@ -558,6 +559,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector &tx_buffers) ldout(cct, 20) << __func__ << " send_inline." 
<< dendl; }*/ + num++; worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length); if (pre_wr) pre_wr->next = &iswr[current_swr]; @@ -575,6 +577,8 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector &tx_buffers) worker->perf_logger->inc(l_msgr_rdma_tx_failed); return -errno; } + // Update the Tx WQE counter + qp->add_tx_wr(num); worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size()); ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl; return 0; @@ -595,6 +599,8 @@ void RDMAConnectedSocketImpl::fin() { worker->perf_logger->inc(l_msgr_rdma_tx_failed); return ; } + // Update the Tx WQE counter + qp->add_tx_wr(1); } void RDMAConnectedSocketImpl::cleanup() { diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index 1b00f8f0c997..031faf289a38 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -244,15 +244,18 @@ void RDMADispatcher::polling() perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight); if (num_dead_queue_pair) { Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms - while (!dead_queue_pairs.empty()) { - ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.back() << dendl; - delete dead_queue_pairs.back(); + for (auto idx = 0; idx < dead_queue_pairs.size(); idx++) { + // Bypass QPs that do not collect all Tx completions yet. 
+ if (dead_queue_pairs.at(idx)->get_tx_wc() != dead_queue_pairs.at(idx)->get_tx_wr()) + continue; + ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.at(idx) << dendl; + delete dead_queue_pairs.at(idx); + dead_queue_pairs.erase(dead_queue_pairs.begin() + idx); perf_logger->dec(l_msgr_rdma_active_queue_pair); - dead_queue_pairs.pop_back(); --num_dead_queue_pair; } } - if (!num_qp_conn && done) + if (!num_qp_conn && done && dead_queue_pairs.empty()) break; uint64_t now = Cycles::rdtsc(); @@ -333,6 +336,22 @@ RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp) return it->second.second; } +Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp) +{ + Mutex::Locker l(lock); + // Try to find the QP in qp_conns firstly. + auto it = qp_conns.find(qp); + if (it == qp_conns.end()) { + // Try again in dead_queue_pairs. + for(auto dead_qp = dead_queue_pairs.begin(); dead_qp != dead_queue_pairs.end(); dead_qp++) { + if ((*dead_qp)->get_local_qp_number() == qp) + return *dead_qp; + } + return nullptr; + } + return it->second.first; +} + void RDMADispatcher::erase_qpn_lockless(uint32_t qpn) { auto it = qp_conns.find(qpn); @@ -361,6 +380,11 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) << " len: " << response->byte_len << " , addr:" << chunk << " " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl; + // Update the Tx CQE counter. 
+ QueuePair *qp = get_qp(response->qp_num); + if (qp) + qp->add_tx_wc(1); + if (response->status != IBV_WC_SUCCESS) { perf_logger->inc(l_msgr_rdma_tx_total_wc_errors); if (response->status == IBV_WC_RETRY_EXC_ERR) { diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index 74cc94a5a4e6..9673428db892 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -108,6 +108,7 @@ class RDMADispatcher { } RDMAStack* get_stack() { return stack; } RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp); + QueuePair* get_qp(uint32_t qp); void erase_qpn_lockless(uint32_t qpn); void erase_qpn(uint32_t qpn); Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }