]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
RDMA: Move defenitions from Infiniband.h into .cc
authorAmir Vadai <amir@vadai.me>
Mon, 2 Jan 2017 08:29:43 +0000 (10:29 +0200)
committerAmir Vadai <amir@vadai.me>
Mon, 2 Jan 2017 10:12:04 +0000 (12:12 +0200)
Signed-off-by: Amir Vadai <amir@vadai.me>
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h

index cec6998545f33735517b8108fb2aed21449fc84e..657f62158fcdafaf8d58bd4824b72117ced7bd94 100644 (file)
@@ -27,6 +27,69 @@ static const uint32_t MAX_INLINE_DATA = 128;
 static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
 static const uint32_t CQ_DEPTH = 30000;
 
+Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr)
+{
+  union ibv_gid cgid;
+  struct ibv_exp_gid_attr gid_attr;
+  bool malformed = false;
+
+  int r = ibv_query_port(ctxt, port_num, port_attr);
+  if (r == -1) {
+    lderr(cct) << __func__  << " query port failed  " << cpp_strerror(errno) << dendl;
+    ceph_abort();
+  }
+
+  lid = port_attr->lid;
+
+  // search for requested GID in GIDs table
+  ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
+    << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
+  r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
+            "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
+            ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
+            &cgid.raw[ 0], &cgid.raw[ 1],
+            &cgid.raw[ 2], &cgid.raw[ 3],
+            &cgid.raw[ 4], &cgid.raw[ 5],
+            &cgid.raw[ 6], &cgid.raw[ 7],
+            &cgid.raw[ 8], &cgid.raw[ 9],
+            &cgid.raw[10], &cgid.raw[11],
+            &cgid.raw[12], &cgid.raw[13],
+            &cgid.raw[14], &cgid.raw[15]);
+
+  if (r != 16) {
+    ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
+    malformed = true;
+  }
+
+  gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;
+
+  for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) {
+    r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
+    if (r) {
+      lderr(cct) << __func__  << " query gid of port " << port_num << " index " << gid_idx << " failed  " << cpp_strerror(errno) << dendl;
+      ceph_abort();
+    }
+    r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
+    if (r) {
+      lderr(cct) << __func__  << " query gid attributes of port " << port_num << " index " << gid_idx << " failed  " << cpp_strerror(errno) << dendl;
+      ceph_abort();
+    }
+
+    if (malformed) break; // stay with gid_idx=0
+    if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
+        (memcmp(&gid, &cgid, 16) == 0) ) {
+      ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
+      break;
+    }
+  }
+
+  if (gid_idx == port_attr->gid_tbl_len) {
+    lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
+    ceph_abort();
+  }
+}
+
+
 Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr)
 {
   if (device == NULL) {
@@ -46,86 +109,594 @@ Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_
   }
 }
 
-Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) {
-   union ibv_gid cgid;
-   struct ibv_exp_gid_attr gid_attr;
-   bool malformed = false;
-
-   int r = ibv_query_port(ctxt, port_num, port_attr);
-   if (r == -1) {
-     lderr(cct) << __func__  << " query port failed  " << cpp_strerror(errno) << dendl;
-     ceph_abort();
-   }
-
-   lid = port_attr->lid;
-
-   // search for requested GID in GIDs table
-   ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
-       << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
-   r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
-       "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
-       ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
-     &cgid.raw[ 0], &cgid.raw[ 1],
-     &cgid.raw[ 2], &cgid.raw[ 3],
-     &cgid.raw[ 4], &cgid.raw[ 5],
-     &cgid.raw[ 6], &cgid.raw[ 7],
-     &cgid.raw[ 8], &cgid.raw[ 9],
-     &cgid.raw[10], &cgid.raw[11],
-     &cgid.raw[12], &cgid.raw[13],
-     &cgid.raw[14], &cgid.raw[15]);
-
-   if (r != 16) {
-     ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
-     malformed = true;
-   }
-
-   gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;
-
-   for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) {
-     r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
-     if (r) {
-       lderr(cct) << __func__  << " query gid of port " << port_num << " index " << gid_idx << " failed  " << cpp_strerror(errno) << dendl;
-       ceph_abort();
-     }
-     r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
-     if (r) {
-       lderr(cct) << __func__  << " query gid attributes of port " << port_num << " index " << gid_idx << " failed  " << cpp_strerror(errno) << dendl;
-       ceph_abort();
-     }
-
-     if (malformed) break; // stay with gid_idx=0
-     if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
-          (memcmp(&gid, &cgid, 16) == 0) ) {
-       ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
-       break;
-     }
-   }
-
-   if (gid_idx == port_attr->gid_tbl_len) {
-     lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
-     ceph_abort();
-   }
- }
+void Device::binding_port(CephContext *cct, uint8_t port_num) {
+  port_cnt = device_attr->phys_port_cnt;
+  ports = new Port*[port_cnt];
+  for (uint8_t i = 0; i < port_cnt; ++i) {
+    ports[i] = new Port(cct, ctxt, i+1);
+    if (i+1 == port_num && ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) {
+      active_port = ports[i];
+      ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
+      return ;
+    } else {
+      ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << ports[i]->get_port_attr()->state << ")"<< dendl;
+    }
+  }
+  if (nullptr == active_port) {
+    lderr(cct) << __func__ << "  port not found" << dendl;
+    assert(active_port);
+  }
+}
+
+
+Infiniband::QueuePair::QueuePair(
+    CephContext *c, Infiniband& infiniband, ibv_qp_type type,
+    int port, ibv_srq *srq,
+    Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
+    uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
+: cct(c), infiniband(infiniband),
+  type(type),
+  ctxt(infiniband.device->ctxt),
+  ib_physical_port(port),
+  pd(infiniband.pd->pd),
+  srq(srq),
+  qp(NULL),
+  txcq(txcq),
+  rxcq(rxcq),
+  initial_psn(0),
+  max_send_wr(max_send_wr),
+  max_recv_wr(max_recv_wr),
+  q_key(q_key),
+  dead(false)
+{
+  initial_psn = lrand48() & 0xffffff;
+  if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
+    lderr(cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl;
+    ceph_abort();
+  }
+  pd = infiniband.pd->pd;
+}
+
+int Infiniband::QueuePair::init()
+{
+  ldout(cct, 20) << __func__ << " started." << dendl;
+  ibv_qp_init_attr qpia;
+  memset(&qpia, 0, sizeof(qpia));
+  qpia.send_cq = txcq->get_cq();
+  qpia.recv_cq = rxcq->get_cq();
+  qpia.srq = srq;                      // use the same shared receive queue
+  qpia.cap.max_send_wr  = max_send_wr; // max outstanding send requests
+  qpia.cap.max_send_sge = 1;           // max send scatter-gather elements
+  qpia.cap.max_inline_data = MAX_INLINE_DATA;          // max bytes of immediate data on send q
+  qpia.qp_type = type;                 // RC, UC, UD, or XRC
+  qpia.sq_sig_all = 0;                 // only generate CQEs on requested WQEs
+
+  qp = ibv_create_qp(pd, &qpia);
+  if (qp == NULL) {
+    lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
+    lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers or"
+       " ms_async_rdma_send_buffers" << dendl;
+    return -1;
+  }
+
+  ldout(cct, 20) << __func__ << " successfully create queue pair: "
+                 << "qp=" << qp << dendl;
+
+  // move from RESET to INIT state
+  ibv_qp_attr qpa;
+  memset(&qpa, 0, sizeof(qpa));
+  qpa.qp_state   = IBV_QPS_INIT;
+  qpa.pkey_index = 0;
+  qpa.port_num   = (uint8_t)(ib_physical_port);
+  qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
+  qpa.qkey       = q_key;
+
+  int mask = IBV_QP_STATE | IBV_QP_PORT;
+  switch (type) {
+    case IBV_QPT_RC:
+      mask |= IBV_QP_ACCESS_FLAGS;
+      mask |= IBV_QP_PKEY_INDEX;
+      break;
+    case IBV_QPT_UD:
+      mask |= IBV_QP_QKEY;
+      mask |= IBV_QP_PKEY_INDEX;
+      break;
+    case IBV_QPT_RAW_PACKET:
+      break;
+    default:
+      ceph_abort();
+  }
+
+  int ret = ibv_modify_qp(qp, &qpa, mask);
+  if (ret) {
+    ibv_destroy_qp(qp);
+    lderr(cct) << __func__ << " failed to transition to INIT state: "
+               << cpp_strerror(errno) << dendl;
+    return -1;
+  }
+  ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:"
+                 << " qp=" << qp << dendl;
+  return 0;
+}
+
+/**
+ * Change RC QueuePair into the ERROR state. This is necessary modify
+ * the Queue Pair into the Error state and poll all of the relevant
+ * Work Completions prior to destroying a Queue Pair.
+ * Since destroying a Queue Pair does not guarantee that its Work
+ * Completions are removed from the CQ upon destruction. Even if the
+ * Work Completions are already in the CQ, it might not be possible to
+ * retrieve them. If the Queue Pair is associated with an SRQ, it is
+ * recommended wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED
+ *
+ * \return
+ *      -errno if the QueuePair can't switch to ERROR
+ *      0 for success.
+ */
+int Infiniband::QueuePair::to_dead()
+{
+  if (dead)
+    return 0;
+  ibv_qp_attr qpa;
+  memset(&qpa, 0, sizeof(qpa));
+  qpa.qp_state = IBV_QPS_ERR;
+
+  int mask = IBV_QP_STATE;
+  int ret = ibv_modify_qp(qp, &qpa, mask);
+  if (ret) {
+    lderr(cct) << __func__ << " failed to transition to ERROR state: "
+               << cpp_strerror(errno) << dendl;
+    return -errno;
+  }
+  dead = true;
+  return ret;
+}
+
+int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const
+{
+  ibv_qp_attr qpa;
+  ibv_qp_init_attr qpia;
+
+  int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);
+  if (r) {
+    lderr(cct) << __func__ << " failed to query qp: "
+      << cpp_strerror(errno) << dendl;
+    return -1;
+  }
+
+  if (rqp)
+    *rqp = qpa.dest_qp_num;
+  return 0;
+}
+
+/**
+ * Get the remote infiniband address for this QueuePair, as set in #plumb().
+ * LIDs are "local IDs" in infiniband terminology. They are short, locally
+ * routable addresses.
+ */
+int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const
+{
+  ibv_qp_attr qpa;
+  ibv_qp_init_attr qpia;
+
+  int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);
+  if (r) {
+    lderr(cct) << __func__ << " failed to query qp: "
+      << cpp_strerror(errno) << dendl;
+    return -1;
+  }
+
+  if (lid)
+    *lid = qpa.ah_attr.dlid;
+  return 0;
+}
+
+/**
+ * Get the state of a QueuePair.
+ */
+int Infiniband::QueuePair::get_state() const
+{
+  ibv_qp_attr qpa;
+  ibv_qp_init_attr qpia;
+
+  int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);
+  if (r) {
+    lderr(cct) << __func__ << " failed to get state: "
+      << cpp_strerror(errno) << dendl;
+    return -1;
+  }
+  return qpa.qp_state;
+}
+
+/**
+ * Return true if the queue pair is in an error state, false otherwise.
+ */
+bool Infiniband::QueuePair::is_error() const
+{
+  ibv_qp_attr qpa;
+  ibv_qp_init_attr qpia;
+
+  int r = ibv_query_qp(qp, &qpa, -1, &qpia);
+  if (r) {
+    lderr(cct) << __func__ << " failed to get state: "
+      << cpp_strerror(errno) << dendl;
+    return true;
+  }
+  return qpa.cur_qp_state == IBV_QPS_ERR;
+}
+
+
+Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib)
+  : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
+{
+}
+
+Infiniband::CompletionChannel::~CompletionChannel()
+{
+  if (channel) {
+    int r = ibv_destroy_comp_channel(channel);
+    if (r < 0)
+      lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;
+    assert(r == 0);
+  }
+}
+
+int Infiniband::CompletionChannel::init()
+{
+  ldout(cct, 20) << __func__ << " started." << dendl;
+  channel = ibv_create_comp_channel(infiniband.device->ctxt);
+  if (!channel) {
+    lderr(cct) << __func__ << " failed to create receive completion channel: "
+                          << cpp_strerror(errno) << dendl;
+    return -1;
+  }
+  int rc = NetHandler(cct).set_nonblock(channel->fd);
+  if (rc < 0) {
+    ibv_destroy_comp_channel(channel);
+    return -1;
+  }
+  return 0;
+}
+
+void Infiniband::CompletionChannel::ack_events()
+{
+  ibv_ack_cq_events(cq, cq_events_that_need_ack);
+  cq_events_that_need_ack = 0;
+}
+
+bool Infiniband::CompletionChannel::get_cq_event()
+{
+  ibv_cq *cq = NULL;
+  void *ev_ctx;
+  if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
+    if (errno != EAGAIN && errno != EINTR)
+      lderr(cct) << __func__ << " failed to retrieve CQ event: "
+                 << cpp_strerror(errno) << dendl;
+    return false;
+  }
+
+  /* accumulate number of cq events that need to
+   *    * be acked, and periodically ack them
+   *       */
+  if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
+    ldout(cct, 20) << __func__ << " ack aq events." << dendl;
+    ibv_ack_cq_events(cq, MAX_ACK_EVENT);
+    cq_events_that_need_ack = 0;
+  }
+
+  return true;
+}
+
+
+Infiniband::CompletionQueue::~CompletionQueue()
+{
+  if (cq) {
+    int r = ibv_destroy_cq(cq);
+    if (r < 0)
+      lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
+    assert(r == 0);
+  }
+}
+
+int Infiniband::CompletionQueue::init()
+{
+  cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
+  if (!cq) {
+    lderr(cct) << __func__ << " failed to create receive completion queue: "
+      << cpp_strerror(errno) << dendl;
+    return -1;
+  }
+
+  if (ibv_req_notify_cq(cq, 0)) {
+    lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
+    ibv_destroy_cq(cq);
+    cq = nullptr;
+    return -1;
+  }
+
+  channel->bind_cq(cq);
+  ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;
+  return 0;
+}
+
+int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
+{
+  ldout(cct, 20) << __func__ << " started." << dendl;
+  int r = ibv_req_notify_cq(cq, 0);
+  if (r < 0)
+    lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;
+  return r;
+}
+
+int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
+  int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
+  if (r < 0) {
+    lderr(cct) << __func__ << " poll_completion_queue occur met error: "
+      << cpp_strerror(errno) << dendl;
+    return -1;
+  }
+  return r;
+}
+
+
+Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device)
+  : pd(ibv_alloc_pd(device->ctxt))
+{
+  if (pd == NULL) {
+    lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;
+    ceph_abort();
+  }
+}
+
+Infiniband::ProtectionDomain::~ProtectionDomain()
+{
+  int rc = ibv_dealloc_pd(pd);
+  assert(rc == 0);
+}
+
+
+Infiniband::MemoryManager::Chunk::Chunk(char* b, uint32_t len, ibv_mr* m)
+  : buffer(b), bytes(len), offset(0), mr(m)
+{
+}
+
+Infiniband::MemoryManager::Chunk::~Chunk()
+{
+  assert(ibv_dereg_mr(mr) == 0);
+}
+
+void Infiniband::MemoryManager::Chunk::set_offset(uint32_t o)
+{
+  offset = o;
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::get_offset()
+{
+  return offset;
+}
+
+void Infiniband::MemoryManager::Chunk::set_bound(uint32_t b)
+{
+  bound = b;
+}
+
+void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b)
+{
+  offset = 0;
+  bound = b;
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::get_bound()
+{
+  return bound;
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len)
+{
+  uint32_t left = bound - offset;
+  if (left >= len) {
+    memcpy(buf, buffer+offset, len);
+    offset += len;
+    return len;
+  } else {
+    memcpy(buf, buffer+offset, left);
+    offset = 0;
+    bound = 0;
+    return left;
+  }
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len)
+{
+  uint32_t left = bytes - offset;
+  if (left >= len) {
+    memcpy(buffer+offset, buf, len);
+    offset += len;
+    return len;
+  } else {
+    memcpy(buffer+offset, buf, left);
+    offset = bytes;
+    return left;
+  }
+}
+
+bool Infiniband::MemoryManager::Chunk::full()
+{
+  return offset == bytes;
+}
+
+bool Infiniband::MemoryManager::Chunk::over()
+{
+  return Infiniband::MemoryManager::Chunk::offset == bound;
+}
+
+void Infiniband::MemoryManager::Chunk::clear()
+{
+  offset = 0;
+  bound = 0;
+}
+
+void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
+{
+  ib->post_chunk(this);
+}
+
+void Infiniband::MemoryManager::Chunk::set_owner(uint64_t o)
+{
+  owner = o;
+}
+
+uint64_t Infiniband::MemoryManager::Chunk::get_owner()
+{
+  return owner;
+}
+
+
+Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
+  : manager(m), chunk_size(s), lock("cluster_lock")
+{
+}
+
+Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s, uint32_t n)
+  : manager(m), chunk_size(s), lock("cluster_lock")
+{
+  add(n);
+}
+
+Infiniband::MemoryManager::Cluster::~Cluster()
+{
+  set<Chunk*>::iterator c = all_chunks.begin();
+  while(c != all_chunks.end()) {
+    delete *c;
+    ++c;
+  }
+  if (manager.enabled_huge_page)
+    manager.free_huge_pages(base);
+  else
+    delete base;
+}
+
+int Infiniband::MemoryManager::Cluster::add(uint32_t num)
+{
+  uint32_t bytes = chunk_size * num;
+  //cihar* base = (char*)malloc(bytes);
+  if (manager.enabled_huge_page) {
+    base = (char*)manager.malloc_huge_pages(bytes);
+  } else {
+    base = (char*)memalign(CEPH_PAGE_SIZE, bytes);
+  }
+  assert(base);
+  for (uint32_t offset = 0; offset < bytes; offset += chunk_size){
+    ibv_mr* m = ibv_reg_mr(manager.pd->pd, base+offset, chunk_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
+    assert(m);
+    Chunk* c = new Chunk(base+offset,chunk_size,m);
+    free_chunks.push_back(c);
+    all_chunks.insert(c);
+  }
+  return 0;
+}
+
+void Infiniband::MemoryManager::Cluster::take_back(Chunk* ck)
+{
+  Mutex::Locker l(lock);
+  free_chunks.push_back(ck);
+}
+
+int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks, size_t bytes)
+{
+  uint32_t num = bytes / chunk_size + 1;
+  if (bytes % chunk_size == 0)
+    --num;
+  int r = num;
+  Mutex::Locker l(lock);
+  if (free_chunks.empty())
+    return 0;
+  if (!bytes) {
+    free_chunks.swap(chunks);
+    r = chunks.size();
+    return r;
+  }
+  if (free_chunks.size() < num) {
+    num = free_chunks.size();
+    r = num;
+  }
+  for (uint32_t i = 0; i < num; ++i) {
+    chunks.push_back(free_chunks.back());
+    free_chunks.pop_back();
+  }
+  return r;
+}
+
+
+Infiniband::MemoryManager::MemoryManager(Device *d, ProtectionDomain *p, bool hugepage)
+  : device(d), pd(p)
+{
+  enabled_huge_page = hugepage;
+}
+
+Infiniband::MemoryManager::~MemoryManager()
+{
+  if (channel)
+    delete channel;
+  if (send)
+    delete send;
+}
+
+void* Infiniband::MemoryManager::malloc_huge_pages(size_t size)
+{
+  size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
+  char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);
+  if (ptr == MAP_FAILED) {
+    ptr = (char *)malloc(real_size);
+    if (ptr == NULL) return NULL;
+    real_size = 0;
+  }
+  *((size_t *)ptr) = real_size;
+  return ptr + HUGE_PAGE_SIZE;
+}
+
+void Infiniband::MemoryManager::free_huge_pages(void *ptr)
+{
+  if (ptr == NULL) return;
+  void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE;
+  size_t real_size = *((size_t *)real_ptr);
+  assert(real_size % HUGE_PAGE_SIZE == 0);
+  if (real_size != 0)
+    munmap(real_ptr, real_size);
+  else
+    free(real_ptr);
+}
+
+void Infiniband::MemoryManager::register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num)
+{
+  assert(device);
+  assert(pd);
+  channel = new Cluster(*this, size);
+  channel->add(rx_num);
+
+  send = new Cluster(*this, size);
+  send->add(tx_num);
+}
+
+void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks)
+{
+  for (auto c : chunks) {
+    c->clear();
+    send->take_back(c);
+  }
+}
+
+int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes)
+{
+  return send->get_buffers(c, bytes);
+}
 
