]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: Use RDMA resources only after fork 15262/head
authorAmir Vadai <amir@vadai.me>
Wed, 12 Apr 2017 12:17:56 +0000 (15:17 +0300)
committerAmir Vadai <amir@vadai.me>
Tue, 23 May 2017 14:04:44 +0000 (17:04 +0300)
Thanks to previous patch [1], no need to access RDMA resources before
the fork. Initialize Infiniband class only before a connection is
established or a listener is created. [1] is making sure that the call
to RDMAWorker::listen() is postponed till after the fork.

[1] - 7393db45644d ("msg/async: Postpone bind if network stack is not ready")

While backporting from master branch into the stable branch had to pull
some missing stuff from master:
- Make Infiniband ctor an empty shell. Actual initialization is done
  through a call to init()
- RDMADispatcher polling thread is started only after Infiniband::init()
  is called

(cherry picked from commit 972c7416deae2dd3a763643be6c9334d4edd1c17)

Issue: 995322
Signed-off-by: Amir Vadai <amir@vadai.me>
Change-Id: Iab1c450937713e6c4b83daf03c903e2fe5562ba2

src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index 644cc78dc9f0fee2063b5cdbff4b6d41ed2b745b..2a31eccc4e5632d55878d967cdb9add7c3131ba3 100644 (file)
@@ -17,6 +17,7 @@
 #include "Infiniband.h"
 #include "common/errno.h"
 #include "common/debug.h"
+#include "RDMAStack.h"
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
@@ -708,9 +709,22 @@ int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks,
 }
 
 
-Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num): device_list(cct)
+Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
+  : cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num)
 {
-  device = device_list.get_device(device_name.c_str());
+}
+
+void Infiniband::init()
+{
+  Mutex::Locker l(lock);
+
+  if (initialized)
+    return;
+
+  device_list = new DeviceList(cct);
+  initialized = true;
+
+  device = device_list->get_device(device_name.c_str());
   device->binding_port(cct, port_num);
   assert(device);
   ib_physical_port = device->active_port->get_port_num();
@@ -743,15 +757,30 @@ Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t
 
   srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
   post_channel_cluster();
+
+  dispatcher->polling_start();
 }
 
 Infiniband::~Infiniband()
 {
+  if (!initialized)
+    return;
+
+  if (dispatcher)
+    dispatcher->polling_stop();
+
   assert(ibv_destroy_srq(srq) == 0);
   delete memory_manager;
   delete pd;
 }
 
