OPTION(ms_async_rdma_enable_hugepage, OPT_BOOL)
OPTION(ms_async_rdma_buffer_size, OPT_INT)
OPTION(ms_async_rdma_send_buffers, OPT_U32)
+// size of the receive buffer pool, 0 is unlimited
OPTION(ms_async_rdma_receive_buffers, OPT_U32)
+// maximum number of receive work requests in the SRQ
+OPTION(ms_async_rdma_receive_queue_len, OPT_U32)
OPTION(ms_async_rdma_port_num, OPT_U32)
OPTION(ms_async_rdma_polling_us, OPT_U32)
OPTION(ms_async_rdma_local_gid, OPT_STR) // GID format: "fe80:0000:0000:0000:7efe:90ff:fe72:6efe", no zero folding
.set_description(""),
Option("ms_async_rdma_receive_buffers", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
- .set_default(1024)
+ .set_default(32768)
+ .set_description(""),
+
+ Option("ms_async_rdma_receive_queue_len", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(4096)
.set_description(""),
Option("ms_async_rdma_port_num", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
CephContext *c, Infiniband& infiniband, ibv_qp_type type,
int port, ibv_srq *srq,
Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
- uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
+ uint32_t tx_queue_len, uint32_t rx_queue_len, uint32_t q_key)
: cct(c), infiniband(infiniband),
type(type),
ctxt(infiniband.device->ctxt),
txcq(txcq),
rxcq(rxcq),
initial_psn(0),
- max_send_wr(max_send_wr),
- max_recv_wr(max_recv_wr),
+ max_send_wr(tx_queue_len),
+ max_recv_wr(rx_queue_len),
q_key(q_key),
dead(false)
{
if (qp == NULL) {
lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
if (errno == ENOMEM) {
- lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers, "
+ lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_length, "
" ms_async_rdma_send_buffers or"
" ms_async_rdma_buffer_size" << dendl;
}
bound = 0;
}
-void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
-{
- ib->post_chunk(this);
-}
-
Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
: manager(m), buffer_size(s), lock("cluster_lock")
{
}
::free(chunk_base);
- if (manager.enabled_huge_page)
- manager.free_huge_pages(base);
- else
- ::free(base);
+ manager.free(base);
}
int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
assert(!base);
num_chunk = num;
uint32_t bytes = buffer_size * num;
- if (manager.enabled_huge_page) {
- base = (char*)manager.malloc_huge_pages(bytes);
- } else {
- base = (char*)memalign(CEPH_PAGE_SIZE, bytes);
- }
+
+ base = (char*)manager.malloc(bytes);
end = base + bytes;
assert(base);
chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
return r;
}
+Infiniband::MemoryManager* Infiniband::MemoryManager::RxAllocator::manager = nullptr;
+PerfCounters *Infiniband::MemoryManager::RxAllocator::perf_logger = nullptr;
+
+unsigned Infiniband::MemoryManager::RxAllocator::n_bufs_allocated = 0;
+unsigned Infiniband::MemoryManager::RxAllocator::max_bufs = 0;
+
+
+char *Infiniband::MemoryManager::RxAllocator::malloc(const size_type bytes)
+{
+ mem_info *m;
+ Chunk *ch;
+ size_t rx_buf_size;
+ unsigned nbufs;
+
+ rx_buf_size = sizeof(Chunk) + manager->cct->_conf->ms_async_rdma_buffer_size;
+ nbufs = bytes/rx_buf_size;
+
+ if (max_bufs > 0 && n_bufs_allocated + nbufs > max_bufs) {
+ return NULL;
+ }
+
+ m = static_cast<mem_info *>(manager->malloc(bytes + sizeof(*m)));
+ if (!m)
+ return NULL;
+
+ m->mr = ibv_reg_mr(manager->pd->pd, m->chunks, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
+ assert(m->mr);
+ m->nbufs = nbufs;
+
+ n_bufs_allocated += nbufs;
+ // note that the memory can be allocated before perf logger is set
+ if (perf_logger)
+ perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);
+
+ /* initialize chunks */
+ ch = m->chunks;
+ for (unsigned i = 0; i < nbufs; i++) {
+ ch->lkey = m->mr->lkey;
+ ch->bytes = manager->cct->_conf->ms_async_rdma_buffer_size;
+ ch->offset = 0;
+ ch->buffer = ch->data; // TODO: refactor tx and remove buffer
+ ch = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(ch) + rx_buf_size);
+ }
+
+ return reinterpret_cast<char *>(m->chunks);
+}
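
RxAllocator models boost::pool's UserAllocator concept: the pool asks it for one large block at a time through the static malloc()/free() pair above, and that is where the block is registered as a single memory region and carved into Chunks. A minimal, stand-alone sketch of that concept (names below are illustrative, not from the Ceph tree):

    #include <boost/pool/pool.hpp>
    #include <cstddef>
    #include <cstdlib>

    struct DemoAllocator {
      typedef std::size_t size_type;
      typedef std::ptrdiff_t difference_type;

      // boost::pool calls these whenever it has to grow or release a block
      // holding many fixed-size chunks; RxAllocator hooks ibv_reg_mr()/
      // ibv_dereg_mr() and the per-chunk initialization in here.
      static char *malloc(const size_type bytes) {
        return static_cast<char *>(std::malloc(bytes));
      }
      static void free(char * const block) {
        std::free(block);
      }
    };

    int main() {
      boost::pool<DemoAllocator> pool(128, 32); // 128-byte chunks, 32 chunks per block
      void *chunk = pool.malloc();              // hands out one chunk
      pool.free(chunk);                         // returns it; the block itself is kept
      return 0;
    }
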
+
-Infiniband::MemoryManager::MemoryManager(Device *d, ProtectionDomain *p, bool hugepage)
- : device(d), pd(p)
+void Infiniband::MemoryManager::RxAllocator::free(char * const block)
+{
+ mem_info *m;
+
+ m = reinterpret_cast<mem_info *>(block) - 1;
+ n_bufs_allocated -= m->nbufs;
+ if (perf_logger)
+ perf_logger->dec(l_msgr_rdma_rx_bufs_total, m->nbufs);
+ ibv_dereg_mr(m->mr);
+ manager->free(m);
+}
+
+Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p)
+ : cct(c), device(d), pd(p),
+ rxbuf_pool(sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size,
+ c->_conf->ms_async_rdma_receive_buffers > 0 ?
+ // If possible, make the initial pool size 2 * receive_queue_len
+ // so that the pool does not need to expand when the first
+ // packets are received.
+ (c->_conf->ms_async_rdma_receive_buffers < 2 * c->_conf->ms_async_rdma_receive_queue_len ?
+ c->_conf->ms_async_rdma_receive_buffers : 2 * c->_conf->ms_async_rdma_receive_queue_len) :
+ // rx pool is infinite, we can set any initial size that we want
+ 2 * c->_conf->ms_async_rdma_receive_queue_len)
{
- enabled_huge_page = hugepage;
+ RxAllocator::set_memory_manager(this);
+ // remember the setting because cct may not be available when
+ // global infiniband is destroyed
+ hp_enabled = cct->_conf->ms_async_rdma_enable_hugepage;
}
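
With the defaults introduced by this change (ms_async_rdma_receive_buffers = 32768, ms_async_rdma_receive_queue_len = 4096), the expression above sizes the initial pool at min(32768, 2 * 4096) = 8192 chunks, i.e. twice the SRQ depth, so the first burst of receives never forces a pool expansion. A small sketch of the same arithmetic:

    #include <algorithm>
    #include <cstdint>

    int main() {
      const uint32_t receive_buffers   = 32768; // ms_async_rdma_receive_buffers (0 = unlimited)
      const uint32_t receive_queue_len = 4096;  // ms_async_rdma_receive_queue_len
      const uint32_t initial_chunks = receive_buffers > 0
          ? std::min(receive_buffers, 2 * receive_queue_len) // capped pool
          : 2 * receive_queue_len;                           // unlimited pool
      return initial_chunks == 8192 ? 0 : 1; // twice the SRQ depth with the defaults
    }
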
Infiniband::MemoryManager::~MemoryManager()
{
- if (channel)
- delete channel;
if (send)
delete send;
}
-void* Infiniband::MemoryManager::malloc_huge_pages(size_t size)
+void* Infiniband::MemoryManager::huge_pages_malloc(size_t size)
{
size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);
if (ptr == MAP_FAILED) {
- ptr = (char *)malloc(real_size);
+ ptr = (char *)std::malloc(real_size);
if (ptr == NULL) return NULL;
real_size = 0;
}
return ptr + HUGE_PAGE_SIZE;
}
-void Infiniband::MemoryManager::free_huge_pages(void *ptr)
+void Infiniband::MemoryManager::huge_pages_free(void *ptr)
{
if (ptr == NULL) return;
void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE;
if (real_size != 0)
munmap(real_ptr, real_size);
else
- free(real_ptr);
+ std::free(real_ptr);
}
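
huge_pages_malloc()/huge_pages_free() use a header-before-pointer scheme: the mapping is padded by one huge page, the caller gets a pointer just past that padding, and free() later recovers the real size to decide between munmap() and the heap. The store of real_size and the matching read fall outside the lines shown in this hunk; a self-contained sketch of the same scheme, assuming an illustrative 2 MB huge page and storing the size at the start of the padding:

    #include <sys/mman.h>
    #include <cstddef>
    #include <cstdlib>

    static const size_t kHugePage = 2 * 1024 * 1024;  // illustrative 2 MB huge page
    static size_t align_up(size_t x) { return (x + kHugePage - 1) & ~(kHugePage - 1); }

    void *hp_malloc(size_t size)
    {
      size_t real_size = align_up(size + kHugePage);  // one extra page for the header
      char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,
                               MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB,
                               -1, 0);
      if (ptr == MAP_FAILED) {
        ptr = (char *)std::malloc(real_size);         // fall back to the heap
        if (!ptr) return NULL;
        real_size = 0;                                // 0 marks "not mmapped"
      }
      *((size_t *)ptr) = real_size;                   // remember how to free it
      return ptr + kHugePage;
    }

    void hp_free(void *p)
    {
      if (!p) return;
      char *real_ptr = (char *)p - kHugePage;
      size_t real_size = *((size_t *)real_ptr);
      if (real_size)
        munmap(real_ptr, real_size);
      else
        std::free(real_ptr);
    }
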
-void Infiniband::MemoryManager::register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num)
+
+void* Infiniband::MemoryManager::malloc(size_t size)
+{
+ if (hp_enabled)
+ return huge_pages_malloc(size);
+ else
+ return std::malloc(size);
+}
+
+void Infiniband::MemoryManager::free(void *ptr)
+{
+ if (hp_enabled)
+ huge_pages_free(ptr);
+ else
+ std::free(ptr);
+}
+
+void Infiniband::MemoryManager::create_tx_pool(uint32_t size, uint32_t tx_num)
{
assert(device);
assert(pd);
- channel = new Cluster(*this, size);
- channel->fill(rx_num);
send = new Cluster(*this, size);
send->fill(tx_num);
return send->get_buffers(c, bytes);
}
-int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes)
-{
- return channel->get_buffers(chunks, bytes);
-}
-
-
Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
: cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num)
{
pd = new ProtectionDomain(cct, device);
assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
- max_recv_wr = device->device_attr->max_srq_wr;
- if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) {
- max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers;
- ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
+ rx_queue_len = device->device_attr->max_srq_wr;
+ if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) {
+ rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len;
+ ldout(cct, 1) << __func__ << " receive queue length is " << rx_queue_len << " receive buffers" << dendl;
} else {
- ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl;
+ ldout(cct, 0) << __func__ << " requested receive queue length " <<
+ cct->_conf->ms_async_rdma_receive_queue_len <<
+ " is too big. Setting " << rx_queue_len << dendl;
+ }
+
+ // check for misconfiguration
+ if (cct->_conf->ms_async_rdma_receive_buffers > 0 &&
+ rx_queue_len > (unsigned)cct->_conf->ms_async_rdma_receive_buffers) {
+ lderr(cct) << __func__ << " ms_async_rdma_receive_queue_len (" <<
+ rx_queue_len << ") > ms_async_rdma_receive_buffers (" <<
+ cct->_conf->ms_async_rdma_receive_buffers << ")." << dendl;
+ ceph_abort();
}
- max_send_wr = device->device_attr->max_qp_wr;
- if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) {
- max_send_wr = cct->_conf->ms_async_rdma_send_buffers;
- ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl;
+ MemoryManager::RxAllocator::set_max_bufs(cct->_conf->ms_async_rdma_receive_buffers);
+
+ tx_queue_len = device->device_attr->max_qp_wr;
+ if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
+ tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
+ ldout(cct, 1) << __func__ << " assigning: " << tx_queue_len << " send buffers" << dendl;
} else {
- ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl;
+ ldout(cct, 0) << __func__ << " using the max allowed send buffers: " << tx_queue_len << dendl;
}
ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe
<< " completion entries" << dendl;
- memory_manager = new MemoryManager(device, pd,
- cct->_conf->ms_async_rdma_enable_hugepage);
- memory_manager->register_rx_tx(
- cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
+ memory_manager = new MemoryManager(cct, device, pd);
+ memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len);
- srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
- post_channel_cluster();
+ srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
+ post_chunks_to_srq(rx_queue_len); // pre-fill the shared receive queue
dispatcher->polling_start();
}
assert(!d ^ !dispatcher);
dispatcher = d;
+ if (dispatcher != nullptr)
+ MemoryManager::RxAllocator::set_perf_logger(dispatcher->perf_logger);
}
/**
Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)
{
Infiniband::QueuePair *qp = new QueuePair(
- cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);
+ cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len);
if (qp->init()) {
delete qp;
return NULL;
return qp;
}
-int Infiniband::post_chunk(Chunk* chunk)
+void Infiniband::post_chunks_to_srq(int num)
{
- ibv_sge isge;
- isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
- isge.length = chunk->bytes;
- isge.lkey = chunk->mr->lkey;
- ibv_recv_wr rx_work_request;
-
- memset(&rx_work_request, 0, sizeof(rx_work_request));
- rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
- rx_work_request.next = NULL;
- rx_work_request.sg_list = &isge;
- rx_work_request.num_sge = 1;
-
- ibv_recv_wr *badWorkRequest;
- int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
- if (ret)
- return -errno;
- return 0;
-}
+ int ret, i = 0;
+ ibv_sge isge[num];
+ Chunk *chunk;
+ ibv_recv_wr rx_work_request[num];
-int Infiniband::post_channel_cluster()
-{
- vector<Chunk*> free_chunks;
- int r = memory_manager->get_channel_buffers(free_chunks, 0);
- assert(r > 0);
- for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
- r = post_chunk(*iter);
- assert(r == 0);
+ while (i < num) {
+ chunk = get_memory_manager()->get_rx_buffer();
+
+ assert (chunk != NULL);
+
+ isge[i].addr = reinterpret_cast<uint64_t>(chunk->data);
+ isge[i].length = chunk->bytes;
+ isge[i].lkey = chunk->lkey;
+
+ memset(&rx_work_request[i], 0, sizeof(rx_work_request[i]));
+ rx_work_request[i].wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
+ if (i == num - 1) {
+ rx_work_request[i].next = 0;
+ } else {
+ rx_work_request[i].next = &rx_work_request[i+1];
+ }
+ rx_work_request[i].sg_list = &isge[i];
+ rx_work_request[i].num_sge = 1;
+ i++;
}
- return 0;
+ ibv_recv_wr *badworkrequest;
+ ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
+ assert(ret == 0);
}
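
post_chunks_to_srq() links the work requests through their next pointers so the whole batch reaches the hardware with a single ibv_post_srq_recv() call (one doorbell) instead of one call per buffer. A stand-alone sketch of that chaining pattern; srq, addr, len and lkey are assumed to come from an already initialized verbs context:

    #include <infiniband/verbs.h>
    #include <cstring>

    int post_recv_chain(ibv_srq *srq, uint64_t addr, uint32_t len, uint32_t lkey, int num)
    {
      ibv_sge sge[num];
      ibv_recv_wr wr[num], *bad = nullptr;

      for (int i = 0; i < num; i++) {
        sge[i].addr   = addr + (uint64_t)i * len;
        sge[i].length = len;
        sge[i].lkey   = lkey;

        memset(&wr[i], 0, sizeof(wr[i]));
        wr[i].wr_id   = sge[i].addr;                     // cookie to identify the buffer later
        wr[i].sg_list = &sge[i];
        wr[i].num_sge = 1;
        wr[i].next    = (i == num - 1) ? nullptr : &wr[i + 1];
      }
      // the whole chain is posted at once; on failure, bad points at the first bad WR
      return ibv_post_srq_recv(srq, &wr[0], &bad);
    }
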
Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
#ifndef CEPH_INFINIBAND_H
#define CEPH_INFINIBAND_H
+#include <boost/pool/pool.hpp>
+// need this because boost messes with ceph log/assert definitions
+#include <include/assert.h>
+
+#include <infiniband/verbs.h>
+
#include <string>
#include <vector>
#include "common/debug.h"
#include "common/errno.h"
#include "common/Mutex.h"
+#include "common/perf_counters.h"
#include "msg/msg_types.h"
#include "msg/async/net_handler.h"
}
};
+// stat counters
+enum {
+ l_msgr_rdma_dispatcher_first = 94000,
+
+ l_msgr_rdma_polling,
+ l_msgr_rdma_inflight_tx_chunks,
+ l_msgr_rdma_rx_bufs_in_use,
+ l_msgr_rdma_rx_bufs_total,
+
+ l_msgr_rdma_tx_total_wc,
+ l_msgr_rdma_tx_total_wc_errors,
+ l_msgr_rdma_tx_wc_retry_errors,
+ l_msgr_rdma_tx_wc_wr_flush_errors,
+
+ l_msgr_rdma_rx_total_wc,
+ l_msgr_rdma_rx_total_wc_errors,
+ l_msgr_rdma_rx_fin,
+
+ l_msgr_rdma_handshake_errors,
+
+ l_msgr_rdma_total_async_events,
+ l_msgr_rdma_async_last_wqe_events,
+
+ l_msgr_rdma_created_queue_pair,
+ l_msgr_rdma_active_queue_pair,
+
+ l_msgr_rdma_dispatcher_last,
+};
+
+enum {
+ l_msgr_rdma_first = 95000,
+
+ l_msgr_rdma_tx_no_mem,
+ l_msgr_rdma_tx_parital_mem,
+ l_msgr_rdma_tx_failed,
+
+ l_msgr_rdma_tx_chunks,
+ l_msgr_rdma_tx_bytes,
+ l_msgr_rdma_rx_chunks,
+ l_msgr_rdma_rx_bytes,
+ l_msgr_rdma_pending_sent_conns,
+
+ l_msgr_rdma_last,
+};
class RDMADispatcher;
bool full();
bool over();
void clear();
- void post_srq(Infiniband *ib);
public:
ibv_mr* mr;
+ uint32_t lkey;
uint32_t bytes;
uint32_t bound;
uint32_t offset;
- char* buffer;
+ char* buffer; // TODO: remove buffer/refactor TX
+ char data[0];
};
class Cluster {
Chunk* chunk_base = nullptr;
};
- MemoryManager(Device *d, ProtectionDomain *p, bool hugepage);
+ class RxAllocator {
+ struct mem_info {
+ ibv_mr *mr;
+ unsigned nbufs;
+ Chunk chunks[0];
+ };
+ static MemoryManager *manager;
+ static unsigned n_bufs_allocated;
+ static unsigned max_bufs;
+ static PerfCounters *perf_logger;
+ public:
+ typedef std::size_t size_type;
+ typedef std::ptrdiff_t difference_type;
+
+ static char * malloc(const size_type bytes);
+ static void free(char * const block);
+
+ static void set_memory_manager(MemoryManager *m) {
+ manager = m;
+ }
+ static void set_max_bufs(int n) {
+ max_bufs = n;
+ }
+
+ static void set_perf_logger(PerfCounters *logger) {
+ perf_logger = logger;
+ perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated);
+ }
+ };
+
+ MemoryManager(CephContext *c, Device *d, ProtectionDomain *p);
~MemoryManager();
- void* malloc_huge_pages(size_t size);
- void free_huge_pages(void *ptr);
- void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num);
+ void* malloc(size_t size);
+ void free(void *ptr);
+
+ void create_tx_pool(uint32_t size, uint32_t tx_num);
void return_tx(std::vector<Chunk*> &chunks);
int get_send_buffers(std::vector<Chunk*> &c, size_t bytes);
- int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes);
bool is_tx_buffer(const char* c) { return send->is_my_buffer(c); }
- bool is_rx_buffer(const char* c) { return channel->is_my_buffer(c); }
Chunk *get_tx_chunk_by_buffer(const char *c) {
return send->get_chunk_by_buffer(c);
}
return send->buffer_size;
}
- bool enabled_huge_page;
+ Chunk *get_rx_buffer() {
+ return reinterpret_cast<Chunk *>(rxbuf_pool.malloc());
+ }
+
+ void release_rx_buffer(Chunk *chunk) {
+ rxbuf_pool.free(chunk);
+ }
+ CephContext *cct;
private:
- Cluster* channel;//RECV
+ // TODO: Cluster -> TxPool (txbuf_pool), fix the chunk layout
Cluster* send;// SEND
Device *device;
ProtectionDomain *pd;
+ boost::pool<RxAllocator> rxbuf_pool;
+ bool hp_enabled;
+
+ void* huge_pages_malloc(size_t size);
+ void huge_pages_free(void *ptr);
};
private:
- uint32_t max_send_wr = 0;
- uint32_t max_recv_wr = 0;
+ uint32_t tx_queue_len = 0;
+ uint32_t rx_queue_len = 0;
uint32_t max_sge = 0;
uint8_t ib_physical_port = 0;
MemoryManager* memory_manager = nullptr;
int ib_physical_port, ibv_srq *srq,
Infiniband::CompletionQueue* txcq,
Infiniband::CompletionQueue* rxcq,
- uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0);
+ uint32_t tx_queue_len, uint32_t rx_queue_len, uint32_t q_key = 0);
~QueuePair();
int init();
typedef MemoryManager::Chunk Chunk;
QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
- int post_chunk(Chunk* chunk);
- int post_channel_cluster();
+ void post_chunks_to_srq(int);
+ void post_chunk_to_pool(Chunk* chunk) {
+ get_memory_manager()->release_rx_buffer(chunk);
+ }
int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
CompletionChannel *create_comp_channel(CephContext *c);
CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
Device* get_device() { return device; }
int get_async_fd() { return device->ctxt->async_fd; }
bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
- bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
static const char* wc_status_to_string(int status);
static const char* qp_state_string(int status);
cleanup();
worker->remove_pending_conn(this);
dispatcher->erase_qpn(my_msg.qpn);
+
+ for (unsigned i=0; i < wc.size(); ++i) {
+ dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
+ }
+ for (unsigned i=0; i < buffers.size(); ++i) {
+ dispatcher->post_chunk_to_pool(buffers[i]);
+ }
+
Mutex::Locker l(lock);
if (notify_fd >= 0)
::close(notify_fd);
if (tcp_fd >= 0)
::close(tcp_fd);
error = ECONNRESET;
- int ret = 0;
- for (unsigned i=0; i < wc.size(); ++i) {
- ret = infiniband->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
- assert(ret == 0);
- dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
- }
- for (unsigned i=0; i < buffers.size(); ++i) {
- ret = infiniband->post_chunk(buffers[i]);
- assert(ret == 0);
- dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
- }
}
void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
error = ECONNRESET;
ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
}
- assert(infiniband->post_chunk(chunk) == 0);
- dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
+ dispatcher->post_chunk_to_pool(chunk);
} else {
if (read == (ssize_t)len) {
buffers.push_back(chunk);
ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
} else {
read += chunk->read(buf+read, response->byte_len);
- assert(infiniband->post_chunk(chunk) == 0);
- dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
+ dispatcher->post_chunk_to_pool(chunk);
}
}
}
read += tmp;
ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;
if ((*c)->over()) {
- assert(infiniband->post_chunk(*c) == 0);
- dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
+ dispatcher->post_chunk_to_pool(*c);
ldout(cct, 25) << __func__ << " one chunk over." << dendl;
}
if (read == len) {
plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
- plb.add_u64_counter(l_msgr_rdma_inqueue_rx_chunks, "inqueue_rx_chunks", "The number of inqueue rx chunks");
+ plb.add_u64_counter(l_msgr_rdma_rx_bufs_in_use, "rx_bufs_in_use", "The number of rx buffers that are holding data and being processed");
+ plb.add_u64_counter(l_msgr_rdma_rx_bufs_total, "rx_bufs_total", "The total number of rx buffers");
plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work completions");
plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
}
}
+void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) {
+ Mutex::Locker l(lock);
+ global_infiniband->post_chunk_to_pool(chunk);
+ perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+}
+
void RDMADispatcher::polling()
{
static int MAX_COMPLETIONS = 32;
ldout(cct, 20) << __func__ << " rt completion queue got " << rx_ret
<< " responses."<< dendl;
perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
+ perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
+ global_infiniband->post_chunks_to_srq(rx_ret);
for (int i = 0; i < rx_ret; ++i) {
ibv_wc* response = &wc[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
if (response->status == IBV_WC_SUCCESS) {
conn = get_conn_lockless(response->qp_num);
if (!conn) {
- assert(global_infiniband->is_rx_buffer(chunk->buffer));
- r = global_infiniband->post_chunk(chunk);
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
- assert(r == 0);
+ global_infiniband->post_chunk_to_pool(chunk);
+ perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
} else {
- polled[conn].push_back(*response);
+ polled[conn].push_back(*response);
}
} else {
perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
+ perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
<< global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
- assert(global_infiniband->is_rx_buffer(chunk->buffer));
- r = global_infiniband->post_chunk(chunk);
- if (r) {
- ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
- assert(r == 0);
- }
-
conn = get_conn_lockless(response->qp_num);
if (conn && conn->is_connected())
conn->fault();
+
+ global_infiniband->post_chunk_to_pool(chunk);
}
}
-
- for (auto &&i : polled) {
- perf_logger->inc(l_msgr_rdma_inqueue_rx_chunks, i.second.size());
+ for (auto &&i : polled)
i.first->pass_wc(std::move(i.second));
- }
polled.clear();
}
plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of partial tx buffer");
plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted");
- plb.add_u64_counter(l_msgr_rdma_rx_no_registered_mem, "rx_no_registered_mem", "The count of no registered buffer when receiving");
plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");
class RDMAStack;
class RDMAWorker;
-enum {
- l_msgr_rdma_dispatcher_first = 94000,
-
- l_msgr_rdma_polling,
- l_msgr_rdma_inflight_tx_chunks,
- l_msgr_rdma_inqueue_rx_chunks,
-
- l_msgr_rdma_tx_total_wc,
- l_msgr_rdma_tx_total_wc_errors,
- l_msgr_rdma_tx_wc_retry_errors,
- l_msgr_rdma_tx_wc_wr_flush_errors,
-
- l_msgr_rdma_rx_total_wc,
- l_msgr_rdma_rx_total_wc_errors,
- l_msgr_rdma_rx_fin,
-
- l_msgr_rdma_handshake_errors,
-
- l_msgr_rdma_total_async_events,
- l_msgr_rdma_async_last_wqe_events,
-
- l_msgr_rdma_created_queue_pair,
- l_msgr_rdma_active_queue_pair,
-
- l_msgr_rdma_dispatcher_last,
-};
-
-
class RDMADispatcher {
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::QueuePair QueuePair;
void post_tx_buffer(std::vector<Chunk*> &chunks);
std::atomic<uint64_t> inflight = {0};
-};
-
-
-enum {
- l_msgr_rdma_first = 95000,
-
- l_msgr_rdma_tx_no_mem,
- l_msgr_rdma_tx_parital_mem,
- l_msgr_rdma_tx_failed,
- l_msgr_rdma_rx_no_registered_mem,
- l_msgr_rdma_tx_chunks,
- l_msgr_rdma_tx_bytes,
- l_msgr_rdma_rx_chunks,
- l_msgr_rdma_rx_bytes,
- l_msgr_rdma_pending_sent_conns,
+ void post_chunk_to_pool(Chunk* chunk);
- l_msgr_rdma_last,
};
class RDMAWorker : public Worker {