]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: Make port number an attribute of the Connection not of the Device 14297/head
authorAmir Vadai <amir@vadai.me>
Wed, 22 Mar 2017 10:41:12 +0000 (12:41 +0200)
committerAdir Lev <adirl@mellanox.com>
Tue, 4 Apr 2017 14:52:10 +0000 (17:52 +0300)
Since multiple connections on different ports could exist, shouldn't use
device->active_port, instead use conn->ibport.
Or in other words, now Device object doesn't have an active_port,
instead every port specific action (create_qp, get_lid, get_gid etc.)
need to specify the port number. The information about the port number
is known to the connection (RDMAConnectedSocket*) who is the caller of
those actions.

Issue: 995322
Change-Id: I482cb87c04ba99845dc44f6dd0547835fe814ebf
Signed-off-by: Amir Vadai <amir@vadai.me>
src/msg/async/rdma/Device.cc
src/msg/async/rdma/Device.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc

index fc09063989ffcd3bc5d9d52da371cc3319255ac7..392ed7f69c75b882e289ea142d24a6350c45f63b 100644 (file)
@@ -116,7 +116,7 @@ Port::~Port()
 Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d)
   : cct(cct), device(d), lock("ibdev_lock"),
     async_handler(new C_handle_cq_async(this)), infiniband(ib),
-    device_attr(new ibv_device_attr), active_port(nullptr)
+    device_attr(new ibv_device_attr)
 {
   if (device == NULL) {
     lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
@@ -134,6 +134,15 @@ Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d)
     ceph_abort();
   }
 
+  port_cnt = device_attr->phys_port_cnt;
+  ports = new Port *[port_cnt + 1];
+  assert(ports);
+
+  for (int i = 1; i <= port_cnt; i++) {
+    ports[i] = new Port(cct, ctxt, i);
+    assert(ports[i]);
+  }
+
   tx_cc = create_comp_channel(cct);
   assert(tx_cc);
 
@@ -147,6 +156,8 @@ void Device::init(int ibport)
 {
   Mutex::Locker l(lock);
 
+  verify_port(ibport);
+
   if (initialized)
     return;
 
@@ -175,8 +186,6 @@ void Device::init(int ibport)
   rx_cq = create_comp_queue(cct, rx_cc);
   assert(rx_cq);
 
-  binding_port(cct, ibport);
-
   initialized = true;
 
   ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl;
@@ -210,31 +219,35 @@ Device::~Device()
 
   uninit();
 
-  if (active_port) {
-    delete active_port;
-    assert(ibv_close_device(ctxt) == 0);
-  }
+  for (int i = 1; i <= port_cnt; i++)
+    delete ports[i];
+  delete[] ports;
 
+  assert(ibv_close_device(ctxt) == 0);
   delete device_attr;
 }
 