+void Infiniband::set_dispatcher(RDMADispatcher *d)
+{
+  assert(!d ^ !dispatcher);
+
+  dispatcher = d;
+}
+
 /**
  * Create a shared receive queue. This basically wraps the verbs call. 
  *
index d389b27c1cfee7fcbc89ca0743649be5fc08f20d..013eba055256238462ae17550702d509ae38e737 100644 (file)
@@ -122,6 +122,8 @@ class DeviceList {
 };
 
 
+class RDMADispatcher;
+
 class Infiniband {
  public:
   class ProtectionDomain {
@@ -223,13 +225,22 @@ class Infiniband {
   ibv_srq* srq;             // shared receive work queue
   Device *device;
   ProtectionDomain *pd;
-  DeviceList device_list;
+  DeviceList *device_list = nullptr;
+  RDMADispatcher *dispatcher = nullptr;
   void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
   void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
+  CephContext *cct;
+  Mutex lock;
+  bool initialized = false;
+  const std::string &device_name;
+  uint8_t port_num;
 
  public:
   explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
   ~Infiniband();
+  void init();
+
+  void set_dispatcher(RDMADispatcher *d);
 
   class CompletionChannel {
     static const uint32_t MAX_ACK_EVENT = 5000;
index c89c437306496b5cd99d9ae9b389e2d5aa6a3d83..77cb4b7a9630209d949960cd7ab52c063043da66 100644 (file)
@@ -32,7 +32,7 @@ static Tub<Infiniband> global_infiniband;
 RDMADispatcher::~RDMADispatcher()
 {
   done = true;
-  t.join();
+  polling_stop();
   ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
 
   assert(qp_conns.empty());
@@ -47,21 +47,14 @@ RDMADispatcher::~RDMADispatcher()
   delete tx_cc;
   delete rx_cc;
   delete async_handler;
+
+  global_infiniband->set_dispatcher(nullptr);
 }
 
 RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
   : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
   w_lock("RDMADispatcher::for worker pending list"), stack(s)
 {
-  tx_cc = global_infiniband->create_comp_channel(c);
-  assert(tx_cc);
-  rx_cc = global_infiniband->create_comp_channel(c);
-  assert(rx_cc);
-  tx_cq = global_infiniband->create_comp_queue(c, tx_cc);
-  assert(tx_cq);
-  rx_cq = global_infiniband->create_comp_queue(c, rx_cc);
-  assert(rx_cq);
-
   PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
 
   plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
@@ -88,9 +81,26 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
 
   perf_logger = plb.create_perf_counters();
   cct->get_perfcounters_collection()->add(perf_logger);
+}
+
+void RDMADispatcher::polling_start()
+{
+  tx_cc = global_infiniband->create_comp_channel(cct);
+  assert(tx_cc);
+  rx_cc = global_infiniband->create_comp_channel(cct);
+  assert(rx_cc);
+  tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
+  assert(tx_cq);
+  rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
+  assert(rx_cq);
 
   t = std::thread(&RDMADispatcher::polling, this);
-  cct->register_fork_watcher(this);
+}
+
+void RDMADispatcher::polling_stop()
+{
+  if (t.joinable())
+    t.join();
 }
 
 void RDMADispatcher::handle_async_event()
@@ -314,40 +324,6 @@ void RDMADispatcher::erase_qpn(uint32_t qpn)
   erase_qpn_lockless(qpn);
 }
 
-void RDMADispatcher::handle_pre_fork()
-{
-  done = true;
-  t.join();
-  done = false;
-
-  tx_cc->ack_events();
-  rx_cc->ack_events();
-  delete tx_cq;
-  delete rx_cq;
-  delete tx_cc;
-  delete rx_cc;
-
-  global_infiniband.destroy();
-}
-
-void RDMADispatcher::handle_post_fork()
-{
-  if (!global_infiniband)
-    global_infiniband.construct(
-      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
-
-  tx_cc = global_infiniband->create_comp_channel(cct);
-  assert(tx_cc);
-  rx_cc = global_infiniband->create_comp_channel(cct);
-  assert(rx_cc);
-  tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
-  assert(tx_cq);
-  rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
-  assert(rx_cq);
-
-  t = std::thread(&RDMADispatcher::polling, this);
-}
-
 void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
 {
   std::vector<Chunk*> tx_chunks;
@@ -455,6 +431,8 @@ void RDMAWorker::initialize()
 
 int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
 {
+  global_infiniband->init();
+
   auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
   int r = p->listen(sa, opt);
   if (r < 0) {
@@ -468,6 +446,8 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket
 
 int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
 {
+  global_infiniband->init();
+
   RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
   int r = p->try_connect(addr, opts);
 
@@ -550,6 +530,8 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
       cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
   ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
   dispatcher = new RDMADispatcher(cct, this);
+  global_infiniband->set_dispatcher(dispatcher);
+
   unsigned num = get_num_worker();
   for (unsigned i = 0; i < num; ++i) {
     RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
index aa459731ce923105ae159acc2cf3eb2193895b72..4f93c157f06670c8a39ee17bc5707845d1802fde 100644 (file)
@@ -62,7 +62,7 @@ enum {
 };
 
 
-class RDMADispatcher : public CephContext::ForkWatcher {
+class RDMADispatcher {
   typedef Infiniband::MemoryManager::Chunk Chunk;
   typedef Infiniband::QueuePair QueuePair;
 
@@ -120,6 +120,9 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   explicit RDMADispatcher(CephContext* c, RDMAStack* s);
   virtual ~RDMADispatcher();
   void handle_async_event();
+
+  void polling_start();
+  void polling_stop();
   void polling();
   int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
   void make_pending_worker(RDMAWorker* w) {
@@ -136,8 +139,6 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
   Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
   void notify_pending_workers();
-  virtual void handle_pre_fork() override;
-  virtual void handle_post_fork() override;
   void handle_tx_event(ibv_wc *cqe, int n);
   void post_tx_buffer(std::vector<Chunk*> &chunks);