]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async/rdma: fixes crash in fio
authorAlex Mikheev <alexm@mellanox.com>
Mon, 12 Jun 2017 08:32:38 +0000 (08:32 +0000)
committerHaomai Wang <haomai@xsky.com>
Fri, 11 Aug 2017 02:00:01 +0000 (10:00 +0800)
fio creates multiple CephContext in a single process.
Crash(es) happen because rdma stack has a global resources that
are still used from one ceph context while have already been destroyed
by another context.

The commit removes global instances of RDMA dispatcher and infiniband
and makes them context (rdma stack) specific.

Signed-off-by: Adir Lev <adirl@mellanox.com>
Signed-off-by: Alex Mikheev <alexm@mellanox.com>
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index cf9031e068d6ac301db19326b7c7ae82ab5f2349..5173feb94340b045c0492fa9efaf36dc28d9233a 100644 (file)
@@ -18,6 +18,8 @@
 #include "common/errno.h"
 #include "common/debug.h"
 #include "RDMAStack.h"
+#include <sys/time.h>
+#include <sys/resource.h>
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
@@ -776,9 +778,45 @@ int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t b
   return send->get_buffers(c, bytes);
 }
 
-Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
-  : cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num)
+bool Infiniband::init_prereq = false;
+
+void Infiniband::verify_prereq(CephContext *cct) {
+
+  //On RDMA MUST be called before fork
+   int rc = ibv_fork_init();
+   if (rc) {
+      lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl;
+      ceph_abort();
+   }
+
+   ldout(cct, 20) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage <<  dendl;
+   if (cct->_conf->ms_async_rdma_enable_hugepage){
+     rc =  setenv("RDMAV_HUGEPAGES_SAFE","1",1);
+     ldout(cct, 20) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") <<  dendl;
+     if (rc) {
+       lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl;
+       ceph_abort();
+     }
+   }
+
+   //Check ulimit
+   struct rlimit limit;
+   getrlimit(RLIMIT_MEMLOCK, &limit);
+   if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
+      lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory."
+                                 " We recommend setting this parameter to infinity" << dendl;
+   }
+   init_prereq = true;
+}
+
+Infiniband::Infiniband(CephContext *cct)
+  : cct(cct), lock("IB lock"),
+    device_name(cct->_conf->ms_async_rdma_device_name),
+    port_num( cct->_conf->ms_async_rdma_port_num)
 {
+  if (!init_prereq)
+    verify_prereq(cct);
+  ldout(cct, 20) << __func__ << " constructing Infiniband..." << dendl;
 }
 
 void Infiniband::init()
@@ -836,7 +874,6 @@ void Infiniband::init()
   srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
 
   post_chunks_to_srq(rx_queue_len); //add to srq
-  dispatcher->polling_start();
 }
 
 Infiniband::~Infiniband()
@@ -844,23 +881,11 @@ Infiniband::~Infiniband()
   if (!initialized)
     return;
 
-  if (dispatcher)
-    dispatcher->polling_stop();
-
   ibv_destroy_srq(srq);
   delete memory_manager;
   delete pd;
 }
 
