static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
static const uint32_t CQ_DEPTH = 30000;
+Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr)
+{
+ union ibv_gid cgid;
+ struct ibv_exp_gid_attr gid_attr;
+ bool malformed = false;
+
+ int r = ibv_query_port(ctxt, port_num, port_attr);
+ if (r != 0) {
+ lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+
+ lid = port_attr->lid;
+
+ // search for requested GID in GIDs table
+ ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
+ << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
+ r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
+ "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
+ ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
+ &cgid.raw[ 0], &cgid.raw[ 1],
+ &cgid.raw[ 2], &cgid.raw[ 3],
+ &cgid.raw[ 4], &cgid.raw[ 5],
+ &cgid.raw[ 6], &cgid.raw[ 7],
+ &cgid.raw[ 8], &cgid.raw[ 9],
+ &cgid.raw[10], &cgid.raw[11],
+ &cgid.raw[12], &cgid.raw[13],
+ &cgid.raw[14], &cgid.raw[15]);
+
+ if (r != 16) {
+ ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
+ malformed = true;
+ }
+
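+ // gid_attr.type distinguishes RoCE v1 from RoCE v2 entries in the GID
+ // table; IBV_EXP_QUERY_GID_ATTR_TYPE is part of the experimental verbs API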
+ gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;
+
+ for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) {
+ r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
+ if (r) {
+ lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+ r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
+ if (r) {
+ lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+
+ if (malformed) break; // stay with gid_idx=0
+ if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
+ (memcmp(&gid, &cgid, 16) == 0) ) {
+ ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
+ break;
+ }
+ }
+
+ if (gid_idx == port_attr->gid_tbl_len) {
+ lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
+ ceph_abort();
+ }
+}
+
+
Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr)
{
if (device == NULL) {
}
}
-Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) {
- union ibv_gid cgid;
- struct ibv_exp_gid_attr gid_attr;
- bool malformed = false;
-
- int r = ibv_query_port(ctxt, port_num, port_attr);
- if (r == -1) {
- lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
- ceph_abort();
- }
-
- lid = port_attr->lid;
-
- // search for requested GID in GIDs table
- ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
- << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
- r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
- "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
- ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
- &cgid.raw[ 0], &cgid.raw[ 1],
- &cgid.raw[ 2], &cgid.raw[ 3],
- &cgid.raw[ 4], &cgid.raw[ 5],
- &cgid.raw[ 6], &cgid.raw[ 7],
- &cgid.raw[ 8], &cgid.raw[ 9],
- &cgid.raw[10], &cgid.raw[11],
- &cgid.raw[12], &cgid.raw[13],
- &cgid.raw[14], &cgid.raw[15]);
-
- if (r != 16) {
- ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
- malformed = true;
- }
-
- gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;
-
- for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) {
- r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
- if (r) {
- lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
- ceph_abort();
- }
- r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
- if (r) {
- lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
- ceph_abort();
- }
-
- if (malformed) break; // stay with gid_idx=0
- if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
- (memcmp(&gid, &cgid, 16) == 0) ) {
- ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
- break;
- }
- }
-
- if (gid_idx == port_attr->gid_tbl_len) {
- lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
- ceph_abort();
- }
- }
+void Device::binding_port(CephContext *cct, uint8_t port_num) {
+ port_cnt = device_attr->phys_port_cnt;
+ ports = new Port*[port_cnt];
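+ // instantiate every physical port (ports are 1-based in verbs) and keep
+ // the requested one only if it is in the ACTIVE state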
+ for (uint8_t i = 0; i < port_cnt; ++i) {
+ ports[i] = new Port(cct, ctxt, i+1);
+ if (i+1 == port_num && ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) {
+ active_port = ports[i];
+ ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
+ return;
+ } else {
+ ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want (state: " << ports[i]->get_port_attr()->state << ")" << dendl;
+ }
+ }
+ if (nullptr == active_port) {
+ lderr(cct) << __func__ << " port not found" << dendl;
+ ceph_abort();
+ }
+}
+
+
+Infiniband::QueuePair::QueuePair(
+ CephContext *c, Infiniband& infiniband, ibv_qp_type type,
+ int port, ibv_srq *srq,
+ Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
+ uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
+: cct(c), infiniband(infiniband),
+ type(type),
+ ctxt(infiniband.device->ctxt),
+ ib_physical_port(port),
+ pd(infiniband.pd->pd),
+ srq(srq),
+ qp(NULL),
+ txcq(txcq),
+ rxcq(rxcq),
+ initial_psn(0),
+ max_send_wr(max_send_wr),
+ max_recv_wr(max_recv_wr),
+ q_key(q_key),
+ dead(false)
+{
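+ // pick a random initial packet sequence number; PSNs are 24 bits wide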
+ initial_psn = lrand48() & 0xffffff;
+ if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
+ lderr(cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+}
+
+int Infiniband::QueuePair::init()
+{
+ ldout(cct, 20) << __func__ << " started." << dendl;
+ ibv_qp_init_attr qpia;
+ memset(&qpia, 0, sizeof(qpia));
+ qpia.send_cq = txcq->get_cq();
+ qpia.recv_cq = rxcq->get_cq();
+ qpia.srq = srq; // use the same shared receive queue
+ qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests
+ qpia.cap.max_send_sge = 1; // max send scatter-gather elements
+ qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q
+ qpia.qp_type = type; // RC, UD, or RAW_PACKET
+ qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs
+
+ qp = ibv_create_qp(pd, &qpia);
+ if (qp == NULL) {
+ lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
+ lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers or"
+ " ms_async_rdma_send_buffers" << dendl;
+ return -1;
+ }
+
+ ldout(cct, 20) << __func__ << " successfully create queue pair: "
+ << "qp=" << qp << dendl;
+
+ // move from RESET to INIT state
+ ibv_qp_attr qpa;
+ memset(&qpa, 0, sizeof(qpa));
+ qpa.qp_state = IBV_QPS_INIT;
+ qpa.pkey_index = 0;
+ qpa.port_num = (uint8_t)(ib_physical_port);
+ qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
+ qpa.qkey = q_key;
+
+ int mask = IBV_QP_STATE | IBV_QP_PORT;
+ switch (type) {
+ case IBV_QPT_RC:
+ mask |= IBV_QP_ACCESS_FLAGS;
+ mask |= IBV_QP_PKEY_INDEX;
+ break;
+ case IBV_QPT_UD:
+ mask |= IBV_QP_QKEY;
+ mask |= IBV_QP_PKEY_INDEX;
+ break;
+ case IBV_QPT_RAW_PACKET:
+ break;
+ default:
+ ceph_abort();
+ }
+
+ int ret = ibv_modify_qp(qp, &qpa, mask);
+ if (ret) {
+ ibv_destroy_qp(qp);
+ qp = NULL;
+ lderr(cct) << __func__ << " failed to transition to INIT state: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:"
+ << " qp=" << qp << dendl;
+ return 0;
+}
+
+/**
+ * Move an RC QueuePair into the ERROR state. It is necessary to move a
+ * Queue Pair into the ERROR state and poll all of its relevant Work
+ * Completions before destroying it, because destroying a Queue Pair does
+ * not guarantee that its Work Completions are removed from the CQ. Even
+ * if the Work Completions are already in the CQ, it might not be possible
+ * to retrieve them. If the Queue Pair is associated with an SRQ, it is
+ * recommended to wait for the affiliated event
+ * IBV_EVENT_QP_LAST_WQE_REACHED before destruction.
+ *
+ * \return
+ * -errno if the QueuePair can't switch to ERROR
+ * 0 for success.
+ */
+int Infiniband::QueuePair::to_dead()
+{
+ if (dead)
+ return 0;
+ ibv_qp_attr qpa;
+ memset(&qpa, 0, sizeof(qpa));
+ qpa.qp_state = IBV_QPS_ERR;
+
+ int mask = IBV_QP_STATE;
+ int ret = ibv_modify_qp(qp, &qpa, mask);
+ if (ret) {
+ lderr(cct) << __func__ << " failed to transition to ERROR state: "
+ << cpp_strerror(errno) << dendl;
+ return -errno;
+ }
+ dead = true;
+ return ret;
+}
+
+int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const
+{
+ ibv_qp_attr qpa;
+ ibv_qp_init_attr qpia;
+
+ int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);
+ if (r) {
+ lderr(cct) << __func__ << " failed to query qp: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+
+ if (rqp)
+ *rqp = qpa.dest_qp_num;
+ return 0;
+}
+
+/**
+ * Get the remote infiniband address for this QueuePair, as set in #plumb().
+ * LIDs are "local IDs" in infiniband terminology. They are short, locally
+ * routable addresses.
+ */
+int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const
+{
+ ibv_qp_attr qpa;
+ ibv_qp_init_attr qpia;
+
+ int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);
+ if (r) {
+ lderr(cct) << __func__ << " failed to query qp: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+
+ if (lid)
+ *lid = qpa.ah_attr.dlid;
+ return 0;
+}
+
+/**
+ * Get the state of a QueuePair.
+ */
+int Infiniband::QueuePair::get_state() const
+{
+ ibv_qp_attr qpa;
+ ibv_qp_init_attr qpia;
+
+ int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);
+ if (r) {
+ lderr(cct) << __func__ << " failed to get state: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ return qpa.qp_state;
+}
+
+/**
+ * Return true if the queue pair is in an error state, false otherwise.
+ */
+bool Infiniband::QueuePair::is_error() const
+{
+ ibv_qp_attr qpa;
+ ibv_qp_init_attr qpia;
+
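+ // an all-ones attr_mask requests every attribute from the provider;
+ // we only need cur_qp_state out of the result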
+ int r = ibv_query_qp(qp, &qpa, -1, &qpia);
+ if (r) {
+ lderr(cct) << __func__ << " failed to get state: "
+ << cpp_strerror(errno) << dendl;
+ return true;
+ }
+ return qpa.cur_qp_state == IBV_QPS_ERR;
+}
+
+
+Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib)
+ : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
+{
+}
+
+Infiniband::CompletionChannel::~CompletionChannel()
+{
+ if (channel) {
+ int r = ibv_destroy_comp_channel(channel);
+ if (r != 0)
+ lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(r) << dendl;
+ assert(r == 0);
+ }
+}
+
+int Infiniband::CompletionChannel::init()
+{
+ ldout(cct, 20) << __func__ << " started." << dendl;
+ channel = ibv_create_comp_channel(infiniband.device->ctxt);
+ if (!channel) {
+ lderr(cct) << __func__ << " failed to create receive completion channel: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ int rc = NetHandler(cct).set_nonblock(channel->fd);
+ if (rc < 0) {
+ ibv_destroy_comp_channel(channel);
+ channel = NULL;
+ return -1;
+ }
+ return 0;
+}
+
+void Infiniband::CompletionChannel::ack_events()
+{
+ ibv_ack_cq_events(cq, cq_events_that_need_ack);
+ cq_events_that_need_ack = 0;
+}
+
+bool Infiniband::CompletionChannel::get_cq_event()
+{
+ ibv_cq *cq = NULL;
+ void *ev_ctx;
+ if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
+ if (errno != EAGAIN && errno != EINTR)
+ lderr(cct) << __func__ << " failed to retrieve CQ event: "
+ << cpp_strerror(errno) << dendl;
+ return false;
+ }
+
+ /* accumulate the number of cq events that need to
+ * be acked, and periodically ack them
+ */
+ if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
+ ldout(cct, 20) << __func__ << " ack aq events." << dendl;
+ ibv_ack_cq_events(cq, MAX_ACK_EVENT);
+ cq_events_that_need_ack = 0;
+ }
+
+ return true;
+}
+
+
+Infiniband::CompletionQueue::~CompletionQueue()
+{
+ if (cq) {
+ int r = ibv_destroy_cq(cq);
+ if (r != 0)
+ lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(r) << dendl;
+ assert(r == 0);
+ }
+}
+
+int Infiniband::CompletionQueue::init()
+{
+ cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
+ if (!cq) {
+ lderr(cct) << __func__ << " failed to create receive completion queue: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+
+ if (ibv_req_notify_cq(cq, 0)) {
+ lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
+ ibv_destroy_cq(cq);
+ cq = nullptr;
+ return -1;
+ }
+
+ channel->bind_cq(cq);
+ ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;
+ return 0;
+}
+
+int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
+{
+ ldout(cct, 20) << __func__ << " started." << dendl;
+ int r = ibv_req_notify_cq(cq, 0);
+ if (r != 0)
+ lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(r) << dendl;
+ return r;
+}
+
+int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
+ int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
+ if (r < 0) {
+ lderr(cct) << __func__ << " poll_completion_queue occur met error: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ return r;
+}
+
+
+Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device)
+ : pd(ibv_alloc_pd(device->ctxt))
+{
+ if (pd == NULL) {
+ lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+}
+
+Infiniband::ProtectionDomain::~ProtectionDomain()
+{
+ int rc = ibv_dealloc_pd(pd);
+ assert(rc == 0);
+}
+
+
+Infiniband::MemoryManager::Chunk::Chunk(char* b, uint32_t len, ibv_mr* m)
+ : buffer(b), bytes(len), offset(0), mr(m)
+{
+ bound = 0;
+}
+
+Infiniband::MemoryManager::Chunk::~Chunk()
+{
+ int r = ibv_dereg_mr(mr);
+ assert(r == 0);
+}
+
+void Infiniband::MemoryManager::Chunk::set_offset(uint32_t o)
+{
+ offset = o;
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::get_offset()
+{
+ return offset;
+}
+
+void Infiniband::MemoryManager::Chunk::set_bound(uint32_t b)
+{
+ bound = b;
+}
+
+void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b)
+{
+ offset = 0;
+ bound = b;
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::get_bound()
+{
+ return bound;
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len)
+{
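+ // copy out up to len bytes from [offset, bound); a short read exhausts
+ // the chunk and resets it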
+ uint32_t left = bound - offset;
+ if (left >= len) {
+ memcpy(buf, buffer+offset, len);
+ offset += len;
+ return len;
+ } else {
+ memcpy(buf, buffer+offset, left);
+ offset = 0;
+ bound = 0;
+ return left;
+ }
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len)
+{
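+ // copy in up to len bytes of buf; a short write fills the chunk to
+ // its full capacity (bytes)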
+ uint32_t left = bytes - offset;
+ if (left >= len) {
+ memcpy(buffer+offset, buf, len);
+ offset += len;
+ return len;
+ } else {
+ memcpy(buffer+offset, buf, left);
+ offset = bytes;
+ return left;
+ }
+}
+
+bool Infiniband::MemoryManager::Chunk::full()
+{
+ return offset == bytes;
+}
+
+bool Infiniband::MemoryManager::Chunk::over()
+{
+ return offset == bound;
+}
+
+void Infiniband::MemoryManager::Chunk::clear()
+{
+ offset = 0;
+ bound = 0;
+}
+
+void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
+{
+ ib->post_chunk(this);
+}
+
+void Infiniband::MemoryManager::Chunk::set_owner(uint64_t o)
+{
+ owner = o;
+}
+
+uint64_t Infiniband::MemoryManager::Chunk::get_owner()
+{
+ return owner;
+}
+
+
+Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
+ : manager(m), chunk_size(s), lock("cluster_lock")
+{
+}
+
+Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s, uint32_t n)
+ : manager(m), chunk_size(s), lock("cluster_lock")
+{
+ add(n);
+}
+
+Infiniband::MemoryManager::Cluster::~Cluster()
+{
+ set<Chunk*>::iterator c = all_chunks.begin();
+ while(c != all_chunks.end()) {
+ delete *c;
+ ++c;
+ }
+ if (manager.enabled_huge_page)
+ manager.free_huge_pages(base);
+ else
+ free(base);
+}
+
+int Infiniband::MemoryManager::Cluster::add(uint32_t num)
+{
+ uint32_t bytes = chunk_size * num;
+ if (manager.enabled_huge_page) {
+ base = (char*)manager.malloc_huge_pages(bytes);
+ } else {
+ base = (char*)memalign(CEPH_PAGE_SIZE, bytes);
+ }
+ assert(base);
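+ // register each chunk as its own memory region so chunks can be posted
+ // and completed independently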
+ for (uint32_t offset = 0; offset < bytes; offset += chunk_size) {
+ ibv_mr* m = ibv_reg_mr(manager.pd->pd, base+offset, chunk_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
+ assert(m);
+ Chunk* c = new Chunk(base+offset, chunk_size, m);
+ free_chunks.push_back(c);
+ all_chunks.insert(c);
+ }
+ return 0;
+}
+
+void Infiniband::MemoryManager::Cluster::take_back(Chunk* ck)
+{
+ Mutex::Locker l(lock);
+ free_chunks.push_back(ck);
+}
+
+int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks, size_t bytes)
+{
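+ // round the chunk count up, except when bytes is an exact multiple;
+ // bytes == 0 is a special case meaning "take every free chunk"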
+ uint32_t num = bytes / chunk_size + 1;
+ if (bytes % chunk_size == 0)
+ --num;
+ int r = num;
+ Mutex::Locker l(lock);
+ if (free_chunks.empty())
+ return 0;
+ if (!bytes) {
+ free_chunks.swap(chunks);
+ r = chunks.size();
+ return r;
+ }
+ if (free_chunks.size() < num) {
+ num = free_chunks.size();
+ r = num;
+ }
+ for (uint32_t i = 0; i < num; ++i) {
+ chunks.push_back(free_chunks.back());
+ free_chunks.pop_back();
+ }
+ return r;
+}
+
+
+Infiniband::MemoryManager::MemoryManager(Device *d, ProtectionDomain *p, bool hugepage)
+ : device(d), pd(p)
+{
+ enabled_huge_page = hugepage;
+ channel = NULL;
+ send = NULL;
+}
+
+Infiniband::MemoryManager::~MemoryManager()
+{
+ if (channel)
+ delete channel;
+ if (send)
+ delete send;
+}
+
+void* Infiniband::MemoryManager::malloc_huge_pages(size_t size)
+{
+ size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
+ char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,
+ MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB,
+ -1, 0);
+ if (ptr == MAP_FAILED) {
+ ptr = (char *)malloc(real_size);
+ if (ptr == NULL) return NULL;
+ real_size = 0;
+ }
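+ // stash the real mapping size in the first huge page so free_huge_pages()
+ // knows whether to munmap() or free(); 0 marks the malloc() fallback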
+ *((size_t *)ptr) = real_size;
+ return ptr + HUGE_PAGE_SIZE;
+}
+
+void Infiniband::MemoryManager::free_huge_pages(void *ptr)
+{
+ if (ptr == NULL) return;
+ void *real_ptr = (char *)ptr - HUGE_PAGE_SIZE;
+ size_t real_size = *((size_t *)real_ptr);
+ assert(real_size % HUGE_PAGE_SIZE == 0);
+ if (real_size != 0)
+ munmap(real_ptr, real_size);
+ else
+ free(real_ptr);
+}
+
+void Infiniband::MemoryManager::register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num)
+{
+ assert(device);
+ assert(pd);
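+ // "channel" backs the shared receive queue, "send" holds tx buffers;
+ // both clusters use the same chunk size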
+ channel = new Cluster(*this, size);
+ channel->add(rx_num);
+
+ send = new Cluster(*this, size);
+ send->add(tx_num);
+}
+
+void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks)
+{
+ for (auto c : chunks) {
+ c->clear();
+ send->take_back(c);
+ }
+}
+
+int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes)
+{
+ return send->get_buffers(c, bytes);
+}
-void Device::binding_port(CephContext *cct, uint8_t port_num) {
- port_cnt = device_attr->phys_port_cnt;
- ports = new Port*[port_cnt];
- for (uint8_t i = 0; i < port_cnt; ++i) {
- ports[i] = new Port(cct, ctxt, i+1);
- if (i+1 == port_num && ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) {
- active_port = ports[i];
- ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
- return ;
- } else {
- ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << ports[i]->get_port_attr()->state << ")"<< dendl;
- }
- }
- if (nullptr == active_port) {
- lderr(cct) << __func__ << " port not found" << dendl;
- assert(active_port);
- }
+int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes)
+{
+ return channel->get_buffers(chunks, bytes);
}
+
Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num): device_list(cct)
{
device = device_list.get_device(device_name.c_str());
post_channel_cluster();
}
+Infiniband::~Infiniband()
+{
+ int r = ibv_destroy_srq(srq);
+ assert(r == 0);
+ delete memory_manager;
+ delete pd;
+}
+
/**
* Create a shared receive queue. This basically wraps the verbs call.
*
return ibv_create_srq(pd->pd, &sia);
}
+int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
+{
+ return memory_manager->get_send_buffers(c, bytes);
+}
+
+int Infiniband::recall_chunk(Chunk* c)
+{
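+ // hand a completed chunk back to its owning pool: rx chunks are re-posted
+ // to the SRQ (returns 1), tx chunks go back to the send cluster (returns 2)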
+ if (memory_manager->is_rx_chunk(c)) {
+ post_chunk(c);
+ return 1;
+ } else if (memory_manager->is_tx_chunk(c)) {
+ vector<Chunk*> v;
+ v.push_back(c);
+ memory_manager->return_tx(v);
+ return 2;
+ }
+ return -1;
+}
+
/**
* Create a new QueuePair. This factory should be used in preference to
* the QueuePair constructor directly, since this lets derivatives of
return qp;
}
-int Infiniband::QueuePair::init()
-{
- ldout(cct, 20) << __func__ << " started." << dendl;
- ibv_qp_init_attr qpia;
- memset(&qpia, 0, sizeof(qpia));
- qpia.send_cq = txcq->get_cq();
- qpia.recv_cq = rxcq->get_cq();
- qpia.srq = srq; // use the same shared receive queue
- qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests
- qpia.cap.max_send_sge = 1; // max send scatter-gather elements
- qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q
- qpia.qp_type = type; // RC, UC, UD, or XRC
- qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs
-
- qp = ibv_create_qp(pd, &qpia);
- if (qp == NULL) {
- lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
- lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers or"
- " ms_async_rdma_send_buffers" << dendl;
- return -1;
- }
-
- ldout(cct, 20) << __func__ << " successfully create queue pair: "
- << "qp=" << qp << dendl;
-
- // move from RESET to INIT state
- ibv_qp_attr qpa;
- memset(&qpa, 0, sizeof(qpa));
- qpa.qp_state = IBV_QPS_INIT;
- qpa.pkey_index = 0;
- qpa.port_num = (uint8_t)(ib_physical_port);
- qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
- qpa.qkey = q_key;
-
- int mask = IBV_QP_STATE | IBV_QP_PORT;
- switch (type) {
- case IBV_QPT_RC:
- mask |= IBV_QP_ACCESS_FLAGS;
- mask |= IBV_QP_PKEY_INDEX;
- break;
- case IBV_QPT_UD:
- mask |= IBV_QP_QKEY;
- mask |= IBV_QP_PKEY_INDEX;
- break;
- case IBV_QPT_RAW_PACKET:
- break;
- default:
- ceph_abort();
- }
-
- int ret = ibv_modify_qp(qp, &qpa, mask);
- if (ret) {
- ibv_destroy_qp(qp);
- lderr(cct) << __func__ << " failed to transition to INIT state: "
- << cpp_strerror(errno) << dendl;
- return -1;
- }
- ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:"
- << " qp=" << qp << dendl;
- return 0;
-}
-
-/**
- * Change RC QueuePair into the ERROR state. This is necessary modify
- * the Queue Pair into the Error state and poll all of the relevant
- * Work Completions prior to destroying a Queue Pair.
- * Since destroying a Queue Pair does not guarantee that its Work
- * Completions are removed from the CQ upon destruction. Even if the
- * Work Completions are already in the CQ, it might not be possible to
- * retrieve them. If the Queue Pair is associated with an SRQ, it is
- * recommended wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED
- *
- * \return
- * -errno if the QueuePair can't switch to ERROR
- * 0 for success.
- */
-int Infiniband::QueuePair::to_dead()
-{
- if (dead)
- return 0;
- ibv_qp_attr qpa;
- memset(&qpa, 0, sizeof(qpa));
- qpa.qp_state = IBV_QPS_ERR;
-
- int mask = IBV_QP_STATE;
- int ret = ibv_modify_qp(qp, &qpa, mask);
- if (ret) {
- lderr(cct) << __func__ << " failed to transition to ERROR state: "
- << cpp_strerror(errno) << dendl;
- return -errno;
- }
- dead = true;
- return ret;
-}
-
int Infiniband::post_chunk(Chunk* chunk)
{
ibv_sge isge;
return cq;
}
-
-Infiniband::QueuePair::QueuePair(
- CephContext *c, Infiniband& infiniband, ibv_qp_type type,
- int port, ibv_srq *srq,
- Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
- uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
-: cct(c), infiniband(infiniband),
- type(type),
- ctxt(infiniband.device->ctxt),
- ib_physical_port(port),
- pd(infiniband.pd->pd),
- srq(srq),
- qp(NULL),
- txcq(txcq),
- rxcq(rxcq),
- initial_psn(0),
- max_send_wr(max_send_wr),
- max_recv_wr(max_recv_wr),
- q_key(q_key),
- dead(false)
-{
- initial_psn = lrand48() & 0xffffff;
- if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
- lderr(cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl;
- ceph_abort();
- }
- pd = infiniband.pd->pd;
-}
-
// returns 1 if no valid buffer was read, 0 once enough buffers were read,
// and a value < 0 on error
int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
assert(!ibv_destroy_qp(qp));
}
-Infiniband::CompletionChannel::~CompletionChannel()
-{
- if (channel) {
- int r = ibv_destroy_comp_channel(channel);
- if (r < 0)
- lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;
- assert(r == 0);
- }
-}
-
-Infiniband::CompletionQueue::~CompletionQueue()
-{
- if (cq) {
- int r = ibv_destroy_cq(cq);
- if (r < 0)
- lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
- assert(r == 0);
- }
-}
-
-int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
-{
- ldout(cct, 20) << __func__ << " started." << dendl;
- int r = ibv_req_notify_cq(cq, 0);
- if (r < 0)
- lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;
- return r;
-}
-
-int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
- int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
- if (r < 0) {
- lderr(cct) << __func__ << " poll_completion_queue occur met error: "
- << cpp_strerror(errno) << dendl;
- return -1;
- }
- return r;
-}
-
-bool Infiniband::CompletionChannel::get_cq_event()
-{
- ibv_cq *cq = NULL;
- void *ev_ctx;
- if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
- if (errno != EAGAIN && errno != EINTR)
- lderr(cct) << __func__ << " failed to retrieve CQ event: "
- << cpp_strerror(errno) << dendl;
- return false;
- }
-
- /* accumulate number of cq events that need to
- * * be acked, and periodically ack them
- * */
- if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
- ldout(cct, 20) << __func__ << " ack aq events." << dendl;
- ibv_ack_cq_events(cq, MAX_ACK_EVENT);
- cq_events_that_need_ack = 0;
- }
-
- return true;
-}
-
-int Infiniband::CompletionQueue::init()
-{
- cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
- if (!cq) {
- lderr(cct) << __func__ << " failed to create receive completion queue: "
- << cpp_strerror(errno) << dendl;
- return -1;
- }
-
- if (ibv_req_notify_cq(cq, 0)) {
- lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
- ibv_destroy_cq(cq);
- cq = nullptr;
- return -1;
- }
-
- channel->bind_cq(cq);
- ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;
- return 0;
-}
-
-int Infiniband::CompletionChannel::init()
-{
- ldout(cct, 20) << __func__ << " started." << dendl;
- channel = ibv_create_comp_channel(infiniband.device->ctxt);
- if (!channel) {
- lderr(cct) << __func__ << " failed to create receive completion channel: "
- << cpp_strerror(errno) << dendl;
- return -1;
- }
- int rc = NetHandler(cct).set_nonblock(channel->fd);
- if (rc < 0) {
- ibv_destroy_comp_channel(channel);
- return -1;
- }
- return 0;
-}
-
/**
 * Return a string representation of the `status' field from the Verbs
 * struct `ibv_wc'.
public:
class ProtectionDomain {
public:
- explicit ProtectionDomain(CephContext *cct, Device *device)
- : pd(ibv_alloc_pd(device->ctxt))
- {
- if (pd == NULL) {
- lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;
- ceph_abort();
- }
- }
- ~ProtectionDomain() {
- int rc = ibv_dealloc_pd(pd);
- assert(rc == 0);
- }
+ explicit ProtectionDomain(CephContext *cct, Device *device);
+ ~ProtectionDomain();
+
ibv_pd* const pd;
};
public:
class Chunk {
public:
- Chunk(char* b, uint32_t len, ibv_mr* m) : buffer(b), bytes(len), offset(0), mr(m) {}
- ~Chunk() {
- assert(ibv_dereg_mr(mr) == 0);
- }
-
- void set_offset(uint32_t o) {
- offset = o;
- }
-
- uint32_t get_offset() {
- return offset;
- }
-
- void set_bound(uint32_t b) {
- bound = b;
- }
-
- void prepare_read(uint32_t b) {
- offset = 0;
- bound = b;
- }
-
- uint32_t get_bound() {
- return bound;
- }
-
- uint32_t read(char* buf, uint32_t len) {
- uint32_t left = bound - offset;
- if (left >= len) {
- memcpy(buf, buffer+offset, len);
- offset += len;
- return len;
- } else {
- memcpy(buf, buffer+offset, left);
- offset = 0;
- bound = 0;
- return left;
- }
- }
-
- uint32_t write(char* buf, uint32_t len) {
- uint32_t left = bytes - offset;
- if (left >= len) {
- memcpy(buffer+offset, buf, len);
- offset += len;
- return len;
- } else {
- memcpy(buffer+offset, buf, left);
- offset = bytes;
- return left;
- }
- }
-
- bool full() {
- return offset == bytes;
- }
-
- bool over() {
- return offset == bound;
- }
-
- void clear() {
- offset = 0;
- bound = 0;
- }
-
- void post_srq(Infiniband *ib) {
- ib->post_chunk(this);
- }
-
- void set_owner(uint64_t o) {
- owner = o;
- }
-
- uint64_t get_owner() {
- return owner;
- }
+ Chunk(char* b, uint32_t len, ibv_mr* m);
+ ~Chunk();
+
+ void set_offset(uint32_t o);
+ uint32_t get_offset();
+ void set_bound(uint32_t b);
+ void prepare_read(uint32_t b);
+ uint32_t get_bound();
+ uint32_t read(char* buf, uint32_t len);
+ uint32_t write(char* buf, uint32_t len);
+ bool full();
+ bool over();
+ void clear();
+ void post_srq(Infiniband *ib);
+ void set_owner(uint64_t o);
+ uint64_t get_owner();
public:
char* buffer;
class Cluster {
public:
- Cluster(MemoryManager& m, uint32_t s) : manager(m), chunk_size(s), lock("cluster_lock"){}
- Cluster(MemoryManager& m, uint32_t s, uint32_t n) : manager(m), chunk_size(s), lock("cluster_lock"){
- add(n);
- }
-
- ~Cluster() {
- set<Chunk*>::iterator c = all_chunks.begin();
- while(c != all_chunks.end()) {
- delete *c;
- ++c;
- }
- if (manager.enabled_huge_page)
- manager.free_huge_pages(base);
- else
- delete base;
- }
- int add(uint32_t num) {
- uint32_t bytes = chunk_size * num;
- //cihar* base = (char*)malloc(bytes);
- if (manager.enabled_huge_page) {
- base = (char*)manager.malloc_huge_pages(bytes);
- } else {
- base = (char*)memalign(CEPH_PAGE_SIZE, bytes);
- }
- assert(base);
- for (uint32_t offset = 0; offset < bytes; offset += chunk_size){
- ibv_mr* m = ibv_reg_mr(manager.pd->pd, base+offset, chunk_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
- assert(m);
- Chunk* c = new Chunk(base+offset,chunk_size,m);
- free_chunks.push_back(c);
- all_chunks.insert(c);
- }
- return 0;
- }
+ Cluster(MemoryManager& m, uint32_t s);
+ Cluster(MemoryManager& m, uint32_t s, uint32_t n);
+ ~Cluster();
- void take_back(Chunk* ck) {
- Mutex::Locker l(lock);
- free_chunks.push_back(ck);
- }
+ int add(uint32_t num);
+ void take_back(Chunk* ck);
+ int get_buffers(std::vector<Chunk*> &chunks, size_t bytes);
- int get_buffers(std::vector<Chunk*> &chunks, size_t bytes) {
- uint32_t num = bytes / chunk_size + 1;
- if (bytes % chunk_size == 0)
- --num;
- int r = num;
- Mutex::Locker l(lock);
- if (free_chunks.empty())
- return 0;
- if (!bytes) {
- free_chunks.swap(chunks);
- r = chunks.size();
- return r;
- }
- if (free_chunks.size() < num) {
- num = free_chunks.size();
- r = num;
- }
- for (uint32_t i = 0; i < num; ++i) {
- chunks.push_back(free_chunks.back());
- free_chunks.pop_back();
- }
- return r;
- }
MemoryManager& manager;
uint32_t chunk_size;
Mutex lock;
char* base;
};
- MemoryManager(Device *d, ProtectionDomain *p, bool hugepage): device(d), pd(p) {
- enabled_huge_page = hugepage;
- }
- ~MemoryManager() {
- if (channel)
- delete channel;
- if (send)
- delete send;
- }
- void* malloc_huge_pages(size_t size) {
- size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
- char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);
- if (ptr == MAP_FAILED) {
- ptr = (char *)malloc(real_size);
- if (ptr == NULL) return NULL;
- real_size = 0;
- }
- *((size_t *)ptr) = real_size;
- return ptr + HUGE_PAGE_SIZE;
- }
- void free_huge_pages(void *ptr) {
- if (ptr == NULL) return;
- void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE;
- size_t real_size = *((size_t *)real_ptr);
- assert(real_size % HUGE_PAGE_SIZE == 0);
- if (real_size != 0)
- munmap(real_ptr, real_size);
- else
- free(real_ptr);
- }
- void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num) {
- assert(device);
- assert(pd);
- channel = new Cluster(*this, size);
- channel->add(rx_num);
-
- send = new Cluster(*this, size);
- send->add(tx_num);
- }
- void return_tx(std::vector<Chunk*> &chunks) {
- for (auto c : chunks) {
- c->clear();
- send->take_back(c);
- }
- }
-
- int get_send_buffers(std::vector<Chunk*> &c, size_t bytes) {
- return send->get_buffers(c, bytes);
- }
-
- int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes) {
- return channel->get_buffers(chunks, bytes);
- }
+ MemoryManager(Device *d, ProtectionDomain *p, bool hugepage);
+ ~MemoryManager();
+ void* malloc_huge_pages(size_t size);
+ void free_huge_pages(void *ptr);
+ void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num);
+ void return_tx(std::vector<Chunk*> &chunks);
+ int get_send_buffers(std::vector<Chunk*> &c, size_t bytes);
+ int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes);
int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);}
int is_rx_chunk(Chunk* c) { return channel->all_chunks.count(c);}
+
bool enabled_huge_page;
+
private:
Cluster* channel;//RECV
Cluster* send;// SEND
public:
explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
-
- /**
- * Destroy an Infiniband object.
- */
- ~Infiniband() {
- assert(ibv_destroy_srq(srq) == 0);
- delete memory_manager;
- delete pd;
- }
+ ~Infiniband();
class CompletionChannel {
static const uint32_t MAX_ACK_EVENT = 5000;
uint32_t cq_events_that_need_ack;
public:
- CompletionChannel(CephContext *c, Infiniband &ib)
- : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) {}
+ CompletionChannel(CephContext *c, Infiniband &ib);
~CompletionChannel();
int init();
bool get_cq_event();
int get_fd() { return channel->fd; }
ibv_comp_channel* get_channel() { return channel; }
void bind_cq(ibv_cq *c) { cq = c; }
- void ack_events() {
- ibv_ack_cq_events(cq, cq_events_that_need_ack);
- cq_events_that_need_ack = 0;
- }
+ void ack_events();
};
// this class encapsulates the creation, use, and destruction of an RC
* Get the remote queue pair number for this QueuePair, as set in #plumb().
* QPNs are analogous to UDP/TCP port numbers.
*/
- int get_remote_qp_number(uint32_t *rqp) const {
- ibv_qp_attr qpa;
- ibv_qp_init_attr qpia;
-
- int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);
- if (r) {
- lderr(cct) << __func__ << " failed to query qp: "
- << cpp_strerror(errno) << dendl;
- return -1;
- }
-
- if (rqp)
- *rqp = qpa.dest_qp_num;
- return 0;
- }
+ int get_remote_qp_number(uint32_t *rqp) const;
/**
* Get the remote infiniband address for this QueuePair, as set in #plumb().
* LIDs are "local IDs" in infiniband terminology. They are short, locally
* routable addresses.
*/
- int get_remote_lid(uint16_t *lid) const {
- ibv_qp_attr qpa;
- ibv_qp_init_attr qpia;
-
- int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);
- if (r) {
- lderr(cct) << __func__ << " failed to query qp: "
- << cpp_strerror(errno) << dendl;
- return -1;
- }
-
- if (lid)
- *lid = qpa.ah_attr.dlid;
- return 0;
- }
+ int get_remote_lid(uint16_t *lid) const;
/**
* Get the state of a QueuePair.
*/
- int get_state() const {
- ibv_qp_attr qpa;
- ibv_qp_init_attr qpia;
-
- int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);
- if (r) {
- lderr(cct) << __func__ << " failed to get state: "
- << cpp_strerror(errno) << dendl;
- return -1;
- }
- return qpa.qp_state;
- }
+ int get_state() const;
/**
* Return true if the queue pair is in an error state, false otherwise.
*/
- bool is_error() const {
- ibv_qp_attr qpa;
- ibv_qp_init_attr qpia;
-
- int r = ibv_query_qp(qp, &qpa, -1, &qpia);
- if (r) {
- lderr(cct) << __func__ << " failed to get state: "
- << cpp_strerror(errno) << dendl;
- return true;
- }
- return qpa.cur_qp_state == IBV_QPS_ERR;
- }
+ bool is_error() const;
ibv_qp* get_qp() const { return qp; }
Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }
Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
int post_chunk(Chunk* chunk);
int post_channel_cluster();
- int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes) {
- return memory_manager->get_send_buffers(c, bytes);
- }
+ int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
CompletionChannel *create_comp_channel(CephContext *c);
CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
- uint8_t get_ib_physical_port() {
- return ib_physical_port;
- }
+ uint8_t get_ib_physical_port() { return ib_physical_port; }
int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
uint16_t get_lid() { return device->get_lid(); }
MemoryManager* get_memory_manager() { return memory_manager; }
Device* get_device() { return device; }
int get_async_fd() { return device->ctxt->async_fd; }
- int recall_chunk(Chunk* c) {
- if (memory_manager->is_rx_chunk(c)) {
- post_chunk(c);
- return 1;
- } else if (memory_manager->is_tx_chunk(c)) {
- vector<Chunk*> v;
- v.push_back(c);
- memory_manager->return_tx(v);
- return 2;
- }
- return -1;
- }
+ int recall_chunk(Chunk* c);
int is_tx_chunk(Chunk* c) { return memory_manager->is_tx_chunk(c); }
int is_rx_chunk(Chunk* c) { return memory_manager->is_rx_chunk(c); }
static const char* wc_status_to_string(int status);