]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: refactor rx buffer pool allocator
authorAlex Mikheev <alexm@mellanox.com>
Mon, 26 Jun 2017 11:44:39 +0000 (11:44 +0000)
committerAlex Mikheev <alexm@mellanox.com>
Mon, 14 Aug 2017 13:00:39 +0000 (13:00 +0000)
Pool allocator now has a context. Each context can have a
different configuration. It allows to use buffer pool safely
when there are multiple RDMAStack running in parrallel.

Do not fail on assertion when out of memory for rx buffers.
Instead log warning and try to recover.

Signed-off-by: Alex Mikheev <alexm@mellanox.com>
Conflicts:
src/msg/async/rdma/Infiniband.h

src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index a6842f0bf68014bc9cd673c92ec47c16e40cde95..e155a44888e4aef73bbb352f208d3355f6bd26e8 100644 (file)
@@ -633,45 +633,103 @@ int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks,
   return r;
 }
 
-Infiniband::MemoryManager* Infiniband::MemoryManager::RxAllocator::manager = nullptr;
-PerfCounters *Infiniband::MemoryManager::RxAllocator::perf_logger = nullptr;
+bool Infiniband::MemoryManager::pool_context::can_alloc(unsigned nbufs)
+{
+  /* unimited */
+  if (manager->cct->_conf->ms_async_rdma_receive_buffers <= 0)
+    return true;
+
+  if (n_bufs_allocated + nbufs > (unsigned)manager->cct->_conf->ms_async_rdma_receive_buffers) {
+    lderr(manager->cct) << __func__ << " WARNING: OUT OF RX BUFFERS: allocated: " <<
+        n_bufs_allocated << " requested: " << nbufs <<
+        " limit: " << manager->cct->_conf->ms_async_rdma_receive_buffers << dendl;
+    return false;
+  }
+
+  return true;
+}
+
+void Infiniband::MemoryManager::pool_context::set_stat_logger(PerfCounters *logger) {
+  perf_logger = logger;
+  if (perf_logger != nullptr)
+    perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated);
+}
 
-unsigned Infiniband::MemoryManager::RxAllocator::n_bufs_allocated = 0;
-unsigned Infiniband::MemoryManager::RxAllocator::max_bufs         = 0;
+void Infiniband::MemoryManager::pool_context::update_stats(int nbufs)
+{
+  n_bufs_allocated += nbufs;
+
+  if (!perf_logger)
+    return;
 
+  if (nbufs > 0) {
+    perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);
+  } else {
+    perf_logger->dec(l_msgr_rdma_rx_bufs_total, -nbufs);
+  }
+}
 