-void Device::binding_port(CephContext *cct, int port_num) {
-  port_cnt = device_attr->phys_port_cnt;
-  for (uint8_t i = 0; i < port_cnt; ++i) {
-    Port *port = new Port(cct, ctxt, i+1);
-    if (i + 1 == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
-      active_port = port;
-      ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
-      break;
-    } else {
-      ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
-    }
-    delete port;
-  }
-  if (nullptr == active_port) {
+void Device::verify_port(int port_num) {
+  if (port_num < 0 || port_num > port_cnt) {
     lderr(cct) << __func__ << "  port not found" << dendl;
-    assert(active_port);
+    ceph_abort();
   }
+
+  Port *port = ports[port_num];
+
+  if (port->get_port_attr()->state == IBV_PORT_ACTIVE) {
+    ldout(cct, 1) << __func__ << " found active port " << port_num << dendl;
+  } else {
+    ldout(cct, 10) << __func__ << " port " << port_num <<
+      " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
+    ceph_abort();
+  }
+}
+
+Port *Device::get_port(int ibport)
+{
+  assert(ibport > 0 && ibport <= port_cnt);
+  return ports[ibport];
 }
 
 /**
@@ -247,11 +260,11 @@ void Device::binding_port(CephContext *cct, int port_num) {
  *      QueuePair on success or NULL if init fails
  * See QueuePair::QueuePair for parameter documentation.
  */
-Infiniband::QueuePair* Device::create_queue_pair(CephContext *cct,
+Infiniband::QueuePair* Device::create_queue_pair(int port,
                                                 ibv_qp_type type)
 {
   Infiniband::QueuePair *qp = new QueuePair(
-      cct, *this, type, active_port->get_port_num(), srq, tx_cq, rx_cq, max_send_wr, max_recv_wr);
+      cct, *this, type, port, srq, tx_cq, rx_cq, max_send_wr, max_recv_wr);
   if (qp->init()) {
     delete qp;
     return NULL;
index 636fd76eb9383d8f92545ce5786fe069f6fe605a..5ba7d5ca4efe4d66d0a46d62c12cedca8ae46f5b 100644 (file)
@@ -71,7 +71,11 @@ class Device {
   CephContext *cct;
   ibv_device *device;
   const char *name;
-  uint8_t  port_cnt;
+
+  Port **ports; // Array of Port objects. index is 1 based (IB port #1 is in
+                // index 1). Index 0 is not used
+
+  int port_cnt;
 
   uint32_t max_send_wr;
   uint32_t max_recv_wr;
@@ -82,6 +86,8 @@ class Device {
   EventCallbackRef async_handler;
   Infiniband *infiniband;
 
+  void verify_port(int port_num);
+
  public:
   explicit Device(CephContext *c, Infiniband *ib, ibv_device* d);
   ~Device();
@@ -92,12 +98,14 @@ class Device {
   void handle_async_event();
 
   const char* get_name() const { return name;}
-  uint16_t get_lid() { return active_port->get_lid(); }
-  ibv_gid get_gid() { return active_port->get_gid(); }
-  int get_gid_idx() { return active_port->get_gid_idx(); }
-  void binding_port(CephContext *c, int port_num);
 
-  QueuePair* create_queue_pair(CephContext *c, ibv_qp_type type);
+  Port *get_port(int ibport);
+  uint16_t get_lid(int p) { return get_port(p)->get_lid(); }
+  ibv_gid get_gid(int p) { return get_port(p)->get_gid(); }
+  int get_gid_idx(int p) { return get_port(p)->get_gid_idx(); }
+
+  QueuePair *create_queue_pair(int port,
+                              ibv_qp_type type);
   ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
   CompletionChannel *create_comp_channel(CephContext *c);
   CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
@@ -115,7 +123,6 @@ class Device {
 
   struct ibv_context *ctxt;
   ibv_device_attr *device_attr;
-  Port* active_port;
 
   MemoryManager* memory_manager = nullptr;
   ibv_srq *srq = nullptr;
index 80f3c014adb0db9f2f4d8f5afbe411faad461d2f..83c8b5387b9d23112ee1742c35856cb073212737 100644 (file)
@@ -44,7 +44,7 @@ QueuePair *RDMAConnectedSocketImpl::create_queue_pair(Device *d, int p)
   ibdev = d;
   ibport = p;
 
-  qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
+  qp = ibdev->create_queue_pair(ibport, IBV_QPT_RC);
 
   local_qpn = qp->get_local_qp_number();
 
@@ -67,9 +67,9 @@ RDMAConnTCP::RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock,
 
   my_msg.qpn = socket->local_qpn;
   my_msg.psn = qp->get_initial_psn();
-  my_msg.lid = ibdev->get_lid();
+  my_msg.lid = ibdev->get_lid(ibport);
   my_msg.peer_qpn = 0;
-  my_msg.gid = ibdev->get_gid();
+  my_msg.gid = ibdev->get_gid(ibport);
   socket->register_qp(qp);
 }
 
@@ -132,6 +132,9 @@ int RDMAConnTCP::activate()
   ibv_qp_attr qpa;
   int r;
 
+  Device *ibdev = socket->get_device();
+  int ibport = socket->get_ibport();
+
   socket->remote_qpn = peer_msg.qpn;
 
   // now connect up the qps and switch to RTR
@@ -147,12 +150,12 @@ int RDMAConnTCP::activate()
   qpa.ah_attr.grh.hop_limit = 6;
   qpa.ah_attr.grh.dgid = peer_msg.gid;
 
-  qpa.ah_attr.grh.sgid_index = socket->get_device()->get_gid_idx();
+  qpa.ah_attr.grh.sgid_index = ibdev->get_gid_idx(ibport);
 
   qpa.ah_attr.dlid = peer_msg.lid;
   qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
   qpa.ah_attr.src_path_bits = 0;
-  qpa.ah_attr.port_num = (uint8_t)socket->get_ibport();
+  qpa.ah_attr.port_num = (uint8_t)ibport;
 
   ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;