]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Revert "msg/async/rdma: Initialize device on first connect"
authorAmir Vadai <amir@vadai.me>
Tue, 23 May 2017 07:33:48 +0000 (10:33 +0300)
committerAmir Vadai <amir@vadai.me>
Tue, 23 May 2017 14:04:24 +0000 (17:04 +0300)
This reverts commit 94eddb1ec53205e3e1b21033a9b841c4a72d7a77.

Change-Id: Ia16384eaed1295af6a4352e03308bc13bfb2f6d2
Issue: 995322
Signed-off-by: Amir Vadai <amir@vadai.me>
src/msg/async/rdma/Device.cc
src/msg/async/rdma/Device.h
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAServerSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index d993d9ca5c043c2048fc2013e4ef4e794f044a7f..58700db2c210e67391d40be35feedf5d0b4e3f3b 100644 (file)
@@ -138,7 +138,7 @@ Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d)
   assert(NetHandler(cct).set_nonblock(ctxt->async_fd) == 0);
 }
 
-void Device::init(int ibport)
+void Device::init()
 {
   Mutex::Locker l(lock);
 
@@ -170,10 +170,7 @@ void Device::init(int ibport)
   rx_cq = create_comp_queue(cct, rx_cc);
   assert(rx_cq);
 
-  binding_port(cct, ibport);
-
   initialized = true;
-
   ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl;
 }
 
@@ -367,8 +364,7 @@ void Device::handle_async_event()
   ldout(cct, 30) << __func__ << dendl;
 
   while (!ibv_get_async_event(ctxt, &async_event)) {
-    RDMADispatcher *d = infiniband->get_dispatcher();
-    d->process_async_event(this, async_event);
+    infiniband->process_async_event(async_event);
 
     ibv_ack_async_event(&async_event);
   }
@@ -499,9 +495,3 @@ void DeviceList::handle_async_event()
   for (int i = 0; i < num; i++)
     devices[i]->handle_async_event();
 }
-
-void DeviceList::uninit()
-{
-  for (int i = 0; i < num; i++)
-    devices[i]->uninit();
-}
index 5e99bf56cca19e4220f09d5c40b8d72d5438c7f2..2fb46dc13b4c11514638b3c49c4a0d227714e016 100644 (file)
@@ -85,7 +85,7 @@ class Device {
   explicit Device(CephContext *c, Infiniband *ib, ibv_device* d);
   ~Device();
 
-  void init(int ibport = -1);
+  void init();
   void uninit();
 
   void handle_async_event();
@@ -146,8 +146,6 @@ class DeviceList {
 
   Device* get_device(const char* device_name);
 
-  void uninit();
-
   void rearm_notify();
   int poll_tx(int n, Device **d, ibv_wc *wc);
   int poll_rx(int n, Device **d, ibv_wc *wc);
index 020d1be47b6e93a84adc5a3aecf6d5581a1fb9ce..b3bf617864e7659b98d296846a3e8b0257a5710d 100644 (file)
@@ -437,6 +437,11 @@ void Infiniband::MemoryManager::Chunk::clear()
   bound = 0;
 }
 
+void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
+{
+  ib->device->post_chunk(this);
+}
+
 Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
   : manager(m), buffer_size(s), lock("cluster_lock")
 {
@@ -585,9 +590,16 @@ int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks,
 }
 
 
-Infiniband::Infiniband(CephContext *cct)
+Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
   : device_list(new DeviceList(cct, this))
 {
+  device = device_list->get_device(device_name.c_str());
+
+  device->init();
+
+  device->binding_port(cct, port_num);
+  assert(device);
+  ib_physical_port = device->active_port->get_port_num();
 }
 
 Infiniband::~Infiniband()
@@ -605,11 +617,6 @@ void Infiniband::set_dispatcher(RDMADispatcher *d)
   dispatcher = d;
 }
 
-Device* Infiniband::get_device(const char* device_name)
-{
-  return device_list->get_device(device_name);
-}
-
 // 1 means no valid buffer read, 0 means got enough buffer
 // else return < 0 means error
 int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
