]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: restart Infiniband resources to handle fork properly 13525/head
authorHaomai Wang <haomai@xsky.com>
Mon, 20 Feb 2017 03:58:15 +0000 (11:58 +0800)
committerHaomai Wang <haomai@xsky.com>
Thu, 2 Mar 2017 03:36:12 +0000 (11:36 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index fcad99bd6d3dd236cd7562c4810ccd213fa57d25..a0ef4a7857334f73db397cfd5d0d262d484545b2 100644 (file)
@@ -127,16 +127,16 @@ Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_
 
 void Device::binding_port(CephContext *cct, uint8_t port_num) {
   port_cnt = device_attr->phys_port_cnt;
-  ports = new Port*[port_cnt];
   for (uint8_t i = 0; i < port_cnt; ++i) {
-    ports[i] = new Port(cct, ctxt, i+1);
-    if (i+1 == port_num && ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) {
-      active_port = ports[i];
+    Port *port = new Port(cct, ctxt, i+1);
+    if (i + 1 == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
+      active_port = port;
       ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
-      return ;
+      break;
     } else {
-      ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << ports[i]->get_port_attr()->state << ")"<< dendl;
+      ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
     }
+    delete port;
   }
   if (nullptr == active_port) {
     lderr(cct) << __func__ << "  port not found" << dendl;
@@ -567,16 +567,24 @@ Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
 
 Infiniband::MemoryManager::Cluster::~Cluster()
 {
+  char *p = chunk_base;
+  for (uint32_t i = 0; i < num_chunk; i++){
+    Chunk *chunk = reinterpret_cast<Chunk*>(p);
+    chunk->~Chunk();
+    p += sizeof(Chunk);
+  }
+
   ::free(chunk_base);
   if (manager.enabled_huge_page)
     manager.free_huge_pages(base);
   else
-    delete base;
+    ::free(base);
 }
 
 int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
 {
   assert(!base);
+  num_chunk = num;
   uint32_t bytes = buffer_size * num;
   if (manager.enabled_huge_page) {
     base = (char*)manager.malloc_huge_pages(bytes);
index 71296d3eafc3f284c99a89a390fb3998ebbd9952..325ff39868888111f7c26484171f5f0eb2528dcd 100644 (file)
@@ -67,14 +67,13 @@ class Device {
   ibv_device *device;
   const char* name;
   uint8_t  port_cnt;
-  Port** ports;
  public:
   explicit Device(CephContext *c, ibv_device* d);
   ~Device() {
-    for (uint8_t i = 0; i < port_cnt; ++i)
-      delete ports[i];
-    delete []ports;
-    assert(ibv_close_device(ctxt) == 0);
+    if (active_port) {
+      delete active_port;
+      assert(ibv_close_device(ctxt) == 0);
+    }
   }
   const char* get_name() { return name;}
   uint16_t get_lid() { return active_port->get_lid(); }
@@ -180,6 +179,7 @@ class Infiniband {
 
       MemoryManager& manager;
       uint32_t buffer_size;
+      uint32_t num_chunk;
       Mutex lock;
       std::vector<Chunk*> free_chunks;
       char *base = nullptr;
index f8b8db9c4e00916c29ff65eadf99bec958666dbe..d3ccfbe1ad2b5e842c4ee044017d93b3dcd34873 100644 (file)
 
 #include "include/str_list.h"
 #include "common/deleter.h"
+#include "common/Tub.h"
 #include "RDMAStack.h"
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
 #define dout_prefix *_dout << "RDMAStack "
 
-static Infiniband* global_infiniband;
+static Tub<Infiniband> global_infiniband;
 
 RDMADispatcher::~RDMADispatcher()
 {
@@ -44,17 +45,17 @@ RDMADispatcher::~RDMADispatcher()
   delete async_handler;
 }
 
-RDMADispatcher::RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
-  : cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
+RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
+  : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
   w_lock("RDMADispatcher::for worker pending list"), stack(s)
 {
-  tx_cc = ib->create_comp_channel(c);
+  tx_cc = global_infiniband->create_comp_channel(c);
   assert(tx_cc);
-  rx_cc = ib->create_comp_channel(c);
+  rx_cc = global_infiniband->create_comp_channel(c);
   assert(rx_cc);
-  tx_cq = ib->create_comp_queue(c, tx_cc);
+  tx_cq = global_infiniband->create_comp_queue(c, tx_cc);
   assert(tx_cq);
-  rx_cq = ib->create_comp_queue(c, rx_cc);
+  rx_cq = global_infiniband->create_comp_queue(c, rx_cc);
   assert(rx_cq);
 
   PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
@@ -92,7 +93,7 @@ void RDMADispatcher::handle_async_event()
   ldout(cct, 30) << __func__ << dendl;
   while (1) {
     ibv_async_event async_event;
-    if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
+    if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
       if (errno != EAGAIN)
        lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
                   << " " << cpp_strerror(errno) << ")" << dendl;
@@ -115,7 +116,7 @@ void RDMADispatcher::handle_async_event()
         erase_qpn(qpn);
       }
     } else {
-      ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << ib->get_device()->ctxt
+      ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
                     << " evt: " << ibv_event_type_str(async_event.event_type)
                     << dendl;
     }
@@ -161,8 +162,8 @@ void RDMADispatcher::polling()
         if (response->status == IBV_WC_SUCCESS) {
           conn = get_conn_lockless(response->qp_num);
           if (!conn) {
-            assert(ib->is_rx_buffer(chunk->buffer));
-            r = ib->post_chunk(chunk);
+            assert(global_infiniband->is_rx_buffer(chunk->buffer));
+            r = global_infiniband->post_chunk(chunk);
             ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
             assert(r == 0);
           } else {
@@ -172,9 +173,9 @@ void RDMADispatcher::polling()
           perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
           ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
               << ") status(" << response->status << ":"
-              << ib->wc_status_to_string(response->status) << ")" << dendl;
-          assert(ib->is_rx_buffer(chunk->buffer));
-          r = ib->post_chunk(chunk);
+              << global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
+          assert(global_infiniband->is_rx_buffer(chunk->buffer));
+          r = global_infiniband->post_chunk(chunk);
           if (r) {
             ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
             assert(r == 0);
@@ -312,10 +313,32 @@ void RDMADispatcher::handle_pre_fork()
   done = true;
   t.join();
   done = false;
+
+  tx_cc->ack_events();
+  rx_cc->ack_events();
+  delete tx_cq;
+  delete rx_cq;
+  delete tx_cc;
+  delete rx_cc;
+
+  global_infiniband.destroy();
 }
 
 void RDMADispatcher::handle_post_fork()
 {
+  if (!global_infiniband)
+    global_infiniband.construct(
+      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
+
+  tx_cc = global_infiniband->create_comp_channel(cct);
+  assert(tx_cc);
+  rx_cc = global_infiniband->create_comp_channel(cct);
+  assert(rx_cc);
+  tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
+  assert(tx_cq);
+  rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
+  assert(rx_cq);
+
   t = std::thread(&RDMADispatcher::polling, this);
 }
 
@@ -328,7 +351,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
     Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
     ldout(cct, 25) << __func__ << " QP: " << response->qp_num
                    << " len: " << response->byte_len << " , addr:" << chunk
-                   << " " << ib->wc_status_to_string(response->status) << dendl;
+                   << " " << global_infiniband->wc_status_to_string(response->status) << dendl;
 
     if (response->status != IBV_WC_SUCCESS) {
       perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
@@ -343,7 +366,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
       } else {
         ldout(cct, 1) << __func__ << " send work request returned error for buffer("
                       << response->wr_id << ") status(" << response->status << "): "
-                      << ib->wc_status_to_string(response->status) << dendl;
+                      << global_infiniband->wc_status_to_string(response->status) << dendl;
       }
 
       Mutex::Locker l(lock);//make sure connected socket alive when pass wc
@@ -358,7 +381,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
     }
 
     // FIXME: why not tx?
-    if (ib->get_memory_manager()->is_tx_buffer(chunk->buffer))
+    if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer))
       tx_chunks.push_back(chunk);
     else
       ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
@@ -382,7 +405,7 @@ void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
     return ;
 
   inflight -= chunks.size();
-  ib->get_memory_manager()->return_tx(chunks);
+  global_infiniband->get_memory_manager()->return_tx(chunks);
   ldout(cct, 30) << __func__ << " release " << chunks.size()
                  << " chunks, inflight " << inflight << dendl;
   notify_pending_workers();
@@ -390,8 +413,8 @@ void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
 
 
 RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
-  : Worker(c, i), stack(nullptr), infiniband(NULL),
-    tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL), lock("RDMAWorker::lock")
+  : Worker(c, i), stack(nullptr),
+    tx_handler(new C_handle_cq_tx(this)), lock("RDMAWorker::lock")
 {
   // initialize perf_logger
   char name[128];
@@ -421,13 +444,12 @@ void RDMAWorker::initialize()
 {
   if (!dispatcher) {
     dispatcher = stack->get_dispatcher();
-    memory_manager = infiniband->get_memory_manager();
   }
 }
 
 int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
 {
-  auto p = new RDMAServerSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this, sa);
+  auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
   int r = p->listen(sa, opt);
   if (r < 0) {
     delete p;
@@ -440,7 +462,7 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket
 
 int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
 {
-  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this);
+  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
   int r = p->try_connect(addr, opts);
 
   if (r < 0) {
@@ -456,9 +478,9 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
 {
   assert(center.in_thread());
-  int r = infiniband->get_tx_buffers(c, bytes);
+  int r = global_infiniband->get_tx_buffers(c, bytes);
   assert(r >= 0);
-  size_t got = infiniband->get_memory_manager()->get_tx_chunk_size() * r;
+  size_t got = global_infiniband->get_memory_manager()->get_tx_chunk_size() * r;
   ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered  bytes, inflight " << dispatcher->inflight << dendl;
   stack->get_dispatcher()->inflight += r;
   if (got == bytes)
@@ -501,14 +523,13 @@ void RDMAWorker::handle_pending_message()
 RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
 {
   if (!global_infiniband)
-    global_infiniband = new Infiniband(
+    global_infiniband.construct(
       cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
   ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
-  dispatcher = new RDMADispatcher(cct, global_infiniband, this);
+  dispatcher = new RDMADispatcher(cct, this);
   unsigned num = get_num_worker();
   for (unsigned i = 0; i < num; ++i) {
     RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
-    w->set_ib(global_infiniband);
     w->set_stack(this);
   }
 
index be9e70a826503d1e5d7d43ffc1a98af203d5a432..7657885d6e415f9ed8cc29a1214b5385ebd68cd9 100644 (file)
@@ -67,7 +67,6 @@ class RDMADispatcher : public CephContext::ForkWatcher {
 
   std::thread t;
   CephContext *cct;
-  Infiniband* ib;
   Infiniband::CompletionQueue* tx_cq;
   Infiniband::CompletionQueue* rx_cq;
   Infiniband::CompletionChannel *tx_cc, *rx_cc;
@@ -115,7 +114,7 @@ class RDMADispatcher : public CephContext::ForkWatcher {
  public:
   PerfCounters *perf_logger;
 
-  explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s);
+  explicit RDMADispatcher(CephContext* c, RDMAStack* s);
   virtual ~RDMADispatcher();
   void handle_async_event();
   void polling();
@@ -166,9 +165,7 @@ class RDMAWorker : public Worker {
   typedef Infiniband::MemoryManager MemoryManager;
   typedef std::vector<Chunk*>::iterator ChunkIter;
   RDMAStack *stack;
-  Infiniband *infiniband;
   EventCallbackRef tx_handler;
-  MemoryManager *memory_manager;
   std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
   RDMADispatcher* dispatcher = nullptr;
   Mutex lock;
@@ -196,7 +193,6 @@ class RDMAWorker : public Worker {
     pending_sent_conns.remove(o);
   }
   void handle_pending_message();
-  void set_ib(Infiniband* ib) { infiniband = ib; }
   void set_stack(RDMAStack *s) { stack = s; }
   void notify_worker() {
     center.dispatch_event_external(tx_handler);