From: Amir Vadai Date: Wed, 12 Apr 2017 12:17:56 +0000 (+0300) Subject: msg/async/rdma: Use RDMA resources only after fork X-Git-Tag: v12.1.0~311^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ec2e6931e04a0765d05aa1c54b7ba6ce7f155571;p=ceph-ci.git msg/async/rdma: Use RDMA resources only after fork Thanks to the previous patch [1], there is no need to access RDMA resources before the fork. Initialize the Infiniband class lazily, just before a connection is established or a listener is created. [1] makes sure that the call to RDMAWorker::listen() is postponed until after the fork. [1] - 7393db45644d ("msg/async: Postpone bind if network stack is not ready") While backporting from the master branch into the stable branch, some missing pieces had to be pulled in from master: - Make the Infiniband ctor an empty shell. Actual initialization is done through a call to init() - The RDMADispatcher polling thread is started only after Infiniband::init() is called (cherry picked from commit 972c7416deae2dd3a763643be6c9334d4edd1c17) Issue: 995322 Signed-off-by: Amir Vadai Change-Id: Iab1c450937713e6c4b83daf03c903e2fe5562ba2 --- diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index 644cc78dc9f..2a31eccc4e5 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -17,6 +17,7 @@ #include "Infiniband.h" #include "common/errno.h" #include "common/debug.h" +#include "RDMAStack.h" #define dout_subsys ceph_subsys_ms #undef dout_prefix @@ -708,9 +709,22 @@ int Infiniband::MemoryManager::get_channel_buffers(std::vector &chunks, } -Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num): device_list(cct) +Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num) + : cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num) { - device = device_list.get_device(device_name.c_str()); +} + +void Infiniband::init() +{ + Mutex::Locker l(lock); + + if 
(initialized) + return; + + device_list = new DeviceList(cct); + initialized = true; + + device = device_list->get_device(device_name.c_str()); device->binding_port(cct, port_num); assert(device); ib_physical_port = device->active_port->get_port_num(); @@ -743,15 +757,30 @@ Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT); post_channel_cluster(); + + dispatcher->polling_start(); } Infiniband::~Infiniband() { + if (!initialized) + return; + + if (dispatcher) + dispatcher->polling_stop(); + assert(ibv_destroy_srq(srq) == 0); delete memory_manager; delete pd; } +void Infiniband::set_dispatcher(RDMADispatcher *d) +{ + assert(!d ^ !dispatcher); + + dispatcher = d; +} + /** * Create a shared receive queue. This basically wraps the verbs call. * diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index d389b27c1cf..013eba05525 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -122,6 +122,8 @@ class DeviceList { }; +class RDMADispatcher; + class Infiniband { public: class ProtectionDomain { @@ -223,13 +225,22 @@ class Infiniband { ibv_srq* srq; // shared receive work queue Device *device; ProtectionDomain *pd; - DeviceList device_list; + DeviceList *device_list = nullptr; + RDMADispatcher *dispatcher = nullptr; void wire_gid_to_gid(const char *wgid, union ibv_gid *gid); void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]); + CephContext *cct; + Mutex lock; + bool initialized = false; + const std::string &device_name; + uint8_t port_num; public: explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p); ~Infiniband(); + void init(); + + void set_dispatcher(RDMADispatcher *d); class CompletionChannel { static const uint32_t MAX_ACK_EVENT = 5000; diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index c89c4373064..77cb4b7a963 100644 --- 
a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -32,7 +32,7 @@ static Tub global_infiniband; RDMADispatcher::~RDMADispatcher() { done = true; - t.join(); + polling_stop(); ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl; assert(qp_conns.empty()); @@ -47,21 +47,14 @@ RDMADispatcher::~RDMADispatcher() delete tx_cc; delete rx_cc; delete async_handler; + + global_infiniband->set_dispatcher(nullptr); } RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s) : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"), w_lock("RDMADispatcher::for worker pending list"), stack(s) { - tx_cc = global_infiniband->create_comp_channel(c); - assert(tx_cc); - rx_cc = global_infiniband->create_comp_channel(c); - assert(rx_cc); - tx_cq = global_infiniband->create_comp_queue(c, tx_cc); - assert(tx_cq); - rx_cq = global_infiniband->create_comp_queue(c, rx_cc); - assert(rx_cq); - PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last); plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling"); @@ -88,9 +81,26 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s) perf_logger = plb.create_perf_counters(); cct->get_perfcounters_collection()->add(perf_logger); +} + +void RDMADispatcher::polling_start() +{ + tx_cc = global_infiniband->create_comp_channel(cct); + assert(tx_cc); + rx_cc = global_infiniband->create_comp_channel(cct); + assert(rx_cc); + tx_cq = global_infiniband->create_comp_queue(cct, tx_cc); + assert(tx_cq); + rx_cq = global_infiniband->create_comp_queue(cct, rx_cc); + assert(rx_cq); t = std::thread(&RDMADispatcher::polling, this); - cct->register_fork_watcher(this); +} + +void RDMADispatcher::polling_stop() +{ + if (t.joinable()) + t.join(); } void RDMADispatcher::handle_async_event() @@ -314,40 +324,6 @@ void RDMADispatcher::erase_qpn(uint32_t qpn) erase_qpn_lockless(qpn); } -void 
RDMADispatcher::handle_pre_fork() -{ - done = true; - t.join(); - done = false; - - tx_cc->ack_events(); - rx_cc->ack_events(); - delete tx_cq; - delete rx_cq; - delete tx_cc; - delete rx_cc; - - global_infiniband.destroy(); -} - -void RDMADispatcher::handle_post_fork() -{ - if (!global_infiniband) - global_infiniband.construct( - cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num); - - tx_cc = global_infiniband->create_comp_channel(cct); - assert(tx_cc); - rx_cc = global_infiniband->create_comp_channel(cct); - assert(rx_cc); - tx_cq = global_infiniband->create_comp_queue(cct, tx_cc); - assert(tx_cq); - rx_cq = global_infiniband->create_comp_queue(cct, rx_cc); - assert(rx_cq); - - t = std::thread(&RDMADispatcher::polling, this); -} - void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) { std::vector tx_chunks; @@ -455,6 +431,8 @@ void RDMAWorker::initialize() int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock) { + global_infiniband->init(); + auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa); int r = p->listen(sa, opt); if (r < 0) { @@ -468,6 +446,8 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) { + global_infiniband->init(); + RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this); int r = p->try_connect(addr, opts); @@ -550,6 +530,8 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t) cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num); ldout(cct, 20) << __func__ << " constructing RDMAStack..." 
<< dendl; dispatcher = new RDMADispatcher(cct, this); + global_infiniband->set_dispatcher(dispatcher); + unsigned num = get_num_worker(); for (unsigned i = 0; i < num; ++i) { RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i)); diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index aa459731ce9..4f93c157f06 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -62,7 +62,7 @@ enum { }; -class RDMADispatcher : public CephContext::ForkWatcher { +class RDMADispatcher { typedef Infiniband::MemoryManager::Chunk Chunk; typedef Infiniband::QueuePair QueuePair; @@ -120,6 +120,9 @@ class RDMADispatcher : public CephContext::ForkWatcher { explicit RDMADispatcher(CephContext* c, RDMAStack* s); virtual ~RDMADispatcher(); void handle_async_event(); + + void polling_start(); + void polling_stop(); void polling(); int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi); void make_pending_worker(RDMAWorker* w) { @@ -136,8 +139,6 @@ class RDMADispatcher : public CephContext::ForkWatcher { Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; } Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; } void notify_pending_workers(); - virtual void handle_pre_fork() override; - virtual void handle_post_fork() override; void handle_tx_event(ibv_wc *cqe, int n); void post_tx_buffer(std::vector &chunks);