From: Haomai Wang Date: Fri, 15 Jul 2016 09:02:48 +0000 (+0800) Subject: msg/async: add rdma plugin support X-Git-Tag: v11.1.0~416^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=863066c011ecc437996c3a8a3faa3b21c09d2ba7;p=ceph.git msg/async: add rdma plugin support Signed-off-by: Zhi Wang --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 34866893e65..a433e5937d3 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -213,6 +213,11 @@ OPTION(ms_async_set_affinity, OPT_BOOL, true) // core OPTION(ms_async_affinity_cores, OPT_STR, "") OPTION(ms_async_send_inline, OPT_BOOL, false) +OPTION(ms_async_rdma_device_name, OPT_STR, "") +OPTION(ms_async_rdma_enable_hugepage, OPT_BOOL, false) +OPTION(ms_async_rdma_buffer_size, OPT_INT, 8192) +OPTION(ms_async_rdma_send_buffers, OPT_U32, 32) +OPTION(ms_async_rdma_receive_buffers, OPT_U32, 64) OPTION(inject_early_sigterm, OPT_BOOL, false) diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc index e267eb869e8..5537a391936 100644 --- a/src/msg/async/Stack.cc +++ b/src/msg/async/Stack.cc @@ -17,6 +17,9 @@ #include "common/Cond.h" #include "common/errno.h" #include "PosixStack.h" +#ifdef HAVE_RDMA +#include "rdma/RDMAStack.h" +#endif #include "common/dout.h" #include "include/assert.h" @@ -54,6 +57,10 @@ std::shared_ptr NetworkStack::create(CephContext *c, const string { if (t == "posix") return std::make_shared(c, t); +#ifdef HAVE_RDMA + else if (t == "rdma") + return std::make_shared(c, t); +#endif return nullptr; } @@ -62,6 +69,10 @@ Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned { if (type == "posix") return new PosixWorker(c, i); +#ifdef HAVE_RDMA + else if (type == "rdma") + return new RDMAWorker(c, i); +#endif return nullptr; } diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc new file mode 100644 index 00000000000..fb34fecf9a7 --- /dev/null +++ b/src/msg/async/rdma/Infiniband.cc @@ -0,0 +1,538 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "Infiniband.h" +#include "common/errno.h" +#include "common/debug.h" +#include "RDMAStack.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << "Infiniband " + +static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1; +static const uint32_t MAX_INLINE_DATA = 128; +static const uint32_t UDP_MSG_LEN = sizeof("0000:00000000:00000000:00000000000000000000000000000000") - 1; + +Device::Device(CephContext *c, ibv_device* d): cct(c), device(d), device_attr(new ibv_device_attr) +{ + if (device == NULL) { + lderr(cct) << __func__ << "device == NULL" << cpp_strerror(errno) << dendl; + assert(0); + } + name = ibv_get_device_name(device); + ctxt = ibv_open_device(device); + if (ctxt == NULL) { + lderr(cct) << __func__ << "open rdma device failed. " << cpp_strerror(errno) << dendl; + assert(0); + } + int r = ibv_query_device(ctxt, device_attr); + if (r == -1) { + lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl; + assert(0); + } + port_cnt = device_attr->phys_port_cnt; + ports = new Port*[port_cnt]; + for (uint8_t i = 0; i < port_cnt; ++i) { + ports[i] = new Port(cct, ctxt, i+1); + if (ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) { + active_port = ports[i]; + ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl; + } else { + ldout(cct, 10) << __func__ << " port " << i+1 << " is unactive(" << ports[i]->get_port_attr()->state << ")"<< dendl; + } + } + if (!active_port) { + lderr(cct) << __func__ << " no active port found" << dendl; + assert(active_port); + } +} + +Infiniband::Infiniband(RDMAStack* s, CephContext *c, const std::string &device_name): cct(c), device_list(c), net(c), stack(s) +{ + device = device_list.get_device(device_name.c_str()); + assert(device); + ib_physical_port = device->active_port->get_port_num(); + pd = new ProtectionDomain(cct, device); + assert(net.set_nonblock(device->ctxt->async_fd) == 0); + + max_recv_wr = device->device_attr->max_srq_wr; + if (max_recv_wr < cct->_conf->ms_async_rdma_receive_buffers) { + ldout(cct, 0) << __func__ << " max allowed receive buffers is " << max_recv_wr << " use this instead." << dendl; + max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers; + } + max_send_wr = device->device_attr->max_qp_wr; + if (max_send_wr < cct->_conf->ms_async_rdma_send_buffers) { + ldout(cct, 0) << __func__ << " max allowed send buffers is " << max_send_wr << " use this instead." << dendl; + max_send_wr = cct->_conf->ms_async_rdma_send_buffers; + } + + ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe + << " completion entries" << dendl; + + memory_manager = new MemoryManager(cct, device, pd); + memory_manager->register_rx_tx( + cct->_conf->ms_async_rdma_buffer_size, + cct->_conf->ms_async_rdma_receive_buffers, + cct->_conf->ms_async_rdma_send_buffers); + + srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT); + post_channel_cluster(); +} + +/** + * Create a shared receive queue. This basically wraps the verbs call. + * + * \param[in] max_wr + * The max number of outstanding work requests in the SRQ. + * \param[in] max_sge + * The max number of scatter elements per WR. + * \return + * A valid ibv_srq pointer, or NULL on error. + */ +ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge) +{ + ldout(cct, 20) << __func__ << " max_wr=" << max_wr << " max_sge=" << max_sge << dendl; + ibv_srq_init_attr sia; + memset(&sia, 0, sizeof(sia)); + sia.srq_context = device->ctxt; + sia.attr.max_wr = max_wr; + sia.attr.max_sge = max_sge; + return ibv_create_srq(pd->pd, &sia); +} + +/** + * Create a new QueuePair. This factory should be used in preference to + * the QueuePair constructor directly, since this lets derivatives of + * Infiniband, e.g. MockInfiniband (if it existed), + * return mocked out QueuePair derivatives. + * + * \return + * QueuePair on success or NULL if init fails + * See QueuePair::QueuePair for parameter documentation. + */ +Infiniband::QueuePair* Infiniband::create_queue_pair(ibv_qp_type type) +{ + Infiniband::CompletionChannel* cc = create_comp_channel(); + if (!cc) + return NULL; + + Infiniband::CompletionQueue* cq = create_comp_queue(cc); + if (!cq) { + delete cc; + lderr(cct) << __func__ << " failed to create cq." << dendl; + return NULL; + } + + RDMAWorker* w = static_cast(stack->get_worker()); + Infiniband::QueuePair *qp = new QueuePair(*this, type, ib_physical_port, srq, w->get_tx_cq(), cq, max_send_wr, max_recv_wr); + if (qp->init()) { + delete cc; + delete cq; + delete qp; + return NULL; + } + return qp; +} + +int Infiniband::QueuePair::init() +{ + ldout(infiniband.cct, 20) << __func__ << " started." << dendl; + ibv_qp_init_attr qpia; + memset(&qpia, 0, sizeof(qpia)); + qpia.send_cq = txcq->get_cq(); + qpia.recv_cq = rxcq->get_cq(); + qpia.srq = srq; // use the same shared receive queue + qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests + qpia.cap.max_send_sge = 1; // max send scatter-gather elements + qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q + qpia.qp_type = type; // RC, UC, UD, or XRC + qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs + + qp = ibv_create_qp(pd, &qpia); + if (qp == NULL) { + lderr(infiniband.cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl; + return -1; + } + + ldout(infiniband.cct, 20) << __func__ << " successfully create queue pair: " + << "qp=" << qp << dendl; + + // move from RESET to INIT state + ibv_qp_attr qpa; + memset(&qpa, 0, sizeof(qpa)); + qpa.qp_state = IBV_QPS_INIT; + qpa.pkey_index = 0; + qpa.port_num = (uint8_t)(ib_physical_port); + qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE; + qpa.qkey = q_key; + + int mask = IBV_QP_STATE | IBV_QP_PORT; + switch (type) { + case IBV_QPT_RC: + mask |= IBV_QP_ACCESS_FLAGS; + mask |= IBV_QP_PKEY_INDEX; + break; + case IBV_QPT_UD: + mask |= IBV_QP_QKEY; + mask |= IBV_QP_PKEY_INDEX; + break; + case IBV_QPT_RAW_PACKET: + break; + default: + assert(0); + } + + int ret = ibv_modify_qp(qp, &qpa, mask); + if (ret) { + ibv_destroy_qp(qp); + lderr(infiniband.cct) << __func__ << " failed to transition to INIT state: " + << cpp_strerror(errno) << dendl; + return -1; + } + ldout(infiniband.cct, 20) << __func__ << " successfully change queue pair to INIT:" + << " qp=" << qp << dendl; + return 0; +} + +int Infiniband::post_chunk(Chunk* chunk) +{ + ibv_sge isge; + isge.addr = reinterpret_cast(chunk->buffer); + isge.length = chunk->bytes; + isge.lkey = chunk->mr->lkey; + ibv_recv_wr rx_work_request; + + memset(&rx_work_request, 0, sizeof(rx_work_request)); + rx_work_request.wr_id = reinterpret_cast(chunk);// stash descriptor ptr + rx_work_request.next = NULL; + rx_work_request.sg_list = &isge; + rx_work_request.num_sge = 1; + + ibv_recv_wr *badWorkRequest; + int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest); + if (ret) { + lderr(cct) << __func__ << " ib_post_srq_recv failed on post " + << cpp_strerror(errno) << dendl; + return -1; + } + return 0; +} + +int Infiniband::post_channel_cluster() +{ + vector free_chunks; + int r = memory_manager->get_channel_buffers(free_chunks, 0); + assert(r == 0); + for (vector::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) { + r = post_chunk(*iter); + assert(r == 0); + } + ldout(cct, 20) << __func__ << " posted buffers to srq. "<< dendl; + return 0; +} + +Infiniband::CompletionChannel* Infiniband::create_comp_channel() +{ + ldout(cct, 20) << __func__ << " started." << dendl; + Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(*this); + if (cc->init()) { + delete cc; + return NULL; + } + return cc; +} + +Infiniband::CompletionQueue* Infiniband::create_comp_queue(CompletionChannel *cc) +{ + ldout(cct, 20) << __func__ << " completion channel=" << cc << dendl; + Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(*this, max_recv_wr, cc); + if (cq->init()) { + delete cq; + return NULL; + } + return cq; +} + + +Infiniband::QueuePair::QueuePair( + Infiniband& infiniband, ibv_qp_type type, int port, ibv_srq *srq, + Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, + uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key) +: infiniband(infiniband), + type(type), + ctxt(infiniband.device->ctxt), + ib_physical_port(port), + pd(infiniband.pd->pd), + srq(srq), + qp(NULL), + txcq(txcq), + rxcq(rxcq), + initial_psn(0), + max_send_wr(max_send_wr), + max_recv_wr(max_recv_wr), + q_key(q_key) +{ + initial_psn = lrand48() & 0xffffff; + if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) { + lderr(infiniband.cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl; + assert(0); + } + pd = infiniband.pd->pd; +} + +// 1 means no valid buffer read, 0 means got enough buffer +// else return < 0 means error +int Infiniband::recv_udp_msg(int sd, IBSYNMsg& im, entity_addr_t *addr) +{ + assert(sd >= 0); + ssize_t r; + entity_addr_t socket_addr; + struct sockaddr from; + socklen_t slen = sizeof(from); + char msg[UDP_MSG_LEN]; + char gid[32]; + r = ::recvfrom(sd, &msg, sizeof(msg), 0, &from, &slen); + // Drop incoming qpt + if (cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; + r = -1; + } + } + if (r == -1) { + lderr(cct) << __func__ << " recv got error " << errno << ": " + << cpp_strerror(errno) << dendl; + return -1; + } else if ((size_t)r != sizeof(msg)) { // valid message length + lderr(cct) << __func__ << " recv got bad length (" << r << ")." << cpp_strerror(errno) << dendl; + return 1; + } else { // valid message + socket_addr.set_sockaddr(&from); + if (addr) { + *addr = socket_addr; + } + sscanf(msg, "%04x:%08x:%08x:%s", &(im.lid), &(im.qpn), &(im.psn), gid); + wire_gid_to_gid(gid, &(im.gid)); + ldout(cct, 10) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << gid << dendl; + return 0; + } +} + +int Infiniband::send_udp_msg(int sd, IBSYNMsg& im, entity_addr_t &peeraddr) +{ + assert(sd >= 0); + int retry = 0; + ssize_t r; + + char msg[UDP_MSG_LEN]; + char gid[32]; +retry: + gid_to_wire_gid(&(im.gid), gid); + r = snprintf(msg, UDP_MSG_LEN, "%04x:%08x:%08x:%s", im.lid, im.qpn, im.psn, gid); + ldout(cct, 20) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << gid << " r=" << r << dendl; + r = ::sendto(sd, msg, sizeof(msg), 0, peeraddr.get_sockaddr(), + peeraddr.get_sockaddr_len()); + // Drop incoming qpt + if (cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; + r = -1; + } + } + + if ((size_t)r != sizeof(msg)) { + if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) { + retry++; + goto retry; + } + if (r < 0) + lderr(cct) << __func__ << " send returned error " << errno << ": " + << cpp_strerror(errno) << dendl; + else + lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl; + return -1; + } + return 0; +} + +void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid) +{ + char tmp[9]; + uint32_t v32; + int i; + + for (tmp[8] = 0, i = 0; i < 4; ++i) { + memcpy(tmp, wgid + i * 8, 8); + sscanf(tmp, "%x", &v32); + *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32); + } +} + +void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[]) +{ + for (int i = 0; i < 4; ++i) + sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4))); +} + +Infiniband::QueuePair::~QueuePair() +{ + if (qp) + assert(!ibv_destroy_qp(qp)); + ldout(infiniband.cct, 20) << __func__ << " successfully destroyed QueuePair." << dendl; +} + +Infiniband::CompletionChannel::~CompletionChannel() +{ + if (channel) { + int r = ibv_destroy_comp_channel(channel); + ldout(infiniband.cct, 20) << __func__ << " r: " << r << dendl; + assert(r == 0); + } + ldout(infiniband.cct, 20) << __func__ << " successfully destroyed CompletionChannel." << dendl; +} + +Infiniband::CompletionQueue::~CompletionQueue() +{ + if (cq) { + int r = ibv_destroy_cq(cq); + ldout(infiniband.cct, 20) << __func__ << " r: " << cpp_strerror(errno) << dendl; + assert(r == 0); + } + ldout(infiniband.cct, 20) << __func__ << " successfully destroyed CompletionQueue." << dendl; +} + +int Infiniband::CompletionQueue::rearm_notify(bool solicite_only) +{ + ldout(infiniband.cct, 20) << __func__ << " started." << dendl; + int r = ibv_req_notify_cq(cq, 0); + if (r) { + lderr(infiniband.cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl; + } + return r; +} + +int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) { + int r = ibv_poll_cq(cq, num_entries, ret_wc_array); + if (r < 0) { + lderr(infiniband.cct) << __func__ << " poll_completion_queue occur met error: " + << cpp_strerror(errno) << dendl; + return -1; + } + return r; +} + +bool Infiniband::CompletionChannel::get_cq_event() +{ + // ldout(infiniband.cct, 21) << __func__ << " started." << dendl; + ibv_cq *cq = NULL; + void *ev_ctx; + if (ibv_get_cq_event(channel, &cq, &ev_ctx)) { + if (errno != EAGAIN && errno != EINTR) + lderr(infiniband.cct) << __func__ << "failed to retrieve CQ event: " + << cpp_strerror(errno) << dendl; + return false; + } + + /* accumulate number of cq events that need to + * * be acked, and periodically ack them + * */ + if (++cq_events_that_need_ack == MAX_ACK_EVENT) { + ldout(infiniband.cct, 20) << __func__ << " ack aq events." << dendl; + ibv_ack_cq_events(cq, MAX_ACK_EVENT); + cq_events_that_need_ack = 0; + } + + return true; +} + +int Infiniband::CompletionQueue::init() +{ + cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0); + if (!cq) { + lderr(infiniband.cct) << __func__ << " failed to create receive completion queue: " + << cpp_strerror(errno) << dendl; + return -1; + } + + if (ibv_req_notify_cq(cq, 0)) { + lderr(infiniband.cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl; + ibv_destroy_cq(cq); + return -1; + } + + channel->bind_cq(cq); + ldout(infiniband.cct, 20) << __func__ << " successfully create cq=" << cq << dendl; + return 0; +} + +int Infiniband::CompletionChannel::init() +{ + ldout(infiniband.cct, 20) << __func__ << " started." << dendl; + channel = ibv_create_comp_channel(infiniband.device->ctxt); + if (!channel) { + lderr(infiniband.cct) << __func__ << " failed to create receive completion channel: " + << cpp_strerror(errno) << dendl; + return -1; + } + int rc = infiniband.net.set_nonblock(channel->fd); + if (rc < 0) { + ibv_destroy_comp_channel(channel); + return -1; + } + return 0; +} + +/** + * Given a string representation of the `status' field from Verbs + * struct `ibv_wc'. + * + * \param[in] status + * The integer status obtained in ibv_wc.status. + * \return + * A string corresponding to the given status. + */ +const char* Infiniband::wc_status_to_string(int status) +{ + static const char *lookup[] = { + "SUCCESS", + "LOC_LEN_ERR", + "LOC_QP_OP_ERR", + "LOC_EEC_OP_ERR", + "LOC_PROT_ERR", + "WR_FLUSH_ERR", + "MW_BIND_ERR", + "BAD_RESP_ERR", + "LOC_ACCESS_ERR", + "REM_INV_REQ_ERR", + "REM_ACCESS_ERR", + "REM_OP_ERR", + "RETRY_EXC_ERR", + "RNR_RETRY_EXC_ERR", + "LOC_RDD_VIOL_ERR", + "REM_INV_RD_REQ_ERR", + "REM_ABORT_ERR", + "INV_EECN_ERR", + "INV_EEC_STATE_ERR", + "FATAL_ERR", + "RESP_TIMEOUT_ERR", + "GENERAL_ERR" + }; + + if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR) + return ""; + return lookup[status]; +} diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h new file mode 100644 index 00000000000..22dee795de4 --- /dev/null +++ b/src/msg/async/rdma/Infiniband.h @@ -0,0 +1,598 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_INFINIBAND_H +#define CEPH_INFINIBAND_H + +#include + +#include +#include + +#include "include/int_types.h" +#include "include/page.h" +#include "common/debug.h" +#include "common/errno.h" +#include "msg/msg_types.h" +#include "msg/async/net_handler.h" +#include "common/Mutex.h" + +#define HUGE_PAGE_SIZE (2 * 1024 * 1024) +#define ALIGN_TO_PAGE_SIZE(x) \ + (((x) + HUGE_PAGE_SIZE -1) / HUGE_PAGE_SIZE * HUGE_PAGE_SIZE) + +struct IBSYNMsg { + uint16_t lid; + uint32_t qpn; + uint32_t psn; + union ibv_gid gid; +} __attribute__((packed)); + +class RDMAStack; +class CephContext; + +class Port { + CephContext *cct; + struct ibv_context* ctxt; + uint8_t port_num; + struct ibv_port_attr* port_attr; + int gid_tbl_len; + uint16_t lid; + union ibv_gid gid; + + public: + explicit Port(CephContext *c, struct ibv_context* ictxt, uint8_t ipn): cct(c), ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) { + int r = ibv_query_port(ctxt, port_num, port_attr); + if (r == -1) { + lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl; + assert(0); + } + + lid = port_attr->lid; + r = ibv_query_gid(ctxt, port_num, 0, &gid); + if (r) { + lderr(cct) << __func__ << " query gid failed " << cpp_strerror(errno) << dendl; + assert(0); + } + } + + uint16_t get_lid() { return lid; } + ibv_gid get_gid() { return gid; } + uint8_t get_port_num() { return port_num; } + ibv_port_attr* get_port_attr() { return port_attr; } +}; + + +class Device { + CephContext *cct; + ibv_device *device; + const char* name; + uint8_t port_cnt; + Port** ports; + public: + explicit Device(CephContext *c, ibv_device* d); + ~Device() { + for (uint8_t i = 0; i < port_cnt; ++i) + delete ports[i]; + delete []ports; + assert(ibv_close_device(ctxt) == 0); + } + const char* get_name() { return name;} + uint16_t get_lid() { return active_port->get_lid(); } + ibv_gid get_gid() { return active_port->get_gid(); } + struct ibv_context *ctxt; + ibv_device_attr *device_attr; + Port* active_port; +}; + + +class DeviceList { + CephContext *cct; + struct ibv_device ** device_list; + int num; + Device** devices; + public: + DeviceList(CephContext *c): cct(c), device_list(ibv_get_device_list(&num)) { + if (device_list == NULL || num == 0) { + lderr(cct) << __func__ << " failed to get rdma device list. " << cpp_strerror(errno) << dendl; + assert(0); + } + devices = new Device*[num]; + + for (int i = 0;i < num; ++i) { + devices[i] = new Device(cct, device_list[i]); + } + } + ~DeviceList() { + for (int i=0; devices[i] != NULL; ++i) { + delete devices[i]; + } + delete []devices; + ibv_free_device_list(device_list); + } + + Device* get_device(const char* device_name) { + assert(devices); + for (int i = 0; i < num; ++i) { + if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) { + return devices[i]; + } + } + return NULL; + } +}; + + +class Infiniband { + public: + class ProtectionDomain { + public: + explicit ProtectionDomain(CephContext *c, Device *device) + : cct(c), pd(ibv_alloc_pd(device->ctxt)) + { + if (pd == NULL) { + lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl; + assert(0); + } + } + ~ProtectionDomain() { + int rc = ibv_dealloc_pd(pd); + if (rc != 0) { + lderr(cct) << __func__ << " ibv_dealloc_pd failed: " + << cpp_strerror(errno) << dendl; + } + } + CephContext *cct; + ibv_pd* const pd; + }; + + + class MemoryManager { + public: + class Chunk { + public: + Chunk(char* b, uint32_t len, ibv_mr* m) : buffer(b), bytes(len), offset(0), mr(m) {} + ~Chunk() { + assert(ibv_dereg_mr(mr) == 0); + } + + void set_offset(uint32_t o) { + offset = o; + } + + size_t get_offset() { + return offset; + } + + void set_bound(uint32_t b) { + bound = b; + } + + void prepare_read(uint32_t b) { + offset = 0; + bound = b; + } + + uint32_t get_bound() { + return bound; + } + + size_t read(char* buf, size_t len) { + size_t left = bound - offset; + if(left >= len) { + memcpy(buf, buffer+offset, len); + offset += len; + return len; + } else { + memcpy(buf, buffer+offset, left); + offset = 0; + bound = 0; + return left; + } + } + + size_t write(char* buf, size_t len) { + size_t left = bytes - offset; + if (left >= len) { + memcpy(buffer+offset, buf, len); + offset += len; + return len; + } else { + memcpy(buffer+offset, buf, left); + offset = bytes; + return left; + } + } + + bool full() { + return offset == bytes; + } + + bool over() { + return offset == bound; + } + + void clear() { + offset = 0; + bound = 0; + } + + void post_srq(Infiniband *ib) { + ib->post_chunk(this); + } + + void set_owner(uint64_t o) { + owner = o; + } + + uint64_t get_owner() { + return owner; + } + + public: + char* buffer; + uint32_t bytes; + uint32_t bound; + size_t offset; + ibv_mr* mr; + uint64_t owner; + }; + + class Cluster { + public: + Cluster(MemoryManager& m, uint32_t s) : manager(m), chunk_size(s), lock("cluster_lock"){} + Cluster(MemoryManager& m, uint32_t s, uint32_t n) : manager(m), chunk_size(s), lock("cluster_lock"){ + add(n); + } + + ~Cluster() { + set::iterator c = all_chunks.begin(); + while(c != all_chunks.end()) { + delete *c; + ++c; + } + } + int add(uint32_t num) { + uint32_t bytes = chunk_size * num; + //cihar* base = (char*)malloc(bytes); + if (!manager.enabled_huge_page) { + base = (char*)memalign(CEPH_PAGE_SIZE, bytes); + } else { + base = (char*)manager.malloc_huge_pages(bytes); + } + assert(base); + for (uint32_t offset = 0; offset < bytes; offset += chunk_size){ + ibv_mr* m = ibv_reg_mr(manager.pd->pd, base+offset, chunk_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE); + assert(m); + Chunk* c = new Chunk(base+offset,chunk_size,m); + free_chunks.push_back(c); + all_chunks.insert(c); + } + return 0; + } + + int take_back(Chunk* ck) { + Mutex::Locker l(lock); + free_chunks.push_back(ck); + return 0; + } + + int get_buffers(std::vector &chunks, size_t bytes) { + Mutex::Locker l(lock); + if (!bytes) { + free_chunks.swap(chunks); + return 0; + } + uint32_t num = bytes / chunk_size + 1; + if (bytes % chunk_size == 0) + --num; + if (free_chunks.size() < num) + return -EAGAIN; + for (uint32_t i = 0; i < num; ++i) { + chunks.push_back(free_chunks.back()); + free_chunks.pop_back(); + } + return 0; + } + + MemoryManager& manager; + uint32_t chunk_size; + Mutex lock; + std::vector free_chunks; + set all_chunks; + char* base; + }; + + MemoryManager(CephContext *cct, Device *d, ProtectionDomain *p) : cct(cct), device(d), pd(p) { + enabled_huge_page = cct->_conf->ms_async_rdma_enable_hugepage; + } + ~MemoryManager() { + if(channel) + delete channel; + if(send) + delete send; + } + void* malloc_huge_pages(size_t size) { + size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE); + char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0); + if (ptr == MAP_FAILED) { + lderr(cct) << __func__ << " MAP_FAILED" << dendl; + ptr = (char *)malloc(real_size); + if (ptr == NULL) return NULL; + real_size = 0; + } + *((size_t *)ptr) = real_size; + lderr(cct) << __func__ << " bingo!" << dendl; + return ptr + HUGE_PAGE_SIZE; + } + void free_huge_pages(void *ptr) { + if (ptr == NULL) return; + void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE; + size_t real_size = *((size_t *)real_ptr); + assert(real_size % HUGE_PAGE_SIZE == 0); + if (real_size != 0) + munmap(real_ptr, real_size); + else + free(real_ptr); + } + void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num) { + assert(device); + assert(pd); + channel = new Cluster(*this, size); + channel->add(rx_num); + + send = new Cluster(*this, size); + send->add(tx_num); + } + int return_tx(Chunk* c) { + c->clear(); + return send->take_back(c); + } + + int get_send_buffers(std::vector &c, size_t bytes) { + return send->get_buffers(c, bytes); + } + + int get_channel_buffers(std::vector &chunks, size_t bytes) { + return channel->get_buffers(chunks, bytes); + } + + int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);} + bool enabled_huge_page; + private: + Cluster* channel;//RECV + Cluster* send;// SEND + CephContext *cct; + Device *device; + ProtectionDomain *pd; + }; + + private: + uint32_t max_send_wr; + uint32_t max_recv_wr; + uint32_t max_sge; + uint8_t ib_physical_port; + MemoryManager* memory_manager; + ibv_srq* srq; // shared receive work queue + Device *device; + ProtectionDomain *pd; + CephContext* cct; + DeviceList device_list; + void wire_gid_to_gid(const char *wgid, union ibv_gid *gid); + void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]); + + public: + NetHandler net; + RDMAStack* stack; + explicit Infiniband(RDMAStack* s, CephContext *c, const std::string &device_name); + + /** + * Destroy an Infiniband object. + */ + ~Infiniband() { + assert(ibv_destroy_srq(srq) == 0); + delete memory_manager; + delete pd; + } + + class CompletionChannel { + static const uint32_t MAX_ACK_EVENT = 5000; + Infiniband& infiniband; + ibv_comp_channel *channel; + ibv_cq *cq; + uint32_t cq_events_that_need_ack; + + public: + CompletionChannel(Infiniband &ib): infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) {} + ~CompletionChannel(); + int init(); + bool get_cq_event(); + int get_fd() { return channel->fd; } + ibv_comp_channel* get_channel() { return channel; } + void bind_cq(ibv_cq *c) { cq = c; } + void ack_events() { + ibv_ack_cq_events(cq, cq_events_that_need_ack); + cq_events_that_need_ack = 0; + } + }; + + // this class encapsulates the creation, use, and destruction of an RC + // completion queue. + // + // You need to call init and it will create a cq and associate to comp channel + class CompletionQueue { + public: + CompletionQueue(Infiniband &ib, const uint32_t qd, CompletionChannel *cc):infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {} + ~CompletionQueue(); + int init(); + int poll_cq(int num_entries, ibv_wc *ret_wc_array); + + ibv_cq* get_cq() const { return cq; } + int rearm_notify(bool solicited_only=true); + CompletionChannel* get_cc() const { return channel; } + private: + Infiniband& infiniband; // Infiniband to which this QP belongs + CompletionChannel *channel; + ibv_cq *cq; + uint32_t queue_depth; + }; + + // this class encapsulates the creation, use, and destruction of an RC + // queue pair. + // + // you need call init and it will create a qp and bring it to the INIT state. + // after obtaining the lid, qpn, and psn of a remote queue pair, one + // must call plumb() to bring the queue pair to the RTS state. + class QueuePair { + public: + QueuePair(Infiniband& infiniband, ibv_qp_type type,int ib_physical_port, ibv_srq *srq, Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0); + // exists solely as superclass constructor for MockQueuePair derivative + explicit QueuePair(Infiniband& infiniband): + infiniband(infiniband), type(IBV_QPT_RC), ctxt(NULL), ib_physical_port(-1), + pd(NULL), srq(NULL), qp(NULL), txcq(NULL), rxcq(NULL), + initial_psn(-1) {} + ~QueuePair(); + + int init(); + + /** + * Get the initial packet sequence number for this QueuePair. + * This is randomly generated on creation. It should not be confused + * with the remote side's PSN, which is set in #plumb(). + */ + uint32_t get_initial_psn() const { return initial_psn; }; + /** + * Get the local queue pair number for this QueuePair. + * QPNs are analogous to UDP/TCP port numbers. + */ + uint32_t get_local_qp_number() const { return qp->qp_num; }; + /** + * Get the remote queue pair number for this QueuePair, as set in #plumb(). + * QPNs are analogous to UDP/TCP port numbers. + */ + int get_remote_qp_number(uint32_t *rqp) const { + ibv_qp_attr qpa; + ibv_qp_init_attr qpia; + + int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia); + if (r) { + lderr(infiniband.cct) << __func__ << " failed to query qp: " + << cpp_strerror(errno) << dendl; + return -1; + } + + if (rqp) + *rqp = qpa.dest_qp_num; + return 0; + } + /** + * Get the remote infiniband address for this QueuePair, as set in #plumb(). + * LIDs are "local IDs" in infiniband terminology. They are short, locally + * routable addresses. + */ + int get_remote_lid(uint16_t *lid) const { + ibv_qp_attr qpa; + ibv_qp_init_attr qpia; + + int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia); + if (r) { + lderr(infiniband.cct) << __func__ << " failed to query qp: " + << cpp_strerror(errno) << dendl; + return -1; + } + + if (lid) + *lid = qpa.ah_attr.dlid; + return 0; + } + /** + * Get the state of a QueuePair. + */ + int get_state() const { + ibv_qp_attr qpa; + ibv_qp_init_attr qpia; + + int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia); + if (r) { + lderr(infiniband.cct) << __func__ << " failed to get state: " + << cpp_strerror(errno) << dendl; + return -1; + } + return qpa.qp_state; + } + /** + * Return true if the queue pair is in an error state, false otherwise. + */ + bool is_error() const { + ibv_qp_attr qpa; + ibv_qp_init_attr qpia; + + int r = ibv_query_qp(qp, &qpa, -1, &qpia); + if (r) { + lderr(infiniband.cct) << __func__ << " failed to get state: " + << cpp_strerror(errno) << dendl; + return true; + } + return qpa.cur_qp_state == IBV_QPS_ERR; + } + ibv_qp* get_qp() const { return qp; } + Infiniband::CompletionQueue* get_tx_cq() const { return txcq; } + Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; } + int to_reset(); + int to_dead(); + int get_fd() { return fd; } + private: + Infiniband& infiniband; // Infiniband to which this QP belongs + ibv_qp_type type; // QP type (IBV_QPT_RC, etc.) + ibv_context* ctxt; // device context of the HCA to use + int ib_physical_port; + ibv_pd* pd; // protection domain + ibv_srq* srq; // shared receive queue + ibv_qp* qp; // infiniband verbs QP handle + Infiniband::CompletionQueue* txcq; + Infiniband::CompletionQueue* rxcq; + uint32_t initial_psn; // initial packet sequence number + uint32_t max_send_wr; + uint32_t max_recv_wr; + uint32_t q_key; + int fd; + }; + + public: + typedef MemoryManager::Cluster Cluster; + typedef MemoryManager::Chunk Chunk; + QueuePair* create_queue_pair(ibv_qp_type type); + ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge); + int post_chunk(Chunk* chunk); + int post_channel_cluster(); + int get_tx_buffers(std::vector &c, size_t bytes) { + return memory_manager->get_send_buffers(c, bytes); + } + CompletionChannel *create_comp_channel(); + CompletionQueue *create_comp_queue(CompletionChannel *cc=NULL); + uint8_t get_ib_physical_port() { + return ib_physical_port; + } + int send_udp_msg(int sd, IBSYNMsg& msg, entity_addr_t &peeraddr); + int recv_udp_msg(int sd, IBSYNMsg& msg, entity_addr_t *addr); + uint16_t get_lid() { return device->get_lid(); } + ibv_gid get_gid() { return device->get_gid(); } + MemoryManager* get_memory_manager() { return memory_manager; } + Device* get_device() { return device; } + static const char* wc_status_to_string(int status); +}; + +#endif diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc new file mode 100644 index 00000000000..3e93ebaa255 --- /dev/null +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -0,0 +1,339 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "RDMAStack.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << " RDMAConnectedSocketImpl " + +int RDMAConnectedSocketImpl::activate() { + ibv_qp_attr qpa; + int r; + + // now connect up the qps and switch to RTR + memset(&qpa, 0, sizeof(qpa)); + qpa.qp_state = IBV_QPS_RTR; + qpa.path_mtu = IBV_MTU_1024; + qpa.dest_qp_num = peer_msg.qpn; + qpa.rq_psn = peer_msg.psn; + qpa.max_dest_rd_atomic = 1; + qpa.min_rnr_timer = 12; + //qpa.ah_attr.is_global = 0; + qpa.ah_attr.is_global = 1; + qpa.ah_attr.grh.hop_limit = 6; + qpa.ah_attr.grh.dgid = peer_msg.gid; + qpa.ah_attr.grh.sgid_index = 0; + + qpa.ah_attr.dlid = peer_msg.lid; + qpa.ah_attr.sl = 0; + qpa.ah_attr.src_path_bits = 0; + qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port()); + + r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | + IBV_QP_AV | + IBV_QP_PATH_MTU | + IBV_QP_DEST_QPN | + IBV_QP_RQ_PSN | + IBV_QP_MIN_RNR_TIMER | + IBV_QP_MAX_DEST_RD_ATOMIC); + if (r) { + lderr(cct) << __func__ << " failed to transition to RTR state: " + << cpp_strerror(errno) << dendl; + return -1; + } + + ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl; + + // now move to RTS + qpa.qp_state = IBV_QPS_RTS; + + // How long to wait before retrying if packet lost or server dead. + // Supposedly the timeout is 4.096us*2^timeout. However, the actual + // timeout appears to be 4.096us*2^(timeout+1), so the setting + // below creates a 135ms timeout. + qpa.timeout = 14; + + // How many times to retry after timeouts before giving up. + qpa.retry_cnt = 7; + + // How many times to retry after RNR (receiver not ready) condition + // before giving up. Occurs when the remote side has not yet posted + // a receive request. + qpa.rnr_retry = 7; // 7 is infinite retry. + qpa.sq_psn = my_msg.psn; + qpa.max_rd_atomic = 1; + + r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | + IBV_QP_TIMEOUT | + IBV_QP_RETRY_CNT | + IBV_QP_RNR_RETRY | + IBV_QP_SQ_PSN | + IBV_QP_MAX_QP_RD_ATOMIC); + if (r) { + lderr(cct) << __func__ << " failed to transition to RTS state: " + << cpp_strerror(errno) << dendl; + return -1; + } + + // the queue pair should be ready to use once the client has finished + // setting up their end. + ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl; + + connected = 1;//indicate successfully + return 0; +} + +ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) +{ + ldout(cct, 20) << __func__ << " need to read bytes: " << len << " buffers size: " << buffers.size() << dendl; + + ssize_t read = 0; + if (!buffers.empty()) + read = read_buffers(buf,len); + + static const int MAX_COMPLETIONS = 16; + ibv_wc wc[MAX_COMPLETIONS]; + + bool rearmed = false; + int n; + again: + n = rx_cq->poll_cq(MAX_COMPLETIONS, wc); + ldout(cct, 20) << __func__ << " poll completion queue got " << n << " responses."<< dendl; + for (int i = 0; i < n; ++i) { + ibv_wc* response = &wc[i]; + ldout(cct, 20) << __func__ << " cqe " << response->byte_len << " bytes." << dendl; + Chunk* chunk = reinterpret_cast(response->wr_id); + chunk->prepare_read(response->byte_len); + if (!response->byte_len) { + wait_close = true; + return 0; + } + if (response->status != IBV_WC_SUCCESS) { + lderr(cct) << __func__ << " poll cqe failed! " << " number: " << n << ", status: "<< response->status << cpp_strerror(errno) << dendl; + assert(0); + } else { + if (read == (ssize_t)len) { + buffers.push_back(chunk); + ldout(cct, 20) << __func__ << " buffers add a chunk: " << response->byte_len << dendl; + } else if (read + response->byte_len > (ssize_t)len) { + read += chunk->read(buf+read, (ssize_t)len-read); + buffers.push_back(chunk); + ldout(cct, 20) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl; + } else { + read += chunk->read(buf+read, response->byte_len); + assert(infiniband->post_chunk(chunk) == 0); + } + } + } + + if (n) + goto again; + if (!rearmed) { + rx_cq->rearm_notify(); + rearmed = true; + // Clean up cq events after rearm notify ensure no new incoming event + // arrived between polling and rearm + goto again; + } + if (read == 0) + return -EAGAIN; + return read; +} + +ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) +{ + size_t read = 0, tmp = 0; + vector::iterator c = buffers.begin(); + for (; c != buffers.end() ; ++c) { + tmp = (*c)->read(buf+read, len-read); + read += tmp; + ldout(cct, 20) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl; + if ((*c)->over()) { + assert(infiniband->post_chunk(*c) == 0); + ldout(cct, 20) << __func__ << " one chunk over." << dendl; + } + if (read == len) { + break; + } + } + + if (c != buffers.end() && (*c)->over()) + c++; + buffers.erase(buffers.begin(), c); + ldout(cct, 20) << __func__ << " got " << read << " bytes here. buffers size: " << buffers.size() << dendl; + return read; +} + +ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data) +{ + ssize_t size = 0; + static const int MAX_COMPLETIONS = 16; + ibv_wc wc[MAX_COMPLETIONS]; + + bool rearmed = false; + int n; + again: + n = rx_cq->poll_cq(MAX_COMPLETIONS, wc); + ldout(cct, 20) << __func__ << " pool completion queue got " << n << " responses."<< dendl; + + ibv_wc* response; + Chunk* chunk; + bool loaded = false; + auto iter = buffers.begin(); + if(iter != buffers.end()) { + chunk = *iter; + if (chunk->bound == 0) { + wait_close = true; + return 0; + } + auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); + buffers.erase(iter); + loaded = true; + size = chunk->bound; + } + + for (int i = 0; i < n; ++i) { + response = &wc[i]; + chunk = reinterpret_cast(response->wr_id); + chunk->prepare_read(response->byte_len); + if(!loaded && i == 0) { + if (chunk->bound == 0) { + wait_close = true; + return 0; + } + auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); + size = chunk->bound; + continue; + } + buffers.push_back(chunk); + } + + if (n) + goto again; + if (!rearmed) { + rx_cq->rearm_notify(); + rearmed = true; + // Clean up cq events after rearm notify ensure no new incoming event + // arrived between polling and rearm + goto again; + } + if (size == 0) + return -EAGAIN; + return size; +} + +ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) +{ + size_t bytes = bl.length(); + if (!bytes) + return 0; + vector tx_buffers; + if (infiniband->get_tx_buffers(tx_buffers, bytes) < 0) { + ldout(cct, 10) << __func__ << " no enough buffers" << dendl; + return 0; + } + ldout(cct, 20) << __func__ << " prepare " << bytes << " bytes, tx buffer count: " << tx_buffers.size() << dendl; + vector::iterator current_buffer = tx_buffers.begin(); + list::const_iterator it = bl.buffers().begin(); + while (it != bl.buffers().end()) { + const uintptr_t addr = reinterpret_cast(it->c_str()); + uint32_t copied = 0; + // ldout(cct, 20) << __func__ << " app_buffer: " << addr << " length: " << it->length() << dendl; + while(copied < it->length()) { + // ldout(cct, 20) << __func__ << " current_buffer: " << *current_buffer << " copied: " << copied << dendl; + size_t ret = (*current_buffer)->write((char*)addr+copied, it->length() - copied); + copied += ret; + // ldout(cct, 20) << __func__ << " ret: " << ret << " copied: " << copied << dendl; + if((*current_buffer)->full()){ + ++current_buffer; + } + } + ++it; + } + + ssize_t r = post_work_request(tx_buffers); + if (r < 0) + return r; + + ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl; + bl.clear(); + return bytes; +} + +int RDMAConnectedSocketImpl::post_work_request(std::vector &tx_buffers) +{ + vector::iterator current_buffer = tx_buffers.begin(); + ibv_sge isge[tx_buffers.size()]; + uint32_t current_sge = 0; + ibv_send_wr iswr[tx_buffers.size()]; + uint32_t current_swr = 0; + ibv_send_wr* pre_wr = NULL; + + current_buffer = tx_buffers.begin(); + while (current_buffer != tx_buffers.end()) { + isge[current_sge].addr = reinterpret_cast((*current_buffer)->buffer); + isge[current_sge].length = (*current_buffer)->get_offset(); + isge[current_sge].lkey = (*current_buffer)->mr->lkey; + ldout(cct, 20) << __func__ << " current_buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl; + + iswr[current_swr].wr_id = reinterpret_cast(*current_buffer); + iswr[current_swr].next = NULL; + iswr[current_swr].sg_list = &isge[current_sge]; + iswr[current_swr].num_sge = 1; + iswr[current_swr].opcode = IBV_WR_SEND; + iswr[current_swr].send_flags = IBV_SEND_SIGNALED; + /*if(isge[current_sge].length < infiniband->max_inline_data) { + iswr[current_swr].send_flags = IBV_SEND_INLINE; + ldout(cct, 20) << __func__ << " send_inline." << dendl; + }*/ + + if (pre_wr) + pre_wr->next = &iswr[current_swr]; + pre_wr = &iswr[current_swr]; + ++current_sge; + ++current_swr; + ++current_buffer; + } + + ibv_send_wr *bad_tx_work_request; + if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) { + lderr(cct) << __func__ << " failed to send data" + << " (most probably should be peer not ready): " + << cpp_strerror(errno) << dendl; + return -errno; + } + return 0; +} + +void RDMAConnectedSocketImpl::fin() { + //ibv_sge list; + //memset(&list, 0, sizeof(list)); + ibv_send_wr wr; + memset(&wr, 0, sizeof(wr)); + wr.wr_id = reinterpret_cast(this); + wr.num_sge = 0; + //wr.sg_list = &list; + wr.opcode = IBV_WR_SEND; + wr.send_flags = IBV_SEND_SIGNALED; + ibv_send_wr* bad_tx_work_request; + if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) { + lderr(cct) << __func__ << " failed to send FIN" + << "(most probably should be peer not ready): " + << cpp_strerror(errno) << dendl; + return ; + } +} diff --git a/src/msg/async/rdma/RDMAServerSocketImpl.cc b/src/msg/async/rdma/RDMAServerSocketImpl.cc new file mode 100644 index 00000000000..0a148f9f6e7 --- /dev/null +++ b/src/msg/async/rdma/RDMAServerSocketImpl.cc @@ -0,0 +1,95 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "msg/async/net_handler.h" +#include "RDMAStack.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << " RDMAServerSocketImpl " + +int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt) +{ + server_setup_socket = ::socket(sa.get_family(), SOCK_DGRAM, 0); + if (server_setup_socket == -1) { + lderr(cct) << __func__ << " failed to create server socket: " + << cpp_strerror(errno) << dendl; + return -errno; + } + + int on = 1; + int rc = ::setsockopt(server_setup_socket, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (rc < 0) { + lderr(cct) << __func__ << " unable to setsockopt: " << cpp_strerror(errno) << dendl; + goto err; + } + + rc = ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len()); + if (rc < 0) { + lderr(cct) << __func__ << " unable to bind to " << sa.get_sockaddr() + << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl; + goto err; + } + + rc = net.set_nonblock(server_setup_socket); + if (rc < 0) { + goto err; + } + + net.set_close_on_exec(server_setup_socket); + + ldout(cct, 20) << __func__ << " bind to " << sa.get_sockaddr() << " on port " << sa.get_port() << dendl; + return 0; + +err: + ::close(server_setup_socket); + server_setup_socket = -1; + return -1; +} + +int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out) +{ + ldout(cct, 15) << __func__ << dendl; + int r; + RDMAConnectedSocketImpl* server; + while (1) { + IBSYNMsg msg;//TODO + entity_addr_t addr; + r = infiniband->recv_udp_msg(server_setup_socket, msg, &addr); + if (r < 0) { + r = -errno; + if (r != -EAGAIN) + ldout(cct, 10) << __func__ << " recv msg failed:" << cpp_strerror(errno)<< dendl; + break; + } else if (r > 0) { + ldout(cct, 1) << __func__ << " recv msg not whole." << dendl; + continue; + } else { + //RDMAWorker* w = static_cast(infiniband->stack->get_worker()); + server = new RDMAConnectedSocketImpl(cct, infiniband, NULL, msg); + msg = server->get_my_msg(); + r = infiniband->send_udp_msg(server_setup_socket, msg, addr); + server->activate(); + std::unique_ptr csi(server); + *sock = ConnectedSocket(std::move(csi)); + if(out) + *out = sa; + return r; + } + } + + return r; +} diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc new file mode 100644 index 00000000000..ca92fae3874 --- /dev/null +++ b/src/msg/async/rdma/RDMAStack.cc @@ -0,0 +1,152 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "RDMAStack.h" +#include "include/str_list.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << "RDMAStack " + +static Infiniband* global_infiniband; + +int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock) +{ + auto p = new RDMAServerSocketImpl(cct, infiniband, sa); + int r = p->listen(sa, opt); + if (r < 0) { + delete p; + return r; + } + + *sock = ServerSocket(std::unique_ptr(p)); + return 0; +} + +int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) +{ + RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, this); + entity_addr_t sa; + memcpy(&sa, &addr, sizeof(addr)); + + IBSYNMsg msg = p->get_my_msg(); + ldout(cct, 20) << __func__ << " connecting to " << sa.get_sockaddr() << " : " << sa.get_port() << dendl; + ldout(cct, 20) << __func__ << " my syn msg : < " << msg.qpn << ", " << msg.psn << ", " << msg.lid << ">"<< dendl; + + client_setup_socket = ::socket(PF_INET, SOCK_DGRAM, 0); + if (client_setup_socket == -1) { + lderr(cct) << __func__ << " failed to create client socket: " << strerror(errno) << dendl; + return -errno; + } + + int r = ::connect(client_setup_socket, addr.get_sockaddr(), addr.get_sockaddr_len()); + if (r < 0) { + lderr(cct) << __func__ << " failed to connect " << addr << ": " + << strerror(errno) << dendl; + return -errno; + } + + r = infiniband->send_udp_msg(client_setup_socket, msg, sa); + if (r < 0) { + ldout(cct, 0) << __func__ << " send msg failed." << dendl; + return r; + } + + // FIXME: need to make this async + r = infiniband->recv_udp_msg(client_setup_socket, msg, &sa); + if (r < 0) { + ldout(cct, 0) << __func__ << " recv msg failed." << dendl; + return r; + } + p->set_peer_msg(msg); + ldout(cct, 20) << __func__ << " peer msg : < " << msg.qpn << ", " << msg.psn << ", " << msg.lid << "> " << dendl; + r = p->activate(); + assert(!r); + std::unique_ptr csi(p); + *socket = ConnectedSocket(std::move(csi)); + + return 0; +} + +RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t) +{ + if (!global_infiniband) + global_infiniband = new Infiniband( + this, cct, cct->_conf->ms_async_rdma_device_name); +} + +void RDMAWorker::initialize() +{ + infiniband = global_infiniband; + tx_cc = infiniband->create_comp_channel(); + tx_cq = infiniband->create_comp_queue(tx_cc); + center.create_file_event(tx_cc->get_fd(), EVENT_READABLE, tx_handler); + memory_manager = infiniband->get_memory_manager(); +} + +void RDMAWorker::handle_tx_event() +{ + ldout(cct, 20) << __func__ << dendl; + if (!tx_cc->get_cq_event()) + return ; + + static const int MAX_COMPLETIONS = 16; + ibv_wc wc[MAX_COMPLETIONS]; + + bool rearmed = false; + int n; + again: + n = tx_cq->poll_cq(MAX_COMPLETIONS, wc); + ldout(cct, 20) << __func__ << " pool completion queue got " << n + << " responses."<< dendl; + for (int i = 0; i < n; ++i) { + ibv_wc* response = &wc[i]; + Chunk* chunk = reinterpret_cast(response->wr_id); + ldout(cct, 20) << __func__ << " opcode: " << response->opcode << " len: " << response->byte_len << dendl; + + if (response->status != IBV_WC_SUCCESS) { + if (response->status == IBV_WC_RETRY_EXC_ERR) { + lderr(cct) << __func__ << " connection between server and client not working. Disconnect this now" << dendl; + } else if (response->status == IBV_WC_WR_FLUSH_ERR) { + lderr(cct) << __func__ << " Work Request Flushed Error: this connection's qp=" + << response->qp_num << " should be down while this WR=" << response->wr_id + << " still in flight." << dendl; + } else { + lderr(cct) << __func__ << " send work request returned error for buffer(" + << response->wr_id << ") status(" << response->status << "): " + << infiniband->wc_status_to_string(response->status) << dendl; + } + assert(0); + } + + if (memory_manager->is_tx_chunk(chunk)) + infiniband->get_memory_manager()->return_tx(chunk); + else + ldout(cct, 20) << __func__ << " chunk belongs to none " << dendl; + } + + if (n) + goto again; + + if (!rearmed) { + tx_cq->rearm_notify(); + rearmed = true; + // Clean up cq events after rearm notify ensure no new incoming event + // arrived between polling and rearm + goto again; + } + ldout(cct, 20) << __func__ << " leaving handle_tx_event. " << dendl; +} diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h new file mode 100644 index 00000000000..2ff1c0d7929 --- /dev/null +++ b/src/msg/async/rdma/RDMAStack.h @@ -0,0 +1,184 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MSG_RDMASTACK_H +#define CEPH_MSG_RDMASTACK_H + +#include "common/ceph_context.h" +#include "common/debug.h" +#include "common/errno.h" +#include "msg/async/Stack.h" +#include +#include "Infiniband.h" + +class RDMAConnectedSocketImpl; + +class RDMAWorker : public Worker { + typedef Infiniband::CompletionQueue CompletionQueue; + typedef Infiniband::CompletionChannel CompletionChannel; + typedef Infiniband::MemoryManager::Chunk Chunk; + typedef Infiniband::MemoryManager MemoryManager; + int client_setup_socket; + Infiniband* infiniband; + CompletionQueue* tx_cq; // common completion queue for all transmits + CompletionChannel* tx_cc; + EventCallbackRef tx_handler; + MemoryManager* memory_manager; + vector to_delete; + class C_handle_cq_tx : public EventCallback { + RDMAWorker *worker; + public: + C_handle_cq_tx(RDMAWorker *w): worker(w) {} + void do_request(int fd) { + worker->handle_tx_event(); + } + }; + + public: + explicit RDMAWorker(CephContext *c, unsigned i) + : Worker(c, i), infiniband(NULL), tx_handler(new C_handle_cq_tx(this)) {} + virtual ~RDMAWorker() { + tx_cc->ack_events(); + delete tx_cq; + delete tx_cc; + delete tx_handler; + } + + virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override; + virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override; + virtual void initialize() override; + void handle_tx_event(); + CompletionQueue* get_tx_cq() { return tx_cq; } + void remove_to_delete(RDMAConnectedSocketImpl* csi) { + if (to_delete.empty()) + return ; + vector::iterator iter = to_delete.begin(); + for (; iter != to_delete.end(); ++iter) { + if(csi == *iter) { + to_delete.erase(iter); + } + } + } + void add_to_delete(RDMAConnectedSocketImpl* csi) { + to_delete.push_back(csi); + } +}; + +class RDMAConnectedSocketImpl : public ConnectedSocketImpl { + public: + typedef Infiniband::MemoryManager::Chunk Chunk; + typedef Infiniband::CompletionChannel CompletionChannel; + typedef Infiniband::CompletionQueue CompletionQueue; + + private: + CephContext *cct; + Infiniband::QueuePair *qp; + IBSYNMsg peer_msg; + IBSYNMsg my_msg; + int connected; + Infiniband* infiniband; + RDMAWorker* worker; + std::vector buffers; + CompletionChannel* rx_cc; + CompletionQueue* rx_cq; + bool wait_close; + + public: + RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMAWorker* w, IBSYNMsg im = IBSYNMsg()) : cct(cct), peer_msg(im), infiniband(ib), worker(w), wait_close(false) { + qp = infiniband->create_queue_pair(IBV_QPT_RC); + rx_cq = qp->get_rx_cq(); + rx_cc = rx_cq->get_cc(); + my_msg.qpn = qp->get_local_qp_number(); + my_msg.psn = qp->get_initial_psn(); + my_msg.lid = infiniband->get_lid(); + my_msg.gid = infiniband->get_gid(); + } + + virtual int is_connected() override { + return connected; + } + virtual ssize_t read(char* buf, size_t len) override; + virtual ssize_t zero_copy_read(bufferptr &data) override; + virtual ssize_t send(bufferlist &bl, bool more) override; + virtual void shutdown() override { + } + virtual void close() override { + if (!wait_close) { + fin(); + worker->add_to_delete(this); + } else { + clear_all(); + } + } + virtual int fd() const override { + return rx_cc->get_fd(); + } + void clear_all() { + delete qp; + rx_cc->ack_events(); + delete rx_cq; + rx_cq = NULL; + if (!wait_close) + worker->remove_to_delete(this); + } + int activate(); + ssize_t read_buffers(char* buf, size_t len); + int poll_cq(int num_entries, ibv_wc *ret_wc_array); + IBSYNMsg get_my_msg() { return my_msg; } + IBSYNMsg get_peer_msg() { return peer_msg; } + void set_peer_msg(IBSYNMsg m) { peer_msg = m ;} + int post_work_request(std::vector&); + void fin(); +}; + + +class RDMAServerSocketImpl : public ServerSocketImpl { + CephContext *cct; + NetHandler net; + int server_setup_socket; + Infiniband* infiniband; + entity_addr_t sa; + public: + RDMAServerSocketImpl(CephContext *cct, Infiniband* i, entity_addr_t& a) + : cct(cct), net(cct), infiniband(i), sa(a) {} + int listen(entity_addr_t &sa, const SocketOptions &opt); + virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out) override; + virtual void abort_accept() override {} + virtual int fd() const override { + return server_setup_socket; + } +}; + + +class RDMAStack : public NetworkStack { + vector threads; + + public: + explicit RDMAStack(CephContext *cct, const string &t); + virtual bool support_zero_copy_read() const override { return true; } + //virtual bool support_local_listen_table() const { return true; } + + virtual void spawn_worker(unsigned i, std::function &&func) override { + threads.resize(i+1); + threads[i] = std::move(std::thread(func)); + } + virtual void join_worker(unsigned i) override { + assert(threads.size() > i && threads[i].joinable()); + threads[i].join(); + } +}; + +#endif