git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: add RDMA iWARP protocol support
author: haodong tang <haodong.tang@intel.com>
Wed, 4 Apr 2018 03:19:38 +0000 (11:19 +0800)
committer: haodong <haodong.tang@intel.com>
Wed, 20 Jun 2018 06:08:20 +0000 (14:08 +0800)
Signed-off-by: Haodong Tang <haodong.tang@intel.com>
14 files changed:
ceph.spec.in
cmake/modules/Findrdma.cmake
debian/control
src/CMakeLists.txt
src/common/legacy_config_opts.h
src/common/options.cc
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc [new file with mode: 0644]
src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc [new file with mode: 0644]
src/msg/async/rdma/RDMAServerSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index 25ba2391d5f81dc62c1227bd2446ebc3c3b91e91..e76feeac0dc5dbe0c1df8bfcb2630ee1291d80bd 100644 (file)
@@ -199,6 +199,7 @@ BuildRequires:      btrfs-progs
 BuildRequires: nss-devel
 BuildRequires: keyutils-libs-devel
 BuildRequires: libibverbs-devel
+BuildRequires:  librdmacm-devel
 BuildRequires:  openldap-devel
 BuildRequires:  openssl-devel
 BuildRequires:  CUnit-devel
index eb31f7922c91c2124d02c03f843da87261e7e4cb..dcc6cac825b303f5e4b8a21ef16cad9ea368073b 100644 (file)
@@ -10,9 +10,12 @@ find_path(RDMA_INCLUDE_DIR infiniband/verbs.h)
 set(RDMA_NAMES ${RDMA_NAMES} ibverbs)
 find_library(RDMA_LIBRARY NAMES ${RDMA_NAMES})
 
+set(RDMACM_NAMES ${RDMACM_NAMES} rdmacm)
+find_library(RDMACM_LIBRARY NAMES ${RDMACM_NAMES})
+
 if (RDMA_INCLUDE_DIR AND RDMA_LIBRARY)
   set(RDMA_FOUND TRUE)
-  set(RDMA_LIBRARIES ${RDMA_LIBRARY})
+  set(RDMA_LIBRARIES ${RDMA_LIBRARY} ${RDMACM_LIBRARY})
 else ()
   set(RDMA_FOUND FALSE)
   set( RDMA_LIBRARIES )
@@ -20,6 +23,7 @@ endif ()
 
 if (RDMA_FOUND)
   message(STATUS "Found libibverbs: ${RDMA_LIBRARY}")
