From: Haomai Wang Date: Mon, 20 Feb 2017 03:58:15 +0000 (+0800) Subject: msg/async/rdma: restart Infiniband resources to handle fork properly X-Git-Tag: v12.0.1~210^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F13525%2Fhead;p=ceph.git msg/async/rdma: restart Infiniband resources to handle fork properly Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index fcad99bd6d3d..a0ef4a785733 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -127,16 +127,16 @@ Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_ void Device::binding_port(CephContext *cct, uint8_t port_num) { port_cnt = device_attr->phys_port_cnt; - ports = new Port*[port_cnt]; for (uint8_t i = 0; i < port_cnt; ++i) { - ports[i] = new Port(cct, ctxt, i+1); - if (i+1 == port_num && ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) { - active_port = ports[i]; + Port *port = new Port(cct, ctxt, i+1); + if (i + 1 == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) { + active_port = port; ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl; - return ; + break; } else { - ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << ports[i]->get_port_attr()->state << ")"<< dendl; + ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl; } + delete port; } if (nullptr == active_port) { lderr(cct) << __func__ << " port not found" << dendl; @@ -567,16 +567,24 @@ Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s) Infiniband::MemoryManager::Cluster::~Cluster() { + char *p = chunk_base; + for (uint32_t i = 0; i < num_chunk; i++){ + Chunk *chunk = reinterpret_cast(p); + chunk->~Chunk(); + p += sizeof(Chunk); + } + ::free(chunk_base); if (manager.enabled_huge_page) manager.free_huge_pages(base); else - delete base; + ::free(base); } int Infiniband::MemoryManager::Cluster::fill(uint32_t num) { assert(!base); + num_chunk = num; uint32_t bytes = buffer_size * num; if (manager.enabled_huge_page) { base = (char*)manager.malloc_huge_pages(bytes); diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index 71296d3eafc3..325ff3986888 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -67,14 +67,13 @@ class Device { ibv_device *device; const char* name; uint8_t port_cnt; - Port** ports; public: explicit Device(CephContext *c, ibv_device* d); ~Device() { - for (uint8_t i = 0; i < port_cnt; ++i) - delete ports[i]; - delete []ports; - assert(ibv_close_device(ctxt) == 0); + if (active_port) { + delete active_port; + assert(ibv_close_device(ctxt) == 0); + } } const char* get_name() { return name;} uint16_t get_lid() { return active_port->get_lid(); } @@ -180,6 +179,7 @@ class Infiniband { MemoryManager& manager; uint32_t buffer_size; + uint32_t num_chunk; Mutex lock; std::vector free_chunks; char *base = nullptr; diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index f8b8db9c4e00..d3ccfbe1ad2b 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -18,13 +18,14 @@ #include "include/str_list.h" #include "common/deleter.h" +#include "common/Tub.h" #include "RDMAStack.h" #define dout_subsys ceph_subsys_ms #undef dout_prefix #define dout_prefix *_dout << "RDMAStack " -static Infiniband* global_infiniband; +static Tub global_infiniband; RDMADispatcher::~RDMADispatcher() { @@ -44,17 +45,17 @@ RDMADispatcher::~RDMADispatcher() delete async_handler; } -RDMADispatcher::RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s) - : cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"), +RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s) + : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"), w_lock("RDMADispatcher::for worker pending list"), stack(s) { - tx_cc = ib->create_comp_channel(c); + tx_cc = global_infiniband->create_comp_channel(c); assert(tx_cc); - rx_cc = ib->create_comp_channel(c); + rx_cc = global_infiniband->create_comp_channel(c); assert(rx_cc); - tx_cq = ib->create_comp_queue(c, tx_cc); + tx_cq = global_infiniband->create_comp_queue(c, tx_cc); assert(tx_cq); - rx_cq = ib->create_comp_queue(c, rx_cc); + rx_cq = global_infiniband->create_comp_queue(c, rx_cc); assert(rx_cq); PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last); @@ -92,7 +93,7 @@ void RDMADispatcher::handle_async_event() ldout(cct, 30) << __func__ << dendl; while (1) { ibv_async_event async_event; - if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) { + if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) { if (errno != EAGAIN) lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno << " " << cpp_strerror(errno) << ")" << dendl; @@ -115,7 +116,7 @@ void RDMADispatcher::handle_async_event() erase_qpn(qpn); } } else { - ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << ib->get_device()->ctxt + ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt << " evt: " << ibv_event_type_str(async_event.event_type) << dendl; } @@ -161,8 +162,8 @@ void RDMADispatcher::polling() if (response->status == IBV_WC_SUCCESS) { conn = get_conn_lockless(response->qp_num); if (!conn) { - assert(ib->is_rx_buffer(chunk->buffer)); - r = ib->post_chunk(chunk); + assert(global_infiniband->is_rx_buffer(chunk->buffer)); + r = global_infiniband->post_chunk(chunk); ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl; assert(r == 0); } else { @@ -172,9 +173,9 @@ void RDMADispatcher::polling() perf_logger->inc(l_msgr_rdma_rx_total_wc_errors); ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk << ") status(" << response->status << ":" - << ib->wc_status_to_string(response->status) << ")" << dendl; - assert(ib->is_rx_buffer(chunk->buffer)); - r = ib->post_chunk(chunk); + << global_infiniband->wc_status_to_string(response->status) << ")" << dendl; + assert(global_infiniband->is_rx_buffer(chunk->buffer)); + r = global_infiniband->post_chunk(chunk); if (r) { ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl; assert(r == 0); @@ -312,10 +313,32 @@ void RDMADispatcher::handle_pre_fork() done = true; t.join(); done = false; + + tx_cc->ack_events(); + rx_cc->ack_events(); + delete tx_cq; + delete rx_cq; + delete tx_cc; + delete rx_cc; + + global_infiniband.destroy(); } void RDMADispatcher::handle_post_fork() { + if (!global_infiniband) + global_infiniband.construct( + cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num); + + tx_cc = global_infiniband->create_comp_channel(cct); + assert(tx_cc); + rx_cc = global_infiniband->create_comp_channel(cct); + assert(rx_cc); + tx_cq = global_infiniband->create_comp_queue(cct, tx_cc); + assert(tx_cq); + rx_cq = global_infiniband->create_comp_queue(cct, rx_cc); + assert(rx_cq); + t = std::thread(&RDMADispatcher::polling, this); } @@ -328,7 +351,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) Chunk* chunk = reinterpret_cast(response->wr_id); ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk - << " " << ib->wc_status_to_string(response->status) << dendl; + << " " << global_infiniband->wc_status_to_string(response->status) << dendl; if (response->status != IBV_WC_SUCCESS) { perf_logger->inc(l_msgr_rdma_tx_total_wc_errors); @@ -343,7 +366,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) } else { ldout(cct, 1) << __func__ << " send work request returned error for buffer(" << response->wr_id << ") status(" << response->status << "): " - << ib->wc_status_to_string(response->status) << dendl; + << global_infiniband->wc_status_to_string(response->status) << dendl; } Mutex::Locker l(lock);//make sure connected socket alive when pass wc @@ -358,7 +381,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) } // FIXME: why not tx? - if (ib->get_memory_manager()->is_tx_buffer(chunk->buffer)) + if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) tx_chunks.push_back(chunk); else ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl; @@ -382,7 +405,7 @@ void RDMADispatcher::post_tx_buffer(std::vector &chunks) return ; inflight -= chunks.size(); - ib->get_memory_manager()->return_tx(chunks); + global_infiniband->get_memory_manager()->return_tx(chunks); ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << inflight << dendl; notify_pending_workers(); @@ -390,8 +413,8 @@ void RDMADispatcher::post_tx_buffer(std::vector &chunks) RDMAWorker::RDMAWorker(CephContext *c, unsigned i) - : Worker(c, i), stack(nullptr), infiniband(NULL), - tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL), lock("RDMAWorker::lock") + : Worker(c, i), stack(nullptr), + tx_handler(new C_handle_cq_tx(this)), lock("RDMAWorker::lock") { // initialize perf_logger char name[128]; @@ -421,13 +444,12 @@ void RDMAWorker::initialize() { if (!dispatcher) { dispatcher = stack->get_dispatcher(); - memory_manager = infiniband->get_memory_manager(); } } int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock) { - auto p = new RDMAServerSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this, sa); + auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa); int r = p->listen(sa, opt); if (r < 0) { delete p; @@ -440,7 +462,7 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) { - RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this); + RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this); int r = p->try_connect(addr, opts); if (r < 0) { @@ -456,9 +478,9 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes) { assert(center.in_thread()); - int r = infiniband->get_tx_buffers(c, bytes); + int r = global_infiniband->get_tx_buffers(c, bytes); assert(r >= 0); - size_t got = infiniband->get_memory_manager()->get_tx_chunk_size() * r; + size_t got = global_infiniband->get_memory_manager()->get_tx_chunk_size() * r; ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl; stack->get_dispatcher()->inflight += r; if (got == bytes) @@ -501,14 +523,13 @@ void RDMAWorker::handle_pending_message() RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t) { if (!global_infiniband) - global_infiniband = new Infiniband( + global_infiniband.construct( cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num); ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl; - dispatcher = new RDMADispatcher(cct, global_infiniband, this); + dispatcher = new RDMADispatcher(cct, this); unsigned num = get_num_worker(); for (unsigned i = 0; i < num; ++i) { RDMAWorker* w = dynamic_cast(get_worker(i)); - w->set_ib(global_infiniband); w->set_stack(this); } diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index be9e70a82650..7657885d6e41 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -67,7 +67,6 @@ class RDMADispatcher : public CephContext::ForkWatcher { std::thread t; CephContext *cct; - Infiniband* ib; Infiniband::CompletionQueue* tx_cq; Infiniband::CompletionQueue* rx_cq; Infiniband::CompletionChannel *tx_cc, *rx_cc; @@ -115,7 +114,7 @@ class RDMADispatcher : public CephContext::ForkWatcher { public: PerfCounters *perf_logger; - explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s); + explicit RDMADispatcher(CephContext* c, RDMAStack* s); virtual ~RDMADispatcher(); void handle_async_event(); void polling(); @@ -166,9 +165,7 @@ class RDMAWorker : public Worker { typedef Infiniband::MemoryManager MemoryManager; typedef std::vector::iterator ChunkIter; RDMAStack *stack; - Infiniband *infiniband; EventCallbackRef tx_handler; - MemoryManager *memory_manager; std::list pending_sent_conns; RDMADispatcher* dispatcher = nullptr; Mutex lock; @@ -196,7 +193,6 @@ class RDMAWorker : public Worker { pending_sent_conns.remove(o); } void handle_pending_message(); - void set_ib(Infiniband* ib) { infiniband = ib; } void set_stack(RDMAStack *s) { stack = s; } void notify_worker() { center.dispatch_event_external(tx_handler);