From f8a7e705c5e948f17d3a02f5bf5beabed9d106e4 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Fri, 11 Aug 2017 09:57:41 +0800 Subject: [PATCH] Revert "msg/async/rdma: fix multi cephcontext confllicting" --- src/msg/async/rdma/RDMAStack.cc | 74 +++++++++++++-------------------- src/msg/async/rdma/RDMAStack.h | 4 +- 2 files changed, 31 insertions(+), 47 deletions(-) diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index 94b0fd3168117..d8a8a63bad625 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -27,20 +27,7 @@ #undef dout_prefix #define dout_prefix *_dout << "RDMAStack " -struct InfinibandSingleton { - CephContext *cct; - std::shared_ptr ib; - - InfinibandSingleton(CephContext *c): cct(c), ib(nullptr) {} - void ready() { - if (!ib) { - ib.reset(new Infiniband(cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num)); - } - } - ~InfinibandSingleton() { - } -}; - +static Tub global_infiniband; RDMADispatcher::~RDMADispatcher() { @@ -61,11 +48,11 @@ RDMADispatcher::~RDMADispatcher() delete rx_cc; delete async_handler; - ib->set_dispatcher(nullptr); + global_infiniband->set_dispatcher(nullptr); } -RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s, Infiniband *i) - : cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"), +RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s) + : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"), w_lock("RDMADispatcher::for worker pending list"), stack(s) { PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last); @@ -99,13 +86,13 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s, Infiniband *i) void RDMADispatcher::polling_start() { - tx_cc = ib->create_comp_channel(cct); + tx_cc = global_infiniband->create_comp_channel(cct); assert(tx_cc); - rx_cc = ib->create_comp_channel(cct); + rx_cc = global_infiniband->create_comp_channel(cct); assert(rx_cc); - tx_cq = ib->create_comp_queue(cct, tx_cc); + tx_cq = global_infiniband->create_comp_queue(cct, tx_cc); assert(tx_cq); - rx_cq = ib->create_comp_queue(cct, rx_cc); + rx_cq = global_infiniband->create_comp_queue(cct, rx_cc); assert(rx_cq); t = std::thread(&RDMADispatcher::polling, this); @@ -122,7 +109,7 @@ void RDMADispatcher::handle_async_event() ldout(cct, 30) << __func__ << dendl; while (1) { ibv_async_event async_event; - if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) { + if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) { if (errno != EAGAIN) lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno << " " << cpp_strerror(errno) << ")" << dendl; @@ -146,7 +133,7 @@ void RDMADispatcher::handle_async_event() erase_qpn_lockless(qpn); } } else { - ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << ib->get_device()->ctxt + ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt << " evt: " << ibv_event_type_str(async_event.event_type) << dendl; } @@ -156,7 +143,7 @@ void RDMADispatcher::handle_async_event() void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) { Mutex::Locker l(lock); - ib->post_chunk_to_pool(chunk); + global_infiniband->post_chunk_to_pool(chunk); perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); } @@ -189,7 +176,7 @@ void RDMADispatcher::polling() perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret); Mutex::Locker l(lock);//make sure connected socket alive when pass wc - ib->post_chunks_to_srq(rx_ret); + global_infiniband->post_chunks_to_srq(rx_ret); for (int i = 0; i < rx_ret; ++i) { ibv_wc* response = &wc[i]; Chunk* chunk = reinterpret_cast(response->wr_id); @@ -201,7 +188,7 @@ void RDMADispatcher::polling() conn = get_conn_lockless(response->qp_num); if (!conn) { ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl; - ib->post_chunk_to_pool(chunk); + global_infiniband->post_chunk_to_pool(chunk); perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); } else { polled[conn].push_back(*response); @@ -212,12 +199,12 @@ void RDMADispatcher::polling() ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk << ") status(" << response->status << ":" - << ib->wc_status_to_string(response->status) << ")" << dendl; + << global_infiniband->wc_status_to_string(response->status) << ")" << dendl; conn = get_conn_lockless(response->qp_num); if (conn && conn->is_connected()) conn->fault(); - ib->post_chunk_to_pool(chunk); + global_infiniband->post_chunk_to_pool(chunk); } } for (auto &&i : polled) @@ -348,7 +335,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) Chunk* chunk = reinterpret_cast(response->wr_id); ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk - << " " << ib->wc_status_to_string(response->status) << dendl; + << " " << global_infiniband->wc_status_to_string(response->status) << dendl; if (response->status != IBV_WC_SUCCESS) { perf_logger->inc(l_msgr_rdma_tx_total_wc_errors); @@ -363,7 +350,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) } else { ldout(cct, 1) << __func__ << " send work request returned error for buffer(" << response->wr_id << ") status(" << response->status << "): " - << ib->wc_status_to_string(response->status) << dendl; + << global_infiniband->wc_status_to_string(response->status) << dendl; } Mutex::Locker l(lock);//make sure connected socket alive when pass wc @@ -379,7 +366,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) //TX completion may come either from regular send message or from 'fin' message. //In the case of 'fin' wr_id points to the QueuePair. - if (ib->get_memory_manager()->is_tx_buffer(chunk->buffer)) { + if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) { tx_chunks.push_back(chunk); } else if (reinterpret_cast(response->wr_id)->get_local_qp_number() == response->qp_num ) { ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl; @@ -407,7 +394,7 @@ void RDMADispatcher::post_tx_buffer(std::vector &chunks) return ; inflight -= chunks.size(); - ib->get_memory_manager()->return_tx(chunks); + global_infiniband->get_memory_manager()->return_tx(chunks); ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << inflight << dendl; notify_pending_workers(); @@ -451,9 +438,9 @@ void RDMAWorker::initialize() int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock) { - dispatcher->get_ib()->init(); + global_infiniband->init(); - auto p = new RDMAServerSocketImpl(cct, dispatcher->get_ib(), get_stack()->get_dispatcher(), this, sa); + auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa); int r = p->listen(sa, opt); if (r < 0) { delete p; @@ -466,9 +453,9 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) { - dispatcher->get_ib()->init(); + global_infiniband->init(); - RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, dispatcher->get_ib(), get_stack()->get_dispatcher(), this); + RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this); int r = p->try_connect(addr, opts); if (r < 0) { @@ -484,9 +471,9 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes) { assert(center.in_thread()); - int r = dispatcher->get_ib()->get_tx_buffers(c, bytes); + int r = global_infiniband->get_tx_buffers(c, bytes); assert(r >= 0); - size_t got = dispatcher->get_ib()->get_memory_manager()->get_tx_buffer_size() * r; + size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r; ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl; stack->get_dispatcher()->inflight += r; if (got >= bytes) @@ -556,13 +543,12 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t) " We recommend setting this parameter to infinity" << dendl; } - InfinibandSingleton *single; - cct->lookup_or_create_singleton_object(single, "Infiniband"); - single->ready(); - + if (!global_infiniband) + global_infiniband.construct( + cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num); ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl; - dispatcher = new RDMADispatcher(cct, this, single->ib.get()); - single->ib->set_dispatcher(dispatcher); + dispatcher = new RDMADispatcher(cct, this); + global_infiniband->set_dispatcher(dispatcher); unsigned num = get_num_worker(); for (unsigned i = 0; i < num; ++i) { diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index 73874fe341d0f..bbb97af1f77d1 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -40,7 +40,6 @@ class RDMADispatcher { std::thread t; CephContext *cct; - Infiniband *ib; Infiniband::CompletionQueue* tx_cq; Infiniband::CompletionQueue* rx_cq; Infiniband::CompletionChannel *tx_cc, *rx_cc; @@ -90,7 +89,7 @@ class RDMADispatcher { public: PerfCounters *perf_logger; - explicit RDMADispatcher(CephContext* c, RDMAStack* s, Infiniband *ib); + explicit RDMADispatcher(CephContext* c, RDMAStack* s); virtual ~RDMADispatcher(); void handle_async_event(); @@ -107,7 +106,6 @@ class RDMADispatcher { ++num_pending_workers; } RDMAStack* get_stack() { return stack; } - Infiniband *get_ib() { return ib; } RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp); void erase_qpn_lockless(uint32_t qpn); void erase_qpn(uint32_t qpn); -- 2.39.5