OPTION(ms_async_rdma_receive_buffers, OPT_U32)
// max number of receive wr in the srq (or in each qp's receive queue when srq is not used)
OPTION(ms_async_rdma_receive_queue_len, OPT_U32)
+// use a shared receive queue (srq); when false, each qp gets its own receive queue
+OPTION(ms_async_rdma_support_srq, OPT_BOOL)
OPTION(ms_async_rdma_port_num, OPT_U32)
OPTION(ms_async_rdma_polling_us, OPT_U32)
OPTION(ms_async_rdma_local_gid, OPT_STR) // GID format: "fe80:0000:0000:0000:7efe:90ff:fe72:6efe", no zero folding
.set_default(4096)
.set_description(""),
+ Option("ms_async_rdma_support_srq", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
+ .set_default(true)
+ .set_description(""),
+
Option("ms_async_rdma_port_num", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(1)
.set_description(""),
memset(&qpia, 0, sizeof(qpia));
qpia.send_cq = txcq->get_cq();
qpia.recv_cq = rxcq->get_cq();
- qpia.srq = srq; // use the same shared receive queue
+ if (srq) {
+ qpia.srq = srq; // use the same shared receive queue
+ } else {
+ qpia.cap.max_recv_wr = max_recv_wr;
+ qpia.cap.max_recv_sge = 1;
+ }
qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests
qpia.cap.max_send_sge = 1; // max send scatter-gather elements
qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q
pd = new ProtectionDomain(cct, device);
assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
- rx_queue_len = device->device_attr->max_srq_wr;
+ support_srq = cct->_conf->ms_async_rdma_support_srq;
+ if (support_srq)
+ rx_queue_len = device->device_attr->max_srq_wr;
+ else
+ rx_queue_len = device->device_attr->max_qp_wr;
if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) {
rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len;
ldout(cct, 1) << __func__ << " receive queue length is " << rx_queue_len << " receive buffers" << dendl;
memory_manager = new MemoryManager(cct, device, pd);
memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len);
- srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
-
- post_chunks_to_srq(rx_queue_len); //add to srq
+ if (support_srq) {
+ srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
+ post_chunks_to_rq(rx_queue_len, NULL); //add to srq
+ }
}
// Destructor: does nothing if the object was never initialized; otherwise
// destroys the shared receive queue (when one is in use), then the memory
// manager and the protection domain.
Infiniband::~Infiniband()
{
if (!initialized)
return;
-
- ibv_destroy_srq(srq);
+ if (support_srq) // the srq is only created when srq support is enabled
+ ibv_destroy_srq(srq);
delete memory_manager;
delete pd;
}
return qp;
}
-int Infiniband::post_chunks_to_srq(int num)
+int Infiniband::post_chunks_to_rq(int num, ibv_qp *qp)
{
int ret, i = 0;
ibv_sge isge[num];
i++;
}
ibv_recv_wr *badworkrequest;
- ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
- assert(ret == 0);
+ if (support_srq) {
+ ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
+ assert(ret == 0);
+ } else {
+ assert(qp);
+ ret = ibv_post_recv(qp, &rx_work_request[0], &badworkrequest);
+ assert(ret == 0);
+ }
return i;
}
MemPoolContext rxbuf_pool_ctx;
mem_pool rxbuf_pool;
+
void* huge_pages_malloc(size_t size);
void huge_pages_free(void *ptr);
};
bool initialized = false;
const std::string &device_name;
uint8_t port_num;
+ bool support_srq = false;
public:
explicit Infiniband(CephContext *c);
ibv_qp_type type, struct rdma_cm_id *cm_id);
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
// post rx buffers to the srq, or to qp's receive queue when srq support is disabled; return number of buffers actually posted
- int post_chunks_to_srq(int num);
+ int post_chunks_to_rq(int num, ibv_qp *qp=NULL);
// Release an rx buffer chunk back to the memory manager.
void post_chunk_to_pool(Chunk* chunk) {
get_memory_manager()->release_rx_buffer(chunk);
}
Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
static const char* wc_status_to_string(int status);
static const char* qp_state_string(int status);
+ uint32_t get_rx_queue_len() const { return rx_queue_len; } // accessor for the receive queue length
};
#endif
} else {
read += chunk->read(buf+read, response->byte_len);
dispatcher->post_chunk_to_pool(chunk);
+ update_post_backlog();
}
}
}
ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;
if ((*c)->over()) {
dispatcher->post_chunk_to_pool(*c);
+ update_post_backlog();
ldout(cct, 25) << __func__ << " one chunk over." << dendl;
}
if (read == len) {
worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
}, true);
}
+
+// Ask Infiniband to post `num` receive buffers, passing this socket's queue
+// pair, and add however many could not be posted to post_backlog.
+void RDMAConnectedSocketImpl::post_chunks_to_rq(int num)
+{
+  const int posted = infiniband->post_chunks_to_rq(num, qp->get_qp());
+  post_backlog += num - posted;
+}
+
+// Retry posting the receive buffers that could not be posted earlier.
+// post_backlog counts buffers still owed to the receive queue, so it must be
+// reduced by the number the dispatcher reports as actually posted. The
+// previous expression `post_backlog -= post_backlog - posted` collapsed to
+// `post_backlog = posted`, which kept the backlog after a full repost and
+// dropped it after a failed one.
+void RDMAConnectedSocketImpl::update_post_backlog()
+{
+  if (!post_backlog)
+    return;
+  const int posted = dispatcher->post_chunks_to_rq(post_backlog, qp->get_qp());
+  post_backlog -= posted;
+}
cm_channel = info->cm_channel;
status = RDMA_ID_CREATED;
remote_qpn = info->qp_num;
- worker->center.submit_to(worker->center.get_id(), [this]() {
- worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
- status = CHANNEL_FD_CREATED;
- }, false);
if (alloc_resource()) {
close_notify();
return;
}
+ worker->center.submit_to(worker->center.get_id(), [this]() {
+ worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
+ status = CHANNEL_FD_CREATED;
+ }, false);
status = RESOURCE_ALLOCATED;
local_qpn = qp->get_local_qp_number();
my_msg.qpn = local_qpn;
// Close the iWARP connection: record ECONNRESET, mark the socket inactive,
// disconnect the RDMA CM id once status has reached CONNECTED, then call
// close_notify().
void RDMAIWARPConnectedSocketImpl::close() {
error = ECONNRESET;
active = false;
- if (status >= CONNECTED)
+ if (status >= CONNECTED) {
rdma_disconnect(cm_id);
+ }
close_notify();
}
break;
case RDMA_CM_EVENT_ESTABLISHED:
+ ldout(cct, 20) << __func__ << " qp_num=" << cm_id->qp->qp_num << dendl;
status = CONNECTED;
if (!is_server) {
remote_qpn = event->param.conn.qp_num;
if (!qp) {
return -1;
}
+ if (!cct->_conf->ms_async_rdma_support_srq)
+ dispatcher->post_chunks_to_rq(infiniband->get_rx_queue_len(), qp->get_qp());
dispatcher->register_qp(qp, this);
dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
}
}
// Return a receive chunk to the Infiniband buffer pool while holding the
// dispatcher lock, and decrement the rx-buffers-in-use counter. The removed
// lines show that the dispatcher-level backlog re-posting no longer happens
// here.
-void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) {
+void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
+{
Mutex::Locker l(lock);
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
- // handle a case when we have a limited number of
- // rx buffers and we could not post a required amount when polling
- if (post_backlog > 0) {
- ldout(cct, 20) << __func__ << " post_backlog is " << post_backlog << dendl;
- post_backlog -= get_stack()->get_infiniband().post_chunks_to_srq(post_backlog);
- }
+}
+
+// Holds the dispatcher lock while forwarding the request to
+// Infiniband::post_chunks_to_rq, and returns that call's result unchanged.
+int RDMADispatcher::post_chunks_to_rq(int num, ibv_qp *qp)
+{
+  Mutex::Locker l(lock);
+  auto &&ib = get_stack()->get_infiniband();
+  return ib.post_chunks_to_rq(num, qp);
}
void RDMADispatcher::polling()
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
- post_backlog += rx_ret - get_stack()->get_infiniband().post_chunks_to_srq(rx_ret);
-
for (int i = 0; i < rx_ret; ++i) {
ibv_wc* response = &wc[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
- ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
-
- assert(wc[i].opcode == IBV_WC_RECV);
if (response->status == IBV_WC_SUCCESS) {
+ assert(wc[i].opcode == IBV_WC_RECV);
conn = get_conn_lockless(response->qp_num);
if (!conn) {
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
} else {
- polled[conn].push_back(*response);
+ conn->post_chunks_to_rq(1);
+ polled[conn].push_back(*response);
}
} else {
perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
- perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
-
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
<< get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
- conn = get_conn_lockless(response->qp_num);
- if (conn && conn->is_connected())
- conn->fault();
-
+ if (response->status != IBV_WC_WR_FLUSH_ERR) {
+ conn = get_conn_lockless(response->qp_num);
+ if (conn && conn->is_connected())
+ conn->fault();
+ }
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
+ perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
}
}
for (auto &&i : polled)
ldout(cct, 1) << __func__ << " send work request returned error for buffer("
<< response->wr_id << ") status(" << response->status << "): "
<< get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
- }
-
- Mutex::Locker l(lock);//make sure connected socket alive when pass wc
- RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+ Mutex::Locker l(lock);//make sure connected socket alive when pass wc
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
- if (conn && conn->is_connected()) {
- ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
- conn->fault();
- } else {
- ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
+ if (conn && conn->is_connected()) {
+ ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;
+ conn->fault();
+ } else {
+ ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
+ }
}
}
bool done = false;
std::atomic<uint64_t> num_dead_queue_pair = {0};
std::atomic<uint64_t> num_qp_conn = {0};
- int post_backlog = 0;
Mutex lock; // protect `qp_conns`, `dead_queue_pairs`
// qp_num -> InfRcConnection
// The main usage of `qp_conns` is looking up connection by qp_num,
std::atomic<uint64_t> inflight = {0};
- void post_chunk_to_pool(Chunk* chunk);
-
+ void post_chunk_to_pool(Chunk* chunk);
+ int post_chunks_to_rq(int num, ibv_qp *qp=NULL);
};
class RDMAWorker : public Worker {
int tcp_fd = -1;
bool active;// qp is active ?
bool pending;
+ int post_backlog = 0;
void notify();
ssize_t read_buffers(char* buf, size_t len);
virtual int try_connect(const entity_addr_t&, const SocketOptions &opt);
bool is_pending() {return pending;}
void set_pending(bool val) {pending = val;}
+ void post_chunks_to_rq(int num);
+ void update_post_backlog();
class C_handle_connection : public EventCallback {
RDMAConnectedSocketImpl *csi;