From a39421365bcb375eab895bf01ea1de4e97c2f629 Mon Sep 17 00:00:00 2001
From: Haomai Wang
Date: Tue, 8 Aug 2017 17:16:19 +0800
Subject: [PATCH] msg/async/rdma: fix conflict between multiple CephContexts

Signed-off-by: Haomai Wang
---
 src/msg/async/rdma/RDMAStack.cc | 74 ++++++++++++++++++++-------------
 src/msg/async/rdma/RDMAStack.h  |  4 +-
 2 files changed, 47 insertions(+), 31 deletions(-)

diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc
index d8a8a63bad6..94b0fd31681 100644
--- a/src/msg/async/rdma/RDMAStack.cc
+++ b/src/msg/async/rdma/RDMAStack.cc
@@ -27,7 +27,20 @@
 #undef dout_prefix
 #define dout_prefix *_dout << "RDMAStack "
 
-static Tub<Infiniband> global_infiniband;
+struct InfinibandSingleton {
+  CephContext *cct;
+  std::shared_ptr<Infiniband> ib;
+
+  InfinibandSingleton(CephContext *c): cct(c), ib(nullptr) {}
+  void ready() {
+    if (!ib) {
+      ib.reset(new Infiniband(cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num));
+    }
+  }
+  ~InfinibandSingleton() {
+  }
+};
 
 RDMADispatcher::~RDMADispatcher()
 {
@@ -48,11 +61,11 @@ RDMADispatcher::~RDMADispatcher()
   delete rx_cc;
   delete async_handler;
 
-  global_infiniband->set_dispatcher(nullptr);
+  ib->set_dispatcher(nullptr);
 }
 
-RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
-  : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
+RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s, Infiniband *i)
+  : cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
     w_lock("RDMADispatcher::for worker pending list"), stack(s)
 {
   PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
@@ -86,13 +99,13 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
 
 void RDMADispatcher::polling_start()
 {
-  tx_cc = global_infiniband->create_comp_channel(cct);
+  tx_cc = ib->create_comp_channel(cct);
   assert(tx_cc);
-  rx_cc = global_infiniband->create_comp_channel(cct);
+  rx_cc = ib->create_comp_channel(cct);
   assert(rx_cc);
-  tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
+  tx_cq = ib->create_comp_queue(cct, tx_cc);
   assert(tx_cq);
-  rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
+  rx_cq = ib->create_comp_queue(cct, rx_cc);
   assert(rx_cq);
 
   t = std::thread(&RDMADispatcher::polling, this);
@@ -109,7 +122,7 @@ void RDMADispatcher::handle_async_event()
   ldout(cct, 30) << __func__ << dendl;
   while (1) {
     ibv_async_event async_event;
-    if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
+    if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
       if (errno != EAGAIN)
        lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
                  << " " << cpp_strerror(errno) << ")" << dendl;
@@ -133,7 +146,7 @@ void RDMADispatcher::handle_async_event()
           erase_qpn_lockless(qpn);
         }
       } else {
-        ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
+        ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << ib->get_device()->ctxt
                       << " evt: " << ibv_event_type_str(async_event.event_type)
                       << dendl;
       }
@@ -143,7 +156,7 @@ void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
 {
   Mutex::Locker l(lock);
-  global_infiniband->post_chunk_to_pool(chunk);
+  ib->post_chunk_to_pool(chunk);
   perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
 }
 
@@ -176,7 +189,7 @@ void RDMADispatcher::polling()
       perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);
       Mutex::Locker l(lock);//make sure connected socket alive when pass wc
 
-      global_infiniband->post_chunks_to_srq(rx_ret);
+      ib->post_chunks_to_srq(rx_ret);
       for (int i = 0; i < rx_ret; ++i) {
         ibv_wc* response = &wc[i];
         Chunk* chunk = reinterpret_cast<Chunk*>(response->wr_id);
@@ -188,7 +201,7 @@ void RDMADispatcher::polling()
         conn = get_conn_lockless(response->qp_num);
         if (!conn) {
           ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
-          global_infiniband->post_chunk_to_pool(chunk);
+          ib->post_chunk_to_pool(chunk);
           perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
         } else {
           polled[conn].push_back(*response);
@@ -199,12 +212,12 @@ void RDMADispatcher::polling()
         ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
                       << ") status(" << response->status << ":"
-                      << global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
+                      << ib->wc_status_to_string(response->status) << ")" << dendl;
         conn = get_conn_lockless(response->qp_num);
         if (conn && conn->is_connected())
           conn->fault();
 
-        global_infiniband->post_chunk_to_pool(chunk);
+        ib->post_chunk_to_pool(chunk);
       }
     }
     for (auto &&i : polled)
@@ -335,7 +348,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
     Chunk* chunk = reinterpret_cast<Chunk*>(response->wr_id);
     ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len
                    << " , addr:" << chunk
-                   << " " << global_infiniband->wc_status_to_string(response->status) << dendl;
+                   << " " << ib->wc_status_to_string(response->status) << dendl;
 
     if (response->status != IBV_WC_SUCCESS) {
       perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
@@ -350,7 +363,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
       } else {
         ldout(cct, 1) << __func__ << " send work request returned error for buffer("
                       << response->wr_id << ") status(" << response->status << "): "
-                      << global_infiniband->wc_status_to_string(response->status) << dendl;
+                      << ib->wc_status_to_string(response->status) << dendl;
 
         Mutex::Locker l(lock);//make sure connected socket alive when pass wc
@@ -366,7 +379,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
 
     //TX completion may come either from regular send message or from 'fin' message.
     //In the case of 'fin' wr_id points to the QueuePair.
-    if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
+    if (ib->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
       tx_chunks.push_back(chunk);
     } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
       ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
     } else {
@@ -394,7 +407,7 @@ void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
     return ;
 
   inflight -= chunks.size();
-  global_infiniband->get_memory_manager()->return_tx(chunks);
+  ib->get_memory_manager()->return_tx(chunks);
   ldout(cct, 30) << __func__ << " release " << chunks.size()
                  << " chunks, inflight " << inflight << dendl;
   notify_pending_workers();
 }
@@ -438,9 +451,9 @@ void RDMAWorker::initialize()
 
 int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
 {
-  global_infiniband->init();
+  dispatcher->get_ib()->init();
 
-  auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
+  auto p = new RDMAServerSocketImpl(cct, dispatcher->get_ib(), get_stack()->get_dispatcher(), this, sa);
   int r = p->listen(sa, opt);
   if (r < 0) {
     delete p;
@@ -453,9 +466,9 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket
 
 int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
 {
-  global_infiniband->init();
+  dispatcher->get_ib()->init();
 
-  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
+  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, dispatcher->get_ib(), get_stack()->get_dispatcher(), this);
   int r = p->try_connect(addr, opts);
 
   if (r < 0) {
@@ -471,9 +484,9 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
 {
   assert(center.in_thread());
-  int r = global_infiniband->get_tx_buffers(c, bytes);
+  int r = dispatcher->get_ib()->get_tx_buffers(c, bytes);
   assert(r >= 0);
-  size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
+  size_t got = dispatcher->get_ib()->get_memory_manager()->get_tx_buffer_size() * r;
   ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
   stack->get_dispatcher()->inflight += r;
   if (got >= bytes)
@@ -543,12 +556,13 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
                   " We recommend setting this parameter to infinity" << dendl;
   }
 
-  if (!global_infiniband)
-    global_infiniband.construct(
-      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
+  InfinibandSingleton *single;
+  cct->lookup_or_create_singleton_object(single, "Infiniband");
+  single->ready();
+  ldout(cct, 20) << __func__ << " constructing RDMAStack..."
+                 << dendl;
-  dispatcher = new RDMADispatcher(cct, this);
-  global_infiniband->set_dispatcher(dispatcher);
+  dispatcher = new RDMADispatcher(cct, this, single->ib.get());
+  single->ib->set_dispatcher(dispatcher);
 
   unsigned num = get_num_worker();
   for (unsigned i = 0; i < num; ++i) {
diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h
index bbb97af1f77..73874fe341d 100644
--- a/src/msg/async/rdma/RDMAStack.h
+++ b/src/msg/async/rdma/RDMAStack.h
@@ -40,6 +40,7 @@ class RDMADispatcher {
   std::thread t;
   CephContext *cct;
+  Infiniband *ib;
   Infiniband::CompletionQueue* tx_cq;
   Infiniband::CompletionQueue* rx_cq;
   Infiniband::CompletionChannel *tx_cc, *rx_cc;
@@ -89,7 +90,7 @@ class RDMADispatcher {
  public:
   PerfCounters *perf_logger;
 
-  explicit RDMADispatcher(CephContext* c, RDMAStack* s);
+  explicit RDMADispatcher(CephContext* c, RDMAStack* s, Infiniband *ib);
   virtual ~RDMADispatcher();
 
   void handle_async_event();
@@ -106,6 +107,7 @@ class RDMADispatcher {
     ++num_pending_workers;
   }
   RDMAStack* get_stack() { return stack; }
+  Infiniband *get_ib() { return ib; }
   RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
   void erase_qpn_lockless(uint32_t qpn);
   void erase_qpn(uint32_t qpn);
-- 
2.39.5
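For reviewers unfamiliar with the pattern this patch switches to: the old process-wide
`static Tub<Infiniband> global_infiniband` meant that two CephContexts in one process
shared (and could clobber) one Infiniband object, while `cct->lookup_or_create_singleton_object()`
keys the object to each CephContext. The sketch below is a standalone illustration only,
not Ceph code; FakeContext, FakeDevice and lookup_or_create are made-up names that stand
in for CephContext, Infiniband and the singleton registry.

    // Hypothetical, self-contained sketch of a per-context singleton registry.
    // It is NOT the CephContext API; names are invented for illustration.
    #include <cassert>
    #include <iostream>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <string>

    // Stand-in for the RDMA device state that used to be a process-wide global.
    struct FakeDevice {
      std::string name;
      explicit FakeDevice(const std::string &n) : name(n) {}
    };

    // Stand-in for CephContext: each context owns its own named singletons,
    // so two contexts in one process no longer fight over one device object.
    class FakeContext {
     public:
      explicit FakeContext(const std::string &dev) : device_name(dev) {}

      template <typename T, typename... Args>
      std::shared_ptr<T> lookup_or_create(const std::string &name, Args&&... args) {
        std::lock_guard<std::mutex> l(lock);
        auto it = singletons.find(name);
        if (it == singletons.end()) {
          auto obj = std::make_shared<T>(std::forward<Args>(args)...);
          singletons[name] = obj;
          return obj;
        }
        return std::static_pointer_cast<T>(it->second);
      }

      std::string device_name;

     private:
      std::mutex lock;
      std::map<std::string, std::shared_ptr<void>> singletons;
    };

    int main() {
      FakeContext ctx_a("mlx5_0");
      FakeContext ctx_b("mlx5_1");

      // Each context lazily builds its own device object, keyed by name ...
      auto ib_a  = ctx_a.lookup_or_create<FakeDevice>("Infiniband", ctx_a.device_name);
      auto ib_a2 = ctx_a.lookup_or_create<FakeDevice>("Infiniband", ctx_a.device_name);
      auto ib_b  = ctx_b.lookup_or_create<FakeDevice>("Infiniband", ctx_b.device_name);

      // ... so repeated lookups in one context return the same object,
      // while distinct contexts keep distinct state.
      assert(ib_a == ib_a2);
      assert(ib_a != ib_b);
      std::cout << ib_a->name << " vs " << ib_b->name << "\n";
      return 0;
    }

The asserts capture the behavior the patch relies on: lookups within one CephContext are
stable, and contexts configured for different devices or ports no longer conflict.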