-void Infiniband::set_dispatcher(RDMADispatcher *d)
-{
-  assert(!d ^ !dispatcher);
-
-  dispatcher = d;
-  if (dispatcher != nullptr)
-    MemoryManager::RxAllocator::set_perf_logger(dispatcher->perf_logger);
-}
-
 /**
  * Create a shared receive queue. This basically wraps the verbs call. 
  *
index ee8fb2758584e374a71607ddd3f00efa5ef2d3fc..67b3b9b457a2595334c93cf3d6349975536787be 100644 (file)
@@ -321,7 +321,6 @@ class Infiniband {
   Device *device = NULL;
   ProtectionDomain *pd = NULL;
   DeviceList *device_list = nullptr;
-  RDMADispatcher *dispatcher = nullptr;
   void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
   void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
   CephContext *cct;
@@ -329,13 +328,13 @@ class Infiniband {
   bool initialized = false;
   const std::string &device_name;
   uint8_t port_num;
+  static bool init_prereq;
 
  public:
-  explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
+  explicit Infiniband(CephContext *c);
   ~Infiniband();
   void init();
-
-  void set_dispatcher(RDMADispatcher *d);
+  static void verify_prereq(CephContext *cct);
 
   class CompletionChannel {
     static const uint32_t MAX_ACK_EVENT = 5000;
index d8a8a63bad6250c866c9d0f30eb3dbc7c4d49ed3..a477cf531a02ee52dff18a15a81b2941930b4615 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << "RDMAStack "
 
-static Tub<Infiniband> global_infiniband;
-
 RDMADispatcher::~RDMADispatcher()
 {
-  done = true;
-  polling_stop();
   ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
+  polling_stop();
 
   assert(qp_conns.empty());
   assert(num_qp_conn == 0);
   assert(dead_queue_pairs.empty());
   assert(num_dead_queue_pair == 0);
 
-  tx_cc->ack_events();
-  rx_cc->ack_events();
-  delete tx_cq;
-  delete rx_cq;
-  delete tx_cc;
-  delete rx_cc;
   delete async_handler;
-
-  global_infiniband->set_dispatcher(nullptr);
 }
 
 RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
@@ -86,13 +75,19 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
 
 void RDMADispatcher::polling_start()
 {
-  tx_cc = global_infiniband->create_comp_channel(cct);
+  // take lock because listen/connect can happen from different worker threads
+  Mutex::Locker l(lock);
+
+  if (t.joinable()) 
+    return; // dispatcher thread already running 
+
+  tx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
   assert(tx_cc);
-  rx_cc = global_infiniband->create_comp_channel(cct);
+  rx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
   assert(rx_cc);
-  tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
+  tx_cq = get_stack()->get_infiniband().create_comp_queue(cct, tx_cc);
   assert(tx_cq);
-  rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
+  rx_cq = get_stack()->get_infiniband().create_comp_queue(cct, rx_cc);
   assert(rx_cq);
 
   t = std::thread(&RDMADispatcher::polling, this);
@@ -100,8 +95,18 @@ void RDMADispatcher::polling_start()
 
 void RDMADispatcher::polling_stop()
 {
-  if (t.joinable())
-    t.join();
+  done = true;
+  if (!t.joinable())
+    return;
+
+  t.join();
+
+  tx_cc->ack_events();
+  rx_cc->ack_events();
+  delete tx_cq;
+  delete rx_cq;
+  delete tx_cc;
+  delete rx_cc;
 }
 
 void RDMADispatcher::handle_async_event()
@@ -109,7 +114,7 @@ void RDMADispatcher::handle_async_event()
   ldout(cct, 30) << __func__ << dendl;
   while (1) {
     ibv_async_event async_event;
-    if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
+    if (ibv_get_async_event(get_stack()->get_infiniband().get_device()->ctxt, &async_event)) {
       if (errno != EAGAIN)
        lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
                   << " " << cpp_strerror(errno) << ")" << dendl;
@@ -133,7 +138,7 @@ void RDMADispatcher::handle_async_event()
         erase_qpn_lockless(qpn);
       }
     } else {
-      ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
+      ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << get_stack()->get_infiniband().get_device()->ctxt
                     << " evt: " << ibv_event_type_str(async_event.event_type)
                     << dendl;
     }
@@ -143,7 +148,7 @@ void RDMADispatcher::handle_async_event()
 
 void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) {
   Mutex::Locker l(lock);
-  global_infiniband->post_chunk_to_pool(chunk);
+  get_stack()->get_infiniband().post_chunk_to_pool(chunk);
   perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
 }
 
@@ -176,7 +181,7 @@ void RDMADispatcher::polling()
       perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);
 
       Mutex::Locker l(lock);//make sure connected socket alive when pass wc
-      global_infiniband->post_chunks_to_srq(rx_ret);
+      get_stack()->get_infiniband().post_chunks_to_srq(rx_ret);
       for (int i = 0; i < rx_ret; ++i) {
         ibv_wc* response = &wc[i];
         Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
@@ -188,7 +193,7 @@ void RDMADispatcher::polling()
           conn = get_conn_lockless(response->qp_num);
           if (!conn) {
             ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
-            global_infiniband->post_chunk_to_pool(chunk);
+            get_stack()->get_infiniband().post_chunk_to_pool(chunk);
             perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
           } else {
            polled[conn].push_back(*response);
@@ -199,12 +204,12 @@ void RDMADispatcher::polling()
 
           ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
               << ") status(" << response->status << ":"
-              << global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
+              << get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
           conn = get_conn_lockless(response->qp_num);
           if (conn && conn->is_connected())
             conn->fault();
 
-          global_infiniband->post_chunk_to_pool(chunk);
+          get_stack()->get_infiniband().post_chunk_to_pool(chunk);
         }
       }
       for (auto &&i : polled)
@@ -335,7 +340,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
     Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
     ldout(cct, 25) << __func__ << " QP: " << response->qp_num
                    << " len: " << response->byte_len << " , addr:" << chunk
-                   << " " << global_infiniband->wc_status_to_string(response->status) << dendl;
+                   << " " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
 
     if (response->status != IBV_WC_SUCCESS) {
       perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
@@ -350,7 +355,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
       } else {
         ldout(cct, 1) << __func__ << " send work request returned error for buffer("
                       << response->wr_id << ") status(" << response->status << "): "
-                      << global_infiniband->wc_status_to_string(response->status) << dendl;
+                      << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
       }
 
       Mutex::Locker l(lock);//make sure connected socket alive when pass wc
@@ -366,7 +371,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
 
     //TX completion may come either from regular send message or from 'fin' message.
     //In the case of 'fin' wr_id points to the QueuePair.
-    if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
+    if (get_stack()->get_infiniband().get_memory_manager()->is_tx_buffer(chunk->buffer)) {
       tx_chunks.push_back(chunk);
     } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
       ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
@@ -394,7 +399,7 @@ void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
     return ;
 
   inflight -= chunks.size();
-  global_infiniband->get_memory_manager()->return_tx(chunks);
+  get_stack()->get_infiniband().get_memory_manager()->return_tx(chunks);
   ldout(cct, 30) << __func__ << " release " << chunks.size()
                  << " chunks, inflight " << inflight << dendl;
   notify_pending_workers();
@@ -432,15 +437,16 @@ RDMAWorker::~RDMAWorker()
 void RDMAWorker::initialize()
 {
   if (!dispatcher) {
-    dispatcher = stack->get_dispatcher();
+    dispatcher = &stack->get_dispatcher();
   }
 }
 
 int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
 {
-  global_infiniband->init();
+  get_stack()->get_infiniband().init();
+  dispatcher->polling_start();
 
-  auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
+  auto p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa);
   int r = p->listen(sa, opt);
   if (r < 0) {
     delete p;
@@ -453,9 +459,10 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket
 
 int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
 {
-  global_infiniband->init();
+  get_stack()->get_infiniband().init();
+  dispatcher->polling_start();
 
-  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
+  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this);
   int r = p->try_connect(addr, opts);
 
   if (r < 0) {
@@ -471,11 +478,11 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
 {
   assert(center.in_thread());
-  int r = global_infiniband->get_tx_buffers(c, bytes);
+  int r = get_stack()->get_infiniband().get_tx_buffers(c, bytes);
   assert(r >= 0);
-  size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
+  size_t got = get_stack()->get_infiniband().get_memory_manager()->get_tx_buffer_size() * r;
   ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered  bytes, inflight " << dispatcher->inflight << dendl;
-  stack->get_dispatcher()->inflight += r;
+  stack->get_dispatcher().inflight += r;
   if (got >= bytes)
     return r;
 
@@ -513,50 +520,17 @@ void RDMAWorker::handle_pending_message()
   dispatcher->notify_pending_workers();
 }
 
-RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
+RDMAStack::RDMAStack(CephContext *cct, const string &t)
+  : NetworkStack(cct, t), ib(cct), dispatcher(cct, this)
 {
-  //
-  //On RDMA MUST be called before fork
-  //
-
-  int rc = ibv_fork_init();
-  if (rc) {
-     lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl;
-     ceph_abort();
-  }
-
-  ldout(cct, 1) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage <<  dendl;
-  if (cct->_conf->ms_async_rdma_enable_hugepage) {
-    rc =  setenv("RDMAV_HUGEPAGES_SAFE","1",1);
-    ldout(cct, 1) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") <<  dendl;
-    if (rc) {
-      lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl;
-      ceph_abort();
-    }
-  }
-
-  //Check ulimit
-  struct rlimit limit;
-  getrlimit(RLIMIT_MEMLOCK, &limit);
-  if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
-     lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory."
-                                 " We recommend setting this parameter to infinity" << dendl;
-  }
-
-  if (!global_infiniband)
-    global_infiniband.construct(
-      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
   ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
-  dispatcher = new RDMADispatcher(cct, this);
-  global_infiniband->set_dispatcher(dispatcher);
 
   unsigned num = get_num_worker();
   for (unsigned i = 0; i < num; ++i) {
     RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
     w->set_stack(this);
   }
-
-  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
+  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << &dispatcher << dendl;
 }
 
 RDMAStack::~RDMAStack()
@@ -565,7 +539,7 @@ RDMAStack::~RDMAStack()
     unsetenv("RDMAV_HUGEPAGES_SAFE");  //remove env variable on destruction
   }
 
-  delete dispatcher;
+  dispatcher.polling_stop();
 }
 
 void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
index bbb97af1f77d19874f63e746cf79a9236dae0c90..764ea33f39e82ceba6e2b6b86a448d65f6add061 100644 (file)
@@ -256,7 +256,9 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
 
 class RDMAStack : public NetworkStack {
   vector<std::thread> threads;
-  RDMADispatcher *dispatcher;
+  PerfCounters *perf_counter;
+  Infiniband ib;
+  RDMADispatcher dispatcher;
 
   std::atomic<bool> fork_finished = {false};
 
@@ -268,10 +270,11 @@ class RDMAStack : public NetworkStack {
 
   virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
   virtual void join_worker(unsigned i) override;
-  RDMADispatcher *get_dispatcher() { return dispatcher; }
-
+  RDMADispatcher &get_dispatcher() { return dispatcher; }
+  Infiniband &get_infiniband() { return ib; }
   virtual bool is_ready() override { return fork_finished.load(); };
   virtual void ready() override { fork_finished = true; };
 };
 
+
 #endif