+  message(STATUS "Found librdmacm: ${RDMACM_LIBRARY}")
 
   include(CheckCXXSourceCompiles)
   CHECK_CXX_SOURCE_COMPILES("
index d9906ef10fc2c6932b6d36327bc3c85438b476a7..220897a6d191f1c56e56266a63e740f4d7652b5e 100644 (file)
@@ -35,6 +35,7 @@ Build-Depends: bc,
                libfuse-dev,
                libgoogle-perftools-dev [i386 amd64 arm64],
                libibverbs-dev,
+               librdmacm-dev,
                libkeyutils-dev,
                libldap2-dev,
                libleveldb-dev,
index b372759b7752562a9aa55d7ff76ce93b9a0ffe55..afceda5c0394c59808b9b2526defeca6af2e9a0d 100644 (file)
@@ -408,7 +408,9 @@ if(HAVE_RDMA)
   list(APPEND async_rdma_common_srcs
     msg/async/rdma/Infiniband.cc
     msg/async/rdma/RDMAConnectedSocketImpl.cc
+    msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc
     msg/async/rdma/RDMAServerSocketImpl.cc
+    msg/async/rdma/RDMAIWARPServerSocketImpl.cc
     msg/async/rdma/RDMAStack.cc)
 endif(HAVE_RDMA)
 
@@ -663,7 +665,7 @@ if(HAVE_UDEV)
   list(APPEND ceph_common_deps ${UDEV_LIBRARIES})
 endif()
 if(HAVE_RDMA)
-  list(APPEND ceph_common_deps ${RDMA_LIBRARY})
+  list(APPEND ceph_common_deps ${RDMA_LIBRARIES})
 endif()
 if(NOT WITH_SYSTEM_BOOST)
   list(APPEND ceph_common_deps ${ZLIB_LIBRARIES})
index 5d2bacb12ed874435d761577127a8d8ca1f8d385..02638506b6921360903ca22e4d7fe25ffaee29f7 100644 (file)
@@ -168,6 +168,10 @@ OPTION(ms_async_rdma_roce_ver, OPT_INT)         // 0=RoCEv1, 1=RoCEv2, 2=RoCEv1.
 OPTION(ms_async_rdma_sl, OPT_INT)               // in RoCE, this means PCP
 OPTION(ms_async_rdma_dscp, OPT_INT)            // in RoCE, this means DSCP
 
+// rdma connection management
+OPTION(ms_async_rdma_cm, OPT_BOOL)
+OPTION(ms_async_rdma_type, OPT_STR)
+
 OPTION(ms_dpdk_port_id, OPT_INT)
 SAFE_OPTION(ms_dpdk_coremask, OPT_STR)        // it is modified in unittest so that use SAFE_OPTION to declare 
 OPTION(ms_dpdk_memory_channel, OPT_STR)
index 2a31285ff22831d1a7ece2ccf5aa440282113f47..b8ed5e79be62601bea04f764887476bf7d9abf00 100644 (file)
@@ -1048,6 +1048,14 @@ std::vector<Option> get_global_options() {
     .set_default(96)
     .set_description(""),
 
+    Option("ms_async_rdma_cm", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
+    .set_default(false)
+    .set_description(""),
+
+    Option("ms_async_rdma_type", Option::TYPE_STR, Option::LEVEL_ADVANCED)
+    .set_default("ib")
+    .set_description(""),
+
     Option("ms_dpdk_port_id", Option::TYPE_INT, Option::LEVEL_ADVANCED)
     .set_default(0)
     .set_description(""),
index 9ead47be8767a74726f3e1c1646231cc6ae8dfce..cfbd3680364c8e69cee67d47465e9c413c5e5f81 100644 (file)
@@ -109,14 +109,19 @@ Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt
 }
 
 
-Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr)
+Device::Device(CephContext *cct, ibv_device* d, struct ibv_context *dc)
+  : device(d), device_attr(new ibv_device_attr), active_port(nullptr)
 {
   if (device == NULL) {
     lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
     ceph_abort();
   }
   name = ibv_get_device_name(device);
-  ctxt = ibv_open_device(device);
+  if (cct->_conf->ms_async_rdma_cm) {
+    ctxt = dc;
+  } else {
+    ctxt = ibv_open_device(device);
+  }
   if (ctxt == NULL) {
     lderr(cct) << __func__ << " open rdma device failed. " << cpp_strerror(errno) << dendl;
     ceph_abort();
@@ -152,7 +157,7 @@ Infiniband::QueuePair::QueuePair(
     CephContext *c, Infiniband& infiniband, ibv_qp_type type,
     int port, ibv_srq *srq,
     Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
-    uint32_t tx_queue_len, uint32_t rx_queue_len, uint32_t q_key)
+    uint32_t tx_queue_len, uint32_t rx_queue_len, struct rdma_cm_id *cid, uint32_t q_key)
 : cct(c), infiniband(infiniband),
   type(type),
   ctxt(infiniband.device->ctxt),
@@ -160,6 +165,7 @@ Infiniband::QueuePair::QueuePair(
   pd(infiniband.pd->pd),
   srq(srq),
   qp(NULL),
+  cm_id(cid),
   txcq(txcq),
   rxcq(rxcq),
   initial_psn(0),
@@ -190,20 +196,32 @@ int Infiniband::QueuePair::init()
   qpia.qp_type = type;                 // RC, UC, UD, or XRC
   qpia.sq_sig_all = 0;                 // only generate CQEs on requested WQEs
 
-  qp = ibv_create_qp(pd, &qpia);
-  if (qp == NULL) {
-    lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
-    if (errno == ENOMEM) {
-      lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_length, "
-                               " ms_async_rdma_send_buffers or"
-                               " ms_async_rdma_buffer_size" << dendl;
+  if (!cct->_conf->ms_async_rdma_cm) {
+    qp = ibv_create_qp(pd, &qpia);
+    if (qp == NULL) {
+      lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
+      if (errno == ENOMEM) {
+        lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_length, "
+                                  " ms_async_rdma_send_buffers or"
+                                  " ms_async_rdma_buffer_size" << dendl;
+      }
+      return -1;
     }
-    return -1;
+  } else {
+    assert(cm_id->verbs == pd->context);
+    if (rdma_create_qp(cm_id, pd, &qpia)) {
+      lderr(cct) << __func__ << " failed to create queue pair with rdmacm library"
+                 << cpp_strerror(errno) << dendl;
+      return -1;
+    }
+    qp = cm_id->qp;
   }
-
   ldout(cct, 20) << __func__ << " successfully create queue pair: "
                  << "qp=" << qp << dendl;
 
+  if (cct->_conf->ms_async_rdma_cm)
+    return 0;
+
   // move from RESET to INIT state
   ibv_qp_attr qpa;
   memset(&qpa, 0, sizeof(qpa));
@@ -973,10 +991,11 @@ int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
  *      QueuePair on success or NULL if init fails
  * See QueuePair::QueuePair for parameter documentation.
  */
-Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)
+Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx,
+    CompletionQueue* rx, ibv_qp_type type, struct rdma_cm_id *cm_id)
 {
   Infiniband::QueuePair *qp = new QueuePair(
-      cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len);
+      cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len, cm_id);
   if (qp->init()) {
     delete qp;
     return NULL;
index 38dbafc2ef1f6ffe97a56d5cf4dba4c5cf1b46ab..0a9daf5fefb15f10aaab6ac273311fb192f80172 100644 (file)
@@ -22,6 +22,7 @@
 #include <include/assert.h>
 
 #include <infiniband/verbs.h>
+#include <rdma/rdma_cma.h>
 
 #include <atomic>
 #include <string>
@@ -76,7 +77,7 @@ class Device {
   const char* name;
   uint8_t  port_cnt = 0;
  public:
-  explicit Device(CephContext *c, ibv_device* d);
+  explicit Device(CephContext *c, ibv_device* d, struct ibv_context *dc);
   ~Device() {
     if (active_port) {
       delete active_port;
@@ -96,10 +97,12 @@ class Device {
 
 class DeviceList {
   struct ibv_device ** device_list;
+  struct ibv_context ** device_context_list;
   int num;
   Device** devices;
  public:
-  explicit DeviceList(CephContext *cct): device_list(ibv_get_device_list(&num)) {
+  explicit DeviceList(CephContext *cct): device_list(ibv_get_device_list(&num)),
+                                device_context_list(rdma_get_devices(&num)) {
     if (device_list == NULL || num == 0) {
       lderr(cct) << __func__ << " failed to get rdma device list.  " << cpp_strerror(errno) << dendl;
       ceph_abort();
@@ -107,7 +110,7 @@ class DeviceList {
     devices = new Device*[num];
 
     for (int i = 0;i < num; ++i) {
-      devices[i] = new Device(cct, device_list[i]);
+      devices[i] = new Device(cct, device_list[i], device_context_list[i]);
     }
   }
   ~DeviceList() {
@@ -430,7 +433,7 @@ class Infiniband {
               int ib_physical_port,  ibv_srq *srq,
               Infiniband::CompletionQueue* txcq,
               Infiniband::CompletionQueue* rxcq,
-              uint32_t tx_queue_len, uint32_t max_recv_wr, uint32_t q_key = 0);
+              uint32_t tx_queue_len, uint32_t max_recv_wr, struct rdma_cm_id *cid, uint32_t q_key = 0);
     ~QueuePair();
 
     int init();
@@ -483,6 +486,7 @@ class Infiniband {
     ibv_pd*      pd;             // protection domain
     ibv_srq*     srq;            // shared receive queue
     ibv_qp*      qp;             // infiniband verbs QP handle
+    struct rdma_cm_id *cm_id;
     Infiniband::CompletionQueue* txcq;
     Infiniband::CompletionQueue* rxcq;
     uint32_t     initial_psn;    // initial packet sequence number
@@ -496,7 +500,8 @@ class Infiniband {
  public:
   typedef MemoryManager::Cluster Cluster;
   typedef MemoryManager::Chunk Chunk;
-  QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);
+  QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*,
+      ibv_qp_type type, struct rdma_cm_id *cm_id);
   ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
   // post rx buffers to srq, return number of buffers actually posted
   int  post_chunks_to_srq(int num);
index eca63e6054b9d3584cdc88e063a1cd0f65385233..3680045e8f2dcd6708cae5c68df02ca4ff4998a5 100644 (file)
@@ -13,7 +13,6 @@
  * Foundation.  See file COPYING.
  *
  */
-
 #include "RDMAStack.h"
 
 #define dout_subsys ceph_subsys_ms
@@ -27,16 +26,18 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* i
     is_server(false), con_handler(new C_handle_connection(this)),
     active(false), pending(false)
 {
-  qp = infiniband->create_queue_pair(
-                                    cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC);
-  my_msg.qpn = qp->get_local_qp_number();
-  my_msg.psn = qp->get_initial_psn();
-  my_msg.lid = infiniband->get_lid();
-  my_msg.peer_qpn = 0;
-  my_msg.gid = infiniband->get_gid();
-  notify_fd = dispatcher->register_qp(qp, this);
-  dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
-  dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
+  if (!cct->_conf->ms_async_rdma_cm) {
+    qp = infiniband->create_queue_pair(cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC, NULL);
+    my_msg.qpn = qp->get_local_qp_number();
+    my_msg.psn = qp->get_initial_psn();
+    my_msg.lid = infiniband->get_lid();
+    my_msg.peer_qpn = 0;
+    my_msg.gid = infiniband->get_gid();
+    notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+    dispatcher->register_qp(qp, this);
+    dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
+    dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
+  }
 }
 
 RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
@@ -586,6 +587,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
 void RDMAConnectedSocketImpl::fin() {
   ibv_send_wr wr;
   memset(&wr, 0, sizeof(wr));
+
   wr.wr_id = reinterpret_cast<uint64_t>(qp);
   wr.num_sge = 0;
   wr.opcode = IBV_WR_SEND;
diff --git a/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc
new file mode 100644 (file)
index 0000000..672b876
--- /dev/null
@@ -0,0 +1,178 @@
+#include "RDMAStack.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << " RDMAIWARPConnectedSocketImpl "
+
+#define TIMEOUT_MS 3000
+#define RETRY_COUNT 7
+
+RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+                                                RDMAWorker *w, RDMACMInfo *info)
+  : RDMAConnectedSocketImpl(cct, ib, s, w), cm_con_handler(new C_handle_cm_connection(this))
+{
+  status = IDLE;
+  notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+  if (info) {
+    is_server = true;
+    cm_id = info->cm_id;
+    cm_channel = info->cm_channel;
+    status = RDMA_ID_CREATED;
+    remote_qpn = info->qp_num;
+    worker->center.submit_to(worker->center.get_id(), [this]() {
+      worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
+      status = CHANNEL_FD_CREATED;
+    }, false);
+    if (alloc_resource()) {
+      close_notify();
+      return;
+    }
+    status = RESOURCE_ALLOCATED;
+    local_qpn = qp->get_local_qp_number();
+    my_msg.qpn = local_qpn;
+  } else {
+    is_server = false;
+    cm_channel = rdma_create_event_channel();
+    rdma_create_id(cm_channel, &cm_id, NULL, RDMA_PS_TCP);
+    status = RDMA_ID_CREATED;
+    ldout(cct, 20) << __func__ << " successfully created cm id: " << cm_id << dendl;
+  }
+}
+
+RDMAIWARPConnectedSocketImpl::~RDMAIWARPConnectedSocketImpl() {
+  ldout(cct, 20) << __func__ << " destruct." << dendl;
+  std::unique_lock l(close_mtx);
+  close_condition.wait(l, [&] { return closed; });
+  if (status >= RDMA_ID_CREATED) {
+    rdma_destroy_id(cm_id);
+    rdma_destroy_event_channel(cm_channel);
+  }
+}
+
+int RDMAIWARPConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
+  worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
+  status = CHANNEL_FD_CREATED;
+  if (rdma_resolve_addr(cm_id, NULL, const_cast<struct sockaddr*>(peer_addr.get_sockaddr()), TIMEOUT_MS)) {
+    lderr(cct) << __func__ << " failed to resolve addr" << dendl;
+    return -1;
+  }
+  return 0;
+}
+
+void RDMAIWARPConnectedSocketImpl::close() {
+  error = ECONNRESET;
+  active = false;
+  if (status >= CONNECTED)
+    rdma_disconnect(cm_id);
+  close_notify();
+}
+
+void RDMAIWARPConnectedSocketImpl::shutdown() {
+  error = ECONNRESET;
+  active = false;
+}
+
+void RDMAIWARPConnectedSocketImpl::handle_cm_connection() {
+  struct rdma_cm_event *event;
+  rdma_get_cm_event(cm_channel, &event);
+  ldout(cct, 20) << __func__ << " event name: " << rdma_event_str(event->event)
+                             << " (cm id: " << cm_id << ")" << dendl;
+  struct rdma_conn_param cm_params;
+  switch (event->event) {
+    case RDMA_CM_EVENT_ADDR_RESOLVED:
+      status = ADDR_RESOLVED;
+      if (rdma_resolve_route(cm_id, TIMEOUT_MS)) {
+        lderr(cct) << __func__ << " failed to resolve rdma addr" << dendl;
+        notify();
+      }
+      break;
+
+    case RDMA_CM_EVENT_ROUTE_RESOLVED:
+      status = ROUTE_RESOLVED;
+      if (alloc_resource()) {
+        lderr(cct) << __func__ << " failed to alloc resource while resolving the route" << dendl;
+        connected = -ECONNREFUSED;
+        notify();
+        break;
+      }
+      local_qpn = qp->get_local_qp_number();
+      my_msg.qpn = local_qpn;
+
+      memset(&cm_params, 0, sizeof(cm_params));
+      cm_params.retry_count = RETRY_COUNT;
+      cm_params.qp_num = local_qpn;
+      if (rdma_connect(cm_id, &cm_params)) {
+        lderr(cct) << __func__ << " failed to connect remote rdma port" << dendl;
+        connected = -ECONNREFUSED;
+        notify();
+      }
+      break;
+
+    case RDMA_CM_EVENT_ESTABLISHED:
+      status = CONNECTED;
+      if (!is_server) {
+        remote_qpn = event->param.conn.qp_num;
+        activate();
+        notify();
+      }
+      break;
+
+    case RDMA_CM_EVENT_ADDR_ERROR:
+    case RDMA_CM_EVENT_ROUTE_ERROR:
+    case RDMA_CM_EVENT_CONNECT_ERROR:
+    case RDMA_CM_EVENT_UNREACHABLE:
+    case RDMA_CM_EVENT_REJECTED:
+      lderr(cct) << __func__ << " rdma connection rejected" << dendl;
+      connected = -ECONNREFUSED;
+      notify();
+      break;
+
+    case RDMA_CM_EVENT_DISCONNECTED:
+      status = DISCONNECTED;
+      close_notify();
+      if (!error) {
+        error = ECONNRESET;
+        notify();
+      }
+      break;
+
+    case RDMA_CM_EVENT_DEVICE_REMOVAL:
+      break;
+
+    default:
+      assert(0 == "unhandled event");
+      break;
+  }
+  rdma_ack_cm_event(event);
+}
+
+void RDMAIWARPConnectedSocketImpl::activate() {
+  ldout(cct, 30) << __func__ << dendl;
+  active = true;
+  connected = 1;
+}
+
+int RDMAIWARPConnectedSocketImpl::alloc_resource() {
+  ldout(cct, 30) << __func__ << dendl;
+  qp = infiniband->create_queue_pair(cct, dispatcher->get_tx_cq(),
+      dispatcher->get_rx_cq(), IBV_QPT_RC, cm_id);
+  if (!qp) {
+    return -1;
+  }
+  dispatcher->register_qp(qp, this);
+  dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
+  dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
+  return 0;
+}
+
+void RDMAIWARPConnectedSocketImpl::close_notify() {
+  ldout(cct, 30) << __func__ << dendl;
+  if (status >= CHANNEL_FD_CREATED) {
+    worker->center.delete_file_event(cm_channel->fd, EVENT_READABLE);
+  }
+  std::unique_lock l(close_mtx);
+  if (!closed) {
+    closed = true;
+    close_condition.notify_all();
+  }
+}
diff --git a/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc b/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc
new file mode 100644 (file)
index 0000000..cf7c514
--- /dev/null
@@ -0,0 +1,104 @@
+#include <poll.h>
+
+#include "msg/async/net_handler.h"
+#include "RDMAStack.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << " RDMAIWARPServerSocketImpl "
+
+RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl(CephContext *cct, Infiniband* i,
+    RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
+  : RDMAServerSocketImpl(cct, i, s, w, a)
+{
+}
+
+int RDMAIWARPServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
+{
+  ldout(cct, 20) << __func__ << " bind to rdma point" << dendl;
+  cm_channel = rdma_create_event_channel();
+  rdma_create_id(cm_channel, &cm_id, NULL, RDMA_PS_TCP);
+  ldout(cct, 20) << __func__ << " successfully created cm id: " << cm_id << dendl;
+  int rc = rdma_bind_addr(cm_id, const_cast<struct sockaddr*>(sa.get_sockaddr()));
+  if (rc < 0) {
+    rc = -errno;
+    ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
+                   << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl;
+    goto err;
+  }
+  rc = rdma_listen(cm_id, 128);
+  if (rc < 0) {
+    rc = -errno;
+    ldout(cct, 10) << __func__ << " unable to listen to " << sa.get_sockaddr()
+                   << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl;
+    goto err;
+  }
+  server_setup_socket = cm_channel->fd;
+  ldout(cct, 20) << __func__ << " fd of cm_channel is " << server_setup_socket << dendl;
+  return 0;
+
+err:
+  server_setup_socket = -1;
+  rdma_destroy_id(cm_id);
+  rdma_destroy_event_channel(cm_channel);
+  return rc;
+}
+
+int RDMAIWARPServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt,
+    entity_addr_t *out, Worker *w)
+{
+  ldout(cct, 15) << __func__ << dendl;
+
+  assert(sock);
+  struct pollfd pfd = {
+    .fd = cm_channel->fd,
+    .events = POLLIN,
+  };
+  int ret = poll(&pfd, 1, 0);
+  assert(ret >= 0);
+  if (!ret)
+    return -EAGAIN;
+
+  struct rdma_cm_event *cm_event;
+  rdma_get_cm_event(cm_channel, &cm_event);
+  ldout(cct, 20) << __func__ << " event name: " << rdma_event_str(cm_event->event) << dendl;
+
+  struct rdma_cm_id *event_cm_id = cm_event->id;
+  struct rdma_event_channel *event_channel = rdma_create_event_channel();
+
+  rdma_migrate_id(event_cm_id, event_channel);
+
+  struct rdma_cm_id *new_cm_id = event_cm_id;
+  struct rdma_conn_param *remote_conn_param = &cm_event->param.conn;
+  struct rdma_conn_param local_conn_param;
+
+  RDMACMInfo info(new_cm_id, event_channel, remote_conn_param->qp_num);
+  RDMAIWARPConnectedSocketImpl* server =
+    new RDMAIWARPConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w), &info);
+
+  memset(&local_conn_param, 0, sizeof(local_conn_param));
+  local_conn_param.qp_num = server->get_local_qpn();
+
+  if (rdma_accept(new_cm_id, &local_conn_param)) {
+    return -EAGAIN;
+  }
+  server->activate();
+  ldout(cct, 20) << __func__ << " accepted a new QP" << dendl;
+
+  rdma_ack_cm_event(cm_event);
+
+  std::unique_ptr<RDMAConnectedSocketImpl> csi(server);
+  *sock = ConnectedSocket(std::move(csi));
+  struct sockaddr *addr = &new_cm_id->route.addr.dst_addr;
+  out->set_sockaddr(addr);
+
+  return 0;
+}
+
+void RDMAIWARPServerSocketImpl::abort_accept()
+{
+  if (server_setup_socket >= 0) {
+    rdma_destroy_id(cm_id);
+    rdma_destroy_event_channel(cm_channel);
+  }
+}
index 6e473d12ea7c9379d63ce71bae9d6d46fc353941..df1dfda94154c2a78d0afed26ae47bda66d611a9 100644 (file)
@@ -77,6 +77,7 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt
   ldout(cct, 15) << __func__ << dendl;
 
   assert(sock);
+
   sockaddr_storage ss;
   socklen_t slen = sizeof(ss);
   int sd = ::accept(server_setup_socket, (sockaddr*)&ss, &slen);
index 0790cdb12eed315371384aa3728b9eb36a9db5a3..5523f2289e422d77f37f484920b44667f1e63392 100644 (file)
@@ -145,7 +145,8 @@ void RDMADispatcher::handle_async_event()
       } else {
         ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
         conn->fault();
-        erase_qpn_lockless(qpn);
+        if (!cct->_conf->ms_async_rdma_cm)
+          erase_qpn_lockless(qpn);
       }
     } else {
       ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << get_stack()->get_infiniband().get_device()->ctxt
@@ -320,15 +321,12 @@ void RDMADispatcher::notify_pending_workers() {
   }
 }
 
-int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
+void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
 {
-  int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
-  assert(fd >= 0);
   Mutex::Locker l(lock);
   assert(!qp_conns.count(qp->get_local_qp_number()));
   qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
   ++num_qp_conn;
-  return fd;
 }
 
 RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
@@ -492,8 +490,12 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket
 {
   get_stack()->get_infiniband().init();
   dispatcher->polling_start();
-
-  auto p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa);
+  RDMAServerSocketImpl *p;
+  if (cct->_conf->ms_async_rdma_type == "iwarp") {
+    p = new RDMAIWARPServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa);
+  } else {
+    p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa);
+  }
   int r = p->listen(sa, opt);
   if (r < 0) {
     delete p;
@@ -509,7 +511,12 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
   get_stack()->get_infiniband().init();
   dispatcher->polling_start();
 
-  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this);
+  RDMAConnectedSocketImpl* p;
+  if (cct->_conf->ms_async_rdma_type == "iwarp") {
+    p = new RDMAIWARPConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this);
+  } else {
+    p = new RDMAConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this);
+  }
   int r = p->try_connect(addr, opts);
 
   if (r < 0) {
index cc4357663e4832d885c35660d15c814d454eb704..f2965b22e32b0da91e42c4ab2ba348eabfca1f62 100644 (file)
@@ -97,7 +97,7 @@ class RDMADispatcher {
   void polling_start();
   void polling_stop();
   void polling();
-  int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
+  void register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
   void make_pending_worker(RDMAWorker* w) {
     Mutex::Locker l(w_lock);
     auto it = std::find(pending_workers.begin(), pending_workers.end(), w);
@@ -164,13 +164,21 @@ class RDMAWorker : public Worker {
   }
 };
 
+struct RDMACMInfo {
+  RDMACMInfo(rdma_cm_id *cid, rdma_event_channel *cm_channel_, uint32_t qp_num_)
+    : cm_id(cid), cm_channel(cm_channel_), qp_num(qp_num_) {}
+  rdma_cm_id *cm_id;
+  rdma_event_channel *cm_channel;
+  uint32_t qp_num;
+};
+
 class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
  public:
   typedef Infiniband::MemoryManager::Chunk Chunk;
   typedef Infiniband::CompletionChannel CompletionChannel;
   typedef Infiniband::CompletionQueue CompletionQueue;
 
- private:
+ protected:
   CephContext *cct;
   Infiniband::QueuePair *qp;
   IBSYNMsg peer_msg;
@@ -219,9 +227,10 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
   void handle_connection();
   void cleanup();
   void set_accept_fd(int sd);
-  int try_connect(const entity_addr_t&, const SocketOptions &opt);
+  virtual int try_connect(const entity_addr_t&, const SocketOptions &opt);
   bool is_pending() {return pending;}
   void set_pending(bool val) {pending = val;}
+
   class C_handle_connection : public EventCallback {
     RDMAConnectedSocketImpl *csi;
     bool active;
@@ -237,25 +246,87 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
   };
 };
 
+enum RDMA_CM_STATUS {
+  IDLE = 1,
+  RDMA_ID_CREATED,
+  CHANNEL_FD_CREATED,
+  RESOURCE_ALLOCATED,
+  ADDR_RESOLVED,
+  ROUTE_RESOLVED,
+  CONNECTED,
+  DISCONNECTED,
+  ERROR
+};
+
+class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl {
+  public:
+    RDMAIWARPConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+                          RDMAWorker *w, RDMACMInfo *info = nullptr);
+    ~RDMAIWARPConnectedSocketImpl();
+    virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
+    virtual void close() override;
+    virtual void shutdown() override;
+    virtual void handle_cm_connection();
+    uint32_t get_local_qpn() const { return local_qpn; }
+    void activate();
+    int alloc_resource();
+    void close_notify();
+
+  private:
+    rdma_cm_id *cm_id;
+    rdma_event_channel *cm_channel;
+    uint32_t local_qpn;
+    uint32_t remote_qpn;
+    EventCallbackRef cm_con_handler;
+    bool is_server;
+    std::mutex close_mtx;
+    std::condition_variable close_condition;
+    bool closed;
+    RDMA_CM_STATUS status;
+
+
+  class C_handle_cm_connection : public EventCallback {
+    RDMAIWARPConnectedSocketImpl *csi;
+    public:
+      C_handle_cm_connection(RDMAIWARPConnectedSocketImpl *w): csi(w) {}
+      void do_request(uint64_t fd) {
+        csi->handle_cm_connection();
+      }
+  };
+};
+
 class RDMAServerSocketImpl : public ServerSocketImpl {
-  CephContext *cct;
-  NetHandler net;
-  int server_setup_socket;
-  Infiniband* infiniband;
-  RDMADispatcher *dispatcher;
-  RDMAWorker *worker;
-  entity_addr_t sa;
+  protected:
+    CephContext *cct;
+    NetHandler net;
+    int server_setup_socket;
+    Infiniband* infiniband;
+    RDMADispatcher *dispatcher;
+    RDMAWorker *worker;
+    entity_addr_t sa;
 
  public:
-  RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
+  RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s,
+      RDMAWorker *w, entity_addr_t& a);
 
-  int listen(entity_addr_t &sa, const SocketOptions &opt);
+  virtual int listen(entity_addr_t &sa, const SocketOptions &opt);
   virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
   virtual void abort_accept() override;
   virtual int fd() const override { return server_setup_socket; }
   int get_fd() { return server_setup_socket; }
 };
 
+class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
+  public:
+    RDMAIWARPServerSocketImpl(CephContext *cct, Infiniband *i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
+    virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
+    virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
+    virtual void abort_accept() override;
+  private:
+    rdma_cm_id *cm_id;
+    rdma_event_channel *cm_channel;
+};
+
 class RDMAStack : public NetworkStack {
   vector<std::thread> threads;
   PerfCounters *perf_counter;