From: Adir Lev Date: Mon, 26 Dec 2016 15:43:06 +0000 (+0000) Subject: msg/async/rdma: clean line endings X-Git-Tag: v12.0.0~349^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F12688%2Fhead;p=ceph.git msg/async/rdma: clean line endings Change-Id: I59359ae585b66018fe54d5e841a6506c077c606f Signed-off-by: Adir Lev --- diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index d81c338d2a2e..cec6998545f3 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -1,631 +1,631 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 XSKY - * - * Author: Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "Infiniband.h" -#include "common/errno.h" -#include "common/debug.h" - -#define dout_subsys ceph_subsys_ms -#undef dout_prefix -#define dout_prefix *_dout << "Infiniband " - -static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1; -static const uint32_t MAX_INLINE_DATA = 128; -static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000"); -static const uint32_t CQ_DEPTH = 30000; - -Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr) -{ - if (device == NULL) { - lderr(cct) << __func__ << "device == NULL" << cpp_strerror(errno) << dendl; - ceph_abort(); - } - name = ibv_get_device_name(device); - ctxt = ibv_open_device(device); - if (ctxt == NULL) { - lderr(cct) << __func__ << "open rdma device failed. " << cpp_strerror(errno) << dendl; - ceph_abort(); - } - int r = ibv_query_device(ctxt, device_attr); - if (r == -1) { - lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl; - ceph_abort(); - } -} - -Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) { - union ibv_gid cgid; - struct ibv_exp_gid_attr gid_attr; - bool malformed = false; - - int r = ibv_query_port(ctxt, port_num, port_attr); - if (r == -1) { - lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl; - ceph_abort(); - } - - lid = port_attr->lid; - - // search for requested GID in GIDs table - ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid) - << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl; - r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(), - "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx" - ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx", - &cgid.raw[ 0], &cgid.raw[ 1], - &cgid.raw[ 2], &cgid.raw[ 3], - &cgid.raw[ 4], &cgid.raw[ 5], - &cgid.raw[ 6], &cgid.raw[ 7], - &cgid.raw[ 8], &cgid.raw[ 9], - &cgid.raw[10], &cgid.raw[11], - &cgid.raw[12], &cgid.raw[13], - &cgid.raw[14], &cgid.raw[15]); - - if (r != 16) { - ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl; - malformed = true; - } - - gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE; - - for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) { - r = ibv_query_gid(ctxt, port_num, gid_idx, &gid); - if (r) { - lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl; - ceph_abort(); - } - r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr); - if (r) { - lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl; - ceph_abort(); - } - - if (malformed) break; // stay with gid_idx=0 - if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) && - (memcmp(&gid, &cgid, 16) == 0) ) { - ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl; - break; - } - } - - if (gid_idx == port_attr->gid_tbl_len) { - lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl; - ceph_abort(); - } - } - -void Device::binding_port(CephContext *cct, uint8_t port_num) { - port_cnt = device_attr->phys_port_cnt; - ports = new Port*[port_cnt]; - for (uint8_t i = 0; i < port_cnt; ++i) { - ports[i] = new Port(cct, ctxt, i+1); - if (i+1 == port_num && ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) { - active_port = ports[i]; - ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl; - return ; - } else { - ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << ports[i]->get_port_attr()->state << ")"<< dendl; - } - } - if (nullptr == active_port) { - lderr(cct) << __func__ << " port not found" << dendl; - assert(active_port); - } -} - -Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num): device_list(cct) -{ - device = device_list.get_device(device_name.c_str()); - device->binding_port(cct, port_num); - assert(device); - ib_physical_port = device->active_port->get_port_num(); - pd = new ProtectionDomain(cct, device); - assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0); - - max_recv_wr = device->device_attr->max_srq_wr; - if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) { - max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers; - ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl; - } else { - ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl; - } - - max_send_wr = device->device_attr->max_qp_wr; - if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) { - max_send_wr = cct->_conf->ms_async_rdma_send_buffers; - ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl; - } else { - ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl; - } - - ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe - << " completion entries" << dendl; - - memory_manager = new MemoryManager(device, pd, - cct->_conf->ms_async_rdma_enable_hugepage); - memory_manager->register_rx_tx( - cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr); - - srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT); - post_channel_cluster(); -} - -/** - * Create a shared receive queue. This basically wraps the verbs call. - * - * \param[in] max_wr - * The max number of outstanding work requests in the SRQ. - * \param[in] max_sge - * The max number of scatter elements per WR. - * \return - * A valid ibv_srq pointer, or NULL on error. - */ -ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge) -{ - ibv_srq_init_attr sia; - memset(&sia, 0, sizeof(sia)); - sia.srq_context = device->ctxt; - sia.attr.max_wr = max_wr; - sia.attr.max_sge = max_sge; - return ibv_create_srq(pd->pd, &sia); -} - -/** - * Create a new QueuePair. This factory should be used in preference to - * the QueuePair constructor directly, since this lets derivatives of - * Infiniband, e.g. MockInfiniband (if it existed), - * return mocked out QueuePair derivatives. - * - * \return - * QueuePair on success or NULL if init fails - * See QueuePair::QueuePair for parameter documentation. - */ -Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type) -{ - Infiniband::QueuePair *qp = new QueuePair( - cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr); - if (qp->init()) { - delete qp; - return NULL; - } - return qp; -} - -int Infiniband::QueuePair::init() -{ - ldout(cct, 20) << __func__ << " started." << dendl; - ibv_qp_init_attr qpia; - memset(&qpia, 0, sizeof(qpia)); - qpia.send_cq = txcq->get_cq(); - qpia.recv_cq = rxcq->get_cq(); - qpia.srq = srq; // use the same shared receive queue - qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests - qpia.cap.max_send_sge = 1; // max send scatter-gather elements - qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q - qpia.qp_type = type; // RC, UC, UD, or XRC - qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs - - qp = ibv_create_qp(pd, &qpia); - if (qp == NULL) { - lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl; - lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers or" - " ms_async_rdma_send_buffers" << dendl; - return -1; - } - - ldout(cct, 20) << __func__ << " successfully create queue pair: " - << "qp=" << qp << dendl; - - // move from RESET to INIT state - ibv_qp_attr qpa; - memset(&qpa, 0, sizeof(qpa)); - qpa.qp_state = IBV_QPS_INIT; - qpa.pkey_index = 0; - qpa.port_num = (uint8_t)(ib_physical_port); - qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE; - qpa.qkey = q_key; - - int mask = IBV_QP_STATE | IBV_QP_PORT; - switch (type) { - case IBV_QPT_RC: - mask |= IBV_QP_ACCESS_FLAGS; - mask |= IBV_QP_PKEY_INDEX; - break; - case IBV_QPT_UD: - mask |= IBV_QP_QKEY; - mask |= IBV_QP_PKEY_INDEX; - break; - case IBV_QPT_RAW_PACKET: - break; - default: - ceph_abort(); - } - - int ret = ibv_modify_qp(qp, &qpa, mask); - if (ret) { - ibv_destroy_qp(qp); - lderr(cct) << __func__ << " failed to transition to INIT state: " - << cpp_strerror(errno) << dendl; - return -1; - } - ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:" - << " qp=" << qp << dendl; - return 0; -} - -/** - * Change RC QueuePair into the ERROR state. This is necessary modify - * the Queue Pair into the Error state and poll all of the relevant - * Work Completions prior to destroying a Queue Pair. - * Since destroying a Queue Pair does not guarantee that its Work - * Completions are removed from the CQ upon destruction. Even if the - * Work Completions are already in the CQ, it might not be possible to - * retrieve them. If the Queue Pair is associated with an SRQ, it is - * recommended wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED - * - * \return - * -errno if the QueuePair can't switch to ERROR - * 0 for success. - */ -int Infiniband::QueuePair::to_dead() -{ - if (dead) - return 0; - ibv_qp_attr qpa; - memset(&qpa, 0, sizeof(qpa)); - qpa.qp_state = IBV_QPS_ERR; - - int mask = IBV_QP_STATE; - int ret = ibv_modify_qp(qp, &qpa, mask); - if (ret) { - lderr(cct) << __func__ << " failed to transition to ERROR state: " - << cpp_strerror(errno) << dendl; - return -errno; - } - dead = true; - return ret; -} - -int Infiniband::post_chunk(Chunk* chunk) -{ - ibv_sge isge; - isge.addr = reinterpret_cast(chunk->buffer); - isge.length = chunk->bytes; - isge.lkey = chunk->mr->lkey; - ibv_recv_wr rx_work_request; - - memset(&rx_work_request, 0, sizeof(rx_work_request)); - rx_work_request.wr_id = reinterpret_cast(chunk);// stash descriptor ptr - rx_work_request.next = NULL; - rx_work_request.sg_list = &isge; - rx_work_request.num_sge = 1; - - ibv_recv_wr *badWorkRequest; - int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest); - if (ret) - return -1; - return 0; -} - -int Infiniband::post_channel_cluster() -{ - vector free_chunks; - int r = memory_manager->get_channel_buffers(free_chunks, 0); - assert(r > 0); - for (vector::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) { - r = post_chunk(*iter); - assert(r == 0); - } - return 0; -} - -Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c) -{ - Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this); - if (cc->init()) { - delete cc; - return NULL; - } - return cc; -} - -Infiniband::CompletionQueue* Infiniband::create_comp_queue( - CephContext *cct, CompletionChannel *cc) -{ - Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue( - cct, *this, CQ_DEPTH, cc); - if (cq->init()) { - delete cq; - return NULL; - } - return cq; -} - - -Infiniband::QueuePair::QueuePair( - CephContext *c, Infiniband& infiniband, ibv_qp_type type, - int port, ibv_srq *srq, - Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, - uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key) -: cct(c), infiniband(infiniband), - type(type), - ctxt(infiniband.device->ctxt), - ib_physical_port(port), - pd(infiniband.pd->pd), - srq(srq), - qp(NULL), - txcq(txcq), - rxcq(rxcq), - initial_psn(0), - max_send_wr(max_send_wr), - max_recv_wr(max_recv_wr), - q_key(q_key), - dead(false) -{ - initial_psn = lrand48() & 0xffffff; - if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) { - lderr(cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl; - ceph_abort(); - } - pd = infiniband.pd->pd; -} - -// 1 means no valid buffer read, 0 means got enough buffer -// else return < 0 means error -int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im) -{ - char msg[TCP_MSG_LEN]; - char gid[33]; - ssize_t r = ::read(sd, &msg, sizeof(msg)); - // Drop incoming qpt - if (cct->_conf->ms_inject_socket_failures && sd >= 0) { - if (rand() % cct->_conf->ms_inject_socket_failures == 0) { - ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; - return -EINVAL; - } - } - if (r < 0) { - r = -errno; - lderr(cct) << __func__ << " got error " << errno << ": " - << cpp_strerror(errno) << dendl; - } else if (r == 0) { // valid disconnect message of length 0 - ldout(cct, 10) << __func__ << " got disconnect message " << dendl; - } else if ((size_t)r != sizeof(msg)) { // invalid message - ldout(cct, 1) << __func__ << " got bad length (" << r << "): " << cpp_strerror(errno) << dendl; - r = -EINVAL; - } else { // valid message - sscanf(msg, "%x:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid); - wire_gid_to_gid(gid, &(im.gid)); - ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl; - } - return r; -} - -int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im) -{ - int retry = 0; - ssize_t r; - - char msg[TCP_MSG_LEN]; - char gid[33]; -retry: - gid_to_wire_gid(&(im.gid), gid); - sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid); - ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn - << ", " << im.peer_qpn << ", " << gid << dendl; - r = ::write(sd, msg, sizeof(msg)); - // Drop incoming qpt - if (cct->_conf->ms_inject_socket_failures && sd >= 0) { - if (rand() % cct->_conf->ms_inject_socket_failures == 0) { - ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; - return -EINVAL; - } - } - - if ((size_t)r != sizeof(msg)) { - // FIXME need to handle EAGAIN instead of retry - if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) { - retry++; - goto retry; - } - if (r < 0) - lderr(cct) << __func__ << " send returned error " << errno << ": " - << cpp_strerror(errno) << dendl; - else - lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl; - return -errno; - } - return 0; -} - -void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid) -{ - char tmp[9]; - uint32_t v32; - int i; - - for (tmp[8] = 0, i = 0; i < 4; ++i) { - memcpy(tmp, wgid + i * 8, 8); - sscanf(tmp, "%x", &v32); - *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32); - } -} - -void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[]) -{ - for (int i = 0; i < 4; ++i) - sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4))); -} - -Infiniband::QueuePair::~QueuePair() -{ - if (qp) - assert(!ibv_destroy_qp(qp)); -} - -Infiniband::CompletionChannel::~CompletionChannel() -{ - if (channel) { - int r = ibv_destroy_comp_channel(channel); - if (r < 0) - lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl; - assert(r == 0); - } -} - -Infiniband::CompletionQueue::~CompletionQueue() -{ - if (cq) { - int r = ibv_destroy_cq(cq); - if (r < 0) - lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl; - assert(r == 0); - } -} - -int Infiniband::CompletionQueue::rearm_notify(bool solicite_only) -{ - ldout(cct, 20) << __func__ << " started." << dendl; - int r = ibv_req_notify_cq(cq, 0); - if (r < 0) - lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl; - return r; -} - -int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) { - int r = ibv_poll_cq(cq, num_entries, ret_wc_array); - if (r < 0) { - lderr(cct) << __func__ << " poll_completion_queue occur met error: " - << cpp_strerror(errno) << dendl; - return -1; - } - return r; -} - -bool Infiniband::CompletionChannel::get_cq_event() -{ - ibv_cq *cq = NULL; - void *ev_ctx; - if (ibv_get_cq_event(channel, &cq, &ev_ctx)) { - if (errno != EAGAIN && errno != EINTR) - lderr(cct) << __func__ << " failed to retrieve CQ event: " - << cpp_strerror(errno) << dendl; - return false; - } - - /* accumulate number of cq events that need to - * * be acked, and periodically ack them - * */ - if (++cq_events_that_need_ack == MAX_ACK_EVENT) { - ldout(cct, 20) << __func__ << " ack aq events." << dendl; - ibv_ack_cq_events(cq, MAX_ACK_EVENT); - cq_events_that_need_ack = 0; - } - - return true; -} - -int Infiniband::CompletionQueue::init() -{ - cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0); - if (!cq) { - lderr(cct) << __func__ << " failed to create receive completion queue: " - << cpp_strerror(errno) << dendl; - return -1; - } - - if (ibv_req_notify_cq(cq, 0)) { - lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl; - ibv_destroy_cq(cq); - cq = nullptr; - return -1; - } - - channel->bind_cq(cq); - ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl; - return 0; -} - -int Infiniband::CompletionChannel::init() -{ - ldout(cct, 20) << __func__ << " started." << dendl; - channel = ibv_create_comp_channel(infiniband.device->ctxt); - if (!channel) { - lderr(cct) << __func__ << " failed to create receive completion channel: " - << cpp_strerror(errno) << dendl; - return -1; - } - int rc = NetHandler(cct).set_nonblock(channel->fd); - if (rc < 0) { - ibv_destroy_comp_channel(channel); - return -1; - } - return 0; -} - -/** - * Given a string representation of the `status' field from Verbs - * struct `ibv_wc'. - * - * \param[in] status - * The integer status obtained in ibv_wc.status. - * \return - * A string corresponding to the given status. - */ -const char* Infiniband::wc_status_to_string(int status) -{ - static const char *lookup[] = { - "SUCCESS", - "LOC_LEN_ERR", - "LOC_QP_OP_ERR", - "LOC_EEC_OP_ERR", - "LOC_PROT_ERR", - "WR_FLUSH_ERR", - "MW_BIND_ERR", - "BAD_RESP_ERR", - "LOC_ACCESS_ERR", - "REM_INV_REQ_ERR", - "REM_ACCESS_ERR", - "REM_OP_ERR", - "RETRY_EXC_ERR", - "RNR_RETRY_EXC_ERR", - "LOC_RDD_VIOL_ERR", - "REM_INV_RD_REQ_ERR", - "REM_ABORT_ERR", - "INV_EECN_ERR", - "INV_EEC_STATE_ERR", - "FATAL_ERR", - "RESP_TIMEOUT_ERR", - "GENERAL_ERR" - }; - - if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR) - return ""; - return lookup[status]; -} - -const char* Infiniband::qp_state_string(int status) { - switch(status) { - case IBV_QPS_RESET : return "IBV_QPS_RESET"; - case IBV_QPS_INIT : return "IBV_QPS_INIT"; - case IBV_QPS_RTR : return "IBV_QPS_RTR"; - case IBV_QPS_RTS : return "IBV_QPS_RTS"; - case IBV_QPS_SQD : return "IBV_QPS_SQD"; - case IBV_QPS_SQE : return "IBV_QPS_SQE"; - case IBV_QPS_ERR : return "IBV_QPS_ERR"; - default: return " out of range."; - } -} +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "Infiniband.h" +#include "common/errno.h" +#include "common/debug.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << "Infiniband " + +static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1; +static const uint32_t MAX_INLINE_DATA = 128; +static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000"); +static const uint32_t CQ_DEPTH = 30000; + +Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr) +{ + if (device == NULL) { + lderr(cct) << __func__ << "device == NULL" << cpp_strerror(errno) << dendl; + ceph_abort(); + } + name = ibv_get_device_name(device); + ctxt = ibv_open_device(device); + if (ctxt == NULL) { + lderr(cct) << __func__ << "open rdma device failed. " << cpp_strerror(errno) << dendl; + ceph_abort(); + } + int r = ibv_query_device(ctxt, device_attr); + if (r == -1) { + lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl; + ceph_abort(); + } +} + +Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) { + union ibv_gid cgid; + struct ibv_exp_gid_attr gid_attr; + bool malformed = false; + + int r = ibv_query_port(ctxt, port_num, port_attr); + if (r == -1) { + lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl; + ceph_abort(); + } + + lid = port_attr->lid; + + // search for requested GID in GIDs table + ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid) + << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl; + r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(), + "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx" + ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx", + &cgid.raw[ 0], &cgid.raw[ 1], + &cgid.raw[ 2], &cgid.raw[ 3], + &cgid.raw[ 4], &cgid.raw[ 5], + &cgid.raw[ 6], &cgid.raw[ 7], + &cgid.raw[ 8], &cgid.raw[ 9], + &cgid.raw[10], &cgid.raw[11], + &cgid.raw[12], &cgid.raw[13], + &cgid.raw[14], &cgid.raw[15]); + + if (r != 16) { + ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl; + malformed = true; + } + + gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE; + + for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) { + r = ibv_query_gid(ctxt, port_num, gid_idx, &gid); + if (r) { + lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl; + ceph_abort(); + } + r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr); + if (r) { + lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl; + ceph_abort(); + } + + if (malformed) break; // stay with gid_idx=0 + if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) && + (memcmp(&gid, &cgid, 16) == 0) ) { + ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl; + break; + } + } + + if (gid_idx == port_attr->gid_tbl_len) { + lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl; + ceph_abort(); + } + } + +void Device::binding_port(CephContext *cct, uint8_t port_num) { + port_cnt = device_attr->phys_port_cnt; + ports = new Port*[port_cnt]; + for (uint8_t i = 0; i < port_cnt; ++i) { + ports[i] = new Port(cct, ctxt, i+1); + if (i+1 == port_num && ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) { + active_port = ports[i]; + ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl; + return ; + } else { + ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << ports[i]->get_port_attr()->state << ")"<< dendl; + } + } + if (nullptr == active_port) { + lderr(cct) << __func__ << " port not found" << dendl; + assert(active_port); + } +} + +Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num): device_list(cct) +{ + device = device_list.get_device(device_name.c_str()); + device->binding_port(cct, port_num); + assert(device); + ib_physical_port = device->active_port->get_port_num(); + pd = new ProtectionDomain(cct, device); + assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0); + + max_recv_wr = device->device_attr->max_srq_wr; + if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) { + max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers; + ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl; + } else { + ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl; + } + + max_send_wr = device->device_attr->max_qp_wr; + if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) { + max_send_wr = cct->_conf->ms_async_rdma_send_buffers; + ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl; + } else { + ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl; + } + + ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe + << " completion entries" << dendl; + + memory_manager = new MemoryManager(device, pd, + cct->_conf->ms_async_rdma_enable_hugepage); + memory_manager->register_rx_tx( + cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr); + + srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT); + post_channel_cluster(); +} + +/** + * Create a shared receive queue. This basically wraps the verbs call. + * + * \param[in] max_wr + * The max number of outstanding work requests in the SRQ. + * \param[in] max_sge + * The max number of scatter elements per WR. + * \return + * A valid ibv_srq pointer, or NULL on error. + */ +ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge) +{ + ibv_srq_init_attr sia; + memset(&sia, 0, sizeof(sia)); + sia.srq_context = device->ctxt; + sia.attr.max_wr = max_wr; + sia.attr.max_sge = max_sge; + return ibv_create_srq(pd->pd, &sia); +} + +/** + * Create a new QueuePair. This factory should be used in preference to + * the QueuePair constructor directly, since this lets derivatives of + * Infiniband, e.g. MockInfiniband (if it existed), + * return mocked out QueuePair derivatives. + * + * \return + * QueuePair on success or NULL if init fails + * See QueuePair::QueuePair for parameter documentation. + */ +Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type) +{ + Infiniband::QueuePair *qp = new QueuePair( + cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr); + if (qp->init()) { + delete qp; + return NULL; + } + return qp; +} + +int Infiniband::QueuePair::init() +{ + ldout(cct, 20) << __func__ << " started." << dendl; + ibv_qp_init_attr qpia; + memset(&qpia, 0, sizeof(qpia)); + qpia.send_cq = txcq->get_cq(); + qpia.recv_cq = rxcq->get_cq(); + qpia.srq = srq; // use the same shared receive queue + qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests + qpia.cap.max_send_sge = 1; // max send scatter-gather elements + qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q + qpia.qp_type = type; // RC, UC, UD, or XRC + qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs + + qp = ibv_create_qp(pd, &qpia); + if (qp == NULL) { + lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl; + lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers or" + " ms_async_rdma_send_buffers" << dendl; + return -1; + } + + ldout(cct, 20) << __func__ << " successfully create queue pair: " + << "qp=" << qp << dendl; + + // move from RESET to INIT state + ibv_qp_attr qpa; + memset(&qpa, 0, sizeof(qpa)); + qpa.qp_state = IBV_QPS_INIT; + qpa.pkey_index = 0; + qpa.port_num = (uint8_t)(ib_physical_port); + qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE; + qpa.qkey = q_key; + + int mask = IBV_QP_STATE | IBV_QP_PORT; + switch (type) { + case IBV_QPT_RC: + mask |= IBV_QP_ACCESS_FLAGS; + mask |= IBV_QP_PKEY_INDEX; + break; + case IBV_QPT_UD: + mask |= IBV_QP_QKEY; + mask |= IBV_QP_PKEY_INDEX; + break; + case IBV_QPT_RAW_PACKET: + break; + default: + ceph_abort(); + } + + int ret = ibv_modify_qp(qp, &qpa, mask); + if (ret) { + ibv_destroy_qp(qp); + lderr(cct) << __func__ << " failed to transition to INIT state: " + << cpp_strerror(errno) << dendl; + return -1; + } + ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:" + << " qp=" << qp << dendl; + return 0; +} + +/** + * Change RC QueuePair into the ERROR state. This is necessary modify + * the Queue Pair into the Error state and poll all of the relevant + * Work Completions prior to destroying a Queue Pair. + * Since destroying a Queue Pair does not guarantee that its Work + * Completions are removed from the CQ upon destruction. Even if the + * Work Completions are already in the CQ, it might not be possible to + * retrieve them. If the Queue Pair is associated with an SRQ, it is + * recommended wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED + * + * \return + * -errno if the QueuePair can't switch to ERROR + * 0 for success. + */ +int Infiniband::QueuePair::to_dead() +{ + if (dead) + return 0; + ibv_qp_attr qpa; + memset(&qpa, 0, sizeof(qpa)); + qpa.qp_state = IBV_QPS_ERR; + + int mask = IBV_QP_STATE; + int ret = ibv_modify_qp(qp, &qpa, mask); + if (ret) { + lderr(cct) << __func__ << " failed to transition to ERROR state: " + << cpp_strerror(errno) << dendl; + return -errno; + } + dead = true; + return ret; +} + +int Infiniband::post_chunk(Chunk* chunk) +{ + ibv_sge isge; + isge.addr = reinterpret_cast(chunk->buffer); + isge.length = chunk->bytes; + isge.lkey = chunk->mr->lkey; + ibv_recv_wr rx_work_request; + + memset(&rx_work_request, 0, sizeof(rx_work_request)); + rx_work_request.wr_id = reinterpret_cast(chunk);// stash descriptor ptr + rx_work_request.next = NULL; + rx_work_request.sg_list = &isge; + rx_work_request.num_sge = 1; + + ibv_recv_wr *badWorkRequest; + int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest); + if (ret) + return -1; + return 0; +} + +int Infiniband::post_channel_cluster() +{ + vector free_chunks; + int r = memory_manager->get_channel_buffers(free_chunks, 0); + assert(r > 0); + for (vector::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) { + r = post_chunk(*iter); + assert(r == 0); + } + return 0; +} + +Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c) +{ + Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this); + if (cc->init()) { + delete cc; + return NULL; + } + return cc; +} + +Infiniband::CompletionQueue* Infiniband::create_comp_queue( + CephContext *cct, CompletionChannel *cc) +{ + Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue( + cct, *this, CQ_DEPTH, cc); + if (cq->init()) { + delete cq; + return NULL; + } + return cq; +} + + +Infiniband::QueuePair::QueuePair( + CephContext *c, Infiniband& infiniband, ibv_qp_type type, + int port, ibv_srq *srq, + Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, + uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key) +: cct(c), infiniband(infiniband), + type(type), + ctxt(infiniband.device->ctxt), + ib_physical_port(port), + pd(infiniband.pd->pd), + srq(srq), + qp(NULL), + txcq(txcq), + rxcq(rxcq), + initial_psn(0), + max_send_wr(max_send_wr), + max_recv_wr(max_recv_wr), + q_key(q_key), + dead(false) +{ + initial_psn = lrand48() & 0xffffff; + if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) { + lderr(cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl; + ceph_abort(); + } + pd = infiniband.pd->pd; +} + +// 1 means no valid buffer read, 0 means got enough buffer +// else return < 0 means error +int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im) +{ + char msg[TCP_MSG_LEN]; + char gid[33]; + ssize_t r = ::read(sd, &msg, sizeof(msg)); + // Drop incoming qpt + if (cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; + return -EINVAL; + } + } + if (r < 0) { + r = -errno; + lderr(cct) << __func__ << " got error " << errno << ": " + << cpp_strerror(errno) << dendl; + } else if (r == 0) { // valid disconnect message of length 0 + ldout(cct, 10) << __func__ << " got disconnect message " << dendl; + } else if ((size_t)r != sizeof(msg)) { // invalid message + ldout(cct, 1) << __func__ << " got bad length (" << r << "): " << cpp_strerror(errno) << dendl; + r = -EINVAL; + } else { // valid message + sscanf(msg, "%x:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid); + wire_gid_to_gid(gid, &(im.gid)); + ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl; + } + return r; +} + +int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im) +{ + int retry = 0; + ssize_t r; + + char msg[TCP_MSG_LEN]; + char gid[33]; +retry: + gid_to_wire_gid(&(im.gid), gid); + sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid); + ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn + << ", " << im.peer_qpn << ", " << gid << dendl; + r = ::write(sd, msg, sizeof(msg)); + // Drop incoming qpt + if (cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; + return -EINVAL; + } + } + + if ((size_t)r != sizeof(msg)) { + // FIXME need to handle EAGAIN instead of retry + if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) { + retry++; + goto retry; + } + if (r < 0) + lderr(cct) << __func__ << " send returned error " << errno << ": " + << cpp_strerror(errno) << dendl; + else + lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl; + return -errno; + } + return 0; +} + +void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid) +{ + char tmp[9]; + uint32_t v32; + int i; + + for (tmp[8] = 0, i = 0; i < 4; ++i) { + memcpy(tmp, wgid + i * 8, 8); + sscanf(tmp, "%x", &v32); + *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32); + } +} + +void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[]) +{ + for (int i = 0; i < 4; ++i) + sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4))); +} + +Infiniband::QueuePair::~QueuePair() +{ + if (qp) + assert(!ibv_destroy_qp(qp)); +} + +Infiniband::CompletionChannel::~CompletionChannel() +{ + if (channel) { + int r = ibv_destroy_comp_channel(channel); + if (r < 0) + lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl; + assert(r == 0); + } +} + +Infiniband::CompletionQueue::~CompletionQueue() +{ + if (cq) { + int r = ibv_destroy_cq(cq); + if (r < 0) + lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl; + assert(r == 0); + } +} + +int Infiniband::CompletionQueue::rearm_notify(bool solicite_only) +{ + ldout(cct, 20) << __func__ << " started." << dendl; + int r = ibv_req_notify_cq(cq, 0); + if (r < 0) + lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl; + return r; +} + +int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) { + int r = ibv_poll_cq(cq, num_entries, ret_wc_array); + if (r < 0) { + lderr(cct) << __func__ << " poll_completion_queue occur met error: " + << cpp_strerror(errno) << dendl; + return -1; + } + return r; +} + +bool Infiniband::CompletionChannel::get_cq_event() +{ + ibv_cq *cq = NULL; + void *ev_ctx; + if (ibv_get_cq_event(channel, &cq, &ev_ctx)) { + if (errno != EAGAIN && errno != EINTR) + lderr(cct) << __func__ << " failed to retrieve CQ event: " + << cpp_strerror(errno) << dendl; + return false; + } + + /* accumulate number of cq events that need to + * * be acked, and periodically ack them + * */ + if (++cq_events_that_need_ack == MAX_ACK_EVENT) { + ldout(cct, 20) << __func__ << " ack aq events." << dendl; + ibv_ack_cq_events(cq, MAX_ACK_EVENT); + cq_events_that_need_ack = 0; + } + + return true; +} + +int Infiniband::CompletionQueue::init() +{ + cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0); + if (!cq) { + lderr(cct) << __func__ << " failed to create receive completion queue: " + << cpp_strerror(errno) << dendl; + return -1; + } + + if (ibv_req_notify_cq(cq, 0)) { + lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl; + ibv_destroy_cq(cq); + cq = nullptr; + return -1; + } + + channel->bind_cq(cq); + ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl; + return 0; +} + +int Infiniband::CompletionChannel::init() +{ + ldout(cct, 20) << __func__ << " started." << dendl; + channel = ibv_create_comp_channel(infiniband.device->ctxt); + if (!channel) { + lderr(cct) << __func__ << " failed to create receive completion channel: " + << cpp_strerror(errno) << dendl; + return -1; + } + int rc = NetHandler(cct).set_nonblock(channel->fd); + if (rc < 0) { + ibv_destroy_comp_channel(channel); + return -1; + } + return 0; +} + +/** + * Given a string representation of the `status' field from Verbs + * struct `ibv_wc'. + * + * \param[in] status + * The integer status obtained in ibv_wc.status. + * \return + * A string corresponding to the given status. + */ +const char* Infiniband::wc_status_to_string(int status) +{ + static const char *lookup[] = { + "SUCCESS", + "LOC_LEN_ERR", + "LOC_QP_OP_ERR", + "LOC_EEC_OP_ERR", + "LOC_PROT_ERR", + "WR_FLUSH_ERR", + "MW_BIND_ERR", + "BAD_RESP_ERR", + "LOC_ACCESS_ERR", + "REM_INV_REQ_ERR", + "REM_ACCESS_ERR", + "REM_OP_ERR", + "RETRY_EXC_ERR", + "RNR_RETRY_EXC_ERR", + "LOC_RDD_VIOL_ERR", + "REM_INV_RD_REQ_ERR", + "REM_ABORT_ERR", + "INV_EECN_ERR", + "INV_EEC_STATE_ERR", + "FATAL_ERR", + "RESP_TIMEOUT_ERR", + "GENERAL_ERR" + }; + + if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR) + return ""; + return lookup[status]; +} + +const char* Infiniband::qp_state_string(int status) { + switch(status) { + case IBV_QPS_RESET : return "IBV_QPS_RESET"; + case IBV_QPS_INIT : return "IBV_QPS_INIT"; + case IBV_QPS_RTR : return "IBV_QPS_RTR"; + case IBV_QPS_RTS : return "IBV_QPS_RTS"; + case IBV_QPS_SQD : return "IBV_QPS_SQD"; + case IBV_QPS_SQE : return "IBV_QPS_SQE"; + case IBV_QPS_ERR : return "IBV_QPS_ERR"; + default: return " out of range."; + } +} diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index b92f1b19d650..fdee49e24d0d 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -1,607 +1,607 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 XSKY - * - * Author: Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_INFINIBAND_H -#define CEPH_INFINIBAND_H - -#include - -#include -#include - -#include "include/int_types.h" -#include "include/page.h" -#include "common/debug.h" -#include "common/errno.h" -#include "msg/msg_types.h" -#include "msg/async/net_handler.h" -#include "common/Mutex.h" - -#define HUGE_PAGE_SIZE (2 * 1024 * 1024) -#define ALIGN_TO_PAGE_SIZE(x) \ - (((x) + HUGE_PAGE_SIZE -1) / HUGE_PAGE_SIZE * HUGE_PAGE_SIZE) - -struct IBSYNMsg { - uint16_t lid; - uint32_t qpn; - uint32_t psn; - uint32_t peer_qpn; - union ibv_gid gid; -} __attribute__((packed)); - -class RDMAStack; -class CephContext; - -class Port { - struct ibv_context* ctxt; - uint8_t port_num; - struct ibv_port_attr* port_attr; - uint16_t lid; - int gid_idx; - union ibv_gid gid; - - public: - explicit Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn); - uint16_t get_lid() { return lid; } - ibv_gid get_gid() { return gid; } - uint8_t get_port_num() { return port_num; } - ibv_port_attr* get_port_attr() { return port_attr; } - int get_gid_idx() { return gid_idx; } -}; - - -class Device { - ibv_device *device; - const char* name; - uint8_t port_cnt; - Port** ports; - public: - explicit Device(CephContext *c, ibv_device* d); - ~Device() { - for (uint8_t i = 0; i < port_cnt; ++i) - delete ports[i]; - delete []ports; - assert(ibv_close_device(ctxt) == 0); - } - const char* get_name() { return name;} - uint16_t get_lid() { return active_port->get_lid(); } - ibv_gid get_gid() { return active_port->get_gid(); } - int get_gid_idx() { return active_port->get_gid_idx(); } - void binding_port(CephContext *c, uint8_t port_num); - struct ibv_context *ctxt; - ibv_device_attr *device_attr; - Port* active_port; -}; - - -class DeviceList { - struct ibv_device ** device_list; - int num; - Device** devices; - public: - DeviceList(CephContext *cct): device_list(ibv_get_device_list(&num)) { - if (device_list == NULL || num == 0) { - lderr(cct) << __func__ << " failed to get rdma device list. " << cpp_strerror(errno) << dendl; - ceph_abort(); - } - devices = new Device*[num]; - - for (int i = 0;i < num; ++i) { - devices[i] = new Device(cct, device_list[i]); - } - } - ~DeviceList() { - for (int i=0; i < num; ++i) { - delete devices[i]; - } - delete []devices; - ibv_free_device_list(device_list); - } - - Device* get_device(const char* device_name) { - assert(devices); - for (int i = 0; i < num; ++i) { - if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) { - return devices[i]; - } - } - return NULL; - } -}; - - -class Infiniband { - public: - class ProtectionDomain { - public: - explicit ProtectionDomain(CephContext *cct, Device *device) - : pd(ibv_alloc_pd(device->ctxt)) - { - if (pd == NULL) { - lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl; - ceph_abort(); - } - } - ~ProtectionDomain() { - int rc = ibv_dealloc_pd(pd); - assert(rc == 0); - } - ibv_pd* const pd; - }; - - - class MemoryManager { - public: - class Chunk { - public: - Chunk(char* b, uint32_t len, ibv_mr* m) : buffer(b), bytes(len), offset(0), mr(m) {} - ~Chunk() { - assert(ibv_dereg_mr(mr) == 0); - } - - void set_offset(uint32_t o) { - offset = o; - } - - uint32_t get_offset() { - return offset; - } - - void set_bound(uint32_t b) { - bound = b; - } - - void prepare_read(uint32_t b) { - offset = 0; - bound = b; - } - - uint32_t get_bound() { - return bound; - } - - uint32_t read(char* buf, uint32_t len) { - uint32_t left = bound - offset; - if (left >= len) { - memcpy(buf, buffer+offset, len); - offset += len; - return len; - } else { - memcpy(buf, buffer+offset, left); - offset = 0; - bound = 0; - return left; - } - } - - uint32_t write(char* buf, uint32_t len) { - uint32_t left = bytes - offset; - if (left >= len) { - memcpy(buffer+offset, buf, len); - offset += len; - return len; - } else { - memcpy(buffer+offset, buf, left); - offset = bytes; - return left; - } - } - - bool full() { - return offset == bytes; - } - - bool over() { - return offset == bound; - } - - void clear() { - offset = 0; - bound = 0; - } - - void post_srq(Infiniband *ib) { - ib->post_chunk(this); - } - - void set_owner(uint64_t o) { - owner = o; - } - - uint64_t get_owner() { - return owner; - } - - public: - char* buffer; - uint32_t bytes; - uint32_t bound; - uint32_t offset; - ibv_mr* mr; - uint64_t owner; - }; - - class Cluster { - public: - Cluster(MemoryManager& m, uint32_t s) : manager(m), chunk_size(s), lock("cluster_lock"){} - Cluster(MemoryManager& m, uint32_t s, uint32_t n) : manager(m), chunk_size(s), lock("cluster_lock"){ - add(n); - } - - ~Cluster() { - set::iterator c = all_chunks.begin(); - while(c != all_chunks.end()) { - delete *c; - ++c; - } - if (manager.enabled_huge_page) - manager.free_huge_pages(base); - else - delete base; - } - int add(uint32_t num) { - uint32_t bytes = chunk_size * num; - //cihar* base = (char*)malloc(bytes); - if (manager.enabled_huge_page) { - base = (char*)manager.malloc_huge_pages(bytes); - } else { - base = (char*)memalign(CEPH_PAGE_SIZE, bytes); - } - assert(base); - for (uint32_t offset = 0; offset < bytes; offset += chunk_size){ - ibv_mr* m = ibv_reg_mr(manager.pd->pd, base+offset, chunk_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE); - assert(m); - Chunk* c = new Chunk(base+offset,chunk_size,m); - free_chunks.push_back(c); - all_chunks.insert(c); - } - return 0; - } - - void take_back(Chunk* ck) { - Mutex::Locker l(lock); - free_chunks.push_back(ck); - } - - int get_buffers(std::vector &chunks, size_t bytes) { - uint32_t num = bytes / chunk_size + 1; - if (bytes % chunk_size == 0) - --num; - int r = num; - Mutex::Locker l(lock); - if (free_chunks.empty()) - return 0; - if (!bytes) { - free_chunks.swap(chunks); - r = chunks.size(); - return r; - } - if (free_chunks.size() < num) { - num = free_chunks.size(); - r = num; - } - for (uint32_t i = 0; i < num; ++i) { - chunks.push_back(free_chunks.back()); - free_chunks.pop_back(); - } - return r; - } - MemoryManager& manager; - uint32_t chunk_size; - Mutex lock; - std::vector free_chunks; - std::set all_chunks; - char* base; - }; - - MemoryManager(Device *d, ProtectionDomain *p, bool hugepage): device(d), pd(p) { - enabled_huge_page = hugepage; - } - ~MemoryManager() { - if (channel) - delete channel; - if (send) - delete send; - } - void* malloc_huge_pages(size_t size) { - size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE); - char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0); - if (ptr == MAP_FAILED) { - ptr = (char *)malloc(real_size); - if (ptr == NULL) return NULL; - real_size = 0; - } - *((size_t *)ptr) = real_size; - return ptr + HUGE_PAGE_SIZE; - } - void free_huge_pages(void *ptr) { - if (ptr == NULL) return; - void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE; - size_t real_size = *((size_t *)real_ptr); - assert(real_size % HUGE_PAGE_SIZE == 0); - if (real_size != 0) - munmap(real_ptr, real_size); - else - free(real_ptr); - } - void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num) { - assert(device); - assert(pd); - channel = new Cluster(*this, size); - channel->add(rx_num); - - send = new Cluster(*this, size); - send->add(tx_num); - } - void return_tx(std::vector &chunks) { - for (auto c : chunks) { - c->clear(); - send->take_back(c); - } - } - - int get_send_buffers(std::vector &c, size_t bytes) { - return send->get_buffers(c, bytes); - } - - int get_channel_buffers(std::vector &chunks, size_t bytes) { - return channel->get_buffers(chunks, bytes); - } - - int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);} - int is_rx_chunk(Chunk* c) { return channel->all_chunks.count(c);} - bool enabled_huge_page; - private: - Cluster* channel;//RECV - Cluster* send;// SEND - Device *device; - ProtectionDomain *pd; - }; - - private: - uint32_t max_send_wr; - uint32_t max_recv_wr; - uint32_t max_sge; - uint8_t ib_physical_port; - MemoryManager* memory_manager; - ibv_srq* srq; // shared receive work queue - Device *device; - ProtectionDomain *pd; - DeviceList device_list; - void wire_gid_to_gid(const char *wgid, union ibv_gid *gid); - void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]); - - public: - explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p); - - /** - * Destroy an Infiniband object. - */ - ~Infiniband() { - assert(ibv_destroy_srq(srq) == 0); - delete memory_manager; - delete pd; - } - - class CompletionChannel { - static const uint32_t MAX_ACK_EVENT = 5000; - CephContext *cct; - Infiniband& infiniband; - ibv_comp_channel *channel; - ibv_cq *cq; - uint32_t cq_events_that_need_ack; - - public: - CompletionChannel(CephContext *c, Infiniband &ib) - : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) {} - ~CompletionChannel(); - int init(); - bool get_cq_event(); - int get_fd() { return channel->fd; } - ibv_comp_channel* get_channel() { return channel; } - void bind_cq(ibv_cq *c) { cq = c; } - void ack_events() { - ibv_ack_cq_events(cq, cq_events_that_need_ack); - cq_events_that_need_ack = 0; - } - }; - - // this class encapsulates the creation, use, and destruction of an RC - // completion queue. - // - // You need to call init and it will create a cq and associate to comp channel - class CompletionQueue { - public: - CompletionQueue(CephContext *c, Infiniband &ib, - const uint32_t qd, CompletionChannel *cc) - : cct(c), infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {} - ~CompletionQueue(); - int init(); - int poll_cq(int num_entries, ibv_wc *ret_wc_array); - - ibv_cq* get_cq() const { return cq; } - int rearm_notify(bool solicited_only=true); - CompletionChannel* get_cc() const { return channel; } - private: - CephContext *cct; - Infiniband& infiniband; // Infiniband to which this QP belongs - CompletionChannel *channel; - ibv_cq *cq; - uint32_t queue_depth; - }; - - // this class encapsulates the creation, use, and destruction of an RC - // queue pair. - // - // you need call init and it will create a qp and bring it to the INIT state. - // after obtaining the lid, qpn, and psn of a remote queue pair, one - // must call plumb() to bring the queue pair to the RTS state. - class QueuePair { - public: - QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type, - int ib_physical_port, ibv_srq *srq, - Infiniband::CompletionQueue* txcq, - Infiniband::CompletionQueue* rxcq, - uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0); - ~QueuePair(); - - int init(); - - /** - * Get the initial packet sequence number for this QueuePair. - * This is randomly generated on creation. It should not be confused - * with the remote side's PSN, which is set in #plumb(). - */ - uint32_t get_initial_psn() const { return initial_psn; }; - /** - * Get the local queue pair number for this QueuePair. - * QPNs are analogous to UDP/TCP port numbers. - */ - uint32_t get_local_qp_number() const { return qp->qp_num; }; - /** - * Get the remote queue pair number for this QueuePair, as set in #plumb(). - * QPNs are analogous to UDP/TCP port numbers. - */ - int get_remote_qp_number(uint32_t *rqp) const { - ibv_qp_attr qpa; - ibv_qp_init_attr qpia; - - int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia); - if (r) { - lderr(cct) << __func__ << " failed to query qp: " - << cpp_strerror(errno) << dendl; - return -1; - } - - if (rqp) - *rqp = qpa.dest_qp_num; - return 0; - } - /** - * Get the remote infiniband address for this QueuePair, as set in #plumb(). - * LIDs are "local IDs" in infiniband terminology. They are short, locally - * routable addresses. - */ - int get_remote_lid(uint16_t *lid) const { - ibv_qp_attr qpa; - ibv_qp_init_attr qpia; - - int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia); - if (r) { - lderr(cct) << __func__ << " failed to query qp: " - << cpp_strerror(errno) << dendl; - return -1; - } - - if (lid) - *lid = qpa.ah_attr.dlid; - return 0; - } - /** - * Get the state of a QueuePair. - */ - int get_state() const { - ibv_qp_attr qpa; - ibv_qp_init_attr qpia; - - int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia); - if (r) { - lderr(cct) << __func__ << " failed to get state: " - << cpp_strerror(errno) << dendl; - return -1; - } - return qpa.qp_state; - } - /** - * Return true if the queue pair is in an error state, false otherwise. - */ - bool is_error() const { - ibv_qp_attr qpa; - ibv_qp_init_attr qpia; - - int r = ibv_query_qp(qp, &qpa, -1, &qpia); - if (r) { - lderr(cct) << __func__ << " failed to get state: " - << cpp_strerror(errno) << dendl; - return true; - } - return qpa.cur_qp_state == IBV_QPS_ERR; - } - ibv_qp* get_qp() const { return qp; } - Infiniband::CompletionQueue* get_tx_cq() const { return txcq; } - Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; } - int to_dead(); - bool is_dead() const { return dead; } - - private: - CephContext *cct; - Infiniband& infiniband; // Infiniband to which this QP belongs - ibv_qp_type type; // QP type (IBV_QPT_RC, etc.) - ibv_context* ctxt; // device context of the HCA to use - int ib_physical_port; - ibv_pd* pd; // protection domain - ibv_srq* srq; // shared receive queue - ibv_qp* qp; // infiniband verbs QP handle - Infiniband::CompletionQueue* txcq; - Infiniband::CompletionQueue* rxcq; - uint32_t initial_psn; // initial packet sequence number - uint32_t max_send_wr; - uint32_t max_recv_wr; - uint32_t q_key; - bool dead; - }; - - public: - typedef MemoryManager::Cluster Cluster; - typedef MemoryManager::Chunk Chunk; - QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type); - ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge); - int post_chunk(Chunk* chunk); - int post_channel_cluster(); - int get_tx_buffers(std::vector &c, size_t bytes) { - return memory_manager->get_send_buffers(c, bytes); - } - CompletionChannel *create_comp_channel(CephContext *c); - CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL); - uint8_t get_ib_physical_port() { - return ib_physical_port; - } - int send_msg(CephContext *cct, int sd, IBSYNMsg& msg); - int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg); - uint16_t get_lid() { return device->get_lid(); } - ibv_gid get_gid() { return device->get_gid(); } - MemoryManager* get_memory_manager() { return memory_manager; } - Device* get_device() { return device; } - int get_async_fd() { return device->ctxt->async_fd; } - int recall_chunk(Chunk* c) { - if (memory_manager->is_rx_chunk(c)) { - post_chunk(c); - return 1; - } else if (memory_manager->is_tx_chunk(c)) { - vector v; - v.push_back(c); - memory_manager->return_tx(v); - return 2; - } - return -1; - } - int is_tx_chunk(Chunk* c) { return memory_manager->is_tx_chunk(c); } - int is_rx_chunk(Chunk* c) { return memory_manager->is_rx_chunk(c); } - static const char* wc_status_to_string(int status); - static const char* qp_state_string(int status); -}; - -#endif +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_INFINIBAND_H +#define CEPH_INFINIBAND_H + +#include + +#include +#include + +#include "include/int_types.h" +#include "include/page.h" +#include "common/debug.h" +#include "common/errno.h" +#include "msg/msg_types.h" +#include "msg/async/net_handler.h" +#include "common/Mutex.h" + +#define HUGE_PAGE_SIZE (2 * 1024 * 1024) +#define ALIGN_TO_PAGE_SIZE(x) \ + (((x) + HUGE_PAGE_SIZE -1) / HUGE_PAGE_SIZE * HUGE_PAGE_SIZE) + +struct IBSYNMsg { + uint16_t lid; + uint32_t qpn; + uint32_t psn; + uint32_t peer_qpn; + union ibv_gid gid; +} __attribute__((packed)); + +class RDMAStack; +class CephContext; + +class Port { + struct ibv_context* ctxt; + uint8_t port_num; + struct ibv_port_attr* port_attr; + uint16_t lid; + int gid_idx; + union ibv_gid gid; + + public: + explicit Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn); + uint16_t get_lid() { return lid; } + ibv_gid get_gid() { return gid; } + uint8_t get_port_num() { return port_num; } + ibv_port_attr* get_port_attr() { return port_attr; } + int get_gid_idx() { return gid_idx; } +}; + + +class Device { + ibv_device *device; + const char* name; + uint8_t port_cnt; + Port** ports; + public: + explicit Device(CephContext *c, ibv_device* d); + ~Device() { + for (uint8_t i = 0; i < port_cnt; ++i) + delete ports[i]; + delete []ports; + assert(ibv_close_device(ctxt) == 0); + } + const char* get_name() { return name;} + uint16_t get_lid() { return active_port->get_lid(); } + ibv_gid get_gid() { return active_port->get_gid(); } + int get_gid_idx() { return active_port->get_gid_idx(); } + void binding_port(CephContext *c, uint8_t port_num); + struct ibv_context *ctxt; + ibv_device_attr *device_attr; + Port* active_port; +}; + + +class DeviceList { + struct ibv_device ** device_list; + int num; + Device** devices; + public: + DeviceList(CephContext *cct): device_list(ibv_get_device_list(&num)) { + if (device_list == NULL || num == 0) { + lderr(cct) << __func__ << " failed to get rdma device list. " << cpp_strerror(errno) << dendl; + ceph_abort(); + } + devices = new Device*[num]; + + for (int i = 0;i < num; ++i) { + devices[i] = new Device(cct, device_list[i]); + } + } + ~DeviceList() { + for (int i=0; i < num; ++i) { + delete devices[i]; + } + delete []devices; + ibv_free_device_list(device_list); + } + + Device* get_device(const char* device_name) { + assert(devices); + for (int i = 0; i < num; ++i) { + if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) { + return devices[i]; + } + } + return NULL; + } +}; + + +class Infiniband { + public: + class ProtectionDomain { + public: + explicit ProtectionDomain(CephContext *cct, Device *device) + : pd(ibv_alloc_pd(device->ctxt)) + { + if (pd == NULL) { + lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl; + ceph_abort(); + } + } + ~ProtectionDomain() { + int rc = ibv_dealloc_pd(pd); + assert(rc == 0); + } + ibv_pd* const pd; + }; + + + class MemoryManager { + public: + class Chunk { + public: + Chunk(char* b, uint32_t len, ibv_mr* m) : buffer(b), bytes(len), offset(0), mr(m) {} + ~Chunk() { + assert(ibv_dereg_mr(mr) == 0); + } + + void set_offset(uint32_t o) { + offset = o; + } + + uint32_t get_offset() { + return offset; + } + + void set_bound(uint32_t b) { + bound = b; + } + + void prepare_read(uint32_t b) { + offset = 0; + bound = b; + } + + uint32_t get_bound() { + return bound; + } + + uint32_t read(char* buf, uint32_t len) { + uint32_t left = bound - offset; + if (left >= len) { + memcpy(buf, buffer+offset, len); + offset += len; + return len; + } else { + memcpy(buf, buffer+offset, left); + offset = 0; + bound = 0; + return left; + } + } + + uint32_t write(char* buf, uint32_t len) { + uint32_t left = bytes - offset; + if (left >= len) { + memcpy(buffer+offset, buf, len); + offset += len; + return len; + } else { + memcpy(buffer+offset, buf, left); + offset = bytes; + return left; + } + } + + bool full() { + return offset == bytes; + } + + bool over() { + return offset == bound; + } + + void clear() { + offset = 0; + bound = 0; + } + + void post_srq(Infiniband *ib) { + ib->post_chunk(this); + } + + void set_owner(uint64_t o) { + owner = o; + } + + uint64_t get_owner() { + return owner; + } + + public: + char* buffer; + uint32_t bytes; + uint32_t bound; + uint32_t offset; + ibv_mr* mr; + uint64_t owner; + }; + + class Cluster { + public: + Cluster(MemoryManager& m, uint32_t s) : manager(m), chunk_size(s), lock("cluster_lock"){} + Cluster(MemoryManager& m, uint32_t s, uint32_t n) : manager(m), chunk_size(s), lock("cluster_lock"){ + add(n); + } + + ~Cluster() { + set::iterator c = all_chunks.begin(); + while(c != all_chunks.end()) { + delete *c; + ++c; + } + if (manager.enabled_huge_page) + manager.free_huge_pages(base); + else + delete base; + } + int add(uint32_t num) { + uint32_t bytes = chunk_size * num; + //cihar* base = (char*)malloc(bytes); + if (manager.enabled_huge_page) { + base = (char*)manager.malloc_huge_pages(bytes); + } else { + base = (char*)memalign(CEPH_PAGE_SIZE, bytes); + } + assert(base); + for (uint32_t offset = 0; offset < bytes; offset += chunk_size){ + ibv_mr* m = ibv_reg_mr(manager.pd->pd, base+offset, chunk_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE); + assert(m); + Chunk* c = new Chunk(base+offset,chunk_size,m); + free_chunks.push_back(c); + all_chunks.insert(c); + } + return 0; + } + + void take_back(Chunk* ck) { + Mutex::Locker l(lock); + free_chunks.push_back(ck); + } + + int get_buffers(std::vector &chunks, size_t bytes) { + uint32_t num = bytes / chunk_size + 1; + if (bytes % chunk_size == 0) + --num; + int r = num; + Mutex::Locker l(lock); + if (free_chunks.empty()) + return 0; + if (!bytes) { + free_chunks.swap(chunks); + r = chunks.size(); + return r; + } + if (free_chunks.size() < num) { + num = free_chunks.size(); + r = num; + } + for (uint32_t i = 0; i < num; ++i) { + chunks.push_back(free_chunks.back()); + free_chunks.pop_back(); + } + return r; + } + MemoryManager& manager; + uint32_t chunk_size; + Mutex lock; + std::vector free_chunks; + std::set all_chunks; + char* base; + }; + + MemoryManager(Device *d, ProtectionDomain *p, bool hugepage): device(d), pd(p) { + enabled_huge_page = hugepage; + } + ~MemoryManager() { + if (channel) + delete channel; + if (send) + delete send; + } + void* malloc_huge_pages(size_t size) { + size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE); + char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0); + if (ptr == MAP_FAILED) { + ptr = (char *)malloc(real_size); + if (ptr == NULL) return NULL; + real_size = 0; + } + *((size_t *)ptr) = real_size; + return ptr + HUGE_PAGE_SIZE; + } + void free_huge_pages(void *ptr) { + if (ptr == NULL) return; + void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE; + size_t real_size = *((size_t *)real_ptr); + assert(real_size % HUGE_PAGE_SIZE == 0); + if (real_size != 0) + munmap(real_ptr, real_size); + else + free(real_ptr); + } + void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num) { + assert(device); + assert(pd); + channel = new Cluster(*this, size); + channel->add(rx_num); + + send = new Cluster(*this, size); + send->add(tx_num); + } + void return_tx(std::vector &chunks) { + for (auto c : chunks) { + c->clear(); + send->take_back(c); + } + } + + int get_send_buffers(std::vector &c, size_t bytes) { + return send->get_buffers(c, bytes); + } + + int get_channel_buffers(std::vector &chunks, size_t bytes) { + return channel->get_buffers(chunks, bytes); + } + + int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);} + int is_rx_chunk(Chunk* c) { return channel->all_chunks.count(c);} + bool enabled_huge_page; + private: + Cluster* channel;//RECV + Cluster* send;// SEND + Device *device; + ProtectionDomain *pd; + }; + + private: + uint32_t max_send_wr; + uint32_t max_recv_wr; + uint32_t max_sge; + uint8_t ib_physical_port; + MemoryManager* memory_manager; + ibv_srq* srq; // shared receive work queue + Device *device; + ProtectionDomain *pd; + DeviceList device_list; + void wire_gid_to_gid(const char *wgid, union ibv_gid *gid); + void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]); + + public: + explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p); + + /** + * Destroy an Infiniband object. + */ + ~Infiniband() { + assert(ibv_destroy_srq(srq) == 0); + delete memory_manager; + delete pd; + } + + class CompletionChannel { + static const uint32_t MAX_ACK_EVENT = 5000; + CephContext *cct; + Infiniband& infiniband; + ibv_comp_channel *channel; + ibv_cq *cq; + uint32_t cq_events_that_need_ack; + + public: + CompletionChannel(CephContext *c, Infiniband &ib) + : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) {} + ~CompletionChannel(); + int init(); + bool get_cq_event(); + int get_fd() { return channel->fd; } + ibv_comp_channel* get_channel() { return channel; } + void bind_cq(ibv_cq *c) { cq = c; } + void ack_events() { + ibv_ack_cq_events(cq, cq_events_that_need_ack); + cq_events_that_need_ack = 0; + } + }; + + // this class encapsulates the creation, use, and destruction of an RC + // completion queue. + // + // You need to call init and it will create a cq and associate to comp channel + class CompletionQueue { + public: + CompletionQueue(CephContext *c, Infiniband &ib, + const uint32_t qd, CompletionChannel *cc) + : cct(c), infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {} + ~CompletionQueue(); + int init(); + int poll_cq(int num_entries, ibv_wc *ret_wc_array); + + ibv_cq* get_cq() const { return cq; } + int rearm_notify(bool solicited_only=true); + CompletionChannel* get_cc() const { return channel; } + private: + CephContext *cct; + Infiniband& infiniband; // Infiniband to which this QP belongs + CompletionChannel *channel; + ibv_cq *cq; + uint32_t queue_depth; + }; + + // this class encapsulates the creation, use, and destruction of an RC + // queue pair. + // + // you need call init and it will create a qp and bring it to the INIT state. + // after obtaining the lid, qpn, and psn of a remote queue pair, one + // must call plumb() to bring the queue pair to the RTS state. + class QueuePair { + public: + QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type, + int ib_physical_port, ibv_srq *srq, + Infiniband::CompletionQueue* txcq, + Infiniband::CompletionQueue* rxcq, + uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0); + ~QueuePair(); + + int init(); + + /** + * Get the initial packet sequence number for this QueuePair. + * This is randomly generated on creation. It should not be confused + * with the remote side's PSN, which is set in #plumb(). + */ + uint32_t get_initial_psn() const { return initial_psn; }; + /** + * Get the local queue pair number for this QueuePair. + * QPNs are analogous to UDP/TCP port numbers. + */ + uint32_t get_local_qp_number() const { return qp->qp_num; }; + /** + * Get the remote queue pair number for this QueuePair, as set in #plumb(). + * QPNs are analogous to UDP/TCP port numbers. + */ + int get_remote_qp_number(uint32_t *rqp) const { + ibv_qp_attr qpa; + ibv_qp_init_attr qpia; + + int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia); + if (r) { + lderr(cct) << __func__ << " failed to query qp: " + << cpp_strerror(errno) << dendl; + return -1; + } + + if (rqp) + *rqp = qpa.dest_qp_num; + return 0; + } + /** + * Get the remote infiniband address for this QueuePair, as set in #plumb(). + * LIDs are "local IDs" in infiniband terminology. They are short, locally + * routable addresses. + */ + int get_remote_lid(uint16_t *lid) const { + ibv_qp_attr qpa; + ibv_qp_init_attr qpia; + + int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia); + if (r) { + lderr(cct) << __func__ << " failed to query qp: " + << cpp_strerror(errno) << dendl; + return -1; + } + + if (lid) + *lid = qpa.ah_attr.dlid; + return 0; + } + /** + * Get the state of a QueuePair. + */ + int get_state() const { + ibv_qp_attr qpa; + ibv_qp_init_attr qpia; + + int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia); + if (r) { + lderr(cct) << __func__ << " failed to get state: " + << cpp_strerror(errno) << dendl; + return -1; + } + return qpa.qp_state; + } + /** + * Return true if the queue pair is in an error state, false otherwise. + */ + bool is_error() const { + ibv_qp_attr qpa; + ibv_qp_init_attr qpia; + + int r = ibv_query_qp(qp, &qpa, -1, &qpia); + if (r) { + lderr(cct) << __func__ << " failed to get state: " + << cpp_strerror(errno) << dendl; + return true; + } + return qpa.cur_qp_state == IBV_QPS_ERR; + } + ibv_qp* get_qp() const { return qp; } + Infiniband::CompletionQueue* get_tx_cq() const { return txcq; } + Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; } + int to_dead(); + bool is_dead() const { return dead; } + + private: + CephContext *cct; + Infiniband& infiniband; // Infiniband to which this QP belongs + ibv_qp_type type; // QP type (IBV_QPT_RC, etc.) + ibv_context* ctxt; // device context of the HCA to use + int ib_physical_port; + ibv_pd* pd; // protection domain + ibv_srq* srq; // shared receive queue + ibv_qp* qp; // infiniband verbs QP handle + Infiniband::CompletionQueue* txcq; + Infiniband::CompletionQueue* rxcq; + uint32_t initial_psn; // initial packet sequence number + uint32_t max_send_wr; + uint32_t max_recv_wr; + uint32_t q_key; + bool dead; + }; + + public: + typedef MemoryManager::Cluster Cluster; + typedef MemoryManager::Chunk Chunk; + QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type); + ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge); + int post_chunk(Chunk* chunk); + int post_channel_cluster(); + int get_tx_buffers(std::vector &c, size_t bytes) { + return memory_manager->get_send_buffers(c, bytes); + } + CompletionChannel *create_comp_channel(CephContext *c); + CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL); + uint8_t get_ib_physical_port() { + return ib_physical_port; + } + int send_msg(CephContext *cct, int sd, IBSYNMsg& msg); + int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg); + uint16_t get_lid() { return device->get_lid(); } + ibv_gid get_gid() { return device->get_gid(); } + MemoryManager* get_memory_manager() { return memory_manager; } + Device* get_device() { return device; } + int get_async_fd() { return device->ctxt->async_fd; } + int recall_chunk(Chunk* c) { + if (memory_manager->is_rx_chunk(c)) { + post_chunk(c); + return 1; + } else if (memory_manager->is_tx_chunk(c)) { + vector v; + v.push_back(c); + memory_manager->return_tx(v); + return 2; + } + return -1; + } + int is_tx_chunk(Chunk* c) { return memory_manager->is_tx_chunk(c); } + int is_rx_chunk(Chunk* c) { return memory_manager->is_rx_chunk(c); } + static const char* wc_status_to_string(int status); + static const char* qp_state_string(int status); +}; + +#endif diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index b214147f6369..1f97c3e21dd8 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -1,469 +1,469 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 XSKY - * - * Author: Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "RDMAStack.h" - -#define dout_subsys ceph_subsys_ms -#undef dout_prefix -#define dout_prefix *_dout << " RDMAConnectedSocketImpl " - -int RDMAConnectedSocketImpl::activate() -{ - ibv_qp_attr qpa; - int r; - - // now connect up the qps and switch to RTR - memset(&qpa, 0, sizeof(qpa)); - qpa.qp_state = IBV_QPS_RTR; - qpa.path_mtu = IBV_MTU_1024; - qpa.dest_qp_num = peer_msg.qpn; - qpa.rq_psn = peer_msg.psn; - qpa.max_dest_rd_atomic = 1; - qpa.min_rnr_timer = 12; - //qpa.ah_attr.is_global = 0; - qpa.ah_attr.is_global = 1; - qpa.ah_attr.grh.hop_limit = 6; - qpa.ah_attr.grh.dgid = peer_msg.gid; - - qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx(); - - qpa.ah_attr.dlid = peer_msg.lid; - qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl; - qpa.ah_attr.src_path_bits = 0; - qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port()); - - ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl; - - r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | - IBV_QP_AV | - IBV_QP_PATH_MTU | - IBV_QP_DEST_QPN | - IBV_QP_RQ_PSN | - IBV_QP_MIN_RNR_TIMER | - IBV_QP_MAX_DEST_RD_ATOMIC); - if (r) { - lderr(cct) << __func__ << " failed to transition to RTR state: " - << cpp_strerror(errno) << dendl; - return -1; - } - - ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl; - - // now move to RTS - qpa.qp_state = IBV_QPS_RTS; - - // How long to wait before retrying if packet lost or server dead. - // Supposedly the timeout is 4.096us*2^timeout. However, the actual - // timeout appears to be 4.096us*2^(timeout+1), so the setting - // below creates a 135ms timeout. - qpa.timeout = 14; - - // How many times to retry after timeouts before giving up. - qpa.retry_cnt = 7; - - // How many times to retry after RNR (receiver not ready) condition - // before giving up. Occurs when the remote side has not yet posted - // a receive request. - qpa.rnr_retry = 7; // 7 is infinite retry. - qpa.sq_psn = my_msg.psn; - qpa.max_rd_atomic = 1; - - r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | - IBV_QP_TIMEOUT | - IBV_QP_RETRY_CNT | - IBV_QP_RNR_RETRY | - IBV_QP_SQ_PSN | - IBV_QP_MAX_QP_RD_ATOMIC); - if (r) { - lderr(cct) << __func__ << " failed to transition to RTS state: " - << cpp_strerror(errno) << dendl; - return -1; - } - - // the queue pair should be ready to use once the client has finished - // setting up their end. - ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl; - ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl; - - if (!is_server) { - connected = 1; //indicate successfully - ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl; - submit(false); - } - active = true; - - return 0; -} - -int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { - ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:" - << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl; - NetHandler net(cct); - tcp_fd = net.connect(peer_addr); - - if (tcp_fd < 0) { - return -errno; - } - net.set_close_on_exec(tcp_fd); - - int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size); - if (r < 0) { - ::close(tcp_fd); - return -errno; - } - - ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl; - net.set_priority(tcp_fd, opts.priority); - my_msg.peer_qpn = 0; - r = infiniband->send_msg(cct, tcp_fd, my_msg); - if (r < 0) - return r; - - worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler); - return 0; -} - -void RDMAConnectedSocketImpl::handle_connection() { - ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl; - int r = infiniband->recv_msg(cct, tcp_fd, peer_msg); - if (r < 0) { - if (r != -EAGAIN) - fault(); - return; - } - - if (!is_server) {// syn + ack from server - my_msg.peer_qpn = peer_msg.qpn; - ldout(cct, 20) << __func__ << " peer msg : < " << peer_msg.qpn << ", " << peer_msg.psn - << ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl; - if (!connected) { - r = activate(); - assert(!r); - } - notify(); - r = infiniband->send_msg(cct, tcp_fd, my_msg); - if (r < 0) { - ldout(cct, 1) << __func__ << " send client ack failed." << dendl; - fault(); - } - } else { - if (peer_msg.peer_qpn == 0) {// syn from client - if (active) { - ldout(cct, 10) << __func__ << " server is already active." << dendl; - return ; - } - r = infiniband->send_msg(cct, tcp_fd, my_msg); - if (r < 0) { - ldout(cct, 1) << __func__ << " server ack failed." << dendl; - fault(); - return ; - } - r = activate(); - assert(!r); - } else { // ack from client - connected = 1; - cleanup(); - submit(false); - notify(); - } - } -} - -ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) -{ - uint64_t i = 0; - int r = ::read(notify_fd, &i, sizeof(i)); - ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl; - if (error) - return -error; - ssize_t read = 0; - if (!buffers.empty()) - read = read_buffers(buf,len); - - std::vector cqe; - get_wc(cqe); - if (cqe.empty()) - return read == 0 ? -EAGAIN : read; - - ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl; - for (size_t i = 0; i < cqe.size(); ++i) { - ibv_wc* response = &cqe[i]; - assert(response->status == IBV_WC_SUCCESS); - Chunk* chunk = reinterpret_cast(response->wr_id); - ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl; - chunk->prepare_read(response->byte_len); - if (response->byte_len == 0) { - if (connected) { - error = ECONNRESET; - assert(infiniband->post_chunk(chunk) == 0); - ldout(cct, 20) << __func__ << " got remote close msg..." << dendl; - } - break; - } - //assert(response->byte_len); - if (read == (ssize_t)len) { - buffers.push_back(chunk); - ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl; - } else if (read + response->byte_len > (ssize_t)len) { - read += chunk->read(buf+read, (ssize_t)len-read); - buffers.push_back(chunk); - ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl; - } else { - read += chunk->read(buf+read, response->byte_len); - assert(infiniband->post_chunk(chunk) == 0); - } - } - - if (is_server && connected == 0) { - ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl; - connected = 1; //if so, we don't need the last handshake - cleanup(); - submit(false); - } - - if (read == 0 && error) - return -error; - return read == 0 ? -EAGAIN : read; -} - -ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) -{ - size_t read = 0, tmp = 0; - vector::iterator c = buffers.begin(); - for (; c != buffers.end() ; ++c) { - tmp = (*c)->read(buf+read, len-read); - read += tmp; - ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl; - if ((*c)->over()) { - assert(infiniband->post_chunk(*c) == 0); - ldout(cct, 25) << __func__ << " one chunk over." << dendl; - } - if (read == len) { - break; - } - } - - if (c != buffers.end() && (*c)->over()) - c++; - buffers.erase(buffers.begin(), c); - ldout(cct, 25) << __func__ << " got " << read << " bytes, buffers size: " << buffers.size() << dendl; - return read; -} - -ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data) -{ - if (error) - return -error; - static const int MAX_COMPLETIONS = 16; - ibv_wc wc[MAX_COMPLETIONS]; - ssize_t size = 0; - - ibv_wc* response; - Chunk* chunk; - bool loaded = false; - auto iter = buffers.begin(); - if (iter != buffers.end()) { - chunk = *iter; - // FIXME need to handle release - // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); - buffers.erase(iter); - loaded = true; - size = chunk->bound; - } - - std::vector cqe; - get_wc(cqe); - if (cqe.empty()) - return size == 0 ? -EAGAIN : size; - - ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl; - - for (size_t i = 0; i < cqe.size(); ++i) { - response = &wc[i]; - chunk = reinterpret_cast(response->wr_id); - chunk->prepare_read(response->byte_len); - if (!loaded && i == 0) { - // FIXME need to handle release - // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); - size = chunk->bound; - continue; - } - buffers.push_back(chunk); - iter++; - } - - if (size == 0) - return -EAGAIN; - return size; -} - -ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) -{ - if (error) { - if (!active) - return -EPIPE; - return -error; - } - size_t bytes = bl.length(); - if (!bytes) - return 0; - { - Mutex::Locker l(lock); - pending_bl.claim_append(bl); - if (!connected) { - ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl; - return bytes; - } - } - ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl; - ssize_t r = submit(more); - if (r < 0 && r != -EAGAIN) - return r; - return bytes; -} - -ssize_t RDMAConnectedSocketImpl::submit(bool more) -{ - if (error) - return -error; - Mutex::Locker l(lock); - std::vector tx_buffers; - size_t bytes = pending_bl.length(); - ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: " - << pending_bl.buffers().size() << dendl; - if (!bytes) - return 0; - - int ret = worker->reserve_message_buffer(this, tx_buffers, bytes); - if (ret == 0) { - ldout(cct, 10) << __func__ << " no enough buffers in worker " << worker << dendl; - return -EAGAIN; // that is ok , cause send will return bytes. == 0 enough buffers, < 0 no buffer, >0 not enough - } - vector::iterator current_buffer = tx_buffers.begin(); - list::const_iterator it = pending_bl.buffers().begin(); - unsigned total = 0; - while (it != pending_bl.buffers().end()) { - const uintptr_t addr = reinterpret_cast(it->c_str()); - unsigned copied = 0; - while (copied < it->length()) { - uint32_t r = (*current_buffer)->write((char*)addr+copied, it->length() - copied); - copied += r; - total += r; - if ((*current_buffer)->full()){ - ++current_buffer; - if (current_buffer == tx_buffers.end()) - goto sending; - } - } - ++it; - } - - sending: - assert(total <= pending_bl.length()); - bufferlist swapped; - if (total < pending_bl.length()) { - pending_bl.splice(total, pending_bl.length()-total, &swapped); - pending_bl.swap(swapped); - } else { - pending_bl.clear(); - } - - ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers " - << pending_bl.buffers().size() << dendl; - - int r = post_work_request(tx_buffers); - if (r < 0) - return r; - - ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl; - return bytes; -} - -int RDMAConnectedSocketImpl::post_work_request(std::vector &tx_buffers) -{ - ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl; - vector::iterator current_buffer = tx_buffers.begin(); - ibv_sge isge[tx_buffers.size()]; - uint32_t current_sge = 0; - ibv_send_wr iswr[tx_buffers.size()]; - uint32_t current_swr = 0; - ibv_send_wr* pre_wr = NULL; - - memset(iswr, 0, sizeof(iswr)); - memset(isge, 0, sizeof(isge)); - current_buffer = tx_buffers.begin(); - while (current_buffer != tx_buffers.end()) { - isge[current_sge].addr = reinterpret_cast((*current_buffer)->buffer); - isge[current_sge].length = (*current_buffer)->get_offset(); - isge[current_sge].lkey = (*current_buffer)->mr->lkey; - ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl; - - iswr[current_swr].wr_id = reinterpret_cast(*current_buffer); - iswr[current_swr].next = NULL; - iswr[current_swr].sg_list = &isge[current_sge]; - iswr[current_swr].num_sge = 1; - iswr[current_swr].opcode = IBV_WR_SEND; - iswr[current_swr].send_flags = IBV_SEND_SIGNALED; - /*if (isge[current_sge].length < infiniband->max_inline_data) { - iswr[current_swr].send_flags = IBV_SEND_INLINE; - ldout(cct, 20) << __func__ << " send_inline." << dendl; - }*/ - - if (pre_wr) - pre_wr->next = &iswr[current_swr]; - pre_wr = &iswr[current_swr]; - ++current_sge; - ++current_swr; - ++current_buffer; - } - - ibv_send_wr *bad_tx_work_request; - if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) { - lderr(cct) << __func__ << " failed to send data" - << " (most probably should be peer not ready): " - << cpp_strerror(errno) << dendl; - return -errno; - } - ldout(cct, 20) << __func__ << " qp state is : " << Infiniband::qp_state_string(qp->get_state()) << dendl; - return 0; -} - -void RDMAConnectedSocketImpl::fin() { - ibv_send_wr wr; - memset(&wr, 0, sizeof(wr)); - wr.wr_id = reinterpret_cast(qp); - wr.num_sge = 0; - wr.opcode = IBV_WR_SEND; - wr.send_flags = IBV_SEND_SIGNALED; - ibv_send_wr* bad_tx_work_request; - if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) { - lderr(cct) << __func__ << " failed to send message=" - << " ibv_post_send failed(most probably should be peer not ready): " - << cpp_strerror(errno) << dendl; - return ; - } -} - -void RDMAConnectedSocketImpl::cleanup() { - if (con_handler) { - (static_cast(con_handler))->close(); - worker->center.submit_to(worker->center.get_id(), [this]() { - worker->center.delete_file_event(tcp_fd, EVENT_READABLE); - }, false); - delete con_handler; - con_handler = nullptr; - } -} +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "RDMAStack.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << " RDMAConnectedSocketImpl " + +int RDMAConnectedSocketImpl::activate() +{ + ibv_qp_attr qpa; + int r; + + // now connect up the qps and switch to RTR + memset(&qpa, 0, sizeof(qpa)); + qpa.qp_state = IBV_QPS_RTR; + qpa.path_mtu = IBV_MTU_1024; + qpa.dest_qp_num = peer_msg.qpn; + qpa.rq_psn = peer_msg.psn; + qpa.max_dest_rd_atomic = 1; + qpa.min_rnr_timer = 12; + //qpa.ah_attr.is_global = 0; + qpa.ah_attr.is_global = 1; + qpa.ah_attr.grh.hop_limit = 6; + qpa.ah_attr.grh.dgid = peer_msg.gid; + + qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx(); + + qpa.ah_attr.dlid = peer_msg.lid; + qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl; + qpa.ah_attr.src_path_bits = 0; + qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port()); + + ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl; + + r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | + IBV_QP_AV | + IBV_QP_PATH_MTU | + IBV_QP_DEST_QPN | + IBV_QP_RQ_PSN | + IBV_QP_MIN_RNR_TIMER | + IBV_QP_MAX_DEST_RD_ATOMIC); + if (r) { + lderr(cct) << __func__ << " failed to transition to RTR state: " + << cpp_strerror(errno) << dendl; + return -1; + } + + ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl; + + // now move to RTS + qpa.qp_state = IBV_QPS_RTS; + + // How long to wait before retrying if packet lost or server dead. + // Supposedly the timeout is 4.096us*2^timeout. However, the actual + // timeout appears to be 4.096us*2^(timeout+1), so the setting + // below creates a 135ms timeout. + qpa.timeout = 14; + + // How many times to retry after timeouts before giving up. + qpa.retry_cnt = 7; + + // How many times to retry after RNR (receiver not ready) condition + // before giving up. Occurs when the remote side has not yet posted + // a receive request. + qpa.rnr_retry = 7; // 7 is infinite retry. + qpa.sq_psn = my_msg.psn; + qpa.max_rd_atomic = 1; + + r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | + IBV_QP_TIMEOUT | + IBV_QP_RETRY_CNT | + IBV_QP_RNR_RETRY | + IBV_QP_SQ_PSN | + IBV_QP_MAX_QP_RD_ATOMIC); + if (r) { + lderr(cct) << __func__ << " failed to transition to RTS state: " + << cpp_strerror(errno) << dendl; + return -1; + } + + // the queue pair should be ready to use once the client has finished + // setting up their end. + ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl; + ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl; + + if (!is_server) { + connected = 1; //indicate successfully + ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl; + submit(false); + } + active = true; + + return 0; +} + +int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { + ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:" + << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl; + NetHandler net(cct); + tcp_fd = net.connect(peer_addr); + + if (tcp_fd < 0) { + return -errno; + } + net.set_close_on_exec(tcp_fd); + + int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size); + if (r < 0) { + ::close(tcp_fd); + return -errno; + } + + ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl; + net.set_priority(tcp_fd, opts.priority); + my_msg.peer_qpn = 0; + r = infiniband->send_msg(cct, tcp_fd, my_msg); + if (r < 0) + return r; + + worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler); + return 0; +} + +void RDMAConnectedSocketImpl::handle_connection() { + ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl; + int r = infiniband->recv_msg(cct, tcp_fd, peer_msg); + if (r < 0) { + if (r != -EAGAIN) + fault(); + return; + } + + if (!is_server) {// syn + ack from server + my_msg.peer_qpn = peer_msg.qpn; + ldout(cct, 20) << __func__ << " peer msg : < " << peer_msg.qpn << ", " << peer_msg.psn + << ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl; + if (!connected) { + r = activate(); + assert(!r); + } + notify(); + r = infiniband->send_msg(cct, tcp_fd, my_msg); + if (r < 0) { + ldout(cct, 1) << __func__ << " send client ack failed." << dendl; + fault(); + } + } else { + if (peer_msg.peer_qpn == 0) {// syn from client + if (active) { + ldout(cct, 10) << __func__ << " server is already active." << dendl; + return ; + } + r = infiniband->send_msg(cct, tcp_fd, my_msg); + if (r < 0) { + ldout(cct, 1) << __func__ << " server ack failed." << dendl; + fault(); + return ; + } + r = activate(); + assert(!r); + } else { // ack from client + connected = 1; + cleanup(); + submit(false); + notify(); + } + } +} + +ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) +{ + uint64_t i = 0; + int r = ::read(notify_fd, &i, sizeof(i)); + ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl; + if (error) + return -error; + ssize_t read = 0; + if (!buffers.empty()) + read = read_buffers(buf,len); + + std::vector cqe; + get_wc(cqe); + if (cqe.empty()) + return read == 0 ? -EAGAIN : read; + + ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl; + for (size_t i = 0; i < cqe.size(); ++i) { + ibv_wc* response = &cqe[i]; + assert(response->status == IBV_WC_SUCCESS); + Chunk* chunk = reinterpret_cast(response->wr_id); + ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl; + chunk->prepare_read(response->byte_len); + if (response->byte_len == 0) { + if (connected) { + error = ECONNRESET; + assert(infiniband->post_chunk(chunk) == 0); + ldout(cct, 20) << __func__ << " got remote close msg..." << dendl; + } + break; + } + //assert(response->byte_len); + if (read == (ssize_t)len) { + buffers.push_back(chunk); + ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl; + } else if (read + response->byte_len > (ssize_t)len) { + read += chunk->read(buf+read, (ssize_t)len-read); + buffers.push_back(chunk); + ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl; + } else { + read += chunk->read(buf+read, response->byte_len); + assert(infiniband->post_chunk(chunk) == 0); + } + } + + if (is_server && connected == 0) { + ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl; + connected = 1; //if so, we don't need the last handshake + cleanup(); + submit(false); + } + + if (read == 0 && error) + return -error; + return read == 0 ? -EAGAIN : read; +} + +ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) +{ + size_t read = 0, tmp = 0; + vector::iterator c = buffers.begin(); + for (; c != buffers.end() ; ++c) { + tmp = (*c)->read(buf+read, len-read); + read += tmp; + ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl; + if ((*c)->over()) { + assert(infiniband->post_chunk(*c) == 0); + ldout(cct, 25) << __func__ << " one chunk over." << dendl; + } + if (read == len) { + break; + } + } + + if (c != buffers.end() && (*c)->over()) + c++; + buffers.erase(buffers.begin(), c); + ldout(cct, 25) << __func__ << " got " << read << " bytes, buffers size: " << buffers.size() << dendl; + return read; +} + +ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data) +{ + if (error) + return -error; + static const int MAX_COMPLETIONS = 16; + ibv_wc wc[MAX_COMPLETIONS]; + ssize_t size = 0; + + ibv_wc* response; + Chunk* chunk; + bool loaded = false; + auto iter = buffers.begin(); + if (iter != buffers.end()) { + chunk = *iter; + // FIXME need to handle release + // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); + buffers.erase(iter); + loaded = true; + size = chunk->bound; + } + + std::vector cqe; + get_wc(cqe); + if (cqe.empty()) + return size == 0 ? -EAGAIN : size; + + ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl; + + for (size_t i = 0; i < cqe.size(); ++i) { + response = &wc[i]; + chunk = reinterpret_cast(response->wr_id); + chunk->prepare_read(response->byte_len); + if (!loaded && i == 0) { + // FIXME need to handle release + // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); + size = chunk->bound; + continue; + } + buffers.push_back(chunk); + iter++; + } + + if (size == 0) + return -EAGAIN; + return size; +} + +ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) +{ + if (error) { + if (!active) + return -EPIPE; + return -error; + } + size_t bytes = bl.length(); + if (!bytes) + return 0; + { + Mutex::Locker l(lock); + pending_bl.claim_append(bl); + if (!connected) { + ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl; + return bytes; + } + } + ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl; + ssize_t r = submit(more); + if (r < 0 && r != -EAGAIN) + return r; + return bytes; +} + +ssize_t RDMAConnectedSocketImpl::submit(bool more) +{ + if (error) + return -error; + Mutex::Locker l(lock); + std::vector tx_buffers; + size_t bytes = pending_bl.length(); + ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: " + << pending_bl.buffers().size() << dendl; + if (!bytes) + return 0; + + int ret = worker->reserve_message_buffer(this, tx_buffers, bytes); + if (ret == 0) { + ldout(cct, 10) << __func__ << " no enough buffers in worker " << worker << dendl; + return -EAGAIN; // that is ok , cause send will return bytes. == 0 enough buffers, < 0 no buffer, >0 not enough + } + vector::iterator current_buffer = tx_buffers.begin(); + list::const_iterator it = pending_bl.buffers().begin(); + unsigned total = 0; + while (it != pending_bl.buffers().end()) { + const uintptr_t addr = reinterpret_cast(it->c_str()); + unsigned copied = 0; + while (copied < it->length()) { + uint32_t r = (*current_buffer)->write((char*)addr+copied, it->length() - copied); + copied += r; + total += r; + if ((*current_buffer)->full()){ + ++current_buffer; + if (current_buffer == tx_buffers.end()) + goto sending; + } + } + ++it; + } + + sending: + assert(total <= pending_bl.length()); + bufferlist swapped; + if (total < pending_bl.length()) { + pending_bl.splice(total, pending_bl.length()-total, &swapped); + pending_bl.swap(swapped); + } else { + pending_bl.clear(); + } + + ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers " + << pending_bl.buffers().size() << dendl; + + int r = post_work_request(tx_buffers); + if (r < 0) + return r; + + ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl; + return bytes; +} + +int RDMAConnectedSocketImpl::post_work_request(std::vector &tx_buffers) +{ + ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl; + vector::iterator current_buffer = tx_buffers.begin(); + ibv_sge isge[tx_buffers.size()]; + uint32_t current_sge = 0; + ibv_send_wr iswr[tx_buffers.size()]; + uint32_t current_swr = 0; + ibv_send_wr* pre_wr = NULL; + + memset(iswr, 0, sizeof(iswr)); + memset(isge, 0, sizeof(isge)); + current_buffer = tx_buffers.begin(); + while (current_buffer != tx_buffers.end()) { + isge[current_sge].addr = reinterpret_cast((*current_buffer)->buffer); + isge[current_sge].length = (*current_buffer)->get_offset(); + isge[current_sge].lkey = (*current_buffer)->mr->lkey; + ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl; + + iswr[current_swr].wr_id = reinterpret_cast(*current_buffer); + iswr[current_swr].next = NULL; + iswr[current_swr].sg_list = &isge[current_sge]; + iswr[current_swr].num_sge = 1; + iswr[current_swr].opcode = IBV_WR_SEND; + iswr[current_swr].send_flags = IBV_SEND_SIGNALED; + /*if (isge[current_sge].length < infiniband->max_inline_data) { + iswr[current_swr].send_flags = IBV_SEND_INLINE; + ldout(cct, 20) << __func__ << " send_inline." << dendl; + }*/ + + if (pre_wr) + pre_wr->next = &iswr[current_swr]; + pre_wr = &iswr[current_swr]; + ++current_sge; + ++current_swr; + ++current_buffer; + } + + ibv_send_wr *bad_tx_work_request; + if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) { + lderr(cct) << __func__ << " failed to send data" + << " (most probably should be peer not ready): " + << cpp_strerror(errno) << dendl; + return -errno; + } + ldout(cct, 20) << __func__ << " qp state is : " << Infiniband::qp_state_string(qp->get_state()) << dendl; + return 0; +} + +void RDMAConnectedSocketImpl::fin() { + ibv_send_wr wr; + memset(&wr, 0, sizeof(wr)); + wr.wr_id = reinterpret_cast(qp); + wr.num_sge = 0; + wr.opcode = IBV_WR_SEND; + wr.send_flags = IBV_SEND_SIGNALED; + ibv_send_wr* bad_tx_work_request; + if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) { + lderr(cct) << __func__ << " failed to send message=" + << " ibv_post_send failed(most probably should be peer not ready): " + << cpp_strerror(errno) << dendl; + return ; + } +} + +void RDMAConnectedSocketImpl::cleanup() { + if (con_handler) { + (static_cast(con_handler))->close(); + worker->center.submit_to(worker->center.get_id(), [this]() { + worker->center.delete_file_event(tcp_fd, EVENT_READABLE); + }, false); + delete con_handler; + con_handler = nullptr; + } +} diff --git a/src/msg/async/rdma/RDMAServerSocketImpl.cc b/src/msg/async/rdma/RDMAServerSocketImpl.cc index c1fee33cad7b..03be56da7598 100644 --- a/src/msg/async/rdma/RDMAServerSocketImpl.cc +++ b/src/msg/async/rdma/RDMAServerSocketImpl.cc @@ -1,108 +1,108 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 XSKY - * - * Author: Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "msg/async/net_handler.h" -#include "RDMAStack.h" - -#define dout_subsys ceph_subsys_ms -#undef dout_prefix -#define dout_prefix *_dout << " RDMAServerSocketImpl " - -int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt) -{ - int rc = 0; - server_setup_socket = net.create_socket(sa.get_family(), true); - if (server_setup_socket < 0) { - rc = -errno; - lderr(cct) << __func__ << " failed to create server socket: " - << cpp_strerror(errno) << dendl; - return rc; - } - - rc = net.set_nonblock(server_setup_socket); - if (rc < 0) { - goto err; - } - - rc = net.set_socket_options(server_setup_socket, opt.nodelay, opt.rcbuf_size); - if (rc < 0) { - goto err; - } - net.set_close_on_exec(server_setup_socket); - - rc = ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len()); - if (rc < 0) { - rc = -errno; - ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr() - << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl; - goto err; - } - - rc = ::listen(server_setup_socket, 128); - if (rc < 0) { - rc = -errno; - lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(errno) << dendl; - goto err; - } - - ldout(cct, 20) << __func__ << " bind to " << sa.get_sockaddr() << " on port " << sa.get_port() << dendl; - return 0; - -err: - ::close(server_setup_socket); - server_setup_socket = -1; - return -errno; -} - -int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) -{ - ldout(cct, 15) << __func__ << dendl; - - assert(sock); - sockaddr_storage ss; - socklen_t slen = sizeof(ss); - int sd = ::accept(server_setup_socket, (sockaddr*)&ss, &slen); - if (sd < 0) { - return -errno; - } - ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl; - - net.set_close_on_exec(sd); - int r = net.set_nonblock(sd); - if (r < 0) { - ::close(sd); - return -errno; - } - - r = net.set_socket_options(sd, opt.nodelay, opt.rcbuf_size); - if (r < 0) { - ::close(sd); - return -errno; - } - net.set_priority(sd, opt.priority); - - RDMAConnectedSocketImpl* server; - //Worker* w = dispatcher->get_stack()->get_worker(); - server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast(w)); - server->set_accept_fd(sd); - ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl; - std::unique_ptr csi(server); - *sock = ConnectedSocket(std::move(csi)); - if (out) - out->set_sockaddr((sockaddr*)&ss); - - return 0; -} +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "msg/async/net_handler.h" +#include "RDMAStack.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << " RDMAServerSocketImpl " + +int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt) +{ + int rc = 0; + server_setup_socket = net.create_socket(sa.get_family(), true); + if (server_setup_socket < 0) { + rc = -errno; + lderr(cct) << __func__ << " failed to create server socket: " + << cpp_strerror(errno) << dendl; + return rc; + } + + rc = net.set_nonblock(server_setup_socket); + if (rc < 0) { + goto err; + } + + rc = net.set_socket_options(server_setup_socket, opt.nodelay, opt.rcbuf_size); + if (rc < 0) { + goto err; + } + net.set_close_on_exec(server_setup_socket); + + rc = ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len()); + if (rc < 0) { + rc = -errno; + ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr() + << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl; + goto err; + } + + rc = ::listen(server_setup_socket, 128); + if (rc < 0) { + rc = -errno; + lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(errno) << dendl; + goto err; + } + + ldout(cct, 20) << __func__ << " bind to " << sa.get_sockaddr() << " on port " << sa.get_port() << dendl; + return 0; + +err: + ::close(server_setup_socket); + server_setup_socket = -1; + return -errno; +} + +int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) +{ + ldout(cct, 15) << __func__ << dendl; + + assert(sock); + sockaddr_storage ss; + socklen_t slen = sizeof(ss); + int sd = ::accept(server_setup_socket, (sockaddr*)&ss, &slen); + if (sd < 0) { + return -errno; + } + ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl; + + net.set_close_on_exec(sd); + int r = net.set_nonblock(sd); + if (r < 0) { + ::close(sd); + return -errno; + } + + r = net.set_socket_options(sd, opt.nodelay, opt.rcbuf_size); + if (r < 0) { + ::close(sd); + return -errno; + } + net.set_priority(sd, opt.priority); + + RDMAConnectedSocketImpl* server; + //Worker* w = dispatcher->get_stack()->get_worker(); + server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast(w)); + server->set_accept_fd(sd); + ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl; + std::unique_ptr csi(server); + *sock = ConnectedSocket(std::move(csi)); + if (out) + out->set_sockaddr((sockaddr*)&ss); + + return 0; +}