}
-Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d)
+Device::Device(CephContext *cct, ibv_device* d)
: cct(cct), device(d), lock("ibdev_lock"),
- async_handler(new C_handle_cq_async(this)), infiniband(ib),
device_attr(new ibv_device_attr), active_port(nullptr)
{
if (device == NULL) {
assert(!ret);
}
-void Device::handle_async_event()
-{
- ibv_async_event async_event;
-
- ldout(cct, 30) << __func__ << dendl;
- while (!ibv_get_async_event(ctxt, &async_event)) {
- infiniband->process_async_event(async_event);
-
- ibv_ack_async_event(&async_event);
- }
-
- if (errno != EAGAIN) {
- lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
- << " " << cpp_strerror(errno) << ")" << dendl;
- }
-}
-
-
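Note: `ibv_get_async_event()` blocks by default, so the `errno != EAGAIN` check in the loop removed above only makes sense if the device's async fd was switched to nonblocking mode during setup. A minimal sketch of that setup, assuming the same `struct ibv_context *ctxt` used above (the helper name is hypothetical, not from this patch):

    #include <errno.h>
    #include <fcntl.h>
    #include <infiniband/verbs.h>

    // Put the context's async-event fd into nonblocking mode so that
    // ibv_get_async_event() fails with errno == EAGAIN instead of
    // blocking when no event is pending.
    static int set_async_fd_nonblocking(struct ibv_context *ctxt)
    {
      int flags = fcntl(ctxt->async_fd, F_GETFL);
      if (flags < 0)
        return -errno;
      if (fcntl(ctxt->async_fd, F_SETFL, flags | O_NONBLOCK) < 0)
        return -errno;
      return 0;
    }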
-DeviceList::DeviceList(CephContext *cct, Infiniband *ib)
+DeviceList::DeviceList(CephContext *cct)
: cct(cct), device_list(ibv_get_device_list(&num))
{
if (device_list == NULL || num == 0) {
struct pollfd *pfd = &poll_fds[i * 2];
struct Device *d;
- d = new Device(cct, ib, device_list[i]);
+ d = new Device(cct, device_list[i]);
devices[i] = d;
pfd[0].fd = d->tx_cc->get_fd();
for (int i = 0; i < num; i++)
devices[i]->rearm_cqs();
}
-
-void DeviceList::handle_async_event()
-{
- for (int i = 0; i < num; i++)
- devices[i]->handle_async_event();
-}
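Note: the `poll_fds` array set up in the constructor above holds two entries per device (the tx and rx completion-channel fds). A rough sketch of how `poll_blocking()` might consume that layout; this is a reconstruction under stated assumptions, not the implementation from this patch (the timeout and return conventions are assumed):

    #include <errno.h>
    #include <poll.h>

    // Hypothetical reconstruction: block until some device's tx/rx
    // completion channel becomes readable, or until told to stop.
    int DeviceList::poll_blocking(bool &done)
    {
      while (!done) {
        int r = poll(poll_fds, num * 2, 100 /* ms; timeout value assumed */);
        if (r > 0)
          return r;          // at least one completion channel readable
        if (r < 0 && errno != EINTR)
          return -errno;     // genuine poll() failure
      }
      return 0;
    }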
#include "msg/msg_types.h"
#include "msg/async/net_handler.h"
#include "common/Mutex.h"
-#include "msg/async/Event.h"
typedef Infiniband::QueuePair QueuePair;
typedef Infiniband::CompletionChannel CompletionChannel;
class Device {
- class C_handle_cq_async : public EventCallback {
- Device *device;
- public:
- C_handle_cq_async(Device *d): device(d) {}
- void do_request(int fd) {
- device->handle_async_event();
- }
- };
-
CephContext *cct;
ibv_device *device;
const char *name;
Mutex lock; // Protects against concurrent initialization of the device
bool initialized = false;
- EventCallbackRef async_handler;
- Infiniband *infiniband;
public:
- explicit Device(CephContext *c, Infiniband *ib, ibv_device* d);
+ explicit Device(CephContext *c, ibv_device* d);
~Device();
void init();
void uninit();
- void handle_async_event();
-
const char* get_name() const { return name;}
uint16_t get_lid() { return active_port->get_lid(); }
ibv_gid get_gid() { return active_port->get_gid(); }
struct pollfd *poll_fds;
public:
- DeviceList(CephContext *cct, Infiniband *ib);
+ DeviceList(CephContext *cct);
~DeviceList();
Device* get_device(const char* device_name);
int poll_tx(int n, Device **d, ibv_wc *wc);
int poll_rx(int n, Device **d, ibv_wc *wc);
int poll_blocking(bool &done);
-
- void handle_async_event();
};
#endif
Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
- : device_list(new DeviceList(cct, this))
+ : device_list(new DeviceList(cct))
{
device = device_list->get_device(device_name.c_str());
{
device_list->rearm_notify();
}
-
-void Infiniband::handle_async_event()
-{
- device_list->handle_async_event();
-}
-
-void Infiniband::process_async_event(ibv_async_event &async_event)
-{
- dispatcher->process_async_event(async_event);
-}
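Note: taken together, these removals flatten a long delegation chain into a single method on the dispatcher. The before/after control flow, reconstructed from the removed code:

    // Before:
    //   RDMADispatcher polling loop
    //     -> Infiniband::handle_async_event()
    //       -> DeviceList::handle_async_event()     (loop over devices)
    //         -> Device::handle_async_event()       (ibv_get_async_event loop)
    //           -> Infiniband::process_async_event()
    //             -> RDMADispatcher::process_async_event()
    //
    // After:
    //   RDMADispatcher polling loop
    //     -> RDMADispatcher::handle_async_event()   (get/handle/ack in one place)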
int poll_rx(int n, Device **d, ibv_wc *wc);
int poll_blocking(bool &done);
void rearm_notify();
- void handle_async_event();
- void process_async_event(ibv_async_event &async_event);
};
#endif
}
RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
- : cct(c), lock("RDMADispatcher::lock"),
+ : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
w_lock("RDMADispatcher::for worker pending list"), stack(s)
{
PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
t.join();
}
-void RDMADispatcher::process_async_event(ibv_async_event &async_event)
+void RDMADispatcher::handle_async_event()
{
- perf_logger->inc(l_msgr_rdma_total_async_events);
- // FIXME: Currently we must ensure no other factor make QP in ERROR state,
- // otherwise this qp can't be deleted in current cleanup flow.
- if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
- perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
- uint64_t qpn = async_event.element.qp->qp_num;
- ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
- << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
- Mutex::Locker l(lock);
- RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
- if (!conn) {
- ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
+ ldout(cct, 30) << __func__ << dendl;
+ while (1) {
+ ibv_async_event async_event;
+ if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
+ if (errno != EAGAIN)
+ lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
+ << " " << cpp_strerror(errno) << ")" << dendl;
+ return;
+ }
+ perf_logger->inc(l_msgr_rdma_total_async_events);
+ // FIXME: Currently we must ensure that no other factor puts the QP into the
+ // ERROR state; otherwise this QP can't be deleted in the current cleanup flow.
+ if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
+ perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
+ uint64_t qpn = async_event.element.qp->qp_num;
+ ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
+ << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
+ Mutex::Locker l(lock);
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
+ if (!conn) {
+ ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
+ } else {
+ ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
+ conn->fault();
+ erase_qpn_lockless(qpn);
+ }
} else {
- ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
- conn->fault();
- erase_qpn_lockless(qpn);
+ ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
+ << " evt: " << ibv_event_type_str(async_event.event_type)
+ << dendl;
}
- } else {
- ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
- << " evt: " << ibv_event_type_str(async_event.event_type)
- << dendl;
+ ibv_ack_async_event(&async_event);
}
}
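Note: the unconditional `ibv_ack_async_event()` at the bottom of the new loop matters: libibverbs requires every event returned by `ibv_get_async_event()` to be acknowledged, and destroying the associated object (e.g. a QP) while events are still unacknowledged blocks until they are acked. A minimal standalone sketch of the required pairing (the function name is illustrative only):

    #include <infiniband/verbs.h>

    // Every successful ibv_get_async_event() must be matched by an
    // ibv_ack_async_event() before the object the event refers to
    // (QP, CQ, SRQ, ...) can be destroyed without blocking.
    void drain_one_async_event(struct ibv_context *ctxt)
    {
      struct ibv_async_event ev;
      if (ibv_get_async_event(ctxt, &ev) == 0) {
        /* inspect ev.event_type / ev.element here */
        ibv_ack_async_event(&ev);
      }
    }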
break;
if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
- global_infiniband->handle_async_event();
+ handle_async_event();
if (!rearmed) {
// Clean up cq events after rearm notify ensure no new incoming event
// arrived between polling and rearm
std::thread t;
CephContext *cct;
+ EventCallbackRef async_handler;
bool done = false;
std::atomic<uint64_t> num_dead_queue_pair = {0};
std::atomic<uint64_t> num_qp_conn = {0};
std::list<RDMAWorker*> pending_workers;
RDMAStack* stack;
+ class C_handle_cq_async : public EventCallback {
+ RDMADispatcher *dispatcher;
+ public:
+ C_handle_cq_async(RDMADispatcher *d): dispatcher(d) {}
+ void do_request(int fd) {
+ dispatcher->handle_async_event();
+ }
+ };
+
public:
PerfCounters *perf_logger;
explicit RDMADispatcher(CephContext* c, RDMAStack* s);
virtual ~RDMADispatcher();
- void process_async_event(ibv_async_event &async_event);
+ void handle_async_event();
void polling_start();
void polling_stop();
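Note: this hunk adds the `async_handler` callback but does not show where it gets registered. If it were wired to an `EventCenter` the way other Ceph async callbacks are, the registration would look roughly like the sketch below; `center`, `async_fd`, and the helper name are assumptions, not part of this patch:

    // Hypothetical wiring: run handle_async_event() via the callback
    // defined above whenever the device's async fd becomes readable.
    void RDMADispatcher::register_async_handler(EventCenter *center, int async_fd)
    {
      center->create_file_event(async_fd, EVENT_READABLE, async_handler);
    }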