std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
std::vector<ibv_wc> tx_cqe;
ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
- RDMAConnectedSocketImpl *conn = nullptr;
uint64_t last_inactive = Cycles::rdtsc();
bool rearmed = false;
int r = 0;
if (rx_ret > 0) {
ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
<< " responses."<< dendl;
- perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
- perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);
-
- std::lock_guard l{lock};//make sure connected socket alive when pass wc
-
- for (int i = 0; i < rx_ret; ++i) {
- ibv_wc* response = &wc[i];
- Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
-
- if (response->status == IBV_WC_SUCCESS) {
- ceph_assert(wc[i].opcode == IBV_WC_RECV);
- conn = get_conn_lockless(response->qp_num);
- if (!conn) {
- ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
- get_stack()->get_infiniband().post_chunk_to_pool(chunk);
- perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
- } else {
- conn->post_chunks_to_rq(1);
- polled[conn].push_back(*response);
- }
- } else {
- perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
- ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
- << ") status(" << response->status << ":"
- << get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
- if (response->status != IBV_WC_WR_FLUSH_ERR) {
- conn = get_conn_lockless(response->qp_num);
- if (conn && conn->is_connected())
- conn->fault();
- }
- get_stack()->get_infiniband().post_chunk_to_pool(chunk);
- perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
- }
- }
- for (auto &&i : polled)
- i.first->pass_wc(std::move(i.second));
- polled.clear();
+ handle_rx_event(wc, rx_ret);
}
if (!tx_ret && !rx_ret) {
notify_pending_workers();
}
+void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number)
+{
+
+ perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_number);
+ perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_number);
+
+ std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
+ RDMAConnectedSocketImpl *conn = nullptr;
+ std::lock_guard l{lock};//make sure connected socket alive when pass wc
+
+ for (int i = 0; i < rx_number; ++i) {
+ ibv_wc* response = &cqe[i];
+ Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+
+ if (response->status == IBV_WC_SUCCESS) {
+ ceph_assert(response->opcode == IBV_WC_RECV);
+ conn = get_conn_lockless(response->qp_num);
+ if (!conn) {
+ ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back." << dendl;
+ get_stack()->get_infiniband().post_chunk_to_pool(chunk);
+ perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+ } else {
+ conn->post_chunks_to_rq(1);
+ polled[conn].push_back(*response);
+ }
+ } else {
+ perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
+ ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
+ << ") status(" << response->status << ":"
+ << get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
+ if (response->status != IBV_WC_WR_FLUSH_ERR) {
+ conn = get_conn_lockless(response->qp_num);
+ if (conn && conn->is_connected())
+ conn->fault();
+ }
+ get_stack()->get_infiniband().post_chunk_to_pool(chunk);
+ perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+ }
+ }
+
+ for (auto &i : polled)
+ i.first->pass_wc(std::move(i.second));
+ polled.clear();
+}
RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
: Worker(c, i), stack(nullptr),