void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number)
{
-
perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_number);
perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_number);
std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
- RDMAConnectedSocketImpl *conn = nullptr;
std::lock_guard l{lock};//make sure connected socket alive when pass wc
for (int i = 0; i < rx_number; ++i) {
ibv_wc* response = &cqe[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+ QueuePair *qp = get_qp_lockless(response->qp_num);
+
+ switch (response->status) {
+ case IBV_WC_SUCCESS:
+ ceph_assert(response->opcode == IBV_WC_RECV);
+ if (!conn) {
+ ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk 0x"
+ << std::hex << chunk << " will be back." << std::dec << dendl;
+ ib->post_chunk_to_pool(chunk);
+ perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+ } else {
+ conn->post_chunks_to_rq(1);
+ polled[conn].push_back(*response);
+
+ if (qp != nullptr && !qp->get_srq()) {
+ qp->remove_rq_wr(chunk);
+ chunk->clear_qp();
+ }
+ }
+ break;
+
+ case IBV_WC_WR_FLUSH_ERR:
+ perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
+
+ if (qp) {
+ ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
+ }
+ if (qp && qp->is_dead()) {
+ ldout(cct, 20) << __func__ << " outstanding RQ WR is flushed into CQ since QueuePair is dead " << dendl;
+ } else {
+ ldout(cct, 1) << __func__ << " RQ WR return error,"
+ << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+ << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+ << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+ if (conn) {
+ ldout(cct, 1) << __func__ << " RQ WR return error, remote Queue Pair, qp number: "
+ << conn->get_peer_qpn() << dendl;
+ }
+ }
- if (response->status == IBV_WC_SUCCESS) {
- ceph_assert(response->opcode == IBV_WC_RECV);
- conn = get_conn_lockless(response->qp_num);
- if (!conn) {
- ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back." << dendl;
ib->post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
- } else {
- conn->post_chunks_to_rq(1);
- polled[conn].push_back(*response);
-
- QueuePair *qp = get_qp_lockless(response->qp_num);
- if (qp != nullptr) {
- qp->remove_rq_wr(chunk);
- chunk->clear_qp();
- }
- }
- } else {
- perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
- ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
- << ") status(" << response->status << ":"
- << ib->wc_status_to_string(response->status) << ")" << dendl;
- if (response->status != IBV_WC_WR_FLUSH_ERR) {
- conn = get_conn_lockless(response->qp_num);
+ break;
+
+ default:
+ perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
+
+ ldout(cct, 1) << __func__ << " RQ WR return error,"
+ << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+ << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+ << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
if (conn && conn->is_connected())
conn->fault();
- }
- ib->post_chunk_to_pool(chunk);
- perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+
+ ib->post_chunk_to_pool(chunk);
+ perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+ break;
}
}