@@ -761,7 +768,12 @@ const char* Infiniband::qp_state_string(int status) {
 
 void Infiniband::handle_pre_fork()
 {
-  device_list->uninit();
+  device->uninit();
+}
+
+void Infiniband::handle_post_fork()
+{
+  device->init();
 }
 
 int Infiniband::poll_tx(int n, Device **d, ibv_wc *wc)
@@ -788,3 +800,8 @@ void Infiniband::handle_async_event()
 {
   device_list->handle_async_event();
 }
+
+void Infiniband::process_async_event(ibv_async_event &async_event)
+{
+  dispatcher->process_async_event(async_event);
+}
index 2ac392738e4e81b9bf6c978967c8195385cf36b4..c24448d824f61297c2c934106873c461e333dd21 100644 (file)
@@ -77,6 +77,7 @@ class Infiniband {
       bool full();
       bool over();
       void clear();
+      void post_srq(Infiniband *ib);
 
      public:
       ibv_mr* mr;
@@ -141,6 +142,8 @@ class Infiniband {
   };
 
  private:
+  uint8_t  ib_physical_port;
+  Device *device;
   DeviceList *device_list;
   RDMADispatcher *dispatcher = nullptr;
 
@@ -148,7 +151,7 @@ class Infiniband {
   void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
 
  public:
-  explicit Infiniband(CephContext *c);
+  explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
   ~Infiniband();
 
   void set_dispatcher(RDMADispatcher *d);
@@ -268,21 +271,24 @@ class Infiniband {
   };
 
  public:
+  typedef MemoryManager::Cluster Cluster;
+  typedef MemoryManager::Chunk Chunk;
+  uint8_t get_ib_physical_port() { return ib_physical_port; }
   int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
   int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
+  Device* get_device() { return device; }
   static const char* wc_status_to_string(int status);
   static const char* qp_state_string(int status);
 
   void handle_pre_fork();
-
-  Device* get_device(const char* device_name);
+  void handle_post_fork();
 
   int poll_tx(int n, Device **d, ibv_wc *wc);
   int poll_rx(int n, Device **d, ibv_wc *wc);
   int poll_blocking(bool &done);
   void rearm_notify();
   void handle_async_event();
-  RDMADispatcher *get_dispatcher() { return dispatcher; }
+  void process_async_event(ibv_async_event &async_event);
 };
 
 #endif
index e7066963de9aaf65a50bd31e2ee85f57614d04f4..154df978e92e65e38e6923dba7f46871b6620c4d 100644 (file)
@@ -28,13 +28,8 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* i
     is_server(false), con_handler(new C_handle_connection(this)),
     active(false)
 {
-  ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
-  ibport = cct->_conf->ms_async_rdma_port_num;
-
-  assert(ibdev);
-  assert(ibport > 0);
-
-  ibdev->init(ibport);
+  ibdev = ib->get_device();
+  ibport = ib->get_ib_physical_port();
 
   qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
   my_msg.qpn = qp->get_local_qp_number();
@@ -108,12 +103,12 @@ int RDMAConnectedSocketImpl::activate()
   qpa.ah_attr.grh.hop_limit = 6;
   qpa.ah_attr.grh.dgid = peer_msg.gid;
 
-  qpa.ah_attr.grh.sgid_index = ibdev->get_gid_idx();
+  qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx();
 
   qpa.ah_attr.dlid = peer_msg.lid;
   qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
   qpa.ah_attr.src_path_bits = 0;
-  qpa.ah_attr.port_num = (uint8_t)ibport;
+  qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());
 
   ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
 
index 43536d4f3b5984d74c0078ea2381eac5fdae9fe5..8f5fd81d3dc751eab08adf86f2a031979c1c3dd3 100644 (file)
@@ -16,7 +16,6 @@
 
 #include "msg/async/net_handler.h"
 #include "RDMAStack.h"
-#include "Device.h"
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
 RDMAServerSocketImpl::RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
   : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a)
 {
-  ibdev = infiniband->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
-  ibport = cct->_conf->ms_async_rdma_port_num;
-
-  assert(ibdev);
-  assert(ibport > 0);
-
-  ibdev->init(ibport);
 }
 
 int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
index ad49b7d2b79e940e587458c5a2b11caef5dee34f..21241e28cb93ef90562e656c66323ff27ce008eb 100644 (file)
@@ -91,7 +91,7 @@ void RDMADispatcher::polling_stop()
   t.join();
 }
 
-void RDMADispatcher::process_async_event(Device *ibdev, ibv_async_event &async_event)
+void RDMADispatcher::process_async_event(ibv_async_event &async_event)
 {
   perf_logger->inc(l_msgr_rdma_total_async_events);
   // FIXME: Currently we must ensure no other factor make QP in ERROR state,
@@ -111,7 +111,7 @@ void RDMADispatcher::process_async_event(Device *ibdev, ibv_async_event &async_e
       erase_qpn_lockless(qpn);
     }
   } else {
-    ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << *ibdev
+    ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
       << " evt: " << ibv_event_type_str(async_event.event_type)
       << dendl;
   }
@@ -299,10 +299,13 @@ void RDMADispatcher::handle_pre_fork()
 void RDMADispatcher::handle_post_fork()
 {
   if (!global_infiniband) {
-    global_infiniband.construct(cct);
+    global_infiniband.construct(
+      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
     global_infiniband->set_dispatcher(this);
   }
 
+  global_infiniband->handle_post_fork();
+
   polling_start();
 }
 
@@ -506,7 +509,8 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
   }
 
   if (!global_infiniband)
-    global_infiniband.construct(cct);
+    global_infiniband.construct(
+      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
   ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
   dispatcher = new RDMADispatcher(cct, this);
   global_infiniband->set_dispatcher(dispatcher);
index ea058faf6fe9d8f14a19c1957fcf0ee8f70e35f7..4122975d5d7ea892e23ed245d6bfeb9983f75785 100644 (file)
@@ -106,7 +106,7 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   explicit RDMADispatcher(CephContext* c, RDMAStack* s);
   virtual ~RDMADispatcher();
 
-  void process_async_event(Device *ibdev, ibv_async_event &async_event);
+  void process_async_event(ibv_async_event &async_event);
 
   void polling_start();
   void polling_stop();
@@ -268,8 +268,6 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
 
 class RDMAServerSocketImpl : public ServerSocketImpl {
   CephContext *cct;
-  Device *ibdev;
-  int ibport;
   NetHandler net;
   int server_setup_socket;
   Infiniband* infiniband;