-void Device::binding_port(CephContext *cct, uint8_t port_num) {
-  port_cnt = device_attr->phys_port_cnt;
-  ports = new Port*[port_cnt];
-  for (uint8_t i = 0; i < port_cnt; ++i) {
-    ports[i] = new Port(cct, ctxt, i+1);
-    if (i+1 == port_num && ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) {
-      active_port = ports[i];
-      ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
-      return ;
-    } else {
-      ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << ports[i]->get_port_attr()->state << ")"<< dendl;
-    }
-  }
-  if (nullptr == active_port) {
-    lderr(cct) << __func__ << "  port not found" << dendl;
-    assert(active_port);
-  }
+int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes)
+{
+  return channel->get_buffers(chunks, bytes);
 }
 
+
 Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num): device_list(cct)
 {
   device = device_list.get_device(device_name.c_str());
@@ -163,6 +734,13 @@ Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t
   post_channel_cluster();
 }
 
+Infiniband::~Infiniband()
+{
+  assert(ibv_destroy_srq(srq) == 0);
+  delete memory_manager;
+  delete pd;
+}
+
 /**
  * Create a shared receive queue. This basically wraps the verbs call. 
  *
@@ -183,6 +761,25 @@ ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_s
   return ibv_create_srq(pd->pd, &sia);
 }
 
+int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
+{
+  return memory_manager->get_send_buffers(c, bytes);
+}
+
+int Infiniband::recall_chunk(Chunk* c)
+{
+  if (memory_manager->is_rx_chunk(c)) {
+    post_chunk(c);  
+    return 1;
+  } else if (memory_manager->is_tx_chunk(c)) {
+    vector<Chunk*> v;
+    v.push_back(c);
+    memory_manager->return_tx(v);  
+    return 2;
+  }
+  return -1;
+}
+
 /**
  * Create a new QueuePair. This factory should be used in preference to
  * the QueuePair constructor directly, since this lets derivatives of
@@ -204,101 +801,6 @@ Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, Completio
   return qp;
 }
 
-int Infiniband::QueuePair::init()
-{
-  ldout(cct, 20) << __func__ << " started." << dendl;
-  ibv_qp_init_attr qpia;
-  memset(&qpia, 0, sizeof(qpia));
-  qpia.send_cq = txcq->get_cq();
-  qpia.recv_cq = rxcq->get_cq();
-  qpia.srq = srq;                      // use the same shared receive queue
-  qpia.cap.max_send_wr  = max_send_wr; // max outstanding send requests
-  qpia.cap.max_send_sge = 1;           // max send scatter-gather elements
-  qpia.cap.max_inline_data = MAX_INLINE_DATA;          // max bytes of immediate data on send q
-  qpia.qp_type = type;                 // RC, UC, UD, or XRC
-  qpia.sq_sig_all = 0;                 // only generate CQEs on requested WQEs
-
-  qp = ibv_create_qp(pd, &qpia);
-  if (qp == NULL) {
-    lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
-    lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers or"
-       " ms_async_rdma_send_buffers" << dendl;
-    return -1;
-  }
-
-  ldout(cct, 20) << __func__ << " successfully create queue pair: "
-                 << "qp=" << qp << dendl;
-
-  // move from RESET to INIT state
-  ibv_qp_attr qpa;
-  memset(&qpa, 0, sizeof(qpa));
-  qpa.qp_state   = IBV_QPS_INIT;
-  qpa.pkey_index = 0;
-  qpa.port_num   = (uint8_t)(ib_physical_port);
-  qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
-  qpa.qkey       = q_key;
-
-  int mask = IBV_QP_STATE | IBV_QP_PORT;
-  switch (type) {
-    case IBV_QPT_RC:
-      mask |= IBV_QP_ACCESS_FLAGS;
-      mask |= IBV_QP_PKEY_INDEX;
-      break;
-    case IBV_QPT_UD:
-      mask |= IBV_QP_QKEY;
-      mask |= IBV_QP_PKEY_INDEX;
-      break;
-    case IBV_QPT_RAW_PACKET:
-      break;
-    default:
-      ceph_abort();
-  }
-
-  int ret = ibv_modify_qp(qp, &qpa, mask);
-  if (ret) {
-    ibv_destroy_qp(qp);
-    lderr(cct) << __func__ << " failed to transition to INIT state: "
-               << cpp_strerror(errno) << dendl;
-    return -1;
-  }
-  ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:"
-                 << " qp=" << qp << dendl;
-  return 0;
-}
-
-/**
- * Change RC QueuePair into the ERROR state. This is necessary modify
- * the Queue Pair into the Error state and poll all of the relevant
- * Work Completions prior to destroying a Queue Pair.
- * Since destroying a Queue Pair does not guarantee that its Work
- * Completions are removed from the CQ upon destruction. Even if the
- * Work Completions are already in the CQ, it might not be possible to
- * retrieve them. If the Queue Pair is associated with an SRQ, it is
- * recommended wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED
- *
- * \return
- *      -errno if the QueuePair can't switch to ERROR
- *      0 for success.
- */
-int Infiniband::QueuePair::to_dead()
-{
-  if (dead)
-    return 0;
-  ibv_qp_attr qpa;
-  memset(&qpa, 0, sizeof(qpa));
-  qpa.qp_state = IBV_QPS_ERR;
-
-  int mask = IBV_QP_STATE;
-  int ret = ibv_modify_qp(qp, &qpa, mask);
-  if (ret) {
-    lderr(cct) << __func__ << " failed to transition to ERROR state: "
-               << cpp_strerror(errno) << dendl;
-    return -errno;
-  }
-  dead = true;
-  return ret;
-}
-
 int Infiniband::post_chunk(Chunk* chunk)
 {
   ibv_sge isge;
@@ -354,35 +856,6 @@ Infiniband::CompletionQueue* Infiniband::create_comp_queue(
   return cq;
 }
 
-
-Infiniband::QueuePair::QueuePair(
-    CephContext *c, Infiniband& infiniband, ibv_qp_type type,
-    int port, ibv_srq *srq,
-    Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
-    uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
-: cct(c), infiniband(infiniband),
-  type(type),
-  ctxt(infiniband.device->ctxt),
-  ib_physical_port(port),
-  pd(infiniband.pd->pd),
-  srq(srq),
-  qp(NULL),
-  txcq(txcq),
-  rxcq(rxcq),
-  initial_psn(0),
-  max_send_wr(max_send_wr),
-  max_recv_wr(max_recv_wr),
-  q_key(q_key),
-  dead(false)
-{
-  initial_psn = lrand48() & 0xffffff;
-  if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
-    lderr(cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl;
-    ceph_abort();
-  }
-  pd = infiniband.pd->pd;
-}
-
 // 1 means no valid buffer read, 0 means got enough buffer
 // else return < 0 means error
 int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
@@ -476,106 +949,6 @@ Infiniband::QueuePair::~QueuePair()
     assert(!ibv_destroy_qp(qp));
 }
 
-Infiniband::CompletionChannel::~CompletionChannel()
-{
-  if (channel) {
-    int r = ibv_destroy_comp_channel(channel);
-    if (r < 0)
-      lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;
-    assert(r == 0);
-  }
-}
-
-Infiniband::CompletionQueue::~CompletionQueue()
-{
-  if (cq) {
-    int r = ibv_destroy_cq(cq);
-    if (r < 0)
-      lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
-    assert(r == 0);
-  }
-}
-
-int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
-{
-  ldout(cct, 20) << __func__ << " started." << dendl;
-  int r = ibv_req_notify_cq(cq, 0);
-  if (r < 0)
-    lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;
-  return r;
-}
-
-int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
-  int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
-  if (r < 0) {
-    lderr(cct) << __func__ << " poll_completion_queue occur met error: "
-      << cpp_strerror(errno) << dendl;
-    return -1;
-  }
-  return r;
-}
-
-bool Infiniband::CompletionChannel::get_cq_event()
-{
-  ibv_cq *cq = NULL;
-  void *ev_ctx;
-  if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
-    if (errno != EAGAIN && errno != EINTR)
-      lderr(cct) << __func__ << " failed to retrieve CQ event: "
-                 << cpp_strerror(errno) << dendl;
-    return false;
-  }
-
-  /* accumulate number of cq events that need to
-   *    * be acked, and periodically ack them
-   *       */
-  if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
-    ldout(cct, 20) << __func__ << " ack aq events." << dendl;
-    ibv_ack_cq_events(cq, MAX_ACK_EVENT);
-    cq_events_that_need_ack = 0;
-  }
-
-  return true;
-}
-
-int Infiniband::CompletionQueue::init()
-{
-  cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
-  if (!cq) {
-    lderr(cct) << __func__ << " failed to create receive completion queue: "
-      << cpp_strerror(errno) << dendl;
-    return -1;
-  }
-
-  if (ibv_req_notify_cq(cq, 0)) {
-    lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
-    ibv_destroy_cq(cq);
-    cq = nullptr;
-    return -1;
-  }
-
-  channel->bind_cq(cq);
-  ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;
-  return 0;
-}
-
-int Infiniband::CompletionChannel::init()
-{
-  ldout(cct, 20) << __func__ << " started." << dendl;
-  channel = ibv_create_comp_channel(infiniband.device->ctxt);
-  if (!channel) {
-    lderr(cct) << __func__ << " failed to create receive completion channel: "
-                          << cpp_strerror(errno) << dendl;
-    return -1;
-  }
-  int rc = NetHandler(cct).set_nonblock(channel->fd);
-  if (rc < 0) {
-    ibv_destroy_comp_channel(channel);
-    return -1;
-  }
-  return 0;
-}
-
 /**
  * Given a string representation of the `status' field from Verbs
  * struct `ibv_wc'.
index fdee49e24d0d2c59665b752cfead667ab306fefb..d8e70c2ff80ed947ebdb5c7dd65629e0549d95a4 100644 (file)
@@ -127,18 +127,9 @@ class Infiniband {
  public:
   class ProtectionDomain {
    public:
-    explicit ProtectionDomain(CephContext *cct, Device *device)
-      : pd(ibv_alloc_pd(device->ctxt))
-    {
-      if (pd == NULL) {
-        lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;
-        ceph_abort();
-      }
-    }
-    ~ProtectionDomain() {
-      int rc = ibv_dealloc_pd(pd);
-      assert(rc == 0);
-    }
+    explicit ProtectionDomain(CephContext *cct, Device *device);
+    ~ProtectionDomain();
+
     ibv_pd* const pd;
   };
 
@@ -147,83 +138,22 @@ class Infiniband {
    public:
     class Chunk {
      public:
-      Chunk(char* b, uint32_t len, ibv_mr* m) : buffer(b), bytes(len), offset(0), mr(m) {}
-      ~Chunk() {
-        assert(ibv_dereg_mr(mr) == 0);
-      }
-
-      void set_offset(uint32_t o) {
-        offset = o;
-      }
-
-      uint32_t get_offset() {
-        return offset;
-      }
-
-      void set_bound(uint32_t b) {
-        bound = b;
-      }
-
-      void prepare_read(uint32_t b) {
-        offset = 0;
-        bound = b;
-      }
-
-      uint32_t get_bound() {
-        return bound;
-      }
-
-      uint32_t read(char* buf, uint32_t len) {
-        uint32_t left = bound - offset;
-        if (left >= len) {
-          memcpy(buf, buffer+offset, len);
-          offset += len;
-          return len;
-        } else {
-          memcpy(buf, buffer+offset, left);
-          offset = 0;
-          bound = 0;
-          return left;
-        }
-      }
-
-      uint32_t write(char* buf, uint32_t len) {
-        uint32_t left = bytes - offset;
-        if (left >= len) {
-          memcpy(buffer+offset, buf, len);
-          offset += len;
-          return len;
-        } else {
-          memcpy(buffer+offset, buf, left);
-          offset = bytes;
-          return left;
-        }
-      }
-
-      bool full() {
-        return offset == bytes;
-      }
-
-      bool over() {
-        return offset == bound;
-      }
-
-      void clear() {
-        offset = 0;
-        bound = 0;
-      }
-
-      void post_srq(Infiniband *ib) {
-        ib->post_chunk(this);
-      }
-
-      void set_owner(uint64_t o) {
-        owner = o;
-      }
-
-      uint64_t get_owner() {
-        return owner;
-      }
+      Chunk(char* b, uint32_t len, ibv_mr* m);
+      ~Chunk();
+
+      void set_offset(uint32_t o);
+      uint32_t get_offset();
+      void set_bound(uint32_t b);
+      void prepare_read(uint32_t b);
+      uint32_t get_bound();
+      uint32_t read(char* buf, uint32_t len);
+      uint32_t write(char* buf, uint32_t len);
+      bool full();
+      bool over();
+      void clear();
+      void post_srq(Infiniband *ib);
+      void set_owner(uint64_t o);
+      uint64_t get_owner();
 
      public:
       char* buffer;
@@ -236,69 +166,14 @@ class Infiniband {
 
     class Cluster {
      public:
-      Cluster(MemoryManager& m, uint32_t s) : manager(m), chunk_size(s), lock("cluster_lock"){}
-      Cluster(MemoryManager& m, uint32_t s, uint32_t n) : manager(m), chunk_size(s), lock("cluster_lock"){
-        add(n);
-      }
-
-      ~Cluster() {
-        set<Chunk*>::iterator c = all_chunks.begin();
-        while(c != all_chunks.end()) {
-          delete *c;
-          ++c;
-        }
-        if (manager.enabled_huge_page)
-          manager.free_huge_pages(base);
-        else
-          delete base;
-      }
-      int add(uint32_t num) {
-        uint32_t bytes = chunk_size * num;
-        //cihar* base = (char*)malloc(bytes);
-        if (manager.enabled_huge_page) {
-          base = (char*)manager.malloc_huge_pages(bytes);
-        } else {
-          base = (char*)memalign(CEPH_PAGE_SIZE, bytes);
-        }
-        assert(base);
-        for (uint32_t offset = 0; offset < bytes; offset += chunk_size){
-          ibv_mr* m = ibv_reg_mr(manager.pd->pd, base+offset, chunk_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
-          assert(m);
-          Chunk* c = new Chunk(base+offset,chunk_size,m);
-          free_chunks.push_back(c);
-          all_chunks.insert(c);
-        }
-        return 0;
-      }
+      Cluster(MemoryManager& m, uint32_t s);
+      Cluster(MemoryManager& m, uint32_t s, uint32_t n);
+      ~Cluster();
 
-      void take_back(Chunk* ck) {
-        Mutex::Locker l(lock);
-        free_chunks.push_back(ck);
-      }
+      int add(uint32_t num);
+      void take_back(Chunk* ck);
+      int get_buffers(std::vector<Chunk*> &chunks, size_t bytes);
 
-      int get_buffers(std::vector<Chunk*> &chunks, size_t bytes) {
-        uint32_t num = bytes / chunk_size + 1;
-        if (bytes % chunk_size == 0)
-          --num;
-        int r = num;
-        Mutex::Locker l(lock);
-        if (free_chunks.empty())
-          return 0;
-        if (!bytes) {
-          free_chunks.swap(chunks);
-          r = chunks.size();
-          return r;
-        }
-        if (free_chunks.size() < num) {
-          num = free_chunks.size();
-          r = num;
-        }
-        for (uint32_t i = 0; i < num; ++i) {
-          chunks.push_back(free_chunks.back());
-          free_chunks.pop_back();
-        }
-        return r;
-      }
       MemoryManager& manager;
       uint32_t chunk_size;
       Mutex lock;
@@ -307,63 +182,20 @@ class Infiniband {
       char* base;
     };
 
-    MemoryManager(Device *d, ProtectionDomain *p, bool hugepage): device(d), pd(p) {
-      enabled_huge_page = hugepage;
-    }
-    ~MemoryManager() {
-      if (channel)
-        delete channel;
-      if (send)
-        delete send;
-    }
-    void* malloc_huge_pages(size_t size) {
-      size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
-      char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);
-      if (ptr == MAP_FAILED) {
-        ptr = (char *)malloc(real_size);
-        if (ptr == NULL) return NULL;
-        real_size = 0;
-      }
-      *((size_t *)ptr) = real_size;
-      return ptr + HUGE_PAGE_SIZE;
-    }
-    void free_huge_pages(void *ptr) {
-      if (ptr == NULL) return;
-      void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE;
-      size_t real_size = *((size_t *)real_ptr);
-      assert(real_size % HUGE_PAGE_SIZE == 0);
-      if (real_size != 0)
-        munmap(real_ptr, real_size);
-      else
-        free(real_ptr);
-    }
-    void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num) {
-      assert(device);
-      assert(pd);
-      channel = new Cluster(*this, size);
-      channel->add(rx_num);
-
-      send = new Cluster(*this, size);
-      send->add(tx_num);
-    }
-    void return_tx(std::vector<Chunk*> &chunks) {
-      for (auto c : chunks) {
-        c->clear();
-        send->take_back(c);
-      }
-    }
-
-    int get_send_buffers(std::vector<Chunk*> &c, size_t bytes) {
-      return send->get_buffers(c, bytes);
-    }
-
-    int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes) {
-      return channel->get_buffers(chunks, bytes);
-    }
+    MemoryManager(Device *d, ProtectionDomain *p, bool hugepage);
+    ~MemoryManager();
 
+    void* malloc_huge_pages(size_t size);
+    void free_huge_pages(void *ptr);
+    void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num);
+    void return_tx(std::vector<Chunk*> &chunks);
+    int get_send_buffers(std::vector<Chunk*> &c, size_t bytes);
+    int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes);
     int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);}
     int is_rx_chunk(Chunk* c) { return channel->all_chunks.count(c);}
+
     bool enabled_huge_page;
+
    private:
     Cluster* channel;//RECV
     Cluster* send;// SEND
@@ -386,15 +218,7 @@ class Infiniband {
 
  public:
   explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
-
-  /**
-   * Destroy an Infiniband object.
-   */
-  ~Infiniband() {
-    assert(ibv_destroy_srq(srq) == 0);
-    delete memory_manager;
-    delete pd;
-  }
+  ~Infiniband();
 
   class CompletionChannel {
     static const uint32_t MAX_ACK_EVENT = 5000;
@@ -405,18 +229,14 @@ class Infiniband {
     uint32_t cq_events_that_need_ack;
 
    public:
-    CompletionChannel(CephContext *c, Infiniband &ib)
-      : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) {}
+    CompletionChannel(CephContext *c, Infiniband &ib);
     ~CompletionChannel();
     int init();
     bool get_cq_event();
     int get_fd() { return channel->fd; }
     ibv_comp_channel* get_channel() { return channel; }
     void bind_cq(ibv_cq *c) { cq = c; }
-    void ack_events() {
-      ibv_ack_cq_events(cq, cq_events_that_need_ack);
-      cq_events_that_need_ack = 0;
-    }
+    void ack_events();
   };
 
   // this class encapsulates the creation, use, and destruction of an RC
