From: Amir Vadai Date: Wed, 8 Feb 2017 11:43:13 +0000 (+0200) Subject: msg/async/rdma: Move async event handling to Device X-Git-Tag: v12.0.2~300^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4b10ba7578b5d378edf715711bbf222dfe41b91d;p=ceph-ci.git msg/async/rdma: Move async event handling to Device issue: none Change-Id: I5e1ee73bb2d0d751774eaeb7dd2950c704894caf Signed-off-by: Amir Vadai --- diff --git a/src/msg/async/rdma/Device.cc b/src/msg/async/rdma/Device.cc index c629f2c9edf..58700db2c21 100644 --- a/src/msg/async/rdma/Device.cc +++ b/src/msg/async/rdma/Device.cc @@ -108,8 +108,9 @@ Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt } -Device::Device(CephContext *cct, ibv_device* d) +Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d) : cct(cct), device(d), lock("ibdev_lock"), + async_handler(new C_handle_cq_async(this)), infiniband(ib), device_attr(new ibv_device_attr), active_port(nullptr) { if (device == NULL) { @@ -356,8 +357,26 @@ void Device::rearm_cqs() assert(!ret); } +void Device::handle_async_event() +{ + ibv_async_event async_event; + + ldout(cct, 30) << __func__ << dendl; -DeviceList::DeviceList(CephContext *cct) + while (!ibv_get_async_event(ctxt, &async_event)) { + infiniband->process_async_event(async_event); + + ibv_ack_async_event(&async_event); + } + + if (errno != EAGAIN) { + lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno + << " " << cpp_strerror(errno) << ")" << dendl; + } +} + + +DeviceList::DeviceList(CephContext *cct, Infiniband *ib) : cct(cct), device_list(ibv_get_device_list(&num)) { if (device_list == NULL || num == 0) { @@ -372,7 +391,7 @@ DeviceList::DeviceList(CephContext *cct) struct pollfd *pfd = &poll_fds[i * 2]; struct Device *d; - d = new Device(cct, device_list[i]); + d = new Device(cct, ib, device_list[i]); devices[i] = d; pfd[0].fd = d->tx_cc->get_fd(); @@ -470,3 +489,9 @@ void DeviceList::rearm_notify() for (int i = 0; i < num; i++) devices[i]->rearm_cqs(); } + +void DeviceList::handle_async_event() +{ + for (int i = 0; i < num; i++) + devices[i]->handle_async_event(); +} diff --git a/src/msg/async/rdma/Device.h b/src/msg/async/rdma/Device.h index a19640c2baf..2fb46dc13b4 100644 --- a/src/msg/async/rdma/Device.h +++ b/src/msg/async/rdma/Device.h @@ -29,6 +29,7 @@ #include "msg/msg_types.h" #include "msg/async/net_handler.h" #include "common/Mutex.h" +#include "msg/async/Event.h" typedef Infiniband::QueuePair QueuePair; typedef Infiniband::CompletionChannel CompletionChannel; @@ -57,6 +58,15 @@ class Port { class Device { + class C_handle_cq_async : public EventCallback { + Device *device; + public: + C_handle_cq_async(Device *d): device(d) {} + void do_request(int fd) { + device->handle_async_event(); + } + }; + CephContext *cct; ibv_device *device; const char *name; @@ -68,14 +78,18 @@ class Device { Mutex lock; // Protects from concurrent intialization of the device bool initialized = false; + EventCallbackRef async_handler; + Infiniband *infiniband; public: - explicit Device(CephContext *c, ibv_device* d); + explicit Device(CephContext *c, Infiniband *ib, ibv_device* d); ~Device(); void init(); void uninit(); + void handle_async_event(); + const char* get_name() const { return name;} uint16_t get_lid() { return active_port->get_lid(); } ibv_gid get_gid() { return active_port->get_gid(); } @@ -127,7 +141,7 @@ class DeviceList { struct pollfd *poll_fds; public: - DeviceList(CephContext *cct); + DeviceList(CephContext *cct, Infiniband *ib); ~DeviceList(); Device* get_device(const char* device_name); @@ -136,6 +150,8 @@ class DeviceList { int poll_tx(int n, Device **d, ibv_wc *wc); int poll_rx(int n, Device **d, ibv_wc *wc); int poll_blocking(bool &done); + + void handle_async_event(); }; #endif diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index 73f36ae9587..9fe77b67422 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -594,7 +594,7 @@ int Infiniband::MemoryManager::get_channel_buffers(std::vector &chunks, Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num) - : device_list(new DeviceList(cct)) + : device_list(new DeviceList(cct, this)) { device = device_list->get_device(device_name.c_str()); @@ -798,3 +798,13 @@ void Infiniband::rearm_notify() { device_list->rearm_notify(); } + +void Infiniband::handle_async_event() +{ + device_list->handle_async_event(); +} + +void Infiniband::process_async_event(ibv_async_event &async_event) +{ + dispatcher->process_async_event(async_event); +} diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index 68b03cebce3..10184823d98 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -287,6 +287,8 @@ class Infiniband { int poll_rx(int n, Device **d, ibv_wc *wc); int poll_blocking(bool &done); void rearm_notify(); + void handle_async_event(); + void process_async_event(ibv_async_event &async_event); }; #endif diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index be09a75717b..b50b5f62f0d 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -44,7 +44,7 @@ RDMADispatcher::~RDMADispatcher() } RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s) - : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"), + : cct(c), lock("RDMADispatcher::lock"), w_lock("RDMADispatcher::for worker pending list"), stack(s) { PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last); @@ -90,40 +90,29 @@ void RDMADispatcher::polling_stop() t.join(); } -void RDMADispatcher::handle_async_event() +void RDMADispatcher::process_async_event(ibv_async_event &async_event) { - ldout(cct, 30) << __func__ << dendl; - while (1) { - ibv_async_event async_event; - if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) { - if (errno != EAGAIN) - lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno - << " " << cpp_strerror(errno) << ")" << dendl; - return; - } - perf_logger->inc(l_msgr_rdma_total_async_events); - // FIXME: Currently we must ensure no other factor make QP in ERROR state, - // otherwise this qp can't be deleted in current cleanup flow. - if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) { - perf_logger->inc(l_msgr_rdma_async_last_wqe_events); - uint64_t qpn = async_event.element.qp->qp_num; - ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp - << " evt: " << ibv_event_type_str(async_event.event_type) << dendl; - Mutex::Locker l(lock); - RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn); - if (!conn) { - ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl; - } else { - ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl; - conn->fault(); - erase_qpn_lockless(qpn); - } + perf_logger->inc(l_msgr_rdma_total_async_events); + // FIXME: Currently we must ensure no other factor make QP in ERROR state, + // otherwise this qp can't be deleted in current cleanup flow. + if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) { + perf_logger->inc(l_msgr_rdma_async_last_wqe_events); + uint64_t qpn = async_event.element.qp->qp_num; + ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp + << " evt: " << ibv_event_type_str(async_event.event_type) << dendl; + Mutex::Locker l(lock); + RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn); + if (!conn) { + ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl; } else { - ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt - << " evt: " << ibv_event_type_str(async_event.event_type) - << dendl; + ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl; + conn->fault(); + erase_qpn_lockless(qpn); } - ibv_ack_async_event(&async_event); + } else { + ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt + << " evt: " << ibv_event_type_str(async_event.event_type) + << dendl; } } @@ -217,7 +206,7 @@ void RDMADispatcher::polling() break; if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) { - handle_async_event(); + global_infiniband->handle_async_event(); if (!rearmed) { // Clean up cq events after rearm notify ensure no new incoming event // arrived between polling and rearm diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index da279f04866..2a775e4ac87 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -67,7 +67,6 @@ class RDMADispatcher : public CephContext::ForkWatcher { std::thread t; CephContext *cct; - EventCallbackRef async_handler; bool done = false; std::atomic num_dead_queue_pair = {0}; std::atomic num_qp_conn = {0}; @@ -100,23 +99,13 @@ class RDMADispatcher : public CephContext::ForkWatcher { std::list pending_workers; RDMAStack* stack; - class C_handle_cq_async : public EventCallback { - RDMADispatcher *dispatcher; - public: - C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {} - void do_request(int fd) { - // worker->handle_tx_event(); - dispatcher->handle_async_event(); - } - }; - public: PerfCounters *perf_logger; explicit RDMADispatcher(CephContext* c, RDMAStack* s); virtual ~RDMADispatcher(); - void handle_async_event(); + void process_async_event(ibv_async_event &async_event); void polling_start(); void polling_stop();