assert(NetHandler(cct).set_nonblock(ctxt->async_fd) == 0);
}
-void Device::init(int ibport)
+void Device::init()
{
Mutex::Locker l(lock);
rx_cq = create_comp_queue(cct, rx_cc);
assert(rx_cq);
- binding_port(cct, ibport);
-
initialized = true;
-
ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl;
}
ldout(cct, 30) << __func__ << dendl;
while (!ibv_get_async_event(ctxt, &async_event)) {
- RDMADispatcher *d = infiniband->get_dispatcher();
- d->process_async_event(this, async_event);
+ infiniband->process_async_event(async_event);
ibv_ack_async_event(&async_event);
}
for (int i = 0; i < num; i++)
devices[i]->handle_async_event();
}
-
-void DeviceList::uninit()
-{
- for (int i = 0; i < num; i++)
- devices[i]->uninit();
-}
explicit Device(CephContext *c, Infiniband *ib, ibv_device* d);
~Device();
- void init(int ibport = -1);
+ void init();
void uninit();
void handle_async_event();
Device* get_device(const char* device_name);
- void uninit();
-
void rearm_notify();
int poll_tx(int n, Device **d, ibv_wc *wc);
int poll_rx(int n, Device **d, ibv_wc *wc);
bound = 0;
}
+void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
+{
+ ib->device->post_chunk(this);
+}
+
Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
: manager(m), buffer_size(s), lock("cluster_lock")
{
}
-Infiniband::Infiniband(CephContext *cct)
+Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
: device_list(new DeviceList(cct, this))
{
+ device = device_list->get_device(device_name.c_str());
+
+ device->init();
+
+ device->binding_port(cct, port_num);
+ assert(device);
+ ib_physical_port = device->active_port->get_port_num();
}
Infiniband::~Infiniband()
dispatcher = d;
}
-Device* Infiniband::get_device(const char* device_name)
-{
- return device_list->get_device(device_name);
-}
-
// 1 means no valid buffer read, 0 means got enough buffer
// else return < 0 means error
int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
void Infiniband::handle_pre_fork()
{
- device_list->uninit();
+ device->uninit();
+}
+
+void Infiniband::handle_post_fork()
+{
+ device->init();
}
int Infiniband::poll_tx(int n, Device **d, ibv_wc *wc)
{
device_list->handle_async_event();
}
+
+void Infiniband::process_async_event(ibv_async_event &async_event)
+{
+ dispatcher->process_async_event(async_event);
+}
bool full();
bool over();
void clear();
+ void post_srq(Infiniband *ib);
public:
ibv_mr* mr;
};
private:
+ uint8_t ib_physical_port;
+ Device *device;
DeviceList *device_list;
RDMADispatcher *dispatcher = nullptr;
void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
public:
- explicit Infiniband(CephContext *c);
+ explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
~Infiniband();
void set_dispatcher(RDMADispatcher *d);
};
public:
+ typedef MemoryManager::Cluster Cluster;
+ typedef MemoryManager::Chunk Chunk;
+ uint8_t get_ib_physical_port() { return ib_physical_port; }
int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
+ Device* get_device() { return device; }
static const char* wc_status_to_string(int status);
static const char* qp_state_string(int status);
void handle_pre_fork();
-
- Device* get_device(const char* device_name);
+ void handle_post_fork();
int poll_tx(int n, Device **d, ibv_wc *wc);
int poll_rx(int n, Device **d, ibv_wc *wc);
int poll_blocking(bool &done);
void rearm_notify();
void handle_async_event();
- RDMADispatcher *get_dispatcher() { return dispatcher; }
+ void process_async_event(ibv_async_event &async_event);
};
#endif
is_server(false), con_handler(new C_handle_connection(this)),
active(false)
{
- ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
- ibport = cct->_conf->ms_async_rdma_port_num;
-
- assert(ibdev);
- assert(ibport > 0);
-
- ibdev->init(ibport);
+ ibdev = ib->get_device();
+ ibport = ib->get_ib_physical_port();
qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
my_msg.qpn = qp->get_local_qp_number();
qpa.ah_attr.grh.hop_limit = 6;
qpa.ah_attr.grh.dgid = peer_msg.gid;
- qpa.ah_attr.grh.sgid_index = ibdev->get_gid_idx();
+ qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx();
qpa.ah_attr.dlid = peer_msg.lid;
qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
qpa.ah_attr.src_path_bits = 0;
- qpa.ah_attr.port_num = (uint8_t)ibport;
+ qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());
ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
#include "msg/async/net_handler.h"
#include "RDMAStack.h"
-#include "Device.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
RDMAServerSocketImpl::RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
: cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a)
{
- ibdev = infiniband->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
- ibport = cct->_conf->ms_async_rdma_port_num;
-
- assert(ibdev);
- assert(ibport > 0);
-
- ibdev->init(ibport);
}
int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
t.join();
}
-void RDMADispatcher::process_async_event(Device *ibdev, ibv_async_event &async_event)
+void RDMADispatcher::process_async_event(ibv_async_event &async_event)
{
perf_logger->inc(l_msgr_rdma_total_async_events);
// FIXME: Currently we must ensure no other factor make QP in ERROR state,
erase_qpn_lockless(qpn);
}
} else {
- ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << *ibdev
+ ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
<< " evt: " << ibv_event_type_str(async_event.event_type)
<< dendl;
}
void RDMADispatcher::handle_post_fork()
{
if (!global_infiniband) {
- global_infiniband.construct(cct);
+ global_infiniband.construct(
+ cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
global_infiniband->set_dispatcher(this);
}
+ global_infiniband->handle_post_fork();
+
polling_start();
}
}
if (!global_infiniband)
- global_infiniband.construct(cct);
+ global_infiniband.construct(
+ cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
dispatcher = new RDMADispatcher(cct, this);
global_infiniband->set_dispatcher(dispatcher);
explicit RDMADispatcher(CephContext* c, RDMAStack* s);
virtual ~RDMADispatcher();
- void process_async_event(Device *ibdev, ibv_async_event &async_event);
+ void process_async_event(ibv_async_event &async_event);
void polling_start();
void polling_stop();
class RDMAServerSocketImpl : public ServerSocketImpl {
CephContext *cct;
- Device *ibdev;
- int ibport;
NetHandler net;
int server_setup_socket;
Infiniband* infiniband;