Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d)
: cct(cct), device(d), lock("ibdev_lock"),
async_handler(new C_handle_cq_async(this)), infiniband(ib),
- device_attr(new ibv_device_attr)
+ device_attr(new ibv_device_attr), active_port(nullptr)
{
if (device == NULL) {
lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
ceph_abort();
}
- port_cnt = device_attr->phys_port_cnt;
- ports = new Port *[port_cnt + 1];
- assert(ports);
-
- for (int i = 1; i <= port_cnt; i++) {
- ports[i] = new Port(cct, ctxt, i);
- assert(ports[i]);
- }
-
tx_cc = create_comp_channel(cct);
assert(tx_cc);
{
Mutex::Locker l(lock);
- verify_port(ibport);
-
if (initialized)
return;
rx_cq = create_comp_queue(cct, rx_cc);
assert(rx_cq);
+ binding_port(cct, ibport);
+
initialized = true;
ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl;
uninit();
- for (int i = 1; i <= port_cnt; i++)
- delete ports[i];
- delete[] ports;
+ if (active_port) {
+ delete active_port;
+ assert(ibv_close_device(ctxt) == 0);
+ }
- assert(ibv_close_device(ctxt) == 0);
delete device_attr;
}
-void Device::verify_port(int port_num) {
- if (port_num < 0 || port_num > port_cnt) {
- lderr(cct) << __func__ << " port not found" << dendl;
- ceph_abort();
+void Device::binding_port(CephContext *cct, int port_num) {
+ port_cnt = device_attr->phys_port_cnt;
+ for (uint8_t i = 0; i < port_cnt; ++i) {
+ Port *port = new Port(cct, ctxt, i+1);
+ if (i + 1 == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
+ active_port = port;
+ ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
+ break;
+ } else {
+ ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
+ }
+ delete port;
}
-
- Port *port = ports[port_num];
-
- if (port->get_port_attr()->state == IBV_PORT_ACTIVE) {
- ldout(cct, 1) << __func__ << " found active port " << port_num << dendl;
- } else {
- ldout(cct, 10) << __func__ << " port " << port_num <<
- " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
- ceph_abort();
+ if (nullptr == active_port) {
+ lderr(cct) << __func__ << " port not found" << dendl;
+ assert(active_port);
}
}
-Port *Device::get_port(int ibport)
-{
- assert(ibport > 0 && ibport <= port_cnt);
- return ports[ibport];
-}
-
/**
* Create a new QueuePair. This factory should be used in preference to
* the QueuePair constructor directly, since this lets derivatives of
* QueuePair on success or NULL if init fails
* See QueuePair::QueuePair for parameter documentation.
*/
-Infiniband::QueuePair* Device::create_queue_pair(int port,
+Infiniband::QueuePair* Device::create_queue_pair(CephContext *cct,
ibv_qp_type type)
{
Infiniband::QueuePair *qp = new QueuePair(
- cct, *this, type, port, srq, tx_cq, rx_cq, max_send_wr, max_recv_wr);
+ cct, *this, type, active_port->get_port_num(), srq, tx_cq, rx_cq, max_send_wr, max_recv_wr);
if (qp->init()) {
delete qp;
return NULL;
CephContext *cct;
ibv_device *device;
const char *name;
-
- Port **ports; // Array of Port objects. index is 1 based (IB port #1 is in
- // index 1). Index 0 is not used
-
- int port_cnt;
+ uint8_t port_cnt;
uint32_t max_send_wr;
uint32_t max_recv_wr;
EventCallbackRef async_handler;
Infiniband *infiniband;
- void verify_port(int port_num);
-
public:
explicit Device(CephContext *c, Infiniband *ib, ibv_device* d);
~Device();
void handle_async_event();
const char* get_name() const { return name;}
+ uint16_t get_lid() { return active_port->get_lid(); }
+ ibv_gid get_gid() { return active_port->get_gid(); }
+ int get_gid_idx() { return active_port->get_gid_idx(); }
+ void binding_port(CephContext *c, int port_num);
- Port *get_port(int ibport);
- uint16_t get_lid(int p) { return get_port(p)->get_lid(); }
- ibv_gid get_gid(int p) { return get_port(p)->get_gid(); }
- int get_gid_idx(int p) { return get_port(p)->get_gid_idx(); }
-
- QueuePair *create_queue_pair(int port,
- ibv_qp_type type);
+ QueuePair* create_queue_pair(CephContext *c, ibv_qp_type type);
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
CompletionChannel *create_comp_channel(CephContext *c);
CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
struct ibv_context *ctxt;
ibv_device_attr *device_attr;
+ Port* active_port;
MemoryManager* memory_manager = nullptr;
ibv_srq *srq = nullptr;
ibdev = d;
ibport = p;
- qp = ibdev->create_queue_pair(ibport, IBV_QPT_RC);
+ qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
local_qpn = qp->get_local_qp_number();
my_msg.qpn = socket->local_qpn;
my_msg.psn = qp->get_initial_psn();
- my_msg.lid = ibdev->get_lid(ibport);
+ my_msg.lid = ibdev->get_lid();
my_msg.peer_qpn = 0;
- my_msg.gid = ibdev->get_gid(ibport);
+ my_msg.gid = ibdev->get_gid();
socket->register_qp(qp);
}
ibv_qp_attr qpa;
int r;
- Device *ibdev = socket->get_device();
- int ibport = socket->get_ibport();
-
socket->remote_qpn = peer_msg.qpn;
// now connect up the qps and switch to RTR
qpa.ah_attr.grh.hop_limit = 6;
qpa.ah_attr.grh.dgid = peer_msg.gid;
- qpa.ah_attr.grh.sgid_index = ibdev->get_gid_idx(ibport);
+ qpa.ah_attr.grh.sgid_index = socket->get_device()->get_gid_idx();
qpa.ah_attr.dlid = peer_msg.lid;
qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
qpa.ah_attr.src_path_bits = 0;
- qpa.ah_attr.port_num = (uint8_t)ibport;
+ qpa.ah_attr.port_num = (uint8_t)socket->get_ibport();
ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;