]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Revert "msg/async/rdma: Move resource handling to Device"
authorAmir Vadai <amir@vadai.me>
Tue, 23 May 2017 07:33:51 +0000 (10:33 +0300)
committerAmir Vadai <amir@vadai.me>
Tue, 23 May 2017 14:04:28 +0000 (17:04 +0300)
This reverts commit 9cba3b36ef90a1f3d5d474f0fefe48e9c7ec6bad.

Change-Id: I5953a9380ecd5a07e8ea107cf1fc35d0772e8f61
Issue: 995322
Signed-off-by: Amir Vadai <amir@vadai.me>
src/msg/async/rdma/Device.cc
src/msg/async/rdma/Device.h
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index c629f2c9edfb6ada97d6137af5237b2931b8357d..41b2d898232767a0cd6f05e897beb0fd0f29f7aa 100644 (file)
  */
 
 #include "Infiniband.h"
-#include "RDMAStack.h"
 #include "Device.h"
 #include "common/errno.h"
 #include "common/debug.h"
 
-#include <poll.h>
-
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
 #define dout_prefix *_dout << "IBDevice "
 
-static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
-static const uint32_t CQ_DEPTH = 30000;
-
 Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr)
 {
 #ifdef HAVE_IBV_EXP
@@ -108,9 +102,7 @@ Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt
 }
 
 
-Device::Device(CephContext *cct, ibv_device* d)
-  : cct(cct), device(d), lock("ibdev_lock"),
-    device_attr(new ibv_device_attr), active_port(nullptr)
+Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr)
 {
   if (device == NULL) {
     lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
@@ -127,78 +119,10 @@ Device::Device(CephContext *cct, ibv_device* d)
     lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
     ceph_abort();
   }
-
-  tx_cc = create_comp_channel(cct);
-  assert(tx_cc);
-
-  rx_cc = create_comp_channel(cct);
-  assert(rx_cc);
-
-  assert(NetHandler(cct).set_nonblock(ctxt->async_fd) == 0);
-}
-
-void Device::init()
-{
-  Mutex::Locker l(lock);
-
-  if (initialized)
-    return;
-
-  pd = new ProtectionDomain(cct, this);
-
-  max_recv_wr = std::min(device_attr->max_srq_wr, (int)cct->_conf->ms_async_rdma_receive_buffers);
-  ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
-
-  max_send_wr = std::min(device_attr->max_qp_wr, (int)cct->_conf->ms_async_rdma_send_buffers);
-  ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers"  << dendl;
-
-  ldout(cct, 1) << __func__ << " device allow " << device_attr->max_cqe
-                << " completion entries" << dendl;
-
-  memory_manager = new MemoryManager(this, pd,
-                                     cct->_conf->ms_async_rdma_enable_hugepage);
-  memory_manager->register_rx_tx(
-      cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
-
-  srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
-  post_channel_cluster();
-
-  tx_cq = create_comp_queue(cct, tx_cc);
-  assert(tx_cq);
-
-  rx_cq = create_comp_queue(cct, rx_cc);
-  assert(rx_cq);
-
-  initialized = true;
-  ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl;
-}
-
-void Device::uninit()
-{
-  Mutex::Locker l(lock);
-
-  if (!initialized)
-    return;
-
-  tx_cc->ack_events();
-  rx_cc->ack_events();
-
-  initialized = false;
-
-  delete rx_cq;
-  delete tx_cq;
-  delete rx_cc;
-  delete tx_cc;
-
-  assert(ibv_destroy_srq(srq) == 0);
-  delete memory_manager;
-  delete pd;
 }
 
 Device::~Device()
 {
-  uninit();
-
   if (active_port) {
     delete active_port;
     assert(ibv_close_device(ctxt) == 0);
@@ -224,141 +148,9 @@ void Device::binding_port(CephContext *cct, int port_num) {
   }
 }
 
-/**
- * Create a new QueuePair. This factory should be used in preference to
- * the QueuePair constructor directly, since this lets derivatives of
- * Infiniband, e.g. MockInfiniband (if it existed),
- * return mocked out QueuePair derivatives.
- *
- * \return
- *      QueuePair on success or NULL if init fails
- * See QueuePair::QueuePair for parameter documentation.
- */
-Infiniband::QueuePair* Device::create_queue_pair(CephContext *cct,
-                                                ibv_qp_type type)
-{
-  Infiniband::QueuePair *qp = new QueuePair(
-      cct, *this, type, active_port->get_port_num(), srq, tx_cq, rx_cq, max_send_wr, max_recv_wr);
-  if (qp->init()) {
-    delete qp;
-    return NULL;
-  }
-  return qp;
-}
-
-/**
- * Create a shared receive queue. This basically wraps the verbs call.
- *
- * \param[in] max_wr
- *      The max number of outstanding work requests in the SRQ.
- * \param[in] max_sge
- *      The max number of scatter elements per WR.
- * \return
- *      A valid ibv_srq pointer, or NULL on error.
- */
-ibv_srq* Device::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
-{
-  ibv_srq_init_attr sia;
-  memset(&sia, 0, sizeof(sia));
-  sia.srq_context = ctxt;
-  sia.attr.max_wr = max_wr;
-  sia.attr.max_sge = max_sge;
-  return ibv_create_srq(pd->pd, &sia);
-}
-
-Infiniband::CompletionChannel* Device::create_comp_channel(CephContext *c)
-{
-  Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
-  if (cc->init()) {
-    delete cc;
-    return NULL;
-  }
-  return cc;
-}
-
-Infiniband::CompletionQueue* Device::create_comp_queue(
-    CephContext *cct, CompletionChannel *cc)
-{
-  Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
-      cct, *this, CQ_DEPTH, cc);
-  if (cq->init()) {
-    delete cq;
-    return NULL;
-  }
-  return cq;
-}
-
-int Device::post_chunk(Chunk* chunk)
-{
-  ibv_sge isge;
-  isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
-  isge.length = chunk->bytes;
-  isge.lkey = chunk->mr->lkey;
-  ibv_recv_wr rx_work_request;
-
-  memset(&rx_work_request, 0, sizeof(rx_work_request));
-  rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
-  rx_work_request.next = NULL;
-  rx_work_request.sg_list = &isge;
-  rx_work_request.num_sge = 1;
-
-  ibv_recv_wr *badWorkRequest;
-  int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
-  if (ret)
-    return -errno;
-  return 0;
-}
-
-int Device::post_channel_cluster()
-{
-  vector<Chunk*> free_chunks;
-  int r = memory_manager->get_channel_buffers(free_chunks, 0);
-  assert(r > 0);
-  for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
-    r = post_chunk(*iter);
-    assert(r == 0);
-  }
-  return 0;
-}
-
-int Device::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
-{
-  return memory_manager->get_send_buffers(c, bytes);
-}
-
-int Device::poll_tx_cq(int n, ibv_wc *wc)
-{
-  if (!initialized)
-    return 0;
-
-  return tx_cq->poll_cq(n, wc);
-}
-
-int Device::poll_rx_cq(int n, ibv_wc *wc)
-{
-  if (!initialized)
-    return 0;
-
-  return rx_cq->poll_cq(n, wc);
-}
-
-void Device::rearm_cqs()
-{
-  int ret;
-
-  if (!initialized)
-    return;
-
-  ret = tx_cq->rearm_notify();
-  assert(!ret);
-
-  ret = rx_cq->rearm_notify();
-  assert(!ret);
-}
-
 
 DeviceList::DeviceList(CephContext *cct)
