]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async/rdma: Initialize device on first connect
authorAmir Vadai <amir@vadai.me>
Mon, 6 Feb 2017 17:31:20 +0000 (19:31 +0200)
committerAdir Lev <adirl@mellanox.com>
Tue, 28 Mar 2017 06:49:41 +0000 (09:49 +0300)
Allocate Device's IB resources only when first connect for a device is
called.

Also, removed Infiniband::MemoryManager::Chunk::post_srq() which is not
used anywhere in the code.

Issue: 995322
Change-Id: I2ac1e36aff645ad7d8bbc06c87530bf33c4b3ecf
Signed-off-by: Amir Vadai <amir@vadai.me>
src/msg/async/rdma/Device.cc
src/msg/async/rdma/Device.h
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAServerSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index 58700db2c210e67391d40be35feedf5d0b4e3f3b..d993d9ca5c043c2048fc2013e4ef4e794f044a7f 100644 (file)
@@ -138,7 +138,7 @@ Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d)
   assert(NetHandler(cct).set_nonblock(ctxt->async_fd) == 0);
 }
 
-void Device::init()
+void Device::init(int ibport)
 {
   Mutex::Locker l(lock);
 
@@ -170,7 +170,10 @@ void Device::init()
   rx_cq = create_comp_queue(cct, rx_cc);
   assert(rx_cq);
 
+  binding_port(cct, ibport);
+
   initialized = true;
+
   ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl;
 }
 
@@ -364,7 +367,8 @@ void Device::handle_async_event()
   ldout(cct, 30) << __func__ << dendl;
 
   while (!ibv_get_async_event(ctxt, &async_event)) {
-    infiniband->process_async_event(async_event);
+    RDMADispatcher *d = infiniband->get_dispatcher();
+    d->process_async_event(this, async_event);
 
     ibv_ack_async_event(&async_event);
   }
@@ -495,3 +499,9 @@ void DeviceList::handle_async_event()
   for (int i = 0; i < num; i++)
     devices[i]->handle_async_event();
 }
+
+void DeviceList::uninit()
+{
+  for (int i = 0; i < num; i++)
+    devices[i]->uninit();
+}
index 2fb46dc13b4c11514638b3c49c4a0d227714e016..5e99bf56cca19e4220f09d5c40b8d72d5438c7f2 100644 (file)
@@ -85,7 +85,7 @@ class Device {
   explicit Device(CephContext *c, Infiniband *ib, ibv_device* d);
   ~Device();
 
-  void init();
+  void init(int ibport = -1);
   void uninit();
 
   void handle_async_event();
@@ -146,6 +146,8 @@ class DeviceList {
 
   Device* get_device(const char* device_name);
 
+  void uninit();
+
   void rearm_notify();
   int poll_tx(int n, Device **d, ibv_wc *wc);
   int poll_rx(int n, Device **d, ibv_wc *wc);
index 9fe77b6742206d4843219378e8979a9b7adb8a34..74f5c0436abd3d2088d49bf9616da90eb6628425 100644 (file)
@@ -437,11 +437,6 @@ void Infiniband::MemoryManager::Chunk::clear()
   bound = 0;
 }
 
-void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
-{
-  ib->device->post_chunk(this);
-}
-
 Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
   : manager(m), buffer_size(s), lock("cluster_lock")
 {
@@ -593,16 +588,9 @@ int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks,
 }
 
 
-Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
+Infiniband::Infiniband(CephContext *cct)
   : device_list(new DeviceList(cct, this))
 {
-  device = device_list->get_device(device_name.c_str());
-
-  device->init();
-
-  device->binding_port(cct, port_num);
-  assert(device);
-  ib_physical_port = device->active_port->get_port_num();
 }
 
 Infiniband::~Infiniband()
@@ -620,6 +608,11 @@ void Infiniband::set_dispatcher(RDMADispatcher *d)
   dispatcher = d;
 }
 
+Device* Infiniband::get_device(const char* device_name)
+{
+  return device_list->get_device(device_name);
+}
+
 // 1 means no valid buffer read, 0 means got enough buffer
 // else return < 0 means error
 int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
