for (unsigned i=0; i < wc.size(); ++i) {
ret = ibdev->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
assert(ret == 0);
+ dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
}
for (unsigned i=0; i < buffers.size(); ++i) {
ret = ibdev->post_chunk(buffers[i]);
assert(ret == 0);
+ dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
}
delete cmgr;
ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
}
assert(ibdev->post_chunk(chunk) == 0);
+ dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
} else {
if (read == (ssize_t)len) {
buffers.push_back(chunk);
} else {
read += chunk->read(buf+read, response->byte_len);
assert(ibdev->post_chunk(chunk) == 0);
+ dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
}
}
}
ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;
if ((*c)->over()) {
assert(ibdev->post_chunk(*c) == 0);
+ dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
ldout(cct, 25) << __func__ << " one chunk over." << dendl;
}
if (read == len) {
plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
+ plb.add_u64_counter(l_msgr_rdma_inqueue_rx_chunks, "inqueue_rx_chunks", "The number of inqueue rx chunks");
plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions");
plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
}
}
- for (auto &&i : polled)
+ for (auto &&i : polled) {
+ perf_logger->inc(l_msgr_rdma_inqueue_rx_chunks, i.second.size());
i.first->pass_wc(std::move(i.second));
+ }
polled.clear();
}
l_msgr_rdma_polling,
l_msgr_rdma_inflight_tx_chunks,
+ l_msgr_rdma_inqueue_rx_chunks,
l_msgr_rdma_tx_total_wc,
l_msgr_rdma_tx_total_wc_errors,