void Device::binding_port(CephContext *cct, int port_num) {
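// Scan physical ports 1..phys_port_cnt: keep the Port for the requested
// port number only if it is ACTIVE, and free every rejected Port right away.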
port_cnt = device_attr.phys_port_cnt;
- for (uint8_t i = 0; i < port_cnt; ++i) {
- Port *port = new Port(cct, ctxt, i+1);
- if (i + 1 == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
+ for (uint8_t port_id = 1; port_id <= port_cnt; ++port_id) {
+ Port *port = new Port(cct, ctxt, port_id);
+ if (port_id == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
active_port = port;
- ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
+ ldout(cct, 1) << __func__ << " found active port " << port_id << dendl;
break;
} else {
- ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
+ ldout(cct, 10) << __func__ << " port " << port_id << " is not what we want. state: " << port->get_port_attr()->state << dendl;
+ delete port;
}
- delete port;
}
if (nullptr == active_port) {
lderr(cct) << __func__ << " port not found" << dendl;
ceph_assert(active_port);
}
}
-
Infiniband::MemoryManager::MemPoolContext *Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr;
ceph::mutex Infiniband::MemoryManager::PoolAllocator::lock =
ceph::make_mutex("pool-alloc-lock");
// lock is taken by mem_pool::slow_malloc()
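// For context, a sketch (not part of this diff) of the mem_pool::slow_malloc()
// referenced above, based on how g_ctx and lock are used here: the pool
// context is published through g_ctx under the lock, boost::pool expansion
// then calls back into PoolAllocator::malloc() below, and g_ctx is cleared.
void *Infiniband::MemoryManager::mem_pool::slow_malloc()
{
std::lock_guard l{PoolAllocator::lock};
PoolAllocator::g_ctx = ctx; // ctx: this pool's MemPoolContext (assumed member)
// pool expansion lands in PoolAllocator::malloc()
void *p = boost::pool<PoolAllocator>::malloc();
PoolAllocator::g_ctx = nullptr;
return p;
}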
-char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type bytes)
+char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type block_size)
{
- mem_info *m;
- Chunk *ch;
- size_t rx_buf_size;
- unsigned nbufs;
- MemoryManager *manager;
- CephContext *cct;
-
ceph_assert(g_ctx);
- manager = g_ctx->manager;
- cct = manager->cct;
- rx_buf_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size;
- nbufs = bytes/rx_buf_size;
+ MemoryManager *manager = g_ctx->manager;
+ CephContext *cct = manager->cct;
+ size_t chunk_buffer_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size;
+ size_t chunk_buffer_number = block_size / chunk_buffer_size;
- if (!g_ctx->can_alloc(nbufs))
+ if (!g_ctx->can_alloc(chunk_buffer_number))
return NULL;
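// Sketch of the mem_info block this function fills in (as declared in
// Infiniband.h; shown for context, field order illustrative): a registration
// header immediately followed by the contiguous Chunk slots carved out below.
//
// struct mem_info {
//   ibv_mr *mr;          // MR covering the whole block
//   MemPoolContext *ctx; // owning pool context, consulted on free
//   unsigned nbufs;      // number of Chunk slots that follow
//   Chunk chunks[0];     // nbufs Chunk+buffer slots, back to back
// };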
- m = static_cast<mem_info *>(manager->malloc(bytes + sizeof(*m)));
- if (!m) {
- lderr(cct) << __func__ << " failed to allocate " <<
- bytes << " + " << sizeof(*m) << " bytes of memory for " << nbufs << dendl;
+ mem_info *minfo = static_cast<mem_info *>(manager->malloc(block_size + sizeof(mem_info)));
+ if (!minfo) {
+ lderr(cct) << __func__ << " failed to allocate " << chunk_buffer_number
+ << " buffers; block size: " << block_size + sizeof(mem_info) << " bytes" << dendl;
return NULL;
}
- m->mr = ibv_reg_mr(manager->pd->pd, m->chunks, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
- if (m->mr == NULL) {
- lderr(cct) << __func__ << " failed to register " <<
- bytes << " + " << sizeof(*m) << " bytes of memory for " << nbufs << dendl;
- manager->free(m);
+ minfo->mr = ibv_reg_mr(manager->pd->pd, minfo->chunks, block_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
+ if (minfo->mr == NULL) {
+ lderr(cct) << __func__ << " failed to register " << block_size
+ << " bytes of RDMA memory; releasing the allocated memory now" << dendl;
+ manager->free(minfo);
return NULL;
}
- m->nbufs = nbufs;
+ minfo->nbufs = chunk_buffer_number;
// save this chunk context
- m->ctx = g_ctx;
+ minfo->ctx = g_ctx;
// note that the memory can be allocated before perf logger is set
- g_ctx->update_stats(nbufs);
+ g_ctx->update_stats(chunk_buffer_number);
/* initialize chunks */
- ch = m->chunks;
- for (unsigned i = 0; i < nbufs; i++) {
- new(ch) Chunk(m->mr, cct->_conf->ms_async_rdma_buffer_size, ch->data, 0, 0, m->mr->lkey);
- ch = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(ch) + rx_buf_size);
+ Chunk *chunk = minfo->chunks;
+ for (unsigned i = 0; i < chunk_buffer_number; i++) {
+ new(chunk) Chunk(minfo->mr, cct->_conf->ms_async_rdma_buffer_size, chunk->data, 0, 0, minfo->mr->lkey);
+ chunk = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(chunk) + chunk_buffer_size);
}
- return reinterpret_cast<char *>(m->chunks);
+ return reinterpret_cast<char *>(minfo->chunks);
}
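// The matching release path, sketched for context (needs <cstddef> for
// offsetof; assumes the mem_info layout sketched above): step back from the
// first Chunk to the mem_info header, update the pool stats, deregister the
// MR, and hand the block back to the manager.
void Infiniband::MemoryManager::PoolAllocator::free(char * const block)
{
std::lock_guard l{lock};
mem_info *m = reinterpret_cast<mem_info *>(block - offsetof(mem_info, chunks));
m->ctx->update_stats(-m->nbufs);
ibv_dereg_mr(m->mr);
m->ctx->manager->free(m);
}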
void* Infiniband::MemoryManager::huge_pages_malloc(size_t size)
{
- size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
- char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);
+ size_t real_size = ALIGN_TO_PAGE_2MB(size) + HUGE_PAGE_SIZE_2MB;
+ char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB, -1, 0);
if (ptr == MAP_FAILED) {
ptr = (char *)std::malloc(real_size);
if (ptr == NULL) return NULL;
real_size = 0;
}
*((size_t *)ptr) = real_size;
- return ptr + HUGE_PAGE_SIZE;
+ return ptr + HUGE_PAGE_SIZE_2MB;
}
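// Usage sketch: callers get a pointer one huge page past the real base; the
// size_t stashed at the base lets huge_pages_free() recover both the mapping
// address and how the block was obtained (real_size != 0 means mmap'ed
// hugepages, real_size == 0 means the std::malloc fallback).
//
//   void *buf = huge_pages_malloc(1 << 20);  // rounded up to 2 MB pages
//   /* ... fill buf ... */
//   huge_pages_free(buf);                    // munmap or std::free, as recorded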
void Infiniband::MemoryManager::huge_pages_free(void *ptr)
{
if (ptr == NULL) return;
- void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE;
+ void *real_ptr = (char *)ptr - HUGE_PAGE_SIZE_2MB;
size_t real_size = *((size_t *)real_ptr);
- ceph_assert(real_size % HUGE_PAGE_SIZE == 0);
+ ceph_assert(real_size % HUGE_PAGE_SIZE_2MB == 0);
if (real_size != 0)
munmap(real_ptr, real_size);
else
std::free(real_ptr);
}
// (the following hunk is from the send path that drains pending_bl into tx chunks)
std::vector<Chunk*> tx_buffers;
auto it = std::cbegin(pending_bl.buffers());
- auto copy_it = it;
- size_t total = 0;
- size_t need_reserve_bytes = 0;
+ auto copy_start = it;
+ size_t total_copied = 0, wait_copy_len = 0;
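// One pass over pending_bl: buffers already living in registered tx-chunk
// memory are appended for zero-copy send; any run of foreign buffers between
// copy_start and it is first batch-copied into fresh tx chunks via
// tx_copy_chunk(), falling through to "sending" if chunks run out mid-copy.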
while (it != pending_bl.buffers().end()) {
if (infiniband->is_tx_buffer(it->raw_c_str())) {
- if (need_reserve_bytes) {
- size_t copied = tx_copy_chunk(tx_buffers, need_reserve_bytes, copy_it, it);
- total += copied;
- if (copied < need_reserve_bytes)
+ if (wait_copy_len) {
+ size_t copied = tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
+ total_copied += copied;
+ if (copied < wait_copy_len)
goto sending;
- need_reserve_bytes = 0;
+ wait_copy_len = 0;
}
- ceph_assert(copy_it == it);
+ ceph_assert(copy_start == it);
tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
- total += it->length();
- ++copy_it;
+ total_copied += it->length();
+ ++copy_start;
} else {
- need_reserve_bytes += it->length();
+ wait_copy_len += it->length();
}
++it;
}
- if (need_reserve_bytes)
- total += tx_copy_chunk(tx_buffers, need_reserve_bytes, copy_it, it);
+ if (wait_copy_len)
+ total_copied += tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
sending:
- if (total == 0)
+ if (total_copied == 0)
return -EAGAIN;
- ceph_assert(total <= pending_bl.length());
+ ceph_assert(total_copied <= pending_bl.length());
bufferlist swapped;
- if (total < pending_bl.length()) {
+ if (total_copied < pending_bl.length()) {
worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
- pending_bl.splice(total, pending_bl.length()-total, &swapped);
+ pending_bl.splice(total_copied, pending_bl.length() - total_copied, &swapped);
pending_bl.swap(swapped);
} else {
pending_bl.clear();
}
int r = post_work_request(tx_buffers);
if (r < 0)
return r;
- ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;
+ ldout(cct, 20) << __func__ << " finished sending " << total_copied << " bytes." << dendl;
return pending_bl.length() ? -EAGAIN : 0;
}
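// For context, a minimal sketch of the posting step behind post_work_request()
// (hypothetical stand-alone helper; the Chunk accessors and the queue-pair
// handle are assumptions, not taken from this diff): chain one ibv_send_wr per
// gathered tx chunk and submit the whole chain with a single ibv_post_send().
static int post_chunks_sketch(ibv_qp *qp, std::vector<Chunk*> &tx_buffers)
{
if (tx_buffers.empty())
return 0;
std::vector<ibv_sge> sges(tx_buffers.size());
std::vector<ibv_send_wr> wrs(tx_buffers.size());
for (size_t i = 0; i < tx_buffers.size(); i++) {
sges[i].addr = reinterpret_cast<uint64_t>(tx_buffers[i]->buffer); // assumed field
sges[i].length = tx_buffers[i]->get_offset(); // bytes queued in the chunk (assumed accessor)
sges[i].lkey = tx_buffers[i]->lkey;
wrs[i] = {};
wrs[i].wr_id = reinterpret_cast<uint64_t>(tx_buffers[i]);
wrs[i].sg_list = &sges[i];
wrs[i].num_sge = 1;
wrs[i].opcode = IBV_WR_SEND;
wrs[i].next = (i + 1 < wrs.size()) ? &wrs[i + 1] : nullptr;
}
ibv_send_wr *bad_wr = nullptr;
return ibv_post_send(qp, &wrs[0], &bad_wr); // 0 on success, errno value on failure
}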