1. Don't use a bare pointer to manage the Infiniband object.
2. Access the Infiniband object directly instead of reaching it
through RDMAStack. This avoids caching the RDMAStack object in
RDMAWorker & RDMADispatcher.
Signed-off-by: Changcheng Liu <changcheng.liu@aliyun.com>
#undef dout_prefix
#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
-RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband> &ib, RDMADispatcher* s,
RDMAWorker *w)
- : cct(cct), connected(0), error(0), infiniband(ib),
+ : cct(cct), connected(0), error(0), ib(ib),
dispatcher(s), worker(w),
is_server(false), con_handler(new C_handle_connection(this)),
active(false), pending(false)
{
if (!cct->_conf->ms_async_rdma_cm) {
- qp = infiniband->create_queue_pair(cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC, NULL);
+ qp = ib->create_queue_pair(cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC, NULL);
my_msg.qpn = qp->get_local_qp_number();
my_msg.psn = qp->get_initial_psn();
- my_msg.lid = infiniband->get_lid();
+ my_msg.lid = ib->get_lid();
my_msg.peer_qpn = 0;
- my_msg.gid = infiniband->get_gid();
+ my_msg.gid = ib->get_gid();
notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
dispatcher->register_qp(qp, this);
dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
qpa.ah_attr.grh.hop_limit = 6;
qpa.ah_attr.grh.dgid = peer_msg.gid;
- qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx();
+ qpa.ah_attr.grh.sgid_index = ib->get_device()->get_gid_idx();
qpa.ah_attr.dlid = peer_msg.lid;
qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
qpa.ah_attr.src_path_bits = 0;
- qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());
+ qpa.ah_attr.port_num = (uint8_t)(ib->get_ib_physical_port());
ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
my_msg.peer_qpn = 0;
- r = infiniband->send_msg(cct, tcp_fd, my_msg);
+ r = ib->send_msg(cct, tcp_fd, my_msg);
if (r < 0)
return r;
void RDMAConnectedSocketImpl::handle_connection() {
ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
- int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
+ int r = ib->recv_msg(cct, tcp_fd, peer_msg);
if (r <= 0) {
if (r != -EAGAIN) {
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
ceph_assert(!r);
}
notify();
- r = infiniband->send_msg(cct, tcp_fd, my_msg);
+ r = ib->send_msg(cct, tcp_fd, my_msg);
if (r < 0) {
ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
}
r = activate();
ceph_assert(!r);
- r = infiniband->send_msg(cct, tcp_fd, my_msg);
+ r = ib->send_msg(cct, tcp_fd, my_msg);
if (r < 0) {
ldout(cct, 1) << __func__ << " server ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
auto copy_start = it;
size_t total_copied = 0, wait_copy_len = 0;
while (it != pending_bl.buffers().end()) {
- if (infiniband->is_tx_buffer(it->raw_c_str())) {
+ if (ib->is_tx_buffer(it->raw_c_str())) {
if (wait_copy_len) {
size_t copied = tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
total_copied += copied;
wait_copy_len = 0;
}
ceph_assert(copy_start == it);
- tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
+ tx_buffers.push_back(ib->get_tx_chunk_by_buffer(it->raw_c_str()));
total_copied += it->length();
++copy_start;
} else {
void RDMAConnectedSocketImpl::post_chunks_to_rq(int num)
{
- post_backlog += num - infiniband->post_chunks_to_rq(num, qp->get_qp());
+ post_backlog += num - ib->post_chunks_to_rq(num, qp->get_qp());
}
void RDMAConnectedSocketImpl::update_post_backlog()
#define TIMEOUT_MS 3000
#define RETRY_COUNT 7
-RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher* s,
RDMAWorker *w, RDMACMInfo *info)
: RDMAConnectedSocketImpl(cct, ib, s, w), cm_con_handler(new C_handle_cm_connection(this))
{
int RDMAIWARPConnectedSocketImpl::alloc_resource() {
ldout(cct, 30) << __func__ << dendl;
- qp = infiniband->create_queue_pair(cct, dispatcher->get_tx_cq(),
+ qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(),
dispatcher->get_rx_cq(), IBV_QPT_RC, cm_id);
if (!qp) {
return -1;
}
if (!cct->_conf->ms_async_rdma_support_srq)
- dispatcher->post_chunks_to_rq(infiniband->get_rx_queue_len(), qp->get_qp());
+ dispatcher->post_chunks_to_rq(ib->get_rx_queue_len(), qp->get_qp());
dispatcher->register_qp(qp, this);
dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
#define dout_prefix *_dout << " RDMAIWARPServerSocketImpl "
RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl(
- CephContext *cct, Infiniband* i,
+ CephContext *cct, shared_ptr<Infiniband>& ib,
RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a, unsigned addr_slot)
- : RDMAServerSocketImpl(cct, i, s, w, a, addr_slot)
+ : RDMAServerSocketImpl(cct, ib, s, w, a, addr_slot)
{
}
RDMACMInfo info(new_cm_id, event_channel, remote_conn_param->qp_num);
RDMAIWARPConnectedSocketImpl* server =
- new RDMAIWARPConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w), &info);
+ new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, dynamic_cast<RDMAWorker*>(w), &info);
memset(&local_conn_param, 0, sizeof(local_conn_param));
local_conn_param.qp_num = server->get_local_qpn();
#define dout_prefix *_dout << " RDMAServerSocketImpl "
RDMAServerSocketImpl::RDMAServerSocketImpl(
- CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w,
+ CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher *s, RDMAWorker *w,
entity_addr_t& a, unsigned slot)
: ServerSocketImpl(a.get_type(), slot),
- cct(cct), net(cct), server_setup_socket(-1), infiniband(i),
+ cct(cct), net(cct), server_setup_socket(-1), ib(ib),
dispatcher(s), worker(w), sa(a)
{
}
RDMAConnectedSocketImpl* server;
//Worker* w = dispatcher->get_stack()->get_worker();
- server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w));
+ server = new RDMAConnectedSocketImpl(cct, ib, dispatcher, dynamic_cast<RDMAWorker*>(w));
server->set_accept_fd(sd);
ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl;
std::unique_ptr<RDMAConnectedSocketImpl> csi(server);
delete async_handler;
}
-RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
- : cct(c), async_handler(new C_handle_cq_async(this)),
+RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s, shared_ptr<Infiniband>& ib)
+ : cct(c), ib(ib), async_handler(new C_handle_cq_async(this)),
stack(s)
{
PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
if (t.joinable())
return; // dispatcher thread already running
- get_stack()->get_infiniband().get_memory_manager()->set_rx_stat_logger(perf_logger);
+ ib->get_memory_manager()->set_rx_stat_logger(perf_logger);
- tx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
+ tx_cc = ib->create_comp_channel(cct);
ceph_assert(tx_cc);
- rx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
+ rx_cc = ib->create_comp_channel(cct);
ceph_assert(rx_cc);
- tx_cq = get_stack()->get_infiniband().create_comp_queue(cct, tx_cc);
+ tx_cq = ib->create_comp_queue(cct, tx_cc);
ceph_assert(tx_cq);
- rx_cq = get_stack()->get_infiniband().create_comp_queue(cct, rx_cc);
+ rx_cq = ib->create_comp_queue(cct, rx_cc);
ceph_assert(rx_cq);
t = std::thread(&RDMADispatcher::polling, this);
ldout(cct, 30) << __func__ << dendl;
while (1) {
ibv_async_event async_event;
- if (ibv_get_async_event(get_stack()->get_infiniband().get_device()->ctxt, &async_event)) {
+ if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
if (errno != EAGAIN)
lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
<< " " << cpp_strerror(errno) << ")" << dendl;
switch (async_event.event_type) {
/***********************CQ events********************/
case IBV_EVENT_CQ_ERR:
- lderr(cct) << __func__ << " CQ Overflow, dev = " << get_stack()->get_infiniband().get_device()->ctxt
+ lderr(cct) << __func__ << " CQ Overflow, dev = " << ib->get_device()->ctxt
<< " Need destroy and recreate resource " << dendl;
break;
/***********************QP events********************/
case IBV_EVENT_QP_FATAL:
/* Error occurred on a QP and it transitioned to error state */
lderr(cct) << __func__ << " Error occurred on a QP and it transitioned to error state, dev = "
- << get_stack()->get_infiniband().get_device()->ctxt << " Need destroy and recreate resource " << dendl;
+ << ib->get_device()->ctxt << " Need destroy and recreate resource " << dendl;
break;
case IBV_EVENT_QP_LAST_WQE_REACHED:
/* Last WQE Reached on a QP associated with and SRQ */
//CA events:
case IBV_EVENT_DEVICE_FATAL:
/* CA is in FATAL state */
- ldout(cct, 1) << __func__ << " ibv_get_async_event: dev = " << get_stack()->get_infiniband().get_device()->ctxt
+ ldout(cct, 1) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
<< " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
break;
default:
- lderr(cct) << __func__ << " ibv_get_async_event: dev = " << get_stack()->get_infiniband().get_device()->ctxt
+ lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
<< " unknown event: " << async_event.event_type << dendl;
}
ibv_ack_async_event(&async_event);
void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
{
std::lock_guard l{lock};
- get_stack()->get_infiniband().post_chunk_to_pool(chunk);
+ ib->post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
}
int RDMADispatcher::post_chunks_to_rq(int num, ibv_qp *qp)
{
std::lock_guard l{lock};
- return get_stack()->get_infiniband().post_chunks_to_rq(num, qp);
+ return ib->post_chunks_to_rq(num, qp);
}
void RDMADispatcher::polling()
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
ldout(cct, 25) << __func__ << " QP: " << response->qp_num
<< " len: " << response->byte_len << " , addr:" << chunk
- << " " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
+ << " " << ib->wc_status_to_string(response->status) << dendl;
QueuePair *qp = get_qp(response->qp_num);
if (qp)
} else {
ldout(cct, 1) << __func__ << " send work request returned error for buffer("
<< response->wr_id << ") status(" << response->status << "): "
- << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
+ << ib->wc_status_to_string(response->status) << dendl;
std::lock_guard l{lock};//make sure connected socket alive when pass wc
RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
//TX completion may come either from regular send message or from 'fin' message.
//In the case of 'fin' wr_id points to the QueuePair.
- if (get_stack()->get_infiniband().get_memory_manager()->is_tx_buffer(chunk->buffer)) {
+ if (ib->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
tx_chunks.push_back(chunk);
} else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
return ;
inflight -= chunks.size();
- get_stack()->get_infiniband().get_memory_manager()->return_tx(chunks);
+ ib->get_memory_manager()->return_tx(chunks);
ldout(cct, 30) << __func__ << " release " << chunks.size()
<< " chunks, inflight " << inflight << dendl;
notify_pending_workers();
conn = get_conn_lockless(response->qp_num);
if (!conn) {
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back." << dendl;
- get_stack()->get_infiniband().post_chunk_to_pool(chunk);
+ ib->post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
} else {
conn->post_chunks_to_rq(1);
perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
- << get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
+ << ib->wc_status_to_string(response->status) << ")" << dendl;
if (response->status != IBV_WC_WR_FLUSH_ERR) {
conn = get_conn_lockless(response->qp_num);
if (conn && conn->is_connected())
conn->fault();
}
- get_stack()->get_infiniband().post_chunk_to_pool(chunk);
+ ib->post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
}
}
int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
const SocketOptions &opt,ServerSocket *sock)
{
- Infiniband &ib = stack->get_infiniband();
- ib.init();
+ ib->init();
dispatcher->polling_start();
RDMAServerSocketImpl *p;
if (cct->_conf->ms_async_rdma_type == "iwarp") {
- p = new RDMAIWARPServerSocketImpl(cct, &ib, dispatcher, this, sa, addr_slot);
+ p = new RDMAIWARPServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
} else {
- p = new RDMAServerSocketImpl(cct, &ib, dispatcher, this, sa, addr_slot);
+ p = new RDMAServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
}
int r = p->listen(sa, opt);
if (r < 0) {
int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
- Infiniband &ib = stack->get_infiniband();
- ib.init();
+ ib->init();
dispatcher->polling_start();
RDMAConnectedSocketImpl* p;
if (cct->_conf->ms_async_rdma_type == "iwarp") {
- p = new RDMAIWARPConnectedSocketImpl(cct, &ib, dispatcher, this);
+ p = new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, this);
} else {
- p = new RDMAConnectedSocketImpl(cct, &ib, dispatcher, this);
+ p = new RDMAConnectedSocketImpl(cct, ib, dispatcher, this);
}
int r = p->try_connect(addr, opts);
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
ceph_assert(center.in_thread());
- int r = stack->get_infiniband().get_tx_buffers(c, bytes);
- size_t got = stack->get_infiniband().get_memory_manager()->get_tx_buffer_size() * r;
+ int r = ib->get_tx_buffers(c, bytes);
+ size_t got = ib->get_memory_manager()->get_tx_buffer_size() * r;
ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
dispatcher->inflight += r;
if (got >= bytes)
}
RDMAStack::RDMAStack(CephContext *cct, const string &t)
- : NetworkStack(cct, t), ib(cct), dispatcher(cct, this)
+ : NetworkStack(cct, t), ib(make_shared<Infiniband>(cct)), dispatcher(cct, this, ib)
{
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
for (unsigned i = 0; i < num; ++i) {
RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
w->set_stack(this);
+ w->set_ib(ib);
}
ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << &dispatcher << dendl;
}
std::thread t;
CephContext *cct;
+ shared_ptr<Infiniband> ib;
Infiniband::CompletionQueue* tx_cq = nullptr;
Infiniband::CompletionQueue* rx_cq = nullptr;
Infiniband::CompletionChannel *tx_cc = nullptr, *rx_cc = nullptr;
public:
PerfCounters *perf_logger;
- explicit RDMADispatcher(CephContext* c, RDMAStack* s);
+ explicit RDMADispatcher(CephContext* c, RDMAStack* s, shared_ptr<Infiniband>& ib);
virtual ~RDMADispatcher();
void handle_async_event();
typedef Infiniband::MemoryManager MemoryManager;
typedef std::vector<Chunk*>::iterator ChunkIter;
RDMAStack *stack;
+ shared_ptr<Infiniband> ib;
EventCallbackRef tx_handler;
std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
RDMADispatcher* dispatcher = nullptr;
}
void handle_pending_message();
void set_stack(RDMAStack *s) { stack = s; }
+ void set_ib(shared_ptr<Infiniband> &ib) {this->ib = ib;}
void notify_worker() {
center.dispatch_event_external(tx_handler);
}
IBSYNMsg my_msg;
int connected;
int error;
- Infiniband* infiniband;
+ shared_ptr<Infiniband> ib;
RDMADispatcher* dispatcher;
RDMAWorker* worker;
std::vector<Chunk*> buffers;
const decltype(std::cbegin(pending_bl.buffers()))& end);
public:
- RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+  RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher* s,
RDMAWorker *w);
virtual ~RDMAConnectedSocketImpl();
class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl {
public:
- RDMAIWARPConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+ RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher* s,
RDMAWorker *w, RDMACMInfo *info = nullptr);
~RDMAIWARPConnectedSocketImpl();
virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
CephContext *cct;
NetHandler net;
int server_setup_socket;
- Infiniband* infiniband;
+ shared_ptr<Infiniband> ib;
RDMADispatcher *dispatcher;
RDMAWorker *worker;
entity_addr_t sa;
public:
- RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s,
+ RDMAServerSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher *s,
RDMAWorker *w, entity_addr_t& a, unsigned slot);
virtual int listen(entity_addr_t &sa, const SocketOptions &opt);
class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
public:
RDMAIWARPServerSocketImpl(
- CephContext *cct, Infiniband *i, RDMADispatcher *s, RDMAWorker *w,
+ CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher *s, RDMAWorker *w,
entity_addr_t& addr, unsigned addr_slot);
virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
class RDMAStack : public NetworkStack {
vector<std::thread> threads;
PerfCounters *perf_counter;
- Infiniband ib;
+ shared_ptr<Infiniband> ib;
RDMADispatcher dispatcher;
std::atomic<bool> fork_finished = {false};
virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
virtual void join_worker(unsigned i) override;
RDMADispatcher &get_dispatcher() { return dispatcher; }
- Infiniband &get_infiniband() { return ib; }
virtual bool is_ready() override { return fork_finished.load(); };
virtual void ready() override { fork_finished = true; };
};