-  : cct(cct), device_list(ibv_get_device_list(&num))
+  : device_list(ibv_get_device_list(&num))
 {
   if (device_list == NULL || num == 0) {
     lderr(cct) << __func__ << " failed to get rdma device list.  " << cpp_strerror(errno) << dendl;
@@ -366,29 +158,13 @@ DeviceList::DeviceList(CephContext *cct)
   }
   devices = new Device*[num];
 
-  poll_fds = new struct pollfd[2 * num];
-
-  for (int i = 0; i < num; ++i) {
-    struct pollfd *pfd = &poll_fds[i * 2];
-    struct Device *d;
-
-    d = new Device(cct, device_list[i]);
-    devices[i] = d;
-
-    pfd[0].fd = d->tx_cc->get_fd();
-    pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
-    pfd[0].revents = 0;
-
-    pfd[1].fd = d->rx_cc->get_fd();
-    pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
-    pfd[1].revents = 0;
+  for (int i = 0;i < num; ++i) {
+    devices[i] = new Device(cct, device_list[i]);
   }
 }
 
 DeviceList::~DeviceList()
 {
-  delete poll_fds;
-
   for (int i=0; i < num; ++i) {
     delete devices[i];
   }
@@ -406,67 +182,3 @@ Device* DeviceList::get_device(const char* device_name)
   }
   return NULL;
 }
-
-int DeviceList::poll_tx(int num_entries, Device **d, ibv_wc *wc)
-{
-  int n = 0;
-
-  for (int i = 0; i < num; i++) {
-    *d = devices[++last_poll_dev % num];
-
-    n = (*d)->poll_tx_cq(num_entries, wc);
-    if (n)
-      break;
-  }
-
-  return n;
-}
-
-int DeviceList::poll_rx(int num_entries, Device **d, ibv_wc *wc)
-{
-  int n = 0;
-
-  for (int i = 0; i < num; i++) {
-    *d = devices[++last_poll_dev % num];
-
-    n = (*d)->poll_rx_cq(num_entries, wc);
-    if (n)
-      break;
-  }
-
-  return n;
-}
-
-int DeviceList::poll_blocking(bool &done)
-{
-  int r = 0;
-  while (!done && r == 0) {
-    r = poll(poll_fds, num * 2, 100);
-    if (r < 0) {
-      r = -errno;
-      lderr(cct) << __func__ << " poll failed " << r << dendl;
-      ceph_abort();
-    }
-  }
-
-  if (r <= 0)
-    return r;
-
-  for (int i = 0; i < num ; i++) {
-    Device *d = devices[i];
-
-    if (d->tx_cc->get_cq_event())
-      ldout(cct, 20) << __func__ << " " << *d << ": got tx cq event" << dendl;
-
-    if (d->rx_cc->get_cq_event())
-      ldout(cct, 20) << __func__ << " " << *d << ": got rx cq event" << dendl;
-  }
-
-  return r;
-}
-
-void DeviceList::rearm_notify()
-{
-  for (int i = 0; i < num; i++)
-    devices[i]->rearm_cqs();
-}
index a19640c2baf76591c4d0f62bda2c86b657b278b6..9bd51a4af1cdae20301945042e7f16dd11e66166 100644 (file)
 #include "msg/async/net_handler.h"
 #include "common/Mutex.h"
 
-typedef Infiniband::QueuePair QueuePair;
-typedef Infiniband::CompletionChannel CompletionChannel;
-typedef Infiniband::CompletionQueue CompletionQueue;
-typedef Infiniband::ProtectionDomain ProtectionDomain;
-typedef Infiniband::MemoryManager::Cluster Cluster;
-typedef Infiniband::MemoryManager::Chunk Chunk;
-typedef Infiniband::MemoryManager MemoryManager;
-
 class Port {
   struct ibv_context* ctxt;
   int port_num;
@@ -57,85 +49,33 @@ class Port {
 
 
 class Device {
-  CephContext *cct;
   ibv_device *device;
-  const char *name;
+  const charname;
   uint8_t  port_cnt;
-
-  uint32_t max_send_wr;
-  uint32_t max_recv_wr;
-  uint32_t max_sge;
-
-  Mutex lock; // Protects from concurrent intialization of the device
-  bool initialized = false;
-
  public:
   explicit Device(CephContext *c, ibv_device* d);
   ~Device();
 
-  void init();
-  void uninit();
-
-  const char* get_name() const { return name;}
+  const char* get_name() { return name;}
   uint16_t get_lid() { return active_port->get_lid(); }
   ibv_gid get_gid() { return active_port->get_gid(); }
   int get_gid_idx() { return active_port->get_gid_idx(); }
   void binding_port(CephContext *c, int port_num);
-
-  QueuePair* create_queue_pair(CephContext *c, ibv_qp_type type);
-  ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
-  CompletionChannel *create_comp_channel(CephContext *c);
-  CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
-  int post_chunk(Chunk* chunk);
-  int post_channel_cluster();
-
-  MemoryManager* get_memory_manager() { return memory_manager; }
-  bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
-  bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
-  Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
-  int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
-  int poll_tx_cq(int n, ibv_wc *wc);
-  int poll_rx_cq(int n, ibv_wc *wc);
-  void rearm_cqs();
-
   struct ibv_context *ctxt;
   ibv_device_attr *device_attr;
   Port* active_port;
-
-  MemoryManager* memory_manager = nullptr;
-  ibv_srq *srq = nullptr;
-  Infiniband::CompletionQueue *rx_cq = nullptr;
-  Infiniband::CompletionChannel *rx_cc = nullptr;
-  Infiniband::CompletionQueue *tx_cq = nullptr;
-  Infiniband::CompletionChannel *tx_cc = nullptr;
-  ProtectionDomain *pd = nullptr;
 };
 
-inline ostream& operator<<(ostream& out, const Device &d)
-{
-    return out << d.get_name();
-}
-
 
 class DeviceList {
-  CephContext *cct;
   struct ibv_device ** device_list;
   int num;
   Device** devices;
-
-  int last_poll_dev;
-  struct pollfd *poll_fds;
-
  public:
   DeviceList(CephContext *cct);
   ~DeviceList();
 
   Device* get_device(const char* device_name);
-
-  void rearm_notify();
-  int poll_tx(int n, Device **d, ibv_wc *wc);
-  int poll_rx(int n, Device **d, ibv_wc *wc);
-  int poll_blocking(bool &done);
 };
 
 #endif
index 66b669b45d6fec63c2cb4fa169a4b41f1c5a8c97..83b75b2b08b59e755a317b58e36d03f8b42a84fd 100644 (file)
@@ -15,7 +15,6 @@
  */
 
 #include "Infiniband.h"
-#include "RDMAStack.h"
 #include "Device.h"
 
 #include "common/errno.h"
 #undef dout_prefix
 #define dout_prefix *_dout << "Infiniband "
 
+static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
 static const uint32_t MAX_INLINE_DATA = 0;
 static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
+static const uint32_t CQ_DEPTH = 30000;
 
 Infiniband::QueuePair::QueuePair(
-    CephContext *c, Device &device, ibv_qp_type type,
+    CephContext *c, Infiniband& infiniband, ibv_qp_type type,
     int port, ibv_srq *srq,
     Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
     uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
-: cct(c), ibdev(device),
+: cct(c), infiniband(infiniband),
   type(type),
-  ctxt(ibdev.ctxt),
+  ctxt(infiniband.device->ctxt),
   ib_physical_port(port),
-  pd(ibdev.pd->pd),
+  pd(infiniband.pd->pd),
   srq(srq),
   qp(NULL),
   txcq(txcq),
@@ -53,6 +54,7 @@ Infiniband::QueuePair::QueuePair(
     lderr(cct) << __func__ << " invalid queue pair type" << cpp_strerror(errno) << dendl;
     ceph_abort();
   }
+  pd = infiniband.pd->pd;
 }
 
 int Infiniband::QueuePair::init()
@@ -227,8 +229,8 @@ bool Infiniband::QueuePair::is_error() const
 }
 
 
-Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Device &ibdev)
-  : cct(c), ibdev(ibdev), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
+Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib)
+  : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
 {
 }
 