-char *Infiniband::MemoryManager::RxAllocator::malloc(const size_type bytes)
+void *Infiniband::MemoryManager::mem_pool::slow_malloc()
+{
+    void *p;
+
+    Mutex::Locker l(PoolAllocator::lock);
+    PoolAllocator::g_ctx = ctx;
+    // this will trigger pool expansion via PoolAllocator::malloc()
+    p = boost::pool<PoolAllocator>::malloc();
+    PoolAllocator::g_ctx = nullptr;
+    return p;
+}
+
+Infiniband::MemoryManager::pool_context *Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr;
+Mutex Infiniband::MemoryManager::PoolAllocator::lock("pool-alloc-lock");
+
+// lock is taken by mem_pool::slow_malloc()
+char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type bytes)
 {
   mem_info *m;
   Chunk *ch;
   size_t rx_buf_size;
   unsigned nbufs;
+  MemoryManager *manager;
+  CephContext *cct;
 
-  rx_buf_size = sizeof(Chunk) + manager->cct->_conf->ms_async_rdma_buffer_size;
+  assert(g_ctx);
+  manager     = g_ctx->manager;
+  cct         = manager->cct;
+  rx_buf_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size;
   nbufs       = bytes/rx_buf_size;
 
-  if (max_bufs > 0 && n_bufs_allocated + nbufs > max_bufs) {
+  if (!g_ctx->can_alloc(nbufs))
     return NULL;
-  }
 
   m = static_cast<mem_info *>(manager->malloc(bytes + sizeof(*m)));
-  if (!m) 
+  if (!m) {
+    lderr(cct) << __func__ << "failed to allocate " <<
+        bytes << " + " << sizeof(*m) << " bytes of memory for " << nbufs << dendl;
     return NULL;
+  }
+
+  m->mr = ibv_reg_mr(manager->pd->pd, m->chunks, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
+  if (m->mr == NULL) {
+    lderr(cct) << __func__ << "failed to register " <<
+        bytes << " + " << sizeof(*m) << " bytes of memory for " << nbufs << dendl;
+    manager->free(m);
+    return NULL;
+  }
 
-  m->mr    = ibv_reg_mr(manager->pd->pd, m->chunks, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
-  assert(m->mr);
   m->nbufs = nbufs;
+  // save this chunk context
+  m->ctx   = g_ctx;
 
-  n_bufs_allocated += nbufs;
   // note that the memory can be allocated before perf logger is set
-  if (perf_logger)
-    perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);
+  g_ctx->update_stats(nbufs);
 
   /* initialize chunks */
   ch = m->chunks;
   for (unsigned i = 0; i < nbufs; i++) {
     ch->lkey   = m->mr->lkey;
-    ch->bytes  = manager->cct->_conf->ms_async_rdma_buffer_size;
+    ch->bytes  = cct->_conf->ms_async_rdma_buffer_size;
     ch->offset = 0;
     ch->buffer = ch->data; // TODO: refactor tx and remove buffer
     ch = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(ch) + rx_buf_size);
@@ -681,21 +739,21 @@ char *Infiniband::MemoryManager::RxAllocator::malloc(const size_type bytes)
 }
 
 
-void Infiniband::MemoryManager::RxAllocator::free(char * const block)
+void Infiniband::MemoryManager::PoolAllocator::free(char * const block)
 {
   mem_info *m;
+  Mutex::Locker l(lock);
     
   m = reinterpret_cast<mem_info *>(block) - 1;
-  n_bufs_allocated -= m->nbufs;
-  if (perf_logger)
-    perf_logger->dec(l_msgr_rdma_rx_bufs_total, m->nbufs);
+  m->ctx->update_stats(-m->nbufs);
   ibv_dereg_mr(m->mr);
-  manager->free(m);
+  m->ctx->manager->free(m);
 }
 
 Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p)
   : cct(c), device(d), pd(p),
