max_send_wr(tx_queue_len),
max_recv_wr(rx_queue_len),
q_key(q_key),
- dead(false)
+ dead(false),
+ tx_wr(0),
+ tx_wc(0)
{
initial_psn = lrand48() & 0xffffff;
if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
#include <infiniband/verbs.h>
+#include <atomic>
#include <string>
#include <vector>
* Return true if the queue pair is in an error state, false otherwise.
*/
bool is_error() const;
+ /**
+ * Increment the counters of posted Tx work requests (WQEs) and
+ * reaped Tx work completions (CQEs).
+ */
+ void add_tx_wr(uint32_t amt) { tx_wr += amt; }
+ void add_tx_wc(uint32_t amt) { tx_wc += amt; }
+ /**
+ * Read the current Tx work request and work completion counter values.
+ */
+ uint32_t get_tx_wr() const { return tx_wr; }
+ uint32_t get_tx_wc() const { return tx_wc; }
ibv_qp* get_qp() const { return qp; }
Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }
Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }
uint32_t max_recv_wr;
uint32_t q_key;
bool dead;
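+ // Counters used to decide when a dead QP can safely be destroyed:
+ // the QP is deleted only once every posted Tx WQE has been matched
+ // by a reaped CQE (see the dead QP cleanup loop).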
+ std::atomic<uint32_t> tx_wr; // Tx WQEs successfully posted on this QP
+ std::atomic<uint32_t> tx_wc; // Tx CQEs successfully reaped for this QP
};
public:
ibv_send_wr iswr[tx_buffers.size()];
uint32_t current_swr = 0;
ibv_send_wr* pre_wr = NULL;
+ uint32_t num = 0; // number of Tx WQEs prepared for this post
memset(iswr, 0, sizeof(iswr));
memset(isge, 0, sizeof(isge));
ldout(cct, 20) << __func__ << " send_inline." << dendl;
}*/
+ num++;
worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
if (pre_wr)
pre_wr->next = &iswr[current_swr];
worker->perf_logger->inc(l_msgr_rdma_tx_failed);
return -errno;
}
+ // Update the Tx WQE counter with the number of WQEs just posted.
+ qp->add_tx_wr(num);
worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
return 0;
worker->perf_logger->inc(l_msgr_rdma_tx_failed);
return ;
}
+ // Update the Tx WQE counter
+ qp->add_tx_wr(1);
}
void RDMAConnectedSocketImpl::cleanup() {
perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
if (num_dead_queue_pair) {
Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
- while (!dead_queue_pairs.empty()) {
- ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.back() << dendl;
- delete dead_queue_pairs.back();
+ for (size_t idx = 0; idx < dead_queue_pairs.size();) {
+ // Skip QPs that have not yet reaped all of their Tx completions.
+ if (dead_queue_pairs.at(idx)->get_tx_wc() != dead_queue_pairs.at(idx)->get_tx_wr()) {
+ ++idx;
+ continue;
+ }
+ ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.at(idx) << dendl;
+ delete dead_queue_pairs.at(idx);
+ dead_queue_pairs.erase(dead_queue_pairs.begin() + idx);
perf_logger->dec(l_msgr_rdma_active_queue_pair);
- dead_queue_pairs.pop_back();
--num_dead_queue_pair;
}
}
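+ // Do not let the polling loop exit while dead QPs are still waiting
+ // for outstanding Tx completions; they are only destroyed above once
+ // their WQE and CQE counts match.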
- if (!num_qp_conn && done)
+ if (!num_qp_conn && done && dead_queue_pairs.empty())
break;
uint64_t now = Cycles::rdtsc();
return it->second.second;
}
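+// Look up a QueuePair by its local QP number, searching both the live
+// connections and the dead QP list: a Tx completion may still arrive
+// for a QP that has already been moved to dead_queue_pairs.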
+Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
+{
+ Mutex::Locker l(lock);
+ // Look for the QP among the live connections first.
+ auto it = qp_conns.find(qp);
+ if (it == qp_conns.end()) {
+ // Fall back to the dead QP list.
+ for (auto dead_qp : dead_queue_pairs) {
+ if (dead_qp->get_local_qp_number() == qp)
+ return dead_qp;
+ }
+ return nullptr;
+ }
+ return it->second.first;
+}
+
void RDMADispatcher::erase_qpn_lockless(uint32_t qpn)
{
auto it = qp_conns.find(qpn);
<< " len: " << response->byte_len << " , addr:" << chunk
<< " " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
+ // Update the Tx CQE counter of the owning QP (which may already sit on
+ // the dead list) so its WQE/CQE counts can balance out.
+ QueuePair *qp = get_qp(response->qp_num);
+ if (qp)
+ qp->add_tx_wc(1);
+
if (response->status != IBV_WC_SUCCESS) {
perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
if (response->status == IBV_WC_RETRY_EXC_ERR) {
}
RDMAStack* get_stack() { return stack; }
RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
+ QueuePair* get_qp(uint32_t qp);
void erase_qpn_lockless(uint32_t qpn);
void erase_qpn(uint32_t qpn);
Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }