#include "common/errno.h"
#include "common/debug.h"
#include "RDMAStack.h"
+#include <sys/time.h>
+#include <sys/resource.h>
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
return send->get_buffers(c, bytes);
}
-Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
- : cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num)
+bool Infiniband::init_prereq = false;
+
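+// Verify process-wide RDMA prerequisites once: fork safety, the hugepage env
+// variable, and the memlock limit.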
+void Infiniband::verify_prereq(CephContext *cct) {
+
+  // ibv_fork_init() MUST be called before fork() when using RDMA
+  int rc = ibv_fork_init();
+  if (rc) {
+    lderr(cct) << __func__ << " failed to call ibv_fork_init(). It must be called before fork() when using RDMA. Application aborts." << dendl;
+    ceph_abort();
+  }
+
+ ldout(cct, 20) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl;
+  if (cct->_conf->ms_async_rdma_enable_hugepage) {
+    rc = setenv("RDMAV_HUGEPAGES_SAFE", "1", 1);
+    if (rc) {
+      lderr(cct) << __func__ << " failed to export RDMAV_HUGEPAGES_SAFE. It must be exported before huge pages can be used with RDMA. Application aborts." << dendl;
+      ceph_abort();
+    }
+    ldout(cct, 20) << __func__ << " RDMAV_HUGEPAGES_SAFE is set to: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl;
+  }
+
+  // check the memlock limit (ulimit -l)
+  struct rlimit limit;
+  getrlimit(RLIMIT_MEMLOCK, &limit);
+  if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
+    lderr(cct) << __func__ << " !!! WARNING !!! For RDMA to work properly the user memlock limit (ulimit -l) must be large enough to allow a large amount of registered memory."
+      " We recommend setting this parameter to infinity" << dendl;
+  }
+ init_prereq = true;
+}
+
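+// The device name and port come from the ms_async_rdma_device_name and
+// ms_async_rdma_port_num config options.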
+Infiniband::Infiniband(CephContext *cct)
+ : cct(cct), lock("IB lock"),
+ device_name(cct->_conf->ms_async_rdma_device_name),
+    port_num(cct->_conf->ms_async_rdma_port_num)
{
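+  // run the one-time prerequisite checks before the first Infiniband instance is used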
+ if (!init_prereq)
+ verify_prereq(cct);
+ ldout(cct, 20) << __func__ << " constructing Infiniband..." << dendl;
}
void Infiniband::init()
srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
post_chunks_to_srq(rx_queue_len); //add to srq
- dispatcher->polling_start();
}
Infiniband::~Infiniband()
if (!initialized)
return;
- if (dispatcher)
- dispatcher->polling_stop();
-
ibv_destroy_srq(srq);
delete memory_manager;
delete pd;
}
-void Infiniband::set_dispatcher(RDMADispatcher *d)
-{
- assert(!d ^ !dispatcher);
-
- dispatcher = d;
- if (dispatcher != nullptr)
- MemoryManager::RxAllocator::set_perf_logger(dispatcher->perf_logger);
-}
-
/**
* Create a shared receive queue. This basically wraps the verbs call.
*
#undef dout_prefix
#define dout_prefix *_dout << "RDMAStack "
-static Tub<Infiniband> global_infiniband;
-
RDMADispatcher::~RDMADispatcher()
{
- done = true;
- polling_stop();
ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
+ polling_stop();
assert(qp_conns.empty());
assert(num_qp_conn == 0);
assert(dead_queue_pairs.empty());
assert(num_dead_queue_pair == 0);
- tx_cc->ack_events();
- rx_cc->ack_events();
- delete tx_cq;
- delete rx_cq;
- delete tx_cc;
- delete rx_cc;
delete async_handler;
-
- global_infiniband->set_dispatcher(nullptr);
}
RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
void RDMADispatcher::polling_start()
{
- tx_cc = global_infiniband->create_comp_channel(cct);
+ // take lock because listen/connect can happen from different worker threads
+ Mutex::Locker l(lock);
+
+ if (t.joinable())
+ return; // dispatcher thread already running
+
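+  // create completion channels and completion queues for the TX and RX paths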
+ tx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
assert(tx_cc);
- rx_cc = global_infiniband->create_comp_channel(cct);
+ rx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
assert(rx_cc);
- tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
+ tx_cq = get_stack()->get_infiniband().create_comp_queue(cct, tx_cc);
assert(tx_cq);
- rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
+ rx_cq = get_stack()->get_infiniband().create_comp_queue(cct, rx_cc);
assert(rx_cq);
t = std::thread(&RDMADispatcher::polling, this);
void RDMADispatcher::polling_stop()
{
- if (t.joinable())
- t.join();
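+  // signal the polling loop to exit before reaping the thread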
+ done = true;
+ if (!t.joinable())
+ return;
+
+ t.join();
+
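+  // the polling thread has exited; ack outstanding events and destroy the CQs and channels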
+ tx_cc->ack_events();
+ rx_cc->ack_events();
+ delete tx_cq;
+ delete rx_cq;
+ delete tx_cc;
+ delete rx_cc;
}
void RDMADispatcher::handle_async_event()
ldout(cct, 30) << __func__ << dendl;
while (1) {
ibv_async_event async_event;
- if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
+ if (ibv_get_async_event(get_stack()->get_infiniband().get_device()->ctxt, &async_event)) {
if (errno != EAGAIN)
lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
<< " " << cpp_strerror(errno) << ")" << dendl;
erase_qpn_lockless(qpn);
}
} else {
- ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
+ ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << get_stack()->get_infiniband().get_device()->ctxt
<< " evt: " << ibv_event_type_str(async_event.event_type)
<< dendl;
}
void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) {
Mutex::Locker l(lock);
- global_infiniband->post_chunk_to_pool(chunk);
+ get_stack()->get_infiniband().post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
}
perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
- global_infiniband->post_chunks_to_srq(rx_ret);
+ get_stack()->get_infiniband().post_chunks_to_srq(rx_ret);
for (int i = 0; i < rx_ret; ++i) {
ibv_wc* response = &wc[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
conn = get_conn_lockless(response->qp_num);
if (!conn) {
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
- global_infiniband->post_chunk_to_pool(chunk);
+ get_stack()->get_infiniband().post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
} else {
polled[conn].push_back(*response);
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
- << global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
+ << get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
conn = get_conn_lockless(response->qp_num);
if (conn && conn->is_connected())
conn->fault();
- global_infiniband->post_chunk_to_pool(chunk);
+ get_stack()->get_infiniband().post_chunk_to_pool(chunk);
}
}
for (auto &&i : polled)
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
ldout(cct, 25) << __func__ << " QP: " << response->qp_num
<< " len: " << response->byte_len << " , addr:" << chunk
- << " " << global_infiniband->wc_status_to_string(response->status) << dendl;
+ << " " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
if (response->status != IBV_WC_SUCCESS) {
perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
} else {
ldout(cct, 1) << __func__ << " send work request returned error for buffer("
<< response->wr_id << ") status(" << response->status << "): "
- << global_infiniband->wc_status_to_string(response->status) << dendl;
+ << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
}
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
//TX completion may come either from regular send message or from 'fin' message.
//In the case of 'fin' wr_id points to the QueuePair.
- if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
+ if (get_stack()->get_infiniband().get_memory_manager()->is_tx_buffer(chunk->buffer)) {
tx_chunks.push_back(chunk);
} else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
return ;
inflight -= chunks.size();
- global_infiniband->get_memory_manager()->return_tx(chunks);
+ get_stack()->get_infiniband().get_memory_manager()->return_tx(chunks);
ldout(cct, 30) << __func__ << " release " << chunks.size()
<< " chunks, inflight " << inflight << dendl;
notify_pending_workers();
void RDMAWorker::initialize()
{
if (!dispatcher) {
- dispatcher = stack->get_dispatcher();
+ dispatcher = &stack->get_dispatcher();
}
}
int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
{
- global_infiniband->init();
+ get_stack()->get_infiniband().init();
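+  // the dispatcher polling thread is started lazily on the first listen or connect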
+ dispatcher->polling_start();
- auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
+ auto p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa);
int r = p->listen(sa, opt);
if (r < 0) {
delete p;
int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
- global_infiniband->init();
+ get_stack()->get_infiniband().init();
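+  // lazy start; polling_start() is a no-op if the thread is already running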
+ dispatcher->polling_start();
- RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
+ RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this);
int r = p->try_connect(addr, opts);
if (r < 0) {
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
assert(center.in_thread());
- int r = global_infiniband->get_tx_buffers(c, bytes);
+ int r = get_stack()->get_infiniband().get_tx_buffers(c, bytes);
assert(r >= 0);
- size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
+ size_t got = get_stack()->get_infiniband().get_memory_manager()->get_tx_buffer_size() * r;
ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
- stack->get_dispatcher()->inflight += r;
+ stack->get_dispatcher().inflight += r;
if (got >= bytes)
return r;
dispatcher->notify_pending_workers();
}
-RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
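+// Infiniband and the dispatcher are members of RDMAStack and share its lifetime.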
+RDMAStack::RDMAStack(CephContext *cct, const string &t)
+ : NetworkStack(cct, t), ib(cct), dispatcher(cct, this)
{
- //
- //On RDMA MUST be called before fork
- //
-
- int rc = ibv_fork_init();
- if (rc) {
- lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl;
- ceph_abort();
- }
-
- ldout(cct, 1) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl;
- if (cct->_conf->ms_async_rdma_enable_hugepage) {
- rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1);
- ldout(cct, 1) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl;
- if (rc) {
- lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl;
- ceph_abort();
- }
- }
-
- //Check ulimit
- struct rlimit limit;
- getrlimit(RLIMIT_MEMLOCK, &limit);
- if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
- lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory."
- " We recommend setting this parameter to infinity" << dendl;
- }
-
- if (!global_infiniband)
- global_infiniband.construct(
- cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
- dispatcher = new RDMADispatcher(cct, this);
- global_infiniband->set_dispatcher(dispatcher);
unsigned num = get_num_worker();
for (unsigned i = 0; i < num; ++i) {
RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
w->set_stack(this);
}
-
- ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
+  ldout(cct, 20) << __func__ << " creating RDMAStack:" << this << " with dispatcher:" << &dispatcher << dendl;
}
RDMAStack::~RDMAStack()
unsetenv("RDMAV_HUGEPAGES_SAFE"); //remove env variable on destruction
}
- delete dispatcher;
+ dispatcher.polling_stop();
}
void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)