-    rxbuf_pool(sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size, 
+    rxbuf_pool_ctx(this),
+    rxbuf_pool(&rxbuf_pool_ctx, sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size,
                c->_conf->ms_async_rdma_receive_buffers > 0 ?
                   // if possible make initial pool size 2 * receive_queue_len
                   // that way there will be no pool expansion upon receive of the
@@ -705,10 +763,6 @@ Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDo
                   // rx pool is infinite, we can set any initial size that we want
                    2 * c->_conf->ms_async_rdma_receive_queue_len)
 {
-  RxAllocator::set_memory_manager(this);
-  // remember the setting because cct may not be available when
-  // global infiniband is destroyed
-  hp_enabled = cct->_conf->ms_async_rdma_enable_hugepage;
 }
 
 Infiniband::MemoryManager::~MemoryManager()
@@ -745,7 +799,7 @@ void Infiniband::MemoryManager::huge_pages_free(void *ptr)
 
 void* Infiniband::MemoryManager::malloc(size_t size)
 {
-  if (hp_enabled)
+  if (cct->_conf->ms_async_rdma_enable_hugepage)
     return huge_pages_malloc(size);
   else
     return std::malloc(size);
@@ -753,7 +807,7 @@ void* Infiniband::MemoryManager::malloc(size_t size)
 
 void Infiniband::MemoryManager::free(void *ptr)
 {
-  if (hp_enabled)
+  if (cct->_conf->ms_async_rdma_enable_hugepage)
     huge_pages_free(ptr);
   else
     std::free(ptr);
@@ -855,8 +909,6 @@ void Infiniband::init()
     ceph_abort();
   }
 
-  MemoryManager::RxAllocator::set_max_bufs(cct->_conf->ms_async_rdma_receive_buffers);
-
   tx_queue_len = device->device_attr->max_qp_wr;
   if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
     tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
@@ -932,7 +984,7 @@ Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, Completio
   return qp;
 }
 
-void Infiniband::post_chunks_to_srq(int num)
+int Infiniband::post_chunks_to_srq(int num)
 {
   int ret, i = 0;
   ibv_sge isge[num];
@@ -941,8 +993,15 @@ void Infiniband::post_chunks_to_srq(int num)
 
   while (i < num) {
     chunk = get_memory_manager()->get_rx_buffer();
-
-    assert (chunk != NULL);
+    if (chunk == NULL) {
+      lderr(cct) << __func__ << " WARNING: out of memory. Requested " << num <<
+        " rx buffers. Got " << i << dendl;
+      if (i == 0)
+        return 0;
+      // if we got some buffers post them and hope for the best
+      rx_work_request[i-1].next = 0;
+      break;
+    }
 
     isge[i].addr = reinterpret_cast<uint64_t>(chunk->data);
     isge[i].length = chunk->bytes;
@@ -962,6 +1021,7 @@ void Infiniband::post_chunks_to_srq(int num)
   ibv_recv_wr *badworkrequest;
   ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
   assert(ret == 0);
+  return i;
 }
 
 Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
index d9b196ba5570e317ed56a04fb5d11e3468c4bd2a..8a7d1f70ca8669ccf0956ceb29cf1b44295d617a 100644 (file)
@@ -241,34 +241,67 @@ class Infiniband {
       Chunk* chunk_base = nullptr;
     };
 
-    class RxAllocator {
-      struct mem_info {
-        ibv_mr   *mr;
-        unsigned nbufs;
-        Chunk    chunks[0];
-      };
-      static MemoryManager *manager;
-      static unsigned n_bufs_allocated;
-      static unsigned max_bufs;
-      static PerfCounters *perf_logger;
-     public:
-      typedef std::size_t size_type;
-      typedef std::ptrdiff_t difference_type;
-
-      static char * malloc(const size_type bytes);
-      static void free(char * const block);
+    class pool_context {
+        PerfCounters *perf_logger;
+
+      public:
+        MemoryManager *manager;
+        unsigned n_bufs_allocated;
+        // true if it is possible to alloc
+        // more memory for the pool
+        pool_context(MemoryManager *m) :
+          perf_logger(nullptr),
+          manager(m),
+          n_bufs_allocated(0) {}
+        bool can_alloc(unsigned nbufs);
+        void update_stats(int val);
+        void set_stat_logger(PerfCounters *logger);
+    };
 
-      static void set_memory_manager(MemoryManager *m) {
-        manager = m;
-      }
-      static void set_max_bufs(int n) {
-        max_bufs = n;
-      }
+    class PoolAllocator {
+        struct mem_info {
+          ibv_mr   *mr;
+          pool_context *ctx;
+          unsigned nbufs;
+          Chunk    chunks[0];
+        };
+      public:
+        typedef std::size_t size_type;
+        typedef std::ptrdiff_t difference_type;
+
+        static char * malloc(const size_type bytes);
+        static void free(char * const block);
+
+        static pool_context  *g_ctx;
+        static Mutex lock;
+    };
 
-      static void set_perf_logger(PerfCounters *logger) {
-        perf_logger = logger;
-        perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated);
-      }
+    // modify boost pool so that it is possible to
+    // have a thread safe 'context' when allocating/freeing
+    // the memory. It is needed to allow a different pool
+    // configurations and bookkeeping per CephContext and
+    // also to be able // to use same allocator to deal with
+    // RX and TX pool.
+    // TODO: use boost pool to allocate TX chunks too
+    class mem_pool : public boost::pool<PoolAllocator> {
+      private:
+        pool_context *ctx;
+        void *slow_malloc();
+
+      public:
+        explicit mem_pool(pool_context *ctx, const size_type nrequested_size,
+            const size_type nnext_size = 32,
+            const size_type nmax_size = 0) :
+          pool(nrequested_size, nnext_size, nmax_size),
+          ctx(ctx) { }
+
+        void *malloc() {
+          if (!store().empty())
+            return (store().malloc)();
+          // need to alloc more memory...
+          // slow path code
+          return slow_malloc();
+        }
     };
 
     MemoryManager(CephContext *c, Device *d, ProtectionDomain *p);
@@ -296,6 +329,10 @@ class Infiniband {
       rxbuf_pool.free(chunk);
     }
 
+    void set_rx_stat_logger(PerfCounters *logger) {
+      rxbuf_pool_ctx.set_stat_logger(logger);
+    }
+
     CephContext  *cct;
    private:
     // TODO: Cluster -> TxPool txbuf_pool
@@ -304,8 +341,8 @@ class Infiniband {
     Cluster* send;// SEND
     Device *device;
     ProtectionDomain *pd;
-    boost::pool<RxAllocator> rxbuf_pool;
-    bool  hp_enabled;
+    pool_context rxbuf_pool_ctx;
+    mem_pool     rxbuf_pool;
 
     void* huge_pages_malloc(size_t size);
     void  huge_pages_free(void *ptr);
@@ -454,7 +491,8 @@ class Infiniband {
   typedef MemoryManager::Chunk Chunk;
   QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);
   ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
-  void  post_chunks_to_srq(int);
+  // post rx buffers to srq, return number of buffers actually posted
+  int  post_chunks_to_srq(int num);
   void post_chunk_to_pool(Chunk* chunk) {
     get_memory_manager()->release_rx_buffer(chunk);
   }
index dcc6e1a8fb3db0d1c7d1e7dbc4bc389873e4d5b9..115ff4d3c5e673806b82ac001c717c19f32f56eb 100644 (file)
@@ -84,6 +84,8 @@ void RDMADispatcher::polling_start()
   if (t.joinable()) 
     return; // dispatcher thread already running 
 
+  get_stack()->get_infiniband().get_memory_manager()->set_rx_stat_logger(perf_logger);
+
   tx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
   assert(tx_cc);
   rx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
@@ -155,6 +157,12 @@ void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) {
   Mutex::Locker l(lock);
   get_stack()->get_infiniband().post_chunk_to_pool(chunk);
   perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+  // handle a case when we have a limited number of
+  // rx buffers and we could not post a required amount when polling
+  if (post_backlog > 0) {
+    ldout(cct, 20) << __func__ << " post_backlog is " << post_backlog << dendl;
+    post_backlog -= get_stack()->get_infiniband().post_chunks_to_srq(post_backlog);
+  }
 }
 
 void RDMADispatcher::polling()
@@ -186,7 +194,9 @@ void RDMADispatcher::polling()
       perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);
 
       Mutex::Locker l(lock);//make sure connected socket alive when pass wc
-      get_stack()->get_infiniband().post_chunks_to_srq(rx_ret);
+
+      post_backlog += rx_ret - get_stack()->get_infiniband().post_chunks_to_srq(rx_ret);
+
       for (int i = 0; i < rx_ret; ++i) {
         ibv_wc* response = &wc[i];
         Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
index 764ea33f39e82ceba6e2b6b86a448d65f6add061..6869d32ef0b18476c36c5ea9c489b50aa330b81e 100644 (file)
@@ -47,6 +47,7 @@ class RDMADispatcher {
   bool done = false;
   std::atomic<uint64_t> num_dead_queue_pair = {0};
   std::atomic<uint64_t> num_qp_conn = {0};
+  int post_backlog = 0;
   Mutex lock; // protect `qp_conns`, `dead_queue_pairs`
   // qp_num -> InfRcConnection
   // The main usage of `qp_conns` is looking up connection by qp_num,