return 0;
}
+/*
+ * Decode the textual wire form of a GID into cm_meta_data->gid.raw.
+ *
+ * wgid is consumed as four groups of eight hex digits, so it must point to
+ * at least 32 readable bytes. Each group is parsed as a 32-bit value, passed
+ * through ntohl() and stored into the next four bytes of gid.raw.
+ * A group that does not parse as hex is stored as zero instead of whatever
+ * happened to be on the stack.
+ */
+void Infiniband::QueuePair::wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data)
+{
+  char tmp[9];
+  tmp[8] = 0;  // every 8-character group is parsed as its own C string
+
+  for (int i = 0; i < 4; ++i) {
+    uint32_t v32 = 0;
+    memcpy(tmp, wgid + i * 8, 8);
+    if (sscanf(tmp, "%x", &v32) != 1)
+      v32 = 0;
+    // memcpy instead of a uint32_t* cast: makes no assumption about the
+    // alignment of gid.raw and does not type-pun the byte array.
+    const uint32_t word = ntohl(v32);
+    memcpy(&cm_meta_data->gid.raw[i * 4], &word, sizeof(word));
+  }
+}
+
+/*
+ * Encode cm_meta_data.gid.raw as text: four 32-bit words, each passed
+ * through htonl() and printed as eight lowercase hex digits.
+ *
+ * wgid must have room for 33 bytes (32 hex digits plus the terminating NUL
+ * written after the last group).
+ */
+void Infiniband::QueuePair::gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[])
+{
+  for (int i = 0; i < 4; ++i) {
+    // Copy the word out rather than casting the byte array to uint32_t*:
+    // keeps the source const and makes no alignment assumption.
+    uint32_t word;
+    memcpy(&word, cm_meta_data.gid.raw + i * 4, sizeof(word));
+    // 9 = eight hex digits + NUL; each group overwrites the previous NUL.
+    snprintf(&wgid[i * 8], 9, "%08x", htonl(word));
+  }
+}
+
+/*
+ * Read one fixed-size connection-management message from socket_fd and
+ * decode it into cm_meta_data.
+ *
+ * return value
+ * > 0: a complete message (sizeof(msg) bytes) was read and decoded
+ *   0: read() returned 0, which is logged as a disconnect
+ * < 0: -errno from read(), or -EINVAL for an injected failure, a message
+ *      of the wrong length, or a message that does not parse
+ */
+int Infiniband::QueuePair::recv_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data)
+{
+  char msg[TCP_MSG_LEN];
+  char gid[33] = {0};
+  ssize_t r = ::read(socket_fd, &msg, sizeof(msg));
+  // Capture errno right away; the injection and logging code below may
+  // overwrite it before it is reported.
+  const int read_errno = errno;
+
+  // Fault injection: randomly pretend the handshake message was lost.
+  if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) {
+    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+      return -EINVAL;
+    }
+  }
+  if (r < 0) {
+    r = -read_errno;
+    lderr(cct) << __func__ << " got error " << r << ": "
+               << cpp_strerror(r) << dendl;
+  } else if (r == 0) { // valid disconnect message of length 0
+    ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
+  } else if (static_cast<size_t>(r) != sizeof(msg)) { // invalid message
+    ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
+    r = -EINVAL;
+  } else { // full-length message
+    // The bytes come from the peer: force termination so sscanf cannot run
+    // past msg, and bound the gid field so it cannot overflow gid[33].
+    msg[sizeof(msg) - 1] = '\0';
+    const int fields = sscanf(msg, "%hx:%x:%x:%x:%32s", &(cm_meta_data.lid), &(cm_meta_data.local_qpn),
+                              &(cm_meta_data.psn), &(cm_meta_data.peer_qpn), gid);
+    // wire_gid_to_gid() consumes exactly 32 characters, so anything shorter
+    // is rejected together with messages missing a field.
+    if (fields != 5 || strlen(gid) != 32) {
+      ldout(cct, 1) << __func__ << " got malformed message" << dendl;
+      return -EINVAL;
+    }
+    wire_gid_to_gid(gid, &cm_meta_data);
+    ldout(cct, 5) << __func__ << " recevd: " << cm_meta_data.lid << ", " << cm_meta_data.local_qpn
+                  << ", " << cm_meta_data.psn << ", " << cm_meta_data.peer_qpn << ", " << gid << dendl;
+  }
+  return r;
+}
+
+/*
+ * Format cm_meta_data as "lid:local_qpn:psn:peer_qpn:gid" (hex fields) and
+ * write the whole sizeof(msg)-byte message to socket_fd. A write that fails
+ * with EINTR or EAGAIN is retried up to three times.
+ *
+ * return value
+ *   0: the full message was written
+ * < 0: -EINVAL for an injected failure, -errno when write() fails,
+ *      -EIO when write() accepts only part of the message
+ */
+int Infiniband::QueuePair::send_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data)
+{
+  // Zero-filled so bytes past the formatted text never carry stack contents
+  // onto the wire.
+  char msg[TCP_MSG_LEN] = {0};
+  char gid[33] = {0};
+
+  // The payload does not change between attempts, so build it once.
+  gid_to_wire_gid(cm_meta_data, gid);
+  snprintf(msg, sizeof(msg), "%04x:%08x:%08x:%08x:%s", cm_meta_data.lid, cm_meta_data.local_qpn,
+           cm_meta_data.psn, cm_meta_data.peer_qpn, gid);
+
+  for (int retry = 0; ; ++retry) {
+    ldout(cct, 10) << __func__ << " sending: " << cm_meta_data.lid << ", " << cm_meta_data.local_qpn
+                   << ", " << cm_meta_data.psn << ", " << cm_meta_data.peer_qpn << ", " << gid << dendl;
+    const ssize_t r = ::write(socket_fd, msg, sizeof(msg));
+    // Capture errno right away; the injection and logging code below may
+    // overwrite it before it is reported.
+    const int write_errno = errno;
+
+    // Fault injection: randomly report a failure after the write attempt.
+    if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) {
+      if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+        ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+        return -EINVAL;
+      }
+    }
+
+    if (r == static_cast<ssize_t>(sizeof(msg)))
+      return 0;
+
+    // FIXME need to handle EAGAIN instead of retry
+    if (r < 0 && (write_errno == EINTR || write_errno == EAGAIN) && retry < 3)
+      continue;
+
+    if (r < 0) {
+      lderr(cct) << __func__ << " send returned error " << write_errno << ": "
+                 << cpp_strerror(write_errno) << dendl;
+      return -write_errno;
+    }
+
+    // Short write: errno is not set by a successful write(), so returning
+    // -errno here could report success. Use an explicit error instead.
+    lderr(cct) << __func__ << " send got bad length (" << r << ") " << dendl;
+    return -EIO;
+  }
+}
+
/**
* Change RC QueuePair into the ERROR state. This is necessary modify
* the Queue Pair into the Error state and poll all of the relevant
return cq;
}
-// 1 means no valid buffer read, 0 means got enough buffer
-// else return < 0 means error
-int Infiniband::recv_msg(CephContext *cct, int sd, ib_cm_meta_t& im)
-{
- char msg[TCP_MSG_LEN];
- char gid[33];
- ssize_t r = ::read(sd, &msg, sizeof(msg));
- // Drop incoming qpt
- if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
- if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
- ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
- return -EINVAL;
- }
- }
- if (r < 0) {
- r = -errno;
- lderr(cct) << __func__ << " got error " << r << ": "
- << cpp_strerror(r) << dendl;
- } else if (r == 0) { // valid disconnect message of length 0
- ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
- } else if ((size_t)r != sizeof(msg)) { // invalid message
- ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
- r = -EINVAL;
- } else { // valid message
- sscanf(msg, "%hx:%x:%x:%x:%s", &(im.lid), &(im.local_qpn), &(im.psn), &(im.peer_qpn),gid);
- wire_gid_to_gid(gid, &im);
- ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.local_qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl;
- }
- return r;
-}
-
-int Infiniband::send_msg(CephContext *cct, int sd, ib_cm_meta_t& im)
-{
- int retry = 0;
- ssize_t r;
-
- char msg[TCP_MSG_LEN];
- char gid[33];
-retry:
- gid_to_wire_gid(im, gid);
- sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.local_qpn, im.psn, im.peer_qpn, gid);
- ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.local_qpn << ", " << im.psn
- << ", " << im.peer_qpn << ", " << gid << dendl;
- r = ::write(sd, msg, sizeof(msg));
- // Drop incoming qpt
- if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
- if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
- ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
- return -EINVAL;
- }
- }
-
- if ((size_t)r != sizeof(msg)) {
- // FIXME need to handle EAGAIN instead of retry
- if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
- retry++;
- goto retry;
- }
- if (r < 0)
- lderr(cct) << __func__ << " send returned error " << errno << ": "
- << cpp_strerror(errno) << dendl;
- else
- lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
- return -errno;
- }
- return 0;
-}
-
-void Infiniband::wire_gid_to_gid(const char *wgid, ib_cm_meta_t* im)
-{
- char tmp[9];
- uint32_t v32;
- int i;
-
- for (tmp[8] = 0, i = 0; i < 4; ++i) {
- memcpy(tmp, wgid + i * 8, 8);
- sscanf(tmp, "%x", &v32);
- *(uint32_t *)(&im->gid.raw[i * 4]) = ntohl(v32);
- }
-}
-
-void Infiniband::gid_to_wire_gid(const ib_cm_meta_t& im, char wgid[])
-{
- for (int i = 0; i < 4; ++i)
- sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(im.gid.raw + i * 4)));
-}
-
Infiniband::QueuePair::~QueuePair()
{
if (qp) {
Device *device = NULL;
ProtectionDomain *pd = NULL;
DeviceList *device_list = nullptr;
- void wire_gid_to_gid(const char *wgid, ib_cm_meta_t* im);
- void gid_to_wire_gid(const ib_cm_meta_t& im, char wgid[]);
CephContext *cct;
ceph::mutex lock = ceph::make_mutex("IB lock");
bool initialized = false;
* Get the state of a QueuePair.
*/
int get_state() const;
+ /*
+ * send/receive connection management meta data
+ */
+ int send_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data);
+ int recv_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data);
+ void wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data);
+ void gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[]);
void add_tx_wr(uint32_t amt) { tx_wr_inflight += amt; }
void dec_tx_wr(uint32_t amt) { tx_wr_inflight -= amt; }
uint32_t get_tx_wr() const { return tx_wr_inflight; }
CompletionChannel *create_comp_channel(CephContext *c);
CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
uint8_t get_ib_physical_port() { return ib_physical_port; }
- int send_msg(CephContext *cct, int sd, ib_cm_meta_t& msg);
- int recv_msg(CephContext *cct, int sd, ib_cm_meta_t& msg);
uint16_t get_lid() { return device->get_lid(); }
ibv_gid get_gid() { return device->get_gid(); }
MemoryManager* get_memory_manager() { return memory_manager; }
ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
local_cm_meta.peer_qpn = 0;
- r = ib->send_msg(cct, tcp_fd, local_cm_meta);
+ r = qp->send_cm_meta(cct, tcp_fd, local_cm_meta);
if (r < 0)
return r;
void RDMAConnectedSocketImpl::handle_connection() {
ldout(cct, 20) << __func__ << " QP: " << local_cm_meta.local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
- int r = ib->recv_msg(cct, tcp_fd, peer_cm_meta);
+ int r = qp->recv_cm_meta(cct, tcp_fd, peer_cm_meta);
if (r <= 0) {
if (r != -EAGAIN) {
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
ceph_assert(!r);
}
notify();
- r = ib->send_msg(cct, tcp_fd, local_cm_meta);
+ r = qp->send_cm_meta(cct, tcp_fd, local_cm_meta);
if (r < 0) {
ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
}
r = activate();
ceph_assert(!r);
- r = ib->send_msg(cct, tcp_fd, local_cm_meta);
+ r = qp->send_cm_meta(cct, tcp_fd, local_cm_meta);
if (r < 0) {
ldout(cct, 1) << __func__ << " server ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);