@@ -245,7 +247,7 @@ Infiniband::CompletionChannel::~CompletionChannel()
 int Infiniband::CompletionChannel::init()
 {
   ldout(cct, 20) << __func__ << " started." << dendl;
-  channel = ibv_create_comp_channel(ibdev.ctxt);
+  channel = ibv_create_comp_channel(infiniband.device->ctxt);
   if (!channel) {
     lderr(cct) << __func__ << " failed to create receive completion channel: "
                           << cpp_strerror(errno) << dendl;
@@ -301,7 +303,7 @@ Infiniband::CompletionQueue::~CompletionQueue()
 
 int Infiniband::CompletionQueue::init()
 {
-  cq = ibv_create_cq(ibdev.ctxt, queue_depth, this, channel->get_channel(), 0);
+  cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
   if (!cq) {
     lderr(cct) << __func__ << " failed to create receive completion queue: "
       << cpp_strerror(errno) << dendl;
@@ -439,7 +441,7 @@ void Infiniband::MemoryManager::Chunk::clear()
 
 void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
 {
-  ib->device->post_chunk(this);
+  ib->post_chunk(this);
 }
 
 Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
@@ -594,27 +596,147 @@ Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t
   : device_list(new DeviceList(cct))
 {
   device = device_list->get_device(device_name.c_str());
-
-  device->init();
-
   device->binding_port(cct, port_num);
   assert(device);
   ib_physical_port = device->active_port->get_port_num();
+  pd = new ProtectionDomain(cct, device);
+  assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
+
+  max_recv_wr = device->device_attr->max_srq_wr;
+  if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) {
+    max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers;
+    ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
+  } else {
+    ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl;
+  }
+
+  max_send_wr = device->device_attr->max_qp_wr;
+  if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) {
+    max_send_wr = cct->_conf->ms_async_rdma_send_buffers;
+    ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers"  << dendl;
+  } else {
+    ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl;
+  }
+
+  ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe
+                << " completion entries" << dendl;
+
+  memory_manager = new MemoryManager(device, pd,
+                                     cct->_conf->ms_async_rdma_enable_hugepage);
+  memory_manager->register_rx_tx(
+      cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
+
+  srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
+  post_channel_cluster();
 }
 
 Infiniband::~Infiniband()
 {
-  if (dispatcher)
-    dispatcher->polling_stop();
-
+  assert(ibv_destroy_srq(srq) == 0);
+  delete memory_manager;
+  delete pd;
   delete device_list;
 }
 
-void Infiniband::set_dispatcher(RDMADispatcher *d)
+/**
+ * Create a shared receive queue. This basically wraps the verbs call. 
+ *
+ * \param[in] max_wr
+ *      The max number of outstanding work requests in the SRQ.
+ * \param[in] max_sge
+ *      The max number of scatter elements per WR.
+ * \return
+ *      A valid ibv_srq pointer, or NULL on error.
+ */
+ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
 {
-  assert(!d ^ !dispatcher);
+  ibv_srq_init_attr sia;
+  memset(&sia, 0, sizeof(sia));
+  sia.srq_context = device->ctxt;
+  sia.attr.max_wr = max_wr;
+  sia.attr.max_sge = max_sge;
+  return ibv_create_srq(pd->pd, &sia);
+}
 
-  dispatcher = d;
+int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
+{
+  return memory_manager->get_send_buffers(c, bytes);
+}
+
+/**
+ * Create a new QueuePair. This factory should be used in preference to
+ * the QueuePair constructor directly, since this lets derivatives of
+ * Infiniband, e.g. MockInfiniband (if it existed),
+ * return mocked out QueuePair derivatives.
+ *
+ * \return
+ *      QueuePair on success or NULL if init fails
+ * See QueuePair::QueuePair for parameter documentation.
+ */
+Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)
+{
+  Infiniband::QueuePair *qp = new QueuePair(
+      cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);
+  if (qp->init()) {
+    delete qp;
+    return NULL;
+  }
+  return qp;
+}
+
+int Infiniband::post_chunk(Chunk* chunk)
+{
+  ibv_sge isge;
+  isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
+  isge.length = chunk->bytes;
+  isge.lkey = chunk->mr->lkey;
+  ibv_recv_wr rx_work_request;
+
+  memset(&rx_work_request, 0, sizeof(rx_work_request));
+  rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
+  rx_work_request.next = NULL;
+  rx_work_request.sg_list = &isge;
+  rx_work_request.num_sge = 1;
+
+  ibv_recv_wr *badWorkRequest;
+  int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
+  if (ret)
+    return -errno;
+  return 0;
+}
+
+int Infiniband::post_channel_cluster()
+{
+  vector<Chunk*> free_chunks;
+  int r = memory_manager->get_channel_buffers(free_chunks, 0);
+  assert(r > 0);
+  for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
+    r = post_chunk(*iter);
+    assert(r == 0);
+  }
+  return 0;
+}
+
+Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
+{
+  Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
+  if (cc->init()) {
+    delete cc;
+    return NULL;
+  }
+  return cc;
+}
+
+Infiniband::CompletionQueue* Infiniband::create_comp_queue(
+    CephContext *cct, CompletionChannel *cc)
+{
+  Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
+      cct, *this, CQ_DEPTH, cc);
+  if (cq->init()) {
+    delete cq;
+    return NULL;
+  }
+  return cq;
 }
 
 // 1 means no valid buffer read, 0 means got enough buffer
