#include "include/str_list.h"
#include "common/deleter.h"
+#include "common/Tub.h"
#include "RDMAStack.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "RDMAStack "
-static Infiniband* global_infiniband;
+static Tub<Infiniband> global_infiniband;
// Destructor. Frees the async_handler object that the constructor's
// initializer list allocates with `new C_handle_cq_async(this)`.
// Nothing else is released here; this body contains a single delete.
RDMADispatcher::~RDMADispatcher()
{
delete async_handler;
}
-RDMADispatcher::RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
- : cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
+RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
+ : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
w_lock("RDMADispatcher::for worker pending list"), stack(s)
{
- tx_cc = ib->create_comp_channel(c);
+ tx_cc = global_infiniband->create_comp_channel(c);
assert(tx_cc);
- rx_cc = ib->create_comp_channel(c);
+ rx_cc = global_infiniband->create_comp_channel(c);
assert(rx_cc);
- tx_cq = ib->create_comp_queue(c, tx_cc);
+ tx_cq = global_infiniband->create_comp_queue(c, tx_cc);
assert(tx_cq);
- rx_cq = ib->create_comp_queue(c, rx_cc);
+ rx_cq = global_infiniband->create_comp_queue(c, rx_cc);
assert(rx_cq);
PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
ldout(cct, 30) << __func__ << dendl;
while (1) {
ibv_async_event async_event;
- if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
+ if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
if (errno != EAGAIN)
lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
<< " " << cpp_strerror(errno) << ")" << dendl;
erase_qpn(qpn);
}
} else {
- ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << ib->get_device()->ctxt
+ ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
<< " evt: " << ibv_event_type_str(async_event.event_type)
<< dendl;
}
if (response->status == IBV_WC_SUCCESS) {
conn = get_conn_lockless(response->qp_num);
if (!conn) {
- assert(ib->is_rx_buffer(chunk->buffer));
- r = ib->post_chunk(chunk);
+ assert(global_infiniband->is_rx_buffer(chunk->buffer));
+ r = global_infiniband->post_chunk(chunk);
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
assert(r == 0);
} else {
perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
- << ib->wc_status_to_string(response->status) << ")" << dendl;
- assert(ib->is_rx_buffer(chunk->buffer));
- r = ib->post_chunk(chunk);
+ << global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
+ assert(global_infiniband->is_rx_buffer(chunk->buffer));
+ r = global_infiniband->post_chunk(chunk);
if (r) {
ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
assert(r == 0);
done = true;
t.join();
done = false;
+
+ tx_cc->ack_events();
+ rx_cc->ack_events();
+ delete tx_cq;
+ delete rx_cq;
+ delete tx_cc;
+ delete rx_cc;
+
+ global_infiniband.destroy();
}
// Brings the dispatcher's RDMA resources and polling thread (back) up.
// NOTE(review): judging by the name this is the post-fork hook -- confirm
// against the caller.
//
// Steps performed, in order:
//  1. If the file-level `global_infiniband` holder is empty, construct the
//     Infiniband object in place from `cct` and the config values
//     ms_async_rdma_device_name / ms_async_rdma_port_num.
//  2. Create the tx and rx channels via create_comp_channel(), then the tx
//     and rx queues via create_comp_queue(), asserting that each call
//     returned a non-null pointer.
//  3. Start thread `t` running RDMADispatcher::polling on this object.
//
// NOTE(review): the statements just before this function in the patch delete
// tx_cq/rx_cq/tx_cc/rx_cc and destroy global_infiniband after joining `t`;
// presumably that is the matching teardown path -- confirm.
// Lines beginning with '+' are additions introduced by this patch.
void RDMADispatcher::handle_post_fork()
{
+ if (!global_infiniband)
+ global_infiniband.construct(
+ cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
+
+ tx_cc = global_infiniband->create_comp_channel(cct);
+ assert(tx_cc);
+ rx_cc = global_infiniband->create_comp_channel(cct);
+ assert(rx_cc);
+ tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
+ assert(tx_cq);
+ rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
+ assert(rx_cq);
+
t = std::thread(&RDMADispatcher::polling, this);
}
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
ldout(cct, 25) << __func__ << " QP: " << response->qp_num
<< " len: " << response->byte_len << " , addr:" << chunk
- << " " << ib->wc_status_to_string(response->status) << dendl;
+ << " " << global_infiniband->wc_status_to_string(response->status) << dendl;
if (response->status != IBV_WC_SUCCESS) {
perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
} else {
ldout(cct, 1) << __func__ << " send work request returned error for buffer("
<< response->wr_id << ") status(" << response->status << "): "
- << ib->wc_status_to_string(response->status) << dendl;
+ << global_infiniband->wc_status_to_string(response->status) << dendl;
}
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
}
// FIXME: why not tx?
- if (ib->get_memory_manager()->is_tx_buffer(chunk->buffer))
+ if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer))
tx_chunks.push_back(chunk);
else
ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
return ;
inflight -= chunks.size();
- ib->get_memory_manager()->return_tx(chunks);
+ global_infiniband->get_memory_manager()->return_tx(chunks);
ldout(cct, 30) << __func__ << " release " << chunks.size()
<< " chunks, inflight " << inflight << dendl;
notify_pending_workers();
RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
- : Worker(c, i), stack(nullptr), infiniband(NULL),
- tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL), lock("RDMAWorker::lock")
+ : Worker(c, i), stack(nullptr),
+ tx_handler(new C_handle_cq_tx(this)), lock("RDMAWorker::lock")
{
// initialize perf_logger
char name[128];
{
if (!dispatcher) {
dispatcher = stack->get_dispatcher();
- memory_manager = infiniband->get_memory_manager();
}
}
int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
{
- auto p = new RDMAServerSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this, sa);
+ auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
int r = p->listen(sa, opt);
if (r < 0) {
delete p;
int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
- RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this);
+ RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
int r = p->try_connect(addr, opts);
if (r < 0) {
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
assert(center.in_thread());
- int r = infiniband->get_tx_buffers(c, bytes);
+ int r = global_infiniband->get_tx_buffers(c, bytes);
assert(r >= 0);
- size_t got = infiniband->get_memory_manager()->get_tx_chunk_size() * r;
+ size_t got = global_infiniband->get_memory_manager()->get_tx_chunk_size() * r;
ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
stack->get_dispatcher()->inflight += r;
if (got == bytes)
RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
{
if (!global_infiniband)
- global_infiniband = new Infiniband(
+ global_infiniband.construct(
cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
- dispatcher = new RDMADispatcher(cct, global_infiniband, this);
+ dispatcher = new RDMADispatcher(cct, this);
unsigned num = get_num_worker();
for (unsigned i = 0; i < num; ++i) {
RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
- w->set_ib(global_infiniband);
w->set_stack(this);
}