@@ -475,71 +295,21 @@ class Infiniband {
      * Get the remote queue pair number for this QueuePair, as set in #plumb().
      * QPNs are analogous to UDP/TCP port numbers.
      */
-    int get_remote_qp_number(uint32_t *rqp) const {
-      ibv_qp_attr qpa;
-      ibv_qp_init_attr qpia;
-
-      int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);
-      if (r) {
-        lderr(cct) << __func__ << " failed to query qp: "
-          << cpp_strerror(errno) << dendl;
-        return -1;
-      }
-
-      if (rqp)
-        *rqp = qpa.dest_qp_num;
-      return 0;
-    }
+    int get_remote_qp_number(uint32_t *rqp) const;
     /**
      * Get the remote infiniband address for this QueuePair, as set in #plumb().
      * LIDs are "local IDs" in infiniband terminology. They are short, locally
      * routable addresses.
      */
-    int get_remote_lid(uint16_t *lid) const {
-      ibv_qp_attr qpa;
-      ibv_qp_init_attr qpia;
-
-      int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);
-      if (r) {
-        lderr(cct) << __func__ << " failed to query qp: "
-          << cpp_strerror(errno) << dendl;
-        return -1;
-      }
-
-      if (lid)
-        *lid = qpa.ah_attr.dlid;
-      return 0;
-    }
+    int get_remote_lid(uint16_t *lid) const;
     /**
      * Get the state of a QueuePair.
      */
