return r;
}
-Infiniband::MemoryManager* Infiniband::MemoryManager::RxAllocator::manager = nullptr;
-PerfCounters *Infiniband::MemoryManager::RxAllocator::perf_logger = nullptr;
+// Tell whether nbufs additional buffers may be allocated for this pool.
+// Returns true when ms_async_rdma_receive_buffers is non-positive (no limit)
+// or when n_bufs_allocated + nbufs stays within that limit; otherwise logs
+// an error and returns false.
+bool Infiniband::MemoryManager::pool_context::can_alloc(unsigned nbufs)
+{
+ /* unlimited: a non-positive ms_async_rdma_receive_buffers disables the cap */
+ if (manager->cct->_conf->ms_async_rdma_receive_buffers <= 0)
+ return true;
+
+ if (n_bufs_allocated + nbufs > (unsigned)manager->cct->_conf->ms_async_rdma_receive_buffers) {
+ lderr(manager->cct) << __func__ << " WARNING: OUT OF RX BUFFERS: allocated: " <<
+ n_bufs_allocated << " requested: " << nbufs <<
+ " limit: " << manager->cct->_conf->ms_async_rdma_receive_buffers << dendl;
+ return false;
+ }
+
+ return true;
+}
+
+// Remember the perf counters used by update_stats(). When logger is not
+// null, l_msgr_rdma_rx_bufs_total is set to the current n_bufs_allocated.
+void Infiniband::MemoryManager::pool_context::set_stat_logger(PerfCounters *logger) {
+ perf_logger = logger;
+ if (perf_logger != nullptr)
+ perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated);
+}
-unsigned Infiniband::MemoryManager::RxAllocator::n_bufs_allocated = 0;
-unsigned Infiniband::MemoryManager::RxAllocator::max_bufs = 0;
+// Add nbufs to n_bufs_allocated and, if a perf logger has been set, apply
+// the same change to l_msgr_rdma_rx_bufs_total.
+// nbufs may be negative; the counter is then decremented by -nbufs.
+void Infiniband::MemoryManager::pool_context::update_stats(int nbufs)
+{
+ n_bufs_allocated += nbufs;
+
+ // no logger set: only the local count is updated
+ if (!perf_logger)
+ return;
+ if (nbufs > 0) {
+ perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);
+ } else {
+ perf_logger->dec(l_msgr_rdma_rx_bufs_total, -nbufs);
+ }
+}
-char *Infiniband::MemoryManager::RxAllocator::malloc(const size_type bytes)
+// Slow path of mem_pool::malloc(): allocate through boost::pool while
+// holding PoolAllocator::lock, with PoolAllocator::g_ctx pointing at this
+// pool's context for the duration of the call.
+// Returns whatever boost::pool<PoolAllocator>::malloc() returns.
+void *Infiniband::MemoryManager::mem_pool::slow_malloc()
+{
+ void *p;
+
+ Mutex::Locker l(PoolAllocator::lock);
+ // PoolAllocator::malloc() is static and reads its pool context from g_ctx
+ PoolAllocator::g_ctx = ctx;
+ // this will trigger pool expansion via PoolAllocator::malloc()
+ p = boost::pool<PoolAllocator>::malloc();
+ PoolAllocator::g_ctx = nullptr;
+ return p;
+}
+
+Infiniband::MemoryManager::pool_context *Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr;
+Mutex Infiniband::MemoryManager::PoolAllocator::lock("pool-alloc-lock");
+
+// lock is taken by mem_pool::slow_malloc()
+char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type bytes)
{
mem_info *m;
Chunk *ch;
size_t rx_buf_size;
unsigned nbufs;
+ MemoryManager *manager;
+ CephContext *cct;
- rx_buf_size = sizeof(Chunk) + manager->cct->_conf->ms_async_rdma_buffer_size;
+ assert(g_ctx);
+ manager = g_ctx->manager;
+ cct = manager->cct;
+ rx_buf_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size;
nbufs = bytes/rx_buf_size;
- if (max_bufs > 0 && n_bufs_allocated + nbufs > max_bufs) {
+ if (!g_ctx->can_alloc(nbufs))
return NULL;
- }
m = static_cast<mem_info *>(manager->malloc(bytes + sizeof(*m)));
- if (!m)
+ if (!m) {
+ lderr(cct) << __func__ << "failed to allocate " <<
+ bytes << " + " << sizeof(*m) << " bytes of memory for " << nbufs << dendl;
return NULL;
+ }
+
+ m->mr = ibv_reg_mr(manager->pd->pd, m->chunks, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
+ if (m->mr == NULL) {
+ lderr(cct) << __func__ << "failed to register " <<
+ bytes << " + " << sizeof(*m) << " bytes of memory for " << nbufs << dendl;
+ manager->free(m);
+ return NULL;
+ }
- m->mr = ibv_reg_mr(manager->pd->pd, m->chunks, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
- assert(m->mr);
m->nbufs = nbufs;
+ // save this chunk context
+ m->ctx = g_ctx;
- n_bufs_allocated += nbufs;
// note that the memory can be allocated before perf logger is set
- if (perf_logger)
- perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);
+ g_ctx->update_stats(nbufs);
/* initialize chunks */
ch = m->chunks;
for (unsigned i = 0; i < nbufs; i++) {
ch->lkey = m->mr->lkey;
- ch->bytes = manager->cct->_conf->ms_async_rdma_buffer_size;
+ ch->bytes = cct->_conf->ms_async_rdma_buffer_size;
ch->offset = 0;
ch->buffer = ch->data; // TODO: refactor tx and remove buffer
ch = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(ch) + rx_buf_size);
}
-void Infiniband::MemoryManager::RxAllocator::free(char * const block)
+void Infiniband::MemoryManager::PoolAllocator::free(char * const block)
{
mem_info *m;
+ Mutex::Locker l(lock);
m = reinterpret_cast<mem_info *>(block) - 1;
- n_bufs_allocated -= m->nbufs;
- if (perf_logger)
- perf_logger->dec(l_msgr_rdma_rx_bufs_total, m->nbufs);
+ m->ctx->update_stats(-m->nbufs);
ibv_dereg_mr(m->mr);
- manager->free(m);
+ m->ctx->manager->free(m);
}
Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p)
: cct(c), device(d), pd(p),
- rxbuf_pool(sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size,
+ rxbuf_pool_ctx(this),
+ rxbuf_pool(&rxbuf_pool_ctx, sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size,
c->_conf->ms_async_rdma_receive_buffers > 0 ?
// if possible make initial pool size 2 * receive_queue_len
// that way there will be no pool expansion upon receive of the
// rx pool is infinite, we can set any initial size that we want
2 * c->_conf->ms_async_rdma_receive_queue_len)
{
- RxAllocator::set_memory_manager(this);
- // remember the setting because cct may not be available when
- // global infiniband is destroyed
- hp_enabled = cct->_conf->ms_async_rdma_enable_hugepage;
}
Infiniband::MemoryManager::~MemoryManager()
void* Infiniband::MemoryManager::malloc(size_t size)
{
- if (hp_enabled)
+ if (cct->_conf->ms_async_rdma_enable_hugepage)
return huge_pages_malloc(size);
else
return std::malloc(size);
void Infiniband::MemoryManager::free(void *ptr)
{
- if (hp_enabled)
+ if (cct->_conf->ms_async_rdma_enable_hugepage)
huge_pages_free(ptr);
else
std::free(ptr);
ceph_abort();
}
- MemoryManager::RxAllocator::set_max_bufs(cct->_conf->ms_async_rdma_receive_buffers);
-
tx_queue_len = device->device_attr->max_qp_wr;
if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
return qp;
}
-void Infiniband::post_chunks_to_srq(int num)
+int Infiniband::post_chunks_to_srq(int num)
{
int ret, i = 0;
ibv_sge isge[num];
while (i < num) {
chunk = get_memory_manager()->get_rx_buffer();
-
- assert (chunk != NULL);
+ if (chunk == NULL) {
+ lderr(cct) << __func__ << " WARNING: out of memory. Requested " << num <<
+ " rx buffers. Got " << i << dendl;
+ if (i == 0)
+ return 0;
+ // if we got some buffers post them and hope for the best
+ rx_work_request[i-1].next = 0;
+ break;
+ }
isge[i].addr = reinterpret_cast<uint64_t>(chunk->data);
isge[i].length = chunk->bytes;
ibv_recv_wr *badworkrequest;
ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
assert(ret == 0);
+ return i;
}
Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
Chunk* chunk_base = nullptr;
};
- class RxAllocator {
- struct mem_info {
- ibv_mr *mr;
- unsigned nbufs;
- Chunk chunks[0];
- };
- static MemoryManager *manager;
- static unsigned n_bufs_allocated;
- static unsigned max_bufs;
- static PerfCounters *perf_logger;
- public:
- typedef std::size_t size_type;
- typedef std::ptrdiff_t difference_type;
-
- static char * malloc(const size_type bytes);
- static void free(char * const block);
+ // Per-pool bookkeeping: the owning MemoryManager, the number of
+ // buffers allocated so far and an optional perf logger.
+ class pool_context {
+ // nullptr until set_stat_logger() is called
+ PerfCounters *perf_logger;
+
+ public:
+ MemoryManager *manager;
+ // adjusted through update_stats()
+ unsigned n_bufs_allocated;
+ pool_context(MemoryManager *m) :
+ perf_logger(nullptr),
+ manager(m),
+ n_bufs_allocated(0) {}
+ // true if it is possible to alloc
+ // more memory for the pool
+ bool can_alloc(unsigned nbufs);
+ // add val (may be negative) to n_bufs_allocated and to the perf counter
+ void update_stats(int val);
+ void set_stat_logger(PerfCounters *logger);
+ };
- static void set_memory_manager(MemoryManager *m) {
- manager = m;
- }
- static void set_max_bufs(int n) {
- max_bufs = n;
- }
+ // Allocator type passed to boost::pool (see mem_pool). malloc()/free()
+ // are static, so the pool being served is communicated through g_ctx.
+ class PoolAllocator {
+ // Header stored immediately before the chunks of each block;
+ // free() recovers it from the block pointer.
+ struct mem_info {
+ // memory region returned by ibv_reg_mr() for chunks
+ ibv_mr *mr;
+ // context of the pool this block was allocated for
+ pool_context *ctx;
+ // number of chunks in this block
+ unsigned nbufs;
+ Chunk chunks[0];
+ };
+ public:
+ typedef std::size_t size_type;
+ typedef std::ptrdiff_t difference_type;
+
+ static char * malloc(const size_type bytes);
+ static void free(char * const block);
+
+ // set by mem_pool::slow_malloc() around the boost::pool call,
+ // nullptr otherwise
+ static pool_context *g_ctx;
+ // held by mem_pool::slow_malloc() and taken by free()
+ static Mutex lock;
+ };
- static void set_perf_logger(PerfCounters *logger) {
- perf_logger = logger;
- perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated);
- }
+ // Extend boost::pool so that it is possible to have a
+ // thread-safe 'context' when allocating/freeing the memory.
+ // This is needed to allow different pool configurations
+ // and bookkeeping per CephContext, and also to be able
+ // to use the same allocator to deal with both the
+ // RX and TX pools.
+ // TODO: use boost pool to allocate TX chunks too
+ // boost::pool<PoolAllocator> that carries a pool_context and routes
+ // allocations needing new memory through slow_malloc().
+ class mem_pool : public boost::pool<PoolAllocator> {
+ private:
+ pool_context *ctx;
+ // locked allocation path, used when the free store is empty
+ void *slow_malloc();
+
+ public:
+ // ctx is stored as-is; the remaining arguments are forwarded
+ // unchanged to the boost::pool constructor.
+ explicit mem_pool(pool_context *ctx, const size_type nrequested_size,
+ const size_type nnext_size = 32,
+ const size_type nmax_size = 0) :
+ pool(nrequested_size, nnext_size, nmax_size),
+ ctx(ctx) { }
+
+ // Hand out a chunk from the free store when one is available,
+ // otherwise fall back to slow_malloc().
+ // NOTE(review): no lock is taken on the non-empty path - confirm
+ // that callers serialise access to the pool.
+ void *malloc() {
+ if (!store().empty())
+ return (store().malloc)();
+ // need to alloc more memory...
+ // slow path code
+ return slow_malloc();
+ }
};
MemoryManager(CephContext *c, Device *d, ProtectionDomain *p);
rxbuf_pool.free(chunk);
}
+ // Forward the perf logger to the RX pool context (rxbuf_pool_ctx).
+ void set_rx_stat_logger(PerfCounters *logger) {
+ rxbuf_pool_ctx.set_stat_logger(logger);
+ }
+
CephContext *cct;
private:
// TODO: Cluster -> TxPool txbuf_pool
Cluster* send;// SEND
Device *device;
ProtectionDomain *pd;
- boost::pool<RxAllocator> rxbuf_pool;
- bool hp_enabled;
+ pool_context rxbuf_pool_ctx;
+ mem_pool rxbuf_pool;
void* huge_pages_malloc(size_t size);
void huge_pages_free(void *ptr);
typedef MemoryManager::Chunk Chunk;
QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
- void post_chunks_to_srq(int);
+ // post rx buffers to srq, return number of buffers actually posted
+ int post_chunks_to_srq(int num);
void post_chunk_to_pool(Chunk* chunk) {
get_memory_manager()->release_rx_buffer(chunk);
}