]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: add rdma plugin support
authorHaomai Wang <haomai@xsky.com>
Fri, 15 Jul 2016 09:02:48 +0000 (17:02 +0800)
committerHaomai Wang <haomai@xsky.com>
Tue, 1 Nov 2016 07:34:04 +0000 (15:34 +0800)
Signed-off-by: Zhi Wang <wangzhi@xsky.com>
src/common/config_opts.h
src/msg/async/Stack.cc
src/msg/async/rdma/Infiniband.cc [new file with mode: 0644]
src/msg/async/rdma/Infiniband.h [new file with mode: 0644]
src/msg/async/rdma/RDMAConnectedSocketImpl.cc [new file with mode: 0644]
src/msg/async/rdma/RDMAServerSocketImpl.cc [new file with mode: 0644]
src/msg/async/rdma/RDMAStack.cc [new file with mode: 0644]
src/msg/async/rdma/RDMAStack.h [new file with mode: 0644]

index 34866893e653ad8aaee2990733ed9de160a38326..a433e5937d39ae2cbefb3f1a7f1b2cd38fda05f4 100644 (file)
@@ -213,6 +213,11 @@ OPTION(ms_async_set_affinity, OPT_BOOL, true)
 // core
 OPTION(ms_async_affinity_cores, OPT_STR, "")
 OPTION(ms_async_send_inline, OPT_BOOL, false)
+OPTION(ms_async_rdma_device_name, OPT_STR, "")
+OPTION(ms_async_rdma_enable_hugepage, OPT_BOOL, false)
+OPTION(ms_async_rdma_buffer_size, OPT_INT, 8192)
+OPTION(ms_async_rdma_send_buffers, OPT_U32, 32)
+OPTION(ms_async_rdma_receive_buffers, OPT_U32, 64)
 
 OPTION(inject_early_sigterm, OPT_BOOL, false)
 
index e267eb869e820f4e6758966ef6bec258d249ca3f..5537a39193656857cbe67e56d296e20e0c7e4d71 100644 (file)
@@ -17,6 +17,9 @@
 #include "common/Cond.h"
 #include "common/errno.h"
 #include "PosixStack.h"
+#ifdef HAVE_RDMA
+#include "rdma/RDMAStack.h"
+#endif
 
 #include "common/dout.h"
 #include "include/assert.h"
@@ -54,6 +57,10 @@ std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string
 {
   if (t == "posix")
     return std::make_shared<PosixNetworkStack>(c, t);
+#ifdef HAVE_RDMA
+  else if (t == "rdma")
+    return std::make_shared<RDMAStack>(c, t);
+#endif
 
   return nullptr;
 }
@@ -62,6 +69,10 @@ Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned
 {
   if (type == "posix")
     return new PosixWorker(c, i);
+#ifdef HAVE_RDMA
+  else if (type == "rdma")
+    return new RDMAWorker(c, i);
+#endif
   return nullptr;
 }
 
diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc
new file mode 100644 (file)
index 0000000..fb34fec
--- /dev/null
@@ -0,0 +1,538 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- \r
+// vim: ts=8 sw=2 smarttab\r
+/*\r
+ * Ceph - scalable distributed file system\r
+ *\r
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>\r
+ *\r
+ * Author: Haomai Wang <haomaiwang@gmail.com>\r
+ *\r
+ * This is free software; you can redistribute it and/or\r
+ * modify it under the terms of the GNU Lesser General Public\r
+ * License version 2.1, as published by the Free Software\r
+ * Foundation.  See file COPYING.\r
+ *\r
+ */\r
+\r
+#include "Infiniband.h"\r
+#include "common/errno.h"\r
+#include "common/debug.h"\r
+#include "RDMAStack.h"\r
+\r
+#define dout_subsys ceph_subsys_ms\r
+#undef dout_prefix\r
+#define dout_prefix *_dout << "Infiniband "\r
+\r
+static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;\r
+static const uint32_t MAX_INLINE_DATA = 128;\r
+static const uint32_t UDP_MSG_LEN = sizeof("0000:00000000:00000000:00000000000000000000000000000000") - 1;\r
+\r
+Device::Device(CephContext *c, ibv_device* d): cct(c), device(d), device_attr(new ibv_device_attr)\r
+{\r
+  if (device == NULL) {\r
+    lderr(cct) << __func__ << "device == NULL" << cpp_strerror(errno) << dendl;\r
+    assert(0);\r
+  }\r
+  name = ibv_get_device_name(device);\r
+  ctxt = ibv_open_device(device);\r
+  if (ctxt == NULL) {\r
+    lderr(cct) << __func__ << "open rdma device failed. " << cpp_strerror(errno) << dendl;\r
+    assert(0);\r
+  }\r
+  int r = ibv_query_device(ctxt, device_attr);\r
+  if (r == -1) {\r
+    lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;\r
+    assert(0);\r
+  }\r
+  port_cnt = device_attr->phys_port_cnt;\r
+  ports = new Port*[port_cnt];\r
+  for (uint8_t i = 0; i < port_cnt; ++i) {\r
+    ports[i] = new Port(cct, ctxt, i+1);\r
+    if (ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) {\r
+      active_port = ports[i];\r
+      ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;\r
+    } else {\r
+      ldout(cct, 10) << __func__ << " port " << i+1 << " is unactive(" << ports[i]->get_port_attr()->state << ")"<< dendl;\r
+    }\r
+  }\r
+  if (!active_port) {\r
+    lderr(cct) << __func__ << " no active port found" << dendl;\r
+    assert(active_port);\r
+  }\r
+}\r
+\r
+Infiniband::Infiniband(RDMAStack* s, CephContext *c, const std::string &device_name): cct(c), device_list(c), net(c), stack(s)\r
+{\r
+  device = device_list.get_device(device_name.c_str());\r
+  assert(device);\r
+  ib_physical_port = device->active_port->get_port_num();\r
+  pd = new ProtectionDomain(cct, device);\r
+  assert(net.set_nonblock(device->ctxt->async_fd) == 0);\r
+\r
+  max_recv_wr = device->device_attr->max_srq_wr;\r
+  if (max_recv_wr < cct->_conf->ms_async_rdma_receive_buffers) {\r
+    ldout(cct, 0) << __func__ << " max allowed receive buffers is " << max_recv_wr << " use this instead." << dendl;\r
+    max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers;\r
+  }\r
+  max_send_wr = device->device_attr->max_qp_wr;\r
+  if (max_send_wr < cct->_conf->ms_async_rdma_send_buffers) {\r
+    ldout(cct, 0) << __func__ << " max allowed send buffers is " << max_send_wr << " use this instead." << dendl;\r
+    max_send_wr = cct->_conf->ms_async_rdma_send_buffers;\r
+  }\r
+\r
+  ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe\r
+                << " completion entries" << dendl;\r
+\r
+  memory_manager = new MemoryManager(cct, device, pd);\r
+  memory_manager->register_rx_tx(\r
+      cct->_conf->ms_async_rdma_buffer_size,\r
+      cct->_conf->ms_async_rdma_receive_buffers,\r
+      cct->_conf->ms_async_rdma_send_buffers);\r
+\r
+  srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);\r
+  post_channel_cluster();\r
+}\r
+\r
+/**\r
+ * Create a shared receive queue. This basically wraps the verbs call. \r
+ *\r
+ * \param[in] max_wr\r
+ *      The max number of outstanding work requests in the SRQ.\r
+ * \param[in] max_sge\r
+ *      The max number of scatter elements per WR.\r
+ * \return\r
+ *      A valid ibv_srq pointer, or NULL on error.\r
+ */\r
+ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)\r
+{\r
+  ldout(cct, 20) << __func__ << " max_wr=" << max_wr << " max_sge=" << max_sge << dendl;\r
+  ibv_srq_init_attr sia;\r
+  memset(&sia, 0, sizeof(sia));\r
+  sia.srq_context = device->ctxt;\r
+  sia.attr.max_wr = max_wr;\r
+  sia.attr.max_sge = max_sge;\r
+  return ibv_create_srq(pd->pd, &sia);\r
+}\r
+\r
+/**\r
+ * Create a new QueuePair. This factory should be used in preference to\r
+ * the QueuePair constructor directly, since this lets derivatives of\r
+ * Infiniband, e.g. MockInfiniband (if it existed),\r
+ * return mocked out QueuePair derivatives.\r
+ *\r
+ * \return\r
+ *      QueuePair on success or NULL if init fails\r
+ * See QueuePair::QueuePair for parameter documentation.\r
+ */\r
+Infiniband::QueuePair* Infiniband::create_queue_pair(ibv_qp_type type)\r
+{\r
+  Infiniband::CompletionChannel* cc = create_comp_channel();\r
+  if (!cc)\r
+    return NULL;\r
+\r
+  Infiniband::CompletionQueue* cq = create_comp_queue(cc);\r
+  if (!cq) {\r
+    delete cc;\r
+    lderr(cct) << __func__ << " failed to create cq." << dendl;\r
+    return NULL;\r
+  }\r
+\r
+  RDMAWorker* w = static_cast<RDMAWorker*>(stack->get_worker());\r
+  Infiniband::QueuePair *qp = new QueuePair(*this, type, ib_physical_port, srq, w->get_tx_cq(), cq, max_send_wr, max_recv_wr);\r
+  if (qp->init()) {\r
+    delete cc;\r
+    delete cq;\r
+    delete qp;\r
+    return NULL;\r
+  }\r
+  return qp;\r
+}\r
+\r
+int Infiniband::QueuePair::init()\r
+{\r
+  ldout(infiniband.cct, 20) << __func__ << " started." << dendl;\r
+  ibv_qp_init_attr qpia;\r
+  memset(&qpia, 0, sizeof(qpia));\r
+  qpia.send_cq = txcq->get_cq();\r
+  qpia.recv_cq = rxcq->get_cq();\r
+  qpia.srq = srq;                      // use the same shared receive queue\r
+  qpia.cap.max_send_wr  = max_send_wr; // max outstanding send requests\r
+  qpia.cap.max_send_sge = 1;           // max send scatter-gather elements\r
+  qpia.cap.max_inline_data = MAX_INLINE_DATA;          // max bytes of immediate data on send q\r
+  qpia.qp_type = type;                 // RC, UC, UD, or XRC\r
+  qpia.sq_sig_all = 0;                 // only generate CQEs on requested WQEs\r
+\r
+  qp = ibv_create_qp(pd, &qpia);\r
+  if (qp == NULL) {\r
+    lderr(infiniband.cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;\r
+    return -1;\r
+  }\r
+\r
+  ldout(infiniband.cct, 20) << __func__ << " successfully create queue pair: "\r
+    << "qp=" << qp << dendl;\r
+\r
+  // move from RESET to INIT state\r
+  ibv_qp_attr qpa;\r
+  memset(&qpa, 0, sizeof(qpa));\r
+  qpa.qp_state   = IBV_QPS_INIT;\r
+  qpa.pkey_index = 0;\r
+  qpa.port_num   = (uint8_t)(ib_physical_port);\r
+  qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;\r
+  qpa.qkey       = q_key;\r
+\r
+  int mask = IBV_QP_STATE | IBV_QP_PORT;\r
+  switch (type) {\r
+    case IBV_QPT_RC:\r
+      mask |= IBV_QP_ACCESS_FLAGS;\r
+      mask |= IBV_QP_PKEY_INDEX;\r
+      break;\r
+    case IBV_QPT_UD:\r
+      mask |= IBV_QP_QKEY;\r
+      mask |= IBV_QP_PKEY_INDEX;\r
+      break;\r
+    case IBV_QPT_RAW_PACKET:\r
+      break;\r
+    default:\r
+      assert(0);\r
+  }\r
+\r
+  int ret = ibv_modify_qp(qp, &qpa, mask);\r
+  if (ret) {\r
+    ibv_destroy_qp(qp);\r
+    lderr(infiniband.cct) << __func__ << " failed to transition to INIT state: "\r
+      << cpp_strerror(errno) << dendl;\r
+    return -1;\r
+  }\r
+  ldout(infiniband.cct, 20) << __func__ << " successfully change queue pair to INIT:"\r
+    << " qp=" << qp << dendl;\r
+  return 0;\r
+}\r
+\r
+int Infiniband::post_chunk(Chunk* chunk)\r
+{\r
+  ibv_sge isge;\r
+  isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);\r
+  isge.length = chunk->bytes;\r
+  isge.lkey = chunk->mr->lkey;\r
+  ibv_recv_wr rx_work_request;\r
+\r
+  memset(&rx_work_request, 0, sizeof(rx_work_request));\r
+  rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr\r
+  rx_work_request.next = NULL;\r
+  rx_work_request.sg_list = &isge;\r
+  rx_work_request.num_sge = 1;\r
+\r
+  ibv_recv_wr *badWorkRequest;\r
+  int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);\r
+  if (ret) {\r
+    lderr(cct) << __func__ << " ib_post_srq_recv failed on post "\r
+      << cpp_strerror(errno) << dendl;\r
+    return -1;\r
+  }\r
+  return 0;\r
+}\r
+\r
+int Infiniband::post_channel_cluster()\r
+{\r
+  vector<Chunk*> free_chunks;\r
+  int r = memory_manager->get_channel_buffers(free_chunks, 0);\r
+  assert(r == 0);\r
+  for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {\r
+    r = post_chunk(*iter);\r
+    assert(r == 0);\r
+  }\r
+  ldout(cct, 20) << __func__ << " posted buffers to srq. "<< dendl;\r
+  return 0;\r
+}\r
+\r
+Infiniband::CompletionChannel* Infiniband::create_comp_channel()\r
+{\r
+  ldout(cct, 20) << __func__ << " started." << dendl;\r
+  Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(*this);\r
+  if (cc->init()) {\r
+    delete cc;\r
+    return NULL;\r
+  }\r
+  return cc;\r
+}\r
+\r
+Infiniband::CompletionQueue* Infiniband::create_comp_queue(CompletionChannel *cc)\r
+{\r
+  ldout(cct, 20) << __func__ << " completion channel=" << cc << dendl;\r
+  Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(*this, max_recv_wr, cc);\r
+  if (cq->init()) {\r
+    delete cq;\r
+    return NULL;\r
+  }\r
+  return cq;\r
+}\r
+\r
+\r
+Infiniband::QueuePair::QueuePair(\r
+    Infiniband& infiniband, ibv_qp_type type, int port, ibv_srq *srq,\r
+    Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,\r
+    uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)\r
+: infiniband(infiniband),\r
+  type(type),\r
+  ctxt(infiniband.device->ctxt),\r
+  ib_physical_port(port),\r
+  pd(infiniband.pd->pd),\r
+  srq(srq),\r
+  qp(NULL),\r
+  txcq(txcq),\r
+  rxcq(rxcq),\r
+  initial_psn(0),\r
+  max_send_wr(max_send_wr),\r
+  max_recv_wr(max_recv_wr),\r
+  q_key(q_key)\r
+{\r
+  initial_psn = lrand48() & 0xffffff;\r
+  if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {\r
+    lderr(infiniband.cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl;\r
+    assert(0);\r
+  }\r
+  pd = infiniband.pd->pd;\r
+}\r
+\r
+// 1 means no valid buffer read, 0 means got enough buffer\r
+// else return < 0 means error\r
+int Infiniband::recv_udp_msg(int sd, IBSYNMsg& im, entity_addr_t *addr)\r
+{\r
+  assert(sd >= 0);\r
+  ssize_t r;\r
+  entity_addr_t socket_addr;\r
+  struct sockaddr from;\r
+  socklen_t slen = sizeof(from);\r
+  char msg[UDP_MSG_LEN];\r
+  char gid[32];\r
+  r = ::recvfrom(sd, &msg, sizeof(msg), 0, &from, &slen);\r
+  // Drop incoming qpt\r
+  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {\r
+    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {\r
+      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;\r
+      r = -1;\r
+    }\r
+  }\r
+  if (r == -1) {\r
+    lderr(cct) << __func__ << " recv got error " << errno << ": "\r
+      << cpp_strerror(errno) << dendl;\r
+    return -1;\r
+  } else if ((size_t)r != sizeof(msg)) { // valid message length\r
+    lderr(cct) << __func__ << " recv got bad length (" << r << ")." << cpp_strerror(errno) << dendl;\r
+    return 1;\r
+  } else { // valid message\r
+    socket_addr.set_sockaddr(&from);\r
+    if (addr) {\r
+      *addr = socket_addr;\r
+    }\r
+    sscanf(msg, "%04x:%08x:%08x:%s", &(im.lid), &(im.qpn), &(im.psn), gid);\r
+    wire_gid_to_gid(gid, &(im.gid));\r
+    ldout(cct, 10) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << gid  << dendl;\r
+    return 0;\r
+  }\r
+}\r
+\r
+int Infiniband::send_udp_msg(int sd, IBSYNMsg& im, entity_addr_t &peeraddr)\r
+{\r
+  assert(sd >= 0);\r
+  int retry = 0;\r
+  ssize_t r;\r
+\r
+  char msg[UDP_MSG_LEN];\r
+  char gid[32];\r
+retry:\r
+  gid_to_wire_gid(&(im.gid), gid);\r
+  r = snprintf(msg, UDP_MSG_LEN, "%04x:%08x:%08x:%s", im.lid, im.qpn, im.psn, gid);\r
+  ldout(cct, 20) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << gid << " r=" << r << dendl;\r
+  r = ::sendto(sd, msg, sizeof(msg), 0, peeraddr.get_sockaddr(),\r
+               peeraddr.get_sockaddr_len());\r
+  // Drop incoming qpt\r
+  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {\r
+    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {\r
+      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;\r
+      r = -1;\r
+    }\r
+  }\r
+\r
+  if ((size_t)r != sizeof(msg)) {\r
+    if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {\r
+      retry++;\r
+      goto retry;\r
+    }\r
+    if (r < 0)\r
+      lderr(cct) << __func__ << " send returned error " << errno << ": "\r
+        << cpp_strerror(errno) << dendl;\r
+    else\r
+      lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;\r
+    return -1;\r
+  }\r
+  return 0;\r
+}\r
+\r
+void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)\r
+{\r
+  char tmp[9];\r
+  uint32_t v32;\r
+  int i;\r
+\r
+  for (tmp[8] = 0, i = 0; i < 4; ++i) {\r
+    memcpy(tmp, wgid + i * 8, 8);\r
+    sscanf(tmp, "%x", &v32);\r
+    *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);\r
+  }\r
+}\r
+\r
+void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])\r
+{\r
+  for (int i = 0; i < 4; ++i)\r
+    sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));\r
+}\r
+\r
+Infiniband::QueuePair::~QueuePair()\r
+{\r
+  if (qp)\r
+    assert(!ibv_destroy_qp(qp));\r
+  ldout(infiniband.cct, 20) << __func__ << " successfully destroyed QueuePair." << dendl;\r
+}\r
+\r
+Infiniband::CompletionChannel::~CompletionChannel()\r
+{\r
+  if (channel) {\r
+    int r = ibv_destroy_comp_channel(channel);\r
+    ldout(infiniband.cct, 20) << __func__ << " r: " << r << dendl;\r
+    assert(r == 0);\r
+  }\r
+  ldout(infiniband.cct, 20) << __func__ << " successfully destroyed CompletionChannel." << dendl;\r
+}\r
+\r
+Infiniband::CompletionQueue::~CompletionQueue()\r
+{\r
+  if (cq) {\r
+    int r = ibv_destroy_cq(cq);\r
+    ldout(infiniband.cct, 20) << __func__ << " r: " << cpp_strerror(errno) << dendl;\r
+    assert(r == 0);\r
+  }\r
+  ldout(infiniband.cct, 20) << __func__ << " successfully destroyed CompletionQueue." << dendl;\r
+}\r
+\r
+int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)\r
+{\r
+  ldout(infiniband.cct, 20) << __func__ << " started." << dendl;\r
+  int r = ibv_req_notify_cq(cq, 0);\r
+  if (r) {\r
+    lderr(infiniband.cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;\r
+  }\r
+  return r;\r
+}\r
+\r
+int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {\r
+  int r = ibv_poll_cq(cq, num_entries, ret_wc_array);\r
+  if (r < 0) {\r
+    lderr(infiniband.cct) << __func__ << " poll_completion_queue occur met error: "\r
+      << cpp_strerror(errno) << dendl;\r
+    return -1;\r
+  }\r
+  return r;\r
+}\r
+\r
+bool Infiniband::CompletionChannel::get_cq_event()\r
+{\r
+  //  ldout(infiniband.cct, 21) << __func__ << " started." << dendl;\r
+  ibv_cq *cq = NULL;\r
+  void *ev_ctx;\r
+  if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {\r
+    if (errno != EAGAIN && errno != EINTR)\r
+      lderr(infiniband.cct) << __func__ << "failed to retrieve CQ event: "\r
+        << cpp_strerror(errno) << dendl;\r
+    return false;\r
+  }\r
+\r
+  /* accumulate number of cq events that need to\r
+   *    * be acked, and periodically ack them\r
+   *       */\r
+  if (++cq_events_that_need_ack == MAX_ACK_EVENT) {\r
+    ldout(infiniband.cct, 20) << __func__ << " ack aq events." << dendl;\r
+    ibv_ack_cq_events(cq, MAX_ACK_EVENT);\r
+    cq_events_that_need_ack = 0;\r
+  }\r
+\r
+  return true;\r
+}\r
+\r
+int Infiniband::CompletionQueue::init()\r
+{\r
+  cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);\r
+  if (!cq) {\r
+    lderr(infiniband.cct) << __func__ << " failed to create receive completion queue: "\r
+      << cpp_strerror(errno) << dendl;\r
+    return -1;\r
+  }\r
+\r
+  if (ibv_req_notify_cq(cq, 0)) {\r
+    lderr(infiniband.cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;\r
+    ibv_destroy_cq(cq);\r
+    return -1;\r
+  }\r
+\r
+  channel->bind_cq(cq);\r
+  ldout(infiniband.cct, 20) << __func__ << " successfully create cq=" << cq << dendl;\r
+  return 0;\r
+}\r
+\r
+int Infiniband::CompletionChannel::init()\r
+{\r
+  ldout(infiniband.cct, 20) << __func__ << " started." << dendl;\r
+  channel = ibv_create_comp_channel(infiniband.device->ctxt);\r
+  if (!channel) {\r
+    lderr(infiniband.cct) << __func__ << " failed to create receive completion channel: "\r
+      << cpp_strerror(errno) << dendl;\r
+    return -1;\r
+  }\r
+  int rc = infiniband.net.set_nonblock(channel->fd);\r
+  if (rc < 0) {\r
+    ibv_destroy_comp_channel(channel);\r
+    return -1;\r
+  }\r
+  return 0;\r
+}\r
+\r
+/**\r
+ * Given a string representation of the `status' field from Verbs\r
+ * struct `ibv_wc'.\r
+ *\r
+ * \param[in] status\r
+ *      The integer status obtained in ibv_wc.status.\r
+ * \return\r
+ *      A string corresponding to the given status.\r
+ */\r
+const char* Infiniband::wc_status_to_string(int status)\r
+{\r
+  static const char *lookup[] = {\r
+      "SUCCESS",\r
+      "LOC_LEN_ERR",\r
+      "LOC_QP_OP_ERR",\r
+      "LOC_EEC_OP_ERR",\r
+      "LOC_PROT_ERR",\r
+      "WR_FLUSH_ERR",\r
+      "MW_BIND_ERR",\r
+      "BAD_RESP_ERR",\r
+      "LOC_ACCESS_ERR",\r
+      "REM_INV_REQ_ERR",\r
+      "REM_ACCESS_ERR",\r
+      "REM_OP_ERR",\r
+      "RETRY_EXC_ERR",\r
+      "RNR_RETRY_EXC_ERR",\r
+      "LOC_RDD_VIOL_ERR",\r
+      "REM_INV_RD_REQ_ERR",\r
+      "REM_ABORT_ERR",\r
+      "INV_EECN_ERR",\r
+      "INV_EEC_STATE_ERR",\r
+      "FATAL_ERR",\r
+      "RESP_TIMEOUT_ERR",\r
+      "GENERAL_ERR"\r
+  };\r
+\r
+  if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)\r
+      return "<status out of range!>";\r
+  return lookup[status];\r
+}\r
diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h
new file mode 100644 (file)
index 0000000..22dee79
--- /dev/null
@@ -0,0 +1,598 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- \r
+// vim: ts=8 sw=2 smarttab\r
+/*\r
+ * Ceph - scalable distributed file system\r
+ *\r
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>\r
+ *\r
+ * Author: Haomai Wang <haomaiwang@gmail.com>\r
+ *\r
+ * This is free software; you can redistribute it and/or\r
+ * modify it under the terms of the GNU Lesser General Public\r
+ * License version 2.1, as published by the Free Software\r
+ * Foundation.  See file COPYING.\r
+ *\r
+ */\r
+\r
+#ifndef CEPH_INFINIBAND_H\r
+#define CEPH_INFINIBAND_H\r
+\r
+#include <infiniband/verbs.h>\r
+\r
+#include <string>\r
+#include <vector>\r
+\r
+#include "include/int_types.h"\r
+#include "include/page.h"\r
+#include "common/debug.h"\r
+#include "common/errno.h"\r
+#include "msg/msg_types.h"\r
+#include "msg/async/net_handler.h"\r
+#include "common/Mutex.h"\r
+\r
+#define HUGE_PAGE_SIZE (2 * 1024 * 1024)\r
+#define ALIGN_TO_PAGE_SIZE(x) \\r
+  (((x) + HUGE_PAGE_SIZE -1) / HUGE_PAGE_SIZE * HUGE_PAGE_SIZE)\r
+\r
+struct IBSYNMsg {\r
+  uint16_t lid;\r
+  uint32_t qpn;\r
+  uint32_t psn;\r
+  union ibv_gid gid;\r
+} __attribute__((packed));\r
+\r
+class RDMAStack;\r
+class CephContext;\r
+\r
+class Port {\r
+  CephContext *cct;\r
+  struct ibv_context* ctxt;\r
+  uint8_t port_num;\r
+  struct ibv_port_attr* port_attr;\r
+  int gid_tbl_len;\r
+  uint16_t lid;\r
+  union ibv_gid gid;\r
+\r
+ public:\r
+  explicit Port(CephContext *c, struct ibv_context* ictxt, uint8_t ipn): cct(c), ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) {\r
+    int r = ibv_query_port(ctxt, port_num, port_attr);\r
+    if (r == -1) {\r
+      lderr(cct) << __func__  << " query port failed  " << cpp_strerror(errno) << dendl;\r
+      assert(0);\r
+    }\r
+\r
+    lid = port_attr->lid;\r
+    r = ibv_query_gid(ctxt, port_num, 0, &gid);\r
+    if (r) {\r
+      lderr(cct) << __func__  << " query gid failed  " << cpp_strerror(errno) << dendl;\r
+      assert(0);\r
+    }\r
+  }\r
+\r
+  uint16_t get_lid() { return lid; }\r
+  ibv_gid  get_gid() { return gid; }\r
+  uint8_t get_port_num() { return port_num; }\r
+  ibv_port_attr* get_port_attr() { return port_attr; }\r
+};\r
+\r
+\r
+class Device {\r
+  CephContext *cct;\r
+  ibv_device *device;\r
+  const char* name;\r
+  uint8_t  port_cnt;\r
+  Port** ports;\r
+ public:\r
+  explicit Device(CephContext *c, ibv_device* d);\r
+  ~Device() {\r
+    for (uint8_t i = 0; i < port_cnt; ++i)\r
+      delete ports[i];\r
+    delete []ports;\r
+    assert(ibv_close_device(ctxt) == 0);\r
+  }\r
+  const char* get_name() { return name;}\r
+  uint16_t get_lid() { return active_port->get_lid(); }\r
+  ibv_gid get_gid() { return active_port->get_gid(); }\r
+  struct ibv_context *ctxt;\r
+  ibv_device_attr *device_attr;\r
+  Port* active_port;\r
+};\r
+\r
+\r
+class DeviceList {\r
+  CephContext *cct;\r
+  struct ibv_device ** device_list;\r
+  int num;\r
+  Device** devices;\r
+ public:\r
+  DeviceList(CephContext *c): cct(c), device_list(ibv_get_device_list(&num)) {\r
+    if (device_list == NULL || num == 0) {\r
+      lderr(cct) << __func__ << " failed to get rdma device list.  " << cpp_strerror(errno) << dendl;\r
+      assert(0);\r
+    }\r
+    devices = new Device*[num];\r
+\r
+    for (int i = 0;i < num; ++i) {\r
+      devices[i] = new Device(cct, device_list[i]);\r
+    }\r
+  }\r
+  ~DeviceList() {\r
+    for (int i=0; devices[i] != NULL; ++i) {\r
+      delete devices[i];\r
+    }\r
+    delete []devices;\r
+    ibv_free_device_list(device_list);\r
+  }\r
+\r
+  Device* get_device(const char* device_name) {\r
+    assert(devices);\r
+    for (int i = 0; i < num; ++i) {\r
+      if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) {\r
+        return devices[i];\r
+      }\r
+    }\r
+    return NULL;\r
+  }\r
+};\r
+\r
+\r
+class Infiniband {\r
+ public:\r
+  class ProtectionDomain {\r
+   public:\r
+    explicit ProtectionDomain(CephContext *c, Device *device)\r
+      : cct(c), pd(ibv_alloc_pd(device->ctxt))\r
+    {\r
+      if (pd == NULL) {\r
+        lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;\r
+        assert(0);\r
+      }\r
+    }\r
+    ~ProtectionDomain() {\r
+      int rc = ibv_dealloc_pd(pd);\r
+      if (rc != 0) {\r
+        lderr(cct) << __func__ << " ibv_dealloc_pd failed: "\r
+          << cpp_strerror(errno) << dendl;\r
+      }\r
+    }\r
+    CephContext *cct;\r
+    ibv_pd* const pd;\r
+  };\r
+\r
+\r
+  class MemoryManager {\r
+   public:\r
+    class Chunk {\r
+     public:\r
+      Chunk(char* b, uint32_t len, ibv_mr* m) : buffer(b), bytes(len), offset(0), mr(m) {}\r
+      ~Chunk() {\r
+        assert(ibv_dereg_mr(mr) == 0);\r
+      }\r
+\r
+      void set_offset(uint32_t o) {\r
+        offset = o;\r
+      }\r
+\r
+      size_t get_offset() {\r
+        return offset;\r
+      }\r
+\r
+      void set_bound(uint32_t b) {\r
+        bound = b;\r
+      }\r
+\r
+      void prepare_read(uint32_t b) {\r
+        offset = 0;\r
+        bound = b;\r
+      }\r
+\r
+      uint32_t get_bound() {\r
+        return bound;\r
+      }\r
+\r
+      size_t read(char* buf, size_t len) {\r
+        size_t left = bound - offset;\r
+        if(left >= len) {\r
+          memcpy(buf, buffer+offset, len);\r
+          offset += len;\r
+          return len;\r
+        } else {\r
+          memcpy(buf, buffer+offset, left);\r
+          offset = 0;\r
+          bound = 0;\r
+          return left;\r
+        }\r
+      }\r
+\r
+      size_t write(char* buf, size_t len) {\r
+        size_t left = bytes - offset;\r
+        if (left >= len) {\r
+          memcpy(buffer+offset, buf, len);\r
+          offset += len;\r
+          return len;\r
+        } else {\r
+          memcpy(buffer+offset, buf, left);\r
+          offset = bytes;\r
+          return left;\r
+        }\r
+      }\r
+\r
+      bool full() {\r
+        return offset == bytes;\r
+      }\r
+\r
+      bool over() {\r
+        return offset == bound;\r
+      }\r
+\r
+      void clear() {\r
+        offset = 0;\r
+        bound = 0;\r
+      }\r
+\r
+      void post_srq(Infiniband *ib) {\r
+        ib->post_chunk(this);\r
+      }\r
+\r
+      void set_owner(uint64_t o) {\r
+        owner = o;\r
+      }\r
+\r
+      uint64_t get_owner() {\r
+        return owner;\r
+      }\r
+\r
+     public:\r
+      char* buffer;\r
+      uint32_t bytes;\r
+      uint32_t bound;\r
+      size_t offset;\r
+      ibv_mr* mr;\r
+      uint64_t owner;\r
+    };\r
+\r
+    class Cluster {\r
+     public:\r
+      Cluster(MemoryManager& m, uint32_t s) : manager(m), chunk_size(s), lock("cluster_lock"){}\r
+      Cluster(MemoryManager& m, uint32_t s, uint32_t n) : manager(m), chunk_size(s), lock("cluster_lock"){\r
+        add(n);\r
+      }\r
+\r
+      ~Cluster() {\r
+        set<Chunk*>::iterator c = all_chunks.begin();\r
+        while(c != all_chunks.end()) {\r
+          delete *c;\r
+          ++c;\r
+        }\r
+      }\r
+      int add(uint32_t num) {\r
+        uint32_t bytes = chunk_size * num;\r
+        //cihar* base = (char*)malloc(bytes);\r
+        if (!manager.enabled_huge_page) {\r
+          base = (char*)memalign(CEPH_PAGE_SIZE, bytes);\r
+        } else {\r
+          base = (char*)manager.malloc_huge_pages(bytes);\r
+        }\r
+        assert(base);\r
+        for (uint32_t offset = 0; offset < bytes; offset += chunk_size){\r
+          ibv_mr* m = ibv_reg_mr(manager.pd->pd, base+offset, chunk_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);\r
+          assert(m);\r
+          Chunk* c = new Chunk(base+offset,chunk_size,m);\r
+          free_chunks.push_back(c);\r
+          all_chunks.insert(c);\r
+        }\r
+        return 0;\r
+      }\r
+\r
+      int take_back(Chunk* ck) {\r
+        Mutex::Locker l(lock);\r
+        free_chunks.push_back(ck);\r
+        return 0;\r
+      }\r
+\r
+      int get_buffers(std::vector<Chunk*> &chunks, size_t bytes) {\r
+        Mutex::Locker l(lock);\r
+        if (!bytes) {\r
+          free_chunks.swap(chunks);\r
+          return 0;\r
+        }\r
+        uint32_t num = bytes / chunk_size + 1;\r
+        if (bytes % chunk_size == 0)\r
+          --num;\r
+        if (free_chunks.size() < num)\r
+          return -EAGAIN;\r
+        for (uint32_t i = 0; i < num; ++i) {\r
+          chunks.push_back(free_chunks.back());\r
+          free_chunks.pop_back();\r
+        }\r
+        return 0;\r
+      }\r
+\r
+      MemoryManager& manager;\r
+      uint32_t chunk_size;\r
+      Mutex lock;\r
+      std::vector<Chunk*> free_chunks;\r
+      set<Chunk*> all_chunks;\r
+      char* base;\r
+    };\r
+\r
+    MemoryManager(CephContext *cct, Device *d, ProtectionDomain *p) : cct(cct), device(d), pd(p) {\r
+      enabled_huge_page = cct->_conf->ms_async_rdma_enable_hugepage;\r
+    }\r
+    ~MemoryManager() {\r
+      if(channel)\r
+        delete channel;\r
+      if(send)\r
+        delete send;\r
+    }\r
+    void* malloc_huge_pages(size_t size) {\r
+      size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);\r
+      char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);\r
+      if (ptr == MAP_FAILED) {\r
+        lderr(cct) << __func__ << " MAP_FAILED" << dendl;\r
+        ptr = (char *)malloc(real_size);\r
+        if (ptr == NULL) return NULL;\r
+        real_size = 0;\r
+      }\r
+      *((size_t *)ptr) = real_size;\r
+      lderr(cct) << __func__ << " bingo!" << dendl;\r
+      return ptr + HUGE_PAGE_SIZE;\r
+    }\r
+    void free_huge_pages(void *ptr) {\r
+      if (ptr == NULL) return;\r
+      void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE;\r
+      size_t real_size = *((size_t *)real_ptr);\r
+      assert(real_size % HUGE_PAGE_SIZE == 0);\r
+      if (real_size != 0)\r
+        munmap(real_ptr, real_size);\r
+      else\r
+        free(real_ptr);\r
+    }\r
+    void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num) {\r
+      assert(device);\r
+      assert(pd);\r
+      channel = new Cluster(*this, size);\r
+      channel->add(rx_num);\r
+\r
+      send = new Cluster(*this, size);\r
+      send->add(tx_num);\r
+    }\r
+    int return_tx(Chunk* c) {\r
+      c->clear();\r
+      return send->take_back(c);\r
+    }\r
+\r
+    int get_send_buffers(std::vector<Chunk*> &c, size_t bytes) {\r
+      return send->get_buffers(c, bytes);\r
+    }\r
+\r
+    int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes) {\r
+      return channel->get_buffers(chunks, bytes);\r
+    }\r
+\r
+    int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);}\r
+    bool enabled_huge_page;\r
+   private:\r
+    Cluster* channel;//RECV\r
+    Cluster* send;// SEND\r
+    CephContext *cct;\r
+    Device *device;\r
+    ProtectionDomain *pd;\r
+  };\r
+\r
+ private:\r
+  uint32_t max_send_wr;\r
+  uint32_t max_recv_wr;\r
+  uint32_t max_sge;\r
+  uint8_t  ib_physical_port;\r
+  MemoryManager* memory_manager;\r
+  ibv_srq* srq;             // shared receive work queue\r
+  Device *device;\r
+  ProtectionDomain *pd;\r
+  CephContext* cct;\r
+  DeviceList device_list;\r
+  void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);\r
+  void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);\r
+\r
+ public:\r
+  NetHandler net;\r
+  RDMAStack* stack;\r
+  explicit Infiniband(RDMAStack* s, CephContext *c, const std::string &device_name);\r
+\r
+  /**\r
+   * Destroy an Infiniband object.\r
+   */\r
+  ~Infiniband() {\r
+    assert(ibv_destroy_srq(srq) == 0);\r
+    delete memory_manager;\r
+    delete pd;\r
+  }\r
+\r
+  class CompletionChannel {\r
+     static const uint32_t MAX_ACK_EVENT = 5000;\r
+     Infiniband& infiniband;\r
+     ibv_comp_channel *channel;\r
+     ibv_cq *cq;\r
+     uint32_t cq_events_that_need_ack;\r
+\r
+   public:\r
+    CompletionChannel(Infiniband &ib): infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) {}\r
+    ~CompletionChannel();\r
+    int init();\r
+    bool get_cq_event();\r
+    int get_fd() { return channel->fd; }\r
+    ibv_comp_channel* get_channel() { return channel; }\r
+    void bind_cq(ibv_cq *c) { cq = c; }\r
+    void ack_events() {\r
+      ibv_ack_cq_events(cq, cq_events_that_need_ack);\r
+      cq_events_that_need_ack = 0;\r
+    }\r
+  };\r
+\r
+  // this class encapsulates the creation, use, and destruction of an RC\r
+  // completion queue.\r
+  //\r
+  // You need to call init and it will create a cq and associate to comp channel\r
+  class CompletionQueue {\r
+   public:\r
+    CompletionQueue(Infiniband &ib, const uint32_t qd, CompletionChannel *cc):infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {}\r
+    ~CompletionQueue();\r
+    int init();\r
+    int poll_cq(int num_entries, ibv_wc *ret_wc_array);\r
+\r
+    ibv_cq* get_cq() const { return cq; }\r
+    int rearm_notify(bool solicited_only=true);\r
+    CompletionChannel* get_cc() const { return channel; }\r
+   private:\r
+    Infiniband&  infiniband;     // Infiniband to which this QP belongs\r
+    CompletionChannel *channel;\r
+    ibv_cq *cq;\r
+    uint32_t queue_depth;\r
+  };\r
+\r
+  // this class encapsulates the creation, use, and destruction of an RC\r
+  // queue pair.\r
+  //\r
+  // you need call init and it will create a qp and bring it to the INIT state.\r
+  // after obtaining the lid, qpn, and psn of a remote queue pair, one\r
+  // must call plumb() to bring the queue pair to the RTS state.\r
+  class QueuePair {\r
+   public:\r
+    QueuePair(Infiniband& infiniband, ibv_qp_type type,int ib_physical_port,  ibv_srq *srq, Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0);\r
+    // exists solely as superclass constructor for MockQueuePair derivative\r
+    explicit QueuePair(Infiniband& infiniband):\r
+      infiniband(infiniband), type(IBV_QPT_RC), ctxt(NULL), ib_physical_port(-1),\r
+      pd(NULL), srq(NULL), qp(NULL), txcq(NULL), rxcq(NULL),\r
+      initial_psn(-1) {}\r
+    ~QueuePair();\r
+\r
+    int init();\r
+\r
+    /**\r
+     * Get the initial packet sequence number for this QueuePair.\r
+     * This is randomly generated on creation. It should not be confused\r
+     * with the remote side's PSN, which is set in #plumb(). \r
+     */\r
+    uint32_t get_initial_psn() const { return initial_psn; };\r
+    /**\r
+     * Get the local queue pair number for this QueuePair.\r
+     * QPNs are analogous to UDP/TCP port numbers.\r
+     */\r
+    uint32_t get_local_qp_number() const { return qp->qp_num; };\r
+    /**\r
+     * Get the remote queue pair number for this QueuePair, as set in #plumb().\r
+     * QPNs are analogous to UDP/TCP port numbers.\r
+     */\r
+    int get_remote_qp_number(uint32_t *rqp) const {\r
+      ibv_qp_attr qpa;\r
+      ibv_qp_init_attr qpia;\r
+\r
+      int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);\r
+      if (r) {\r
+        lderr(infiniband.cct) << __func__ << " failed to query qp: "\r
+          << cpp_strerror(errno) << dendl;\r
+        return -1;\r
+      }\r
+\r
+      if (rqp)\r
+        *rqp = qpa.dest_qp_num;\r
+      return 0;\r
+    }\r
+    /**\r
+     * Get the remote infiniband address for this QueuePair, as set in #plumb().\r
+     * LIDs are "local IDs" in infiniband terminology. They are short, locally\r
+     * routable addresses.\r
+     */\r
+    int get_remote_lid(uint16_t *lid) const {\r
+      ibv_qp_attr qpa;\r
+      ibv_qp_init_attr qpia;\r
+\r
+      int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);\r
+      if (r) {\r
+        lderr(infiniband.cct) << __func__ << " failed to query qp: "\r
+          << cpp_strerror(errno) << dendl;\r
+        return -1;\r
+      }\r
+\r
+      if (lid)\r
+        *lid = qpa.ah_attr.dlid;\r
+      return 0;\r
+    }\r
+    /**\r
+     * Get the state of a QueuePair.\r
+     */\r
+    int get_state() const {\r
+      ibv_qp_attr qpa;\r
+      ibv_qp_init_attr qpia;\r
+\r
+      int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);\r
+      if (r) {\r
+        lderr(infiniband.cct) << __func__ << " failed to get state: "\r
+          << cpp_strerror(errno) << dendl;\r
+        return -1;\r
+      }\r
+      return qpa.qp_state;\r
+    }\r
+    /**\r
+     * Return true if the queue pair is in an error state, false otherwise.\r
+     */\r
+    bool is_error() const {\r
+      ibv_qp_attr qpa;\r
+      ibv_qp_init_attr qpia;\r
+\r
+      int r = ibv_query_qp(qp, &qpa, -1, &qpia);\r
+      if (r) {\r
+        lderr(infiniband.cct) << __func__ << " failed to get state: "\r
+          << cpp_strerror(errno) << dendl;\r
+        return true;\r
+      }\r
+      return qpa.cur_qp_state == IBV_QPS_ERR;\r
+    }\r
+    ibv_qp* get_qp() const { return qp; }\r
+    Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }\r
+    Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }\r
+    int to_reset();\r
+    int to_dead();\r
+    int get_fd() { return fd; }\r
+   private:\r
+    Infiniband&  infiniband;     // Infiniband to which this QP belongs\r
+    ibv_qp_type  type;           // QP type (IBV_QPT_RC, etc.)\r
+    ibv_context* ctxt;           // device context of the HCA to use\r
+    int ib_physical_port;\r
+    ibv_pd*      pd;             // protection domain\r
+    ibv_srq*     srq;            // shared receive queue\r
+    ibv_qp*      qp;             // infiniband verbs QP handle\r
+    Infiniband::CompletionQueue* txcq;\r
+    Infiniband::CompletionQueue* rxcq;\r
+    uint32_t     initial_psn;    // initial packet sequence number\r
+    uint32_t     max_send_wr;\r
+    uint32_t     max_recv_wr;\r
+    uint32_t     q_key;\r
+    int fd;\r
+  };\r
+\r
+ public:\r
+  typedef MemoryManager::Cluster Cluster;\r
+  typedef MemoryManager::Chunk Chunk;\r
+  QueuePair* create_queue_pair(ibv_qp_type type);\r
+  ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);\r
+  int post_chunk(Chunk* chunk);\r
+  int post_channel_cluster();\r
+  int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes) {\r
+    return memory_manager->get_send_buffers(c, bytes);\r
+  }\r
+  CompletionChannel *create_comp_channel();\r
+  CompletionQueue *create_comp_queue(CompletionChannel *cc=NULL);\r
+  uint8_t get_ib_physical_port() {\r
+    return ib_physical_port;\r
+  }\r
+  int send_udp_msg(int sd, IBSYNMsg& msg, entity_addr_t &peeraddr);\r
+  int recv_udp_msg(int sd, IBSYNMsg& msg, entity_addr_t *addr);\r
+  uint16_t get_lid() { return device->get_lid(); }\r
+  ibv_gid get_gid() { return device->get_gid(); }\r
+  MemoryManager* get_memory_manager() { return memory_manager; }\r
+  Device* get_device() { return device; }\r
+  static const char* wc_status_to_string(int status);\r
+};\r
+\r
+#endif\r
diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
new file mode 100644 (file)
index 0000000..3e93eba
--- /dev/null
@@ -0,0 +1,339 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- \r
+// vim: ts=8 sw=2 smarttab\r
+/*\r
+ * Ceph - scalable distributed file system\r
+ *\r
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>\r
+ *\r
+ * Author: Haomai Wang <haomaiwang@gmail.com>\r
+ *\r
+ * This is free software; you can redistribute it and/or\r
+ * modify it under the terms of the GNU Lesser General Public\r
+ * License version 2.1, as published by the Free Software\r
+ * Foundation.  See file COPYING.\r
+ *\r
+ */\r
+\r
+#include "RDMAStack.h"\r
+\r
+#define dout_subsys ceph_subsys_ms\r
+#undef dout_prefix\r
+#define dout_prefix *_dout << " RDMAConnectedSocketImpl "\r
+\r
+int RDMAConnectedSocketImpl::activate() {\r
+  ibv_qp_attr qpa;\r
+  int r;\r
+\r
+  // now connect up the qps and switch to RTR\r
+  memset(&qpa, 0, sizeof(qpa));\r
+  qpa.qp_state = IBV_QPS_RTR;\r
+  qpa.path_mtu = IBV_MTU_1024;\r
+  qpa.dest_qp_num = peer_msg.qpn;\r
+  qpa.rq_psn = peer_msg.psn;\r
+  qpa.max_dest_rd_atomic = 1;\r
+  qpa.min_rnr_timer = 12;\r
+  //qpa.ah_attr.is_global = 0;\r
+  qpa.ah_attr.is_global = 1;\r
+  qpa.ah_attr.grh.hop_limit = 6;\r
+  qpa.ah_attr.grh.dgid = peer_msg.gid;\r
+  qpa.ah_attr.grh.sgid_index = 0;\r
+\r
+  qpa.ah_attr.dlid = peer_msg.lid;\r
+  qpa.ah_attr.sl = 0;\r
+  qpa.ah_attr.src_path_bits = 0;\r
+  qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());\r
+\r
+  r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |\r
+      IBV_QP_AV |\r
+      IBV_QP_PATH_MTU |\r
+      IBV_QP_DEST_QPN |\r
+      IBV_QP_RQ_PSN |\r
+      IBV_QP_MIN_RNR_TIMER |\r
+      IBV_QP_MAX_DEST_RD_ATOMIC);\r
+  if (r) {\r
+    lderr(cct) << __func__ << " failed to transition to RTR state: "\r
+      << cpp_strerror(errno) << dendl;\r
+    return -1;\r
+  }\r
+\r
+  ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl;\r
+\r
+  // now move to RTS\r
+  qpa.qp_state = IBV_QPS_RTS;\r
+\r
+  // How long to wait before retrying if packet lost or server dead.\r
+  // Supposedly the timeout is 4.096us*2^timeout.  However, the actual\r
+  // timeout appears to be 4.096us*2^(timeout+1), so the setting\r
+  // below creates a 135ms timeout.\r
+  qpa.timeout = 14;\r
+\r
+  // How many times to retry after timeouts before giving up.\r
+  qpa.retry_cnt = 7;\r
+\r
+  // How many times to retry after RNR (receiver not ready) condition\r
+  // before giving up. Occurs when the remote side has not yet posted\r
+  // a receive request.\r
+  qpa.rnr_retry = 7; // 7 is infinite retry.\r
+  qpa.sq_psn = my_msg.psn;\r
+  qpa.max_rd_atomic = 1;\r
+\r
+  r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |\r
+      IBV_QP_TIMEOUT |\r
+      IBV_QP_RETRY_CNT |\r
+      IBV_QP_RNR_RETRY |\r
+      IBV_QP_SQ_PSN |\r
+      IBV_QP_MAX_QP_RD_ATOMIC);\r
+  if (r) {\r
+    lderr(cct) << __func__ << " failed to transition to RTS state: "\r
+      << cpp_strerror(errno) << dendl;\r
+    return -1;\r
+  }\r
+\r
+  // the queue pair should be ready to use once the client has finished\r
+  // setting up their end.\r
+  ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;\r
+\r
+  connected = 1;//indicate successfully\r
+  return 0;\r
+}\r
+\r
+ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)\r
+{\r
+  ldout(cct, 20) << __func__ << " need to read bytes: " << len  << " buffers size: " << buffers.size() << dendl;\r
+\r
+  ssize_t read = 0;\r
+  if (!buffers.empty())\r
+    read = read_buffers(buf,len);\r
+\r
+  static const int MAX_COMPLETIONS = 16;\r
+  ibv_wc wc[MAX_COMPLETIONS];\r
+\r
+  bool rearmed = false;\r
+  int n;\r
+ again:\r
+  n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);\r
+  ldout(cct, 20) << __func__ << " poll completion queue got " << n << " responses."<< dendl;\r
+  for (int i = 0; i < n; ++i) {\r
+    ibv_wc* response = &wc[i];\r
+    ldout(cct, 20) << __func__ << " cqe  " << response->byte_len << " bytes." << dendl;\r
+    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);\r
+    chunk->prepare_read(response->byte_len);\r
+    if (!response->byte_len) {\r
+      wait_close = true;\r
+      return 0;\r
+    }\r
+    if (response->status != IBV_WC_SUCCESS) {\r
+      lderr(cct) << __func__ << " poll cqe failed! " << " number: " << n << ", status: "<< response->status << cpp_strerror(errno) << dendl;\r
+      assert(0);\r
+    } else {\r
+      if (read == (ssize_t)len) {\r
+        buffers.push_back(chunk);\r
+        ldout(cct, 20) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;\r
+      } else if (read + response->byte_len > (ssize_t)len) {\r
+        read += chunk->read(buf+read, (ssize_t)len-read);\r
+        buffers.push_back(chunk);\r
+        ldout(cct, 20) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;\r
+      } else {\r
+        read += chunk->read(buf+read, response->byte_len);\r
+        assert(infiniband->post_chunk(chunk) == 0);\r
+      }\r
+    }\r
+  }\r
+\r
+  if (n)\r
+   goto again;\r
+  if (!rearmed) {\r
+     rx_cq->rearm_notify();\r
+     rearmed = true;\r
+     // Clean up cq events after rearm notify ensure no new incoming event\r
+     // arrived between polling and rearm\r
+     goto again;\r
+  }\r
+  if (read == 0)\r
+    return -EAGAIN;\r
+  return read;\r
+}\r
+\r
+ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)\r
+{\r
+  size_t read = 0, tmp = 0;\r
+  vector<Chunk*>::iterator c = buffers.begin();\r
+  for (; c != buffers.end() ; ++c) {\r
+    tmp = (*c)->read(buf+read, len-read);\r
+    read += tmp;\r
+    ldout(cct, 20) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound()  << ". Chunk:" << *c  << dendl;\r
+    if ((*c)->over()) {\r
+      assert(infiniband->post_chunk(*c) == 0);\r
+      ldout(cct, 20) << __func__ << " one chunk over." << dendl;\r
+    }\r
+    if (read == len) {\r
+      break;\r
+    }\r
+  }\r
+\r
+  if (c != buffers.end() && (*c)->over())\r
+    c++;\r
+  buffers.erase(buffers.begin(), c);\r
+  ldout(cct, 20) << __func__ << " got " << read << " bytes here. buffers size: " << buffers.size() << dendl;\r
+  return read;\r
+}\r
+\r
+ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)\r
+{\r
+  ssize_t size = 0;\r
+  static const int MAX_COMPLETIONS = 16;\r
+  ibv_wc wc[MAX_COMPLETIONS];\r
+\r
+  bool rearmed = false;\r
+  int n;\r
+ again:\r
+  n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);\r
+  ldout(cct, 20) << __func__ << " pool completion queue got " << n << " responses."<< dendl;\r
+\r
+  ibv_wc*  response;\r
+  Chunk* chunk;\r
+  bool loaded = false;\r
+  auto iter = buffers.begin();\r
+  if(iter != buffers.end()) {\r
+    chunk = *iter;\r
+    if (chunk->bound == 0) {\r
+      wait_close = true;\r
+      return 0;\r
+    }\r
+    auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
+    buffers.erase(iter);\r
+    loaded = true;\r
+    size = chunk->bound;\r
+  }\r
+\r
+  for (int i = 0; i < n; ++i) {\r
+    response = &wc[i];\r
+    chunk = reinterpret_cast<Chunk*>(response->wr_id);\r
+    chunk->prepare_read(response->byte_len);\r
+    if(!loaded && i == 0) {\r
+      if (chunk->bound == 0) {\r
+        wait_close = true;\r
+        return 0;\r
+      }\r
+      auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
+      size = chunk->bound;\r
+      continue;\r
+    }\r
+    buffers.push_back(chunk);\r
+  }\r
+\r
+  if (n)\r
+   goto again;\r
+  if (!rearmed) {\r
+     rx_cq->rearm_notify();\r
+     rearmed = true;\r
+     // Clean up cq events after rearm notify ensure no new incoming event\r
+     // arrived between polling and rearm\r
+     goto again;\r
+  }\r
+  if (size == 0)\r
+    return -EAGAIN;\r
+  return size;\r
+}\r
+\r
+ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)\r
+{\r
+  size_t bytes = bl.length();\r
+  if (!bytes)\r
+    return 0;\r
+  vector<Chunk*> tx_buffers;\r
+  if (infiniband->get_tx_buffers(tx_buffers, bytes) < 0) {\r
+    ldout(cct, 10) << __func__ << " no enough buffers" << dendl;\r
+    return 0;\r
+  }\r
+  ldout(cct, 20) << __func__ << " prepare " << bytes << " bytes, tx buffer count: " << tx_buffers.size() << dendl;\r
+  vector<Chunk*>::iterator current_buffer = tx_buffers.begin();\r
+  list<bufferptr>::const_iterator it = bl.buffers().begin();\r
+  while (it != bl.buffers().end()) {\r
+    const uintptr_t addr = reinterpret_cast<const uintptr_t>(it->c_str());\r
+    uint32_t copied = 0;\r
+    //  ldout(cct, 20) << __func__ << " app_buffer: " << addr << " length:  " << it->length()  << dendl;\r
+    while(copied < it->length()) {\r
+      //   ldout(cct, 20) << __func__ << " current_buffer: " << *current_buffer << " copied:  " << copied  << dendl;\r
+      size_t ret = (*current_buffer)->write((char*)addr+copied, it->length() - copied);\r
+      copied += ret;\r
+      //  ldout(cct, 20) << __func__ << " ret: " << ret << " copied:  " << copied  << dendl;\r
+      if((*current_buffer)->full()){\r
+        ++current_buffer;\r
+      }\r
+    }\r
+    ++it;\r
+  }\r
+\r
+  ssize_t r = post_work_request(tx_buffers);\r
+  if (r < 0)\r
+    return r;\r
+\r
+  ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;\r
+  bl.clear();\r
+  return bytes;\r
+}\r
+\r
+int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)\r
+{\r
+  vector<Chunk*>::iterator current_buffer = tx_buffers.begin();\r
+  ibv_sge isge[tx_buffers.size()];\r
+  uint32_t current_sge = 0;\r
+  ibv_send_wr iswr[tx_buffers.size()];\r
+  uint32_t current_swr = 0;\r
+  ibv_send_wr* pre_wr = NULL;\r
+\r
+  current_buffer = tx_buffers.begin();\r
+  while (current_buffer != tx_buffers.end()) {\r
+    isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);\r
+    isge[current_sge].length = (*current_buffer)->get_offset();\r
+    isge[current_sge].lkey = (*current_buffer)->mr->lkey;\r
+    ldout(cct, 20) << __func__ << " current_buffer: " << *current_buffer << " length: " << isge[current_sge].length  << dendl;\r
+\r
+    iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);\r
+    iswr[current_swr].next = NULL;\r
+    iswr[current_swr].sg_list = &isge[current_sge];\r
+    iswr[current_swr].num_sge = 1;\r
+    iswr[current_swr].opcode = IBV_WR_SEND;\r
+    iswr[current_swr].send_flags = IBV_SEND_SIGNALED;\r
+    /*if(isge[current_sge].length < infiniband->max_inline_data) {\r
+      iswr[current_swr].send_flags = IBV_SEND_INLINE;\r
+      ldout(cct, 20) << __func__ << " send_inline." << dendl;\r
+      }*/\r
+\r
+    if (pre_wr)\r
+      pre_wr->next = &iswr[current_swr];\r
+    pre_wr = &iswr[current_swr];\r
+    ++current_sge;\r
+    ++current_swr;\r
+    ++current_buffer;\r
+  }\r
+\r
+  ibv_send_wr *bad_tx_work_request;\r
+  if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {\r
+    lderr(cct) << __func__ << " failed to send data"\r
+               << " (most probably should be peer not ready): "\r
+               << cpp_strerror(errno) << dendl;\r
+    return -errno;\r
+  }\r
+  return 0;\r
+}\r
+\r
+void RDMAConnectedSocketImpl::fin() {\r
+  //ibv_sge list;\r
+  //memset(&list, 0, sizeof(list));\r
+  ibv_send_wr wr;\r
+  memset(&wr, 0, sizeof(wr));\r
+  wr.wr_id = reinterpret_cast<uint64_t>(this);\r
+  wr.num_sge = 0;\r
+  //wr.sg_list = &list;\r
+  wr.opcode = IBV_WR_SEND;\r
+  wr.send_flags = IBV_SEND_SIGNALED;\r
+  ibv_send_wr* bad_tx_work_request;\r
+  if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {\r
+    lderr(cct) << __func__ << " failed to send FIN"\r
+               << "(most probably should be peer not ready): "\r
+               << cpp_strerror(errno) << dendl;\r
+    return ;\r
+  }\r
+}\r
diff --git a/src/msg/async/rdma/RDMAServerSocketImpl.cc b/src/msg/async/rdma/RDMAServerSocketImpl.cc
new file mode 100644 (file)
index 0000000..0a148f9
--- /dev/null
@@ -0,0 +1,95 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- \r
+// vim: ts=8 sw=2 smarttab\r
+/*\r
+ * Ceph - scalable distributed file system\r
+ *\r
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>\r
+ *\r
+ * Author: Haomai Wang <haomaiwang@gmail.com>\r
+ *\r
+ * This is free software; you can redistribute it and/or\r
+ * modify it under the terms of the GNU Lesser General Public\r
+ * License version 2.1, as published by the Free Software\r
+ * Foundation.  See file COPYING.\r
+ *\r
+ */\r
+\r
+#include "msg/async/net_handler.h"\r
+#include "RDMAStack.h"\r
+\r
+#define dout_subsys ceph_subsys_ms\r
+#undef dout_prefix\r
+#define dout_prefix *_dout << " RDMAServerSocketImpl "\r
+\r
+int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)\r
+{\r
+  server_setup_socket = ::socket(sa.get_family(), SOCK_DGRAM, 0);\r
+  if (server_setup_socket == -1) {\r
+    lderr(cct) << __func__ << " failed to create server socket: "\r
+               << cpp_strerror(errno) << dendl;\r
+    return -errno;\r
+  }\r
+\r
+  int on = 1;\r
+  int rc = ::setsockopt(server_setup_socket, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));\r
+  if (rc < 0) {\r
+    lderr(cct) << __func__ << " unable to setsockopt: " << cpp_strerror(errno) << dendl;\r
+    goto err;\r
+  }\r
+\r
+  rc = ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len());\r
+  if (rc < 0) {\r
+    lderr(cct) << __func__ << " unable to bind to " << sa.get_sockaddr()\r
+               << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl;\r
+    goto err;\r
+  }\r
+\r
+  rc = net.set_nonblock(server_setup_socket);\r
+  if (rc < 0) {\r
+    goto err;\r
+  }\r
+\r
+  net.set_close_on_exec(server_setup_socket);\r
+\r
+  ldout(cct, 20) << __func__ << " bind to " << sa.get_sockaddr() << " on port " << sa.get_port()  << dendl;\r
+  return 0;\r
+\r
+err:\r
+  ::close(server_setup_socket);\r
+  server_setup_socket = -1;\r
+  return -1;\r
+}\r
+\r
+int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out)\r
+{\r
+  ldout(cct, 15) << __func__ << dendl;\r
+  int r;\r
+  RDMAConnectedSocketImpl* server;\r
+  while (1) {\r
+    IBSYNMsg msg;//TODO\r
+    entity_addr_t addr;\r
+    r = infiniband->recv_udp_msg(server_setup_socket, msg, &addr);\r
+    if (r < 0) {\r
+      r = -errno;\r
+      if (r != -EAGAIN)\r
+        ldout(cct, 10) << __func__ << " recv msg failed:" << cpp_strerror(errno)<< dendl;\r
+      break;\r
+    } else if (r > 0) {\r
+      ldout(cct, 1) << __func__ << " recv msg not whole." << dendl;\r
+      continue;\r
+    } else {\r
+      //RDMAWorker* w = static_cast<RDMAWorker*>(infiniband->stack->get_worker());\r
+      server = new RDMAConnectedSocketImpl(cct, infiniband, NULL, msg);\r
+      msg = server->get_my_msg();\r
+      r = infiniband->send_udp_msg(server_setup_socket, msg, addr);\r
+      server->activate();\r
+      std::unique_ptr<RDMAConnectedSocketImpl> csi(server);\r
+      *sock = ConnectedSocket(std::move(csi));\r
+      if(out)\r
+        *out = sa;\r
+      return r;\r
+    }\r
+  }\r
+\r
+  return r;\r
+}\r
diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc
new file mode 100644 (file)
index 0000000..ca92fae
--- /dev/null
@@ -0,0 +1,152 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "RDMAStack.h"
+#include "include/str_list.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << "RDMAStack "
+
+static Infiniband* global_infiniband;
+
+int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
+{
+  auto p = new RDMAServerSocketImpl(cct, infiniband, sa);
+  int r = p->listen(sa, opt);
+  if (r < 0) {
+    delete p;
+    return r;
+  }
+
+  *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p));
+  return 0;
+}
+
+int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
+{
+  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, this);
+  entity_addr_t sa;
+  memcpy(&sa, &addr, sizeof(addr));
+
+  IBSYNMsg msg = p->get_my_msg();
+  ldout(cct, 20) << __func__ << " connecting to " << sa.get_sockaddr() << " : " << sa.get_port() << dendl;
+  ldout(cct, 20) << __func__ << " my syn msg :  < " << msg.qpn << ", " << msg.psn <<  ", " << msg.lid << ">"<< dendl;
+
+  client_setup_socket = ::socket(PF_INET, SOCK_DGRAM, 0);
+  if (client_setup_socket == -1) {
+    lderr(cct) << __func__ << " failed to create client socket: " << strerror(errno) << dendl;
+    return -errno;
+  }
+
+  int r = ::connect(client_setup_socket, addr.get_sockaddr(), addr.get_sockaddr_len());
+  if (r < 0) {
+    lderr(cct) << __func__ << " failed to connect " << addr << ": "
+               << strerror(errno) << dendl;
+    return -errno;
+  }
+
+  r = infiniband->send_udp_msg(client_setup_socket, msg, sa);
+  if (r < 0) {
+    ldout(cct, 0) << __func__ << " send msg failed." << dendl;
+    return r;
+  }
+
+  // FIXME: need to make this async
+  r = infiniband->recv_udp_msg(client_setup_socket, msg, &sa);
+  if (r < 0) {
+    ldout(cct, 0) << __func__ << " recv msg failed." << dendl;
+    return r;
+  }
+  p->set_peer_msg(msg);
+  ldout(cct, 20) << __func__ << " peer msg :  < " << msg.qpn << ", " << msg.psn <<  ", " << msg.lid << "> " << dendl;
+  r = p->activate();
+  assert(!r);
+  std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
+  *socket = ConnectedSocket(std::move(csi));
+
+  return 0;
+}
+
+RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
+{
+  if (!global_infiniband)
+    global_infiniband = new Infiniband(
+        this, cct, cct->_conf->ms_async_rdma_device_name);
+}
+
+void RDMAWorker::initialize()
+{
+  infiniband = global_infiniband;
+  tx_cc = infiniband->create_comp_channel();
+  tx_cq = infiniband->create_comp_queue(tx_cc);
+  center.create_file_event(tx_cc->get_fd(), EVENT_READABLE, tx_handler);
+  memory_manager = infiniband->get_memory_manager();
+}
+
+void RDMAWorker::handle_tx_event()
+{
+  ldout(cct, 20) << __func__ << dendl;
+  if (!tx_cc->get_cq_event())
+    return ;
+
+  static const int MAX_COMPLETIONS = 16;
+  ibv_wc wc[MAX_COMPLETIONS];
+
+  bool rearmed = false;
+  int n;
+ again:
+  n = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
+  ldout(cct, 20) << __func__ << " pool completion queue got " << n
+                 << " responses."<< dendl;
+  for (int i = 0; i < n; ++i) {
+    ibv_wc* response = &wc[i];
+    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+    ldout(cct, 20) << __func__ << " opcode: " << response->opcode << " len: " << response->byte_len << dendl;
+
+    if (response->status != IBV_WC_SUCCESS) {
+      if (response->status == IBV_WC_RETRY_EXC_ERR) {
+        lderr(cct) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
+      } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
+        lderr(cct) << __func__ << " Work Request Flushed Error: this connection's qp="
+                               << response->qp_num << " should be down while this WR=" << response->wr_id
+                               << " still in flight." << dendl;
+      } else {
+        lderr(cct) << __func__ << " send work request returned error for buffer("
+                               << response->wr_id << ") status(" << response->status << "): "
+                               << infiniband->wc_status_to_string(response->status) << dendl;
+      }
+      assert(0);
+    }
+
+    if (memory_manager->is_tx_chunk(chunk))
+      infiniband->get_memory_manager()->return_tx(chunk);
+    else
+      ldout(cct, 20) << __func__ << " chunk belongs to none " << dendl;
+  }
+
+  if (n)
+    goto again;
+
+  if (!rearmed) {
+    tx_cq->rearm_notify();
+    rearmed = true;
+    // Clean up cq events after rearm notify ensure no new incoming event
+    // arrived between polling and rearm
+    goto again;
+  }
+  ldout(cct, 20) << __func__ << " leaving handle_tx_event. " << dendl;
+}
diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h
new file mode 100644 (file)
index 0000000..2ff1c0d
--- /dev/null
@@ -0,0 +1,184 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_RDMASTACK_H
+#define CEPH_MSG_RDMASTACK_H
+
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "msg/async/Stack.h"
+#include <thread>
+#include "Infiniband.h"
+
+class RDMAConnectedSocketImpl;
+
+class RDMAWorker : public Worker {
+  typedef Infiniband::CompletionQueue CompletionQueue;
+  typedef Infiniband::CompletionChannel CompletionChannel;
+  typedef Infiniband::MemoryManager::Chunk Chunk;
+  typedef Infiniband::MemoryManager MemoryManager;
+  int client_setup_socket;
+  Infiniband* infiniband;
+  CompletionQueue* tx_cq;           // common completion queue for all transmits
+  CompletionChannel* tx_cc;
+  EventCallbackRef tx_handler;
+  MemoryManager* memory_manager;
+  vector<RDMAConnectedSocketImpl*> to_delete;
+  class C_handle_cq_tx : public EventCallback {
+    RDMAWorker *worker;
+    public:
+    C_handle_cq_tx(RDMAWorker *w): worker(w) {}
+    void do_request(int fd) {
+      worker->handle_tx_event();
+    }
+  };
+
+ public:
+  explicit RDMAWorker(CephContext *c, unsigned i)
+    : Worker(c, i), infiniband(NULL), tx_handler(new C_handle_cq_tx(this)) {}
+  virtual ~RDMAWorker() {
+    tx_cc->ack_events();
+    delete tx_cq;
+    delete tx_cc;
+    delete tx_handler;
+  }
+
+  virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
+  virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
+  virtual void initialize() override;
+  void handle_tx_event();
+  CompletionQueue* get_tx_cq() { return tx_cq; }
+  void remove_to_delete(RDMAConnectedSocketImpl* csi) {
+    if (to_delete.empty())
+      return ;
+    vector<RDMAConnectedSocketImpl*>::iterator iter = to_delete.begin();
+    for (; iter != to_delete.end(); ++iter) {
+      if(csi == *iter) {
+        to_delete.erase(iter);
+      }
+    }
+  }
+  void add_to_delete(RDMAConnectedSocketImpl* csi) {
+    to_delete.push_back(csi);
+  }
+};
+
+class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
+ public:
+  typedef Infiniband::MemoryManager::Chunk Chunk;
+  typedef Infiniband::CompletionChannel CompletionChannel;
+  typedef Infiniband::CompletionQueue CompletionQueue;
+
+ private:
+  CephContext *cct;
+  Infiniband::QueuePair *qp;
+  IBSYNMsg peer_msg;
+  IBSYNMsg my_msg;
+  int connected;
+  Infiniband* infiniband;
+  RDMAWorker* worker;
+  std::vector<Chunk*> buffers;
+  CompletionChannel* rx_cc;
+  CompletionQueue* rx_cq;
+  bool wait_close;
+
+ public:
+  RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMAWorker* w, IBSYNMsg im = IBSYNMsg()) : cct(cct), peer_msg(im), infiniband(ib), worker(w), wait_close(false) {
+    qp = infiniband->create_queue_pair(IBV_QPT_RC);
+    rx_cq = qp->get_rx_cq();
+    rx_cc = rx_cq->get_cc();
+    my_msg.qpn = qp->get_local_qp_number();
+    my_msg.psn = qp->get_initial_psn();
+    my_msg.lid = infiniband->get_lid();
+    my_msg.gid = infiniband->get_gid();
+  }
+
+  virtual int is_connected() override {
+    return connected;
+  }
+  virtual ssize_t read(char* buf, size_t len) override;
+  virtual ssize_t zero_copy_read(bufferptr &data) override;
+  virtual ssize_t send(bufferlist &bl, bool more) override;
+  virtual void shutdown() override {
+  }
+  virtual void close() override {
+    if (!wait_close) {
+      fin();
+      worker->add_to_delete(this);
+    } else {
+      clear_all();
+    }
+  }
+  virtual int fd() const override {
+    return rx_cc->get_fd();
+  }
+  void clear_all() {
+    delete qp;
+    rx_cc->ack_events();
+    delete rx_cq;
+    rx_cq = NULL;
+    if (!wait_close)
+      worker->remove_to_delete(this);
+  }
+  int activate();
+  ssize_t read_buffers(char* buf, size_t len);
+  int poll_cq(int num_entries, ibv_wc *ret_wc_array);
+  IBSYNMsg get_my_msg() { return my_msg; }
+  IBSYNMsg get_peer_msg() { return peer_msg; }
+  void set_peer_msg(IBSYNMsg m) { peer_msg = m ;}
+  int post_work_request(std::vector<Chunk*>&);
+  void fin();
+};
+
+
+class RDMAServerSocketImpl : public ServerSocketImpl {
+  CephContext *cct;
+  NetHandler net;
+  int server_setup_socket;
+  Infiniband* infiniband;
+  entity_addr_t sa;
+ public:
+  RDMAServerSocketImpl(CephContext *cct, Infiniband* i, entity_addr_t& a)
+    : cct(cct), net(cct), infiniband(i), sa(a) {}
+  int listen(entity_addr_t &sa, const SocketOptions &opt);
+  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out) override;
+  virtual void abort_accept() override {}
+  virtual int fd() const override {
+    return server_setup_socket;
+  }
+};
+
+
+class RDMAStack : public NetworkStack {
+  vector<std::thread> threads;
+
+ public:
+  explicit RDMAStack(CephContext *cct, const string &t);
+  virtual bool support_zero_copy_read() const override { return true; }
+  //virtual bool support_local_listen_table() const { return true; }
+
+  virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override {
+    threads.resize(i+1);
+    threads[i] = std::move(std::thread(func));
+  }
+  virtual void join_worker(unsigned i) override {
+    assert(threads.size() > i && threads[i].joinable());
+    threads[i].join();
+  }
+};
+
+#endif