@@ -771,12 +764,7 @@ const char* Infiniband::qp_state_string(int status) {
 
 void Infiniband::handle_pre_fork()
 {
-  device->uninit();
-}
-
-void Infiniband::handle_post_fork()
-{
-  device->init();
+  device_list->uninit();
 }
 
 int Infiniband::poll_tx(int n, Device **d, ibv_wc *wc)
@@ -803,8 +791,3 @@ void Infiniband::handle_async_event()
 {
   device_list->handle_async_event();
 }
-
-void Infiniband::process_async_event(ibv_async_event &async_event)
-{
-  dispatcher->process_async_event(async_event);
-}
index 10184823d98470669bdc9da2ebce3c84bad9b521..682aeffbc6fabf129f3cc5eec4320dd687fc28d9 100644 (file)
@@ -77,7 +77,6 @@ class Infiniband {
       bool full();
       bool over();
       void clear();
-      void post_srq(Infiniband *ib);
 
      public:
       ibv_mr* mr;
@@ -142,8 +141,6 @@ class Infiniband {
   };
 
  private:
-  uint8_t  ib_physical_port;
-  Device *device;
   DeviceList *device_list;
   RDMADispatcher *dispatcher = nullptr;
 
@@ -151,7 +148,7 @@ class Infiniband {
   void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
 
  public:
-  explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
+  explicit Infiniband(CephContext *c);
   ~Infiniband();
 
   void set_dispatcher(RDMADispatcher *d);
@@ -271,24 +268,21 @@ class Infiniband {
   };
 
  public:
-  typedef MemoryManager::Cluster Cluster;
-  typedef MemoryManager::Chunk Chunk;
-  uint8_t get_ib_physical_port() { return ib_physical_port; }
   int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
   int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
-  Device* get_device() { return device; }
   static const char* wc_status_to_string(int status);
   static const char* qp_state_string(int status);
 
   void handle_pre_fork();
-  void handle_post_fork();
+
+  Device* get_device(const char* device_name);
 
   int poll_tx(int n, Device **d, ibv_wc *wc);
   int poll_rx(int n, Device **d, ibv_wc *wc);
   int poll_blocking(bool &done);
   void rearm_notify();
   void handle_async_event();
-  void process_async_event(ibv_async_event &async_event);
+  RDMADispatcher *get_dispatcher() { return dispatcher; }
 };
 
 #endif
index 9a82abf3881ddff3b17b6a4d9c9a73d0ce16bf6d..718b7a25c125b77876437547e6e2fb9a0a589328 100644 (file)
@@ -28,8 +28,13 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* i
     is_server(false), con_handler(new C_handle_connection(this)),
     active(false)
 {
-  ibdev = ib->get_device();
-  ibport = ib->get_ib_physical_port();
+  ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
+  ibport = cct->_conf->ms_async_rdma_port_num;
+
+  assert(ibdev);
+  assert(ibport > 0);
+
+  ibdev->init(ibport);
 
   qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
   my_msg.qpn = qp->get_local_qp_number();
@@ -101,12 +106,12 @@ int RDMAConnectedSocketImpl::activate()
   qpa.ah_attr.grh.hop_limit = 6;
   qpa.ah_attr.grh.dgid = peer_msg.gid;
 
-  qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx();
+  qpa.ah_attr.grh.sgid_index = ibdev->get_gid_idx();
 
   qpa.ah_attr.dlid = peer_msg.lid;
   qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
   qpa.ah_attr.src_path_bits = 0;
-  qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());
+  qpa.ah_attr.port_num = (uint8_t)ibport;
 
   ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
 
index f15de84bec4bc49c063e077e4235cd7ff563b265..93a22532796d132c3ff7517b2e7d616a2d67d56d 100644 (file)
@@ -16,6 +16,7 @@
 
 #include "msg/async/net_handler.h"
 #include "RDMAStack.h"
+#include "Device.h"
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
 RDMAServerSocketImpl::RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
   : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a)
 {
+  ibdev = infiniband->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
+  ibport = cct->_conf->ms_async_rdma_port_num;
+
+  assert(ibdev);
+  assert(ibport > 0);
+
+  ibdev->init(ibport);
 }
 
 int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
index 373d8a682b398822c598d14838693bef1180233b..dc856b965e8e3c18241cd75dfe0a0f5428a164bb 100644 (file)
@@ -90,7 +90,7 @@ void RDMADispatcher::polling_stop()
   t.join();
 }
 
-void RDMADispatcher::process_async_event(ibv_async_event &async_event)
+void RDMADispatcher::process_async_event(Device *ibdev, ibv_async_event &async_event)
 {
   perf_logger->inc(l_msgr_rdma_total_async_events);
   // FIXME: Currently we must ensure no other factor make QP in ERROR state,
@@ -110,7 +110,7 @@ void RDMADispatcher::process_async_event(ibv_async_event &async_event)
       erase_qpn_lockless(qpn);
     }
   } else {
-    ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
+    ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << *ibdev
       << " evt: " << ibv_event_type_str(async_event.event_type)
       << dendl;
   }
@@ -296,13 +296,10 @@ void RDMADispatcher::handle_pre_fork()
 void RDMADispatcher::handle_post_fork()
 {
   if (!global_infiniband) {
-    global_infiniband.construct(
-      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
+    global_infiniband.construct(cct);
     global_infiniband->set_dispatcher(this);
   }
 
-  global_infiniband->handle_post_fork();
-
   polling_start();
 }
 
@@ -506,8 +503,7 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
   }
 
   if (!global_infiniband)
-    global_infiniband.construct(
-      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
+    global_infiniband.construct(cct);
   ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
   dispatcher = new RDMADispatcher(cct, this);
   global_infiniband->set_dispatcher(dispatcher);
index 2a775e4ac872aa02e211818c773433e9b3edc5e2..d5bd0ac3b5e50d5e8f009445dd110dc97c2e9d87 100644 (file)
@@ -105,7 +105,7 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   explicit RDMADispatcher(CephContext* c, RDMAStack* s);
   virtual ~RDMADispatcher();
 
-  void process_async_event(ibv_async_event &async_event);
+  void process_async_event(Device *ibdev, ibv_async_event &async_event);
 
   void polling_start();
   void polling_stop();
@@ -267,6 +267,8 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
 
 class RDMAServerSocketImpl : public ServerSocketImpl {
   CephContext *cct;
+  Device *ibdev;
+  int ibport;
   NetHandler net;
   int server_setup_socket;
   Infiniband* infiniband;