// Receive one connection-management handshake message from socket `sd` and
// decode its fields into `im`.
// Returns 1 when no valid buffer was read, 0 when enough data was received,
// and a negative value on error.
// NOTE(review): this is a diff excerpt; the read from `sd` and the
// declaration of `r` are outside the visible lines.
-int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
+int Infiniband::recv_msg(CephContext *cct, int sd, ib_cm_meta_t& im)
{
// Raw text of the message as read from the socket.
char msg[TCP_MSG_LEN];
// Textual GID field: 32 hex digits plus the terminating NUL, the same size
// gid_to_wire_gid() produces (4 groups of "%08x").
char gid[33];
// Unexpected length: log it and report -EINVAL to the caller.
ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
r = -EINVAL;
} else { // valid message
// The message is colon-separated hex: lid:qpn:psn:peer_qpn:gid, the same
// field order send_msg() writes.
// NOTE(review): the sscanf result is not checked, and "%s" has no field
// width, so a gid token longer than 32 characters would overrun gid[33] --
// confirm the elided length check makes that impossible.
- sscanf(msg, "%hx:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);
+ sscanf(msg, "%hx:%x:%x:%x:%s", &(im.lid), &(im.local_qpn), &(im.psn), &(im.peer_qpn),gid);
// Presumably converts the textual GID into the gid stored in `im`; the
// helper's body is not visible in this excerpt.
wire_gid_to_gid(gid, &im);
- ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl;
+ ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.local_qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl;
}
return r;
}
// Serialize the connection-management metadata in `im` as text and write it
// to socket `sd`.
// Wire format: lid:qpn:psn:peer_qpn:gid, where lid is 4 hex digits, the next
// three fields are 8 hex digits each, and gid is the text produced by
// gid_to_wire_gid().
// NOTE(review): this is a diff excerpt; the declaration of `msg` and the
// handling of the write result are outside the visible lines.
-int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
+int Infiniband::send_msg(CephContext *cct, int sd, ib_cm_meta_t& im)
{
int retry = 0;
ssize_t r;
// Textual GID: 32 hex digits plus the terminating NUL.
char gid[33];
// Label for re-sending; the jump back to it is not visible in this excerpt.
retry:
gid_to_wire_gid(im, gid);
// The diff only renames the second field (im.qpn -> im.local_qpn); the format
// string is unchanged.
- sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
- ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
+ sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.local_qpn, im.psn, im.peer_qpn, gid);
+ ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.local_qpn << ", " << im.psn
<< ", " << im.peer_qpn << ", " << gid << dendl;
// Writes sizeof(msg) bytes rather than the formatted string length.
// Presumably msg is a fixed char[TCP_MSG_LEN] as in recv_msg() -- confirm.
r = ::write(sd, msg, sizeof(msg));
// Drop incoming qpt
return 0;
}
// Counterpart of gid_to_wire_gid(): takes the textual GID `wgid` and fills in
// `im`. The diff only changes the parameter type from IBSYNMsg* to
// ib_cm_meta_t*.
// NOTE(review): the conversion loop is elided from this excerpt; only the
// locals and the closing braces are visible.
-void Infiniband::wire_gid_to_gid(const char *wgid, IBSYNMsg* im)
+void Infiniband::wire_gid_to_gid(const char *wgid, ib_cm_meta_t* im)
{
// Presumably holds one 8-hex-digit group plus NUL -- TODO confirm.
char tmp[9];
// Presumably the parsed 32-bit value of one group -- TODO confirm.
uint32_t v32;
}
}
-void Infiniband::gid_to_wire_gid(const IBSYNMsg& im, char wgid[])
+void Infiniband::gid_to_wire_gid(const ib_cm_meta_t& im, char wgid[])
{
for (int i = 0; i < 4; ++i)
sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(im.gid.raw + i * 4)));
{
if (!cct->_conf->ms_async_rdma_cm) {
qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL);
- my_msg.qpn = qp->get_local_qp_number();
- my_msg.psn = qp->get_initial_psn();
- my_msg.lid = ib->get_lid();
- my_msg.peer_qpn = 0;
- my_msg.gid = ib->get_gid();
+ local_cm_meta.local_qpn = qp->get_local_qp_number();
+ local_cm_meta.psn = qp->get_initial_psn();
+ local_cm_meta.lid = ib->get_lid();
+ local_cm_meta.peer_qpn = 0;
+ local_cm_meta.gid = ib->get_gid();
notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
dispatcher->register_qp(qp, this);
dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
ldout(cct, 20) << __func__ << " destruct." << dendl;
cleanup();
worker->remove_pending_conn(this);
- dispatcher->erase_qpn(my_msg.qpn);
+ dispatcher->erase_qpn(local_cm_meta.local_qpn);
for (unsigned i=0; i < wc.size(); ++i) {
dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
memset(&qpa, 0, sizeof(qpa));
qpa.qp_state = IBV_QPS_RTR;
qpa.path_mtu = IBV_MTU_1024;
- qpa.dest_qp_num = peer_msg.qpn;
- qpa.rq_psn = peer_msg.psn;
+ qpa.dest_qp_num = peer_cm_meta.local_qpn;
+ qpa.rq_psn = peer_cm_meta.psn;
qpa.max_dest_rd_atomic = 1;
qpa.min_rnr_timer = 12;
//qpa.ah_attr.is_global = 0;
qpa.ah_attr.is_global = 1;
qpa.ah_attr.grh.hop_limit = 6;
- qpa.ah_attr.grh.dgid = peer_msg.gid;
+ qpa.ah_attr.grh.dgid = peer_cm_meta.gid;
qpa.ah_attr.grh.sgid_index = ib->get_device()->get_gid_idx();
- qpa.ah_attr.dlid = peer_msg.lid;
+ qpa.ah_attr.dlid = peer_cm_meta.lid;
qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
qpa.ah_attr.src_path_bits = 0;
// before giving up. Occurs when the remote side has not yet posted
// a receive request.
qpa.rnr_retry = 7; // 7 is infinite retry.
- qpa.sq_psn = my_msg.psn;
+ qpa.sq_psn = local_cm_meta.psn;
qpa.max_rd_atomic = 1;
r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
if (!is_server) {
connected = 1; //indicate successfully
- ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;
+ ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << local_cm_meta.local_qpn << dendl;
submit(false);
}
active = true;
ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
- my_msg.peer_qpn = 0;
- r = ib->send_msg(cct, tcp_fd, my_msg);
+ local_cm_meta.peer_qpn = 0;
+ r = ib->send_msg(cct, tcp_fd, local_cm_meta);
if (r < 0)
return r;
}
void RDMAConnectedSocketImpl::handle_connection() {
- ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
- int r = ib->recv_msg(cct, tcp_fd, peer_msg);
+ ldout(cct, 20) << __func__ << " QP: " << local_cm_meta.local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
+ int r = ib->recv_msg(cct, tcp_fd, peer_cm_meta);
if (r <= 0) {
if (r != -EAGAIN) {
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
}
if (!is_server) {// syn + ack from server
- my_msg.peer_qpn = peer_msg.qpn;
- ldout(cct, 20) << __func__ << " peer msg : < " << peer_msg.qpn << ", " << peer_msg.psn
- << ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl;
+ local_cm_meta.peer_qpn = peer_cm_meta.local_qpn;
+ ldout(cct, 20) << __func__ << " peer msg : < " << peer_cm_meta.local_qpn << ", " << peer_cm_meta.psn
+ << ", " << peer_cm_meta.lid << ", " << peer_cm_meta.peer_qpn << "> " << dendl;
if (!connected) {
r = activate();
ceph_assert(!r);
}
notify();
- r = ib->send_msg(cct, tcp_fd, my_msg);
+ r = ib->send_msg(cct, tcp_fd, local_cm_meta);
if (r < 0) {
ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
fault();
}
} else {
- if (peer_msg.peer_qpn == 0) {// syn from client
+ if (peer_cm_meta.peer_qpn == 0) {// syn from client
if (active) {
ldout(cct, 10) << __func__ << " server is already active." << dendl;
return ;
}
r = activate();
ceph_assert(!r);
- r = ib->send_msg(cct, tcp_fd, my_msg);
+ r = ib->send_msg(cct, tcp_fd, local_cm_meta);
if (r < 0) {
ldout(cct, 1) << __func__ << " server ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
{
eventfd_t event_val = 0;
int r = eventfd_read(notify_fd, &event_val);
- ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << my_msg.qpn << " r = " << r << dendl;
+ ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << local_cm_meta.local_qpn << " r = " << r << dendl;
if (!active) {
ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl;
read = read_buffers(buf,len);
if (is_server && connected == 0) {
- ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
+ ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << local_cm_meta.local_qpn << " peer QP: " << peer_cm_meta.local_qpn << dendl;
connected = 1; //if so, we don't need the last handshake
cleanup();
submit(false);
std::lock_guard l{lock};
pending_bl.claim_append(bl);
if (!connected) {
- ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;
+ ldout(cct, 20) << __func__ << " fake send to upper, QP: " << local_cm_meta.local_qpn << dendl;
return bytes;
}
}
- ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;
+ ldout(cct, 20) << __func__ << " QP: " << local_cm_meta.local_qpn << dendl;
ssize_t r = submit(more);
if (r < 0 && r != -EAGAIN)
return r;
int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
{
- ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;
+ ldout(cct, 20) << __func__ << " QP: " << local_cm_meta.local_qpn << " " << tx_buffers[0] << dendl;
vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
ibv_sge isge[tx_buffers.size()];
uint32_t current_sge = 0;