msg/async/rdma: improves RX buffer management 16693/head
author Adir Lev <adirl@mellanox.com>
Wed, 3 May 2017 11:17:51 +0000 (11:17 +0000)
committer Alex Mikheev <alexm@mellanox.com>
Sun, 6 Aug 2017 11:21:48 +0000 (11:21 +0000)
This commit adds the following changes:
- RX buffers are allocated from a memory pool (boost::pool; see the allocator sketch below)
- flat memory layout for buffer data/metadata to reduce CPU cache misses
- the number of receive buffers can be much larger than the receive queue length
- new buffers are posted to the SRQ as soon as possible
- stat counters

Signed-off-by: Alex Mikheev <alexm@mellanox.com>
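
The central mechanism is a boost::pool with a custom user allocator (RxAllocator in the diff), which lets the pool grow and shrink in whole blocks of fixed-size chunks through user-supplied malloc/free hooks. Below is a minimal, self-contained sketch of that hook pattern, not the Ceph code; demo_rx_allocator and the buffer size are illustrative only:

#include <boost/pool/pool.hpp>
#include <cstdlib>
#include <iostream>

// Hypothetical stand-in for RxAllocator: boost::pool calls these two hooks
// whenever it needs another block of chunks or releases one.
struct demo_rx_allocator {
  typedef std::size_t    size_type;
  typedef std::ptrdiff_t difference_type;

  // In the commit this is where hugepage/std::malloc and ibv_reg_mr happen.
  static char *malloc(const size_type bytes) {
    return static_cast<char *>(std::malloc(bytes));
  }
  // ...and where ibv_dereg_mr is called before the memory is freed.
  static void free(char *const block) {
    std::free(block);
  }
};

int main() {
  const std::size_t rx_buf_size = 4096;   // stands in for ms_async_rdma_buffer_size
  // Second argument: chunks allocated per block; larger values mean fewer
  // (expensive) block allocations/registrations as the pool grows.
  boost::pool<demo_rx_allocator> rxbuf_pool(rx_buf_size, 128);

  void *chunk = rxbuf_pool.malloc();      // O(1) pop from the pool's free list
  std::cout << "chunk at " << chunk << std::endl;
  rxbuf_pool.free(chunk);                 // returned to the pool, not to the OS
  return 0;
}

In the commit the malloc hook additionally registers the block with ibv_reg_mr and lays a Chunk header in front of each buffer; the free hook deregisters the MR.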
src/common/legacy_config_opts.h
src/common/options.cc
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index c65b52acb6061a4264cf237bbc080f184a09634f..5c28e939207d82b0b28b413525fda84420d5e0c8 100644 (file)
@@ -165,7 +165,10 @@ OPTION(ms_async_rdma_device_name, OPT_STR)
 OPTION(ms_async_rdma_enable_hugepage, OPT_BOOL)
 OPTION(ms_async_rdma_buffer_size, OPT_INT)
 OPTION(ms_async_rdma_send_buffers, OPT_U32)
+//size of the receive buffer pool, 0 is unlimited
 OPTION(ms_async_rdma_receive_buffers, OPT_U32)
+// max number of wr in srq
+OPTION(ms_async_rdma_receive_queue_len, OPT_U32)
 OPTION(ms_async_rdma_port_num, OPT_U32)
 OPTION(ms_async_rdma_polling_us, OPT_U32)
 OPTION(ms_async_rdma_local_gid, OPT_STR)       // GID format: "fe80:0000:0000:0000:7efe:90ff:fe72:6efe", no zero folding
index 9101078cb8d709e27a25edac0ca629d560eec262..afc847a9b0012a9a5a7ac41f4c27a78a5b8d22f4 100644 (file)
@@ -736,7 +736,11 @@ std::vector<Option> get_global_options() {
     .set_description(""),
 
     Option("ms_async_rdma_receive_buffers", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
-    .set_default(1024)
+    .set_default(32768)
+    .set_description(""),
+
+    Option("ms_async_rdma_receive_queue_len", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(4096)
     .set_description(""),
 
     Option("ms_async_rdma_port_num", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
index 37e1a53077889f6507af1211e8b2c832a85baff1..cf9031e068d6ac301db19326b7c7ae82ab5f2349 100644 (file)
@@ -150,7 +150,7 @@ Infiniband::QueuePair::QueuePair(
     CephContext *c, Infiniband& infiniband, ibv_qp_type type,
     int port, ibv_srq *srq,
     Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
-    uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
+    uint32_t tx_queue_len, uint32_t rx_queue_len, uint32_t q_key)
 : cct(c), infiniband(infiniband),
   type(type),
   ctxt(infiniband.device->ctxt),
@@ -161,8 +161,8 @@ Infiniband::QueuePair::QueuePair(
   txcq(txcq),
   rxcq(rxcq),
   initial_psn(0),
-  max_send_wr(max_send_wr),
-  max_recv_wr(max_recv_wr),
+  max_send_wr(tx_queue_len),
+  max_recv_wr(rx_queue_len),
   q_key(q_key),
   dead(false)
 {
@@ -192,7 +192,7 @@ int Infiniband::QueuePair::init()
   if (qp == NULL) {
     lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
     if (errno == ENOMEM) {
-      lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers, "
+      lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_len, "
                                " ms_async_rdma_send_buffers or"
                                " ms_async_rdma_buffer_size" << dendl;
     }
@@ -554,11 +554,6 @@ void Infiniband::MemoryManager::Chunk::clear()
   bound = 0;
 }
 
-void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
-{
-  ib->post_chunk(this);
-}
-
 Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
   : manager(m), buffer_size(s), lock("cluster_lock")
 {
@@ -574,10 +569,7 @@ Infiniband::MemoryManager::Cluster::~Cluster()
   }
 
   ::free(chunk_base);
-  if (manager.enabled_huge_page)
-    manager.free_huge_pages(base);
-  else
-    ::free(base);
+  manager.free(base);
 }
 
 int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
@@ -585,11 +577,8 @@ int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
   assert(!base);
   num_chunk = num;
   uint32_t bytes = buffer_size * num;
-  if (manager.enabled_huge_page) {
-    base = (char*)manager.malloc_huge_pages(bytes);
-  } else {
-    base = (char*)memalign(CEPH_PAGE_SIZE, bytes);
-  }
+
+  base = (char*)manager.malloc(bytes);
   end = base + bytes;
   assert(base);
   chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
@@ -642,27 +631,96 @@ int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks,
   return r;
 }
 
+Infiniband::MemoryManager* Infiniband::MemoryManager::RxAllocator::manager = nullptr;
+PerfCounters *Infiniband::MemoryManager::RxAllocator::perf_logger = nullptr;
+
+unsigned Infiniband::MemoryManager::RxAllocator::n_bufs_allocated = 0;
+unsigned Infiniband::MemoryManager::RxAllocator::max_bufs         = 0;
+
+
+char *Infiniband::MemoryManager::RxAllocator::malloc(const size_type bytes)
+{
+  mem_info *m;
+  Chunk *ch;
+  size_t rx_buf_size;
+  unsigned nbufs;
+
+  rx_buf_size = sizeof(Chunk) + manager->cct->_conf->ms_async_rdma_buffer_size;
+  nbufs       = bytes/rx_buf_size;
+
+  if (max_bufs > 0 && n_bufs_allocated + nbufs > max_bufs) {
+    return NULL;
+  }
+
+  m = static_cast<mem_info *>(manager->malloc(bytes + sizeof(*m)));
+  if (!m) 
+    return NULL;
+
+  m->mr    = ibv_reg_mr(manager->pd->pd, m->chunks, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
+  assert(m->mr);
+  m->nbufs = nbufs;
+
+  n_bufs_allocated += nbufs;
+  // note that the memory can be allocated before perf logger is set
+  if (perf_logger)
+    perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);
+
+  /* initialize chunks */
+  ch = m->chunks;
+  for (unsigned i = 0; i < nbufs; i++) {
+    ch->lkey   = m->mr->lkey;
+    ch->bytes  = manager->cct->_conf->ms_async_rdma_buffer_size;
+    ch->offset = 0;
+    ch->buffer = ch->data; // TODO: refactor tx and remove buffer
+    ch = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(ch) + rx_buf_size);
+  }
+
+  return reinterpret_cast<char *>(m->chunks);
+}
+
 
-Infiniband::MemoryManager::MemoryManager(Device *d, ProtectionDomain *p, bool hugepage)
-  : device(d), pd(p)
+void Infiniband::MemoryManager::RxAllocator::free(char * const block)
+{
+  mem_info *m;
+    
+  m = reinterpret_cast<mem_info *>(block) - 1;
+  n_bufs_allocated -= m->nbufs;
+  if (perf_logger)
+    perf_logger->dec(l_msgr_rdma_rx_bufs_total, m->nbufs);
+  ibv_dereg_mr(m->mr);
+  manager->free(m);
+}
+
+Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p)
+  : cct(c), device(d), pd(p),
+    rxbuf_pool(sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size, 
+               c->_conf->ms_async_rdma_receive_buffers > 0 ?
+                  // if possible make initial pool size 2 * receive_queue_len
+                  // that way there will be no pool expansion upon receive of the
+                  // first packet.
+                  (c->_conf->ms_async_rdma_receive_buffers < 2 * c->_conf->ms_async_rdma_receive_queue_len ?
+                   c->_conf->ms_async_rdma_receive_buffers :  2 * c->_conf->ms_async_rdma_receive_queue_len) :
+                  // rx pool is infinite, we can set any initial size that we want
+                   2 * c->_conf->ms_async_rdma_receive_queue_len)
 {
-  enabled_huge_page = hugepage;
+  RxAllocator::set_memory_manager(this);
+  // remember the setting because cct may not be available when
+  // global infiniband is destroyed
+  hp_enabled = cct->_conf->ms_async_rdma_enable_hugepage;
 }
 
 Infiniband::MemoryManager::~MemoryManager()
 {
-  if (channel)
-    delete channel;
   if (send)
     delete send;
 }
 
-void* Infiniband::MemoryManager::malloc_huge_pages(size_t size)
+void* Infiniband::MemoryManager::huge_pages_malloc(size_t size)
 {
   size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
   char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);
   if (ptr == MAP_FAILED) {
-    ptr = (char *)malloc(real_size);
+    ptr = (char *)std::malloc(real_size);
     if (ptr == NULL) return NULL;
     real_size = 0;
   }
@@ -670,7 +728,7 @@ void* Infiniband::MemoryManager::malloc_huge_pages(size_t size)
   return ptr + HUGE_PAGE_SIZE;
 }
 
-void Infiniband::MemoryManager::free_huge_pages(void *ptr)
+void Infiniband::MemoryManager::huge_pages_free(void *ptr)
 {
   if (ptr == NULL) return;
   void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE;
@@ -679,15 +737,30 @@ void Infiniband::MemoryManager::free_huge_pages(void *ptr)
   if (real_size != 0)
     munmap(real_ptr, real_size);
   else
-    free(real_ptr);
+    std::free(real_ptr);
 }
 
-void Infiniband::MemoryManager::register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num)
+
+void* Infiniband::MemoryManager::malloc(size_t size)
+{
+  if (hp_enabled)
+    return huge_pages_malloc(size);
+  else
+    return std::malloc(size);
+}
+
+void Infiniband::MemoryManager::free(void *ptr)
+{
+  if (hp_enabled)
+    huge_pages_free(ptr);
+  else
+    std::free(ptr);
+}
+
+void Infiniband::MemoryManager::create_tx_pool(uint32_t size, uint32_t tx_num)
 {
   assert(device);
   assert(pd);
-  channel = new Cluster(*this, size);
-  channel->fill(rx_num);
 
   send = new Cluster(*this, size);
   send->fill(tx_num);
@@ -703,12 +776,6 @@ int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t b
   return send->get_buffers(c, bytes);
 }
 
-int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes)
-{
-  return channel->get_buffers(chunks, bytes);
-}
-
-
 Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
   : cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num)
 {
@@ -731,33 +798,44 @@ void Infiniband::init()
   pd = new ProtectionDomain(cct, device);
   assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
 
-  max_recv_wr = device->device_attr->max_srq_wr;
-  if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) {
-    max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers;
-    ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
+  rx_queue_len = device->device_attr->max_srq_wr;
+  if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) {
+    rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len;
+    ldout(cct, 1) << __func__ << " receive queue length is " << rx_queue_len << " receive buffers" << dendl;
   } else {
-    ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl;
+    ldout(cct, 0) << __func__ << " requested receive queue length " <<
+                  cct->_conf->ms_async_rdma_receive_queue_len <<
+                  " is too big. Setting " << rx_queue_len << dendl;
+  }
+
+  // check for the misconfiguration
+  if (cct->_conf->ms_async_rdma_receive_buffers > 0 &&
+      rx_queue_len > (unsigned)cct->_conf->ms_async_rdma_receive_buffers) {
+    lderr(cct) << __func__ << " rdma_receive_queue_len (" <<
+                  rx_queue_len << ") > ms_async_rdma_receive_buffers(" <<
+                  cct->_conf->ms_async_rdma_receive_buffers << ")." << dendl;
+    ceph_abort();
   }
 
-  max_send_wr = device->device_attr->max_qp_wr;
-  if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) {
-    max_send_wr = cct->_conf->ms_async_rdma_send_buffers;
-    ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers"  << dendl;
+  MemoryManager::RxAllocator::set_max_bufs(cct->_conf->ms_async_rdma_receive_buffers);
+
+  tx_queue_len = device->device_attr->max_qp_wr;
+  if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
+    tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
+    ldout(cct, 1) << __func__ << " assigning: " << tx_queue_len << " send buffers"  << dendl;
   } else {
-    ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl;
+    ldout(cct, 0) << __func__ << " using the max allowed send buffers: " << tx_queue_len << dendl;
   }
 
   ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe
                 << " completion entries" << dendl;
 
-  memory_manager = new MemoryManager(device, pd,
-                                     cct->_conf->ms_async_rdma_enable_hugepage);
-  memory_manager->register_rx_tx(
-      cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
+  memory_manager = new MemoryManager(cct, device, pd);
+  memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len);
 
-  srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
-  post_channel_cluster();
+  srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
 
+  post_chunks_to_srq(rx_queue_len); //add to srq
   dispatcher->polling_start();
 }
 
@@ -779,6 +857,8 @@ void Infiniband::set_dispatcher(RDMADispatcher *d)
   assert(!d ^ !dispatcher);
 
   dispatcher = d;
+  if (dispatcher != nullptr)
+    MemoryManager::RxAllocator::set_perf_logger(dispatcher->perf_logger);
 }
 
 /**
@@ -819,7 +899,7 @@ int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
 Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)
 {
   Infiniband::QueuePair *qp = new QueuePair(
-      cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);
+      cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len);
   if (qp->init()) {
     delete qp;
     return NULL;
@@ -827,37 +907,36 @@ Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, Completio
   return qp;
 }
 
-int Infiniband::post_chunk(Chunk* chunk)
+void Infiniband::post_chunks_to_srq(int num)
 {
-  ibv_sge isge;
-  isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
-  isge.length = chunk->bytes;
-  isge.lkey = chunk->mr->lkey;
-  ibv_recv_wr rx_work_request;
-
-  memset(&rx_work_request, 0, sizeof(rx_work_request));
-  rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
-  rx_work_request.next = NULL;
-  rx_work_request.sg_list = &isge;
-  rx_work_request.num_sge = 1;
-
-  ibv_recv_wr *badWorkRequest;
-  int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
-  if (ret)
-    return -errno;
-  return 0;
-}
+  int ret, i = 0;
+  ibv_sge isge[num];
+  Chunk *chunk;
+  ibv_recv_wr rx_work_request[num];
 
-int Infiniband::post_channel_cluster()
-{
-  vector<Chunk*> free_chunks;
-  int r = memory_manager->get_channel_buffers(free_chunks, 0);
-  assert(r > 0);
-  for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
-    r = post_chunk(*iter);
-    assert(r == 0);
+  while (i < num) {
+    chunk = get_memory_manager()->get_rx_buffer();
+
+    assert (chunk != NULL);
+
+    isge[i].addr = reinterpret_cast<uint64_t>(chunk->data);
+    isge[i].length = chunk->bytes;
+    isge[i].lkey = chunk->lkey;
+
+    memset(&rx_work_request[i], 0, sizeof(rx_work_request[i]));
+    rx_work_request[i].wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
+    if (i == num - 1) {
+      rx_work_request[i].next = 0;
+    } else {
+      rx_work_request[i].next = &rx_work_request[i+1];
+    }
+    rx_work_request[i].sg_list = &isge[i];
+    rx_work_request[i].num_sge = 1;
+    i++;
   }
-  return 0;
+  ibv_recv_wr *badworkrequest;
+  ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
+  assert(ret == 0);
 }
 
 Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
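
A note on the batched repost above: post_chunks_to_srq() links the receive work requests through wr.next so the whole batch is handed to the HCA with a single ibv_post_srq_recv() call, instead of one verbs call per buffer as the removed post_chunk() did. A minimal standalone sketch of that chaining pattern (repost_batch and its parameters are illustrative):

#include <infiniband/verbs.h>
#include <cassert>
#include <cstring>
#include <utility>
#include <vector>

// 'buffers' stands for chunks taken from the rx pool: (payload pointer, capacity).
// 'lkey' is the local key of the memory region the payloads were registered with.
void repost_batch(ibv_srq *srq,
                  const std::vector<std::pair<void *, uint32_t>> &buffers,
                  uint32_t lkey) {
  const size_t n = buffers.size();
  if (n == 0)
    return;
  std::vector<ibv_sge> sge(n);
  std::vector<ibv_recv_wr> wr(n);

  for (size_t i = 0; i < n; i++) {
    sge[i].addr   = reinterpret_cast<uint64_t>(buffers[i].first);
    sge[i].length = buffers[i].second;
    sge[i].lkey   = lkey;

    std::memset(&wr[i], 0, sizeof(wr[i]));
    wr[i].wr_id   = reinterpret_cast<uint64_t>(buffers[i].first); // recover the chunk on completion
    wr[i].sg_list = &sge[i];
    wr[i].num_sge = 1;
    wr[i].next    = (i + 1 < n) ? &wr[i + 1] : nullptr;           // chain the batch
  }

  ibv_recv_wr *bad = nullptr;
  int ret = ibv_post_srq_recv(srq, &wr[0], &bad);                 // one call for the whole batch
  assert(ret == 0);
  (void)ret;
}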
index 068896c982ac7e68602395316b7067f45a30a696..ee8fb2758584e374a71607ddd3f00efa5ef2d3fc 100644 (file)
 #ifndef CEPH_INFINIBAND_H
 #define CEPH_INFINIBAND_H
 
+#include <boost/pool/pool.hpp>
+// need this because boost messes with ceph log/assert definitions
+#include <include/assert.h>
+
+#include <infiniband/verbs.h>
+
 #include <string>
 #include <vector>
 
@@ -27,6 +33,7 @@
 #include "common/debug.h"
 #include "common/errno.h"
 #include "common/Mutex.h"
+#include "common/perf_counters.h"
 #include "msg/msg_types.h"
 #include "msg/async/net_handler.h"
 
@@ -121,6 +128,50 @@ class DeviceList {
   }
 };
 
+// stat counters
+enum {
+  l_msgr_rdma_dispatcher_first = 94000,
+
+  l_msgr_rdma_polling,
+  l_msgr_rdma_inflight_tx_chunks,
+  l_msgr_rdma_rx_bufs_in_use,
+  l_msgr_rdma_rx_bufs_total,
+
+  l_msgr_rdma_tx_total_wc,
+  l_msgr_rdma_tx_total_wc_errors,
+  l_msgr_rdma_tx_wc_retry_errors,
+  l_msgr_rdma_tx_wc_wr_flush_errors,
+
+  l_msgr_rdma_rx_total_wc,
+  l_msgr_rdma_rx_total_wc_errors,
+  l_msgr_rdma_rx_fin,
+
+  l_msgr_rdma_handshake_errors,
+
+  l_msgr_rdma_total_async_events,
+  l_msgr_rdma_async_last_wqe_events,
+
+  l_msgr_rdma_created_queue_pair,
+  l_msgr_rdma_active_queue_pair,
+
+  l_msgr_rdma_dispatcher_last,
+};
+
+enum {
+  l_msgr_rdma_first = 95000,
+
+  l_msgr_rdma_tx_no_mem,
+  l_msgr_rdma_tx_parital_mem,
+  l_msgr_rdma_tx_failed,
+
+  l_msgr_rdma_tx_chunks,
+  l_msgr_rdma_tx_bytes,
+  l_msgr_rdma_rx_chunks,
+  l_msgr_rdma_rx_bytes,
+  l_msgr_rdma_pending_sent_conns,
+
+  l_msgr_rdma_last,
+};
 
 class RDMADispatcher;
 
@@ -152,14 +203,15 @@ class Infiniband {
       bool full();
       bool over();
       void clear();
-      void post_srq(Infiniband *ib);
 
      public:
       ibv_mr* mr;
+      uint32_t lkey;
       uint32_t bytes;
       uint32_t bound;
       uint32_t offset;
-      char* buffer;
+      char* buffer; // TODO: remove buffer/refactor TX
+      char  data[0];
     };
 
     class Cluster {
@@ -189,17 +241,46 @@ class Infiniband {
       Chunk* chunk_base = nullptr;
     };
 
-    MemoryManager(Device *d, ProtectionDomain *p, bool hugepage);
+    class RxAllocator {
+      struct mem_info {
+        ibv_mr   *mr;
+        unsigned nbufs;
+        Chunk    chunks[0];
+      };
+      static MemoryManager *manager;
+      static unsigned n_bufs_allocated;
+      static unsigned max_bufs;
+      static PerfCounters *perf_logger;
+     public:
+      typedef std::size_t size_type;
+      typedef std::ptrdiff_t difference_type;
+
+      static char * malloc(const size_type bytes);
+      static void free(char * const block);
+
+      static void set_memory_manager(MemoryManager *m) {
+        manager = m;
+      }
+      static void set_max_bufs(int n) {
+        max_bufs = n;
+      }
+
+      static void set_perf_logger(PerfCounters *logger) {
+        perf_logger = logger;
+        perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated);
+      }
+    };
+
+    MemoryManager(CephContext *c, Device *d, ProtectionDomain *p);
     ~MemoryManager();
 
-    void* malloc_huge_pages(size_t size);
-    void free_huge_pages(void *ptr);
-    void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num);
+    void* malloc(size_t size);
+    void  free(void *ptr);
+
+    void create_tx_pool(uint32_t size, uint32_t tx_num);
     void return_tx(std::vector<Chunk*> &chunks);
     int get_send_buffers(std::vector<Chunk*> &c, size_t bytes);
-    int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes);
     bool is_tx_buffer(const char* c) { return send->is_my_buffer(c); }
-    bool is_rx_buffer(const char* c) { return channel->is_my_buffer(c); }
     Chunk *get_tx_chunk_by_buffer(const char *c) {
       return send->get_chunk_by_buffer(c);
     }
@@ -207,18 +288,32 @@ class Infiniband {
       return send->buffer_size;
     }
 
-    bool enabled_huge_page;
+    Chunk *get_rx_buffer() {
+       return reinterpret_cast<Chunk *>(rxbuf_pool.malloc());
+    }
+
+    void release_rx_buffer(Chunk *chunk) {
+      rxbuf_pool.free(chunk);
+    }
 
+    CephContext  *cct;
    private:
-    Cluster* channel;//RECV
+    // TODO: Cluster -> TxPool txbuf_pool
+    // chunk layout fix
+    //  
     Cluster* send;// SEND
     Device *device;
     ProtectionDomain *pd;
+    boost::pool<RxAllocator> rxbuf_pool;
+    bool  hp_enabled;
+
+    void* huge_pages_malloc(size_t size);
+    void  huge_pages_free(void *ptr);
   };
 
  private:
-  uint32_t max_send_wr = 0;
-  uint32_t max_recv_wr = 0;
+  uint32_t tx_queue_len = 0;
+  uint32_t rx_queue_len = 0;
   uint32_t max_sge = 0;
   uint8_t  ib_physical_port = 0;
   MemoryManager* memory_manager = nullptr;
@@ -297,7 +392,7 @@ class Infiniband {
               int ib_physical_port,  ibv_srq *srq,
               Infiniband::CompletionQueue* txcq,
               Infiniband::CompletionQueue* rxcq,
-              uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0);
+              uint32_t tx_queue_len, uint32_t rx_queue_len, uint32_t q_key = 0);
     ~QueuePair();
 
     int init();
@@ -361,8 +456,10 @@ class Infiniband {
   typedef MemoryManager::Chunk Chunk;
   QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);
   ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
-  int post_chunk(Chunk* chunk);
-  int post_channel_cluster();
+  void  post_chunks_to_srq(int);
+  void post_chunk_to_pool(Chunk* chunk) {
+    get_memory_manager()->release_rx_buffer(chunk);
+  }
   int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
   CompletionChannel *create_comp_channel(CephContext *c);
   CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
@@ -375,7 +472,6 @@ class Infiniband {
   Device* get_device() { return device; }
   int get_async_fd() { return device->ctxt->async_fd; }
   bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
-  bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
   Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
   static const char* wc_status_to_string(int status);
   static const char* qp_state_string(int status);
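
The `char data[0]` member added to Chunk above is what provides the "flat memory layout" from the commit message: each Chunk header is immediately followed by its payload inside one registered block, so header and data share cache lines and there is no separate buffer pointer to chase. A small illustrative layout sketch (demo_chunk is not the Ceph type):

#include <cstddef>
#include <cstdint>
#include <cstdlib>

struct demo_chunk {
  uint32_t lkey;      // lkey of the block's ibv_mr
  uint32_t bytes;     // payload capacity
  uint32_t offset;
  char     data[0];   // payload starts right after the header (zero-length array extension)
};

int main() {
  const uint32_t buf_size = 4096;                    // ms_async_rdma_buffer_size stand-in
  const size_t   stride   = sizeof(demo_chunk) + buf_size;
  const unsigned nbufs    = 8;

  // One flat allocation holds header+payload pairs back to back, the same way
  // RxAllocator::malloc() carves up a registered region.
  char *block = static_cast<char *>(std::malloc(stride * nbufs));
  for (unsigned i = 0; i < nbufs; i++) {
    demo_chunk *c = reinterpret_cast<demo_chunk *>(block + i * stride);
    c->lkey   = 0;
    c->bytes  = buf_size;
    c->offset = 0;
  }
  std::free(block);
  return 0;
}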
index 66dc488cccf3fc7a6b27935a2be6412886304995..378a8e6c8262d3bc343b7b126958d7831845cb7c 100644 (file)
@@ -45,23 +45,20 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
   cleanup();
   worker->remove_pending_conn(this);
   dispatcher->erase_qpn(my_msg.qpn);
+
+  for (unsigned i=0; i < wc.size(); ++i) {
+    dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
+  }
+  for (unsigned i=0; i < buffers.size(); ++i) {
+    dispatcher->post_chunk_to_pool(buffers[i]);
+  }
+
   Mutex::Locker l(lock);
   if (notify_fd >= 0)
     ::close(notify_fd);
   if (tcp_fd >= 0)
     ::close(tcp_fd);
   error = ECONNRESET;
-  int ret = 0;
-  for (unsigned i=0; i < wc.size(); ++i) {
-    ret = infiniband->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
-    assert(ret == 0);
-    dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
-  }
-  for (unsigned i=0; i < buffers.size(); ++i) {
-    ret = infiniband->post_chunk(buffers[i]);
-    assert(ret == 0);
-    dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
-  }
 }
 
 void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
@@ -290,8 +287,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
         error = ECONNRESET;
         ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
       }
-      assert(infiniband->post_chunk(chunk) == 0);
-      dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
+      dispatcher->post_chunk_to_pool(chunk);
     } else {
       if (read == (ssize_t)len) {
         buffers.push_back(chunk);
@@ -302,8 +298,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
         ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
       } else {
         read += chunk->read(buf+read, response->byte_len);
-        assert(infiniband->post_chunk(chunk) == 0);
-        dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
+        dispatcher->post_chunk_to_pool(chunk);
       }
     }
   }
@@ -334,8 +329,7 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
     read += tmp;
     ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound()  << ". Chunk:" << *c  << dendl;
     if ((*c)->over()) {
-      assert(infiniband->post_chunk(*c) == 0);
-      dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
+      dispatcher->post_chunk_to_pool(*c);
       ldout(cct, 25) << __func__ << " one chunk over." << dendl;
     }
     if (read == len) {
index e16052f1dec22129eb3955eafd00bd30138c0310..d8a8a63bad6250c866c9d0f30eb3dbc7c4d49ed3 100644 (file)
@@ -59,7 +59,8 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
 
   plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
   plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
-  plb.add_u64_counter(l_msgr_rdma_inqueue_rx_chunks, "inqueue_rx_chunks", "The number of inqueue rx chunks");
+  plb.add_u64_counter(l_msgr_rdma_rx_bufs_in_use, "rx_bufs_in_use", "The number of rx buffers that are holding data and being processed");
+  plb.add_u64_counter(l_msgr_rdma_rx_bufs_total, "rx_bufs_total", "The total number of rx buffers");
 
   plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions");
   plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
@@ -140,6 +141,12 @@ void RDMADispatcher::handle_async_event()
   }
 }
 
+void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) {
+  Mutex::Locker l(lock);
+  global_infiniband->post_chunk_to_pool(chunk);
+  perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+}
+
 void RDMADispatcher::polling()
 {
   static int MAX_COMPLETIONS = 32;
@@ -166,8 +173,10 @@ void RDMADispatcher::polling()
       ldout(cct, 20) << __func__ << " rt completion queue got " << rx_ret
                      << " responses."<< dendl;
       perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
+      perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);
 
       Mutex::Locker l(lock);//make sure connected socket alive when pass wc
+      global_infiniband->post_chunks_to_srq(rx_ret);
       for (int i = 0; i < rx_ret; ++i) {
         ibv_wc* response = &wc[i];
         Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
@@ -178,35 +187,28 @@ void RDMADispatcher::polling()
         if (response->status == IBV_WC_SUCCESS) {
           conn = get_conn_lockless(response->qp_num);
           if (!conn) {
-            assert(global_infiniband->is_rx_buffer(chunk->buffer));
-            r = global_infiniband->post_chunk(chunk);
             ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
-            assert(r == 0);
+            global_infiniband->post_chunk_to_pool(chunk);
+            perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
           } else {
-            polled[conn].push_back(*response);
+           polled[conn].push_back(*response);
           }
         } else {
           perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
+          perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+
           ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
               << ") status(" << response->status << ":"
               << global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
-          assert(global_infiniband->is_rx_buffer(chunk->buffer));
-          r = global_infiniband->post_chunk(chunk);
-          if (r) {
-            ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
-            assert(r == 0);
-          }
-
           conn = get_conn_lockless(response->qp_num);
           if (conn && conn->is_connected())
             conn->fault();
+
+          global_infiniband->post_chunk_to_pool(chunk);
         }
       }
-
-      for (auto &&i : polled) {
-        perf_logger->inc(l_msgr_rdma_inqueue_rx_chunks, i.second.size());
+      for (auto &&i : polled)
         i.first->pass_wc(std::move(i.second));
-      }
       polled.clear();
     }
 
@@ -411,7 +413,6 @@ RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
   plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
   plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer");
   plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted");
-  plb.add_u64_counter(l_msgr_rdma_rx_no_registered_mem, "rx_no_registered_mem", "The count of no registered buffer when receiving");
 
   plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
   plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");
index 3c2d90094d140ad374d8926896ceabb57df36592..bbb97af1f77d19874f63e746cf79a9236dae0c90 100644 (file)
@@ -34,34 +34,6 @@ class RDMAServerSocketImpl;
 class RDMAStack;
 class RDMAWorker;
 
-enum {
-  l_msgr_rdma_dispatcher_first = 94000,
-
-  l_msgr_rdma_polling,
-  l_msgr_rdma_inflight_tx_chunks,
-  l_msgr_rdma_inqueue_rx_chunks,
-
-  l_msgr_rdma_tx_total_wc,
-  l_msgr_rdma_tx_total_wc_errors,
-  l_msgr_rdma_tx_wc_retry_errors,
-  l_msgr_rdma_tx_wc_wr_flush_errors,
-
-  l_msgr_rdma_rx_total_wc,
-  l_msgr_rdma_rx_total_wc_errors,
-  l_msgr_rdma_rx_fin,
-
-  l_msgr_rdma_handshake_errors,
-
-  l_msgr_rdma_total_async_events,
-  l_msgr_rdma_async_last_wqe_events,
-
-  l_msgr_rdma_created_queue_pair,
-  l_msgr_rdma_active_queue_pair,
-
-  l_msgr_rdma_dispatcher_last,
-};
-
-
 class RDMADispatcher {
   typedef Infiniband::MemoryManager::Chunk Chunk;
   typedef Infiniband::QueuePair QueuePair;
@@ -144,24 +116,9 @@ class RDMADispatcher {
   void post_tx_buffer(std::vector<Chunk*> &chunks);
 
   std::atomic<uint64_t> inflight = {0};
-};
-
-
-enum {
-  l_msgr_rdma_first = 95000,
-
-  l_msgr_rdma_tx_no_mem,
-  l_msgr_rdma_tx_parital_mem,
-  l_msgr_rdma_tx_failed,
-  l_msgr_rdma_rx_no_registered_mem,
 
-  l_msgr_rdma_tx_chunks,
-  l_msgr_rdma_tx_bytes,
-  l_msgr_rdma_rx_chunks,
-  l_msgr_rdma_rx_bytes,
-  l_msgr_rdma_pending_sent_conns,
+  void post_chunk_to_pool(Chunk* chunk); 
 
-  l_msgr_rdma_last,
 };
 
 class RDMAWorker : public Worker {