}
-Device::Device(CephContext *cct, ibv_device* d)
+Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d)
: cct(cct), device(d), lock("ibdev_lock"),
+ async_handler(new C_handle_cq_async(this)), infiniband(ib),
device_attr(new ibv_device_attr), active_port(nullptr)
{
if (device == NULL) {
assert(!ret);
}
+void Device::handle_async_event()
+{
+ ibv_async_event async_event;
+
+ ldout(cct, 30) << __func__ << dendl;
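+  // ibv_get_async_event() returns 0 for each queued event. The async fd is
+  // expected to be non-blocking here, so once the queue is drained it returns
+  // -1 with errno set to EAGAIN and the loop exits.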
+ while (!ibv_get_async_event(ctxt, &async_event)) {
+ infiniband->process_async_event(async_event);
+
+ ibv_ack_async_event(&async_event);
+ }
+
+ if (errno != EAGAIN) {
+ lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
+ << " " << cpp_strerror(errno) << ")" << dendl;
+ }
+}
+
+
-DeviceList::DeviceList(CephContext *cct)
+DeviceList::DeviceList(CephContext *cct, Infiniband *ib)
: cct(cct), device_list(ibv_get_device_list(&num))
{
if (device_list == NULL || num == 0) {
struct pollfd *pfd = &poll_fds[i * 2];
struct Device *d;
- d = new Device(cct, device_list[i]);
+ d = new Device(cct, ib, device_list[i]);
devices[i] = d;
pfd[0].fd = d->tx_cc->get_fd();
for (int i = 0; i < num; i++)
devices[i]->rearm_cqs();
}
+
+void DeviceList::handle_async_event()
+{
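+  // Drain the async event queue of every enumerated device.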
+ for (int i = 0; i < num; i++)
+ devices[i]->handle_async_event();
+}
#include "msg/msg_types.h"
#include "msg/async/net_handler.h"
#include "common/Mutex.h"
+#include "msg/async/Event.h"
typedef Infiniband::QueuePair QueuePair;
typedef Infiniband::CompletionChannel CompletionChannel;
class Device {
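+  // Adapter that lets the device's async fd be registered with an
+  // EventCenter: do_request() is invoked when the fd turns readable and
+  // drains the async event queue. (Registration itself is assumed to happen
+  // elsewhere in the stack; it is not part of this hunk.)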
+ class C_handle_cq_async : public EventCallback {
+ Device *device;
+ public:
+ C_handle_cq_async(Device *d): device(d) {}
+ void do_request(int fd) {
+ device->handle_async_event();
+ }
+ };
+
CephContext *cct;
ibv_device *device;
const char *name;
  Mutex lock; // Protects from concurrent initialization of the device
bool initialized = false;
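+  // async_handler forwards readable-fd notifications into
+  // handle_async_event(); infiniband is the owning instance that processes
+  // the drained events.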
+ EventCallbackRef async_handler;
+ Infiniband *infiniband;
public:
- explicit Device(CephContext *c, ibv_device* d);
+ explicit Device(CephContext *c, Infiniband *ib, ibv_device* d);
~Device();
void init();
void uninit();
+ void handle_async_event();
+
const char* get_name() const { return name;}
uint16_t get_lid() { return active_port->get_lid(); }
ibv_gid get_gid() { return active_port->get_gid(); }
struct pollfd *poll_fds;
public:
- DeviceList(CephContext *cct);
+ DeviceList(CephContext *cct, Infiniband *ib);
~DeviceList();
Device* get_device(const char* device_name);
int poll_tx(int n, Device **d, ibv_wc *wc);
int poll_rx(int n, Device **d, ibv_wc *wc);
int poll_blocking(bool &done);
+
+ void handle_async_event();
};
#endif
Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
- : device_list(new DeviceList(cct))
+ : device_list(new DeviceList(cct, this))
{
device = device_list->get_device(device_name.c_str());
{
device_list->rearm_notify();
}
+
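+// Called from the dispatcher's polling loop when it has been idle for longer
+// than ms_async_rdma_polling_us; drains every device's async event queue.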
+void Infiniband::handle_async_event()
+{
+ device_list->handle_async_event();
+}
+
+void Infiniband::process_async_event(ibv_async_event &async_event)
+{
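+  // QP-level handling stays in the dispatcher, which owns the qpn -> connection
+  // map needed to fault and clean up the affected socket.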
+ dispatcher->process_async_event(async_event);
+}
int poll_rx(int n, Device **d, ibv_wc *wc);
int poll_blocking(bool &done);
void rearm_notify();
+ void handle_async_event();
+ void process_async_event(ibv_async_event &async_event);
};
#endif
}
RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
- : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
+ : cct(c), lock("RDMADispatcher::lock"),
w_lock("RDMADispatcher::for worker pending list"), stack(s)
{
PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
t.join();
}
-void RDMADispatcher::handle_async_event()
+void RDMADispatcher::process_async_event(ibv_async_event &async_event)
{
- ldout(cct, 30) << __func__ << dendl;
- while (1) {
- ibv_async_event async_event;
- if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
- if (errno != EAGAIN)
- lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
- << " " << cpp_strerror(errno) << ")" << dendl;
- return;
- }
- perf_logger->inc(l_msgr_rdma_total_async_events);
- // FIXME: Currently we must ensure no other factor make QP in ERROR state,
- // otherwise this qp can't be deleted in current cleanup flow.
- if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
- perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
- uint64_t qpn = async_event.element.qp->qp_num;
- ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
- << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
- Mutex::Locker l(lock);
- RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
- if (!conn) {
- ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
- } else {
- ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
- conn->fault();
- erase_qpn_lockless(qpn);
- }
+ perf_logger->inc(l_msgr_rdma_total_async_events);
+  // FIXME: Currently we must ensure that no other factor puts the QP into the
+  // ERROR state; otherwise the QP can't be deleted by the current cleanup flow.
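+  // IBV_EVENT_QP_LAST_WQE_REACHED is generated for an SRQ-attached QP that has
+  // entered the Error state, once no more WQEs will be consumed from the SRQ.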
+ if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
+ perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
+ uint64_t qpn = async_event.element.qp->qp_num;
+ ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
+ << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
+ Mutex::Locker l(lock);
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
+ if (!conn) {
+ ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
} else {
- ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
- << " evt: " << ibv_event_type_str(async_event.event_type)
- << dendl;
+      ldout(cct, 1) << __func__ << " qp was not stopped by us; faulting conn=" << conn << dendl;
+ conn->fault();
+ erase_qpn_lockless(qpn);
}
- ibv_ack_async_event(&async_event);
+ } else {
+    ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
+                  << " evt: " << ibv_event_type_str(async_event.event_type)
+                  << dendl;
}
}
break;
if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
- handle_async_event();
+ global_infiniband->handle_async_event();
if (!rearmed) {
// Clean up cq events after rearm notify ensure no new incoming event
// arrived between polling and rearm
std::thread t;
CephContext *cct;
- EventCallbackRef async_handler;
bool done = false;
std::atomic<uint64_t> num_dead_queue_pair = {0};
std::atomic<uint64_t> num_qp_conn = {0};
std::list<RDMAWorker*> pending_workers;
RDMAStack* stack;
- class C_handle_cq_async : public EventCallback {
- RDMADispatcher *dispatcher;
- public:
- C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {}
- void do_request(int fd) {
- // worker->handle_tx_event();
- dispatcher->handle_async_event();
- }
- };
-
public:
PerfCounters *perf_logger;
explicit RDMADispatcher(CephContext* c, RDMAStack* s);
virtual ~RDMADispatcher();
- void handle_async_event();
+ void process_async_event(ibv_async_event &async_event);
void polling_start();
void polling_stop();