assert(NetHandler(cct).set_nonblock(ctxt->async_fd) == 0);
}
-void Device::init()
+void Device::init(int ibport)
{
Mutex::Locker l(lock);
rx_cq = create_comp_queue(cct, rx_cc);
assert(rx_cq);
+ binding_port(cct, ibport);
+
initialized = true;
+
ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl;
}
ldout(cct, 30) << __func__ << dendl;
while (!ibv_get_async_event(ctxt, &async_event)) {
- infiniband->process_async_event(async_event);
+ RDMADispatcher *d = infiniband->get_dispatcher();
+ d->process_async_event(this, async_event);
ibv_ack_async_event(&async_event);
}
for (int i = 0; i < num; i++)
devices[i]->handle_async_event();
}
+
+void DeviceList::uninit()
+{
+ for (int i = 0; i < num; i++)
+ devices[i]->uninit();
+}
explicit Device(CephContext *c, Infiniband *ib, ibv_device* d);
~Device();
- void init();
+ void init(int ibport = -1);
void uninit();
void handle_async_event();
Device* get_device(const char* device_name);
+ void uninit();
+
void rearm_notify();
int poll_tx(int n, Device **d, ibv_wc *wc);
int poll_rx(int n, Device **d, ibv_wc *wc);
bound = 0;
}
-void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
-{
- ib->device->post_chunk(this);
-}
-
Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
: manager(m), buffer_size(s), lock("cluster_lock")
{
}
-Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
+Infiniband::Infiniband(CephContext *cct)
: device_list(new DeviceList(cct, this))
{
- device = device_list->get_device(device_name.c_str());
-
- device->init();
-
- device->binding_port(cct, port_num);
- assert(device);
- ib_physical_port = device->active_port->get_port_num();
}
Infiniband::~Infiniband()
dispatcher = d;
}
+Device* Infiniband::get_device(const char* device_name)
+{
+ return device_list->get_device(device_name);
+}
+
// 1 means no valid buffer read, 0 means got enough buffer
// else return < 0 means error
int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
void Infiniband::handle_pre_fork()
{
- device->uninit();
-}
-
-void Infiniband::handle_post_fork()
-{
- device->init();
+ device_list->uninit();
}
int Infiniband::poll_tx(int n, Device **d, ibv_wc *wc)
{
device_list->handle_async_event();
}
-
-void Infiniband::process_async_event(ibv_async_event &async_event)
-{
- dispatcher->process_async_event(async_event);
-}
bool full();
bool over();
void clear();
- void post_srq(Infiniband *ib);
public:
ibv_mr* mr;
};
private:
- uint8_t ib_physical_port;
- Device *device;
DeviceList *device_list;
RDMADispatcher *dispatcher = nullptr;
void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
public:
- explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
+ explicit Infiniband(CephContext *c);
~Infiniband();
void set_dispatcher(RDMADispatcher *d);
};
public:
- typedef MemoryManager::Cluster Cluster;
- typedef MemoryManager::Chunk Chunk;
- uint8_t get_ib_physical_port() { return ib_physical_port; }
int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
- Device* get_device() { return device; }
static const char* wc_status_to_string(int status);
static const char* qp_state_string(int status);
void handle_pre_fork();
- void handle_post_fork();
+
+ Device* get_device(const char* device_name);
int poll_tx(int n, Device **d, ibv_wc *wc);
int poll_rx(int n, Device **d, ibv_wc *wc);
int poll_blocking(bool &done);
void rearm_notify();
void handle_async_event();
- void process_async_event(ibv_async_event &async_event);
+ RDMADispatcher *get_dispatcher() { return dispatcher; }
};
#endif
is_server(false), con_handler(new C_handle_connection(this)),
active(false)
{
- ibdev = ib->get_device();
- ibport = ib->get_ib_physical_port();
+ ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
+ ibport = cct->_conf->ms_async_rdma_port_num;
+
+ assert(ibdev);
+ assert(ibport > 0);
+
+ ibdev->init(ibport);
qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
my_msg.qpn = qp->get_local_qp_number();
qpa.ah_attr.grh.hop_limit = 6;
qpa.ah_attr.grh.dgid = peer_msg.gid;
- qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx();
+ qpa.ah_attr.grh.sgid_index = ibdev->get_gid_idx();
qpa.ah_attr.dlid = peer_msg.lid;
qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
qpa.ah_attr.src_path_bits = 0;
- qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());
+ qpa.ah_attr.port_num = (uint8_t)ibport;
ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
#include "msg/async/net_handler.h"
#include "RDMAStack.h"
+#include "Device.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
RDMAServerSocketImpl::RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
: cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a)
{
+ ibdev = infiniband->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
+ ibport = cct->_conf->ms_async_rdma_port_num;
+
+ assert(ibdev);
+ assert(ibport > 0);
+
+ ibdev->init(ibport);
}
int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
t.join();
}
-void RDMADispatcher::process_async_event(ibv_async_event &async_event)
+void RDMADispatcher::process_async_event(Device *ibdev, ibv_async_event &async_event)
{
perf_logger->inc(l_msgr_rdma_total_async_events);
// FIXME: Currently we must ensure no other factor make QP in ERROR state,
erase_qpn_lockless(qpn);
}
} else {
- ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
+ ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << *ibdev
<< " evt: " << ibv_event_type_str(async_event.event_type)
<< dendl;
}
void RDMADispatcher::handle_post_fork()
{
if (!global_infiniband) {
- global_infiniband.construct(
- cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
+ global_infiniband.construct(cct);
global_infiniband->set_dispatcher(this);
}
- global_infiniband->handle_post_fork();
-
polling_start();
}
}
if (!global_infiniband)
- global_infiniband.construct(
- cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
+ global_infiniband.construct(cct);
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
dispatcher = new RDMADispatcher(cct, this);
global_infiniband->set_dispatcher(dispatcher);
explicit RDMADispatcher(CephContext* c, RDMAStack* s);
virtual ~RDMADispatcher();
- void process_async_event(ibv_async_event &async_event);
+ void process_async_event(Device *ibdev, ibv_async_event &async_event);
void polling_start();
void polling_stop();
class RDMAServerSocketImpl : public ServerSocketImpl {
CephContext *cct;
+ Device *ibdev;
+ int ibport;
NetHandler net;
int server_setup_socket;
Infiniband* infiniband;