// core
OPTION(ms_async_affinity_cores, OPT_STR, "")
OPTION(ms_async_send_inline, OPT_BOOL, false)
+OPTION(ms_async_rdma_device_name, OPT_STR, "")
+OPTION(ms_async_rdma_enable_hugepage, OPT_BOOL, false)
+OPTION(ms_async_rdma_buffer_size, OPT_INT, 8192)
+OPTION(ms_async_rdma_send_buffers, OPT_U32, 32)
+OPTION(ms_async_rdma_receive_buffers, OPT_U32, 64)
OPTION(inject_early_sigterm, OPT_BOOL, false)
#include "common/Cond.h"
#include "common/errno.h"
#include "PosixStack.h"
+#ifdef HAVE_RDMA
+#include "rdma/RDMAStack.h"
+#endif
#include "common/dout.h"
#include "include/assert.h"
{
if (t == "posix")
return std::make_shared<PosixNetworkStack>(c, t);
+#ifdef HAVE_RDMA
+ else if (t == "rdma")
+ return std::make_shared<RDMAStack>(c, t);
+#endif
return nullptr;
}
{
if (type == "posix")
return new PosixWorker(c, i);
+#ifdef HAVE_RDMA
+ else if (type == "rdma")
+ return new RDMAWorker(c, i);
+#endif
return nullptr;
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- \r
+// vim: ts=8 sw=2 smarttab\r
+/*\r
+ * Ceph - scalable distributed file system\r
+ *\r
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>\r
+ *\r
+ * Author: Haomai Wang <haomaiwang@gmail.com>\r
+ *\r
+ * This is free software; you can redistribute it and/or\r
+ * modify it under the terms of the GNU Lesser General Public\r
+ * License version 2.1, as published by the Free Software\r
+ * Foundation. See file COPYING.\r
+ *\r
+ */\r
+\r
+#include "Infiniband.h"\r
+#include "common/errno.h"\r
+#include "common/debug.h"\r
+#include "RDMAStack.h"\r
+\r
+#define dout_subsys ceph_subsys_ms\r
+#undef dout_prefix\r
+#define dout_prefix *_dout << "Infiniband "\r
+\r
+// Number of scatter/gather entries per work request, passed as max_sge when\r
+// the shared receive queue is created.\r
+static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;\r
+// max_inline_data capability requested for queue pairs in QueuePair::init().\r
+static const uint32_t MAX_INLINE_DATA = 128;\r
+// Length (without the terminating NUL) of the textual "lid:qpn:psn:gid"\r
+// message exchanged over UDP by send_udp_msg()/recv_udp_msg().\r
+static const uint32_t UDP_MSG_LEN = sizeof("0000:00000000:00000000:00000000000000000000000000000000") - 1;\r
+\r
+/**\r
+ * Open a verbs device, query its attributes and wrap every physical port\r
+ * in a Port object. The last port found in the ACTIVE state becomes\r
+ * active_port.\r
+ *\r
+ * Every failure (NULL device, open/query error, no active port) is fatal\r
+ * and trips an assert.\r
+ *\r
+ * \param[in] c\r
+ *      CephContext used for logging.\r
+ * \param[in] d\r
+ *      Device entry to open; must not be NULL.\r
+ */\r
+Device::Device(CephContext *c, ibv_device* d)\r
+  : cct(c), device(d), device_attr(new ibv_device_attr), active_port(NULL)\r
+{\r
+  if (device == NULL) {\r
+    lderr(cct) << __func__ << "device == NULL" << cpp_strerror(errno) << dendl;\r
+    assert(0);\r
+  }\r
+  name = ibv_get_device_name(device);\r
+  ctxt = ibv_open_device(device);\r
+  if (ctxt == NULL) {\r
+    lderr(cct) << __func__ << "open rdma device failed. " << cpp_strerror(errno) << dendl;\r
+    assert(0);\r
+  }\r
+  // ibv_query_device() signals failure with a non-zero return value, which\r
+  // is not necessarily -1, so anything other than 0 is treated as an error.\r
+  int r = ibv_query_device(ctxt, device_attr);\r
+  if (r != 0) {\r
+    lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;\r
+    assert(0);\r
+  }\r
+  port_cnt = device_attr->phys_port_cnt;\r
+  ports = new Port*[port_cnt];\r
+  for (uint8_t i = 0; i < port_cnt; ++i) {\r
+    // ports are queried by the 1-based number i+1\r
+    ports[i] = new Port(cct, ctxt, i+1);\r
+    if (ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) {\r
+      active_port = ports[i];\r
+      ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;\r
+    } else {\r
+      ldout(cct, 10) << __func__ << " port " << i+1 << " is unactive(" << ports[i]->get_port_attr()->state << ")"<< dendl;\r
+    }\r
+  }\r
+  // active_port starts out as NULL (see initializer list), so this test is\r
+  // well defined even when no port is active.\r
+  if (!active_port) {\r
+    lderr(cct) << __func__ << " no active port found" << dendl;\r
+    assert(active_port);\r
+  }\r
+}\r
+\r
+/**\r
+ * Bring up the RDMA resources shared by all connections: pick the device,\r
+ * allocate a protection domain, register receive/send buffers, create the\r
+ * shared receive queue and post every receive buffer to it.\r
+ *\r
+ * \param[in] s\r
+ *      Owning RDMAStack (used later to obtain workers).\r
+ * \param[in] c\r
+ *      CephContext for configuration and logging.\r
+ * \param[in] device_name\r
+ *      Name of the device to use; an empty name selects the first device.\r
+ */\r
+Infiniband::Infiniband(RDMAStack* s, CephContext *c, const std::string &device_name): cct(c), device_list(c), net(c), stack(s)\r
+{\r
+  device = device_list.get_device(device_name.c_str());\r
+  assert(device);\r
+  ib_physical_port = device->active_port->get_port_num();\r
+  pd = new ProtectionDomain(cct, device);\r
+  // keep the call outside assert() so it is not tied to assertion builds\r
+  int r = net.set_nonblock(device->ctxt->async_fd);\r
+  assert(r == 0);\r
+  (void)r;\r
+\r
+  // Use the configured number of buffers unless the device cannot support\r
+  // that many work requests, in which case fall back to the device limit.\r
+  const uint32_t want_recv = cct->_conf->ms_async_rdma_receive_buffers;\r
+  max_recv_wr = device->device_attr->max_srq_wr;\r
+  if (max_recv_wr < want_recv) {\r
+    ldout(cct, 0) << __func__ << " max allowed receive buffers is " << max_recv_wr << " use this instead." << dendl;\r
+  } else {\r
+    max_recv_wr = want_recv;\r
+  }\r
+  const uint32_t want_send = cct->_conf->ms_async_rdma_send_buffers;\r
+  max_send_wr = device->device_attr->max_qp_wr;\r
+  if (max_send_wr < want_send) {\r
+    ldout(cct, 0) << __func__ << " max allowed send buffers is " << max_send_wr << " use this instead." << dendl;\r
+  } else {\r
+    max_send_wr = want_send;\r
+  }\r
+\r
+  ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe\r
+                << " completion entries" << dendl;\r
+\r
+  // Register exactly as many buffers as the queues were sized for, so that\r
+  // posting all receive buffers below cannot exceed the SRQ capacity.\r
+  memory_manager = new MemoryManager(cct, device, pd);\r
+  memory_manager->register_rx_tx(\r
+      cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);\r
+\r
+  srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);\r
+  post_channel_cluster();\r
+}\r
+\r
+/**\r
+ * Thin wrapper around ibv_create_srq() on this object's protection domain.\r
+ *\r
+ * \param[in] max_wr\r
+ *      The max number of outstanding work requests in the SRQ.\r
+ * \param[in] max_sge\r
+ *      The max number of scatter elements per WR.\r
+ * \return\r
+ *      Whatever ibv_create_srq() returns: a valid ibv_srq pointer, or NULL\r
+ *      on error.\r
+ */\r
+ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)\r
+{\r
+  ldout(cct, 20) << __func__ << " max_wr=" << max_wr << " max_sge=" << max_sge << dendl;\r
+\r
+  ibv_srq_init_attr init_attr;\r
+  memset(&init_attr, 0, sizeof(init_attr));\r
+  init_attr.attr.max_sge = max_sge;\r
+  init_attr.attr.max_wr = max_wr;\r
+  init_attr.srq_context = device->ctxt;\r
+\r
+  ibv_srq *created = ibv_create_srq(pd->pd, &init_attr);\r
+  return created;\r
+}\r
+\r
+/**\r
+ * Create a new QueuePair together with its own completion channel and\r
+ * receive completion queue, and bring it to the INIT state.\r
+ * This factory should be used in preference to the QueuePair constructor\r
+ * directly, since this lets derivatives of Infiniband, e.g. MockInfiniband\r
+ * (if it existed), return mocked out QueuePair derivatives.\r
+ *\r
+ * The send completion queue is taken from a worker obtained through the\r
+ * stack.\r
+ *\r
+ * \param[in] type\r
+ *      Queue pair type, see QueuePair::QueuePair.\r
+ * \return\r
+ *      QueuePair on success or NULL if init fails; on failure everything\r
+ *      created here has been released again.\r
+ */\r
+Infiniband::QueuePair* Infiniband::create_queue_pair(ibv_qp_type type)\r
+{\r
+  Infiniband::CompletionChannel* cc = create_comp_channel();\r
+  if (!cc)\r
+    return NULL;\r
+\r
+  Infiniband::CompletionQueue* cq = create_comp_queue(cc);\r
+  if (!cq) {\r
+    delete cc;\r
+    lderr(cct) << __func__ << " failed to create cq." << dendl;\r
+    return NULL;\r
+  }\r
+\r
+  RDMAWorker* w = static_cast<RDMAWorker*>(stack->get_worker());\r
+  Infiniband::QueuePair *qp = new QueuePair(*this, type, ib_physical_port, srq, w->get_tx_cq(), cq, max_send_wr, max_recv_wr);\r
+  if (qp->init()) {\r
+    // Tear down in reverse order of creation: the QP refers to the CQ and\r
+    // the CQ to the channel, and the destructors assert that the verbs\r
+    // destroy calls succeed, so dependants have to go first.\r
+    delete qp;\r
+    delete cq;\r
+    delete cc;\r
+    return NULL;\r
+  }\r
+  return qp;\r
+}\r
+\r
+/**\r
+ * Create the verbs queue pair and move it from RESET to INIT.\r
+ *\r
+ * \return\r
+ *      0 on success, -1 if the queue pair could not be created or could not\r
+ *      be moved to INIT. On failure no verbs handle is left behind (qp is\r
+ *      NULL), so the destructor has nothing to release.\r
+ */\r
+int Infiniband::QueuePair::init()\r
+{\r
+  ldout(infiniband.cct, 20) << __func__ << " started." << dendl;\r
+  ibv_qp_init_attr qpia;\r
+  memset(&qpia, 0, sizeof(qpia));\r
+  qpia.send_cq = txcq->get_cq();\r
+  qpia.recv_cq = rxcq->get_cq();\r
+  qpia.srq = srq;                       // use the same shared receive queue\r
+  qpia.cap.max_send_wr  = max_send_wr;  // max outstanding send requests\r
+  qpia.cap.max_send_sge = 1;            // max send scatter-gather elements\r
+  qpia.cap.max_inline_data = MAX_INLINE_DATA;          // max bytes of immediate data on send q\r
+  qpia.qp_type = type;                  // RC, UC, UD, or XRC\r
+  qpia.sq_sig_all = 0;                  // only generate CQEs on requested WQEs\r
+\r
+  qp = ibv_create_qp(pd, &qpia);\r
+  if (qp == NULL) {\r
+    lderr(infiniband.cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;\r
+    return -1;\r
+  }\r
+\r
+  ldout(infiniband.cct, 20) << __func__ << " successfully create queue pair: "\r
+                            << "qp=" << qp << dendl;\r
+\r
+  // move from RESET to INIT state\r
+  ibv_qp_attr qpa;\r
+  memset(&qpa, 0, sizeof(qpa));\r
+  qpa.qp_state   = IBV_QPS_INIT;\r
+  qpa.pkey_index = 0;\r
+  qpa.port_num   = (uint8_t)(ib_physical_port);\r
+  qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;\r
+  qpa.qkey       = q_key;\r
+\r
+  // which attributes are applied depends on the queue pair type\r
+  int mask = IBV_QP_STATE | IBV_QP_PORT;\r
+  switch (type) {\r
+    case IBV_QPT_RC:\r
+      mask |= IBV_QP_ACCESS_FLAGS;\r
+      mask |= IBV_QP_PKEY_INDEX;\r
+      break;\r
+    case IBV_QPT_UD:\r
+      mask |= IBV_QP_QKEY;\r
+      mask |= IBV_QP_PKEY_INDEX;\r
+      break;\r
+    case IBV_QPT_RAW_PACKET:\r
+      break;\r
+    default:\r
+      assert(0);\r
+  }\r
+\r
+  int ret = ibv_modify_qp(qp, &qpa, mask);\r
+  if (ret) {\r
+    // log before cleaning up so the destroy call cannot disturb errno\r
+    lderr(infiniband.cct) << __func__ << " failed to transition to INIT state: "\r
+                          << cpp_strerror(errno) << dendl;\r
+    ibv_destroy_qp(qp);\r
+    // forget the handle, otherwise ~QueuePair would destroy it a second time\r
+    qp = NULL;\r
+    return -1;\r
+  }\r
+  ldout(infiniband.cct, 20) << __func__ << " successfully change queue pair to INIT:"\r
+                            << " qp=" << qp << dendl;\r
+  return 0;\r
+}\r
+\r
+/**\r
+ * Post one receive buffer to the shared receive queue.\r
+ *\r
+ * \param[in] chunk\r
+ *      Registered buffer to post; its pointer is stored in the work request\r
+ *      id.\r
+ * \return\r
+ *      0 on success, -1 if ibv_post_srq_recv() reported an error.\r
+ */\r
+int Infiniband::post_chunk(Chunk* chunk)\r
+{\r
+  // one scatter/gather entry covering the whole chunk buffer\r
+  ibv_sge sge;\r
+  sge.lkey = chunk->mr->lkey;\r
+  sge.length = chunk->bytes;\r
+  sge.addr = reinterpret_cast<uint64_t>(chunk->buffer);\r
+\r
+  ibv_recv_wr wr;\r
+  memset(&wr, 0, sizeof(wr));\r
+  wr.num_sge = 1;\r
+  wr.sg_list = &sge;\r
+  wr.next = NULL;\r
+  wr.wr_id = reinterpret_cast<uint64_t>(chunk);  // stash descriptor ptr\r
+\r
+  ibv_recv_wr *bad_wr;\r
+  const int rc = ibv_post_srq_recv(srq, &wr, &bad_wr);\r
+  if (rc == 0)\r
+    return 0;\r
+\r
+  lderr(cct) << __func__ << " ib_post_srq_recv failed on post "\r
+             << cpp_strerror(errno) << dendl;\r
+  return -1;\r
+}\r
+\r
+/**\r
+ * Take every free receive chunk from the memory manager and post it to the\r
+ * shared receive queue. Any failure trips an assert.\r
+ *\r
+ * \return\r
+ *      Always 0.\r
+ */\r
+int Infiniband::post_channel_cluster()\r
+{\r
+  // asking for 0 bytes hands over the complete free list\r
+  vector<Chunk*> rx_chunks;\r
+  int r = memory_manager->get_channel_buffers(rx_chunks, 0);\r
+  assert(r == 0);\r
+\r
+  for (size_t idx = 0; idx < rx_chunks.size(); ++idx) {\r
+    r = post_chunk(rx_chunks[idx]);\r
+    assert(r == 0);\r
+  }\r
+\r
+  ldout(cct, 20) << __func__ << " posted buffers to srq. "<< dendl;\r
+  return 0;\r
+}\r
+\r
+/**\r
+ * Allocate and initialise a CompletionChannel.\r
+ *\r
+ * \return\r
+ *      The new channel, or NULL if CompletionChannel::init() failed.\r
+ */\r
+Infiniband::CompletionChannel* Infiniband::create_comp_channel()\r
+{\r
+  ldout(cct, 20) << __func__ << " started." << dendl;\r
+  Infiniband::CompletionChannel *chan = new Infiniband::CompletionChannel(*this);\r
+  const int rc = chan->init();\r
+  if (rc == 0)\r
+    return chan;\r
+\r
+  delete chan;\r
+  return NULL;\r
+}\r
+\r
+/**\r
+ * Allocate and initialise a CompletionQueue of depth max_recv_wr that\r
+ * reports through the given completion channel.\r
+ *\r
+ * \param[in] cc\r
+ *      Completion channel the new queue is attached to.\r
+ * \return\r
+ *      The new queue, or NULL if CompletionQueue::init() failed.\r
+ */\r
+Infiniband::CompletionQueue* Infiniband::create_comp_queue(CompletionChannel *cc)\r
+{\r
+  ldout(cct, 20) << __func__ << " completion channel=" << cc << dendl;\r
+  Infiniband::CompletionQueue *queue = new Infiniband::CompletionQueue(*this, max_recv_wr, cc);\r
+  const int rc = queue->init();\r
+  if (rc == 0)\r
+    return queue;\r
+\r
+  delete queue;\r
+  return NULL;\r
+}\r
+\r
+\r
+/**\r
+ * Record the parameters of a queue pair; the verbs object itself is only\r
+ * created by init().\r
+ *\r
+ * \param[in] infiniband   Infiniband object this queue pair belongs to.\r
+ * \param[in] type         IBV_QPT_RC, IBV_QPT_UD or IBV_QPT_RAW_PACKET;\r
+ *                         anything else trips an assert.\r
+ * \param[in] port         Physical port number used in init().\r
+ * \param[in] srq          Shared receive queue to attach to.\r
+ * \param[in] txcq         Completion queue for sends.\r
+ * \param[in] rxcq         Completion queue for receives.\r
+ * \param[in] max_send_wr  Requested max outstanding send requests.\r
+ * \param[in] max_recv_wr  Stored only; not read by init().\r
+ * \param[in] q_key        Q_Key, applied for IBV_QPT_UD in init().\r
+ */\r
+Infiniband::QueuePair::QueuePair(\r
+    Infiniband& infiniband, ibv_qp_type type, int port, ibv_srq *srq,\r
+    Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,\r
+    uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)\r
+: infiniband(infiniband),\r
+  type(type),\r
+  ctxt(infiniband.device->ctxt),\r
+  ib_physical_port(port),\r
+  pd(infiniband.pd->pd),\r
+  srq(srq),\r
+  qp(NULL),\r
+  txcq(txcq),\r
+  rxcq(rxcq),\r
+  initial_psn(0),\r
+  max_send_wr(max_send_wr),\r
+  max_recv_wr(max_recv_wr),\r
+  q_key(q_key),\r
+  fd(-1)   // nothing in this file assigns fd; give get_fd() a defined value\r
+{\r
+  // random 24-bit initial packet sequence number\r
+  initial_psn = lrand48() & 0xffffff;\r
+  if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {\r
+    lderr(infiniband.cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl;\r
+    assert(0);\r
+  }\r
+}\r
+\r
+/**\r
+ * Receive one "lid:qpn:psn:gid" handshake message (UDP_MSG_LEN characters\r
+ * of hex text) from a socket and decode it.\r
+ *\r
+ * \param[in] sd\r
+ *      Socket to read from; must be >= 0.\r
+ * \param[out] im\r
+ *      Filled with the decoded lid, qpn, psn and gid on success.\r
+ * \param[out] addr\r
+ *      If not NULL, receives the sender's address on success.\r
+ * \return\r
+ *      0 when a complete message was decoded, 1 when no valid message was\r
+ *      read (wrong length or malformed text), < 0 on a receive error.\r
+ */\r
+int Infiniband::recv_udp_msg(int sd, IBSYNMsg& im, entity_addr_t *addr)\r
+{\r
+  assert(sd >= 0);\r
+  ssize_t r;\r
+  entity_addr_t socket_addr;\r
+  // large enough for any address family the kernel may report\r
+  struct sockaddr_storage from;\r
+  socklen_t slen = sizeof(from);\r
+  // one extra byte each so that both strings can be NUL terminated\r
+  char msg[UDP_MSG_LEN + 1];\r
+  char gid[33];\r
+  r = ::recvfrom(sd, msg, UDP_MSG_LEN, 0, reinterpret_cast<struct sockaddr*>(&from), &slen);\r
+  // Drop incoming qpt\r
+  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {\r
+    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {\r
+      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;\r
+      r = -1;\r
+    }\r
+  }\r
+  if (r == -1) {\r
+    lderr(cct) << __func__ << " recv got error " << errno << ": "\r
+               << cpp_strerror(errno) << dendl;\r
+    return -1;\r
+  } else if ((size_t)r != UDP_MSG_LEN) { // valid message length\r
+    lderr(cct) << __func__ << " recv got bad length (" << r << ")." << cpp_strerror(errno) << dendl;\r
+    return 1;\r
+  }\r
+\r
+  // valid length: terminate the text before handing it to sscanf\r
+  msg[UDP_MSG_LEN] = '\0';\r
+  memset(gid, 0, sizeof(gid));\r
+  // Parse into full-width temporaries: %x stores an unsigned int, which is\r
+  // wider than IBSYNMsg::lid, and the struct is packed.\r
+  unsigned int lid = 0, qpn = 0, psn = 0;\r
+  int fields = sscanf(msg, "%04x:%08x:%08x:%32s", &lid, &qpn, &psn, gid);\r
+  if (fields != 4 || strlen(gid) != 32) {\r
+    lderr(cct) << __func__ << " recv got malformed message" << dendl;\r
+    return 1;\r
+  }\r
+\r
+  socket_addr.set_sockaddr(reinterpret_cast<struct sockaddr*>(&from));\r
+  if (addr) {\r
+    *addr = socket_addr;\r
+  }\r
+  im.lid = lid;\r
+  im.qpn = qpn;\r
+  im.psn = psn;\r
+  wire_gid_to_gid(gid, &(im.gid));\r
+  ldout(cct, 10) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << gid  << dendl;\r
+  return 0;\r
+}\r
+\r
+/**\r
+ * Encode the handshake data as "lid:qpn:psn:gid" hex text and send it as\r
+ * one datagram of UDP_MSG_LEN characters. Sending is retried up to three\r
+ * times when it fails with EINTR or EAGAIN.\r
+ *\r
+ * \param[in] sd\r
+ *      Socket to send on; must be >= 0.\r
+ * \param[in] im\r
+ *      Local lid, qpn, psn and gid to announce.\r
+ * \param[in] peeraddr\r
+ *      Destination address.\r
+ * \return\r
+ *      0 if the whole message was sent, -1 otherwise.\r
+ */\r
+int Infiniband::send_udp_msg(int sd, IBSYNMsg& im, entity_addr_t &peeraddr)\r
+{\r
+  assert(sd >= 0);\r
+  int retry = 0;\r
+  ssize_t r;\r
+\r
+  // gid_to_wire_gid() writes 32 hex digits plus a NUL, and snprintf needs\r
+  // room for its NUL as well; with smaller buffers the last GID digit was\r
+  // cut off and gid[] was overrun by one byte.\r
+  char msg[UDP_MSG_LEN + 1];\r
+  char gid[33];\r
+retry:\r
+  gid_to_wire_gid(&(im.gid), gid);\r
+  r = snprintf(msg, sizeof(msg), "%04x:%08x:%08x:%s", im.lid, im.qpn, im.psn, gid);\r
+  ldout(cct, 20) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << gid << " r=" << r << dendl;\r
+  // the terminating NUL is not part of the wire format\r
+  r = ::sendto(sd, msg, UDP_MSG_LEN, 0, peeraddr.get_sockaddr(),\r
+               peeraddr.get_sockaddr_len());\r
+  // Drop incoming qpt\r
+  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {\r
+    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {\r
+      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;\r
+      r = -1;\r
+    }\r
+  }\r
+\r
+  if ((size_t)r != UDP_MSG_LEN) {\r
+    if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {\r
+      retry++;\r
+      goto retry;\r
+    }\r
+    if (r < 0)\r
+      lderr(cct) << __func__ << " send returned error " << errno << ": "\r
+                 << cpp_strerror(errno) << dendl;\r
+    else\r
+      lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;\r
+    return -1;\r
+  }\r
+  return 0;\r
+}\r
+\r
+/**\r
+ * Decode a GID from its wire form: 32 hex digits, read as four groups of\r
+ * eight digits, each filling four bytes of gid->raw.\r
+ *\r
+ * \param[in] wgid\r
+ *      At least 32 readable characters of hex text.\r
+ * \param[out] gid\r
+ *      Receives the 16 decoded bytes. A group that does not parse as hex\r
+ *      is stored as zero.\r
+ */\r
+void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)\r
+{\r
+  char tmp[9];\r
+  tmp[8] = 0;\r
+\r
+  for (int i = 0; i < 4; ++i) {\r
+    // start from 0 so a failed conversion never stores an indeterminate value\r
+    uint32_t v32 = 0;\r
+    memcpy(tmp, wgid + i * 8, 8);\r
+    sscanf(tmp, "%x", &v32);\r
+    // copy bytewise: the destination may sit inside a packed struct and\r
+    // therefore need not be 4-byte aligned\r
+    const uint32_t word = ntohl(v32);\r
+    memcpy(&gid->raw[i * 4], &word, sizeof(word));\r
+  }\r
+}\r
+\r
+/**\r
+ * Encode the 16 raw GID bytes as 32 hex digits, four bytes (eight digits)\r
+ * at a time.\r
+ *\r
+ * \param[in] gid\r
+ *      GID to encode.\r
+ * \param[out] wgid\r
+ *      Output buffer; sprintf writes 32 digits followed by a terminating\r
+ *      NUL, i.e. 33 bytes in total.\r
+ */\r
+void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])\r
+{\r
+  char *out = wgid;\r
+  for (int word = 0; word < 4; ++word, out += 8) {\r
+    const uint32_t raw = *(uint32_t *)(gid->raw + word * 4);\r
+    sprintf(out, "%08x", htonl(raw));\r
+  }\r
+}\r
+\r
+/**\r
+ * Destroy the verbs queue pair, if one was created. A failing destroy call\r
+ * trips an assert.\r
+ */\r
+Infiniband::QueuePair::~QueuePair()\r
+{\r
+  if (qp) {\r
+    // perform the call outside assert() so it does not depend on\r
+    // assertions being compiled in\r
+    int r = ibv_destroy_qp(qp);\r
+    assert(r == 0);\r
+    (void)r;\r
+  }\r
+  ldout(infiniband.cct, 20) << __func__ << " successfully destroyed QueuePair." << dendl;\r
+}\r
+\r
+/**\r
+ * Destroy the verbs completion channel, if one was created. A failing\r
+ * destroy call trips an assert.\r
+ */\r
+Infiniband::CompletionChannel::~CompletionChannel()\r
+{\r
+  if (channel != NULL) {\r
+    const int rc = ibv_destroy_comp_channel(channel);\r
+    ldout(infiniband.cct, 20) << __func__ << " r: " << rc << dendl;\r
+    assert(rc == 0);\r
+  }\r
+  ldout(infiniband.cct, 20) << __func__ << " successfully destroyed CompletionChannel." << dendl;\r
+}\r
+\r
+/**\r
+ * Destroy the verbs completion queue, if one was created. A failing\r
+ * destroy call trips an assert.\r
+ */\r
+Infiniband::CompletionQueue::~CompletionQueue()\r
+{\r
+  if (cq != NULL) {\r
+    const int rc = ibv_destroy_cq(cq);\r
+    // note: this logs errno, not the return value\r
+    ldout(infiniband.cct, 20) << __func__ << " r: " << cpp_strerror(errno) << dendl;\r
+    assert(rc == 0);\r
+  }\r
+  ldout(infiniband.cct, 20) << __func__ << " successfully destroyed CompletionQueue." << dendl;\r
+}\r
+\r
+/**\r
+ * Request a notification for the next completion on this queue.\r
+ *\r
+ * \param[in] solicite_only\r
+ *      NOTE(review): currently unused; ibv_req_notify_cq() is always called\r
+ *      with 0 as its second argument. Confirm whether that is intended.\r
+ * \return\r
+ *      The return value of ibv_req_notify_cq(): 0 on success.\r
+ */\r
+int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)\r
+{\r
+  ldout(infiniband.cct, 20) << __func__ << " started." << dendl;\r
+  const int rc = ibv_req_notify_cq(cq, 0);\r
+  if (rc != 0)\r
+    lderr(infiniband.cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;\r
+  return rc;\r
+}\r
+\r
+/**\r
+ * Poll the completion queue without blocking.\r
+ *\r
+ * \param[in] num_entries\r
+ *      Capacity of ret_wc_array.\r
+ * \param[out] ret_wc_array\r
+ *      Receives the polled work completions.\r
+ * \return\r
+ *      Number of completions returned by ibv_poll_cq(), or -1 if it\r
+ *      reported an error.\r
+ */\r
+int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {\r
+  const int polled = ibv_poll_cq(cq, num_entries, ret_wc_array);\r
+  if (polled >= 0)\r
+    return polled;\r
+\r
+  lderr(infiniband.cct) << __func__ << " poll_completion_queue occur met error: "\r
+                        << cpp_strerror(errno) << dendl;\r
+  return -1;\r
+}\r
+\r
+/**\r
+ * Fetch one completion event from the channel.\r
+ *\r
+ * Events are counted and acknowledged in batches of MAX_ACK_EVENT rather\r
+ * than one at a time.\r
+ *\r
+ * \return\r
+ *      true if an event was retrieved, false otherwise (errors other than\r
+ *      EAGAIN/EINTR are logged).\r
+ */\r
+bool Infiniband::CompletionChannel::get_cq_event()\r
+{\r
+  ibv_cq *event_cq = NULL;\r
+  void *event_ctx;\r
+  const int rc = ibv_get_cq_event(channel, &event_cq, &event_ctx);\r
+  if (rc != 0) {\r
+    const bool transient = (errno == EAGAIN || errno == EINTR);\r
+    if (!transient)\r
+      lderr(infiniband.cct) << __func__ << "failed to retrieve CQ event: "\r
+                            << cpp_strerror(errno) << dendl;\r
+    return false;\r
+  }\r
+\r
+  // accumulate the number of cq events that need to be acked, and\r
+  // periodically ack them\r
+  ++cq_events_that_need_ack;\r
+  if (cq_events_that_need_ack == MAX_ACK_EVENT) {\r
+    ldout(infiniband.cct, 20) << __func__ << " ack aq events." << dendl;\r
+    ibv_ack_cq_events(event_cq, MAX_ACK_EVENT);\r
+    cq_events_that_need_ack = 0;\r
+  }\r
+\r
+  return true;\r
+}\r
+\r
+/**\r
+ * Create the verbs completion queue on the bound completion channel,\r
+ * request the first notification and tell the channel which cq it serves.\r
+ *\r
+ * \return\r
+ *      0 on success, -1 on failure. On failure no verbs handle is left\r
+ *      behind (cq is NULL), so the destructor has nothing to release.\r
+ */\r
+int Infiniband::CompletionQueue::init()\r
+{\r
+  cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);\r
+  if (!cq) {\r
+    lderr(infiniband.cct) << __func__ << " failed to create receive completion queue: "\r
+                          << cpp_strerror(errno) << dendl;\r
+    return -1;\r
+  }\r
+\r
+  if (ibv_req_notify_cq(cq, 0)) {\r
+    lderr(infiniband.cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;\r
+    ibv_destroy_cq(cq);\r
+    // forget the handle, otherwise ~CompletionQueue would destroy it again\r
+    cq = NULL;\r
+    return -1;\r
+  }\r
+\r
+  channel->bind_cq(cq);\r
+  ldout(infiniband.cct, 20) << __func__ << " successfully create cq=" << cq << dendl;\r
+  return 0;\r
+}\r
+\r
+/**\r
+ * Create the verbs completion channel and switch its file descriptor to\r
+ * non-blocking mode.\r
+ *\r
+ * \return\r
+ *      0 on success, -1 on failure. On failure no verbs handle is left\r
+ *      behind (channel is NULL), so the destructor has nothing to release.\r
+ */\r
+int Infiniband::CompletionChannel::init()\r
+{\r
+  ldout(infiniband.cct, 20) << __func__ << " started." << dendl;\r
+  channel = ibv_create_comp_channel(infiniband.device->ctxt);\r
+  if (!channel) {\r
+    lderr(infiniband.cct) << __func__ << " failed to create receive completion channel: "\r
+                          << cpp_strerror(errno) << dendl;\r
+    return -1;\r
+  }\r
+  int rc = infiniband.net.set_nonblock(channel->fd);\r
+  if (rc < 0) {\r
+    ibv_destroy_comp_channel(channel);\r
+    // forget the handle, otherwise ~CompletionChannel would destroy it again\r
+    channel = NULL;\r
+    return -1;\r
+  }\r
+  return 0;\r
+}\r
+\r
+/**\r
+ * Translate the `status' field of a Verbs `ibv_wc' into a short name.\r
+ *\r
+ * \param[in] status\r
+ *      The integer status obtained in ibv_wc.status.\r
+ * \return\r
+ *      The name for the given status, or "<status out of range!>" when the\r
+ *      value lies outside IBV_WC_SUCCESS..IBV_WC_GENERAL_ERR.\r
+ */\r
+const char* Infiniband::wc_status_to_string(int status)\r
+{\r
+  // indexed directly by the status value\r
+  static const char *names[] = {\r
+      "SUCCESS",\r
+      "LOC_LEN_ERR",\r
+      "LOC_QP_OP_ERR",\r
+      "LOC_EEC_OP_ERR",\r
+      "LOC_PROT_ERR",\r
+      "WR_FLUSH_ERR",\r
+      "MW_BIND_ERR",\r
+      "BAD_RESP_ERR",\r
+      "LOC_ACCESS_ERR",\r
+      "REM_INV_REQ_ERR",\r
+      "REM_ACCESS_ERR",\r
+      "REM_OP_ERR",\r
+      "RETRY_EXC_ERR",\r
+      "RNR_RETRY_EXC_ERR",\r
+      "LOC_RDD_VIOL_ERR",\r
+      "REM_INV_RD_REQ_ERR",\r
+      "REM_ABORT_ERR",\r
+      "INV_EECN_ERR",\r
+      "INV_EEC_STATE_ERR",\r
+      "FATAL_ERR",\r
+      "RESP_TIMEOUT_ERR",\r
+      "GENERAL_ERR"\r
+  };\r
+\r
+  const bool in_range = (status >= IBV_WC_SUCCESS) && (status <= IBV_WC_GENERAL_ERR);\r
+  return in_range ? names[status] : "<status out of range!>";\r
+}\r
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- \r
+// vim: ts=8 sw=2 smarttab\r
+/*\r
+ * Ceph - scalable distributed file system\r
+ *\r
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>\r
+ *\r
+ * Author: Haomai Wang <haomaiwang@gmail.com>\r
+ *\r
+ * This is free software; you can redistribute it and/or\r
+ * modify it under the terms of the GNU Lesser General Public\r
+ * License version 2.1, as published by the Free Software\r
+ * Foundation. See file COPYING.\r
+ *\r
+ */\r
+\r
+#ifndef CEPH_INFINIBAND_H\r
+#define CEPH_INFINIBAND_H\r
+\r
+#include <infiniband/verbs.h>\r
+\r
+#include <string>\r
+#include <vector>\r
+\r
+#include "include/int_types.h"\r
+#include "include/page.h"\r
+#include "common/debug.h"\r
+#include "common/errno.h"\r
+#include "msg/msg_types.h"\r
+#include "msg/async/net_handler.h"\r
+#include "common/Mutex.h"\r
+\r
+#define HUGE_PAGE_SIZE (2 * 1024 * 1024)\r
+#define ALIGN_TO_PAGE_SIZE(x) \\r
+ (((x) + HUGE_PAGE_SIZE -1) / HUGE_PAGE_SIZE * HUGE_PAGE_SIZE)\r
+\r
+// Handshake data exchanged by Infiniband::send_udp_msg()/recv_udp_msg().\r
+// The struct is packed, so its members are not naturally aligned.\r
+struct IBSYNMsg {\r
+ uint16_t lid;\r
+ uint32_t qpn;\r
+ uint32_t psn;\r
+ union ibv_gid gid;\r
+} __attribute__((packed));\r
+\r
+class RDMAStack;\r
+class CephContext;\r
+\r
+// Caches the attributes, LID and first GID of one device port.\r
+// Query failures in the constructor are fatal (assert).\r
+class Port {\r
+  CephContext *cct;\r
+  struct ibv_context* ctxt;\r
+  uint8_t port_num;\r
+  // held by value: nothing to free and the object stays safe to copy\r
+  struct ibv_port_attr port_attr;\r
+  int gid_tbl_len;\r
+  uint16_t lid;\r
+  union ibv_gid gid;\r
+\r
+ public:\r
+  // c: logging context, ictxt: opened device context, ipn: port number\r
+  // handed to ibv_query_port()/ibv_query_gid().\r
+  explicit Port(CephContext *c, struct ibv_context* ictxt, uint8_t ipn): cct(c), ctxt(ictxt), port_num(ipn), port_attr(), gid_tbl_len(0), lid(0) {\r
+    // ibv_query_port() signals failure with a non-zero return value, which\r
+    // is not necessarily -1\r
+    int r = ibv_query_port(ctxt, port_num, &port_attr);\r
+    if (r != 0) {\r
+      lderr(cct) << __func__  << " query port failed  " << cpp_strerror(errno) << dendl;\r
+      assert(0);\r
+    }\r
+\r
+    lid = port_attr.lid;\r
+    gid_tbl_len = port_attr.gid_tbl_len;\r
+    // only GID table entry 0 is used\r
+    r = ibv_query_gid(ctxt, port_num, 0, &gid);\r
+    if (r) {\r
+      lderr(cct) << __func__  << " query gid failed  " << cpp_strerror(errno) << dendl;\r
+      assert(0);\r
+    }\r
+  }\r
+\r
+  uint16_t get_lid() { return lid; }\r
+  ibv_gid  get_gid() { return gid; }\r
+  uint8_t get_port_num() { return port_num; }\r
+  // pointer stays valid for the lifetime of this Port\r
+  ibv_port_attr* get_port_attr() { return &port_attr; }\r
+};\r
+\r
+\r
+// One opened RDMA device together with its ports.\r
+// The constructor (defined out of line) opens the device, allocates\r
+// device_attr and the ports array; the destructor releases all of them.\r
+class Device {\r
+  CephContext *cct;\r
+  ibv_device *device;\r
+  const char* name;\r
+  uint8_t  port_cnt;\r
+  Port** ports;\r
+ public:\r
+  explicit Device(CephContext *c, ibv_device* d);\r
+  ~Device() {\r
+    for (uint8_t i = 0; i < port_cnt; ++i)\r
+      delete ports[i];\r
+    delete []ports;\r
+    // allocated with new in the constructor's initializer list\r
+    delete device_attr;\r
+    // perform the call outside assert() so it does not depend on\r
+    // assertions being compiled in\r
+    int r = ibv_close_device(ctxt);\r
+    assert(r == 0);\r
+    (void)r;\r
+  }\r
+  const char* get_name() { return name;}\r
+  // LID/GID of the active port\r
+  uint16_t get_lid() { return active_port->get_lid(); }\r
+  ibv_gid get_gid() { return active_port->get_gid(); }\r
+  struct ibv_context *ctxt;\r
+  ibv_device_attr *device_attr;\r
+  Port* active_port;\r
+};\r
+\r
+\r
+// Owns the list returned by ibv_get_device_list() and one Device object\r
+// per entry. An empty or missing list is fatal (assert).\r
+class DeviceList {\r
+  CephContext *cct;\r
+  struct ibv_device ** device_list;\r
+  int num;\r
+  Device** devices;\r
+ public:\r
+  DeviceList(CephContext *c): cct(c), device_list(ibv_get_device_list(&num)), devices(NULL) {\r
+    if (device_list == NULL || num == 0) {\r
+      lderr(cct) << __func__ << " failed to get rdma device list.  " << cpp_strerror(errno) << dendl;\r
+      assert(0);\r
+    }\r
+    devices = new Device*[num];\r
+\r
+    for (int i = 0;i < num; ++i) {\r
+      devices[i] = new Device(cct, device_list[i]);\r
+    }\r
+  }\r
+  ~DeviceList() {\r
+    // devices holds exactly num entries and no NULL terminator, so the\r
+    // loop is bounded by num\r
+    for (int i = 0; i < num; ++i) {\r
+      delete devices[i];\r
+    }\r
+    delete []devices;\r
+    ibv_free_device_list(device_list);\r
+  }\r
+\r
+  // Returns the device with the given name; an empty name matches the\r
+  // first device. Returns NULL if no device matches.\r
+  Device* get_device(const char* device_name) {\r
+    assert(devices);\r
+    for (int i = 0; i < num; ++i) {\r
+      if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) {\r
+        return devices[i];\r
+      }\r
+    }\r
+    return NULL;\r
+  }\r
+};\r
+\r
+\r
+class Infiniband {\r
+ public:\r
+ // Owns one verbs protection domain: allocated in the constructor\r
+ // (failure is fatal), released in the destructor (failure is logged).\r
+ class ProtectionDomain {\r
+  public:\r
+   explicit ProtectionDomain(CephContext *c, Device *device)\r
+     : cct(c), pd(ibv_alloc_pd(device->ctxt))\r
+   {\r
+     if (pd != NULL)\r
+       return;\r
+     lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;\r
+     assert(0);\r
+   }\r
+   ~ProtectionDomain() {\r
+     if (ibv_dealloc_pd(pd) == 0)\r
+       return;\r
+     lderr(cct) << __func__ << " ibv_dealloc_pd failed: "\r
+                << cpp_strerror(errno) << dendl;\r
+   }\r
+   CephContext *cct;\r
+   ibv_pd* const pd;\r
+ };\r
+\r
+\r
+ class MemoryManager {\r
+ public:\r
+ // One registered buffer of `bytes` bytes with a read/write cursor.\r
+ //   offset: current cursor position inside buffer\r
+ //   bound:  end of the readable data (set by prepare_read/set_bound)\r
+ // The memory region is deregistered when the chunk is destroyed; the\r
+ // buffer memory itself is not freed here.\r
+ class Chunk {\r
+  public:\r
+   // bound and owner start at 0 so that read()/over()/get_owner() never\r
+   // see an indeterminate value\r
+   Chunk(char* b, uint32_t len, ibv_mr* m) : buffer(b), bytes(len), bound(0), offset(0), mr(m), owner(0) {}\r
+   ~Chunk() {\r
+     // perform the call outside assert() so it does not depend on\r
+     // assertions being compiled in\r
+     int r = ibv_dereg_mr(mr);\r
+     assert(r == 0);\r
+     (void)r;\r
+   }\r
+\r
+   void set_offset(uint32_t o) {\r
+     offset = o;\r
+   }\r
+\r
+   size_t get_offset() {\r
+     return offset;\r
+   }\r
+\r
+   void set_bound(uint32_t b) {\r
+     bound = b;\r
+   }\r
+\r
+   // rewind the cursor and mark the first b bytes as readable\r
+   void prepare_read(uint32_t b) {\r
+     offset = 0;\r
+     bound = b;\r
+   }\r
+\r
+   uint32_t get_bound() {\r
+     return bound;\r
+   }\r
+\r
+   // Copy up to len bytes from the cursor into buf and return the number\r
+   // copied. When fewer than len bytes were left, the chunk is reset\r
+   // (offset and bound become 0).\r
+   size_t read(char* buf, size_t len) {\r
+     size_t left = bound - offset;\r
+     if(left >= len) {\r
+       memcpy(buf, buffer+offset, len);\r
+       offset += len;\r
+       return len;\r
+     } else {\r
+       memcpy(buf, buffer+offset, left);\r
+       offset = 0;\r
+       bound = 0;\r
+       return left;\r
+     }\r
+   }\r
+\r
+   // Copy up to len bytes from buf to the cursor and return the number\r
+   // copied; stops at the end of the buffer.\r
+   size_t write(char* buf, size_t len) {\r
+     size_t left = bytes - offset;\r
+     if (left >= len) {\r
+       memcpy(buffer+offset, buf, len);\r
+       offset += len;\r
+       return len;\r
+     } else {\r
+       memcpy(buffer+offset, buf, left);\r
+       offset = bytes;\r
+       return left;\r
+     }\r
+   }\r
+\r
+   // cursor reached the end of the buffer\r
+   bool full() {\r
+     return offset == bytes;\r
+   }\r
+\r
+   // cursor reached the end of the readable data\r
+   bool over() {\r
+     return offset == bound;\r
+   }\r
+\r
+   void clear() {\r
+     offset = 0;\r
+     bound = 0;\r
+   }\r
+\r
+   // hand this chunk to the shared receive queue of ib\r
+   void post_srq(Infiniband *ib) {\r
+     ib->post_chunk(this);\r
+   }\r
+\r
+   void set_owner(uint64_t o) {\r
+     owner = o;\r
+   }\r
+\r
+   uint64_t get_owner() {\r
+     return owner;\r
+   }\r
+\r
+  public:\r
+   char* buffer;\r
+   uint32_t bytes;\r
+   uint32_t bound;\r
+   size_t offset;\r
+   ibv_mr* mr;\r
+   uint64_t owner;\r
+ };\r
+\r
+ // A pool of equally sized Chunks carved out of one contiguous allocation.\r
+ //   free_chunks: chunks currently available (protected by lock in\r
+ //                take_back/get_buffers)\r
+ //   all_chunks:  every chunk ever created by add()\r
+ //   base:        start of the backing allocation made by add()\r
+ class Cluster {\r
+  public:\r
+   Cluster(MemoryManager& m, uint32_t s) : manager(m), chunk_size(s), lock("cluster_lock"), base(NULL) {}\r
+   Cluster(MemoryManager& m, uint32_t s, uint32_t n) : manager(m), chunk_size(s), lock("cluster_lock"), base(NULL) {\r
+     add(n);\r
+   }\r
+\r
+   ~Cluster() {\r
+     // deleting a chunk deregisters its memory region ...\r
+     for (set<Chunk*>::iterator c = all_chunks.begin(); c != all_chunks.end(); ++c)\r
+       delete *c;\r
+     // ... after which the backing memory can be returned the same way it\r
+     // was obtained in add(). Both calls accept a NULL base.\r
+     if (manager.enabled_huge_page)\r
+       manager.free_huge_pages(base);\r
+     else\r
+       ::free(base);\r
+   }\r
+   // Allocate num chunks of chunk_size bytes, register each one as a\r
+   // memory region and put it on the free list. Allocation or\r
+   // registration failure trips an assert. Always returns 0.\r
+   // Only the most recent allocation is remembered in base, so this is\r
+   // meant to be called once per Cluster.\r
+   int add(uint32_t num) {\r
+     uint32_t bytes = chunk_size * num;\r
+     if (!manager.enabled_huge_page) {\r
+       base = (char*)memalign(CEPH_PAGE_SIZE, bytes);\r
+     } else {\r
+       base = (char*)manager.malloc_huge_pages(bytes);\r
+     }\r
+     assert(base);\r
+     for (uint32_t offset = 0; offset < bytes; offset += chunk_size){\r
+       ibv_mr* m = ibv_reg_mr(manager.pd->pd, base+offset, chunk_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);\r
+       assert(m);\r
+       Chunk* c = new Chunk(base+offset,chunk_size,m);\r
+       free_chunks.push_back(c);\r
+       all_chunks.insert(c);\r
+     }\r
+     return 0;\r
+   }\r
+\r
+   // Return a chunk to the free list. Always returns 0.\r
+   int take_back(Chunk* ck) {\r
+     Mutex::Locker l(lock);\r
+     free_chunks.push_back(ck);\r
+     return 0;\r
+   }\r
+\r
+   // Take enough free chunks to hold `bytes` bytes and append them to\r
+   // `chunks`. With bytes == 0 the whole free list is swapped into\r
+   // `chunks` instead. Returns 0 on success or -EAGAIN when not enough\r
+   // chunks are free.\r
+   int get_buffers(std::vector<Chunk*> &chunks, size_t bytes) {\r
+     Mutex::Locker l(lock);\r
+     if (!bytes) {\r
+       free_chunks.swap(chunks);\r
+       return 0;\r
+     }\r
+     // round up to whole chunks\r
+     uint32_t num = bytes / chunk_size + 1;\r
+     if (bytes % chunk_size == 0)\r
+       --num;\r
+     if (free_chunks.size() < num)\r
+       return -EAGAIN;\r
+     for (uint32_t i = 0; i < num; ++i) {\r
+       chunks.push_back(free_chunks.back());\r
+       free_chunks.pop_back();\r
+     }\r
+     return 0;\r
+   }\r
+\r
+   MemoryManager& manager;\r
+   uint32_t chunk_size;\r
+   Mutex lock;\r
+   std::vector<Chunk*> free_chunks;\r
+   set<Chunk*> all_chunks;\r
+   char* base;\r
+ };\r
+\r
+ // Remember the device and protection domain; the buffer clusters are\r
+ // created later by register_rx_tx(). channel and send start out NULL so\r
+ // the destructor is well defined even if register_rx_tx() is never called.\r
+ MemoryManager(CephContext *cct, Device *d, ProtectionDomain *p) : channel(NULL), send(NULL), cct(cct), device(d), pd(p) {\r
+   enabled_huge_page = cct->_conf->ms_async_rdma_enable_hugepage;\r
+ }\r
+ // Release both buffer clusters (deleting a null pointer is a no-op).\r
+ ~MemoryManager() {\r
+   delete channel;\r
+   delete send;\r
+ }\r
+ // Allocate `size` bytes, preferably backed by huge pages.\r
+ // The request is padded by HUGE_PAGE_SIZE and rounded up to a multiple of\r
+ // it; the first HUGE_PAGE_SIZE bytes form a header whose first size_t\r
+ // holds the mapped length (0 when the memory came from malloc instead of\r
+ // mmap). The returned pointer is just past that header and must be\r
+ // released with free_huge_pages(). Returns NULL if both mmap and malloc\r
+ // fail.\r
+ void* malloc_huge_pages(size_t size) {\r
+   size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);\r
+   char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);\r
+   if (ptr == MAP_FAILED) {\r
+     lderr(cct) << __func__ << " MAP_FAILED" << dendl;\r
+     ptr = (char *)malloc(real_size);\r
+     if (ptr == NULL) return NULL;\r
+     // marks the block as malloc'ed for free_huge_pages()\r
+     real_size = 0;\r
+   }\r
+   *((size_t *)ptr) = real_size;\r
+   // (a leftover debug message that was logged at error level on every\r
+   // call has been removed here)\r
+   return ptr + HUGE_PAGE_SIZE;\r
+ }\r
+ // Release memory obtained from malloc_huge_pages(); NULL is ignored.\r
+ // The length stored in the header decides between munmap (non-zero) and\r
+ // free (zero).\r
+ void free_huge_pages(void *ptr) {\r
+   if (ptr == NULL)\r
+     return;\r
+   char *header = (char *)ptr - HUGE_PAGE_SIZE;\r
+   const size_t mapped_len = *((size_t *)header);\r
+   assert(mapped_len % HUGE_PAGE_SIZE == 0);\r
+   if (mapped_len == 0) {\r
+     free(header);\r
+   } else {\r
+     munmap(header, mapped_len);\r
+   }\r
+ }\r
+ // Create the receive and send clusters: rx_num and tx_num chunks of\r
+ // `size` bytes each. Requires a device and protection domain.\r
+ void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num) {\r
+   assert(device);\r
+   assert(pd);\r
+   // the three-argument constructor calls add() itself\r
+   channel = new Cluster(*this, size, rx_num);\r
+   send = new Cluster(*this, size, tx_num);\r
+ }\r
+ // Reset a send chunk and put it back on the send cluster's free list.\r
+ int return_tx(Chunk* c) {\r
+ c->clear();\r
+ return send->take_back(c);\r
+ }\r
+\r
+ // Fetch send chunks for `bytes` bytes; forwards to Cluster::get_buffers.\r
+ int get_send_buffers(std::vector<Chunk*> &c, size_t bytes) {\r
+ return send->get_buffers(c, bytes);\r
+ }\r
+\r
+ // Fetch receive chunks for `bytes` bytes; forwards to Cluster::get_buffers.\r
+ int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes) {\r
+ return channel->get_buffers(chunks, bytes);\r
+ }\r
+\r
+ // Non-zero if c was created by the send cluster.\r
+ int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);}\r
+ bool enabled_huge_page;\r
+ private:\r
+ Cluster* channel;//RECV\r
+ Cluster* send;// SEND\r
+ CephContext *cct;\r
+ Device *device;\r
+ ProtectionDomain *pd;\r
+ };\r
+\r
+ private:\r
+ uint32_t max_send_wr;\r
+ uint32_t max_recv_wr;\r
+ uint32_t max_sge;\r
+ uint8_t ib_physical_port;\r
+ MemoryManager* memory_manager;\r
+ ibv_srq* srq; // shared receive work queue\r
+ Device *device;\r
+ ProtectionDomain *pd;\r
+ CephContext* cct;\r
+ DeviceList device_list;\r
+ void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);\r
+ void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);\r
+\r
+ public:\r
+ NetHandler net;\r
+ RDMAStack* stack;\r
+ explicit Infiniband(RDMAStack* s, CephContext *c, const std::string &device_name);\r
+\r
+ /**\r
+  * Destroy an Infiniband object: the shared receive queue first, then the\r
+  * registered buffers, then the protection domain.\r
+  */\r
+ ~Infiniband() {\r
+   // perform the call outside assert() so it does not depend on\r
+   // assertions being compiled in\r
+   int r = ibv_destroy_srq(srq);\r
+   assert(r == 0);\r
+   (void)r;\r
+   delete memory_manager;\r
+   delete pd;\r
+ }\r
+\r
+ // Wraps a verbs completion channel. init() creates the channel and makes\r
+ // its fd non-blocking; get_cq_event() fetches events and acknowledges them\r
+ // in batches of MAX_ACK_EVENT.\r
+ class CompletionChannel {\r
+ static const uint32_t MAX_ACK_EVENT = 5000;\r
+ Infiniband& infiniband;\r
+ ibv_comp_channel *channel;\r
+ ibv_cq *cq;\r
+ uint32_t cq_events_that_need_ack;\r
+\r
+ public:\r
+ CompletionChannel(Infiniband &ib): infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) {}\r
+ ~CompletionChannel();\r
+ int init();\r
+ bool get_cq_event();\r
+ // valid only after a successful init() (channel is dereferenced)\r
+ int get_fd() { return channel->fd; }\r
+ ibv_comp_channel* get_channel() { return channel; }\r
+ // remember the completion queue whose events ack_events() acknowledges\r
+ void bind_cq(ibv_cq *c) { cq = c; }\r
+ // acknowledge all events counted so far on the bound cq\r
+ void ack_events() {\r
+ ibv_ack_cq_events(cq, cq_events_that_need_ack);\r
+ cq_events_that_need_ack = 0;\r
+ }\r
+ };\r
+\r
+ // This class encapsulates the creation, use, and destruction of a\r
+ // completion queue.\r
+ //\r
+ // You need to call init(), which creates the cq and associates it with\r
+ // the completion channel passed to the constructor.\r
+ class CompletionQueue {\r
+ public:\r
+ CompletionQueue(Infiniband &ib, const uint32_t qd, CompletionChannel *cc):infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {}\r
+ ~CompletionQueue();\r
+ int init();\r
+ int poll_cq(int num_entries, ibv_wc *ret_wc_array);\r
+\r
+ ibv_cq* get_cq() const { return cq; }\r
+ // NOTE(review): the definition currently ignores its argument.\r
+ int rearm_notify(bool solicited_only=true);\r
+ CompletionChannel* get_cc() const { return channel; }\r
+ private:\r
+ Infiniband& infiniband; // Infiniband to which this CQ belongs\r
+ CompletionChannel *channel;\r
+ ibv_cq *cq;\r
+ uint32_t queue_depth;\r
+ };\r
+\r
+ // this class encapsulates the creation, use, and destruction of an RC\r
+ // queue pair.\r
+ //\r
+ // you need call init and it will create a qp and bring it to the INIT state.\r
+ // after obtaining the lid, qpn, and psn of a remote queue pair, one\r
+ // must call plumb() to bring the queue pair to the RTS state.\r
+ class QueuePair {\r
+  public:\r
+   QueuePair(Infiniband& infiniband, ibv_qp_type type,int ib_physical_port, ibv_srq *srq, Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0);\r
+   // Exists solely as superclass constructor for MockQueuePair derivative.\r
+   // Every member is given a defined value here so that accessors such as\r
+   // get_fd() never hand out an indeterminate one.\r
+   explicit QueuePair(Infiniband& infiniband):\r
+     infiniband(infiniband), type(IBV_QPT_RC), ctxt(NULL), ib_physical_port(-1),\r
+     pd(NULL), srq(NULL), qp(NULL), txcq(NULL), rxcq(NULL),\r
+     initial_psn(-1), max_send_wr(0), max_recv_wr(0), q_key(0), fd(-1) {}\r
+   ~QueuePair();\r
+\r
+   int init();\r
+\r
+   /**\r
+    * Get the initial packet sequence number for this QueuePair.\r
+    * This is randomly generated on creation. It should not be confused\r
+    * with the remote side's PSN, which is set in #plumb().\r
+    */\r
+   uint32_t get_initial_psn() const { return initial_psn; }\r
+   /**\r
+    * Get the local queue pair number for this QueuePair.\r
+    * QPNs are analogous to UDP/TCP port numbers.\r
+    * Dereferences qp, so the verbs handle must already exist.\r
+    */\r
+   uint32_t get_local_qp_number() const { return qp->qp_num; }\r
+   /**\r
+    * Get the remote queue pair number for this QueuePair, as set in #plumb().\r
+    * QPNs are analogous to UDP/TCP port numbers.\r
+    *\r
+    * @param rqp receives the destination QPN; may be NULL to only probe\r
+    * @return 0 on success, -1 if the verbs query fails\r
+    */\r
+   int get_remote_qp_number(uint32_t *rqp) const {\r
+     ibv_qp_attr qpa;\r
+     ibv_qp_init_attr qpia;\r
+\r
+     // ibv_query_qp() reports the failure reason through its return value\r
+     int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);\r
+     if (r) {\r
+       lderr(infiniband.cct) << __func__ << " failed to query qp: "\r
+         << cpp_strerror(r) << dendl;\r
+       return -1;\r
+     }\r
+\r
+     if (rqp)\r
+       *rqp = qpa.dest_qp_num;\r
+     return 0;\r
+   }\r
+   /**\r
+    * Get the remote infiniband address for this QueuePair, as set in #plumb().\r
+    * LIDs are "local IDs" in infiniband terminology. They are short, locally\r
+    * routable addresses.\r
+    *\r
+    * @param lid receives the destination LID; may be NULL to only probe\r
+    * @return 0 on success, -1 if the verbs query fails\r
+    */\r
+   int get_remote_lid(uint16_t *lid) const {\r
+     ibv_qp_attr qpa;\r
+     ibv_qp_init_attr qpia;\r
+\r
+     int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);\r
+     if (r) {\r
+       lderr(infiniband.cct) << __func__ << " failed to query qp: "\r
+         << cpp_strerror(r) << dendl;\r
+       return -1;\r
+     }\r
+\r
+     if (lid)\r
+       *lid = qpa.ah_attr.dlid;\r
+     return 0;\r
+   }\r
+   /**\r
+    * Get the state of a QueuePair.\r
+    *\r
+    * @return the qp_state value reported by the query, or -1 if the\r
+    *         query fails\r
+    */\r
+   int get_state() const {\r
+     ibv_qp_attr qpa;\r
+     ibv_qp_init_attr qpia;\r
+\r
+     int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);\r
+     if (r) {\r
+       lderr(infiniband.cct) << __func__ << " failed to get state: "\r
+         << cpp_strerror(r) << dendl;\r
+       return -1;\r
+     }\r
+     return qpa.qp_state;\r
+   }\r
+   /**\r
+    * Return true if the queue pair is in an error state, false otherwise.\r
+    * A failed query is also reported as true.\r
+    */\r
+   bool is_error() const {\r
+     ibv_qp_attr qpa;\r
+     ibv_qp_init_attr qpia;\r
+\r
+     // a mask of -1 asks for every attribute\r
+     int r = ibv_query_qp(qp, &qpa, -1, &qpia);\r
+     if (r) {\r
+       lderr(infiniband.cct) << __func__ << " failed to get state: "\r
+         << cpp_strerror(r) << dendl;\r
+       return true;\r
+     }\r
+     return qpa.cur_qp_state == IBV_QPS_ERR;\r
+   }\r
+   ibv_qp* get_qp() const { return qp; }\r
+   Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }\r
+   Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }\r
+   int to_reset();\r
+   int to_dead();\r
+   int get_fd() { return fd; }\r
+  private:\r
+   Infiniband& infiniband;  // Infiniband to which this QP belongs\r
+   ibv_qp_type type;        // QP type (IBV_QPT_RC, etc.)\r
+   ibv_context* ctxt;       // device context of the HCA to use\r
+   int ib_physical_port;\r
+   ibv_pd* pd;              // protection domain\r
+   ibv_srq* srq;            // shared receive queue\r
+   ibv_qp* qp;              // infiniband verbs QP handle\r
+   Infiniband::CompletionQueue* txcq;\r
+   Infiniband::CompletionQueue* rxcq;\r
+   uint32_t initial_psn;    // initial packet sequence number\r
+   uint32_t max_send_wr;\r
+   uint32_t max_recv_wr;\r
+   uint32_t q_key;\r
+   int fd;\r
+ };\r
+\r
+ public:\r
+  using Cluster = MemoryManager::Cluster;\r
+  using Chunk = MemoryManager::Chunk;\r
+\r
+  // queue pair / shared receive queue construction\r
+  QueuePair* create_queue_pair(ibv_qp_type type);\r
+  ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);\r
+\r
+  // receive-side chunk posting\r
+  int post_chunk(Chunk* chunk);\r
+  int post_channel_cluster();\r
+\r
+  // Asks the memory manager for send chunks covering `bytes`; the manager's\r
+  // result is passed back to the caller as-is.\r
+  int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes) {\r
+    const int rc = memory_manager->get_send_buffers(c, bytes);\r
+    return rc;\r
+  }\r
+\r
+  // completion channel / queue construction\r
+  CompletionChannel *create_comp_channel();\r
+  CompletionQueue *create_comp_queue(CompletionChannel *cc=NULL);\r
+\r
+  // Port number stored in ib_physical_port, returned as uint8_t.\r
+  uint8_t get_ib_physical_port() { return ib_physical_port; }\r
+\r
+  // exchange of IBSYNMsg handshake messages over the datagram socket `sd`\r
+  int send_udp_msg(int sd, IBSYNMsg& msg, entity_addr_t &peeraddr);\r
+  int recv_udp_msg(int sd, IBSYNMsg& msg, entity_addr_t *addr);\r
+\r
+  // thin accessors; lid and gid are obtained from the device object\r
+  uint16_t get_lid() { return device->get_lid(); }\r
+  ibv_gid get_gid() { return device->get_gid(); }\r
+  MemoryManager* get_memory_manager() { return memory_manager; }\r
+  Device* get_device() { return device; }\r
+\r
+  static const char* wc_status_to_string(int status);\r
+};\r
+\r
+#endif\r
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- \r
+// vim: ts=8 sw=2 smarttab\r
+/*\r
+ * Ceph - scalable distributed file system\r
+ *\r
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>\r
+ *\r
+ * Author: Haomai Wang <haomaiwang@gmail.com>\r
+ *\r
+ * This is free software; you can redistribute it and/or\r
+ * modify it under the terms of the GNU Lesser General Public\r
+ * License version 2.1, as published by the Free Software\r
+ * Foundation. See file COPYING.\r
+ *\r
+ */\r
+\r
+#include "RDMAStack.h"\r
+\r
+#define dout_subsys ceph_subsys_ms\r
+#undef dout_prefix\r
+#define dout_prefix *_dout << " RDMAConnectedSocketImpl "\r
+\r
+// Drive this socket's queue pair to RTR and then RTS using the peer's\r
+// handshake data (peer_msg) and our own PSN (my_msg).\r
+// Returns 0 and sets `connected` on success, -1 if either transition fails.\r
+int RDMAConnectedSocketImpl::activate() {\r
+  ibv_qp_attr qpa;\r
+  int r;\r
+\r
+  // now connect up the qps and switch to RTR\r
+  memset(&qpa, 0, sizeof(qpa));\r
+  qpa.qp_state = IBV_QPS_RTR;\r
+  qpa.path_mtu = IBV_MTU_1024;\r
+  qpa.dest_qp_num = peer_msg.qpn;\r
+  qpa.rq_psn = peer_msg.psn;\r
+  qpa.max_dest_rd_atomic = 1;\r
+  qpa.min_rnr_timer = 12;\r
+\r
+  // address the peer through a global route header using its gid\r
+  qpa.ah_attr.is_global = 1;\r
+  qpa.ah_attr.grh.hop_limit = 6;\r
+  qpa.ah_attr.grh.dgid = peer_msg.gid;\r
+  qpa.ah_attr.grh.sgid_index = 0;\r
+\r
+  qpa.ah_attr.dlid = peer_msg.lid;\r
+  qpa.ah_attr.sl = 0;\r
+  qpa.ah_attr.src_path_bits = 0;\r
+  qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());\r
+\r
+  // ibv_modify_qp() reports the failure reason through its return value,\r
+  // so log that instead of whatever errno happens to hold\r
+  r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |\r
+      IBV_QP_AV |\r
+      IBV_QP_PATH_MTU |\r
+      IBV_QP_DEST_QPN |\r
+      IBV_QP_RQ_PSN |\r
+      IBV_QP_MIN_RNR_TIMER |\r
+      IBV_QP_MAX_DEST_RD_ATOMIC);\r
+  if (r) {\r
+    lderr(cct) << __func__ << " failed to transition to RTR state: "\r
+               << cpp_strerror(r) << dendl;\r
+    return -1;\r
+  }\r
+\r
+  ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl;\r
+\r
+  // now move to RTS\r
+  qpa.qp_state = IBV_QPS_RTS;\r
+\r
+  // How long to wait before retrying if packet lost or server dead.\r
+  // Supposedly the timeout is 4.096us*2^timeout.  However, the actual\r
+  // timeout appears to be 4.096us*2^(timeout+1), so the setting\r
+  // below creates a 135ms timeout.\r
+  qpa.timeout = 14;\r
+\r
+  // How many times to retry after timeouts before giving up.\r
+  qpa.retry_cnt = 7;\r
+\r
+  // How many times to retry after RNR (receiver not ready) condition\r
+  // before giving up. Occurs when the remote side has not yet posted\r
+  // a receive request.\r
+  qpa.rnr_retry = 7; // 7 is infinite retry.\r
+  qpa.sq_psn = my_msg.psn;\r
+  qpa.max_rd_atomic = 1;\r
+\r
+  r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |\r
+      IBV_QP_TIMEOUT |\r
+      IBV_QP_RETRY_CNT |\r
+      IBV_QP_RNR_RETRY |\r
+      IBV_QP_SQ_PSN |\r
+      IBV_QP_MAX_QP_RD_ATOMIC);\r
+  if (r) {\r
+    lderr(cct) << __func__ << " failed to transition to RTS state: "\r
+               << cpp_strerror(r) << dendl;\r
+    return -1;\r
+  }\r
+\r
+  // the queue pair should be ready to use once the client has finished\r
+  // setting up their end.\r
+  ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;\r
+\r
+  connected = 1;//indicate successfully\r
+  return 0;\r
+}\r
+\r
+// Copy up to `len` received bytes into `buf`.\r
+//\r
+// Chunks parked in `buffers` by an earlier call are consumed first, then the\r
+// rx completion queue is polled until it is empty (once more after re-arming\r
+// the notification). Chunks that do not fit are parked in `buffers`.\r
+//\r
+// Returns the number of bytes copied, 0 when a zero-length completion is seen\r
+// (wait_close is set), or -EAGAIN when nothing was copied.\r
+ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)\r
+{\r
+  ldout(cct, 20) << __func__ << " need to read bytes: " << len << " buffers size: " << buffers.size() << dendl;\r
+\r
+  ssize_t read = 0;\r
+  if (!buffers.empty())\r
+    read = read_buffers(buf,len);\r
+\r
+  static const int MAX_COMPLETIONS = 16;\r
+  ibv_wc wc[MAX_COMPLETIONS];\r
+\r
+  bool rearmed = false;\r
+  int n;\r
+ again:\r
+  n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);\r
+  ldout(cct, 20) << __func__ << " poll completion queue got " << n << " responses."<< dendl;\r
+  for (int i = 0; i < n; ++i) {\r
+    ibv_wc* response = &wc[i];\r
+    ldout(cct, 20) << __func__ << " cqe " << response->byte_len << " bytes." << dendl;\r
+    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);\r
+    chunk->prepare_read(response->byte_len);\r
+    if (!response->byte_len) {\r
+      // zero-length completion: remember that the connection is closing.\r
+      // NOTE(review): this chunk is not handed back via post_chunk() here.\r
+      wait_close = true;\r
+      return 0;\r
+    }\r
+    if (response->status != IBV_WC_SUCCESS) {\r
+      // report the completion status itself; errno is unrelated to it\r
+      lderr(cct) << __func__ << " poll cqe failed! " << " number: " << n << ", status: "<< response->status\r
+                 << " (" << infiniband->wc_status_to_string(response->status) << ")" << dendl;\r
+      assert(0);\r
+    } else {\r
+      if (read == (ssize_t)len) {\r
+        // caller's buffer is already full: park the whole chunk\r
+        buffers.push_back(chunk);\r
+        ldout(cct, 20) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;\r
+      } else if (read + response->byte_len > (ssize_t)len) {\r
+        // chunk only partly fits: copy what fits, park the remainder\r
+        read += chunk->read(buf+read, (ssize_t)len-read);\r
+        buffers.push_back(chunk);\r
+        ldout(cct, 20) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;\r
+      } else {\r
+        // chunk fits completely: copy it and hand it back.\r
+        // The call is kept outside assert() so it cannot be compiled away.\r
+        read += chunk->read(buf+read, response->byte_len);\r
+        int r = infiniband->post_chunk(chunk);\r
+        assert(r == 0);\r
+      }\r
+    }\r
+  }\r
+\r
+  if (n)\r
+    goto again;\r
+  if (!rearmed) {\r
+    rx_cq->rearm_notify();\r
+    rearmed = true;\r
+    // Clean up cq events after rearm notify ensure no new incoming event\r
+    // arrived between polling and rearm\r
+    goto again;\r
+  }\r
+  if (read == 0)\r
+    return -EAGAIN;\r
+  return read;\r
+}\r
+\r
+// Copy up to `len` bytes out of the chunks parked in `buffers`.\r
+//\r
+// Every chunk that reports over() after its read is handed back through\r
+// post_chunk() and dropped from `buffers`; a chunk that still holds data when\r
+// `len` is reached stays at the front. Returns the number of bytes copied.\r
+ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)\r
+{\r
+  size_t read = 0;\r
+  vector<Chunk*>::iterator c = buffers.begin();\r
+  while (c != buffers.end()) {\r
+    Chunk* chunk = *c;\r
+    const size_t tmp = chunk->read(buf+read, len-read);\r
+    read += tmp;\r
+    ldout(cct, 20) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << chunk->get_offset() << " ,bound: " << chunk->get_bound()  << ". Chunk:" << chunk  << dendl;\r
+\r
+    // Sample over() before the chunk is handed back and never look at the\r
+    // chunk again afterwards. The call is kept outside assert() so it\r
+    // cannot be compiled away.\r
+    const bool drained = chunk->over();\r
+    if (drained) {\r
+      int r = infiniband->post_chunk(chunk);\r
+      assert(r == 0);\r
+      ldout(cct, 20) << __func__ << " one chunk over." << dendl;\r
+    }\r
+    if (read == len) {\r
+      // a drained chunk is erased too; a partly read one is kept\r
+      if (drained)\r
+        ++c;\r
+      break;\r
+    }\r
+    ++c;\r
+  }\r
+\r
+  buffers.erase(buffers.begin(), c);\r
+  ldout(cct, 20) << __func__ << " got " << read  << " bytes here. buffers size: " << buffers.size() << dendl;\r
+  return read;\r
+}\r
+\r
+// Poll the rx completion queue and report the `bound` of one received chunk.\r
+// Returns 0 (and sets wait_close) when the chunk looked at has bound == 0,\r
+// -EAGAIN when no size was obtained, otherwise the last size recorded.\r
+// NOTE(review): `data` is never written in this body, and the `del` binders\r
+// below are constructed but never invoked or stored -- this looks unfinished;\r
+// confirm how the chunk is meant to reach the caller before relying on it.\r
+ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)\r
+{\r
+ ssize_t size = 0;\r
+ static const int MAX_COMPLETIONS = 16;\r
+ ibv_wc wc[MAX_COMPLETIONS];\r
+\r
+ bool rearmed = false;\r
+ int n;\r
+ // Every `goto again` re-runs the declarations below, so `loaded` is reset\r
+ // and the front of `buffers` (if any) is popped again on each pass.\r
+ again:\r
+ n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);\r
+ ldout(cct, 20) << __func__ << " pool completion queue got " << n << " responses."<< dendl;\r
+\r
+ ibv_wc* response;\r
+ Chunk* chunk;\r
+ bool loaded = false;\r
+ // Prefer a chunk parked in `buffers` over a freshly polled one.\r
+ auto iter = buffers.begin();\r
+ if(iter != buffers.end()) {\r
+ chunk = *iter;\r
+ if (chunk->bound == 0) {\r
+ wait_close = true;\r
+ return 0;\r
+ }\r
+ // NOTE(review): `del` is unused; presumably meant as the buffer deleter.\r
+ auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
+ buffers.erase(iter);\r
+ loaded = true;\r
+ size = chunk->bound;\r
+ }\r
+\r
+ for (int i = 0; i < n; ++i) {\r
+ response = &wc[i];\r
+ chunk = reinterpret_cast<Chunk*>(response->wr_id);\r
+ chunk->prepare_read(response->byte_len);\r
+ // Without a parked chunk, the first completion supplies the size; every\r
+ // other completion is parked in `buffers`.\r
+ if(!loaded && i == 0) {\r
+ if (chunk->bound == 0) {\r
+ wait_close = true;\r
+ return 0;\r
+ }\r
+ // NOTE(review): `del` is unused here as well.\r
+ auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
+ size = chunk->bound;\r
+ continue;\r
+ }\r
+ buffers.push_back(chunk);\r
+ }\r
+\r
+ if (n)\r
+ goto again;\r
+ if (!rearmed) {\r
+ rx_cq->rearm_notify();\r
+ rearmed = true;\r
+ // Clean up cq events after rearm notify ensure no new incoming event\r
+ // arrived between polling and rearm\r
+ goto again;\r
+ }\r
+ if (size == 0)\r
+ return -EAGAIN;\r
+ return size;\r
+}\r
+\r
+// Copy the whole bufferlist into send chunks obtained from the Infiniband\r
+// object and post them as work requests.\r
+//\r
+// Returns 0 when `bl` is empty or no chunks could be obtained, the negative\r
+// result of post_work_request() if posting fails, otherwise the number of\r
+// bytes taken from `bl` (which is cleared). `more` is not consulted.\r
+ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)\r
+{\r
+  const size_t bytes = bl.length();\r
+  if (bytes == 0)\r
+    return 0;\r
+\r
+  vector<Chunk*> tx_buffers;\r
+  const int got = infiniband->get_tx_buffers(tx_buffers, bytes);\r
+  if (got < 0) {\r
+    ldout(cct, 10) << __func__ << " no enough buffers" << dendl;\r
+    return 0;\r
+  }\r
+  ldout(cct, 20) << __func__ << " prepare " << bytes << " bytes, tx buffer count: " << tx_buffers.size() << dendl;\r
+\r
+  // walk the source segments, spilling into the next chunk whenever the\r
+  // current one reports full()\r
+  vector<Chunk*>::iterator dst = tx_buffers.begin();\r
+  for (const bufferptr& seg : bl.buffers()) {\r
+    char* const base = const_cast<char*>(seg.c_str());\r
+    uint32_t done = 0;\r
+    while (done < seg.length()) {\r
+      done += (*dst)->write(base + done, seg.length() - done);\r
+      if ((*dst)->full())\r
+        ++dst;\r
+    }\r
+  }\r
+\r
+  const int rc = post_work_request(tx_buffers);\r
+  if (rc < 0)\r
+    return rc;\r
+\r
+  ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;\r
+  bl.clear();\r
+  return bytes;\r
+}\r
+\r
+// Post one signaled IBV_WR_SEND work request per chunk in `tx_buffers`,\r
+// chained in order, each carrying a single scatter/gather entry that covers\r
+// the chunk from its start up to get_offset().\r
+//\r
+// Returns 0 on success (or when there is nothing to post) and a negative\r
+// error code when ibv_post_send() fails.\r
+int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)\r
+{\r
+  const size_t count = tx_buffers.size();\r
+  if (count == 0)\r
+    return 0;\r
+\r
+  // value-initialised vectors replace the variable-length arrays, so every\r
+  // field that is not set below is zero\r
+  std::vector<ibv_sge> isge(count);\r
+  std::vector<ibv_send_wr> iswr(count);\r
+\r
+  for (size_t i = 0; i < count; ++i) {\r
+    Chunk* chunk = tx_buffers[i];\r
+    isge[i].addr = reinterpret_cast<uint64_t>(chunk->buffer);\r
+    isge[i].length = chunk->get_offset();\r
+    isge[i].lkey = chunk->mr->lkey;\r
+    ldout(cct, 20) << __func__ << " current_buffer: " << chunk << " length: " << isge[i].length << dendl;\r
+\r
+    // wr_id carries the chunk pointer so that the completion handler can\r
+    // map the completion back to it\r
+    iswr[i].wr_id = reinterpret_cast<uint64_t>(chunk);\r
+    iswr[i].next = (i + 1 < count) ? &iswr[i + 1] : NULL;\r
+    iswr[i].sg_list = &isge[i];\r
+    iswr[i].num_sge = 1;\r
+    iswr[i].opcode = IBV_WR_SEND;\r
+    iswr[i].send_flags = IBV_SEND_SIGNALED;\r
+  }\r
+\r
+  // ibv_post_send() reports the failure reason through its return value and\r
+  // need not set errno, so "-errno" could be 0 here and hide the failure\r
+  ibv_send_wr *bad_tx_work_request = NULL;\r
+  const int r = ibv_post_send(qp->get_qp(), &iswr[0], &bad_tx_work_request);\r
+  if (r) {\r
+    lderr(cct) << __func__ << " failed to send data"\r
+               << " (most probably should be peer not ready): "\r
+               << cpp_strerror(r) << dendl;\r
+    return r > 0 ? -r : r;\r
+  }\r
+  return 0;\r
+}\r
+\r
+// Post a signaled send with no scatter/gather entries; read() on the other\r
+// side treats a zero-length completion as the close notification.\r
+// The work request id is this socket object rather than a chunk.\r
+// A failure to post is logged and otherwise ignored.\r
+void RDMAConnectedSocketImpl::fin() {\r
+  ibv_send_wr wr;\r
+  memset(&wr, 0, sizeof(wr));\r
+  wr.wr_id = reinterpret_cast<uint64_t>(this);\r
+  wr.num_sge = 0;\r
+  wr.opcode = IBV_WR_SEND;\r
+  wr.send_flags = IBV_SEND_SIGNALED;\r
+\r
+  // the failure reason is the return value; errno need not be set\r
+  ibv_send_wr* bad_tx_work_request = NULL;\r
+  const int r = ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request);\r
+  if (r) {\r
+    lderr(cct) << __func__ << " failed to send FIN"\r
+               << "(most probably should be peer not ready): "\r
+               << cpp_strerror(r) << dendl;\r
+  }\r
+}\r
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- \r
+// vim: ts=8 sw=2 smarttab\r
+/*\r
+ * Ceph - scalable distributed file system\r
+ *\r
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>\r
+ *\r
+ * Author: Haomai Wang <haomaiwang@gmail.com>\r
+ *\r
+ * This is free software; you can redistribute it and/or\r
+ * modify it under the terms of the GNU Lesser General Public\r
+ * License version 2.1, as published by the Free Software\r
+ * Foundation. See file COPYING.\r
+ *\r
+ */\r
+\r
+#include "msg/async/net_handler.h"\r
+#include "RDMAStack.h"\r
+\r
+#define dout_subsys ceph_subsys_ms\r
+#undef dout_prefix\r
+#define dout_prefix *_dout << " RDMAServerSocketImpl "\r
+\r
+// Create the datagram socket used for the connection handshake, enable\r
+// SO_REUSEADDR, bind it to `sa`, and make it non-blocking and close-on-exec.\r
+//\r
+// Returns 0 on success. On failure the socket is closed,\r
+// server_setup_socket is reset to -1 and a negative error code is returned.\r
+// `opt` is not consulted.\r
+int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)\r
+{\r
+  server_setup_socket = ::socket(sa.get_family(), SOCK_DGRAM, 0);\r
+  if (server_setup_socket == -1) {\r
+    // capture errno before logging gets a chance to change it\r
+    const int err = -errno;\r
+    lderr(cct) << __func__ << " failed to create server socket: "\r
+               << cpp_strerror(err) << dendl;\r
+    return err;\r
+  }\r
+\r
+  int on = 1;\r
+  int rc = ::setsockopt(server_setup_socket, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));\r
+  if (rc < 0) {\r
+    rc = -errno;\r
+    lderr(cct) << __func__ << " unable to setsockopt: " << cpp_strerror(rc) << dendl;\r
+    goto err;\r
+  }\r
+\r
+  rc = ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len());\r
+  if (rc < 0) {\r
+    rc = -errno;\r
+    lderr(cct) << __func__ << " unable to bind to " << sa.get_sockaddr()\r
+               << " on port " << sa.get_port() << ": " << cpp_strerror(rc) << dendl;\r
+    goto err;\r
+  }\r
+\r
+  rc = net.set_nonblock(server_setup_socket);\r
+  if (rc < 0) {\r
+    goto err;\r
+  }\r
+\r
+  net.set_close_on_exec(server_setup_socket);\r
+\r
+  ldout(cct, 20) << __func__ << " bind to " << sa.get_sockaddr() << " on port " << sa.get_port()  << dendl;\r
+  return 0;\r
+\r
+err:\r
+  // hand the real cause to the caller instead of a blanket -1\r
+  ::close(server_setup_socket);\r
+  server_setup_socket = -1;\r
+  return rc;\r
+}\r
+\r
+// Receive one handshake message on the setup socket, build a connected\r
+// socket from it, reply with our own handshake message and activate the\r
+// queue pair.\r
+//\r
+// `*sock` is assigned only when the reply was sent and activation succeeded.\r
+// Returns a negative value on failure (-EAGAIN when nothing is pending),\r
+// otherwise the result of send_udp_msg(). `opts` is not consulted.\r
+int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out)\r
+{\r
+  ldout(cct, 15) << __func__ << dendl;\r
+  int r;\r
+  while (1) {\r
+    IBSYNMsg msg;//TODO\r
+    entity_addr_t addr;\r
+    r = infiniband->recv_udp_msg(server_setup_socket, msg, &addr);\r
+    if (r < 0) {\r
+      // prefer errno, but keep the helper's own code when errno is clear so\r
+      // that a failure can never be reported as 0\r
+      const int err = errno;\r
+      if (err)\r
+        r = -err;\r
+      if (r != -EAGAIN)\r
+        ldout(cct, 10) << __func__ << " recv msg failed:" << cpp_strerror(r)<< dendl;\r
+      break;\r
+    }\r
+    if (r > 0) {\r
+      ldout(cct, 1) << __func__ << " recv msg not whole." << dendl;\r
+      continue;\r
+    }\r
+\r
+    // owned from the start so that the early returns below do not leak it\r
+    std::unique_ptr<RDMAConnectedSocketImpl> server(\r
+        new RDMAConnectedSocketImpl(cct, infiniband, NULL, msg));\r
+    msg = server->get_my_msg();\r
+    r = infiniband->send_udp_msg(server_setup_socket, msg, addr);\r
+    if (r < 0) {\r
+      ldout(cct, 1) << __func__ << " send msg failed." << dendl;\r
+      return r;\r
+    }\r
+\r
+    const int activated = server->activate();\r
+    if (activated < 0) {\r
+      lderr(cct) << __func__ << " failed to activate queue pair." << dendl;\r
+      return activated;\r
+    }\r
+\r
+    *sock = ConnectedSocket(std::move(server));\r
+    // NOTE(review): this reports the address the listener was created with,\r
+    // not the sender's `addr` -- confirm which one callers expect.\r
+    if(out)\r
+      *out = sa;\r
+    return r;\r
+  }\r
+\r
+  return r;\r
+}\r
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "RDMAStack.h"
+
+#include <unistd.h>
+
+#include "include/str_list.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << "RDMAStack "
+
+static Infiniband* global_infiniband;
+
+// Build an RDMAServerSocketImpl for `sa`, start listening on it and, if that
+// works, hand ownership to `*sock`. Returns 0 on success or the negative
+// result of the listen call, in which case the object is destroyed.
+int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
+{
+  std::unique_ptr<RDMAServerSocketImpl> server(
+      new RDMAServerSocketImpl(cct, infiniband, sa));
+
+  const int rc = server->listen(sa, opt);
+  if (rc < 0)
+    return rc;
+
+  *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(server.release()));
+  return 0;
+}
+
+// Establish an RDMA connection to `addr`.
+//
+// A datagram socket is used to exchange handshake messages (IBSYNMsg) with
+// the peer; afterwards the queue pair is activated and ownership of the new
+// socket object is handed to `*socket`.
+//
+// Returns 0 on success and a negative value on failure. On every path the
+// handshake socket is closed and, on failure, the socket object is destroyed.
+// `opts` is not consulted.
+int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
+{
+  // owned from the start so that no error path leaks it
+  std::unique_ptr<RDMAConnectedSocketImpl> p(
+      new RDMAConnectedSocketImpl(cct, infiniband, this));
+  entity_addr_t sa = addr;
+
+  IBSYNMsg msg = p->get_my_msg();
+  ldout(cct, 20) << __func__ << " connecting to " << sa.get_sockaddr() << " : " << sa.get_port() << dendl;
+  ldout(cct, 20) << __func__ << " my syn msg :  < " << msg.qpn << ", " << msg.psn <<  ", " << msg.lid << ">"<< dendl;
+
+  // use the peer's address family, as the listening side does
+  client_setup_socket = ::socket(sa.get_family(), SOCK_DGRAM, 0);
+  if (client_setup_socket == -1) {
+    // capture errno before logging gets a chance to change it
+    const int err = -errno;
+    lderr(cct) << __func__ << " failed to create client socket: " << cpp_strerror(err) << dendl;
+    return err;
+  }
+
+  int r = ::connect(client_setup_socket, addr.get_sockaddr(), addr.get_sockaddr_len());
+  if (r < 0) {
+    r = -errno;
+    lderr(cct) << __func__ << " failed to connect " << addr << ": "
+               << cpp_strerror(r) << dendl;
+  }
+
+  if (r >= 0) {
+    r = infiniband->send_udp_msg(client_setup_socket, msg, sa);
+    if (r < 0)
+      ldout(cct, 0) << __func__ << " send msg failed." << dendl;
+  }
+
+  if (r >= 0) {
+    // FIXME: need to make this async
+    r = infiniband->recv_udp_msg(client_setup_socket, msg, &sa);
+    if (r < 0) {
+      ldout(cct, 0) << __func__ << " recv msg failed." << dendl;
+    } else if (r > 0) {
+      // a positive result means the message was incomplete (see the accept
+      // side); do not configure the queue pair from it
+      ldout(cct, 0) << __func__ << " recv msg not whole." << dendl;
+      r = -EIO;
+    }
+  }
+
+  // the handshake socket has served its purpose, successful or not
+  ::close(client_setup_socket);
+  client_setup_socket = -1;
+  if (r < 0)
+    return r;
+
+  p->set_peer_msg(msg);
+  ldout(cct, 20) << __func__ << " peer msg :  < " << msg.qpn << ", " << msg.psn <<  ", " << msg.lid << "> " << dendl;
+  r = p->activate();
+  if (r < 0) {
+    lderr(cct) << __func__ << " failed to activate queue pair." << dendl;
+    return r;
+  }
+
+  *socket = ConnectedSocket(std::move(p));
+  return 0;
+}
+
+// The first stack to be constructed creates the file-wide Infiniband object,
+// using the device name from the ms_async_rdma_device_name option; later
+// constructions leave the existing object in place.
+RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
+{
+  if (global_infiniband)
+    return;
+
+  global_infiniband =
+      new Infiniband(this, cct, cct->_conf->ms_async_rdma_device_name);
+}
+
+// Bind this worker to the file-wide Infiniband object, cache its memory
+// manager, create the transmit completion channel and queue, and register
+// tx_handler for readability events on the channel's descriptor.
+void RDMAWorker::initialize()
+{
+  infiniband = global_infiniband;
+  memory_manager = infiniband->get_memory_manager();
+
+  tx_cc = infiniband->create_comp_channel();
+  tx_cq = infiniband->create_comp_queue(tx_cc);
+
+  const int channel_fd = tx_cc->get_fd();
+  center.create_file_event(channel_fd, EVENT_READABLE, tx_handler);
+}
+
+// Drain the transmit completion queue.
+//
+// Does nothing unless the completion channel reports an event. The queue is
+// then polled until it comes back empty, the notification is re-armed, and
+// the queue is polled once more. Any completion with a non-success status is
+// logged and trips assert(0). Completions whose id is a tx chunk give that
+// chunk back to the memory manager.
+void RDMAWorker::handle_tx_event()
+{
+  ldout(cct, 20) << __func__ << dendl;
+  if (!tx_cc->get_cq_event())
+    return ;
+
+  static const int MAX_COMPLETIONS = 16;
+  ibv_wc wc[MAX_COMPLETIONS];
+
+  for (bool rearmed = false; ; ) {
+    const int n = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
+    ldout(cct, 20) << __func__ << " pool completion queue got " << n
+                   << " responses."<< dendl;
+
+    for (int i = 0; i < n; ++i) {
+      ibv_wc* response = &wc[i];
+      Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+      ldout(cct, 20) << __func__ << " opcode: " << response->opcode << " len: " << response->byte_len << dendl;
+
+      if (response->status != IBV_WC_SUCCESS) {
+        switch (response->status) {
+        case IBV_WC_RETRY_EXC_ERR:
+          lderr(cct) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
+          break;
+        case IBV_WC_WR_FLUSH_ERR:
+          lderr(cct) << __func__ << " Work Request Flushed Error: this connection's qp="
+                     << response->qp_num << " should be down while this WR=" << response->wr_id
+                     << " still in flight." << dendl;
+          break;
+        default:
+          lderr(cct) << __func__ << " send work request returned error for buffer("
+                     << response->wr_id << ") status(" << response->status << "): "
+                     << infiniband->wc_status_to_string(response->status) << dendl;
+          break;
+        }
+        assert(0);
+      }
+
+      // ids that are not tx chunks are only logged
+      if (!memory_manager->is_tx_chunk(chunk)) {
+        ldout(cct, 20) << __func__ << " chunk belongs to none " << dendl;
+        continue;
+      }
+      infiniband->get_memory_manager()->return_tx(chunk);
+    }
+
+    if (n)
+      continue;
+    if (rearmed)
+      break;
+
+    // Clean up cq events after rearm notify ensure no new incoming event
+    // arrived between polling and rearm
+    tx_cq->rearm_notify();
+    rearmed = true;
+  }
+  ldout(cct, 20) << __func__ << " leaving handle_tx_event. " << dendl;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_RDMASTACK_H
+#define CEPH_MSG_RDMASTACK_H
+
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "msg/async/Stack.h"
+#include <thread>
+#include "Infiniband.h"
+
+class RDMAConnectedSocketImpl;
+
+// Worker for the RDMA network stack. Owns the transmit completion channel
+// and queue (created in initialize()) and the callback that drains them.
+class RDMAWorker : public Worker {
+  typedef Infiniband::CompletionQueue CompletionQueue;
+  typedef Infiniband::CompletionChannel CompletionChannel;
+  typedef Infiniband::MemoryManager::Chunk Chunk;
+  typedef Infiniband::MemoryManager MemoryManager;
+  int client_setup_socket;        // datagram socket used by connect()
+  Infiniband* infiniband;
+  CompletionQueue* tx_cq;         // common completion queue for all transmits
+  CompletionChannel* tx_cc;
+  EventCallbackRef tx_handler;
+  MemoryManager* memory_manager;
+  vector<RDMAConnectedSocketImpl*> to_delete;
+
+  // Event callback that forwards to RDMAWorker::handle_tx_event().
+  class C_handle_cq_tx : public EventCallback {
+    RDMAWorker *worker;
+    public:
+    C_handle_cq_tx(RDMAWorker *w): worker(w) {}
+    void do_request(int fd) override {
+      worker->handle_tx_event();
+    }
+  };
+
+ public:
+  // Every pointer member starts out NULL so that destroying a worker whose
+  // initialize() never ran is well defined.
+  explicit RDMAWorker(CephContext *c, unsigned i)
+    : Worker(c, i), client_setup_socket(-1), infiniband(NULL),
+      tx_cq(NULL), tx_cc(NULL), tx_handler(new C_handle_cq_tx(this)),
+      memory_manager(NULL) {}
+  virtual ~RDMAWorker() {
+    if (tx_cc)
+      tx_cc->ack_events();
+    delete tx_cq;
+    delete tx_cc;
+    delete tx_handler;
+  }
+
+  virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
+  virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
+  virtual void initialize() override;
+  void handle_tx_event();
+  CompletionQueue* get_tx_cq() { return tx_cq; }
+
+  // Drop every occurrence of `csi` from the pending-delete list.
+  void remove_to_delete(RDMAConnectedSocketImpl* csi) {
+    vector<RDMAConnectedSocketImpl*>::iterator iter = to_delete.begin();
+    while (iter != to_delete.end()) {
+      // erase() invalidates `iter`; continue from the iterator it returns
+      if (csi == *iter)
+        iter = to_delete.erase(iter);
+      else
+        ++iter;
+    }
+  }
+  // Remember `csi` in the pending-delete list.
+  void add_to_delete(RDMAConnectedSocketImpl* csi) {
+    to_delete.push_back(csi);
+  }
+};
+
+// Connected socket backed by one reliable-connection queue pair.
+// The constructor creates the queue pair and fills in the local handshake
+// message (my_msg); activate() completes the connection once the peer's
+// message is known.
+class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
+ public:
+  typedef Infiniband::MemoryManager::Chunk Chunk;
+  typedef Infiniband::CompletionChannel CompletionChannel;
+  typedef Infiniband::CompletionQueue CompletionQueue;
+
+ private:
+  CephContext *cct;
+  Infiniband::QueuePair *qp;
+  IBSYNMsg peer_msg;
+  IBSYNMsg my_msg;
+  int connected;                 // set to 1 by activate()
+  Infiniband* infiniband;
+  RDMAWorker* worker;            // NULL for sockets created by accept()
+  std::vector<Chunk*> buffers;   // received chunks not yet fully consumed
+  CompletionChannel* rx_cc;
+  CompletionQueue* rx_cq;
+  bool wait_close;               // set once a zero-length chunk was received
+
+ public:
+  // `connected` starts at 0 so that is_connected() is well defined before
+  // activate() has run.
+  RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMAWorker* w, IBSYNMsg im = IBSYNMsg())
+    : cct(cct), qp(NULL), peer_msg(im), connected(0), infiniband(ib), worker(w),
+      rx_cc(NULL), rx_cq(NULL), wait_close(false) {
+    qp = infiniband->create_queue_pair(IBV_QPT_RC);
+    rx_cq = qp->get_rx_cq();
+    rx_cc = rx_cq->get_cc();
+    my_msg.qpn = qp->get_local_qp_number();
+    my_msg.psn = qp->get_initial_psn();
+    my_msg.lid = infiniband->get_lid();
+    my_msg.gid = infiniband->get_gid();
+  }
+
+  virtual int is_connected() override {
+    return connected;
+  }
+  virtual ssize_t read(char* buf, size_t len) override;
+  virtual ssize_t zero_copy_read(bufferptr &data) override;
+  virtual ssize_t send(bufferlist &bl, bool more) override;
+  virtual void shutdown() override {
+  }
+  // If no zero-length chunk has been received yet, send one ourselves and
+  // queue this socket on the worker's pending-delete list; otherwise release
+  // the queue pair and completion queue right away.
+  virtual void close() override {
+    if (!wait_close) {
+      fin();
+      // accepted sockets have no worker
+      if (worker)
+        worker->add_to_delete(this);
+    } else {
+      clear_all();
+    }
+  }
+  virtual int fd() const override {
+    return rx_cc->get_fd();
+  }
+  // Release the queue pair and the rx completion queue.
+  void clear_all() {
+    delete qp;
+    qp = NULL;
+    rx_cc->ack_events();
+    delete rx_cq;
+    rx_cq = NULL;
+    if (!wait_close && worker)
+      worker->remove_to_delete(this);
+  }
+  int activate();
+  ssize_t read_buffers(char* buf, size_t len);
+  int poll_cq(int num_entries, ibv_wc *ret_wc_array);
+  IBSYNMsg get_my_msg() { return my_msg; }
+  IBSYNMsg get_peer_msg() { return peer_msg; }
+  void set_peer_msg(IBSYNMsg m) { peer_msg = m ;}
+  int post_work_request(std::vector<Chunk*>&);
+  void fin();
+};
+
+
+// Listening side of the RDMA stack: owns the datagram socket on which
+// connection handshake messages arrive.
+// NOTE(review): nothing in this class closes server_setup_socket on
+// destruction -- confirm whether that is handled elsewhere.
+class RDMAServerSocketImpl : public ServerSocketImpl {
+  CephContext *cct;
+  NetHandler net;
+  int server_setup_socket;   // -1 until listen() succeeds
+  Infiniband* infiniband;
+  entity_addr_t sa;          // address this object was constructed with
+ public:
+  // The descriptor starts at -1 so that fd() is well defined before
+  // listen() has run.
+  RDMAServerSocketImpl(CephContext *cct, Infiniband* i, entity_addr_t& a)
+    : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), sa(a) {}
+  int listen(entity_addr_t &sa, const SocketOptions &opt);
+  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out) override;
+  virtual void abort_accept() override {}
+  virtual int fd() const override {
+    return server_setup_socket;
+  }
+};
+
+
+// NetworkStack implementation for RDMA; keeps one std::thread per worker
+// index.
+class RDMAStack : public NetworkStack {
+  vector<std::thread> threads;
+
+ public:
+  explicit RDMAStack(CephContext *cct, const string &t);
+  virtual bool support_zero_copy_read() const override { return true; }
+  //virtual bool support_local_listen_table() const { return true; }
+
+  // Start `func` on a new thread stored at index `i`.
+  virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override {
+    // only ever grow: shrinking would destroy threads at higher indices,
+    // and destroying a joinable std::thread terminates the process
+    if (threads.size() <= i)
+      threads.resize(i+1);
+    threads[i] = std::thread(func);
+  }
+  // Wait for the thread stored at index `i`, which must exist and be joinable.
+  virtual void join_worker(unsigned i) override {
+    assert(threads.size() > i && threads[i].joinable());
+    threads[i].join();
+  }
+};
+
+#endif