-    int get_state() const {
-      ibv_qp_attr qpa;
-      ibv_qp_init_attr qpia;
-
-      int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);
-      if (r) {
-        lderr(cct) << __func__ << " failed to get state: "
-          << cpp_strerror(errno) << dendl;
-        return -1;
-      }
-      return qpa.qp_state;
-    }
+    int get_state() const;
     /**
      * Return true if the queue pair is in an error state, false otherwise.
      */
-    bool is_error() const {
-      ibv_qp_attr qpa;
-      ibv_qp_init_attr qpia;
-
-      int r = ibv_query_qp(qp, &qpa, -1, &qpia);
-      if (r) {
-        lderr(cct) << __func__ << " failed to get state: "
-                              << cpp_strerror(errno) << dendl;
-        return true;
-      }
-      return qpa.cur_qp_state == IBV_QPS_ERR;
-    }
+    bool is_error() const;
     ibv_qp* get_qp() const { return qp; }
     Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }
     Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }
@@ -571,14 +341,10 @@ class Infiniband {
   ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
   int post_chunk(Chunk* chunk);
   int post_channel_cluster();
-  int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes) {
-    return memory_manager->get_send_buffers(c, bytes);
-  }
+  int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
   CompletionChannel *create_comp_channel(CephContext *c);
   CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
-  uint8_t get_ib_physical_port() {
-    return ib_physical_port;
-  }
+  uint8_t get_ib_physical_port() { return ib_physical_port; }
   int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
   int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
   uint16_t get_lid() { return device->get_lid(); }
@@ -586,18 +352,7 @@ class Infiniband {
   MemoryManager* get_memory_manager() { return memory_manager; }
   Device* get_device() { return device; }
   int get_async_fd() { return device->ctxt->async_fd; }
-  int recall_chunk(Chunk* c) {
-    if (memory_manager->is_rx_chunk(c)) {
-      post_chunk(c);  
-      return 1;
-    } else if (memory_manager->is_tx_chunk(c)) {
-      vector<Chunk*> v;
-      v.push_back(c);
-      memory_manager->return_tx(v);  
-      return 2;
-    }
-    return -1;
-  }
+  int recall_chunk(Chunk* c);
   int is_tx_chunk(Chunk* c) { return memory_manager->is_tx_chunk(c); }
   int is_rx_chunk(Chunk* c) { return memory_manager->is_rx_chunk(c); }
   static const char* wc_status_to_string(int status);