git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: use shared_ptr to manage Infiniband obj
authorChangcheng Liu <changcheng.liu@aliyun.com>
Wed, 7 Aug 2019 06:19:11 +0000 (14:19 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 23 Aug 2019 06:36:05 +0000 (14:36 +0800)
1. Don't use a bare pointer to manage the Infiniband obj.

2. Access the Infiniband obj directly instead of reaching it through
RDMAStack. This avoids having to cache the RDMAStack obj in RDMAWorker
& RDMADispatcher.

Signed-off-by: Changcheng Liu <changcheng.liu@aliyun.com>
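
For readers skimming the diff, here is a minimal sketch of the ownership pattern the patch adopts (names are illustrative, not from the Ceph tree): the owning object creates the resource with make_shared and hands copies of the shared_ptr to its helpers, so each helper keeps its own reference instead of reaching back through the owner.

  #include <memory>

  struct Resource {};                    // stands in for Infiniband

  struct Helper {                        // stands in for RDMAWorker / RDMADispatcher
    std::shared_ptr<Resource> res;
    void set_res(std::shared_ptr<Resource>& r) { res = r; }  // copy bumps the refcount
  };

  struct Owner {                         // stands in for RDMAStack
    std::shared_ptr<Resource> res = std::make_shared<Resource>();
    Helper helper;
    Owner() { helper.set_res(res); }     // helper no longer needs an Owner* to find res
  };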
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc
src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc
src/msg/async/rdma/RDMAServerSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index 8fb486834ab86b54e80c0a7fff418811be010430..d4f902de7cf77d4a31e1b108e46cd9e1eb460d49 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << " RDMAConnectedSocketImpl "
 
-RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband> &ib, RDMADispatcher* s,
                                                 RDMAWorker *w)
-  : cct(cct), connected(0), error(0), infiniband(ib),
+  : cct(cct), connected(0), error(0), ib(ib),
     dispatcher(s), worker(w),
     is_server(false), con_handler(new C_handle_connection(this)),
     active(false), pending(false)
 {
   if (!cct->_conf->ms_async_rdma_cm) {
-    qp = infiniband->create_queue_pair(cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC, NULL);
+    qp = ib->create_queue_pair(cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC, NULL);
     my_msg.qpn = qp->get_local_qp_number();
     my_msg.psn = qp->get_initial_psn();
-    my_msg.lid = infiniband->get_lid();
+    my_msg.lid = ib->get_lid();
     my_msg.peer_qpn = 0;
-    my_msg.gid = infiniband->get_gid();
+    my_msg.gid = ib->get_gid();
     notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
     dispatcher->register_qp(qp, this);
     dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
@@ -98,13 +98,13 @@ int RDMAConnectedSocketImpl::activate()
   qpa.ah_attr.grh.hop_limit = 6;
   qpa.ah_attr.grh.dgid = peer_msg.gid;
 
-  qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx();
+  qpa.ah_attr.grh.sgid_index = ib->get_device()->get_gid_idx();
 
   qpa.ah_attr.dlid = peer_msg.lid;
   qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
   qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
   qpa.ah_attr.src_path_bits = 0;
-  qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());
+  qpa.ah_attr.port_num = (uint8_t)(ib->get_ib_physical_port());
 
   ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
 
@@ -189,7 +189,7 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
   ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
   net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
   my_msg.peer_qpn = 0;
-  r = infiniband->send_msg(cct, tcp_fd, my_msg);
+  r = ib->send_msg(cct, tcp_fd, my_msg);
   if (r < 0)
     return r;
 
@@ -199,7 +199,7 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
 
 void RDMAConnectedSocketImpl::handle_connection() {
   ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
-  int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
+  int r = ib->recv_msg(cct, tcp_fd, peer_msg);
   if (r <= 0) {
     if (r != -EAGAIN) {
       dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
@@ -224,7 +224,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
       ceph_assert(!r);
     }
     notify();
-    r = infiniband->send_msg(cct, tcp_fd, my_msg);
+    r = ib->send_msg(cct, tcp_fd, my_msg);
     if (r < 0) {
       ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
       dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
@@ -238,7 +238,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
       }
       r = activate();
       ceph_assert(!r);
-      r = infiniband->send_msg(cct, tcp_fd, my_msg);
+      r = ib->send_msg(cct, tcp_fd, my_msg);
       if (r < 0) {
         ldout(cct, 1) << __func__ << " server ack failed." << dendl;
         dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
@@ -475,7 +475,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
   auto copy_start = it;
   size_t total_copied = 0, wait_copy_len = 0;
   while (it != pending_bl.buffers().end()) {
-    if (infiniband->is_tx_buffer(it->raw_c_str())) {
+    if (ib->is_tx_buffer(it->raw_c_str())) {
       if (wait_copy_len) {
         size_t copied = tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
         total_copied += copied;
@@ -484,7 +484,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
         wait_copy_len = 0;
       }
       ceph_assert(copy_start == it);
-      tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
+      tx_buffers.push_back(ib->get_tx_chunk_by_buffer(it->raw_c_str()));
       total_copied += it->length();
       ++copy_start;
     } else {
@@ -650,7 +650,7 @@ void RDMAConnectedSocketImpl::set_accept_fd(int sd)
 
 void RDMAConnectedSocketImpl::post_chunks_to_rq(int num)
 {
-  post_backlog += num - infiniband->post_chunks_to_rq(num, qp->get_qp());
+  post_backlog += num - ib->post_chunks_to_rq(num, qp->get_qp());
 }
 
 void RDMAConnectedSocketImpl::update_post_backlog()
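
A lifetime note on the hunks above (editorial, assuming only the owners shown): the constructor takes the shared_ptr by reference but copies it into the ib member, so every live socket keeps the Infiniband object alive independently of the stack.

  std::shared_ptr<Infiniband> ib = std::make_shared<Infiniband>(cct);
  {
    RDMAConnectedSocketImpl sock(cct, ib, dispatcher, worker);
    // ib.use_count() == 2: the local copy plus the socket's member
  }
  // socket gone; ib.use_count() == 1 and the device object is still valid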
index ca39b09f26d4c98e123a51257d9194ea9039feb2..6505bda9cb66ae1da0fba5955feaa75c8b025a82 100644 (file)
@@ -7,7 +7,7 @@
 #define TIMEOUT_MS 3000
 #define RETRY_COUNT 7
 
-RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher* s,
                                                 RDMAWorker *w, RDMACMInfo *info)
   : RDMAConnectedSocketImpl(cct, ib, s, w), cm_con_handler(new C_handle_cm_connection(this))
 {
@@ -156,13 +156,13 @@ void RDMAIWARPConnectedSocketImpl::activate() {
 
 int RDMAIWARPConnectedSocketImpl::alloc_resource() {
   ldout(cct, 30) << __func__ << dendl;
-  qp = infiniband->create_queue_pair(cct, dispatcher->get_tx_cq(),
+  qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(),
       dispatcher->get_rx_cq(), IBV_QPT_RC, cm_id);
   if (!qp) {
     return -1;
   }
   if (!cct->_conf->ms_async_rdma_support_srq)
-    dispatcher->post_chunks_to_rq(infiniband->get_rx_queue_len(), qp->get_qp());
+    dispatcher->post_chunks_to_rq(ib->get_rx_queue_len(), qp->get_qp());
   dispatcher->register_qp(qp, this);
   dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
   dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
index 7ffb8fbe56a5178aea08db805cf32e99e9bfede3..dbbb5427af12be659e86ae198d951a5ff90d166b 100644 (file)
@@ -8,9 +8,9 @@
 #define dout_prefix *_dout << " RDMAIWARPServerSocketImpl "
 
 RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl(
-  CephContext *cct, Infiniband* i,
+  CephContext *cct, shared_ptr<Infiniband>& ib,
   RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a, unsigned addr_slot)
-  : RDMAServerSocketImpl(cct, i, s, w, a, addr_slot)
+  : RDMAServerSocketImpl(cct, ib, s, w, a, addr_slot)
 {
 }
 
@@ -76,7 +76,7 @@ int RDMAIWARPServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions
 
   RDMACMInfo info(new_cm_id, event_channel, remote_conn_param->qp_num);
   RDMAIWARPConnectedSocketImpl* server =
-    new RDMAIWARPConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w), &info);
+    new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, dynamic_cast<RDMAWorker*>(w), &info);
 
   memset(&local_conn_param, 0, sizeof(local_conn_param));
   local_conn_param.qp_num = server->get_local_qpn();
index 98402cfd35469a2a852819ce3ab26ac7167e93d1..cdbed1aa114c98caa37438e22568eddaa3cdab33 100644 (file)
 #define dout_prefix *_dout << " RDMAServerSocketImpl "
 
 RDMAServerSocketImpl::RDMAServerSocketImpl(
-  CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w,
+  CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher *s, RDMAWorker *w,
   entity_addr_t& a, unsigned slot)
   : ServerSocketImpl(a.get_type(), slot),
-    cct(cct), net(cct), server_setup_socket(-1), infiniband(i),
+    cct(cct), net(cct), server_setup_socket(-1), ib(ib),
     dispatcher(s), worker(w), sa(a)
 {
 }
@@ -111,7 +111,7 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt
 
   RDMAConnectedSocketImpl* server;
   //Worker* w = dispatcher->get_stack()->get_worker();
-  server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w));
+  server = new RDMAConnectedSocketImpl(cct, ib, dispatcher, dynamic_cast<RDMAWorker*>(w));
   server->set_accept_fd(sd);
   ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl;
   std::unique_ptr<RDMAConnectedSocketImpl> csi(server);
index 254f45f2a6c9918304f528a9eddc44d14c78ebe7..fa60a59f45eec8c0b37cc426128acc20bace6ed6 100644 (file)
@@ -43,8 +43,8 @@ RDMADispatcher::~RDMADispatcher()
   delete async_handler;
 }
 
-RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
-  : cct(c), async_handler(new C_handle_cq_async(this)),
+RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s, shared_ptr<Infiniband>& ib)
+  : cct(c), ib(ib), async_handler(new C_handle_cq_async(this)),
   stack(s)
 {
   PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
@@ -85,15 +85,15 @@ void RDMADispatcher::polling_start()
   if (t.joinable()) 
     return; // dispatcher thread already running 
 
-  get_stack()->get_infiniband().get_memory_manager()->set_rx_stat_logger(perf_logger);
+  ib->get_memory_manager()->set_rx_stat_logger(perf_logger);
 
-  tx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
+  tx_cc = ib->create_comp_channel(cct);
   ceph_assert(tx_cc);
-  rx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
+  rx_cc = ib->create_comp_channel(cct);
   ceph_assert(rx_cc);
-  tx_cq = get_stack()->get_infiniband().create_comp_queue(cct, tx_cc);
+  tx_cq = ib->create_comp_queue(cct, tx_cc);
   ceph_assert(tx_cq);
-  rx_cq = get_stack()->get_infiniband().create_comp_queue(cct, rx_cc);
+  rx_cq = ib->create_comp_queue(cct, rx_cc);
   ceph_assert(rx_cq);
 
   t = std::thread(&RDMADispatcher::polling, this);
@@ -125,7 +125,7 @@ void RDMADispatcher::handle_async_event()
   ldout(cct, 30) << __func__ << dendl;
   while (1) {
     ibv_async_event async_event;
-    if (ibv_get_async_event(get_stack()->get_infiniband().get_device()->ctxt, &async_event)) {
+    if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
       if (errno != EAGAIN)
        lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
                   << " " << cpp_strerror(errno) << ")" << dendl;
@@ -135,14 +135,14 @@ void RDMADispatcher::handle_async_event()
     switch (async_event.event_type) {
       /***********************CQ events********************/
       case IBV_EVENT_CQ_ERR:
-        lderr(cct) << __func__ << " CQ Overflow, dev = " << get_stack()->get_infiniband().get_device()->ctxt
+        lderr(cct) << __func__ << " CQ Overflow, dev = " << ib->get_device()->ctxt
                    << " Need destroy and recreate resource " << dendl;
         break;
       /***********************QP events********************/
       case IBV_EVENT_QP_FATAL:
         /* Error occurred on a QP and it transitioned to error state */
         lderr(cct) << __func__ << " Error occurred on a QP and it transitioned to error state, dev = "
-                   << get_stack()->get_infiniband().get_device()->ctxt << " Need destroy and recreate resource " << dendl;
+                   << ib->get_device()->ctxt << " Need destroy and recreate resource " << dendl;
         break;
       case IBV_EVENT_QP_LAST_WQE_REACHED:
        /* Last WQE Reached on a QP associated with an SRQ */
@@ -222,11 +222,11 @@ void RDMADispatcher::handle_async_event()
       //CA events:
       case IBV_EVENT_DEVICE_FATAL:
         /* CA is in FATAL state */
-        ldout(cct, 1) << __func__ << " ibv_get_async_event: dev = " << get_stack()->get_infiniband().get_device()->ctxt
+        ldout(cct, 1) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
                       << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
        break;
       default:
-        lderr(cct) << __func__ << " ibv_get_async_event: dev = " << get_stack()->get_infiniband().get_device()->ctxt
+        lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
                    << " unknown event: " << async_event.event_type << dendl;
     }
     ibv_ack_async_event(&async_event);
@@ -236,14 +236,14 @@ void RDMADispatcher::handle_async_event()
 void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
 {
   std::lock_guard l{lock};
-  get_stack()->get_infiniband().post_chunk_to_pool(chunk);
+  ib->post_chunk_to_pool(chunk);
   perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
 }
 
 int RDMADispatcher::post_chunks_to_rq(int num, ibv_qp *qp)
 {
   std::lock_guard l{lock};
-  return get_stack()->get_infiniband().post_chunks_to_rq(num, qp);
+  return ib->post_chunks_to_rq(num, qp);
 }
 
 void RDMADispatcher::polling()
@@ -418,7 +418,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
     Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
     ldout(cct, 25) << __func__ << " QP: " << response->qp_num
                    << " len: " << response->byte_len << " , addr:" << chunk
-                   << " " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
+                   << " " << ib->wc_status_to_string(response->status) << dendl;
 
     QueuePair *qp = get_qp(response->qp_num);
     if (qp)
@@ -437,7 +437,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
       } else {
         ldout(cct, 1) << __func__ << " send work request returned error for buffer("
                       << response->wr_id << ") status(" << response->status << "): "
-                      << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
+                      << ib->wc_status_to_string(response->status) << dendl;
        std::lock_guard l{lock};//make sure connected socket alive when pass wc
         RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
 
@@ -452,7 +452,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
 
     //TX completion may come either from regular send message or from 'fin' message.
     //In the case of 'fin' wr_id points to the QueuePair.
-    if (get_stack()->get_infiniband().get_memory_manager()->is_tx_buffer(chunk->buffer)) {
+    if (ib->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
       tx_chunks.push_back(chunk);
     } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
       ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
@@ -480,7 +480,7 @@ void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
     return ;
 
   inflight -= chunks.size();
-  get_stack()->get_infiniband().get_memory_manager()->return_tx(chunks);
+  ib->get_memory_manager()->return_tx(chunks);
   ldout(cct, 30) << __func__ << " release " << chunks.size()
                  << " chunks, inflight " << inflight << dendl;
   notify_pending_workers();
@@ -505,7 +505,7 @@ void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number)
       conn = get_conn_lockless(response->qp_num);
       if (!conn) {
         ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back." << dendl;
-        get_stack()->get_infiniband().post_chunk_to_pool(chunk);
+        ib->post_chunk_to_pool(chunk);
         perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
       } else {
         conn->post_chunks_to_rq(1);
@@ -515,13 +515,13 @@ void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number)
       perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
       ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
                     << ") status(" << response->status << ":"
-                    << get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
+                    << ib->wc_status_to_string(response->status) << ")" << dendl;
       if (response->status != IBV_WC_WR_FLUSH_ERR) {
         conn = get_conn_lockless(response->qp_num);
         if (conn && conn->is_connected())
           conn->fault();
       }
-      get_stack()->get_infiniband().post_chunk_to_pool(chunk);
+      ib->post_chunk_to_pool(chunk);
       perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
     }
   }
@@ -569,15 +569,14 @@ void RDMAWorker::initialize()
 int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
                       const SocketOptions &opt,ServerSocket *sock)
 {
-  Infiniband &ib = stack->get_infiniband();
-  ib.init();
+  ib->init();
   dispatcher->polling_start();
 
   RDMAServerSocketImpl *p;
   if (cct->_conf->ms_async_rdma_type == "iwarp") {
-    p = new RDMAIWARPServerSocketImpl(cct, &ib, dispatcher, this, sa, addr_slot);
+    p = new RDMAIWARPServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
   } else {
-    p = new RDMAServerSocketImpl(cct, &ib, dispatcher, this, sa, addr_slot);
+    p = new RDMAServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
   }
   int r = p->listen(sa, opt);
   if (r < 0) {
@@ -591,15 +590,14 @@ int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
 
 int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
 {
-  Infiniband &ib = stack->get_infiniband();
-  ib.init();
+  ib->init();
   dispatcher->polling_start();
 
   RDMAConnectedSocketImpl* p;
   if (cct->_conf->ms_async_rdma_type == "iwarp") {
-    p = new RDMAIWARPConnectedSocketImpl(cct, &ib, dispatcher, this);
+    p = new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, this);
   } else {
-    p = new RDMAConnectedSocketImpl(cct, &ib, dispatcher, this);
+    p = new RDMAConnectedSocketImpl(cct, ib, dispatcher, this);
   }
   int r = p->try_connect(addr, opts);
 
@@ -616,8 +614,8 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
 {
   ceph_assert(center.in_thread());
-  int r = stack->get_infiniband().get_tx_buffers(c, bytes);
-  size_t got = stack->get_infiniband().get_memory_manager()->get_tx_buffer_size() * r;
+  int r = ib->get_tx_buffers(c, bytes);
+  size_t got = ib->get_memory_manager()->get_tx_buffer_size() * r;
   ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered  bytes, inflight " << dispatcher->inflight << dendl;
   dispatcher->inflight += r;
   if (got >= bytes)
@@ -658,7 +656,7 @@ void RDMAWorker::handle_pending_message()
 }
 
 RDMAStack::RDMAStack(CephContext *cct, const string &t)
-  : NetworkStack(cct, t), ib(cct), dispatcher(cct, this)
+  : NetworkStack(cct, t), ib(make_shared<Infiniband>(cct)), dispatcher(cct, this, ib)
 {
   ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
 
@@ -666,6 +664,7 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t)
   for (unsigned i = 0; i < num; ++i) {
     RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
     w->set_stack(this);
+    w->set_ib(ib);
   }
   ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << &dispatcher << dendl;
 }
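
One C++ subtlety the constructor hunk above relies on (editorial note): non-static members are initialized in declaration order, so ib must be declared before dispatcher for dispatcher(cct, this, ib) to receive an already-constructed shared_ptr; the header change below keeps that ordering. A reduced, self-contained illustration with hypothetical names:

  #include <memory>

  struct DependentInit {
    std::shared_ptr<int> first;   // declared first => initialized first
    int copy;                     // safe: reads *first after it is set up
    DependentInit() : first(std::make_shared<int>(42)), copy(*first) {}
  };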
index 13a80f68a5cf39ac7267104c98b03038c9805f34..cf180f892f52e4717dfc6b5dc0694cd72243a476 100644 (file)
@@ -40,6 +40,7 @@ class RDMADispatcher {
 
   std::thread t;
   CephContext *cct;
+  shared_ptr<Infiniband> ib;
   Infiniband::CompletionQueue* tx_cq = nullptr;
   Infiniband::CompletionQueue* rx_cq = nullptr;
   Infiniband::CompletionChannel *tx_cc = nullptr, *rx_cc = nullptr;
@@ -92,7 +93,7 @@ class RDMADispatcher {
  public:
   PerfCounters *perf_logger;
 
-  explicit RDMADispatcher(CephContext* c, RDMAStack* s);
+  explicit RDMADispatcher(CephContext* c, RDMAStack* s, shared_ptr<Infiniband>& ib);
   virtual ~RDMADispatcher();
   void handle_async_event();
 
@@ -133,6 +134,7 @@ class RDMAWorker : public Worker {
   typedef Infiniband::MemoryManager MemoryManager;
   typedef std::vector<Chunk*>::iterator ChunkIter;
   RDMAStack *stack;
+  shared_ptr<Infiniband> ib;
   EventCallbackRef tx_handler;
   std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
   RDMADispatcher* dispatcher = nullptr;
@@ -164,6 +166,7 @@ class RDMAWorker : public Worker {
   }
   void handle_pending_message();
   void set_stack(RDMAStack *s) { stack = s; }
+  void set_ib(shared_ptr<Infiniband> &ib) { this->ib = ib; }
   void notify_worker() {
     center.dispatch_event_external(tx_handler);
   }
@@ -190,7 +193,7 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
   IBSYNMsg my_msg;
   int connected;
   int error;
-  Infiniband* infiniband;
+  shared_ptr<Infiniband> ib;
   RDMADispatcher* dispatcher;
   RDMAWorker* worker;
   std::vector<Chunk*> buffers;
@@ -215,7 +218,7 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
       const decltype(std::cbegin(pending_bl.buffers()))& end);
 
  public:
-  RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+  RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher* s,
                           RDMAWorker *w);
   virtual ~RDMAConnectedSocketImpl();
 
@@ -272,7 +275,7 @@ enum RDMA_CM_STATUS {
 
 class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl {
   public:
-    RDMAIWARPConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+    RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher* s,
                           RDMAWorker *w, RDMACMInfo *info = nullptr);
     ~RDMAIWARPConnectedSocketImpl();
     virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
@@ -312,13 +315,13 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
     CephContext *cct;
     NetHandler net;
     int server_setup_socket;
-    Infiniband* infiniband;
+    shared_ptr<Infiniband> ib;
     RDMADispatcher *dispatcher;
     RDMAWorker *worker;
     entity_addr_t sa;
 
  public:
-  RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s,
+  RDMAServerSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher *s,
                       RDMAWorker *w, entity_addr_t& a, unsigned slot);
 
   virtual int listen(entity_addr_t &sa, const SocketOptions &opt);
@@ -330,7 +333,7 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
 class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
   public:
     RDMAIWARPServerSocketImpl(
-      CephContext *cct, Infiniband *i, RDMADispatcher *s, RDMAWorker *w,
+      CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher *s, RDMAWorker *w,
       entity_addr_t& addr, unsigned addr_slot);
     virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
     virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
@@ -343,7 +346,7 @@ class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
 class RDMAStack : public NetworkStack {
   vector<std::thread> threads;
   PerfCounters *perf_counter;
-  Infiniband ib;
+  shared_ptr<Infiniband> ib;
   RDMADispatcher dispatcher;
 
   std::atomic<bool> fork_finished = {false};
@@ -357,7 +360,6 @@ class RDMAStack : public NetworkStack {
   virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
   virtual void join_worker(unsigned i) override;
   RDMADispatcher &get_dispatcher() { return dispatcher; }
-  Infiniband &get_infiniband() { return ib; }
   virtual bool is_ready() override { return fork_finished.load(); };
   virtual void ready() override { fork_finished = true; };
 };
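
Net effect (editorial summary): with shared ownership, teardown order among the stack, its workers, and the dispatcher no longer matters; whichever owner releases its shared_ptr last destroys the Infiniband object. A sketch:

  std::shared_ptr<Infiniband> stack_ref = std::make_shared<Infiniband>(cct);
  std::shared_ptr<Infiniband> worker_ref = stack_ref;      // as in RDMAWorker::set_ib
  std::shared_ptr<Infiniband> disp_ref = stack_ref;        // as in the RDMADispatcher ctor
  stack_ref.reset();   // stack released first: object survives
  worker_ref.reset();  // worker released: object survives
  disp_ref.reset();    // last owner released: ~Infiniband() runs here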