From: Changcheng Liu Date: Tue, 27 Aug 2019 03:48:52 +0000 (+0800) Subject: msg/async/rdma: implement send/recv in QueuePair X-Git-Tag: v15.1.0~1481^2~19 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8b74d0b641f6efd7586891d38b987d241f4cf010;p=ceph.git msg/async/rdma: implement send/recv in QueuePair send/recv is used to transact connection management meta data. QueuePair is the obj which has the meta data. Use QueuePair to transact the meta data. 1. rename send_msg to send_cm_meta 2. rename recv_msg to recv_cm_meta 3. move send/recv_cm_meta to QueuePair class scope 4. change code to adapt to the above change Signed-off-by: Changcheng Liu --- diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index 23fc5ff31d2e..b511df29327f 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -266,6 +266,98 @@ int Infiniband::QueuePair::init() return 0; } +void Infiniband::QueuePair::wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data) +{ + char tmp[9]; + uint32_t v32; + int i; + + for (tmp[8] = 0, i = 0; i < 4; ++i) { + memcpy(tmp, wgid + i * 8, 8); + sscanf(tmp, "%x", &v32); + *(uint32_t *)(&cm_meta_data->gid.raw[i * 4]) = ntohl(v32); + } +} + +void Infiniband::QueuePair::gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[]) +{ + for (int i = 0; i < 4; ++i) + sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(cm_meta_data.gid.raw + i * 4))); +} + +/* + * return value + * 1: means no valid buffer read + * 0: means got enough buffer + * < 0: means error + */ +int Infiniband::QueuePair::recv_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data) +{ + char msg[TCP_MSG_LEN]; + char gid[33]; + ssize_t r = ::read(socket_fd, &msg, sizeof(msg)); + // Drop incoming qpt + if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; + return -EINVAL; + } + } + if (r < 0) { + r = -errno; + lderr(cct) << __func__ << " got error " << r << ": " + << cpp_strerror(r) << dendl; + } else if (r == 0) { // valid disconnect message of length 0 + ldout(cct, 10) << __func__ << " got disconnect message " << dendl; + } else if ((size_t)r != sizeof(msg)) { // invalid message + ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl; + r = -EINVAL; + } else { // valid message + sscanf(msg, "%hx:%x:%x:%x:%s", &(cm_meta_data.lid), &(cm_meta_data.local_qpn), &(cm_meta_data.psn), &(cm_meta_data.peer_qpn), gid); + wire_gid_to_gid(gid, &cm_meta_data); + ldout(cct, 5) << __func__ << " recevd: " << cm_meta_data.lid << ", " << cm_meta_data.local_qpn + << ", " << cm_meta_data.psn << ", " << cm_meta_data.peer_qpn << ", " << gid << dendl; + } + return r; +} + +int Infiniband::QueuePair::send_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data) +{ + int retry = 0; + ssize_t r; + + char msg[TCP_MSG_LEN]; + char gid[33]; +retry: + gid_to_wire_gid(cm_meta_data, gid); + sprintf(msg, "%04x:%08x:%08x:%08x:%s", cm_meta_data.lid, cm_meta_data.local_qpn, cm_meta_data.psn, cm_meta_data.peer_qpn, gid); + ldout(cct, 10) << __func__ << " sending: " << cm_meta_data.lid << ", " << cm_meta_data.local_qpn + << ", " << cm_meta_data.psn << ", " << cm_meta_data.peer_qpn << ", " << gid << dendl; + r = ::write(socket_fd, msg, sizeof(msg)); + // Drop incoming qpt + if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; + return -EINVAL; + } + } + + if ((size_t)r != sizeof(msg)) { + // FIXME need to handle EAGAIN instead of retry + if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) { + retry++; + goto retry; + } + if (r < 0) + lderr(cct) << __func__ << " send returned error " << errno << ": " + << cpp_strerror(errno) << dendl; + else + lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl; + return -errno; + } + return 0; +} + /** * Change RC QueuePair into the ERROR state. This is necessary modify * the Queue Pair into the Error state and poll all of the relevant @@ -1043,93 +1135,6 @@ Infiniband::CompletionQueue* Infiniband::create_comp_queue( return cq; } -// 1 means no valid buffer read, 0 means got enough buffer -// else return < 0 means error -int Infiniband::recv_msg(CephContext *cct, int sd, ib_cm_meta_t& im) -{ - char msg[TCP_MSG_LEN]; - char gid[33]; - ssize_t r = ::read(sd, &msg, sizeof(msg)); - // Drop incoming qpt - if (cct->_conf->ms_inject_socket_failures && sd >= 0) { - if (rand() % cct->_conf->ms_inject_socket_failures == 0) { - ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; - return -EINVAL; - } - } - if (r < 0) { - r = -errno; - lderr(cct) << __func__ << " got error " << r << ": " - << cpp_strerror(r) << dendl; - } else if (r == 0) { // valid disconnect message of length 0 - ldout(cct, 10) << __func__ << " got disconnect message " << dendl; - } else if ((size_t)r != sizeof(msg)) { // invalid message - ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl; - r = -EINVAL; - } else { // valid message - sscanf(msg, "%hx:%x:%x:%x:%s", &(im.lid), &(im.local_qpn), &(im.psn), &(im.peer_qpn),gid); - wire_gid_to_gid(gid, &im); - ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.local_qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl; - } - return r; -} - -int Infiniband::send_msg(CephContext *cct, int sd, ib_cm_meta_t& im) -{ - int retry = 0; - ssize_t r; - - char msg[TCP_MSG_LEN]; - char gid[33]; -retry: - gid_to_wire_gid(im, gid); - sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.local_qpn, im.psn, im.peer_qpn, gid); - ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.local_qpn << ", " << im.psn - << ", " << im.peer_qpn << ", " << gid << dendl; - r = ::write(sd, msg, sizeof(msg)); - // Drop incoming qpt - if (cct->_conf->ms_inject_socket_failures && sd >= 0) { - if (rand() % cct->_conf->ms_inject_socket_failures == 0) { - ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; - return -EINVAL; - } - } - - if ((size_t)r != sizeof(msg)) { - // FIXME need to handle EAGAIN instead of retry - if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) { - retry++; - goto retry; - } - if (r < 0) - lderr(cct) << __func__ << " send returned error " << errno << ": " - << cpp_strerror(errno) << dendl; - else - lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl; - return -errno; - } - return 0; -} - -void Infiniband::wire_gid_to_gid(const char *wgid, ib_cm_meta_t* im) -{ - char tmp[9]; - uint32_t v32; - int i; - - for (tmp[8] = 0, i = 0; i < 4; ++i) { - memcpy(tmp, wgid + i * 8, 8); - sscanf(tmp, "%x", &v32); - *(uint32_t *)(&im->gid.raw[i * 4]) = ntohl(v32); - } -} - -void Infiniband::gid_to_wire_gid(const ib_cm_meta_t& im, char wgid[]) -{ - for (int i = 0; i < 4; ++i) - sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(im.gid.raw + i * 4))); -} - Infiniband::QueuePair::~QueuePair() { if (qp) { diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index 63ac44c2c3e6..dc7ff9e306aa 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -373,8 +373,6 @@ class Infiniband { Device *device = NULL; ProtectionDomain *pd = NULL; DeviceList *device_list = nullptr; - void wire_gid_to_gid(const char *wgid, ib_cm_meta_t* im); - void gid_to_wire_gid(const ib_cm_meta_t& im, char wgid[]); CephContext *cct; ceph::mutex lock = ceph::make_mutex("IB lock"); bool initialized = false; @@ -475,6 +473,13 @@ class Infiniband { * Get the state of a QueuePair. */ int get_state() const; + /* + * send/receive connection management meta data + */ + int send_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data); + int recv_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data); + void wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data); + void gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[]); void add_tx_wr(uint32_t amt) { tx_wr_inflight += amt; } void dec_tx_wr(uint32_t amt) { tx_wr_inflight -= amt; } uint32_t get_tx_wr() const { return tx_wr_inflight; } @@ -519,8 +524,6 @@ class Infiniband { CompletionChannel *create_comp_channel(CephContext *c); CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL); uint8_t get_ib_physical_port() { return ib_physical_port; } - int send_msg(CephContext *cct, int sd, ib_cm_meta_t& msg); - int recv_msg(CephContext *cct, int sd, ib_cm_meta_t& msg); uint16_t get_lid() { return device->get_lid(); } ibv_gid get_gid() { return device->get_gid(); } MemoryManager* get_memory_manager() { return memory_manager; } diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index 516190100652..9b4e9cec9485 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -190,7 +190,7 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl; net.set_priority(tcp_fd, opts.priority, peer_addr.get_family()); local_cm_meta.peer_qpn = 0; - r = ib->send_msg(cct, tcp_fd, local_cm_meta); + r = qp->send_cm_meta(cct, tcp_fd, local_cm_meta); if (r < 0) return r; @@ -200,7 +200,7 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S void RDMAConnectedSocketImpl::handle_connection() { ldout(cct, 20) << __func__ << " QP: " << local_cm_meta.local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl; - int r = ib->recv_msg(cct, tcp_fd, peer_cm_meta); + int r = qp->recv_cm_meta(cct, tcp_fd, peer_cm_meta); if (r <= 0) { if (r != -EAGAIN) { dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); @@ -225,7 +225,7 @@ void RDMAConnectedSocketImpl::handle_connection() { ceph_assert(!r); } notify(); - r = ib->send_msg(cct, tcp_fd, local_cm_meta); + r = qp->send_cm_meta(cct, tcp_fd, local_cm_meta); if (r < 0) { ldout(cct, 1) << __func__ << " send client ack failed." << dendl; dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); @@ -239,7 +239,7 @@ void RDMAConnectedSocketImpl::handle_connection() { } r = activate(); ceph_assert(!r); - r = ib->send_msg(cct, tcp_fd, local_cm_meta); + r = qp->send_cm_meta(cct, tcp_fd, local_cm_meta); if (r < 0) { ldout(cct, 1) << __func__ << " server ack failed." << dendl; dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);