From: Amir Vadai Date: Mon, 2 Jan 2017 08:29:43 +0000 (+0200) Subject: RDMA: Move defenitions from Infiniband.h into .cc X-Git-Tag: v12.0.0~296^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=75996d20ae27b144162e91796226e19f782ca233;p=ceph.git RDMA: Move defenitions from Infiniband.h into .cc Signed-off-by: Amir Vadai --- diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index cec6998545f3..657f62158fcd 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -27,6 +27,69 @@ static const uint32_t MAX_INLINE_DATA = 128; static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000"); static const uint32_t CQ_DEPTH = 30000; +Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) +{ + union ibv_gid cgid; + struct ibv_exp_gid_attr gid_attr; + bool malformed = false; + + int r = ibv_query_port(ctxt, port_num, port_attr); + if (r == -1) { + lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl; + ceph_abort(); + } + + lid = port_attr->lid; + + // search for requested GID in GIDs table + ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid) + << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl; + r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(), + "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx" + ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx", + &cgid.raw[ 0], &cgid.raw[ 1], + &cgid.raw[ 2], &cgid.raw[ 3], + &cgid.raw[ 4], &cgid.raw[ 5], + &cgid.raw[ 6], &cgid.raw[ 7], + &cgid.raw[ 8], &cgid.raw[ 9], + &cgid.raw[10], &cgid.raw[11], + &cgid.raw[12], &cgid.raw[13], + &cgid.raw[14], &cgid.raw[15]); + + if (r != 16) { + ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl; + malformed = true; + } + + gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE; + + for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) { + r = ibv_query_gid(ctxt, port_num, gid_idx, &gid); + if (r) { + lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl; + ceph_abort(); + } + r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr); + if (r) { + lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl; + ceph_abort(); + } + + if (malformed) break; // stay with gid_idx=0 + if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) && + (memcmp(&gid, &cgid, 16) == 0) ) { + ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl; + break; + } + } + + if (gid_idx == port_attr->gid_tbl_len) { + lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl; + ceph_abort(); + } +} + + Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr) { if (device == NULL) { @@ -46,86 +109,594 @@ Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_ } } -Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) { - union ibv_gid cgid; - struct ibv_exp_gid_attr gid_attr; - bool malformed = false; - - int r = ibv_query_port(ctxt, port_num, port_attr); - if (r == -1) { - lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl; - ceph_abort(); - } - - lid = port_attr->lid; - - // search for requested GID in GIDs table - ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid) - << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl; - r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(), - "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx" - ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx", - &cgid.raw[ 0], &cgid.raw[ 1], - &cgid.raw[ 2], &cgid.raw[ 3], - &cgid.raw[ 4], &cgid.raw[ 5], - &cgid.raw[ 6], &cgid.raw[ 7], - &cgid.raw[ 8], &cgid.raw[ 9], - &cgid.raw[10], &cgid.raw[11], - &cgid.raw[12], &cgid.raw[13], - &cgid.raw[14], &cgid.raw[15]); - - if (r != 16) { - ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl; - malformed = true; - } - - gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE; - - for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) { - r = ibv_query_gid(ctxt, port_num, gid_idx, &gid); - if (r) { - lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl; - ceph_abort(); - } - r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr); - if (r) { - lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl; - ceph_abort(); - } - - if (malformed) break; // stay with gid_idx=0 - if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) && - (memcmp(&gid, &cgid, 16) == 0) ) { - ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl; - break; - } - } - - if (gid_idx == port_attr->gid_tbl_len) { - lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl; - ceph_abort(); - } - } +void Device::binding_port(CephContext *cct, uint8_t port_num) { + port_cnt = device_attr->phys_port_cnt; + ports = new Port*[port_cnt]; + for (uint8_t i = 0; i < port_cnt; ++i) { + ports[i] = new Port(cct, ctxt, i+1); + if (i+1 == port_num && ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) { + active_port = ports[i]; + ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl; + return ; + } else { + ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << ports[i]->get_port_attr()->state << ")"<< dendl; + } + } + if (nullptr == active_port) { + lderr(cct) << __func__ << " port not found" << dendl; + assert(active_port); + } +} + + +Infiniband::QueuePair::QueuePair( + CephContext *c, Infiniband& infiniband, ibv_qp_type type, + int port, ibv_srq *srq, + Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, + uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key) +: cct(c), infiniband(infiniband), + type(type), + ctxt(infiniband.device->ctxt), + ib_physical_port(port), + pd(infiniband.pd->pd), + srq(srq), + qp(NULL), + txcq(txcq), + rxcq(rxcq), + initial_psn(0), + max_send_wr(max_send_wr), + max_recv_wr(max_recv_wr), + q_key(q_key), + dead(false) +{ + initial_psn = lrand48() & 0xffffff; + if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) { + lderr(cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl; + ceph_abort(); + } + pd = infiniband.pd->pd; +} + +int Infiniband::QueuePair::init() +{ + ldout(cct, 20) << __func__ << " started." << dendl; + ibv_qp_init_attr qpia; + memset(&qpia, 0, sizeof(qpia)); + qpia.send_cq = txcq->get_cq(); + qpia.recv_cq = rxcq->get_cq(); + qpia.srq = srq; // use the same shared receive queue + qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests + qpia.cap.max_send_sge = 1; // max send scatter-gather elements + qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q + qpia.qp_type = type; // RC, UC, UD, or XRC + qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs + + qp = ibv_create_qp(pd, &qpia); + if (qp == NULL) { + lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl; + lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers or" + " ms_async_rdma_send_buffers" << dendl; + return -1; + } + + ldout(cct, 20) << __func__ << " successfully create queue pair: " + << "qp=" << qp << dendl; + + // move from RESET to INIT state + ibv_qp_attr qpa; + memset(&qpa, 0, sizeof(qpa)); + qpa.qp_state = IBV_QPS_INIT; + qpa.pkey_index = 0; + qpa.port_num = (uint8_t)(ib_physical_port); + qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE; + qpa.qkey = q_key; + + int mask = IBV_QP_STATE | IBV_QP_PORT; + switch (type) { + case IBV_QPT_RC: + mask |= IBV_QP_ACCESS_FLAGS; + mask |= IBV_QP_PKEY_INDEX; + break; + case IBV_QPT_UD: + mask |= IBV_QP_QKEY; + mask |= IBV_QP_PKEY_INDEX; + break; + case IBV_QPT_RAW_PACKET: + break; + default: + ceph_abort(); + } + + int ret = ibv_modify_qp(qp, &qpa, mask); + if (ret) { + ibv_destroy_qp(qp); + lderr(cct) << __func__ << " failed to transition to INIT state: " + << cpp_strerror(errno) << dendl; + return -1; + } + ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:" + << " qp=" << qp << dendl; + return 0; +} + +/** + * Change RC QueuePair into the ERROR state. This is necessary modify + * the Queue Pair into the Error state and poll all of the relevant + * Work Completions prior to destroying a Queue Pair. + * Since destroying a Queue Pair does not guarantee that its Work + * Completions are removed from the CQ upon destruction. Even if the + * Work Completions are already in the CQ, it might not be possible to + * retrieve them. If the Queue Pair is associated with an SRQ, it is + * recommended wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED + * + * \return + * -errno if the QueuePair can't switch to ERROR + * 0 for success. + */ +int Infiniband::QueuePair::to_dead() +{ + if (dead) + return 0; + ibv_qp_attr qpa; + memset(&qpa, 0, sizeof(qpa)); + qpa.qp_state = IBV_QPS_ERR; + + int mask = IBV_QP_STATE; + int ret = ibv_modify_qp(qp, &qpa, mask); + if (ret) { + lderr(cct) << __func__ << " failed to transition to ERROR state: " + << cpp_strerror(errno) << dendl; + return -errno; + } + dead = true; + return ret; +} + +int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const +{ + ibv_qp_attr qpa; + ibv_qp_init_attr qpia; + + int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia); + if (r) { + lderr(cct) << __func__ << " failed to query qp: " + << cpp_strerror(errno) << dendl; + return -1; + } + + if (rqp) + *rqp = qpa.dest_qp_num; + return 0; +} + +/** + * Get the remote infiniband address for this QueuePair, as set in #plumb(). + * LIDs are "local IDs" in infiniband terminology. They are short, locally + * routable addresses. + */ +int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const +{ + ibv_qp_attr qpa; + ibv_qp_init_attr qpia; + + int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia); + if (r) { + lderr(cct) << __func__ << " failed to query qp: " + << cpp_strerror(errno) << dendl; + return -1; + } + + if (lid) + *lid = qpa.ah_attr.dlid; + return 0; +} + +/** + * Get the state of a QueuePair. + */ +int Infiniband::QueuePair::get_state() const +{ + ibv_qp_attr qpa; + ibv_qp_init_attr qpia; + + int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia); + if (r) { + lderr(cct) << __func__ << " failed to get state: " + << cpp_strerror(errno) << dendl; + return -1; + } + return qpa.qp_state; +} + +/** + * Return true if the queue pair is in an error state, false otherwise. + */ +bool Infiniband::QueuePair::is_error() const +{ + ibv_qp_attr qpa; + ibv_qp_init_attr qpia; + + int r = ibv_query_qp(qp, &qpa, -1, &qpia); + if (r) { + lderr(cct) << __func__ << " failed to get state: " + << cpp_strerror(errno) << dendl; + return true; + } + return qpa.cur_qp_state == IBV_QPS_ERR; +} + + +Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib) + : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) +{ +} + +Infiniband::CompletionChannel::~CompletionChannel() +{ + if (channel) { + int r = ibv_destroy_comp_channel(channel); + if (r < 0) + lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl; + assert(r == 0); + } +} + +int Infiniband::CompletionChannel::init() +{ + ldout(cct, 20) << __func__ << " started." << dendl; + channel = ibv_create_comp_channel(infiniband.device->ctxt); + if (!channel) { + lderr(cct) << __func__ << " failed to create receive completion channel: " + << cpp_strerror(errno) << dendl; + return -1; + } + int rc = NetHandler(cct).set_nonblock(channel->fd); + if (rc < 0) { + ibv_destroy_comp_channel(channel); + return -1; + } + return 0; +} + +void Infiniband::CompletionChannel::ack_events() +{ + ibv_ack_cq_events(cq, cq_events_that_need_ack); + cq_events_that_need_ack = 0; +} + +bool Infiniband::CompletionChannel::get_cq_event() +{ + ibv_cq *cq = NULL; + void *ev_ctx; + if (ibv_get_cq_event(channel, &cq, &ev_ctx)) { + if (errno != EAGAIN && errno != EINTR) + lderr(cct) << __func__ << " failed to retrieve CQ event: " + << cpp_strerror(errno) << dendl; + return false; + } + + /* accumulate number of cq events that need to + * * be acked, and periodically ack them + * */ + if (++cq_events_that_need_ack == MAX_ACK_EVENT) { + ldout(cct, 20) << __func__ << " ack aq events." << dendl; + ibv_ack_cq_events(cq, MAX_ACK_EVENT); + cq_events_that_need_ack = 0; + } + + return true; +} + + +Infiniband::CompletionQueue::~CompletionQueue() +{ + if (cq) { + int r = ibv_destroy_cq(cq); + if (r < 0) + lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl; + assert(r == 0); + } +} + +int Infiniband::CompletionQueue::init() +{ + cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0); + if (!cq) { + lderr(cct) << __func__ << " failed to create receive completion queue: " + << cpp_strerror(errno) << dendl; + return -1; + } + + if (ibv_req_notify_cq(cq, 0)) { + lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl; + ibv_destroy_cq(cq); + cq = nullptr; + return -1; + } + + channel->bind_cq(cq); + ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl; + return 0; +} + +int Infiniband::CompletionQueue::rearm_notify(bool solicite_only) +{ + ldout(cct, 20) << __func__ << " started." << dendl; + int r = ibv_req_notify_cq(cq, 0); + if (r < 0) + lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl; + return r; +} + +int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) { + int r = ibv_poll_cq(cq, num_entries, ret_wc_array); + if (r < 0) { + lderr(cct) << __func__ << " poll_completion_queue occur met error: " + << cpp_strerror(errno) << dendl; + return -1; + } + return r; +} + + +Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device) + : pd(ibv_alloc_pd(device->ctxt)) +{ + if (pd == NULL) { + lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl; + ceph_abort(); + } +} + +Infiniband::ProtectionDomain::~ProtectionDomain() +{ + int rc = ibv_dealloc_pd(pd); + assert(rc == 0); +} + + +Infiniband::MemoryManager::Chunk::Chunk(char* b, uint32_t len, ibv_mr* m) + : buffer(b), bytes(len), offset(0), mr(m) +{ +} + +Infiniband::MemoryManager::Chunk::~Chunk() +{ + assert(ibv_dereg_mr(mr) == 0); +} + +void Infiniband::MemoryManager::Chunk::set_offset(uint32_t o) +{ + offset = o; +} + +uint32_t Infiniband::MemoryManager::Chunk::get_offset() +{ + return offset; +} + +void Infiniband::MemoryManager::Chunk::set_bound(uint32_t b) +{ + bound = b; +} + +void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b) +{ + offset = 0; + bound = b; +} + +uint32_t Infiniband::MemoryManager::Chunk::get_bound() +{ + return bound; +} + +uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len) +{ + uint32_t left = bound - offset; + if (left >= len) { + memcpy(buf, buffer+offset, len); + offset += len; + return len; + } else { + memcpy(buf, buffer+offset, left); + offset = 0; + bound = 0; + return left; + } +} + +uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len) +{ + uint32_t left = bytes - offset; + if (left >= len) { + memcpy(buffer+offset, buf, len); + offset += len; + return len; + } else { + memcpy(buffer+offset, buf, left); + offset = bytes; + return left; + } +} + +bool Infiniband::MemoryManager::Chunk::full() +{ + return offset == bytes; +} + +bool Infiniband::MemoryManager::Chunk::over() +{ + return Infiniband::MemoryManager::Chunk::offset == bound; +} + +void Infiniband::MemoryManager::Chunk::clear() +{ + offset = 0; + bound = 0; +} + +void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib) +{ + ib->post_chunk(this); +} + +void Infiniband::MemoryManager::Chunk::set_owner(uint64_t o) +{ + owner = o; +} + +uint64_t Infiniband::MemoryManager::Chunk::get_owner() +{ + return owner; +} + + +Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s) + : manager(m), chunk_size(s), lock("cluster_lock") +{ +} + +Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s, uint32_t n) + : manager(m), chunk_size(s), lock("cluster_lock") +{ + add(n); +} + +Infiniband::MemoryManager::Cluster::~Cluster() +{ + set::iterator c = all_chunks.begin(); + while(c != all_chunks.end()) { + delete *c; + ++c; + } + if (manager.enabled_huge_page) + manager.free_huge_pages(base); + else + delete base; +} + +int Infiniband::MemoryManager::Cluster::add(uint32_t num) +{ + uint32_t bytes = chunk_size * num; + //cihar* base = (char*)malloc(bytes); + if (manager.enabled_huge_page) { + base = (char*)manager.malloc_huge_pages(bytes); + } else { + base = (char*)memalign(CEPH_PAGE_SIZE, bytes); + } + assert(base); + for (uint32_t offset = 0; offset < bytes; offset += chunk_size){ + ibv_mr* m = ibv_reg_mr(manager.pd->pd, base+offset, chunk_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE); + assert(m); + Chunk* c = new Chunk(base+offset,chunk_size,m); + free_chunks.push_back(c); + all_chunks.insert(c); + } + return 0; +} + +void Infiniband::MemoryManager::Cluster::take_back(Chunk* ck) +{ + Mutex::Locker l(lock); + free_chunks.push_back(ck); +} + +int Infiniband::MemoryManager::Cluster::get_buffers(std::vector &chunks, size_t bytes) +{ + uint32_t num = bytes / chunk_size + 1; + if (bytes % chunk_size == 0) + --num; + int r = num; + Mutex::Locker l(lock); + if (free_chunks.empty()) + return 0; + if (!bytes) { + free_chunks.swap(chunks); + r = chunks.size(); + return r; + } + if (free_chunks.size() < num) { + num = free_chunks.size(); + r = num; + } + for (uint32_t i = 0; i < num; ++i) { + chunks.push_back(free_chunks.back()); + free_chunks.pop_back(); + } + return r; +} + + +Infiniband::MemoryManager::MemoryManager(Device *d, ProtectionDomain *p, bool hugepage) + : device(d), pd(p) +{ + enabled_huge_page = hugepage; +} + +Infiniband::MemoryManager::~MemoryManager() +{ + if (channel) + delete channel; + if (send) + delete send; +} + +void* Infiniband::MemoryManager::malloc_huge_pages(size_t size) +{ + size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE); + char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0); + if (ptr == MAP_FAILED) { + ptr = (char *)malloc(real_size); + if (ptr == NULL) return NULL; + real_size = 0; + } + *((size_t *)ptr) = real_size; + return ptr + HUGE_PAGE_SIZE; +} + +void Infiniband::MemoryManager::free_huge_pages(void *ptr) +{ + if (ptr == NULL) return; + void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE; + size_t real_size = *((size_t *)real_ptr); + assert(real_size % HUGE_PAGE_SIZE == 0); + if (real_size != 0) + munmap(real_ptr, real_size); + else + free(real_ptr); +} + +void Infiniband::MemoryManager::register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num) +{ + assert(device); + assert(pd); + channel = new Cluster(*this, size); + channel->add(rx_num); + + send = new Cluster(*this, size); + send->add(tx_num); +} + +void Infiniband::MemoryManager::return_tx(std::vector &chunks) +{ + for (auto c : chunks) { + c->clear(); + send->take_back(c); + } +} + +int Infiniband::MemoryManager::get_send_buffers(std::vector &c, size_t bytes) +{ + return send->get_buffers(c, bytes); +} -void Device::binding_port(CephContext *cct, uint8_t port_num) { - port_cnt = device_attr->phys_port_cnt; - ports = new Port*[port_cnt]; - for (uint8_t i = 0; i < port_cnt; ++i) { - ports[i] = new Port(cct, ctxt, i+1); - if (i+1 == port_num && ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) { - active_port = ports[i]; - ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl; - return ; - } else { - ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << ports[i]->get_port_attr()->state << ")"<< dendl; - } - } - if (nullptr == active_port) { - lderr(cct) << __func__ << " port not found" << dendl; - assert(active_port); - } +int Infiniband::MemoryManager::get_channel_buffers(std::vector &chunks, size_t bytes) +{ + return channel->get_buffers(chunks, bytes); } + Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num): device_list(cct) { device = device_list.get_device(device_name.c_str()); @@ -163,6 +734,13 @@ Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t post_channel_cluster(); } +Infiniband::~Infiniband() +{ + assert(ibv_destroy_srq(srq) == 0); + delete memory_manager; + delete pd; +} + /** * Create a shared receive queue. This basically wraps the verbs call. * @@ -183,6 +761,25 @@ ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_s return ibv_create_srq(pd->pd, &sia); } +int Infiniband::get_tx_buffers(std::vector &c, size_t bytes) +{ + return memory_manager->get_send_buffers(c, bytes); +} + +int Infiniband::recall_chunk(Chunk* c) +{ + if (memory_manager->is_rx_chunk(c)) { + post_chunk(c); + return 1; + } else if (memory_manager->is_tx_chunk(c)) { + vector v; + v.push_back(c); + memory_manager->return_tx(v); + return 2; + } + return -1; +} + /** * Create a new QueuePair. This factory should be used in preference to * the QueuePair constructor directly, since this lets derivatives of @@ -204,101 +801,6 @@ Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, Completio return qp; } -int Infiniband::QueuePair::init() -{ - ldout(cct, 20) << __func__ << " started." << dendl; - ibv_qp_init_attr qpia; - memset(&qpia, 0, sizeof(qpia)); - qpia.send_cq = txcq->get_cq(); - qpia.recv_cq = rxcq->get_cq(); - qpia.srq = srq; // use the same shared receive queue - qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests - qpia.cap.max_send_sge = 1; // max send scatter-gather elements - qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q - qpia.qp_type = type; // RC, UC, UD, or XRC - qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs - - qp = ibv_create_qp(pd, &qpia); - if (qp == NULL) { - lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl; - lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers or" - " ms_async_rdma_send_buffers" << dendl; - return -1; - } - - ldout(cct, 20) << __func__ << " successfully create queue pair: " - << "qp=" << qp << dendl; - - // move from RESET to INIT state - ibv_qp_attr qpa; - memset(&qpa, 0, sizeof(qpa)); - qpa.qp_state = IBV_QPS_INIT; - qpa.pkey_index = 0; - qpa.port_num = (uint8_t)(ib_physical_port); - qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE; - qpa.qkey = q_key; - - int mask = IBV_QP_STATE | IBV_QP_PORT; - switch (type) { - case IBV_QPT_RC: - mask |= IBV_QP_ACCESS_FLAGS; - mask |= IBV_QP_PKEY_INDEX; - break; - case IBV_QPT_UD: - mask |= IBV_QP_QKEY; - mask |= IBV_QP_PKEY_INDEX; - break; - case IBV_QPT_RAW_PACKET: - break; - default: - ceph_abort(); - } - - int ret = ibv_modify_qp(qp, &qpa, mask); - if (ret) { - ibv_destroy_qp(qp); - lderr(cct) << __func__ << " failed to transition to INIT state: " - << cpp_strerror(errno) << dendl; - return -1; - } - ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:" - << " qp=" << qp << dendl; - return 0; -} - -/** - * Change RC QueuePair into the ERROR state. This is necessary modify - * the Queue Pair into the Error state and poll all of the relevant - * Work Completions prior to destroying a Queue Pair. - * Since destroying a Queue Pair does not guarantee that its Work - * Completions are removed from the CQ upon destruction. Even if the - * Work Completions are already in the CQ, it might not be possible to - * retrieve them. If the Queue Pair is associated with an SRQ, it is - * recommended wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED - * - * \return - * -errno if the QueuePair can't switch to ERROR - * 0 for success. - */ -int Infiniband::QueuePair::to_dead() -{ - if (dead) - return 0; - ibv_qp_attr qpa; - memset(&qpa, 0, sizeof(qpa)); - qpa.qp_state = IBV_QPS_ERR; - - int mask = IBV_QP_STATE; - int ret = ibv_modify_qp(qp, &qpa, mask); - if (ret) { - lderr(cct) << __func__ << " failed to transition to ERROR state: " - << cpp_strerror(errno) << dendl; - return -errno; - } - dead = true; - return ret; -} - int Infiniband::post_chunk(Chunk* chunk) { ibv_sge isge; @@ -354,35 +856,6 @@ Infiniband::CompletionQueue* Infiniband::create_comp_queue( return cq; } - -Infiniband::QueuePair::QueuePair( - CephContext *c, Infiniband& infiniband, ibv_qp_type type, - int port, ibv_srq *srq, - Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, - uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key) -: cct(c), infiniband(infiniband), - type(type), - ctxt(infiniband.device->ctxt), - ib_physical_port(port), - pd(infiniband.pd->pd), - srq(srq), - qp(NULL), - txcq(txcq), - rxcq(rxcq), - initial_psn(0), - max_send_wr(max_send_wr), - max_recv_wr(max_recv_wr), - q_key(q_key), - dead(false) -{ - initial_psn = lrand48() & 0xffffff; - if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) { - lderr(cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl; - ceph_abort(); - } - pd = infiniband.pd->pd; -} - // 1 means no valid buffer read, 0 means got enough buffer // else return < 0 means error int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im) @@ -476,106 +949,6 @@ Infiniband::QueuePair::~QueuePair() assert(!ibv_destroy_qp(qp)); } -Infiniband::CompletionChannel::~CompletionChannel() -{ - if (channel) { - int r = ibv_destroy_comp_channel(channel); - if (r < 0) - lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl; - assert(r == 0); - } -} - -Infiniband::CompletionQueue::~CompletionQueue() -{ - if (cq) { - int r = ibv_destroy_cq(cq); - if (r < 0) - lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl; - assert(r == 0); - } -} - -int Infiniband::CompletionQueue::rearm_notify(bool solicite_only) -{ - ldout(cct, 20) << __func__ << " started." << dendl; - int r = ibv_req_notify_cq(cq, 0); - if (r < 0) - lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl; - return r; -} - -int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) { - int r = ibv_poll_cq(cq, num_entries, ret_wc_array); - if (r < 0) { - lderr(cct) << __func__ << " poll_completion_queue occur met error: " - << cpp_strerror(errno) << dendl; - return -1; - } - return r; -} - -bool Infiniband::CompletionChannel::get_cq_event() -{ - ibv_cq *cq = NULL; - void *ev_ctx; - if (ibv_get_cq_event(channel, &cq, &ev_ctx)) { - if (errno != EAGAIN && errno != EINTR) - lderr(cct) << __func__ << " failed to retrieve CQ event: " - << cpp_strerror(errno) << dendl; - return false; - } - - /* accumulate number of cq events that need to - * * be acked, and periodically ack them - * */ - if (++cq_events_that_need_ack == MAX_ACK_EVENT) { - ldout(cct, 20) << __func__ << " ack aq events." << dendl; - ibv_ack_cq_events(cq, MAX_ACK_EVENT); - cq_events_that_need_ack = 0; - } - - return true; -} - -int Infiniband::CompletionQueue::init() -{ - cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0); - if (!cq) { - lderr(cct) << __func__ << " failed to create receive completion queue: " - << cpp_strerror(errno) << dendl; - return -1; - } - - if (ibv_req_notify_cq(cq, 0)) { - lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl; - ibv_destroy_cq(cq); - cq = nullptr; - return -1; - } - - channel->bind_cq(cq); - ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl; - return 0; -} - -int Infiniband::CompletionChannel::init() -{ - ldout(cct, 20) << __func__ << " started." << dendl; - channel = ibv_create_comp_channel(infiniband.device->ctxt); - if (!channel) { - lderr(cct) << __func__ << " failed to create receive completion channel: " - << cpp_strerror(errno) << dendl; - return -1; - } - int rc = NetHandler(cct).set_nonblock(channel->fd); - if (rc < 0) { - ibv_destroy_comp_channel(channel); - return -1; - } - return 0; -} - /** * Given a string representation of the `status' field from Verbs * struct `ibv_wc'. diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index fdee49e24d0d..d8e70c2ff80e 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -127,18 +127,9 @@ class Infiniband { public: class ProtectionDomain { public: - explicit ProtectionDomain(CephContext *cct, Device *device) - : pd(ibv_alloc_pd(device->ctxt)) - { - if (pd == NULL) { - lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl; - ceph_abort(); - } - } - ~ProtectionDomain() { - int rc = ibv_dealloc_pd(pd); - assert(rc == 0); - } + explicit ProtectionDomain(CephContext *cct, Device *device); + ~ProtectionDomain(); + ibv_pd* const pd; }; @@ -147,83 +138,22 @@ class Infiniband { public: class Chunk { public: - Chunk(char* b, uint32_t len, ibv_mr* m) : buffer(b), bytes(len), offset(0), mr(m) {} - ~Chunk() { - assert(ibv_dereg_mr(mr) == 0); - } - - void set_offset(uint32_t o) { - offset = o; - } - - uint32_t get_offset() { - return offset; - } - - void set_bound(uint32_t b) { - bound = b; - } - - void prepare_read(uint32_t b) { - offset = 0; - bound = b; - } - - uint32_t get_bound() { - return bound; - } - - uint32_t read(char* buf, uint32_t len) { - uint32_t left = bound - offset; - if (left >= len) { - memcpy(buf, buffer+offset, len); - offset += len; - return len; - } else { - memcpy(buf, buffer+offset, left); - offset = 0; - bound = 0; - return left; - } - } - - uint32_t write(char* buf, uint32_t len) { - uint32_t left = bytes - offset; - if (left >= len) { - memcpy(buffer+offset, buf, len); - offset += len; - return len; - } else { - memcpy(buffer+offset, buf, left); - offset = bytes; - return left; - } - } - - bool full() { - return offset == bytes; - } - - bool over() { - return offset == bound; - } - - void clear() { - offset = 0; - bound = 0; - } - - void post_srq(Infiniband *ib) { - ib->post_chunk(this); - } - - void set_owner(uint64_t o) { - owner = o; - } - - uint64_t get_owner() { - return owner; - } + Chunk(char* b, uint32_t len, ibv_mr* m); + ~Chunk(); + + void set_offset(uint32_t o); + uint32_t get_offset(); + void set_bound(uint32_t b); + void prepare_read(uint32_t b); + uint32_t get_bound(); + uint32_t read(char* buf, uint32_t len); + uint32_t write(char* buf, uint32_t len); + bool full(); + bool over(); + void clear(); + void post_srq(Infiniband *ib); + void set_owner(uint64_t o); + uint64_t get_owner(); public: char* buffer; @@ -236,69 +166,14 @@ class Infiniband { class Cluster { public: - Cluster(MemoryManager& m, uint32_t s) : manager(m), chunk_size(s), lock("cluster_lock"){} - Cluster(MemoryManager& m, uint32_t s, uint32_t n) : manager(m), chunk_size(s), lock("cluster_lock"){ - add(n); - } - - ~Cluster() { - set::iterator c = all_chunks.begin(); - while(c != all_chunks.end()) { - delete *c; - ++c; - } - if (manager.enabled_huge_page) - manager.free_huge_pages(base); - else - delete base; - } - int add(uint32_t num) { - uint32_t bytes = chunk_size * num; - //cihar* base = (char*)malloc(bytes); - if (manager.enabled_huge_page) { - base = (char*)manager.malloc_huge_pages(bytes); - } else { - base = (char*)memalign(CEPH_PAGE_SIZE, bytes); - } - assert(base); - for (uint32_t offset = 0; offset < bytes; offset += chunk_size){ - ibv_mr* m = ibv_reg_mr(manager.pd->pd, base+offset, chunk_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE); - assert(m); - Chunk* c = new Chunk(base+offset,chunk_size,m); - free_chunks.push_back(c); - all_chunks.insert(c); - } - return 0; - } + Cluster(MemoryManager& m, uint32_t s); + Cluster(MemoryManager& m, uint32_t s, uint32_t n); + ~Cluster(); - void take_back(Chunk* ck) { - Mutex::Locker l(lock); - free_chunks.push_back(ck); - } + int add(uint32_t num); + void take_back(Chunk* ck); + int get_buffers(std::vector &chunks, size_t bytes); - int get_buffers(std::vector &chunks, size_t bytes) { - uint32_t num = bytes / chunk_size + 1; - if (bytes % chunk_size == 0) - --num; - int r = num; - Mutex::Locker l(lock); - if (free_chunks.empty()) - return 0; - if (!bytes) { - free_chunks.swap(chunks); - r = chunks.size(); - return r; - } - if (free_chunks.size() < num) { - num = free_chunks.size(); - r = num; - } - for (uint32_t i = 0; i < num; ++i) { - chunks.push_back(free_chunks.back()); - free_chunks.pop_back(); - } - return r; - } MemoryManager& manager; uint32_t chunk_size; Mutex lock; @@ -307,63 +182,20 @@ class Infiniband { char* base; }; - MemoryManager(Device *d, ProtectionDomain *p, bool hugepage): device(d), pd(p) { - enabled_huge_page = hugepage; - } - ~MemoryManager() { - if (channel) - delete channel; - if (send) - delete send; - } - void* malloc_huge_pages(size_t size) { - size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE); - char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0); - if (ptr == MAP_FAILED) { - ptr = (char *)malloc(real_size); - if (ptr == NULL) return NULL; - real_size = 0; - } - *((size_t *)ptr) = real_size; - return ptr + HUGE_PAGE_SIZE; - } - void free_huge_pages(void *ptr) { - if (ptr == NULL) return; - void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE; - size_t real_size = *((size_t *)real_ptr); - assert(real_size % HUGE_PAGE_SIZE == 0); - if (real_size != 0) - munmap(real_ptr, real_size); - else - free(real_ptr); - } - void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num) { - assert(device); - assert(pd); - channel = new Cluster(*this, size); - channel->add(rx_num); - - send = new Cluster(*this, size); - send->add(tx_num); - } - void return_tx(std::vector &chunks) { - for (auto c : chunks) { - c->clear(); - send->take_back(c); - } - } - - int get_send_buffers(std::vector &c, size_t bytes) { - return send->get_buffers(c, bytes); - } - - int get_channel_buffers(std::vector &chunks, size_t bytes) { - return channel->get_buffers(chunks, bytes); - } + MemoryManager(Device *d, ProtectionDomain *p, bool hugepage); + ~MemoryManager(); + void* malloc_huge_pages(size_t size); + void free_huge_pages(void *ptr); + void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num); + void return_tx(std::vector &chunks); + int get_send_buffers(std::vector &c, size_t bytes); + int get_channel_buffers(std::vector &chunks, size_t bytes); int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);} int is_rx_chunk(Chunk* c) { return channel->all_chunks.count(c);} + bool enabled_huge_page; + private: Cluster* channel;//RECV Cluster* send;// SEND @@ -386,15 +218,7 @@ class Infiniband { public: explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p); - - /** - * Destroy an Infiniband object. - */ - ~Infiniband() { - assert(ibv_destroy_srq(srq) == 0); - delete memory_manager; - delete pd; - } + ~Infiniband(); class CompletionChannel { static const uint32_t MAX_ACK_EVENT = 5000; @@ -405,18 +229,14 @@ class Infiniband { uint32_t cq_events_that_need_ack; public: - CompletionChannel(CephContext *c, Infiniband &ib) - : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) {} + CompletionChannel(CephContext *c, Infiniband &ib); ~CompletionChannel(); int init(); bool get_cq_event(); int get_fd() { return channel->fd; } ibv_comp_channel* get_channel() { return channel; } void bind_cq(ibv_cq *c) { cq = c; } - void ack_events() { - ibv_ack_cq_events(cq, cq_events_that_need_ack); - cq_events_that_need_ack = 0; - } + void ack_events(); }; // this class encapsulates the creation, use, and destruction of an RC @@ -475,71 +295,21 @@ class Infiniband { * Get the remote queue pair number for this QueuePair, as set in #plumb(). * QPNs are analogous to UDP/TCP port numbers. */ - int get_remote_qp_number(uint32_t *rqp) const { - ibv_qp_attr qpa; - ibv_qp_init_attr qpia; - - int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia); - if (r) { - lderr(cct) << __func__ << " failed to query qp: " - << cpp_strerror(errno) << dendl; - return -1; - } - - if (rqp) - *rqp = qpa.dest_qp_num; - return 0; - } + int get_remote_qp_number(uint32_t *rqp) const; /** * Get the remote infiniband address for this QueuePair, as set in #plumb(). * LIDs are "local IDs" in infiniband terminology. They are short, locally * routable addresses. */ - int get_remote_lid(uint16_t *lid) const { - ibv_qp_attr qpa; - ibv_qp_init_attr qpia; - - int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia); - if (r) { - lderr(cct) << __func__ << " failed to query qp: " - << cpp_strerror(errno) << dendl; - return -1; - } - - if (lid) - *lid = qpa.ah_attr.dlid; - return 0; - } + int get_remote_lid(uint16_t *lid) const; /** * Get the state of a QueuePair. */ - int get_state() const { - ibv_qp_attr qpa; - ibv_qp_init_attr qpia; - - int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia); - if (r) { - lderr(cct) << __func__ << " failed to get state: " - << cpp_strerror(errno) << dendl; - return -1; - } - return qpa.qp_state; - } + int get_state() const; /** * Return true if the queue pair is in an error state, false otherwise. */ - bool is_error() const { - ibv_qp_attr qpa; - ibv_qp_init_attr qpia; - - int r = ibv_query_qp(qp, &qpa, -1, &qpia); - if (r) { - lderr(cct) << __func__ << " failed to get state: " - << cpp_strerror(errno) << dendl; - return true; - } - return qpa.cur_qp_state == IBV_QPS_ERR; - } + bool is_error() const; ibv_qp* get_qp() const { return qp; } Infiniband::CompletionQueue* get_tx_cq() const { return txcq; } Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; } @@ -571,14 +341,10 @@ class Infiniband { ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge); int post_chunk(Chunk* chunk); int post_channel_cluster(); - int get_tx_buffers(std::vector &c, size_t bytes) { - return memory_manager->get_send_buffers(c, bytes); - } + int get_tx_buffers(std::vector &c, size_t bytes); CompletionChannel *create_comp_channel(CephContext *c); CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL); - uint8_t get_ib_physical_port() { - return ib_physical_port; - } + uint8_t get_ib_physical_port() { return ib_physical_port; } int send_msg(CephContext *cct, int sd, IBSYNMsg& msg); int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg); uint16_t get_lid() { return device->get_lid(); } @@ -586,18 +352,7 @@ class Infiniband { MemoryManager* get_memory_manager() { return memory_manager; } Device* get_device() { return device; } int get_async_fd() { return device->ctxt->async_fd; } - int recall_chunk(Chunk* c) { - if (memory_manager->is_rx_chunk(c)) { - post_chunk(c); - return 1; - } else if (memory_manager->is_tx_chunk(c)) { - vector v; - v.push_back(c); - memory_manager->return_tx(v); - return 2; - } - return -1; - } + int recall_chunk(Chunk* c); int is_tx_chunk(Chunk* c) { return memory_manager->is_tx_chunk(c); } int is_rx_chunk(Chunk* c) { return memory_manager->is_rx_chunk(c); } static const char* wc_status_to_string(int status);