OPTION(ms_async_rdma_send_buffers, OPT_U32, 10240)
OPTION(ms_async_rdma_receive_buffers, OPT_U32, 10240)
OPTION(ms_async_rdma_port_num, OPT_U32, 1)
+OPTION(ms_async_rdma_polling_us, OPT_U32, 1000)
OPTION(ms_dpdk_port_id, OPT_INT, 0)
OPTION(ms_dpdk_coremask, OPT_STR, "1")
*
*/
+#include <poll.h>
+
#include "include/str_list.h"
#include "RDMAStack.h"
RDMAWorker* worker;
ldout(cct, 20) << __func__ << " going to poll rx cq:" << rx_cq << dendl;
RDMAConnectedSocketImpl *conn = nullptr;
+ utime_t last_inactive = ceph_clock_now(cct);
+ bool rearmed = false;
while (true) {
int n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
}
}
// handle_async_event();
- if (done && !inflight)
+ if (done)
break;
+
+ if ((ceph_clock_now(cct) - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
+ if (!rearmed) {
+ // Clean up cq events after rearm notify ensure no new incoming event
+ // arrived between polling and rearm
+ rx_cq->rearm_notify();
+ rearmed = true;
+ continue;
+ }
+
+ struct pollfd channel_poll;
+ channel_poll.fd = rx_cc->get_fd();
+ channel_poll.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+ channel_poll.revents = 0;
+ int r = 0;
+ while (!done && r == 0) {
+ r = poll(&channel_poll, 1, 1);
+ if (r < 0) {
+ r = -errno;
+ lderr(cct) << __func__ << " poll failed " << r << dendl;
+ assert(0);
+ }
+ }
+ if (r > 0 && rx_cc->get_cq_event())
+ ldout(cct, 20) << __func__ << " got cq event." << dendl;
+ last_inactive = ceph_clock_now(cct);
+ rearmed = false;
+ }
continue;
}