@@ -765,33 +887,3 @@ const char* Infiniband::qp_state_string(int status) {
     default: return " out of range.";
   }
 }
-
-void Infiniband::handle_pre_fork()
-{
-  device->uninit();
-}
-
-void Infiniband::handle_post_fork()
-{
-  device->init();
-}
-
-int Infiniband::poll_tx(int n, Device **d, ibv_wc *wc)
-{
-  return device_list->poll_tx(n, d, wc);
-}
-
-int Infiniband::poll_rx(int n, Device **d, ibv_wc *wc)
-{
-  return device_list->poll_rx(n, d, wc);
-}
-
-int Infiniband::poll_blocking(bool &done)
-{
-  return device_list->poll_blocking(done);
-}
-
-void Infiniband::rearm_notify()
-{
-  device_list->rearm_notify();
-}
index a3da3edd4c05aa75ed4ecf7c0e92fb42d1197348..875b72c3c7b169a08d773a96f2651a2daeed9d9b 100644 (file)
@@ -47,7 +47,6 @@ class CephContext;
 class Port;
 class Device;
 class DeviceList;
-class RDMADispatcher;
 
 class Infiniband {
  public:
@@ -142,11 +141,15 @@ class Infiniband {
   };
 
  private:
+  uint32_t max_send_wr;
+  uint32_t max_recv_wr;
+  uint32_t max_sge;
   uint8_t  ib_physical_port;
+  MemoryManager* memory_manager;
+  ibv_srq* srq;             // shared receive work queue
   Device *device;
+  ProtectionDomain *pd;
   DeviceList *device_list;
-  RDMADispatcher *dispatcher = nullptr;
-
   void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
   void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
 
@@ -154,18 +157,16 @@ class Infiniband {
   explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
   ~Infiniband();
 
-  void set_dispatcher(RDMADispatcher *d);
-
   class CompletionChannel {
     static const uint32_t MAX_ACK_EVENT = 5000;
     CephContext *cct;
-    Device &ibdev;
+    Infiniband& infiniband;
     ibv_comp_channel *channel;
     ibv_cq *cq;
     uint32_t cq_events_that_need_ack;
 
    public:
-    CompletionChannel(CephContext *c, Device &ibdev);
+    CompletionChannel(CephContext *c, Infiniband &ib);
     ~CompletionChannel();
     int init();
     bool get_cq_event();
@@ -181,9 +182,9 @@ class Infiniband {
   // You need to call init and it will create a cq and associate to comp channel
   class CompletionQueue {
    public:
-    CompletionQueue(CephContext *c, Device &ibdev,
+    CompletionQueue(CephContext *c, Infiniband &ib,
                     const uint32_t qd, CompletionChannel *cc)
-      : cct(c), ibdev(ibdev), channel(cc), cq(NULL), queue_depth(qd) {}
+      : cct(c), infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {}
     ~CompletionQueue();
     int init();
     int poll_cq(int num_entries, ibv_wc *ret_wc_array);
@@ -193,7 +194,7 @@ class Infiniband {
     CompletionChannel* get_cc() const { return channel; }
    private:
     CephContext *cct;
-    Device &ibdev;
+    Infiniband&  infiniband;     // Infiniband to which this QP belongs
     CompletionChannel *channel;
     ibv_cq *cq;
     uint32_t queue_depth;
@@ -207,7 +208,7 @@ class Infiniband {
   // must call plumb() to bring the queue pair to the RTS state.
   class QueuePair {
    public:
-    QueuePair(CephContext *c, Device &device, ibv_qp_type type,
+    QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type,
               int ib_physical_port,  ibv_srq *srq,
               Infiniband::CompletionQueue* txcq,
               Infiniband::CompletionQueue* rxcq,
@@ -254,7 +255,7 @@ class Infiniband {
 
    private:
     CephContext  *cct;
-    Device       &ibdev;     // Infiniband to which this QP belongs
+    Infiniband&  infiniband;     // Infiniband to which this QP belongs
     ibv_qp_type  type;           // QP type (IBV_QPT_RC, etc.)
     ibv_context* ctxt;           // device context of the HCA to use
     int ib_physical_port;
@@ -273,20 +274,23 @@ class Infiniband {
  public:
   typedef MemoryManager::Cluster Cluster;
   typedef MemoryManager::Chunk Chunk;
+  QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);
+  ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
+  int post_chunk(Chunk* chunk);
+  int post_channel_cluster();
+  int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
+  CompletionChannel *create_comp_channel(CephContext *c);
+  CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
   uint8_t get_ib_physical_port() { return ib_physical_port; }
   int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
   int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
+  MemoryManager* get_memory_manager() { return memory_manager; }
   Device* get_device() { return device; }
+  bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
+  bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
+  Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
   static const char* wc_status_to_string(int status);
   static const char* qp_state_string(int status);
-
-  void handle_pre_fork();
-  void handle_post_fork();
-
-  int poll_tx(int n, Device **d, ibv_wc *wc);
-  int poll_rx(int n, Device **d, ibv_wc *wc);
-  int poll_blocking(bool &done);
-  void rearm_notify();
 };
 
 #endif
index 154df978e92e65e38e6923dba7f46871b6620c4d..8df72d9f64216d55ca29374498315321fc102146 100644 (file)
@@ -31,7 +31,8 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* i
   ibdev = ib->get_device();
   ibport = ib->get_ib_physical_port();
 
-  qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
+  qp = infiniband->create_queue_pair(
+                                    cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC);
   my_msg.qpn = qp->get_local_qp_number();
   my_msg.psn = qp->get_initial_psn();
   my_msg.lid = ibdev->get_lid();
@@ -56,12 +57,12 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
   error = ECONNRESET;
   int ret = 0;
   for (unsigned i=0; i < wc.size(); ++i) {
-    ret = ibdev->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
+    ret = infiniband->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
     assert(ret == 0);
     dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
   }
   for (unsigned i=0; i < buffers.size(); ++i) {
-    ret = ibdev->post_chunk(buffers[i]);
+    ret = infiniband->post_chunk(buffers[i]);
     assert(ret == 0);
     dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
   }
@@ -283,7 +284,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
         error = ECONNRESET;
         ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
       }
-      assert(ibdev->post_chunk(chunk) == 0);
+      assert(infiniband->post_chunk(chunk) == 0);
       dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
     } else {
       if (read == (ssize_t)len) {
@@ -295,7 +296,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
         ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
       } else {
         read += chunk->read(buf+read, response->byte_len);
-        assert(ibdev->post_chunk(chunk) == 0);
+        assert(infiniband->post_chunk(chunk) == 0);
         dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
       }
     }
@@ -323,7 +324,7 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
     read += tmp;
     ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound()  << ". Chunk:" << *c  << dendl;
     if ((*c)->over()) {
-      assert(ibdev->post_chunk(*c) == 0);
+      assert(infiniband->post_chunk(*c) == 0);
       dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
       ldout(cct, 25) << __func__ << " one chunk over." << dendl;
     }
@@ -462,7 +463,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
   unsigned total = 0;
   unsigned need_reserve_bytes = 0;
   while (it != pending_bl.buffers().end()) {
-    if (ibdev->is_tx_buffer(it->raw_c_str())) {
+    if (infiniband->is_tx_buffer(it->raw_c_str())) {
       if (need_reserve_bytes) {
         unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
         total += copied;
@@ -471,7 +472,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
         need_reserve_bytes = 0;
       }
       assert(copy_it == it);
-      tx_buffers.push_back(ibdev->get_tx_chunk_by_buffer(it->raw_c_str()));
+      tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
       total += it->length();
       ++copy_it;
     } else {
index 40d9be96b60cd4992c7750312462c469040a7195..3c4bad0c4d09fc86ff7f5a5de1307693dcdfb16a 100644 (file)
@@ -14,6 +14,7 @@
  *
  */
 
+#include <poll.h>
 #include <sys/time.h>
 #include <sys/resource.h>
 
@@ -31,22 +32,37 @@ static Tub<Infiniband> global_infiniband;
 
 RDMADispatcher::~RDMADispatcher()
 {
-  polling_stop();
-
+  done = true;
+  t.join();
   ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
 
-  global_infiniband->set_dispatcher(nullptr);
-
   assert(qp_conns.empty());
   assert(num_qp_conn == 0);
   assert(dead_queue_pairs.empty());
   assert(num_dead_queue_pair == 0);
+
+  tx_cc->ack_events();
+  rx_cc->ack_events();
+  delete tx_cq;
+  delete rx_cq;
+  delete tx_cc;
+  delete rx_cc;
+  delete async_handler;
 }
 
 RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
   : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
   w_lock("RDMADispatcher::for worker pending list"), stack(s)
 {
+  tx_cc = global_infiniband->create_comp_channel(c);
+  assert(tx_cc);
+  rx_cc = global_infiniband->create_comp_channel(c);
+  assert(rx_cc);
+  tx_cq = global_infiniband->create_comp_queue(c, tx_cc);
+  assert(tx_cq);
+  rx_cq = global_infiniband->create_comp_queue(c, rx_cc);
+  assert(rx_cq);
+
   PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
 
   plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
@@ -74,21 +90,8 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
   perf_logger = plb.create_perf_counters();
   cct->get_perfcounters_collection()->add(perf_logger);
 
-  cct->register_fork_watcher(this);
-}
-
-void RDMADispatcher::polling_start()
-{
   t = std::thread(&RDMADispatcher::polling, this);
-}
-
-void RDMADispatcher::polling_stop()
-{
-  if (!t.joinable())
-    return;
-
-  done = true;
-  t.join();
+  cct->register_fork_watcher(this);
 }
 
 void RDMADispatcher::handle_async_event()
@@ -135,24 +138,23 @@ void RDMADispatcher::polling()
 
   std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
   std::vector<ibv_wc> tx_cqe;
+  ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
   RDMAConnectedSocketImpl *conn = nullptr;
   utime_t last_inactive = ceph_clock_now();
   bool rearmed = false;
   int r = 0;
 
   while (true) {
-    Device *ibdev;
-
-    int tx_ret = global_infiniband->poll_tx(MAX_COMPLETIONS, &ibdev, wc);
+    int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
     if (tx_ret > 0) {
       ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
                      << " responses."<< dendl;
-      handle_tx_event(ibdev, wc, tx_ret);
+      handle_tx_event(wc, tx_ret);
     }
 
-    int rx_ret = global_infiniband->poll_rx(MAX_COMPLETIONS, &ibdev, wc);
+    int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
     if (rx_ret > 0) {
-      ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
+      ldout(cct, 20) << __func__ << " rt completion queue got " << rx_ret
                      << " responses."<< dendl;
       perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
 
@@ -167,8 +169,8 @@ void RDMADispatcher::polling()
         if (response->status == IBV_WC_SUCCESS) {
           conn = get_conn_lockless(response->qp_num);
           if (!conn) {
-            assert(ibdev->is_rx_buffer(chunk->buffer));
-            r = ibdev->post_chunk(chunk);
+            assert(global_infiniband->is_rx_buffer(chunk->buffer));
+            r = global_infiniband->post_chunk(chunk);
             ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
             assert(r == 0);
           } else {
@@ -178,9 +180,9 @@ void RDMADispatcher::polling()
           perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
           ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
               << ") status(" << response->status << ":"
-              << Infiniband::wc_status_to_string(response->status) << ")" << dendl;
-          assert(ibdev->is_rx_buffer(chunk->buffer));
-          r = ibdev->post_chunk(chunk);
+              << global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
+          assert(global_infiniband->is_rx_buffer(chunk->buffer));
+          r = global_infiniband->post_chunk(chunk);
           if (r) {
             ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
             assert(r == 0);
@@ -224,17 +226,33 @@ void RDMADispatcher::polling()
         if (!rearmed) {
           // Clean up cq events after rearm notify ensure no new incoming event
           // arrived between polling and rearm
-         global_infiniband->rearm_notify();
+          tx_cq->rearm_notify();
+          rx_cq->rearm_notify();
           rearmed = true;
           continue;
         }
 
+        struct pollfd channel_poll[2];
+        channel_poll[0].fd = tx_cc->get_fd();
+        channel_poll[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+        channel_poll[0].revents = 0;
+        channel_poll[1].fd = rx_cc->get_fd();
+        channel_poll[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+        channel_poll[1].revents = 0;
+        r = 0;
         perf_logger->set(l_msgr_rdma_polling, 0);
-
-       r = global_infiniband->poll_blocking(done);
-        if (r > 0)
-          ldout(cct, 20) << __func__ << " got a cq event." << dendl;
-
+        while (!done && r == 0) {
+          r = poll(channel_poll, 2, 100);
+          if (r < 0) {
+            r = -errno;
+            lderr(cct) << __func__ << " poll failed " << r << dendl;
+            ceph_abort();
+          }
+        }
+        if (r > 0 && tx_cc->get_cq_event())
+          ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
+        if (r > 0 && rx_cc->get_cq_event())
+          ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
         last_inactive = ceph_clock_now();
         perf_logger->set(l_msgr_rdma_polling, 1);
         rearmed = false;
@@ -299,28 +317,39 @@ void RDMADispatcher::erase_qpn(uint32_t qpn)
 
 void RDMADispatcher::handle_pre_fork()
 {
-  polling_stop();
+  done = true;
+  t.join();
   done = false;
 
-  global_infiniband->handle_pre_fork();
+  tx_cc->ack_events();
+  rx_cc->ack_events();
+  delete tx_cq;
+  delete rx_cq;
+  delete tx_cc;
+  delete rx_cc;
 
   global_infiniband.destroy();
 }
 
 void RDMADispatcher::handle_post_fork()
 {
-  if (!global_infiniband) {
+  if (!global_infiniband)
     global_infiniband.construct(
       cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
-    global_infiniband->set_dispatcher(this);
-  }
 
-  global_infiniband->handle_post_fork();
+  tx_cc = global_infiniband->create_comp_channel(cct);
+  assert(tx_cc);
+  rx_cc = global_infiniband->create_comp_channel(cct);
+  assert(rx_cc);
+  tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
+  assert(tx_cq);
+  rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
+  assert(rx_cq);
 
-  polling_start();
+  t = std::thread(&RDMADispatcher::polling, this);
 }
 
-void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n)
+void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
 {
   std::vector<Chunk*> tx_chunks;
 
@@ -359,14 +388,14 @@ void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n)
     }
 
     // FIXME: why not tx?
-    if (ibdev->get_memory_manager()->is_tx_buffer(chunk->buffer))
+    if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer))
       tx_chunks.push_back(chunk);
     else
       ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
   }
 
   perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
-  post_tx_buffer(ibdev, tx_chunks);
+  post_tx_buffer(tx_chunks);
 }
 
 /**
@@ -377,13 +406,13 @@ void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n)
  * \return
  *      0 if success or -1 for failure
  */
-void RDMADispatcher::post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks)
+void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
 {
   if (chunks.empty())
     return ;
 
   inflight -= chunks.size();
-  ibdev->get_memory_manager()->return_tx(chunks);
+  global_infiniband->get_memory_manager()->return_tx(chunks);
   ldout(cct, 30) << __func__ << " release " << chunks.size()
                  << " chunks, inflight " << inflight << dendl;
   notify_pending_workers();
@@ -455,12 +484,10 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
 
 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
 {
-  Device *ibdev = o->get_device();
-
   assert(center.in_thread());
-  int r = ibdev->get_tx_buffers(c, bytes);
+  int r = global_infiniband->get_tx_buffers(c, bytes);
   assert(r >= 0);
-  size_t got = ibdev->get_memory_manager()->get_tx_buffer_size() * r;
+  size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
   ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered  bytes, inflight " << dispatcher->inflight << dendl;
   stack->get_dispatcher()->inflight += r;
   if (got == bytes)
@@ -524,9 +551,6 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
       cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
   ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
   dispatcher = new RDMADispatcher(cct, this);
-  global_infiniband->set_dispatcher(dispatcher);
-  dispatcher->polling_start();
-
   unsigned num = get_num_worker();
   for (unsigned i = 0; i < num; ++i) {
     RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
index 733650dc9727f6a72f8f0e5b907a1c5e36cb0efb..154056cf407245284fe5018fdccedcb4c5c12611 100644 (file)
@@ -68,6 +68,9 @@ class RDMADispatcher : public CephContext::ForkWatcher {
 
   std::thread t;
   CephContext *cct;
+  Infiniband::CompletionQueue* tx_cq;
+  Infiniband::CompletionQueue* rx_cq;
+  Infiniband::CompletionChannel *tx_cc, *rx_cc;
   EventCallbackRef async_handler;
   bool done = false;
   std::atomic<uint64_t> num_dead_queue_pair = {0};
@@ -116,13 +119,8 @@ class RDMADispatcher : public CephContext::ForkWatcher {
 
   explicit RDMADispatcher(CephContext* c, RDMAStack* s);
   virtual ~RDMADispatcher();
-
   void handle_async_event();
-
-  void polling_start();
-  void polling_stop();
   void polling();
-
   int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
   void make_pending_worker(RDMAWorker* w) {
     Mutex::Locker l(w_lock);
@@ -135,11 +133,13 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
   void erase_qpn_lockless(uint32_t qpn);
   void erase_qpn(uint32_t qpn);
+  Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
+  Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
   void notify_pending_workers();
   virtual void handle_pre_fork() override;
   virtual void handle_post_fork() override;
-  void handle_tx_event(Device *ibdev, ibv_wc *cqe, int n);
-  void post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks);
+  void handle_tx_event(ibv_wc *cqe, int n);
+  void post_tx_buffer(std::vector<Chunk*> &chunks);
 
   std::atomic<uint64_t> inflight = {0};
 };
@@ -240,8 +240,6 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
                           RDMAWorker *w);
   virtual ~RDMAConnectedSocketImpl();
 
-  Device *get_device() { return ibdev; }
-
   void pass_wc(std::vector<ibv_wc> &&v);
   void get_wc(std::vector<ibv_wc> &w);
   virtual int is_connected() override { return connected; }