-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- \r
-// vim: ts=8 sw=2 smarttab\r
-/*\r
- * Ceph - scalable distributed file system\r
- *\r
- * Copyright (C) 2016 XSKY <haomai@xsky.com>\r
- *\r
- * Author: Haomai Wang <haomaiwang@gmail.com>\r
- *\r
- * This is free software; you can redistribute it and/or\r
- * modify it under the terms of the GNU Lesser General Public\r
- * License version 2.1, as published by the Free Software\r
- * Foundation. See file COPYING.\r
- *\r
- */\r
-\r
-#include "Infiniband.h"\r
-#include "common/errno.h"\r
-#include "common/debug.h"\r
-\r
-#define dout_subsys ceph_subsys_ms\r
-#undef dout_prefix\r
-#define dout_prefix *_dout << "Infiniband "\r
-\r
-static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;\r
-static const uint32_t MAX_INLINE_DATA = 128;\r
-static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");\r
-static const uint32_t CQ_DEPTH = 30000;\r
-\r
-Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr)\r
-{\r
- if (device == NULL) {\r
- lderr(cct) << __func__ << "device == NULL" << cpp_strerror(errno) << dendl;\r
- ceph_abort();\r
- }\r
- name = ibv_get_device_name(device);\r
- ctxt = ibv_open_device(device);\r
- if (ctxt == NULL) {\r
- lderr(cct) << __func__ << "open rdma device failed. " << cpp_strerror(errno) << dendl;\r
- ceph_abort();\r
- }\r
- int r = ibv_query_device(ctxt, device_attr);\r
- if (r == -1) {\r
- lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;\r
- ceph_abort();\r
- }\r
-}\r
-\r
-Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) {\r
- union ibv_gid cgid;\r
- struct ibv_exp_gid_attr gid_attr;\r
- bool malformed = false;\r
-
- int r = ibv_query_port(ctxt, port_num, port_attr);\r
- if (r == -1) {\r
- lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;\r
- ceph_abort();\r
- }\r
-\r
- lid = port_attr->lid;\r
-\r
- // search for requested GID in GIDs table\r
- ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)\r
- << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;\r
- r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),\r
- "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"\r
- ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",\r
- &cgid.raw[ 0], &cgid.raw[ 1],\r
- &cgid.raw[ 2], &cgid.raw[ 3],\r
- &cgid.raw[ 4], &cgid.raw[ 5],\r
- &cgid.raw[ 6], &cgid.raw[ 7],\r
- &cgid.raw[ 8], &cgid.raw[ 9],\r
- &cgid.raw[10], &cgid.raw[11],\r
- &cgid.raw[12], &cgid.raw[13],\r
- &cgid.raw[14], &cgid.raw[15]);\r
-\r
- if (r != 16) {\r
- ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;\r
- malformed = true;\r
- }\r
-\r
- gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;\r
-\r
- for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) {\r
- r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);\r
- if (r) {\r
- lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;\r
- ceph_abort();\r
- }\r
- r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);\r
- if (r) {\r
- lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;\r
- ceph_abort();\r
- }\r
-\r
- if (malformed) break; // stay with gid_idx=0\r
- if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&\r
- (memcmp(&gid, &cgid, 16) == 0) ) {\r
- ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;\r
- break;\r
- }\r
- }\r
-\r
- if (gid_idx == port_attr->gid_tbl_len) {\r
- lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;\r
- ceph_abort();\r
- }\r
- }\r
-\r
-void Device::binding_port(CephContext *cct, uint8_t port_num) {\r
- port_cnt = device_attr->phys_port_cnt;\r
- ports = new Port*[port_cnt];\r
- for (uint8_t i = 0; i < port_cnt; ++i) {\r
- ports[i] = new Port(cct, ctxt, i+1);\r
- if (i+1 == port_num && ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) {\r
- active_port = ports[i];\r
- ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;\r
- return ;\r
- } else {\r
- ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << ports[i]->get_port_attr()->state << ")"<< dendl;\r
- }\r
- }\r
- if (nullptr == active_port) {\r
- lderr(cct) << __func__ << " port not found" << dendl;\r
- assert(active_port);\r
- }\r
-}\r
-\r
-Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num): device_list(cct)\r
-{\r
- device = device_list.get_device(device_name.c_str());\r
- device->binding_port(cct, port_num);\r
- assert(device);\r
- ib_physical_port = device->active_port->get_port_num();\r
- pd = new ProtectionDomain(cct, device);\r
- assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);\r
-\r
- max_recv_wr = device->device_attr->max_srq_wr;\r
- if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) {\r
- max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers;\r
- ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;\r
- } else {\r
- ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl;\r
- }\r
-\r
- max_send_wr = device->device_attr->max_qp_wr;\r
- if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) {\r
- max_send_wr = cct->_conf->ms_async_rdma_send_buffers;\r
- ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl;\r
- } else {\r
- ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl;\r
- }\r
-\r
- ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe\r
- << " completion entries" << dendl;\r
-\r
- memory_manager = new MemoryManager(device, pd,\r
- cct->_conf->ms_async_rdma_enable_hugepage);\r
- memory_manager->register_rx_tx(\r
- cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);\r
-\r
- srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);\r
- post_channel_cluster();\r
-}\r
-\r
-/**\r
- * Create a shared receive queue. This basically wraps the verbs call. \r
- *\r
- * \param[in] max_wr\r
- * The max number of outstanding work requests in the SRQ.\r
- * \param[in] max_sge\r
- * The max number of scatter elements per WR.\r
- * \return\r
- * A valid ibv_srq pointer, or NULL on error.\r
- */\r
-ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)\r
-{\r
- ibv_srq_init_attr sia;\r
- memset(&sia, 0, sizeof(sia));\r
- sia.srq_context = device->ctxt;\r
- sia.attr.max_wr = max_wr;\r
- sia.attr.max_sge = max_sge;\r
- return ibv_create_srq(pd->pd, &sia);\r
-}\r
-\r
-/**\r
- * Create a new QueuePair. This factory should be used in preference to\r
- * the QueuePair constructor directly, since this lets derivatives of\r
- * Infiniband, e.g. MockInfiniband (if it existed),\r
- * return mocked out QueuePair derivatives.\r
- *\r
- * \return\r
- * QueuePair on success or NULL if init fails\r
- * See QueuePair::QueuePair for parameter documentation.\r
- */\r
-Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)\r
-{\r
- Infiniband::QueuePair *qp = new QueuePair(\r
- cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);\r
- if (qp->init()) {\r
- delete qp;\r
- return NULL;\r
- }\r
- return qp;\r
-}\r
-\r
-int Infiniband::QueuePair::init()\r
-{\r
- ldout(cct, 20) << __func__ << " started." << dendl;\r
- ibv_qp_init_attr qpia;\r
- memset(&qpia, 0, sizeof(qpia));\r
- qpia.send_cq = txcq->get_cq();\r
- qpia.recv_cq = rxcq->get_cq();\r
- qpia.srq = srq; // use the same shared receive queue\r
- qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests\r
- qpia.cap.max_send_sge = 1; // max send scatter-gather elements\r
- qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q\r
- qpia.qp_type = type; // RC, UC, UD, or XRC\r
- qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs\r
-\r
- qp = ibv_create_qp(pd, &qpia);\r
- if (qp == NULL) {\r
- lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;\r
- lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers or"\r
- " ms_async_rdma_send_buffers" << dendl;\r
- return -1;\r
- }\r
-\r
- ldout(cct, 20) << __func__ << " successfully create queue pair: "\r
- << "qp=" << qp << dendl;\r
-\r
- // move from RESET to INIT state\r
- ibv_qp_attr qpa;\r
- memset(&qpa, 0, sizeof(qpa));\r
- qpa.qp_state = IBV_QPS_INIT;\r
- qpa.pkey_index = 0;\r
- qpa.port_num = (uint8_t)(ib_physical_port);\r
- qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;\r
- qpa.qkey = q_key;\r
-\r
- int mask = IBV_QP_STATE | IBV_QP_PORT;\r
- switch (type) {\r
- case IBV_QPT_RC:\r
- mask |= IBV_QP_ACCESS_FLAGS;\r
- mask |= IBV_QP_PKEY_INDEX;\r
- break;\r
- case IBV_QPT_UD:\r
- mask |= IBV_QP_QKEY;\r
- mask |= IBV_QP_PKEY_INDEX;\r
- break;\r
- case IBV_QPT_RAW_PACKET:\r
- break;\r
- default:\r
- ceph_abort();\r
- }\r
-\r
- int ret = ibv_modify_qp(qp, &qpa, mask);\r
- if (ret) {\r
- ibv_destroy_qp(qp);\r
- lderr(cct) << __func__ << " failed to transition to INIT state: "\r
- << cpp_strerror(errno) << dendl;\r
- return -1;\r
- }\r
- ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:"\r
- << " qp=" << qp << dendl;\r
- return 0;\r
-}\r
-\r
-/**\r
- * Change RC QueuePair into the ERROR state. This is necessary modify\r
- * the Queue Pair into the Error state and poll all of the relevant\r
- * Work Completions prior to destroying a Queue Pair.\r
- * Since destroying a Queue Pair does not guarantee that its Work\r
- * Completions are removed from the CQ upon destruction. Even if the\r
- * Work Completions are already in the CQ, it might not be possible to\r
- * retrieve them. If the Queue Pair is associated with an SRQ, it is\r
- * recommended wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED\r
- *\r
- * \return\r
- * -errno if the QueuePair can't switch to ERROR\r
- * 0 for success.\r
- */\r
-int Infiniband::QueuePair::to_dead()\r
-{\r
- if (dead)\r
- return 0;\r
- ibv_qp_attr qpa;\r
- memset(&qpa, 0, sizeof(qpa));\r
- qpa.qp_state = IBV_QPS_ERR;\r
-\r
- int mask = IBV_QP_STATE;\r
- int ret = ibv_modify_qp(qp, &qpa, mask);\r
- if (ret) {\r
- lderr(cct) << __func__ << " failed to transition to ERROR state: "\r
- << cpp_strerror(errno) << dendl;\r
- return -errno;\r
- }\r
- dead = true;\r
- return ret;\r
-}\r
-\r
-int Infiniband::post_chunk(Chunk* chunk)\r
-{\r
- ibv_sge isge;\r
- isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);\r
- isge.length = chunk->bytes;\r
- isge.lkey = chunk->mr->lkey;\r
- ibv_recv_wr rx_work_request;\r
-\r
- memset(&rx_work_request, 0, sizeof(rx_work_request));\r
- rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr\r
- rx_work_request.next = NULL;\r
- rx_work_request.sg_list = &isge;\r
- rx_work_request.num_sge = 1;\r
-\r
- ibv_recv_wr *badWorkRequest;\r
- int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);\r
- if (ret)\r
- return -1;\r
- return 0;\r
-}\r
-\r
-int Infiniband::post_channel_cluster()\r
-{\r
- vector<Chunk*> free_chunks;\r
- int r = memory_manager->get_channel_buffers(free_chunks, 0);\r
- assert(r > 0);\r
- for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {\r
- r = post_chunk(*iter);\r
- assert(r == 0);\r
- }\r
- return 0;\r
-}\r
-\r
-Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)\r
-{\r
- Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);\r
- if (cc->init()) {\r
- delete cc;\r
- return NULL;\r
- }\r
- return cc;\r
-}\r
-\r
-Infiniband::CompletionQueue* Infiniband::create_comp_queue(\r
- CephContext *cct, CompletionChannel *cc)\r
-{\r
- Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(\r
- cct, *this, CQ_DEPTH, cc);\r
- if (cq->init()) {\r
- delete cq;\r
- return NULL;\r
- }\r
- return cq;\r
-}\r
-\r
-\r
-Infiniband::QueuePair::QueuePair(\r
- CephContext *c, Infiniband& infiniband, ibv_qp_type type,\r
- int port, ibv_srq *srq,\r
- Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,\r
- uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)\r
-: cct(c), infiniband(infiniband),\r
- type(type),\r
- ctxt(infiniband.device->ctxt),\r
- ib_physical_port(port),\r
- pd(infiniband.pd->pd),\r
- srq(srq),\r
- qp(NULL),\r
- txcq(txcq),\r
- rxcq(rxcq),\r
- initial_psn(0),\r
- max_send_wr(max_send_wr),\r
- max_recv_wr(max_recv_wr),\r
- q_key(q_key),\r
- dead(false)\r
-{\r
- initial_psn = lrand48() & 0xffffff;\r
- if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {\r
- lderr(cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl;\r
- ceph_abort();\r
- }\r
- pd = infiniband.pd->pd;\r
-}\r
-\r
-// 1 means no valid buffer read, 0 means got enough buffer\r
-// else return < 0 means error\r
-int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)\r
-{\r
- char msg[TCP_MSG_LEN];\r
- char gid[33];\r
- ssize_t r = ::read(sd, &msg, sizeof(msg));\r
- // Drop incoming qpt\r
- if (cct->_conf->ms_inject_socket_failures && sd >= 0) {\r
- if (rand() % cct->_conf->ms_inject_socket_failures == 0) {\r
- ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;\r
- return -EINVAL;\r
- }\r
- }\r
- if (r < 0) {\r
- r = -errno;\r
- lderr(cct) << __func__ << " got error " << errno << ": "\r
- << cpp_strerror(errno) << dendl;\r
- } else if (r == 0) { // valid disconnect message of length 0\r
- ldout(cct, 10) << __func__ << " got disconnect message " << dendl;\r
- } else if ((size_t)r != sizeof(msg)) { // invalid message\r
- ldout(cct, 1) << __func__ << " got bad length (" << r << "): " << cpp_strerror(errno) << dendl;\r
- r = -EINVAL;\r
- } else { // valid message\r
- sscanf(msg, "%x:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);\r
- wire_gid_to_gid(gid, &(im.gid));\r
- ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl;\r
- }\r
- return r;\r
-}\r
-\r
-int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)\r
-{\r
- int retry = 0;\r
- ssize_t r;\r
-\r
- char msg[TCP_MSG_LEN];\r
- char gid[33];\r
-retry:\r
- gid_to_wire_gid(&(im.gid), gid);\r
- sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);\r
- ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn\r
- << ", " << im.peer_qpn << ", " << gid << dendl;\r
- r = ::write(sd, msg, sizeof(msg));\r
- // Drop incoming qpt\r
- if (cct->_conf->ms_inject_socket_failures && sd >= 0) {\r
- if (rand() % cct->_conf->ms_inject_socket_failures == 0) {\r
- ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;\r
- return -EINVAL;\r
- }\r
- }\r
-\r
- if ((size_t)r != sizeof(msg)) {\r
- // FIXME need to handle EAGAIN instead of retry\r
- if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {\r
- retry++;\r
- goto retry;\r
- }\r
- if (r < 0)\r
- lderr(cct) << __func__ << " send returned error " << errno << ": "\r
- << cpp_strerror(errno) << dendl;\r
- else\r
- lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;\r
- return -errno;\r
- }\r
- return 0;\r
-}\r
-\r
-void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)\r
-{\r
- char tmp[9];\r
- uint32_t v32;\r
- int i;\r
-\r
- for (tmp[8] = 0, i = 0; i < 4; ++i) {\r
- memcpy(tmp, wgid + i * 8, 8);\r
- sscanf(tmp, "%x", &v32);\r
- *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);\r
- }\r
-}\r
-\r
-void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])\r
-{\r
- for (int i = 0; i < 4; ++i)\r
- sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));\r
-}\r
-\r
-Infiniband::QueuePair::~QueuePair()\r
-{\r
- if (qp)\r
- assert(!ibv_destroy_qp(qp));\r
-}\r
-\r
-Infiniband::CompletionChannel::~CompletionChannel()\r
-{\r
- if (channel) {\r
- int r = ibv_destroy_comp_channel(channel);\r
- if (r < 0)\r
- lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;\r
- assert(r == 0);\r
- }\r
-}\r
-\r
-Infiniband::CompletionQueue::~CompletionQueue()\r
-{\r
- if (cq) {\r
- int r = ibv_destroy_cq(cq);\r
- if (r < 0)\r
- lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;\r
- assert(r == 0);\r
- }\r
-}\r
-\r
-int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)\r
-{\r
- ldout(cct, 20) << __func__ << " started." << dendl;\r
- int r = ibv_req_notify_cq(cq, 0);\r
- if (r < 0)\r
- lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;\r
- return r;\r
-}\r
-\r
-int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {\r
- int r = ibv_poll_cq(cq, num_entries, ret_wc_array);\r
- if (r < 0) {\r
- lderr(cct) << __func__ << " poll_completion_queue occur met error: "\r
- << cpp_strerror(errno) << dendl;\r
- return -1;\r
- }\r
- return r;\r
-}\r
-\r
-bool Infiniband::CompletionChannel::get_cq_event()\r
-{\r
- ibv_cq *cq = NULL;\r
- void *ev_ctx;\r
- if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {\r
- if (errno != EAGAIN && errno != EINTR)\r
- lderr(cct) << __func__ << " failed to retrieve CQ event: "\r
- << cpp_strerror(errno) << dendl;\r
- return false;\r
- }\r
-\r
- /* accumulate number of cq events that need to\r
- * * be acked, and periodically ack them\r
- * */\r
- if (++cq_events_that_need_ack == MAX_ACK_EVENT) {\r
- ldout(cct, 20) << __func__ << " ack aq events." << dendl;\r
- ibv_ack_cq_events(cq, MAX_ACK_EVENT);\r
- cq_events_that_need_ack = 0;\r
- }\r
-\r
- return true;\r
-}\r
-\r
-int Infiniband::CompletionQueue::init()\r
-{\r
- cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);\r
- if (!cq) {\r
- lderr(cct) << __func__ << " failed to create receive completion queue: "\r
- << cpp_strerror(errno) << dendl;\r
- return -1;\r
- }\r
-\r
- if (ibv_req_notify_cq(cq, 0)) {\r
- lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;\r
- ibv_destroy_cq(cq);\r
- cq = nullptr;\r
- return -1;\r
- }\r
-\r
- channel->bind_cq(cq);\r
- ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;\r
- return 0;\r
-}\r
-\r
-int Infiniband::CompletionChannel::init()\r
-{\r
- ldout(cct, 20) << __func__ << " started." << dendl;\r
- channel = ibv_create_comp_channel(infiniband.device->ctxt);\r
- if (!channel) {\r
- lderr(cct) << __func__ << " failed to create receive completion channel: "\r
- << cpp_strerror(errno) << dendl;\r
- return -1;\r
- }\r
- int rc = NetHandler(cct).set_nonblock(channel->fd);\r
- if (rc < 0) {\r
- ibv_destroy_comp_channel(channel);\r
- return -1;\r
- }\r
- return 0;\r
-}\r
-\r
-/**\r
- * Given a string representation of the `status' field from Verbs\r
- * struct `ibv_wc'.\r
- *\r
- * \param[in] status\r
- * The integer status obtained in ibv_wc.status.\r
- * \return\r
- * A string corresponding to the given status.\r
- */\r
-const char* Infiniband::wc_status_to_string(int status)\r
-{\r
- static const char *lookup[] = {\r
- "SUCCESS",\r
- "LOC_LEN_ERR",\r
- "LOC_QP_OP_ERR",\r
- "LOC_EEC_OP_ERR",\r
- "LOC_PROT_ERR",\r
- "WR_FLUSH_ERR",\r
- "MW_BIND_ERR",\r
- "BAD_RESP_ERR",\r
- "LOC_ACCESS_ERR",\r
- "REM_INV_REQ_ERR",\r
- "REM_ACCESS_ERR",\r
- "REM_OP_ERR",\r
- "RETRY_EXC_ERR",\r
- "RNR_RETRY_EXC_ERR",\r
- "LOC_RDD_VIOL_ERR",\r
- "REM_INV_RD_REQ_ERR",\r
- "REM_ABORT_ERR",\r
- "INV_EECN_ERR",\r
- "INV_EEC_STATE_ERR",\r
- "FATAL_ERR",\r
- "RESP_TIMEOUT_ERR",\r
- "GENERAL_ERR"\r
- };\r
-\r
- if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)\r
- return "<status out of range!>";\r
- return lookup[status];\r
-}\r
-\r
-const char* Infiniband::qp_state_string(int status) {\r
- switch(status) {\r
- case IBV_QPS_RESET : return "IBV_QPS_RESET";\r
- case IBV_QPS_INIT : return "IBV_QPS_INIT";\r
- case IBV_QPS_RTR : return "IBV_QPS_RTR";\r
- case IBV_QPS_RTS : return "IBV_QPS_RTS";\r
- case IBV_QPS_SQD : return "IBV_QPS_SQD";\r
- case IBV_QPS_SQE : return "IBV_QPS_SQE";\r
- case IBV_QPS_ERR : return "IBV_QPS_ERR";\r
- default: return " out of range.";\r
- }\r
-}\r
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Infiniband.h"
+#include "common/errno.h"
+#include "common/debug.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << "Infiniband "
+
+static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
+static const uint32_t MAX_INLINE_DATA = 128;
+static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
+static const uint32_t CQ_DEPTH = 30000;
+
+Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr)
+{
+ if (device == NULL) {
+ lderr(cct) << __func__ << "device == NULL" << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+ name = ibv_get_device_name(device);
+ ctxt = ibv_open_device(device);
+ if (ctxt == NULL) {
+ lderr(cct) << __func__ << "open rdma device failed. " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+ int r = ibv_query_device(ctxt, device_attr);
+ if (r == -1) {
+ lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+}
+
+Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) {
+ union ibv_gid cgid;
+ struct ibv_exp_gid_attr gid_attr;
+ bool malformed = false;
+
+ int r = ibv_query_port(ctxt, port_num, port_attr);
+ if (r == -1) {
+ lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+
+ lid = port_attr->lid;
+
+ // search for requested GID in GIDs table
+ ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
+ << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
+ r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
+ "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
+ ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
+ &cgid.raw[ 0], &cgid.raw[ 1],
+ &cgid.raw[ 2], &cgid.raw[ 3],
+ &cgid.raw[ 4], &cgid.raw[ 5],
+ &cgid.raw[ 6], &cgid.raw[ 7],
+ &cgid.raw[ 8], &cgid.raw[ 9],
+ &cgid.raw[10], &cgid.raw[11],
+ &cgid.raw[12], &cgid.raw[13],
+ &cgid.raw[14], &cgid.raw[15]);
+
+ if (r != 16) {
+ ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
+ malformed = true;
+ }
+
+ gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;
+
+ for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) {
+ r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
+ if (r) {
+ lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+ r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
+ if (r) {
+ lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+
+ if (malformed) break; // stay with gid_idx=0
+ if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
+ (memcmp(&gid, &cgid, 16) == 0) ) {
+ ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
+ break;
+ }
+ }
+
+ if (gid_idx == port_attr->gid_tbl_len) {
+ lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
+ ceph_abort();
+ }
+ }
+
+void Device::binding_port(CephContext *cct, uint8_t port_num) {
+ port_cnt = device_attr->phys_port_cnt;
+ ports = new Port*[port_cnt];
+ for (uint8_t i = 0; i < port_cnt; ++i) {
+ ports[i] = new Port(cct, ctxt, i+1);
+ if (i+1 == port_num && ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) {
+ active_port = ports[i];
+ ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
+ return ;
+ } else {
+ ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << ports[i]->get_port_attr()->state << ")"<< dendl;
+ }
+ }
+ if (nullptr == active_port) {
+ lderr(cct) << __func__ << " port not found" << dendl;
+ assert(active_port);
+ }
+}
+
+Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num): device_list(cct)
+{
+ device = device_list.get_device(device_name.c_str());
+ device->binding_port(cct, port_num);
+ assert(device);
+ ib_physical_port = device->active_port->get_port_num();
+ pd = new ProtectionDomain(cct, device);
+ assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
+
+ max_recv_wr = device->device_attr->max_srq_wr;
+ if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) {
+ max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers;
+ ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
+ } else {
+ ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl;
+ }
+
+ max_send_wr = device->device_attr->max_qp_wr;
+ if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) {
+ max_send_wr = cct->_conf->ms_async_rdma_send_buffers;
+ ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl;
+ } else {
+ ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl;
+ }
+
+ ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe
+ << " completion entries" << dendl;
+
+ memory_manager = new MemoryManager(device, pd,
+ cct->_conf->ms_async_rdma_enable_hugepage);
+ memory_manager->register_rx_tx(
+ cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
+
+ srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
+ post_channel_cluster();
+}
+
+/**
+ * Create a shared receive queue. This basically wraps the verbs call.
+ *
+ * \param[in] max_wr
+ * The max number of outstanding work requests in the SRQ.
+ * \param[in] max_sge
+ * The max number of scatter elements per WR.
+ * \return
+ * A valid ibv_srq pointer, or NULL on error.
+ */
+ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
+{
+ ibv_srq_init_attr sia;
+ memset(&sia, 0, sizeof(sia));
+ sia.srq_context = device->ctxt;
+ sia.attr.max_wr = max_wr;
+ sia.attr.max_sge = max_sge;
+ return ibv_create_srq(pd->pd, &sia);
+}
+
+/**
+ * Create a new QueuePair. This factory should be used in preference to
+ * the QueuePair constructor directly, since this lets derivatives of
+ * Infiniband, e.g. MockInfiniband (if it existed),
+ * return mocked out QueuePair derivatives.
+ *
+ * \return
+ * QueuePair on success or NULL if init fails
+ * See QueuePair::QueuePair for parameter documentation.
+ */
+Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)
+{
+ Infiniband::QueuePair *qp = new QueuePair(
+ cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);
+ if (qp->init()) {
+ delete qp;
+ return NULL;
+ }
+ return qp;
+}
+
+int Infiniband::QueuePair::init()
+{
+ ldout(cct, 20) << __func__ << " started." << dendl;
+ ibv_qp_init_attr qpia;
+ memset(&qpia, 0, sizeof(qpia));
+ qpia.send_cq = txcq->get_cq();
+ qpia.recv_cq = rxcq->get_cq();
+ qpia.srq = srq; // use the same shared receive queue
+ qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests
+ qpia.cap.max_send_sge = 1; // max send scatter-gather elements
+ qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q
+ qpia.qp_type = type; // RC, UC, UD, or XRC
+ qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs
+
+ qp = ibv_create_qp(pd, &qpia);
+ if (qp == NULL) {
+ lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
+ lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers or"
+ " ms_async_rdma_send_buffers" << dendl;
+ return -1;
+ }
+
+ ldout(cct, 20) << __func__ << " successfully create queue pair: "
+ << "qp=" << qp << dendl;
+
+ // move from RESET to INIT state
+ ibv_qp_attr qpa;
+ memset(&qpa, 0, sizeof(qpa));
+ qpa.qp_state = IBV_QPS_INIT;
+ qpa.pkey_index = 0;
+ qpa.port_num = (uint8_t)(ib_physical_port);
+ qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
+ qpa.qkey = q_key;
+
+ int mask = IBV_QP_STATE | IBV_QP_PORT;
+ switch (type) {
+ case IBV_QPT_RC:
+ mask |= IBV_QP_ACCESS_FLAGS;
+ mask |= IBV_QP_PKEY_INDEX;
+ break;
+ case IBV_QPT_UD:
+ mask |= IBV_QP_QKEY;
+ mask |= IBV_QP_PKEY_INDEX;
+ break;
+ case IBV_QPT_RAW_PACKET:
+ break;
+ default:
+ ceph_abort();
+ }
+
+ int ret = ibv_modify_qp(qp, &qpa, mask);
+ if (ret) {
+ ibv_destroy_qp(qp);
+ lderr(cct) << __func__ << " failed to transition to INIT state: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:"
+ << " qp=" << qp << dendl;
+ return 0;
+}
+
+/**
+ * Change RC QueuePair into the ERROR state. It is necessary to modify
+ * the Queue Pair into the Error state and poll all of the relevant
+ * Work Completions prior to destroying a Queue Pair.
+ * Destroying a Queue Pair does not guarantee that its Work
+ * Completions are removed from the CQ upon destruction. Even if the
+ * Work Completions are already in the CQ, it might not be possible to
+ * retrieve them. If the Queue Pair is associated with an SRQ, it is
+ * recommended to wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED.
+ *
+ * \return
+ * -errno if the QueuePair can't switch to ERROR
+ * 0 for success.
+ */
+int Infiniband::QueuePair::to_dead()
+{
+ if (dead)
+ return 0;
+ ibv_qp_attr qpa;
+ memset(&qpa, 0, sizeof(qpa));
+ qpa.qp_state = IBV_QPS_ERR;
+
+ int mask = IBV_QP_STATE;
+ int ret = ibv_modify_qp(qp, &qpa, mask);
+ if (ret) {
+ lderr(cct) << __func__ << " failed to transition to ERROR state: "
+ << cpp_strerror(errno) << dendl;
+ return -errno;
+ }
+ dead = true;
+ return ret;
+}
+
+int Infiniband::post_chunk(Chunk* chunk)
+{
+ ibv_sge isge;
+ isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
+ isge.length = chunk->bytes;
+ isge.lkey = chunk->mr->lkey;
+ ibv_recv_wr rx_work_request;
+
+ memset(&rx_work_request, 0, sizeof(rx_work_request));
+ rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
+ rx_work_request.next = NULL;
+ rx_work_request.sg_list = &isge;
+ rx_work_request.num_sge = 1;
+
+ ibv_recv_wr *badWorkRequest;
+ int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
+ if (ret)
+ return -1;
+ return 0;
+}
+
+int Infiniband::post_channel_cluster()
+{
+ vector<Chunk*> free_chunks;
+ int r = memory_manager->get_channel_buffers(free_chunks, 0);
+ assert(r > 0);
+ for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
+ r = post_chunk(*iter);
+ assert(r == 0);
+ }
+ return 0;
+}
+
+Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
+{
+ Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
+ if (cc->init()) {
+ delete cc;
+ return NULL;
+ }
+ return cc;
+}
+
+// Allocate and initialize a completion queue of depth CQ_DEPTH, optionally
+// bound to completion channel `cc` (may be NULL).
+// Returns the new queue, or NULL (after cleanup) if init() failed.
+Infiniband::CompletionQueue* Infiniband::create_comp_queue(
+ CephContext *cct, CompletionChannel *cc)
+{
+ Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
+ cct, *this, CQ_DEPTH, cc);
+ if (cq->init()) {
+ delete cq;
+ return NULL;
+ }
+ return cq;
+}
+
+
+// Construct a queue pair descriptor.  The verbs QP itself is not created
+// here; callers must invoke init() afterwards.  Aborts on an unsupported
+// QP type.
+Infiniband::QueuePair::QueuePair(
+ CephContext *c, Infiniband& infiniband, ibv_qp_type type,
+ int port, ibv_srq *srq,
+ Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
+ uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
+: cct(c), infiniband(infiniband),
+  type(type),
+  ctxt(infiniband.device->ctxt),
+  ib_physical_port(port),
+  pd(infiniband.pd->pd),
+  srq(srq),
+  qp(NULL),
+  txcq(txcq),
+  rxcq(rxcq),
+  initial_psn(0),
+  max_send_wr(max_send_wr),
+  max_recv_wr(max_recv_wr),
+  q_key(q_key),
+  dead(false)
+{
+  // Initial packet sequence number: PSNs are a 24-bit field, so mask the
+  // random value down to 24 bits.
+  initial_psn = lrand48() & 0xffffff;
+  if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
+    lderr(cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl;
+    ceph_abort();
+  }
+  // (removed: redundant `pd = infiniband.pd->pd;` — pd is already set in
+  // the member-initializer list above)
+}
+
+// Read the peer's out-of-band connection metadata (lid, qpn, psn, peer_qpn,
+// gid) from TCP socket `sd` into `im`.
+// Returns the byte count read (== sizeof msg) on success, 0 on an orderly
+// disconnect (zero-length read), and a negative errno-style value on error.
+int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
+{
+  char msg[TCP_MSG_LEN];
+  char gid[33];
+  ssize_t r = ::read(sd, &msg, sizeof(msg));
+  // Drop incoming qpt
+  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
+    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+      return -EINVAL;
+    }
+  }
+  if (r < 0) {
+    r = -errno;
+    lderr(cct) << __func__ << " got error " << errno << ": "
+               << cpp_strerror(errno) << dendl;
+  } else if (r == 0) { // valid disconnect message of length 0
+    ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
+  } else if ((size_t)r != sizeof(msg)) { // invalid message
+    ldout(cct, 1) << __func__ << " got bad length (" << r << "): " << cpp_strerror(errno) << dendl;
+    r = -EINVAL;
+  } else { // valid message
+    // im.lid is uint16_t, so it must be scanned with %hx — a plain %x
+    // stores a full unsigned int and would clobber the adjacent packed
+    // field.  Bound the gid scan to 32 chars (+NUL) to stay inside gid[33].
+    sscanf(msg, "%hx:%x:%x:%x:%32s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn), gid);
+    wire_gid_to_gid(gid, &(im.gid));
+    ldout(cct, 5) << __func__ << " received: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl;
+  }
+  return r;
+}
+
+// Write our connection metadata (lid, qpn, psn, peer_qpn, gid) to TCP
+// socket `sd`, retrying up to 3 times on EINTR/EAGAIN.
+// Returns 0 on success, -errno on failure.
+// NOTE(review): on a short write with r >= 0, errno may be 0/stale, so the
+// returned -errno is unreliable in that branch — confirm callers only test
+// for non-zero.
+int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
+{
+ int retry = 0;
+ ssize_t r;
+
+ char msg[TCP_MSG_LEN];
+ char gid[33];
+retry:
+ gid_to_wire_gid(&(im.gid), gid);
+ sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
+ ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
+ << ", " << im.peer_qpn << ", " << gid << dendl;
+ r = ::write(sd, msg, sizeof(msg));
+ // Drop incoming qpt
+ if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
+ if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+ ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+ return -EINVAL;
+ }
+ }
+
+ if ((size_t)r != sizeof(msg)) {
+ // FIXME need to handle EAGAIN instead of retry
+ if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
+ retry++;
+ goto retry;
+ }
+ if (r < 0)
+ lderr(cct) << __func__ << " send returned error " << errno << ": "
+ << cpp_strerror(errno) << dendl;
+ else
+ lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
+ return -errno;
+ }
+ return 0;
+}
+
+// Parse a 32-hex-char wire-format GID string into a binary ibv_gid.
+// The wire format is four 8-hex-digit groups, each the big-endian
+// representation of 4 raw GID bytes (inverse of gid_to_wire_gid()).
+void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)
+{
+ char tmp[9];
+ uint32_t v32;
+ int i;
+
+ for (tmp[8] = 0, i = 0; i < 4; ++i) {
+ memcpy(tmp, wgid + i * 8, 8);
+ sscanf(tmp, "%x", &v32);
+ *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);
+ }
+}
+
+// Render a binary ibv_gid as a 32-hex-char wire string (plus NUL): four
+// groups of 4 raw bytes, each printed as 8 hex digits in network order.
+// `wgid` must have room for at least 33 bytes.
+void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])
+{
+ for (int i = 0; i < 4; ++i)
+ sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));
+}
+
+// Destroy the verbs QP, if one was created.
+Infiniband::QueuePair::~QueuePair()
+{
+  if (qp) {
+    // Call ibv_destroy_qp() outside assert(): with NDEBUG the assert body
+    // is compiled out and the QP would be leaked.
+    int r = ibv_destroy_qp(qp);
+    assert(r == 0);
+    (void)r; // silence unused-variable warning in NDEBUG builds
+  }
+}
+
+// Destroy the completion channel, if one was created by init().
+Infiniband::CompletionChannel::~CompletionChannel()
+{
+ if (channel) {
+ int r = ibv_destroy_comp_channel(channel);
+ if (r < 0)
+ lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;
+ assert(r == 0);
+ }
+}
+
+// Destroy the completion queue, if one was created by init().
+Infiniband::CompletionQueue::~CompletionQueue()
+{
+ if (cq) {
+ int r = ibv_destroy_cq(cq);
+ if (r < 0)
+ lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
+ assert(r == 0);
+ }
+}
+
+// Re-arm the CQ so the next completion raises an event on the channel.
+// NOTE(review): the `solicite_only` parameter is ignored — the second
+// argument to ibv_req_notify_cq() is hard-coded to 0 (notify on any
+// completion).  Passing the flag through would change runtime behavior,
+// so it is left as-is; confirm intent.
+int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
+{
+ ldout(cct, 20) << __func__ << " started." << dendl;
+ int r = ibv_req_notify_cq(cq, 0);
+ if (r < 0)
+ lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;
+ return r;
+}
+
+// Poll up to num_entries work completions into ret_wc_array.
+// Returns the number of completions retrieved (possibly 0), or -1 on error.
+int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
+ int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
+ if (r < 0) {
+ lderr(cct) << __func__ << " poll_completion_queue occur met error: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ return r;
+}
+
+// Drain one CQ event from the channel.  Returns true if an event was
+// retrieved, false if none was pending (EAGAIN/EINTR) or on error.
+// Events are acked in batches of MAX_ACK_EVENT to amortize the cost of
+// ibv_ack_cq_events(), which takes a mutex internally.
+bool Infiniband::CompletionChannel::get_cq_event()
+{
+ ibv_cq *cq = NULL;
+ void *ev_ctx;
+ if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
+ if (errno != EAGAIN && errno != EINTR)
+ lderr(cct) << __func__ << " failed to retrieve CQ event: "
+ << cpp_strerror(errno) << dendl;
+ return false;
+ }
+
+ /* accumulate number of cq events that need to
+ * * be acked, and periodically ack them
+ * */
+ if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
+ ldout(cct, 20) << __func__ << " ack aq events." << dendl;
+ ibv_ack_cq_events(cq, MAX_ACK_EVENT);
+ cq_events_that_need_ack = 0;
+ }
+
+ return true;
+}
+
+// Create the CQ (bound to this object's completion channel), arm the first
+// notification, and register the CQ with the channel.
+// Returns 0 on success, -1 on failure (cq is left NULL).
+int Infiniband::CompletionQueue::init()
+{
+ cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
+ if (!cq) {
+ lderr(cct) << __func__ << " failed to create receive completion queue: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+
+ if (ibv_req_notify_cq(cq, 0)) {
+ lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
+ ibv_destroy_cq(cq);
+ cq = nullptr;
+ return -1;
+ }
+
+ channel->bind_cq(cq);
+ ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;
+ return 0;
+}
+
+// Create the completion channel and make its fd non-blocking.
+// Returns 0 on success, -1 on failure (channel is left NULL).
+int Infiniband::CompletionChannel::init()
+{
+  ldout(cct, 20) << __func__ << " started." << dendl;
+  channel = ibv_create_comp_channel(infiniband.device->ctxt);
+  if (!channel) {
+    lderr(cct) << __func__ << " failed to create receive completion channel: "
+               << cpp_strerror(errno) << dendl;
+    return -1;
+  }
+  int rc = NetHandler(cct).set_nonblock(channel->fd);
+  if (rc < 0) {
+    ibv_destroy_comp_channel(channel);
+    // Null the handle so the destructor does not destroy it a second time
+    // when the caller (create_comp_channel) deletes this object.
+    channel = nullptr;
+    return -1;
+  }
+  return 0;
+}
+
+/**
+ * Return a string representation of the `status' field from a Verbs
+ * struct `ibv_wc'.
+ *
+ * \param[in] status
+ *      The integer status obtained in ibv_wc.status.
+ * \return
+ *      A string corresponding to the given status.
+ */
+const char* Infiniband::wc_status_to_string(int status)
+{
+ // Names match the IBV_WC_* enumerators in ibv_wc_status, in order.
+ static const char *lookup[] = {
+ "SUCCESS",
+ "LOC_LEN_ERR",
+ "LOC_QP_OP_ERR",
+ "LOC_EEC_OP_ERR",
+ "LOC_PROT_ERR",
+ "WR_FLUSH_ERR",
+ "MW_BIND_ERR",
+ "BAD_RESP_ERR",
+ "LOC_ACCESS_ERR",
+ "REM_INV_REQ_ERR",
+ "REM_ACCESS_ERR",
+ "REM_OP_ERR",
+ "RETRY_EXC_ERR",
+ "RNR_RETRY_EXC_ERR",
+ "LOC_RDD_VIOL_ERR",
+ "REM_INV_RD_REQ_ERR",
+ "REM_ABORT_ERR",
+ "INV_EECN_ERR",
+ "INV_EEC_STATE_ERR",
+ "FATAL_ERR",
+ "RESP_TIMEOUT_ERR",
+ "GENERAL_ERR"
+ };
+
+ if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)
+ return "<status out of range!>";
+ return lookup[status];
+}
+
+// Return the name of an ibv_qp_state value, or a placeholder for
+// unrecognized values.
+const char* Infiniband::qp_state_string(int status) {
+ switch(status) {
+ case IBV_QPS_RESET : return "IBV_QPS_RESET";
+ case IBV_QPS_INIT : return "IBV_QPS_INIT";
+ case IBV_QPS_RTR : return "IBV_QPS_RTR";
+ case IBV_QPS_RTS : return "IBV_QPS_RTS";
+ case IBV_QPS_SQD : return "IBV_QPS_SQD";
+ case IBV_QPS_SQE : return "IBV_QPS_SQE";
+ case IBV_QPS_ERR : return "IBV_QPS_ERR";
+ default: return " out of range.";
+ }
+}
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- \r
-// vim: ts=8 sw=2 smarttab\r
-/*\r
- * Ceph - scalable distributed file system\r
- *\r
- * Copyright (C) 2016 XSKY <haomai@xsky.com>\r
- *\r
- * Author: Haomai Wang <haomaiwang@gmail.com>\r
- *\r
- * This is free software; you can redistribute it and/or\r
- * modify it under the terms of the GNU Lesser General Public\r
- * License version 2.1, as published by the Free Software\r
- * Foundation. See file COPYING.\r
- *\r
- */\r
-\r
-#ifndef CEPH_INFINIBAND_H\r
-#define CEPH_INFINIBAND_H\r
-\r
-#include <infiniband/verbs.h>\r
-\r
-#include <string>\r
-#include <vector>\r
-\r
-#include "include/int_types.h"\r
-#include "include/page.h"\r
-#include "common/debug.h"\r
-#include "common/errno.h"\r
-#include "msg/msg_types.h"\r
-#include "msg/async/net_handler.h"\r
-#include "common/Mutex.h"\r
-\r
-#define HUGE_PAGE_SIZE (2 * 1024 * 1024)\r
-#define ALIGN_TO_PAGE_SIZE(x) \\r
- (((x) + HUGE_PAGE_SIZE -1) / HUGE_PAGE_SIZE * HUGE_PAGE_SIZE)\r
-\r
-struct IBSYNMsg {\r
- uint16_t lid;\r
- uint32_t qpn;\r
- uint32_t psn;\r
- uint32_t peer_qpn;\r
- union ibv_gid gid;\r
-} __attribute__((packed));\r
-\r
-class RDMAStack;\r
-class CephContext;\r
-\r
-class Port {\r
- struct ibv_context* ctxt;\r
- uint8_t port_num;\r
- struct ibv_port_attr* port_attr;\r
- uint16_t lid;\r
- int gid_idx;\r
- union ibv_gid gid;\r
-\r
- public:\r
- explicit Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn);\r
- uint16_t get_lid() { return lid; }\r
- ibv_gid get_gid() { return gid; }\r
- uint8_t get_port_num() { return port_num; }\r
- ibv_port_attr* get_port_attr() { return port_attr; }\r
- int get_gid_idx() { return gid_idx; }\r
-};\r
-\r
-\r
-class Device {\r
- ibv_device *device;\r
- const char* name;\r
- uint8_t port_cnt;\r
- Port** ports;\r
- public:\r
- explicit Device(CephContext *c, ibv_device* d);\r
- ~Device() {\r
- for (uint8_t i = 0; i < port_cnt; ++i)\r
- delete ports[i];\r
- delete []ports;\r
- assert(ibv_close_device(ctxt) == 0);\r
- }\r
- const char* get_name() { return name;}\r
- uint16_t get_lid() { return active_port->get_lid(); }\r
- ibv_gid get_gid() { return active_port->get_gid(); }\r
- int get_gid_idx() { return active_port->get_gid_idx(); }\r
- void binding_port(CephContext *c, uint8_t port_num);\r
- struct ibv_context *ctxt;\r
- ibv_device_attr *device_attr;\r
- Port* active_port;\r
-};\r
-\r
-\r
-class DeviceList {\r
- struct ibv_device ** device_list;\r
- int num;\r
- Device** devices;\r
- public:\r
- DeviceList(CephContext *cct): device_list(ibv_get_device_list(&num)) {\r
- if (device_list == NULL || num == 0) {\r
- lderr(cct) << __func__ << " failed to get rdma device list. " << cpp_strerror(errno) << dendl;\r
- ceph_abort();\r
- }\r
- devices = new Device*[num];\r
-\r
- for (int i = 0;i < num; ++i) {\r
- devices[i] = new Device(cct, device_list[i]);\r
- }\r
- }\r
- ~DeviceList() {\r
- for (int i=0; i < num; ++i) {\r
- delete devices[i];\r
- }\r
- delete []devices;\r
- ibv_free_device_list(device_list);\r
- }\r
-\r
- Device* get_device(const char* device_name) {\r
- assert(devices);\r
- for (int i = 0; i < num; ++i) {\r
- if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) {\r
- return devices[i];\r
- }\r
- }\r
- return NULL;\r
- }\r
-};\r
-\r
-\r
-class Infiniband {\r
- public:\r
- class ProtectionDomain {\r
- public:\r
- explicit ProtectionDomain(CephContext *cct, Device *device)\r
- : pd(ibv_alloc_pd(device->ctxt))\r
- {\r
- if (pd == NULL) {\r
- lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;\r
- ceph_abort();\r
- }\r
- }\r
- ~ProtectionDomain() {\r
- int rc = ibv_dealloc_pd(pd);\r
- assert(rc == 0);\r
- }\r
- ibv_pd* const pd;\r
- };\r
-\r
-\r
- class MemoryManager {\r
- public:\r
- class Chunk {\r
- public:\r
- Chunk(char* b, uint32_t len, ibv_mr* m) : buffer(b), bytes(len), offset(0), mr(m) {}\r
- ~Chunk() {\r
- assert(ibv_dereg_mr(mr) == 0);\r
- }\r
-\r
- void set_offset(uint32_t o) {\r
- offset = o;\r
- }\r
-\r
- uint32_t get_offset() {\r
- return offset;\r
- }\r
-\r
- void set_bound(uint32_t b) {\r
- bound = b;\r
- }\r
-\r
- void prepare_read(uint32_t b) {\r
- offset = 0;\r
- bound = b;\r
- }\r
-\r
- uint32_t get_bound() {\r
- return bound;\r
- }\r
-\r
- uint32_t read(char* buf, uint32_t len) {\r
- uint32_t left = bound - offset;\r
- if (left >= len) {\r
- memcpy(buf, buffer+offset, len);\r
- offset += len;\r
- return len;\r
- } else {\r
- memcpy(buf, buffer+offset, left);\r
- offset = 0;\r
- bound = 0;\r
- return left;\r
- }\r
- }\r
-\r
- uint32_t write(char* buf, uint32_t len) {\r
- uint32_t left = bytes - offset;\r
- if (left >= len) {\r
- memcpy(buffer+offset, buf, len);\r
- offset += len;\r
- return len;\r
- } else {\r
- memcpy(buffer+offset, buf, left);\r
- offset = bytes;\r
- return left;\r
- }\r
- }\r
-\r
- bool full() {\r
- return offset == bytes;\r
- }\r
-\r
- bool over() {\r
- return offset == bound;\r
- }\r
-\r
- void clear() {\r
- offset = 0;\r
- bound = 0;\r
- }\r
-\r
- void post_srq(Infiniband *ib) {\r
- ib->post_chunk(this);\r
- }\r
-\r
- void set_owner(uint64_t o) {\r
- owner = o;\r
- }\r
-\r
- uint64_t get_owner() {\r
- return owner;\r
- }\r
-\r
- public:\r
- char* buffer;\r
- uint32_t bytes;\r
- uint32_t bound;\r
- uint32_t offset;\r
- ibv_mr* mr;\r
- uint64_t owner;\r
- };\r
-\r
- class Cluster {\r
- public:\r
- Cluster(MemoryManager& m, uint32_t s) : manager(m), chunk_size(s), lock("cluster_lock"){}\r
- Cluster(MemoryManager& m, uint32_t s, uint32_t n) : manager(m), chunk_size(s), lock("cluster_lock"){\r
- add(n);\r
- }\r
-\r
- ~Cluster() {\r
- set<Chunk*>::iterator c = all_chunks.begin();\r
- while(c != all_chunks.end()) {\r
- delete *c;\r
- ++c;\r
- }\r
- if (manager.enabled_huge_page)\r
- manager.free_huge_pages(base);\r
- else\r
- delete base;\r
- }\r
- int add(uint32_t num) {\r
- uint32_t bytes = chunk_size * num;\r
- //cihar* base = (char*)malloc(bytes);\r
- if (manager.enabled_huge_page) {\r
- base = (char*)manager.malloc_huge_pages(bytes);\r
- } else {\r
- base = (char*)memalign(CEPH_PAGE_SIZE, bytes);\r
- }\r
- assert(base);\r
- for (uint32_t offset = 0; offset < bytes; offset += chunk_size){\r
- ibv_mr* m = ibv_reg_mr(manager.pd->pd, base+offset, chunk_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);\r
- assert(m);\r
- Chunk* c = new Chunk(base+offset,chunk_size,m);\r
- free_chunks.push_back(c);\r
- all_chunks.insert(c);\r
- }\r
- return 0;\r
- }\r
-\r
- void take_back(Chunk* ck) {\r
- Mutex::Locker l(lock);\r
- free_chunks.push_back(ck);\r
- }\r
-\r
- int get_buffers(std::vector<Chunk*> &chunks, size_t bytes) {\r
- uint32_t num = bytes / chunk_size + 1;\r
- if (bytes % chunk_size == 0)\r
- --num;\r
- int r = num;\r
- Mutex::Locker l(lock);\r
- if (free_chunks.empty())\r
- return 0;\r
- if (!bytes) {\r
- free_chunks.swap(chunks);\r
- r = chunks.size();\r
- return r;\r
- }\r
- if (free_chunks.size() < num) {\r
- num = free_chunks.size();\r
- r = num;\r
- }\r
- for (uint32_t i = 0; i < num; ++i) {\r
- chunks.push_back(free_chunks.back());\r
- free_chunks.pop_back();\r
- }\r
- return r;\r
- }\r
- MemoryManager& manager;\r
- uint32_t chunk_size;\r
- Mutex lock;\r
- std::vector<Chunk*> free_chunks;\r
- std::set<Chunk*> all_chunks;\r
- char* base;\r
- };\r
-\r
- MemoryManager(Device *d, ProtectionDomain *p, bool hugepage): device(d), pd(p) {\r
- enabled_huge_page = hugepage;\r
- }\r
- ~MemoryManager() {\r
- if (channel)\r
- delete channel;\r
- if (send)\r
- delete send;\r
- }\r
- void* malloc_huge_pages(size_t size) {\r
- size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);\r
- char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);\r
- if (ptr == MAP_FAILED) {\r
- ptr = (char *)malloc(real_size);\r
- if (ptr == NULL) return NULL;\r
- real_size = 0;\r
- }\r
- *((size_t *)ptr) = real_size;\r
- return ptr + HUGE_PAGE_SIZE;\r
- }\r
- void free_huge_pages(void *ptr) {\r
- if (ptr == NULL) return;\r
- void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE;\r
- size_t real_size = *((size_t *)real_ptr);\r
- assert(real_size % HUGE_PAGE_SIZE == 0);\r
- if (real_size != 0)\r
- munmap(real_ptr, real_size);\r
- else\r
- free(real_ptr);\r
- }\r
- void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num) {\r
- assert(device);\r
- assert(pd);\r
- channel = new Cluster(*this, size);\r
- channel->add(rx_num);\r
-\r
- send = new Cluster(*this, size);\r
- send->add(tx_num);\r
- }\r
- void return_tx(std::vector<Chunk*> &chunks) {\r
- for (auto c : chunks) {\r
- c->clear();\r
- send->take_back(c);\r
- }\r
- }\r
-\r
- int get_send_buffers(std::vector<Chunk*> &c, size_t bytes) {\r
- return send->get_buffers(c, bytes);\r
- }\r
-\r
- int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes) {\r
- return channel->get_buffers(chunks, bytes);\r
- }\r
-\r
- int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);}\r
- int is_rx_chunk(Chunk* c) { return channel->all_chunks.count(c);}\r
- bool enabled_huge_page;\r
- private:\r
- Cluster* channel;//RECV\r
- Cluster* send;// SEND\r
- Device *device;\r
- ProtectionDomain *pd;\r
- };\r
-\r
- private:\r
- uint32_t max_send_wr;\r
- uint32_t max_recv_wr;\r
- uint32_t max_sge;\r
- uint8_t ib_physical_port;\r
- MemoryManager* memory_manager;\r
- ibv_srq* srq; // shared receive work queue\r
- Device *device;\r
- ProtectionDomain *pd;\r
- DeviceList device_list;\r
- void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);\r
- void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);\r
-\r
- public:\r
- explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);\r
-\r
- /**\r
- * Destroy an Infiniband object.\r
- */\r
- ~Infiniband() {\r
- assert(ibv_destroy_srq(srq) == 0);\r
- delete memory_manager;\r
- delete pd;\r
- }\r
-\r
- class CompletionChannel {\r
- static const uint32_t MAX_ACK_EVENT = 5000;\r
- CephContext *cct;\r
- Infiniband& infiniband;\r
- ibv_comp_channel *channel;\r
- ibv_cq *cq;\r
- uint32_t cq_events_that_need_ack;\r
-\r
- public:\r
- CompletionChannel(CephContext *c, Infiniband &ib)\r
- : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) {}\r
- ~CompletionChannel();\r
- int init();\r
- bool get_cq_event();\r
- int get_fd() { return channel->fd; }\r
- ibv_comp_channel* get_channel() { return channel; }\r
- void bind_cq(ibv_cq *c) { cq = c; }\r
- void ack_events() {\r
- ibv_ack_cq_events(cq, cq_events_that_need_ack);\r
- cq_events_that_need_ack = 0;\r
- }\r
- };\r
-\r
- // this class encapsulates the creation, use, and destruction of an RC\r
- // completion queue.\r
- //\r
- // You need to call init and it will create a cq and associate to comp channel\r
- class CompletionQueue {\r
- public:\r
- CompletionQueue(CephContext *c, Infiniband &ib,\r
- const uint32_t qd, CompletionChannel *cc)\r
- : cct(c), infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {}\r
- ~CompletionQueue();\r
- int init();\r
- int poll_cq(int num_entries, ibv_wc *ret_wc_array);\r
-\r
- ibv_cq* get_cq() const { return cq; }\r
- int rearm_notify(bool solicited_only=true);\r
- CompletionChannel* get_cc() const { return channel; }\r
- private:\r
- CephContext *cct;\r
- Infiniband& infiniband; // Infiniband to which this QP belongs\r
- CompletionChannel *channel;\r
- ibv_cq *cq;\r
- uint32_t queue_depth;\r
- };\r
-\r
- // this class encapsulates the creation, use, and destruction of an RC\r
- // queue pair.\r
- //\r
- // you need call init and it will create a qp and bring it to the INIT state.\r
- // after obtaining the lid, qpn, and psn of a remote queue pair, one\r
- // must call plumb() to bring the queue pair to the RTS state.\r
- class QueuePair {\r
- public:\r
- QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type,\r
- int ib_physical_port, ibv_srq *srq,\r
- Infiniband::CompletionQueue* txcq,\r
- Infiniband::CompletionQueue* rxcq,\r
- uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0);\r
- ~QueuePair();\r
-\r
- int init();\r
-\r
- /**\r
- * Get the initial packet sequence number for this QueuePair.\r
- * This is randomly generated on creation. It should not be confused\r
- * with the remote side's PSN, which is set in #plumb(). \r
- */\r
- uint32_t get_initial_psn() const { return initial_psn; };\r
- /**\r
- * Get the local queue pair number for this QueuePair.\r
- * QPNs are analogous to UDP/TCP port numbers.\r
- */\r
- uint32_t get_local_qp_number() const { return qp->qp_num; };\r
- /**\r
- * Get the remote queue pair number for this QueuePair, as set in #plumb().\r
- * QPNs are analogous to UDP/TCP port numbers.\r
- */\r
- int get_remote_qp_number(uint32_t *rqp) const {\r
- ibv_qp_attr qpa;\r
- ibv_qp_init_attr qpia;\r
-\r
- int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);\r
- if (r) {\r
- lderr(cct) << __func__ << " failed to query qp: "\r
- << cpp_strerror(errno) << dendl;\r
- return -1;\r
- }\r
-\r
- if (rqp)\r
- *rqp = qpa.dest_qp_num;\r
- return 0;\r
- }\r
- /**\r
- * Get the remote infiniband address for this QueuePair, as set in #plumb().\r
- * LIDs are "local IDs" in infiniband terminology. They are short, locally\r
- * routable addresses.\r
- */\r
- int get_remote_lid(uint16_t *lid) const {\r
- ibv_qp_attr qpa;\r
- ibv_qp_init_attr qpia;\r
-\r
- int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);\r
- if (r) {\r
- lderr(cct) << __func__ << " failed to query qp: "\r
- << cpp_strerror(errno) << dendl;\r
- return -1;\r
- }\r
-\r
- if (lid)\r
- *lid = qpa.ah_attr.dlid;\r
- return 0;\r
- }\r
- /**\r
- * Get the state of a QueuePair.\r
- */\r
- int get_state() const {\r
- ibv_qp_attr qpa;\r
- ibv_qp_init_attr qpia;\r
-\r
- int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);\r
- if (r) {\r
- lderr(cct) << __func__ << " failed to get state: "\r
- << cpp_strerror(errno) << dendl;\r
- return -1;\r
- }\r
- return qpa.qp_state;\r
- }\r
- /**\r
- * Return true if the queue pair is in an error state, false otherwise.\r
- */\r
- bool is_error() const {\r
- ibv_qp_attr qpa;\r
- ibv_qp_init_attr qpia;\r
-\r
- int r = ibv_query_qp(qp, &qpa, -1, &qpia);\r
- if (r) {\r
- lderr(cct) << __func__ << " failed to get state: "\r
- << cpp_strerror(errno) << dendl;\r
- return true;\r
- }\r
- return qpa.cur_qp_state == IBV_QPS_ERR;\r
- }\r
- ibv_qp* get_qp() const { return qp; }\r
- Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }\r
- Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }\r
- int to_dead();\r
- bool is_dead() const { return dead; }\r
-\r
- private:\r
- CephContext *cct;\r
- Infiniband& infiniband; // Infiniband to which this QP belongs\r
- ibv_qp_type type; // QP type (IBV_QPT_RC, etc.)\r
- ibv_context* ctxt; // device context of the HCA to use\r
- int ib_physical_port;\r
- ibv_pd* pd; // protection domain\r
- ibv_srq* srq; // shared receive queue\r
- ibv_qp* qp; // infiniband verbs QP handle\r
- Infiniband::CompletionQueue* txcq;\r
- Infiniband::CompletionQueue* rxcq;\r
- uint32_t initial_psn; // initial packet sequence number\r
- uint32_t max_send_wr;\r
- uint32_t max_recv_wr;\r
- uint32_t q_key;\r
- bool dead;\r
- };\r
-\r
- public:\r
- typedef MemoryManager::Cluster Cluster;\r
- typedef MemoryManager::Chunk Chunk;\r
- QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);\r
- ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);\r
- int post_chunk(Chunk* chunk);\r
- int post_channel_cluster();\r
- int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes) {\r
- return memory_manager->get_send_buffers(c, bytes);\r
- }\r
- CompletionChannel *create_comp_channel(CephContext *c);\r
- CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);\r
- uint8_t get_ib_physical_port() {\r
- return ib_physical_port;\r
- }\r
- int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);\r
- int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);\r
- uint16_t get_lid() { return device->get_lid(); }\r
- ibv_gid get_gid() { return device->get_gid(); }\r
- MemoryManager* get_memory_manager() { return memory_manager; }\r
- Device* get_device() { return device; }\r
- int get_async_fd() { return device->ctxt->async_fd; }\r
- int recall_chunk(Chunk* c) {\r
- if (memory_manager->is_rx_chunk(c)) {\r
- post_chunk(c); \r
- return 1;\r
- } else if (memory_manager->is_tx_chunk(c)) {\r
- vector<Chunk*> v;\r
- v.push_back(c);\r
- memory_manager->return_tx(v); \r
- return 2;\r
- }\r
- return -1;\r
- }\r
- int is_tx_chunk(Chunk* c) { return memory_manager->is_tx_chunk(c); }\r
- int is_rx_chunk(Chunk* c) { return memory_manager->is_rx_chunk(c); }\r
- static const char* wc_status_to_string(int status);\r
- static const char* qp_state_string(int status);\r
-};\r
-\r
-#endif\r
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_INFINIBAND_H
+#define CEPH_INFINIBAND_H
+
+#include <infiniband/verbs.h>
+
+#include <string>
+#include <vector>
+
+#include "include/int_types.h"
+#include "include/page.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "msg/msg_types.h"
+#include "msg/async/net_handler.h"
+#include "common/Mutex.h"
+
+#define HUGE_PAGE_SIZE (2 * 1024 * 1024)
+#define ALIGN_TO_PAGE_SIZE(x) \
+ (((x) + HUGE_PAGE_SIZE -1) / HUGE_PAGE_SIZE * HUGE_PAGE_SIZE)
+
+// Out-of-band connection metadata exchanged over TCP while setting up a
+// queue pair (see Infiniband::send_msg/recv_msg).  Packed so the layout is
+// stable on the wire-building side.
+struct IBSYNMsg {
+ uint16_t lid;      // local identifier of the sender's port
+ uint32_t qpn;      // sender's queue pair number
+ uint32_t psn;      // sender's initial packet sequence number
+ uint32_t peer_qpn; // QPN the sender believes the peer is using
+ union ibv_gid gid; // sender's global identifier
+} __attribute__((packed));
+
+class RDMAStack;
+class CephContext;
+
+// Wraps one physical port of an RDMA device: caches its attributes, lid,
+// and selected gid/gid index (queried in the constructor).
+class Port {
+ struct ibv_context* ctxt;
+ uint8_t port_num;
+ struct ibv_port_attr* port_attr;
+ uint16_t lid;
+ int gid_idx;
+ union ibv_gid gid;
+
+ public:
+ explicit Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn);
+ uint16_t get_lid() { return lid; }
+ ibv_gid get_gid() { return gid; }
+ uint8_t get_port_num() { return port_num; }
+ ibv_port_attr* get_port_attr() { return port_attr; }
+ int get_gid_idx() { return gid_idx; }
+};
+
+
+// Wraps an opened RDMA device (ibv_open_device happens in the constructor,
+// which aborts on failure).  binding_port() selects the active port.
+class Device {
+  ibv_device *device;
+  const char* name;
+  // Initialized in-class: the constructor does not set these, and the
+  // destructor dereferences them, so they must not be left indeterminate
+  // when binding_port() was never called.
+  uint8_t port_cnt = 0;
+  Port** ports = nullptr;  // owned; port_cnt entries
+ public:
+  explicit Device(CephContext *c, ibv_device* d);
+  ~Device() {
+    for (uint8_t i = 0; i < port_cnt; ++i)
+      delete ports[i];
+    delete []ports;
+    // Keep the close call outside assert(): under NDEBUG the assert body
+    // is compiled out and the device would never be closed.
+    int r = ibv_close_device(ctxt);
+    assert(r == 0);
+    (void)r;
+  }
+  const char* get_name() { return name;}
+  uint16_t get_lid() { return active_port->get_lid(); }
+  ibv_gid get_gid() { return active_port->get_gid(); }
+  int get_gid_idx() { return active_port->get_gid_idx(); }
+  void binding_port(CephContext *c, uint8_t port_num);
+  struct ibv_context *ctxt;
+  ibv_device_attr *device_attr;
+  Port* active_port;
+};
+
+
+// Owns the ibv device list and a Device wrapper per entry; aborts if no
+// RDMA devices are present.
+class DeviceList {
+ struct ibv_device ** device_list;
+ int num;
+ Device** devices;
+ public:
+ DeviceList(CephContext *cct): device_list(ibv_get_device_list(&num)) {
+ if (device_list == NULL || num == 0) {
+ lderr(cct) << __func__ << " failed to get rdma device list. " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+ devices = new Device*[num];
+
+ for (int i = 0;i < num; ++i) {
+ devices[i] = new Device(cct, device_list[i]);
+ }
+ }
+ ~DeviceList() {
+ for (int i=0; i < num; ++i) {
+ delete devices[i];
+ }
+ delete []devices;
+ ibv_free_device_list(device_list);
+ }
+
+ // Look up a device by name; an empty name matches the first device.
+ // Returns NULL when no device matches.
+ Device* get_device(const char* device_name) {
+ assert(devices);
+ for (int i = 0; i < num; ++i) {
+ if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) {
+ return devices[i];
+ }
+ }
+ return NULL;
+ }
+};
+
+
+class Infiniband {
+ public:
+ // RAII wrapper for an ibv protection domain; allocation failure aborts,
+ // deallocation is asserted in the destructor.
+ class ProtectionDomain {
+ public:
+ explicit ProtectionDomain(CephContext *cct, Device *device)
+ : pd(ibv_alloc_pd(device->ctxt))
+ {
+ if (pd == NULL) {
+ lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+ }
+ ~ProtectionDomain() {
+ int rc = ibv_dealloc_pd(pd);
+ assert(rc == 0);
+ }
+ ibv_pd* const pd;
+ };
+
+
+ class MemoryManager {
+ public:
+  // One registered buffer slice: a fixed-size region of a Cluster's base
+  // allocation together with its memory region handle.  `offset` tracks the
+  // read/write cursor; `bound` is the end of valid data for reads.
+  class Chunk {
+   public:
+    // Initialize every member: `bound` is consulted by read()/over() and
+    // `owner` by get_owner() before either is explicitly set, so leaving
+    // them indeterminate was a latent bug.
+    Chunk(char* b, uint32_t len, ibv_mr* m)
+      : buffer(b), bytes(len), bound(0), offset(0), mr(m), owner(0) {}
+    ~Chunk() {
+      // Deregister outside assert() so the MR is still released in
+      // NDEBUG builds (assert bodies are compiled out).
+      int r = ibv_dereg_mr(mr);
+      assert(r == 0);
+      (void)r;
+    }
+
+    void set_offset(uint32_t o) {
+      offset = o;
+    }
+
+    uint32_t get_offset() {
+      return offset;
+    }
+
+    void set_bound(uint32_t b) {
+      bound = b;
+    }
+
+    // Reset the cursor and mark `b` bytes as readable.
+    void prepare_read(uint32_t b) {
+      offset = 0;
+      bound = b;
+    }
+
+    uint32_t get_bound() {
+      return bound;
+    }
+
+    // Copy up to `len` bytes of remaining valid data into buf; when the
+    // chunk is exhausted it resets itself and returns the short count.
+    uint32_t read(char* buf, uint32_t len) {
+      uint32_t left = bound - offset;
+      if (left >= len) {
+        memcpy(buf, buffer+offset, len);
+        offset += len;
+        return len;
+      } else {
+        memcpy(buf, buffer+offset, left);
+        offset = 0;
+        bound = 0;
+        return left;
+      }
+    }
+
+    // Append up to `len` bytes of buf into the remaining capacity; returns
+    // the number of bytes actually stored.
+    uint32_t write(char* buf, uint32_t len) {
+      uint32_t left = bytes - offset;
+      if (left >= len) {
+        memcpy(buffer+offset, buf, len);
+        offset += len;
+        return len;
+      } else {
+        memcpy(buffer+offset, buf, left);
+        offset = bytes;
+        return left;
+      }
+    }
+
+    bool full() {
+      return offset == bytes;  // no write capacity left
+    }
+
+    bool over() {
+      return offset == bound;  // all valid data consumed
+    }
+
+    void clear() {
+      offset = 0;
+      bound = 0;
+    }
+
+    // Re-post this chunk to the shared receive queue.
+    void post_srq(Infiniband *ib) {
+      ib->post_chunk(this);
+    }
+
+    void set_owner(uint64_t o) {
+      owner = o;
+    }
+
+    uint64_t get_owner() {
+      return owner;
+    }
+
+   public:
+    char* buffer;    // start of the registered region
+    uint32_t bytes;  // capacity of the region
+    uint32_t bound;  // end of valid data (read limit)
+    uint32_t offset; // current read/write cursor
+    ibv_mr* mr;      // verbs memory-region handle for `buffer`
+    uint64_t owner;  // opaque owner tag set by users of the chunk
+  };
+
+  // A pool of equally-sized registered chunks carved out of one contiguous
+  // base allocation (huge pages or memalign'd memory).
+  class Cluster {
+   public:
+    Cluster(MemoryManager& m, uint32_t s) : manager(m), chunk_size(s), lock("cluster_lock"){}
+    Cluster(MemoryManager& m, uint32_t s, uint32_t n) : manager(m), chunk_size(s), lock("cluster_lock"){
+      add(n);
+    }
+
+    ~Cluster() {
+      set<Chunk*>::iterator c = all_chunks.begin();
+      while(c != all_chunks.end()) {
+        delete *c;
+        ++c;
+      }
+      if (manager.enabled_huge_page)
+        manager.free_huge_pages(base);
+      else
+        // `base` comes from memalign(), so it must be released with
+        // free() — `delete` on it is undefined behavior.
+        free(base);
+    }
+    // Allocate and register `num` more chunks.
+    // NOTE(review): `base` only remembers the most recent allocation, so
+    // calling add() twice leaks the earlier block — confirm callers invoke
+    // it at most once per cluster.
+    int add(uint32_t num) {
+      uint32_t bytes = chunk_size * num;
+      if (manager.enabled_huge_page) {
+        base = (char*)manager.malloc_huge_pages(bytes);
+      } else {
+        base = (char*)memalign(CEPH_PAGE_SIZE, bytes);
+      }
+      assert(base);
+      for (uint32_t offset = 0; offset < bytes; offset += chunk_size){
+        ibv_mr* m = ibv_reg_mr(manager.pd->pd, base+offset, chunk_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
+        assert(m);
+        Chunk* c = new Chunk(base+offset,chunk_size,m);
+        free_chunks.push_back(c);
+        all_chunks.insert(c);
+      }
+      return 0;
+    }
+
+    // Return a chunk to the free list.
+    void take_back(Chunk* ck) {
+      Mutex::Locker l(lock);
+      free_chunks.push_back(ck);
+    }
+
+    // Pop enough chunks to cover `bytes` (all free chunks when bytes==0).
+    // Returns the number of chunks handed out, possibly fewer than needed.
+    int get_buffers(std::vector<Chunk*> &chunks, size_t bytes) {
+      uint32_t num = bytes / chunk_size + 1;
+      if (bytes % chunk_size == 0)
+        --num;
+      int r = num;
+      Mutex::Locker l(lock);
+      if (free_chunks.empty())
+        return 0;
+      if (!bytes) {
+        free_chunks.swap(chunks);
+        r = chunks.size();
+        return r;
+      }
+      if (free_chunks.size() < num) {
+        num = free_chunks.size();
+        r = num;
+      }
+      for (uint32_t i = 0; i < num; ++i) {
+        chunks.push_back(free_chunks.back());
+        free_chunks.pop_back();
+      }
+      return r;
+    }
+    MemoryManager& manager;
+    uint32_t chunk_size;
+    Mutex lock;                     // guards free_chunks
+    std::vector<Chunk*> free_chunks;
+    std::set<Chunk*> all_chunks;    // every chunk ever created (owned)
+    char* base;                     // backing allocation of the last add()
+  };
+
+ MemoryManager(Device *d, ProtectionDomain *p, bool hugepage): device(d), pd(p) {
+ enabled_huge_page = hugepage;
+ }
+ ~MemoryManager() {
+ if (channel)
+ delete channel;
+ if (send)
+ delete send;
+ }
+ void* malloc_huge_pages(size_t size) {
+ size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
+ char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);
+ if (ptr == MAP_FAILED) {
+ ptr = (char *)malloc(real_size);
+ if (ptr == NULL) return NULL;
+ real_size = 0;
+ }
+ *((size_t *)ptr) = real_size;
+ return ptr + HUGE_PAGE_SIZE;
+ }
+ // Release a block obtained from malloc_huge_pages(). The real_size
+ // header stashed just before the user pointer tells us whether the
+ // block came from mmap (non-zero -> munmap) or from the plain-malloc
+ // fallback (zero -> free).
+ void free_huge_pages(void *ptr) {
+ if (!ptr)
+ return;
+ char *real_ptr = (char *)ptr - HUGE_PAGE_SIZE;
+ size_t real_size = *((size_t *)real_ptr);
+ assert(real_size % HUGE_PAGE_SIZE == 0);
+ if (real_size)
+ munmap(real_ptr, real_size);
+ else
+ free(real_ptr);
+ }
+ // Create the receive ("channel") and send clusters: rx_num + tx_num
+ // chunks of `size` bytes each, registered against the protection
+ // domain by Cluster::add(). Must be called before any buffer requests.
+ void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num) {
+ assert(device);
+ assert(pd);
+ channel = new Cluster(*this, size);
+ channel->add(rx_num);
+
+ send = new Cluster(*this, size);
+ send->add(tx_num);
+ }
+ // Reset completed tx chunks and hand each one back to the send cluster.
+ void return_tx(std::vector<Chunk*> &chunks) {
+ for (std::vector<Chunk*>::iterator it = chunks.begin(); it != chunks.end(); ++it) {
+ (*it)->clear();
+ send->take_back(*it);
+ }
+ }
+
+ // Reserve tx chunks covering `bytes` from the send cluster;
+ // returns the chunk count actually delivered (see Cluster::get_buffers).
+ int get_send_buffers(std::vector<Chunk*> &c, size_t bytes) {
+ return send->get_buffers(c, bytes);
+ }
+
+ // Reserve rx chunks covering `bytes` from the receive cluster.
+ int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes) {
+ return channel->get_buffers(chunks, bytes);
+ }
+
+ // Ownership tests: does the send/receive cluster own this chunk?
+ int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);}
+ int is_rx_chunk(Chunk* c) { return channel->all_chunks.count(c);}
+ bool enabled_huge_page; // presumably selects malloc_huge_pages() in Cluster::fill() — confirm against the allocator
+ private:
+ Cluster* channel;//RECV
+ Cluster* send;// SEND
+ Device *device; // opened RDMA device (not owned)
+ ProtectionDomain *pd; // protection domain for chunk registration (not owned)
+ };
+
+ private:
+ uint32_t max_send_wr; // cap on outstanding send work requests per QP
+ uint32_t max_recv_wr; // cap on outstanding recv work requests per QP
+ uint32_t max_sge; // max scatter/gather entries per work request
+ uint8_t ib_physical_port; // HCA port this instance binds to
+ MemoryManager* memory_manager; // owns the rx/tx chunk clusters
+ ibv_srq* srq; // shared receive work queue
+ Device *device;
+ ProtectionDomain *pd;
+ DeviceList device_list;
+ // Convert a GID between its binary form and the hex wire encoding used
+ // in the TCP handshake messages.
+ void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
+ void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
+
+ public:
+ explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
+
+ /**
+ * Destroy an Infiniband object.
+ */
+ ~Infiniband() {
+ // Keep the ibv_destroy_srq() call OUTSIDE assert(): under NDEBUG the
+ // assert expression is compiled out entirely, and the original
+ // (assert(ibv_destroy_srq(srq) == 0)) would silently leak the SRQ.
+ int r = ibv_destroy_srq(srq);
+ (void)r; // avoid unused-variable warning in NDEBUG builds
+ assert(r == 0);
+ delete memory_manager;
+ delete pd;
+ }
+
+ // Wraps an ibv_comp_channel: an fd-based mechanism for being notified
+ // when a bound completion queue gains new work completions.
+ class CompletionChannel {
+ static const uint32_t MAX_ACK_EVENT = 5000;
+ CephContext *cct;
+ Infiniband& infiniband;
+ ibv_comp_channel *channel;
+ ibv_cq *cq;
+ uint32_t cq_events_that_need_ack;
+
+ public:
+ CompletionChannel(CephContext *c, Infiniband &ib)
+ : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) {}
+ ~CompletionChannel();
+ int init();
+ bool get_cq_event();
+ // fd suitable for poll()/epoll to wait for completion notifications
+ int get_fd() { return channel->fd; }
+ ibv_comp_channel* get_channel() { return channel; }
+ void bind_cq(ibv_cq *c) { cq = c; }
+ // Acknowledge every CQ event counted so far and reset the counter.
+ void ack_events() {
+ ibv_ack_cq_events(cq, cq_events_that_need_ack);
+ cq_events_that_need_ack = 0;
+ }
+ };
+
+ // this class encapsulates the creation, use, and destruction of an RC
+ // completion queue.
+ //
+ // You need to call init and it will create a cq and associate to comp channel
+ class CompletionQueue {
+ public:
+ // `qd` is the requested queue depth; `cc` is the completion channel the
+ // CQ is associated with in init() (may drive event-based polling).
+ CompletionQueue(CephContext *c, Infiniband &ib,
+ const uint32_t qd, CompletionChannel *cc)
+ : cct(c), infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {}
+ ~CompletionQueue();
+ int init();
+ // Harvest up to num_entries completions into ret_wc_array.
+ int poll_cq(int num_entries, ibv_wc *ret_wc_array);
+
+ ibv_cq* get_cq() const { return cq; }
+ // Re-request a completion notification (solicited-only by default).
+ int rearm_notify(bool solicited_only=true);
+ CompletionChannel* get_cc() const { return channel; }
+ private:
+ CephContext *cct;
+ Infiniband& infiniband; // Infiniband to which this QP belongs
+ CompletionChannel *channel;
+ ibv_cq *cq;
+ uint32_t queue_depth;
+ };
+
+ // this class encapsulates the creation, use, and destruction of an RC
+ // queue pair.
+ //
+ // you need call init and it will create a qp and bring it to the INIT state.
+ // after obtaining the lid, qpn, and psn of a remote queue pair, one
+ // must call plumb() to bring the queue pair to the RTS state.
+ class QueuePair {
+ public:
+ QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type,
+ int ib_physical_port, ibv_srq *srq,
+ Infiniband::CompletionQueue* txcq,
+ Infiniband::CompletionQueue* rxcq,
+ uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0);
+ ~QueuePair();
+
+ int init();
+
+ /**
+ * Get the initial packet sequence number for this QueuePair.
+ * This is randomly generated on creation. It should not be confused
+ * with the remote side's PSN, which is set in #plumb().
+ */
+ uint32_t get_initial_psn() const { return initial_psn; };
+ /**
+ * Get the local queue pair number for this QueuePair.
+ * QPNs are analogous to UDP/TCP port numbers.
+ */
+ uint32_t get_local_qp_number() const { return qp->qp_num; };
+ /**
+ * Get the remote queue pair number for this QueuePair, as set in #plumb().
+ * QPNs are analogous to UDP/TCP port numbers.
+ */
+ int get_remote_qp_number(uint32_t *rqp) const {
+ ibv_qp_attr qpa;
+ ibv_qp_init_attr qpia;
+
+ int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);
+ if (r) {
+ // ibv_query_qp() returns the error code directly and does not set
+ // errno, so decode r rather than a possibly-stale errno.
+ lderr(cct) << __func__ << " failed to query qp: "
+ << cpp_strerror(r) << dendl;
+ return -1;
+ }
+
+ if (rqp)
+ *rqp = qpa.dest_qp_num;
+ return 0;
+ }
+ /**
+ * Get the remote infiniband address for this QueuePair, as set in #plumb().
+ * LIDs are "local IDs" in infiniband terminology. They are short, locally
+ * routable addresses.
+ */
+ int get_remote_lid(uint16_t *lid) const {
+ ibv_qp_attr qpa;
+ ibv_qp_init_attr qpia;
+
+ int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);
+ if (r) {
+ // see get_remote_qp_number(): r carries the errno value
+ lderr(cct) << __func__ << " failed to query qp: "
+ << cpp_strerror(r) << dendl;
+ return -1;
+ }
+
+ if (lid)
+ *lid = qpa.ah_attr.dlid;
+ return 0;
+ }
+ /**
+ * Get the state of a QueuePair.
+ */
+ int get_state() const {
+ ibv_qp_attr qpa;
+ ibv_qp_init_attr qpia;
+
+ int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);
+ if (r) {
+ // see get_remote_qp_number(): r carries the errno value
+ lderr(cct) << __func__ << " failed to get state: "
+ << cpp_strerror(r) << dendl;
+ return -1;
+ }
+ return qpa.qp_state;
+ }
+ /**
+ * Return true if the queue pair is in an error state, false otherwise.
+ * On query failure the QP is conservatively reported as errored.
+ */
+ bool is_error() const {
+ ibv_qp_attr qpa;
+ ibv_qp_init_attr qpia;
+
+ // attr_mask of -1 asks the provider for every attribute; we only
+ // inspect cur_qp_state below.
+ int r = ibv_query_qp(qp, &qpa, -1, &qpia);
+ if (r) {
+ // see get_remote_qp_number(): r carries the errno value
+ lderr(cct) << __func__ << " failed to get state: "
+ << cpp_strerror(r) << dendl;
+ return true;
+ }
+ return qpa.cur_qp_state == IBV_QPS_ERR;
+ }
+ ibv_qp* get_qp() const { return qp; }
+ Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }
+ Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }
+ int to_dead();
+ bool is_dead() const { return dead; }
+
+ private:
+ CephContext *cct;
+ Infiniband& infiniband; // Infiniband to which this QP belongs
+ ibv_qp_type type; // QP type (IBV_QPT_RC, etc.)
+ ibv_context* ctxt; // device context of the HCA to use
+ int ib_physical_port;
+ ibv_pd* pd; // protection domain
+ ibv_srq* srq; // shared receive queue
+ ibv_qp* qp; // infiniband verbs QP handle
+ Infiniband::CompletionQueue* txcq;
+ Infiniband::CompletionQueue* rxcq;
+ uint32_t initial_psn; // initial packet sequence number
+ uint32_t max_send_wr;
+ uint32_t max_recv_wr;
+ uint32_t q_key;
+ bool dead;
+ };
+
+ public:
+ typedef MemoryManager::Cluster Cluster;
+ typedef MemoryManager::Chunk Chunk;
+ QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);
+ ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
+ // Re-post a receive chunk to the shared receive queue.
+ int post_chunk(Chunk* chunk);
+ int post_channel_cluster();
+ // Reserve tx chunks from the memory manager's send cluster.
+ int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes) {
+ return memory_manager->get_send_buffers(c, bytes);
+ }
+ CompletionChannel *create_comp_channel(CephContext *c);
+ CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
+ uint8_t get_ib_physical_port() {
+ return ib_physical_port;
+ }
+ // TCP-side handshake helpers: exchange qpn/psn/lid/gid with the peer.
+ int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
+ int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
+ uint16_t get_lid() { return device->get_lid(); }
+ ibv_gid get_gid() { return device->get_gid(); }
+ MemoryManager* get_memory_manager() { return memory_manager; }
+ Device* get_device() { return device; }
+ // fd for asynchronous device events (e.g. port state changes).
+ int get_async_fd() { return device->ctxt->async_fd; }
+ // Give a chunk back to its owning pool: rx chunks are re-posted to the
+ // shared receive queue, tx chunks return to the send cluster.
+ // Returns 1 for an rx chunk, 2 for a tx chunk, -1 if it is neither.
+ int recall_chunk(Chunk* c) {
+ if (memory_manager->is_rx_chunk(c)) {
+ post_chunk(c);
+ return 1;
+ }
+ if (memory_manager->is_tx_chunk(c)) {
+ vector<Chunk*> one(1, c);
+ memory_manager->return_tx(one);
+ return 2;
+ }
+ return -1;
+ }
+ // Forwarders to the memory manager's chunk-ownership tests.
+ int is_tx_chunk(Chunk* c) { return memory_manager->is_tx_chunk(c); }
+ int is_rx_chunk(Chunk* c) { return memory_manager->is_rx_chunk(c); }
+ // Human-readable names for ibv_wc_status / QP state, for logging.
+ static const char* wc_status_to_string(int status);
+ static const char* qp_state_string(int status);
+ };
+
+#endif
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- \r
-// vim: ts=8 sw=2 smarttab\r
-/*\r
- * Ceph - scalable distributed file system\r
- *\r
- * Copyright (C) 2016 XSKY <haomai@xsky.com>\r
- *\r
- * Author: Haomai Wang <haomaiwang@gmail.com>\r
- *\r
- * This is free software; you can redistribute it and/or\r
- * modify it under the terms of the GNU Lesser General Public\r
- * License version 2.1, as published by the Free Software\r
- * Foundation. See file COPYING.\r
- *\r
- */\r
-\r
-#include "RDMAStack.h"\r
-\r
-#define dout_subsys ceph_subsys_ms\r
-#undef dout_prefix\r
-#define dout_prefix *_dout << " RDMAConnectedSocketImpl "\r
-\r
-int RDMAConnectedSocketImpl::activate()\r
-{\r
- ibv_qp_attr qpa;\r
- int r;\r
-\r
- // now connect up the qps and switch to RTR\r
- memset(&qpa, 0, sizeof(qpa));\r
- qpa.qp_state = IBV_QPS_RTR;\r
- qpa.path_mtu = IBV_MTU_1024;\r
- qpa.dest_qp_num = peer_msg.qpn;\r
- qpa.rq_psn = peer_msg.psn;\r
- qpa.max_dest_rd_atomic = 1;\r
- qpa.min_rnr_timer = 12;\r
- //qpa.ah_attr.is_global = 0;\r
- qpa.ah_attr.is_global = 1;\r
- qpa.ah_attr.grh.hop_limit = 6;\r
- qpa.ah_attr.grh.dgid = peer_msg.gid;\r
-\r
- qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx();\r
-\r
- qpa.ah_attr.dlid = peer_msg.lid;\r
- qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;\r
- qpa.ah_attr.src_path_bits = 0;\r
- qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());\r
-\r
- ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;\r
-\r
- r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |\r
- IBV_QP_AV |\r
- IBV_QP_PATH_MTU |\r
- IBV_QP_DEST_QPN |\r
- IBV_QP_RQ_PSN |\r
- IBV_QP_MIN_RNR_TIMER |\r
- IBV_QP_MAX_DEST_RD_ATOMIC);\r
- if (r) {\r
- lderr(cct) << __func__ << " failed to transition to RTR state: "\r
- << cpp_strerror(errno) << dendl;\r
- return -1;\r
- }\r
-\r
- ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl;\r
-\r
- // now move to RTS\r
- qpa.qp_state = IBV_QPS_RTS;\r
-\r
- // How long to wait before retrying if packet lost or server dead.\r
- // Supposedly the timeout is 4.096us*2^timeout. However, the actual\r
- // timeout appears to be 4.096us*2^(timeout+1), so the setting\r
- // below creates a 135ms timeout.\r
- qpa.timeout = 14;\r
-\r
- // How many times to retry after timeouts before giving up.\r
- qpa.retry_cnt = 7;\r
-\r
- // How many times to retry after RNR (receiver not ready) condition\r
- // before giving up. Occurs when the remote side has not yet posted\r
- // a receive request.\r
- qpa.rnr_retry = 7; // 7 is infinite retry.\r
- qpa.sq_psn = my_msg.psn;\r
- qpa.max_rd_atomic = 1;\r
-\r
- r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |\r
- IBV_QP_TIMEOUT |\r
- IBV_QP_RETRY_CNT |\r
- IBV_QP_RNR_RETRY |\r
- IBV_QP_SQ_PSN |\r
- IBV_QP_MAX_QP_RD_ATOMIC);\r
- if (r) {\r
- lderr(cct) << __func__ << " failed to transition to RTS state: "\r
- << cpp_strerror(errno) << dendl;\r
- return -1;\r
- }\r
-\r
- // the queue pair should be ready to use once the client has finished\r
- // setting up their end.\r
- ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;\r
- ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl;\r
-\r
- if (!is_server) {\r
- connected = 1; //indicate successfully\r
- ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;\r
- submit(false);\r
- }\r
- active = true;\r
-\r
- return 0;\r
-}\r
-\r
-int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {\r
- ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"\r
- << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;\r
- NetHandler net(cct);\r
- tcp_fd = net.connect(peer_addr);\r
-\r
- if (tcp_fd < 0) {\r
- return -errno;\r
- }\r
- net.set_close_on_exec(tcp_fd);\r
-\r
- int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size);\r
- if (r < 0) {\r
- ::close(tcp_fd);\r
- return -errno;\r
- }\r
-\r
- ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;\r
- net.set_priority(tcp_fd, opts.priority);\r
- my_msg.peer_qpn = 0;\r
- r = infiniband->send_msg(cct, tcp_fd, my_msg);\r
- if (r < 0)\r
- return r;\r
-\r
- worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);\r
- return 0;\r
-}\r
-\r
-void RDMAConnectedSocketImpl::handle_connection() {\r
- ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl;\r
- int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);\r
- if (r < 0) {\r
- if (r != -EAGAIN)\r
- fault();\r
- return;\r
- }\r
-\r
- if (!is_server) {// syn + ack from server\r
- my_msg.peer_qpn = peer_msg.qpn;\r
- ldout(cct, 20) << __func__ << " peer msg : < " << peer_msg.qpn << ", " << peer_msg.psn\r
- << ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl;\r
- if (!connected) {\r
- r = activate();\r
- assert(!r);\r
- }\r
- notify();\r
- r = infiniband->send_msg(cct, tcp_fd, my_msg);\r
- if (r < 0) {\r
- ldout(cct, 1) << __func__ << " send client ack failed." << dendl;\r
- fault();\r
- }\r
- } else {\r
- if (peer_msg.peer_qpn == 0) {// syn from client\r
- if (active) {\r
- ldout(cct, 10) << __func__ << " server is already active." << dendl;\r
- return ;\r
- }\r
- r = infiniband->send_msg(cct, tcp_fd, my_msg);\r
- if (r < 0) {\r
- ldout(cct, 1) << __func__ << " server ack failed." << dendl;\r
- fault();\r
- return ;\r
- }\r
- r = activate();\r
- assert(!r);\r
- } else { // ack from client\r
- connected = 1;\r
- cleanup();\r
- submit(false);\r
- notify();\r
- }\r
- }\r
-}\r
-\r
-ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)\r
-{\r
- uint64_t i = 0;\r
- int r = ::read(notify_fd, &i, sizeof(i));\r
- ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl;\r
- if (error)\r
- return -error;\r
- ssize_t read = 0;\r
- if (!buffers.empty())\r
- read = read_buffers(buf,len);\r
-\r
- std::vector<ibv_wc> cqe;\r
- get_wc(cqe);\r
- if (cqe.empty())\r
- return read == 0 ? -EAGAIN : read;\r
-\r
- ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;\r
- for (size_t i = 0; i < cqe.size(); ++i) {\r
- ibv_wc* response = &cqe[i];\r
- assert(response->status == IBV_WC_SUCCESS);\r
- Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);\r
- ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;\r
- chunk->prepare_read(response->byte_len);\r
- if (response->byte_len == 0) {\r
- if (connected) {\r
- error = ECONNRESET;\r
- assert(infiniband->post_chunk(chunk) == 0);\r
- ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;\r
- }\r
- break;\r
- }\r
- //assert(response->byte_len);\r
- if (read == (ssize_t)len) {\r
- buffers.push_back(chunk);\r
- ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;\r
- } else if (read + response->byte_len > (ssize_t)len) {\r
- read += chunk->read(buf+read, (ssize_t)len-read);\r
- buffers.push_back(chunk);\r
- ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;\r
- } else {\r
- read += chunk->read(buf+read, response->byte_len);\r
- assert(infiniband->post_chunk(chunk) == 0);\r
- }\r
- }\r
-\r
- if (is_server && connected == 0) {\r
- ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;\r
- connected = 1; //if so, we don't need the last handshake\r
- cleanup();\r
- submit(false);\r
- }\r
-\r
- if (read == 0 && error)\r
- return -error;\r
- return read == 0 ? -EAGAIN : read;\r
-}\r
-\r
-ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)\r
-{\r
- size_t read = 0, tmp = 0;\r
- vector<Chunk*>::iterator c = buffers.begin();\r
- for (; c != buffers.end() ; ++c) {\r
- tmp = (*c)->read(buf+read, len-read);\r
- read += tmp;\r
- ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;\r
- if ((*c)->over()) {\r
- assert(infiniband->post_chunk(*c) == 0);\r
- ldout(cct, 25) << __func__ << " one chunk over." << dendl;\r
- }\r
- if (read == len) {\r
- break;\r
- }\r
- }\r
-\r
- if (c != buffers.end() && (*c)->over())\r
- c++;\r
- buffers.erase(buffers.begin(), c);\r
- ldout(cct, 25) << __func__ << " got " << read << " bytes, buffers size: " << buffers.size() << dendl;\r
- return read;\r
-}\r
-\r
-ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)\r
-{\r
- if (error)\r
- return -error;\r
- static const int MAX_COMPLETIONS = 16;\r
- ibv_wc wc[MAX_COMPLETIONS];\r
- ssize_t size = 0;\r
-\r
- ibv_wc* response;\r
- Chunk* chunk;\r
- bool loaded = false;\r
- auto iter = buffers.begin();\r
- if (iter != buffers.end()) {\r
- chunk = *iter;\r
- // FIXME need to handle release\r
- // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
- buffers.erase(iter);\r
- loaded = true;\r
- size = chunk->bound;\r
- }\r
-\r
- std::vector<ibv_wc> cqe;\r
- get_wc(cqe);\r
- if (cqe.empty())\r
- return size == 0 ? -EAGAIN : size;\r
-\r
- ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;\r
-\r
- for (size_t i = 0; i < cqe.size(); ++i) {\r
- response = &wc[i];\r
- chunk = reinterpret_cast<Chunk*>(response->wr_id);\r
- chunk->prepare_read(response->byte_len);\r
- if (!loaded && i == 0) {\r
- // FIXME need to handle release\r
- // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
- size = chunk->bound;\r
- continue;\r
- }\r
- buffers.push_back(chunk);\r
- iter++;\r
- }\r
-\r
- if (size == 0)\r
- return -EAGAIN;\r
- return size;\r
-}\r
-\r
-ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)\r
-{\r
- if (error) {\r
- if (!active)\r
- return -EPIPE;\r
- return -error;\r
- }\r
- size_t bytes = bl.length();\r
- if (!bytes)\r
- return 0;\r
- {\r
- Mutex::Locker l(lock);\r
- pending_bl.claim_append(bl);\r
- if (!connected) {\r
- ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;\r
- return bytes;\r
- }\r
- }\r
- ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;\r
- ssize_t r = submit(more);\r
- if (r < 0 && r != -EAGAIN)\r
- return r;\r
- return bytes;\r
-}\r
-\r
-ssize_t RDMAConnectedSocketImpl::submit(bool more)\r
-{\r
- if (error)\r
- return -error;\r
- Mutex::Locker l(lock);\r
- std::vector<Chunk*> tx_buffers;\r
- size_t bytes = pending_bl.length();\r
- ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "\r
- << pending_bl.buffers().size() << dendl;\r
- if (!bytes)\r
- return 0;\r
-\r
- int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);\r
- if (ret == 0) {\r
- ldout(cct, 10) << __func__ << " no enough buffers in worker " << worker << dendl;\r
- return -EAGAIN; // that is ok , cause send will return bytes. == 0 enough buffers, < 0 no buffer, >0 not enough\r
- }\r
- vector<Chunk*>::iterator current_buffer = tx_buffers.begin();\r
- list<bufferptr>::const_iterator it = pending_bl.buffers().begin();\r
- unsigned total = 0;\r
- while (it != pending_bl.buffers().end()) {\r
- const uintptr_t addr = reinterpret_cast<const uintptr_t>(it->c_str());\r
- unsigned copied = 0;\r
- while (copied < it->length()) {\r
- uint32_t r = (*current_buffer)->write((char*)addr+copied, it->length() - copied);\r
- copied += r;\r
- total += r;\r
- if ((*current_buffer)->full()){\r
- ++current_buffer;\r
- if (current_buffer == tx_buffers.end())\r
- goto sending;\r
- }\r
- }\r
- ++it;\r
- }\r
-\r
- sending:\r
- assert(total <= pending_bl.length());\r
- bufferlist swapped;\r
- if (total < pending_bl.length()) {\r
- pending_bl.splice(total, pending_bl.length()-total, &swapped);\r
- pending_bl.swap(swapped);\r
- } else {\r
- pending_bl.clear();\r
- }\r
-\r
- ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "\r
- << pending_bl.buffers().size() << dendl;\r
-\r
- int r = post_work_request(tx_buffers);\r
- if (r < 0)\r
- return r;\r
-\r
- ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;\r
- return bytes;\r
-}\r
-\r
-int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)\r
-{\r
- ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;\r
- vector<Chunk*>::iterator current_buffer = tx_buffers.begin();\r
- ibv_sge isge[tx_buffers.size()];\r
- uint32_t current_sge = 0;\r
- ibv_send_wr iswr[tx_buffers.size()];\r
- uint32_t current_swr = 0;\r
- ibv_send_wr* pre_wr = NULL;\r
-\r
- memset(iswr, 0, sizeof(iswr));\r
- memset(isge, 0, sizeof(isge));\r
- current_buffer = tx_buffers.begin();\r
- while (current_buffer != tx_buffers.end()) {\r
- isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);\r
- isge[current_sge].length = (*current_buffer)->get_offset();\r
- isge[current_sge].lkey = (*current_buffer)->mr->lkey;\r
- ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl;\r
-\r
- iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);\r
- iswr[current_swr].next = NULL;\r
- iswr[current_swr].sg_list = &isge[current_sge];\r
- iswr[current_swr].num_sge = 1;\r
- iswr[current_swr].opcode = IBV_WR_SEND;\r
- iswr[current_swr].send_flags = IBV_SEND_SIGNALED;\r
- /*if (isge[current_sge].length < infiniband->max_inline_data) {\r
- iswr[current_swr].send_flags = IBV_SEND_INLINE;\r
- ldout(cct, 20) << __func__ << " send_inline." << dendl;\r
- }*/\r
-\r
- if (pre_wr)\r
- pre_wr->next = &iswr[current_swr];\r
- pre_wr = &iswr[current_swr];\r
- ++current_sge;\r
- ++current_swr;\r
- ++current_buffer;\r
- }\r
-\r
- ibv_send_wr *bad_tx_work_request;\r
- if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {\r
- lderr(cct) << __func__ << " failed to send data"\r
- << " (most probably should be peer not ready): "\r
- << cpp_strerror(errno) << dendl;\r
- return -errno;\r
- }\r
- ldout(cct, 20) << __func__ << " qp state is : " << Infiniband::qp_state_string(qp->get_state()) << dendl;\r
- return 0;\r
-}\r
-\r
-void RDMAConnectedSocketImpl::fin() {\r
- ibv_send_wr wr;\r
- memset(&wr, 0, sizeof(wr));\r
- wr.wr_id = reinterpret_cast<uint64_t>(qp);\r
- wr.num_sge = 0;\r
- wr.opcode = IBV_WR_SEND;\r
- wr.send_flags = IBV_SEND_SIGNALED;\r
- ibv_send_wr* bad_tx_work_request;\r
- if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {\r
- lderr(cct) << __func__ << " failed to send message="\r
- << " ibv_post_send failed(most probably should be peer not ready): "\r
- << cpp_strerror(errno) << dendl;\r
- return ;\r
- }\r
-}\r
-\r
-void RDMAConnectedSocketImpl::cleanup() {\r
- if (con_handler) {\r
- (static_cast<C_handle_connection*>(con_handler))->close();\r
- worker->center.submit_to(worker->center.get_id(), [this]() {\r
- worker->center.delete_file_event(tcp_fd, EVENT_READABLE);\r
- }, false);\r
- delete con_handler;\r
- con_handler = nullptr;\r
- }\r
-}\r
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "RDMAStack.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
+
+// Drive this connection's queue pair through INIT->RTR->RTS using the
+// peer's qpn/psn/lid/gid received over the TCP handshake (peer_msg).
+// On the client side a successful transition also flushes any data the
+// caller queued before the connection was up ("fake send").
+// Returns 0 on success, -1 if either QP transition fails.
+int RDMAConnectedSocketImpl::activate()
+{
+ ibv_qp_attr qpa;
+ int r;
+
+ // now connect up the qps and switch to RTR
+ memset(&qpa, 0, sizeof(qpa));
+ qpa.qp_state = IBV_QPS_RTR;
+ qpa.path_mtu = IBV_MTU_1024;
+ qpa.dest_qp_num = peer_msg.qpn;
+ qpa.rq_psn = peer_msg.psn;
+ qpa.max_dest_rd_atomic = 1;
+ qpa.min_rnr_timer = 12;
+ //qpa.ah_attr.is_global = 0;
+ qpa.ah_attr.is_global = 1;
+ qpa.ah_attr.grh.hop_limit = 6;
+ qpa.ah_attr.grh.dgid = peer_msg.gid;
+
+ qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx();
+
+ qpa.ah_attr.dlid = peer_msg.lid;
+ qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
+ qpa.ah_attr.src_path_bits = 0;
+ qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());
+
+ ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
+
+ r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
+ IBV_QP_AV |
+ IBV_QP_PATH_MTU |
+ IBV_QP_DEST_QPN |
+ IBV_QP_RQ_PSN |
+ IBV_QP_MIN_RNR_TIMER |
+ IBV_QP_MAX_DEST_RD_ATOMIC);
+ if (r) {
+ lderr(cct) << __func__ << " failed to transition to RTR state: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+
+ ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl;
+
+ // now move to RTS
+ qpa.qp_state = IBV_QPS_RTS;
+
+ // How long to wait before retrying if packet lost or server dead.
+ // Supposedly the timeout is 4.096us*2^timeout. However, the actual
+ // timeout appears to be 4.096us*2^(timeout+1), so the setting
+ // below creates a 135ms timeout.
+ qpa.timeout = 14;
+
+ // How many times to retry after timeouts before giving up.
+ qpa.retry_cnt = 7;
+
+ // How many times to retry after RNR (receiver not ready) condition
+ // before giving up. Occurs when the remote side has not yet posted
+ // a receive request.
+ qpa.rnr_retry = 7; // 7 is infinite retry.
+ qpa.sq_psn = my_msg.psn;
+ qpa.max_rd_atomic = 1;
+
+ r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
+ IBV_QP_TIMEOUT |
+ IBV_QP_RETRY_CNT |
+ IBV_QP_RNR_RETRY |
+ IBV_QP_SQ_PSN |
+ IBV_QP_MAX_QP_RD_ATOMIC);
+ if (r) {
+ lderr(cct) << __func__ << " failed to transition to RTS state: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+
+ // the queue pair should be ready to use once the client has finished
+ // setting up their end.
+ ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;
+ ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl;
+
+ if (!is_server) {
+ connected = 1; //indicate successfully
+ ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;
+ submit(false);
+ }
+ active = true;
+
+ return 0;
+}
+
+// Open the TCP side-channel to the peer and send our IBSYNMsg (qpn/psn/
+// lid/gid) as the first step of the RDMA handshake; the reply is handled
+// asynchronously by handle_connection() via the registered read event.
+// Returns 0 on success or a negative errno.
+int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
+ ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
+ << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
+ NetHandler net(cct);
+ tcp_fd = net.connect(peer_addr);
+
+ if (tcp_fd < 0) {
+ return -errno;
+ }
+ net.set_close_on_exec(tcp_fd);
+
+ int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size);
+ if (r < 0) {
+ ::close(tcp_fd);
+ return -errno;
+ }
+
+ ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
+ net.set_priority(tcp_fd, opts.priority);
+ // peer_qpn == 0 marks this message as the initial "syn".
+ my_msg.peer_qpn = 0;
+ r = infiniband->send_msg(cct, tcp_fd, my_msg);
+ if (r < 0)
+ // NOTE(review): tcp_fd is left open here — presumably cleaned up by
+ // the socket's teardown path; confirm no fd leak on this branch.
+ return r;
+
+ worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
+ return 0;
+}
+
+// TCP-readable event callback driving the three-way RDMA handshake:
+// client sends syn (peer_qpn == 0), server replies syn+ack, client
+// acks. Each side calls activate() once it has the peer's QP info.
+void RDMAConnectedSocketImpl::handle_connection() {
+ ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl;
+ int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
+ if (r < 0) {
+ // EAGAIN: partial message, wait for the next readable event.
+ if (r != -EAGAIN)
+ fault();
+ return;
+ }
+
+ if (!is_server) {// syn + ack from server
+ my_msg.peer_qpn = peer_msg.qpn;
+ ldout(cct, 20) << __func__ << " peer msg : < " << peer_msg.qpn << ", " << peer_msg.psn
+ << ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl;
+ if (!connected) {
+ r = activate();
+ assert(!r);
+ }
+ notify();
+ // final ack so the server can finish its side of the handshake
+ r = infiniband->send_msg(cct, tcp_fd, my_msg);
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
+ fault();
+ }
+ } else {
+ if (peer_msg.peer_qpn == 0) {// syn from client
+ if (active) {
+ ldout(cct, 10) << __func__ << " server is already active." << dendl;
+ return ;
+ }
+ r = infiniband->send_msg(cct, tcp_fd, my_msg);
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " server ack failed." << dendl;
+ fault();
+ return ;
+ }
+ r = activate();
+ assert(!r);
+ } else { // ack from client
+ connected = 1;
+ cleanup();
+ submit(false);
+ notify();
+ }
+ }
+}
+
+// Read up to `len` bytes: first drain chunks buffered from previous
+// polls (read_buffers), then harvest new work completions. Fully
+// consumed chunks are re-posted to the SRQ; partially consumed ones are
+// kept in `buffers` for the next call. Returns bytes read, -EAGAIN when
+// nothing is available, or -error after a failure/remote close.
+ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
+{
+ // drain the eventfd-style notification counter
+ uint64_t i = 0;
+ int r = ::read(notify_fd, &i, sizeof(i));
+ ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl;
+ if (error)
+ return -error;
+ ssize_t read = 0;
+ if (!buffers.empty())
+ read = read_buffers(buf,len);
+
+ std::vector<ibv_wc> cqe;
+ get_wc(cqe);
+ if (cqe.empty())
+ return read == 0 ? -EAGAIN : read;
+
+ ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
+ // NOTE(review): this loop's `i` shadows the uint64_t `i` above.
+ for (size_t i = 0; i < cqe.size(); ++i) {
+ ibv_wc* response = &cqe[i];
+ assert(response->status == IBV_WC_SUCCESS);
+ Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+ ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
+ chunk->prepare_read(response->byte_len);
+ if (response->byte_len == 0) {
+ // zero-length receive == remote close marker
+ if (connected) {
+ error = ECONNRESET;
+ assert(infiniband->post_chunk(chunk) == 0);
+ ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
+ }
+ break;
+ }
+ //assert(response->byte_len);
+ if (read == (ssize_t)len) {
+ // caller's buffer already full: stash the whole chunk for later
+ buffers.push_back(chunk);
+ ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;
+ } else if (read + response->byte_len > (ssize_t)len) {
+ // chunk only partially fits: copy what fits, stash the remainder
+ read += chunk->read(buf+read, (ssize_t)len-read);
+ buffers.push_back(chunk);
+ ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
+ } else {
+ // chunk fully consumed: recycle it onto the SRQ
+ read += chunk->read(buf+read, response->byte_len);
+ assert(infiniband->post_chunk(chunk) == 0);
+ }
+ }
+
+ if (is_server && connected == 0) {
+ ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
+ connected = 1; //if so, we don't need the last handshake
+ cleanup();
+ submit(false);
+ }
+
+ if (read == 0 && error)
+ return -error;
+ return read == 0 ? -EAGAIN : read;
+}
+
+// Copy out of the locally buffered chunks into buf (up to len bytes),
+// re-posting each fully drained chunk to the SRQ and erasing consumed
+// chunks from `buffers`. Returns the byte count copied.
+ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
+{
+ size_t read = 0, tmp = 0;
+ vector<Chunk*>::iterator c = buffers.begin();
+ for (; c != buffers.end() ; ++c) {
+ tmp = (*c)->read(buf+read, len-read);
+ read += tmp;
+ ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;
+ if ((*c)->over()) {
+ assert(infiniband->post_chunk(*c) == 0);
+ ldout(cct, 25) << __func__ << " one chunk over." << dendl;
+ }
+ if (read == len) {
+ break;
+ }
+ }
+
+ // If we stopped on a chunk that happens to be exactly drained, include
+ // it in the erase range; erase() removes only fully consumed chunks.
+ if (c != buffers.end() && (*c)->over())
+ c++;
+ buffers.erase(buffers.begin(), c);
+ ldout(cct, 25) << __func__ << " got " << read << " bytes, buffers size: " << buffers.size() << dendl;
+ return read;
+}
+
+// Zero-copy variant of read(): returns the size of one received chunk
+// and stashes any further completions in `buffers` for later calls.
+// Returns -EAGAIN when nothing is available, -error on failure.
+// FIXME: chunk release/ownership handoff to `data` is still unhandled.
+ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
+{
+ if (error)
+ return -error;
+ ssize_t size = 0;
+
+ Chunk* chunk;
+ bool loaded = false;
+ auto iter = buffers.begin();
+ if (iter != buffers.end()) {
+ chunk = *iter;
+ // FIXME need to handle release
+ // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
+ buffers.erase(iter);
+ loaded = true;
+ size = chunk->bound;
+ }
+
+ std::vector<ibv_wc> cqe;
+ get_wc(cqe);
+ if (cqe.empty())
+ return size == 0 ? -EAGAIN : size;
+
+ ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;
+
+ for (size_t i = 0; i < cqe.size(); ++i) {
+ // Fix: read the completion from the vector get_wc() filled; the
+ // original indexed a separate, never-written local wc[] array.
+ ibv_wc* response = &cqe[i];
+ chunk = reinterpret_cast<Chunk*>(response->wr_id);
+ chunk->prepare_read(response->byte_len);
+ if (!loaded && i == 0) {
+ // FIXME need to handle release
+ // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
+ size = chunk->bound;
+ continue;
+ }
+ // Fix: dropped the original's stray `iter++` here — `iter` was
+ // invalidated by the earlier erase() and its value was never used.
+ buffers.push_back(chunk);
+ }
+
+ if (size == 0)
+ return -EAGAIN;
+ return size;
+}
+
+// Queue `bl` for transmission. The data is always claimed into
+// pending_bl under the lock; if the QP handshake has not completed yet
+// (`connected` false) we report success to the caller and let the
+// connection-established path flush the backlog later ("fake send").
+// Returns the byte count accepted, or a negative error.
+ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
+{
+  if (error) {
+    if (!active)
+      return -EPIPE;
+    return -error;
+  }
+  size_t bytes = bl.length();
+  if (!bytes)
+    return 0;
+  {
+    // lock only guards the pending_bl mutation; submit() re-acquires it
+    Mutex::Locker l(lock);
+    pending_bl.claim_append(bl);
+    if (!connected) {
+      ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;
+      return bytes;
+    }
+  }
+  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;
+  ssize_t r = submit(more);
+  // -EAGAIN just means the data stayed buffered; still report it accepted
+  if (r < 0 && r != -EAGAIN)
+    return r;
+  return bytes;
+}
+
+// Copy as much of pending_bl as fits into tx chunks reserved from the
+// worker and post them to the QP. Whatever does not fit is spliced back
+// into pending_bl for the next call. Returns the pending byte count
+// observed at entry, -EAGAIN when no tx buffers were available, or a
+// negative error from posting.
+// NOTE(review): the `more` parameter is not used by this implementation.
+ssize_t RDMAConnectedSocketImpl::submit(bool more)
+{
+  if (error)
+    return -error;
+  Mutex::Locker l(lock);
+  std::vector<Chunk*> tx_buffers;
+  size_t bytes = pending_bl.length();
+  ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
+                 << pending_bl.buffers().size() << dendl;
+  if (!bytes)
+    return 0;
+
+  int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);
+  if (ret == 0) {
+    ldout(cct, 10) << __func__ << " no enough buffers in worker " << worker << dendl;
+    return -EAGAIN; // that is ok , cause send will return bytes. == 0 enough buffers, < 0 no buffer, >0 not enough
+  }
+  // Fill the reserved chunks from pending_bl's bufferptrs; `total` counts
+  // the bytes actually copied. We may run out of chunks before draining
+  // pending_bl, in which case we jump to `sending` with a partial copy.
+  vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
+  list<bufferptr>::const_iterator it = pending_bl.buffers().begin();
+  unsigned total = 0;
+  while (it != pending_bl.buffers().end()) {
+    const uintptr_t addr = reinterpret_cast<const uintptr_t>(it->c_str());
+    unsigned copied = 0;
+    while (copied < it->length()) {
+      uint32_t r = (*current_buffer)->write((char*)addr+copied, it->length() - copied);
+      copied += r;
+      total += r;
+      if ((*current_buffer)->full()){
+        ++current_buffer;
+        if (current_buffer == tx_buffers.end())
+          goto sending;
+      }
+    }
+    ++it;
+  }
+
+ sending:
+  assert(total <= pending_bl.length());
+  // Keep only the uncopied tail in pending_bl.
+  bufferlist swapped;
+  if (total < pending_bl.length()) {
+    pending_bl.splice(total, pending_bl.length()-total, &swapped);
+    pending_bl.swap(swapped);
+  } else {
+    pending_bl.clear();
+  }
+
+  ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
+                 << pending_bl.buffers().size() << dendl;
+
+  int r = post_work_request(tx_buffers);
+  if (r < 0)
+    return r;
+
+  ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;
+  return bytes;
+}
+
+// Build one signaled IBV_WR_SEND work request per filled tx chunk, chain
+// the requests together via `next` pointers, and post the whole list to
+// the QP with a single ibv_post_send() call.
+// Returns 0 on success or a negative error code.
+int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
+{
+  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;
+  vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
+  ibv_sge isge[tx_buffers.size()];
+  uint32_t current_sge = 0;
+  ibv_send_wr iswr[tx_buffers.size()];
+  uint32_t current_swr = 0;
+  ibv_send_wr* pre_wr = NULL;
+
+  memset(iswr, 0, sizeof(iswr));
+  memset(isge, 0, sizeof(isge));
+  current_buffer = tx_buffers.begin();
+  while (current_buffer != tx_buffers.end()) {
+    // one SGE per chunk; length is how far the chunk was written
+    isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);
+    isge[current_sge].length = (*current_buffer)->get_offset();
+    isge[current_sge].lkey = (*current_buffer)->mr->lkey;
+    ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl;
+
+    // wr_id carries the chunk pointer so the completion handler can
+    // recover and recycle the buffer
+    iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);
+    iswr[current_swr].next = NULL;
+    iswr[current_swr].sg_list = &isge[current_sge];
+    iswr[current_swr].num_sge = 1;
+    iswr[current_swr].opcode = IBV_WR_SEND;
+    iswr[current_swr].send_flags = IBV_SEND_SIGNALED;
+    /*if (isge[current_sge].length < infiniband->max_inline_data) {
+      iswr[current_swr].send_flags = IBV_SEND_INLINE;
+      ldout(cct, 20) << __func__ << " send_inline." << dendl;
+    }*/
+
+    // chain this wr behind the previous one so one post sends them all
+    if (pre_wr)
+      pre_wr->next = &iswr[current_swr];
+    pre_wr = &iswr[current_swr];
+    ++current_sge;
+    ++current_swr;
+    ++current_buffer;
+  }
+
+  ibv_send_wr *bad_tx_work_request;
+  // Bug fix: ibv_post_send() returns the error number directly (0 on
+  // success) and is not guaranteed to set errno, so use its return value
+  // rather than the old `-errno`, which could report a stale error.
+  int r = ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request);
+  if (r) {
+    lderr(cct) << __func__ << " failed to send data"
+               << " (most probably should be peer not ready): "
+               << cpp_strerror(r) << dendl;
+    return -r;
+  }
+  ldout(cct, 20) << __func__ << " qp state is : " << Infiniband::qp_state_string(qp->get_state()) << dendl;
+  return 0;
+}
+
+// Post a zero-length signaled send as an in-band "finish" marker.
+// The wr_id carries the QP pointer (not a Chunk*) so the completion
+// path can distinguish this control message from data sends.
+void RDMAConnectedSocketImpl::fin() {
+  ibv_send_wr wr;
+  memset(&wr, 0, sizeof(wr));
+  wr.wr_id = reinterpret_cast<uint64_t>(qp);
+  wr.num_sge = 0;
+  wr.opcode = IBV_WR_SEND;
+  wr.send_flags = IBV_SEND_SIGNALED;
+  ibv_send_wr* bad_tx_work_request;
+  if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
+    // best-effort: failure to post the marker is logged and ignored
+    lderr(cct) << __func__ << " failed to send message="
+               << " ibv_post_send failed(most probably should be peer not ready): "
+               << cpp_strerror(errno) << dendl;
+    return ;
+  }
+}
+
+// Tear down the TCP-handshake event handler once the RDMA connection is
+// fully established: close the handler, remove its readable event from
+// the owning event center, and free it.
+void RDMAConnectedSocketImpl::cleanup() {
+  if (con_handler) {
+    (static_cast<C_handle_connection*>(con_handler))->close();
+    worker->center.submit_to(worker->center.get_id(), [this]() {
+      worker->center.delete_file_event(tcp_fd, EVENT_READABLE);
+    }, false);
+    // NOTE(review): con_handler is deleted here synchronously while the
+    // file event is only removed by the submitted lambda — confirm the
+    // event cannot fire in between, or this frees a live handler.
+    delete con_handler;
+    con_handler = nullptr;
+  }
+}
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- \r
-// vim: ts=8 sw=2 smarttab\r
-/*\r
- * Ceph - scalable distributed file system\r
- *\r
- * Copyright (C) 2016 XSKY <haomai@xsky.com>\r
- *\r
- * Author: Haomai Wang <haomaiwang@gmail.com>\r
- *\r
- * This is free software; you can redistribute it and/or\r
- * modify it under the terms of the GNU Lesser General Public\r
- * License version 2.1, as published by the Free Software\r
- * Foundation. See file COPYING.\r
- *\r
- */\r
-\r
-#include "msg/async/net_handler.h"\r
-#include "RDMAStack.h"\r
-\r
-#define dout_subsys ceph_subsys_ms\r
-#undef dout_prefix\r
-#define dout_prefix *_dout << " RDMAServerSocketImpl "\r
-\r
-int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)\r
-{\r
- int rc = 0;\r
- server_setup_socket = net.create_socket(sa.get_family(), true);\r
- if (server_setup_socket < 0) {\r
- rc = -errno;\r
- lderr(cct) << __func__ << " failed to create server socket: "\r
- << cpp_strerror(errno) << dendl;\r
- return rc;\r
- }\r
-\r
- rc = net.set_nonblock(server_setup_socket);\r
- if (rc < 0) {\r
- goto err;\r
- }\r
-\r
- rc = net.set_socket_options(server_setup_socket, opt.nodelay, opt.rcbuf_size);\r
- if (rc < 0) {\r
- goto err;\r
- }\r
- net.set_close_on_exec(server_setup_socket);\r
-\r
- rc = ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len());\r
- if (rc < 0) {\r
- rc = -errno;\r
- ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()\r
- << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl;\r
- goto err;\r
- }\r
-\r
- rc = ::listen(server_setup_socket, 128);\r
- if (rc < 0) {\r
- rc = -errno;\r
- lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(errno) << dendl;\r
- goto err;\r
- }\r
-\r
- ldout(cct, 20) << __func__ << " bind to " << sa.get_sockaddr() << " on port " << sa.get_port() << dendl;\r
- return 0;\r
-\r
-err:\r
- ::close(server_setup_socket);\r
- server_setup_socket = -1;\r
- return -errno;\r
-}\r
-\r
-int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)\r
-{\r
- ldout(cct, 15) << __func__ << dendl;\r
-\r
- assert(sock);\r
- sockaddr_storage ss;\r
- socklen_t slen = sizeof(ss);\r
- int sd = ::accept(server_setup_socket, (sockaddr*)&ss, &slen);\r
- if (sd < 0) {\r
- return -errno;\r
- }\r
- ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl;\r
-\r
- net.set_close_on_exec(sd);\r
- int r = net.set_nonblock(sd);\r
- if (r < 0) {\r
- ::close(sd);\r
- return -errno;\r
- }\r
-\r
- r = net.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);\r
- if (r < 0) {\r
- ::close(sd);\r
- return -errno;\r
- }\r
- net.set_priority(sd, opt.priority);\r
-\r
- RDMAConnectedSocketImpl* server;\r
- //Worker* w = dispatcher->get_stack()->get_worker();\r
- server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w));\r
- server->set_accept_fd(sd);\r
- ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl;\r
- std::unique_ptr<RDMAConnectedSocketImpl> csi(server);\r
- *sock = ConnectedSocket(std::move(csi));\r
- if (out)\r
- out->set_sockaddr((sockaddr*)&ss);\r
-\r
- return 0;\r
-}\r
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "msg/async/net_handler.h"
+#include "RDMAStack.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << " RDMAServerSocketImpl "
+
+// Create, configure, bind and listen on the TCP socket used for the
+// out-of-band RDMA connection handshake.
+// Returns 0 on success or a negative errno-style error code.
+int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
+{
+  int rc = 0;
+  server_setup_socket = net.create_socket(sa.get_family(), true);
+  if (server_setup_socket < 0) {
+    rc = -errno;
+    lderr(cct) << __func__ << " failed to create server socket: "
+               << cpp_strerror(errno) << dendl;
+    return rc;
+  }
+
+  rc = net.set_nonblock(server_setup_socket);
+  if (rc < 0) {
+    goto err;
+  }
+
+  rc = net.set_socket_options(server_setup_socket, opt.nodelay, opt.rcbuf_size);
+  if (rc < 0) {
+    goto err;
+  }
+  net.set_close_on_exec(server_setup_socket);
+
+  rc = ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len());
+  if (rc < 0) {
+    rc = -errno;
+    ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
+                   << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl;
+    goto err;
+  }
+
+  rc = ::listen(server_setup_socket, 128);
+  if (rc < 0) {
+    rc = -errno;
+    lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(errno) << dendl;
+    goto err;
+  }
+
+  ldout(cct, 20) << __func__ << " bind to " << sa.get_sockaddr() << " on port " << sa.get_port() << dendl;
+  return 0;
+
+err:
+  // Bug fix: return the saved rc, not -errno. ::close() may clobber
+  // errno, and the net.* helpers report failure via their return value,
+  // so the old `return -errno` could yield the wrong error (or 0).
+  ::close(server_setup_socket);
+  server_setup_socket = -1;
+  return rc;
+}
+
+// Accept one TCP handshake connection and wrap it in a new
+// RDMAConnectedSocketImpl, which will drive the QP exchange over the
+// accepted fd. On success *sock owns the new connection and *out (if
+// non-null) receives the peer address. Returns 0 or -errno.
+int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)
+{
+  ldout(cct, 15) << __func__ << dendl;
+
+  assert(sock);
+  sockaddr_storage ss;
+  socklen_t slen = sizeof(ss);
+  int sd = ::accept(server_setup_socket, (sockaddr*)&ss, &slen);
+  if (sd < 0) {
+    return -errno;
+  }
+  ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl;
+
+  net.set_close_on_exec(sd);
+  int r = net.set_nonblock(sd);
+  if (r < 0) {
+    ::close(sd);
+    return -errno;
+  }
+
+  r = net.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
+  if (r < 0) {
+    ::close(sd);
+    return -errno;
+  }
+  net.set_priority(sd, opt.priority);
+
+  RDMAConnectedSocketImpl* server;
+  //Worker* w = dispatcher->get_stack()->get_worker();
+  // NOTE(review): dynamic_cast result is not checked — if w is ever not
+  // an RDMAWorker this passes nullptr; confirm callers guarantee the type.
+  server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w));
+  server->set_accept_fd(sd);
+  // NOTE(review): this duplicates the "accepted a new QP" log line above.
+  ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl;
+  std::unique_ptr<RDMAConnectedSocketImpl> csi(server);
+  *sock = ConnectedSocket(std::move(csi));
+  if (out)
+    out->set_sockaddr((sockaddr*)&ss);
+
+  return 0;
+}