From: Alex Mikheev
Date: Mon, 12 Jun 2017 08:32:38 +0000 (+0000)
Subject: msg/async/rdma: fixes crash in fio
X-Git-Tag: v13.0.0~137^2~1
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=6b773887a331eff26faadeba5de7359aaa09efc5;p=ceph-ci.git

msg/async/rdma: fixes crash in fio

fio creates multiple CephContexts in a single process. Crashes happen
because the RDMA stack has global resources that are still in use by one
CephContext after they have already been destroyed by another.

This commit removes the global instances of the RDMA dispatcher and
Infiniband and makes them context (RDMA stack) specific.

Signed-off-by: Adir Lev
Signed-off-by: Alex Mikheev
---
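The failure mode, and the ownership change that fixes it, in a minimal
standalone sketch (illustrative C++ only, not Ceph code; Resource,
StackBefore and StackAfter are made-up names):

    #include <cassert>
    #include <memory>

    struct Resource {                 // stands in for Infiniband/RDMADispatcher
      bool alive = true;
      ~Resource() { alive = false; }
      void use() { assert(alive && "resource used after its owner died"); }
    };

    // Before: one process-wide instance; whichever stack is destroyed first
    // tears it down while other stacks may still be using it.
    static std::unique_ptr<Resource> global_resource;

    struct StackBefore {
      StackBefore() { if (!global_resource) global_resource.reset(new Resource); }
      ~StackBefore() { global_resource.reset(); }  // pulls it out from under everyone
      void work() { global_resource->use(); }
    };

    // After: every stack owns its resource as a plain member, so the resource
    // lives exactly as long as the stack that uses it.
    struct StackAfter {
      Resource resource;
      void work() { resource.use(); }
    };

    int main() {
      StackAfter a, b;   // two stacks in one process, as fio creates
      a.work();
      b.work();          // destroying either stack cannot affect the other
    }

With per-stack ownership the resource lifetime matches the stack lifetime,
which is what the patch does below by turning the global Infiniband and
RDMADispatcher into RDMAStack members.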
diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc
index cf9031e068d..5173feb9434 100644
--- a/src/msg/async/rdma/Infiniband.cc
+++ b/src/msg/async/rdma/Infiniband.cc
@@ -18,6 +18,8 @@
 #include "common/errno.h"
 #include "common/debug.h"
 #include "RDMAStack.h"
+#include <sys/time.h>
+#include <sys/resource.h>
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
@@ -776,9 +778,45 @@ int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes)
   return send->get_buffers(c, bytes);
 }
 
-Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
-  : cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num)
+bool Infiniband::init_prereq = false;
+
+void Infiniband::verify_prereq(CephContext *cct) {
+
+  //On RDMA MUST be called before fork
+  int rc = ibv_fork_init();
+  if (rc) {
+    lderr(cct) << __func__ << " failed to call ibv_fork_init(). With RDMA it must be called before fork. Application aborts." << dendl;
+    ceph_abort();
+  }
+
+  ldout(cct, 20) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl;
+  if (cct->_conf->ms_async_rdma_enable_hugepage) {
+    rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1);
+    ldout(cct, 20) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl;
+    if (rc) {
+      lderr(cct) << __func__ << " failed to export RDMAV_HUGEPAGES_SAFE. With RDMA it must be exported before using huge pages. Application aborts." << dendl;
+      ceph_abort();
+    }
+  }
+
+  //Check ulimit
+  struct rlimit limit;
+  getrlimit(RLIMIT_MEMLOCK, &limit);
+  if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
+    lderr(cct) << __func__ << " !!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow a large amount of registered memory."
+      " We recommend setting this parameter to infinity" << dendl;
+  }
+  init_prereq = true;
+}
+
+Infiniband::Infiniband(CephContext *cct)
+  : cct(cct), lock("IB lock"),
+    device_name(cct->_conf->ms_async_rdma_device_name),
+    port_num(cct->_conf->ms_async_rdma_port_num)
 {
+  if (!init_prereq)
+    verify_prereq(cct);
+  ldout(cct, 20) << __func__ << " constructing Infiniband..." << dendl;
 }
 
 void Infiniband::init()
@@ -836,7 +874,6 @@ void Infiniband::init()
 
   srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
   post_chunks_to_srq(rx_queue_len); //add to srq
-  dispatcher->polling_start();
 }
 
 Infiniband::~Infiniband()
@@ -844,23 +881,11 @@ Infiniband::~Infiniband()
   if (!initialized)
     return;
 
-  if (dispatcher)
-    dispatcher->polling_stop();
-
   ibv_destroy_srq(srq);
   delete memory_manager;
   delete pd;
 }
 
-void Infiniband::set_dispatcher(RDMADispatcher *d)
-{
-  assert(!d ^ !dispatcher);
-
-  dispatcher = d;
-  if (dispatcher != nullptr)
-    MemoryManager::RxAllocator::set_perf_logger(dispatcher->perf_logger);
-}
-
 /**
  * Create a shared receive queue. This basically wraps the verbs call.
  *
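For reference, the checks that the new Infiniband::verify_prereq() gathers in
one place can be sketched standalone with plain POSIX calls. The real function
also calls ibv_fork_init() from libibverbs, which must run before any fork();
the sketch omits it so it builds without an RDMA stack:

    #include <cstdio>
    #include <cstdlib>
    #include <sys/resource.h>

    int main() {
      // Huge pages: libibverbs only treats registered huge-page memory as
      // safe when RDMAV_HUGEPAGES_SAFE is exported before any registration.
      if (setenv("RDMAV_HUGEPAGES_SAFE", "1", 1) != 0) {
        perror("setenv");
        return 1;
      }

      // Locked memory: RDMA registration pins pages, so an unlimited (or
      // very large) RLIMIT_MEMLOCK is effectively required.
      struct rlimit limit;
      if (getrlimit(RLIMIT_MEMLOCK, &limit) != 0) {
        perror("getrlimit");
        return 1;
      }
      if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
        fprintf(stderr, "warning: memlock limit is finite; RDMA memory "
                        "registration may fail (try `ulimit -l unlimited`)\n");
      }
      return 0;
    }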
diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h
index ee8fb275858..67b3b9b457a 100644
--- a/src/msg/async/rdma/Infiniband.h
+++ b/src/msg/async/rdma/Infiniband.h
@@ -321,7 +321,6 @@ class Infiniband {
   Device *device = NULL;
   ProtectionDomain *pd = NULL;
   DeviceList *device_list = nullptr;
-  RDMADispatcher *dispatcher = nullptr;
   void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
   void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
   CephContext *cct;
@@ -329,13 +328,13 @@ class Infiniband {
   bool initialized = false;
   const std::string &device_name;
   uint8_t port_num;
+  static bool init_prereq;
 
 public:
-  explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
+  explicit Infiniband(CephContext *c);
   ~Infiniband();
 
   void init();
-
-  void set_dispatcher(RDMADispatcher *d);
+  static void verify_prereq(CephContext *cct);
 
   class CompletionChannel {
     static const uint32_t MAX_ACK_EVENT = 5000;
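The polling_start()/polling_stop() rework below makes dispatcher-thread
startup lazy and idempotent: listen() and connect() both call
polling_start(), possibly from different worker threads, so the thread must
be created at most once, and polling_stop() has to be safe even when no
connection was ever made. A minimal sketch of the pattern (illustrative
names; std::mutex and std::thread stand in for Ceph's Mutex and the real
completion-queue cleanup):

    #include <atomic>
    #include <mutex>
    #include <thread>

    class Poller {
      std::mutex lock;
      std::thread t;
      std::atomic<bool> done{false};

      void poll() {
        while (!done) { /* drain completion queues here */ }
      }

    public:
      void start() {
        std::lock_guard<std::mutex> l(lock);  // listen/connect race to call this
        if (t.joinable())
          return;                             // polling thread already running
        t = std::thread(&Poller::poll, this);
      }

      void stop() {
        done = true;
        if (!t.joinable())
          return;                             // start() was never reached
        t.join();
        // resources tied to the thread (CQs/channels in the real code) are
        // released here, after the thread can no longer touch them
      }

      ~Poller() { stop(); }
    };

    int main() {
      Poller p;
      p.start();
      p.start();  // second call is a no-op, as with the guarded polling_start()
      p.stop();
    }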
" << r << dendl; - global_infiniband->post_chunk_to_pool(chunk); + get_stack()->get_infiniband().post_chunk_to_pool(chunk); perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); } else { polled[conn].push_back(*response); @@ -199,12 +204,12 @@ void RDMADispatcher::polling() ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk << ") status(" << response->status << ":" - << global_infiniband->wc_status_to_string(response->status) << ")" << dendl; + << get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl; conn = get_conn_lockless(response->qp_num); if (conn && conn->is_connected()) conn->fault(); - global_infiniband->post_chunk_to_pool(chunk); + get_stack()->get_infiniband().post_chunk_to_pool(chunk); } } for (auto &&i : polled) @@ -335,7 +340,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) Chunk* chunk = reinterpret_cast(response->wr_id); ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk - << " " << global_infiniband->wc_status_to_string(response->status) << dendl; + << " " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl; if (response->status != IBV_WC_SUCCESS) { perf_logger->inc(l_msgr_rdma_tx_total_wc_errors); @@ -350,7 +355,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) } else { ldout(cct, 1) << __func__ << " send work request returned error for buffer(" << response->wr_id << ") status(" << response->status << "): " - << global_infiniband->wc_status_to_string(response->status) << dendl; + << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl; } Mutex::Locker l(lock);//make sure connected socket alive when pass wc @@ -366,7 +371,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) //TX completion may come either from regular send message or from 'fin' message. //In the case of 'fin' wr_id points to the QueuePair. 
@@ -335,7 +340,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
     Chunk* chunk = reinterpret_cast<Chunk*>(response->wr_id);
     ldout(cct, 25) << __func__ << " QP: " << response->qp_num
                    << " len: " << response->byte_len << " , addr:" << chunk
-                   << " " << global_infiniband->wc_status_to_string(response->status) << dendl;
+                   << " " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
 
     if (response->status != IBV_WC_SUCCESS) {
       perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
@@ -350,7 +355,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
       } else {
         ldout(cct, 1) << __func__ << " send work request returned error for buffer("
                       << response->wr_id << ") status(" << response->status << "): "
-                      << global_infiniband->wc_status_to_string(response->status) << dendl;
+                      << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
       }
 
       Mutex::Locker l(lock);//make sure connected socket alive when pass wc
@@ -366,7 +371,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
 
     //TX completion may come either from regular send message or from 'fin' message.
     //In the case of 'fin' wr_id points to the QueuePair.
-    if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
+    if (get_stack()->get_infiniband().get_memory_manager()->is_tx_buffer(chunk->buffer)) {
       tx_chunks.push_back(chunk);
     } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
       ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
@@ -394,7 +399,7 @@ void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
     return ;
 
   inflight -= chunks.size();
-  global_infiniband->get_memory_manager()->return_tx(chunks);
+  get_stack()->get_infiniband().get_memory_manager()->return_tx(chunks);
   ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << inflight << dendl;
   notify_pending_workers();
@@ -432,15 +437,16 @@ RDMAWorker::~RDMAWorker()
 
 void RDMAWorker::initialize()
 {
   if (!dispatcher) {
-    dispatcher = stack->get_dispatcher();
+    dispatcher = &stack->get_dispatcher();
   }
 }
 
 int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
 {
-  global_infiniband->init();
+  get_stack()->get_infiniband().init();
+  dispatcher->polling_start();
 
-  auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
+  auto p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa);
   int r = p->listen(sa, opt);
   if (r < 0) {
     delete p;
@@ -453,9 +459,10 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket
 
 int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
 {
-  global_infiniband->init();
+  get_stack()->get_infiniband().init();
+  dispatcher->polling_start();
 
-  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
+  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this);
   int r = p->try_connect(addr, opts);
 
   if (r < 0) {
@@ -471,11 +478,11 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
 {
   assert(center.in_thread());
-  int r = global_infiniband->get_tx_buffers(c, bytes);
+  int r = get_stack()->get_infiniband().get_tx_buffers(c, bytes);
   assert(r >= 0);
-  size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
+  size_t got = get_stack()->get_infiniband().get_memory_manager()->get_tx_buffer_size() * r;
   ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
-  stack->get_dispatcher()->inflight += r;
+  stack->get_dispatcher().inflight += r;
   if (got >= bytes)
     return r;
@@ -513,50 +520,17 @@ void RDMAWorker::handle_pending_message()
   dispatcher->notify_pending_workers();
 }
 
-RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
+RDMAStack::RDMAStack(CephContext *cct, const string &t)
+  : NetworkStack(cct, t), ib(cct), dispatcher(cct, this)
 {
-  //
-  //On RDMA MUST be called before fork
-  //
-
-  int rc = ibv_fork_init();
-  if (rc) {
-    lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl;
-    ceph_abort();
-  }
-
-  ldout(cct, 1) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl;
-  if (cct->_conf->ms_async_rdma_enable_hugepage) {
-    rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1);
-    ldout(cct, 1) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl;
-    if (rc) {
-      lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl;
-      ceph_abort();
-    }
-  }
-
-  //Check ulimit
-  struct rlimit limit;
-  getrlimit(RLIMIT_MEMLOCK, &limit);
-  if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
-    lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory."
-      " We recommend setting this parameter to infinity" << dendl;
-  }
-
-  if (!global_infiniband)
-    global_infiniband.construct(
-      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
   ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
-  dispatcher = new RDMADispatcher(cct, this);
-  global_infiniband->set_dispatcher(dispatcher);
 
   unsigned num = get_num_worker();
   for (unsigned i = 0; i < num; ++i) {
     RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
     w->set_stack(this);
   }
-
-  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
+  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << &dispatcher << dendl;
 }
 
 RDMAStack::~RDMAStack()
 {
@@ -565,7 +539,7 @@ RDMAStack::~RDMAStack()
   if (cct->_conf->ms_async_rdma_enable_hugepage) {
     unsetenv("RDMAV_HUGEPAGES_SAFE");  //remove env variable on destruction
   }
 
-  delete dispatcher;
+  dispatcher.polling_stop();
 }
 
 void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h
index bbb97af1f77..764ea33f39e 100644
--- a/src/msg/async/rdma/RDMAStack.h
+++ b/src/msg/async/rdma/RDMAStack.h
@@ -256,7 +256,9 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
 
 class RDMAStack : public NetworkStack {
   vector<std::thread> threads;
-  RDMADispatcher *dispatcher;
+  PerfCounters *perf_counter;
+  Infiniband ib;
+  RDMADispatcher dispatcher;
 
   std::atomic<bool> fork_finished = {false};
 
@@ -268,10 +270,11 @@ class RDMAStack : public NetworkStack {
   virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
   virtual void join_worker(unsigned i) override;
 
-  RDMADispatcher *get_dispatcher() { return dispatcher; }
-
+  RDMADispatcher &get_dispatcher() { return dispatcher; }
+  Infiniband &get_infiniband() { return ib; }
   virtual bool is_ready() override { return fork_finished.load(); };
   virtual void ready() override { fork_finished = true; };
 };
+
 #endif
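A final note on the new RDMAStack member layout: ib is declared before
dispatcher, so it is constructed first and destroyed last, and the dispatcher
can reach a live Infiniband for its entire lifetime. A compilable
illustration of that C++ guarantee (Device, Dispatcher and Stack are made-up
stand-ins):

    #include <iostream>

    struct Device {            // plays the role of Infiniband
      Device()  { std::cout << "device up\n"; }
      ~Device() { std::cout << "device down\n"; }
    };

    struct Dispatcher {        // plays the role of RDMADispatcher
      Device &dev;
      explicit Dispatcher(Device &d) : dev(d) { std::cout << "dispatcher up\n"; }
      ~Dispatcher() { std::cout << "dispatcher down\n"; }  // dev still alive here
    };

    struct Stack {             // plays the role of RDMAStack
      Device device;           // declared first: built first, destroyed last
      Dispatcher dispatcher{device};
    };

    int main() {
      Stack s;
      // prints: device up, dispatcher up, dispatcher down, device down
    }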