Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d)
: cct(cct), device(d), lock("ibdev_lock"),
async_handler(new C_handle_cq_async(this)), infiniband(ib),
- device_attr(new ibv_device_attr), active_port(nullptr)
+ device_attr(new ibv_device_attr)
{
if (device == NULL) {
lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
ceph_abort();
}
+ port_cnt = device_attr->phys_port_cnt;
+ ports = new Port *[port_cnt + 1];
+ assert(ports);
+
+ for (int i = 1; i <= port_cnt; i++) {
+ ports[i] = new Port(cct, ctxt, i);
+ assert(ports[i]);
+ }
+
tx_cc = create_comp_channel(cct);
assert(tx_cc);
{
Mutex::Locker l(lock);
+ verify_port(ibport);
+
if (initialized)
return;
rx_cq = create_comp_queue(cct, rx_cc);
assert(rx_cq);
- binding_port(cct, ibport);
-
initialized = true;
ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl;
uninit();
- if (active_port) {
- delete active_port;
- assert(ibv_close_device(ctxt) == 0);
- }
+ for (int i = 1; i <= port_cnt; i++)
+ delete ports[i];
+ delete[] ports;
+ assert(ibv_close_device(ctxt) == 0);
delete device_attr;
}
-void Device::binding_port(CephContext *cct, int port_num) {
- port_cnt = device_attr->phys_port_cnt;
- for (uint8_t i = 0; i < port_cnt; ++i) {
- Port *port = new Port(cct, ctxt, i+1);
- if (i + 1 == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
- active_port = port;
- ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
- break;
- } else {
- ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
- }
- delete port;
- }
- if (nullptr == active_port) {
+void Device::verify_port(int port_num) {
+ if (port_num < 0 || port_num > port_cnt) {
lderr(cct) << __func__ << " port not found" << dendl;
- assert(active_port);
+ ceph_abort();
}
+
+ Port *port = ports[port_num];
+
+ if (port->get_port_attr()->state == IBV_PORT_ACTIVE) {
+ ldout(cct, 1) << __func__ << " found active port " << port_num << dendl;
+ } else {
+ ldout(cct, 10) << __func__ << " port " << port_num <<
+ " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
+ ceph_abort();
+ }
+}
+
+Port *Device::get_port(int ibport)
+{
+ assert(ibport > 0 && ibport <= port_cnt);
+ return ports[ibport];
}
/**
* QueuePair on success or NULL if init fails
* See QueuePair::QueuePair for parameter documentation.
*/
-Infiniband::QueuePair* Device::create_queue_pair(CephContext *cct,
+Infiniband::QueuePair* Device::create_queue_pair(int port,
ibv_qp_type type)
{
Infiniband::QueuePair *qp = new QueuePair(
- cct, *this, type, active_port->get_port_num(), srq, tx_cq, rx_cq, max_send_wr, max_recv_wr);
+ cct, *this, type, port, srq, tx_cq, rx_cq, max_send_wr, max_recv_wr);
if (qp->init()) {
delete qp;
return NULL;
CephContext *cct;
ibv_device *device;
const char *name;
- uint8_t port_cnt;
+
+ Port **ports; // Array of Port objects. index is 1 based (IB port #1 is in
+ // index 1). Index 0 is not used
+
+ int port_cnt;
uint32_t max_send_wr;
uint32_t max_recv_wr;
EventCallbackRef async_handler;
Infiniband *infiniband;
+ void verify_port(int port_num);
+
public:
explicit Device(CephContext *c, Infiniband *ib, ibv_device* d);
~Device();
void handle_async_event();
const char* get_name() const { return name;}
- uint16_t get_lid() { return active_port->get_lid(); }
- ibv_gid get_gid() { return active_port->get_gid(); }
- int get_gid_idx() { return active_port->get_gid_idx(); }
- void binding_port(CephContext *c, int port_num);
- QueuePair* create_queue_pair(CephContext *c, ibv_qp_type type);
+ Port *get_port(int ibport);
+ uint16_t get_lid(int p) { return get_port(p)->get_lid(); }
+ ibv_gid get_gid(int p) { return get_port(p)->get_gid(); }
+ int get_gid_idx(int p) { return get_port(p)->get_gid_idx(); }
+
+ QueuePair *create_queue_pair(int port,
+ ibv_qp_type type);
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
CompletionChannel *create_comp_channel(CephContext *c);
CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
struct ibv_context *ctxt;
ibv_device_attr *device_attr;
- Port* active_port;
MemoryManager* memory_manager = nullptr;
ibv_srq *srq = nullptr;
ibdev = d;
ibport = p;
- qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
+ qp = ibdev->create_queue_pair(ibport, IBV_QPT_RC);
local_qpn = qp->get_local_qp_number();
my_msg.qpn = socket->local_qpn;
my_msg.psn = qp->get_initial_psn();
- my_msg.lid = ibdev->get_lid();
+ my_msg.lid = ibdev->get_lid(ibport);
my_msg.peer_qpn = 0;
- my_msg.gid = ibdev->get_gid();
+ my_msg.gid = ibdev->get_gid(ibport);
socket->register_qp(qp);
}
ibv_qp_attr qpa;
int r;
+ Device *ibdev = socket->get_device();
+ int ibport = socket->get_ibport();
+
socket->remote_qpn = peer_msg.qpn;
// now connect up the qps and switch to RTR
qpa.ah_attr.grh.hop_limit = 6;
qpa.ah_attr.grh.dgid = peer_msg.gid;
- qpa.ah_attr.grh.sgid_index = socket->get_device()->get_gid_idx();
+ qpa.ah_attr.grh.sgid_index = ibdev->get_gid_idx(ibport);
qpa.ah_attr.dlid = peer_msg.lid;
qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
qpa.ah_attr.src_path_bits = 0;
- qpa.ah_attr.port_num = (uint8_t)socket->get_ibport();
+ qpa.ah_attr.port_num = (uint8_t)ibport;
ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;